← Back to Home
Adapting to Market Noise A Kalman Filter Strategy with Dynamic Noise Parameters

Adapting to Market Noise A Kalman Filter Strategy with Dynamic Noise Parameters

This article explores an advanced quantitative trading strategy that leverages an Adaptive Kalman Filter to estimate market price and velocity, and then uses these estimations for directional trading. The core innovation lies in the dynamic adjustment of the Kalman Filter’s noise parameters based on prevailing market volatility, as measured by the Average True Range (ATR). This adaptability allows the filter to better track market movements and reduce lag or overshooting in varying market conditions.

The Adaptive Kalman Filter Indicator

The Kalman Filter is a powerful algorithm for estimating the state of a dynamic system from a series of noisy measurements. In financial markets, it can be used to estimate the underlying “true” price and its velocity (momentum) by filtering out market noise. The AdaptiveKalmanFilterIndicator enhances this by making its internal noise parameters responsive to real-time market volatility.

import backtrader as bt
import numpy as np

class AdaptiveKalmanFilterIndicator(bt.Indicator):
    """
    Calculates the Kalman Filter estimated price and velocity with adaptive noise.
    Noise parameters adjust based on market volatility (e.g., ATR).
    """
    lines = ('kf_price', 'kf_velocity',)
    params = (
        ('atr_period', 14),
        ('atr_average_period', 50), # New parameter for averaging ATR
        ('min_process_noise', 1e-6),
        ('max_process_noise', 1e-4),
        ('min_measurement_noise', 1e-2),
        ('max_measurement_noise', 1e-0),
        ('smoothing_factor', 0.1), # How much current ATR influences noise
    )
    plotinfo = dict(subplot=False, plotlinelabels=True)

    def __init__(self):
        self.dataclose = self.data.close
        self.atr = bt.indicators.ATR(self.data, period=self.p.atr_period)
        # Calculate the Simple Moving Average of the ATR
        self.avg_atr = bt.indicators.SMA(self.atr, period=self.p.atr_average_period)

        self.F = np.array([[1.0, 1.0], [0.0, 1.0]])
        self.H = np.array([[1.0, 0.0]])
        self.I = np.eye(2)
        self.x = None
        self.P = None
        self.initialized = False

    def _lazyinit(self):
        try:
            initial_price = self.dataclose[0]
            self.x = np.array([initial_price, 0.0])
            self.P = np.array([[1.0, 0.0], [0.0, 100.0]])
            self.initialized = True
        except IndexError:
            pass

    def next(self):
        if not self.initialized:
            self._lazyinit()
            if not self.initialized: return
        
        # Ensure ATR and its average have enough data
        if len(self.atr) < self.p.atr_period or len(self.avg_atr) < self.p.atr_average_period:
            return

        # Adapt noise based on current ATR relative to its average
        current_atr_normalized = self.atr[0] / self.avg_atr[0] if self.avg_atr[0] else 1.0
        
        # Calculate process and measurement noise using a smoothing factor
        process_noise = self.p.min_process_noise + (self.p.max_process_noise - self.p.min_process_noise) * (1 - np.exp(-self.p.smoothing_factor * current_atr_normalized))
        measurement_noise = self.p.min_measurement_noise + (self.p.max_measurement_noise - self.p.min_measurement_noise) * (1 - np.exp(-self.p.smoothing_factor * current_atr_normalized))

        q_val = process_noise
        # Process noise covariance matrix (Q)
        self.Q = np.array([[(q_val**2)/4, (q_val**2)/2],
                            [(q_val**2)/2,  q_val**2]])
        # Measurement noise covariance (R)
        self.R = np.array([[measurement_noise**2]])

        # Kalman Filter steps
        z = self.dataclose[0] # Current measurement (close price)
        x_pred = self.F @ self.x # Predict state
        P_pred = (self.F @ self.P @ self.F.T) + self.Q # Predict error covariance
        y = z - (self.H @ x_pred) # Innovation (measurement residual)
        S = (self.H @ P_pred @ self.H.T) + self.R # Innovation covariance

        try:
            S_inv = np.linalg.inv(S)
        except np.linalg.LinAlgError: # Handle singular matrix case
            S_inv = 1.0 / S[0,0] if np.abs(S[0,0]) > 1e-8 else 1.0
        K = P_pred @ self.H.T @ S_inv # Kalman Gain
        self.x = x_pred + (K @ y) # Update state estimate
        self.P = (self.I - (K @ self.H)) @ P_pred # Update error covariance

        self.lines.kf_price[0] = self.x[0] # Estimated price
        self.lines.kf_velocity[0] = self.x[1] # Estimated velocity (momentum)

Parameters

Initialization (__init__ and _lazyinit)

The __init__ method sets up the dataclose and the ATR indicators (self.atr, self.avg_atr). It also initializes the fixed matrices used in the Kalman Filter:

The _lazyinit method is a workaround to initialize self.x (state vector) and self.P (error covariance matrix) only when the first data point is available. self.x initially contains the first price and zero velocity. self.P reflects the initial uncertainty in these estimates.

Noise Adaptation (next)

Before performing the Kalman Filter calculations in the next method, the noise parameters are adapted:

Kalman Filter Steps (next)

The next method then proceeds with the standard Kalman Filter recursive steps:

  1. Prediction:
    • x_pred: The next state is predicted based on the current state (self.x) and the state transition model (F).
    • P_pred: The error covariance matrix is predicted, incorporating the uncertainty from the state transition (P) and the process noise (Q).
  2. Update:
    • y: The “innovation” or measurement residual is calculated as the difference between the actual measurement (z, current closing price) and the predicted measurement (H @ x_pred).
    • S: The innovation covariance is calculated, representing the uncertainty in the innovation.
    • K (Kalman Gain): This is the crucial factor that weights the innovation, determining how much the predicted state should be corrected by the new measurement. It balances the certainty of the prediction (P_pred) versus the certainty of the measurement (R).
    • self.x: The state estimate is updated by adding a weighted portion of the innovation (Kalman Gain times innovation) to the predicted state.
    • self.P: The error covariance matrix is updated to reflect the reduced uncertainty after incorporating the new measurement.

Finally, the estimated price (self.x[0]) and velocity (self.x[1]) are outputted as indicator lines.

The Adaptive Kalman Filter Strategy

The AdaptiveKalmanFilterStrategy utilizes the estimated velocity from the AdaptiveKalmanFilterIndicator to generate trading signals and manage positions with a trailing stop.

import backtrader as bt
import numpy as np

# ... (AdaptiveKalmanFilterIndicator class remains the same as above) ...

class AdaptiveKalmanFilterStrategy(bt.Strategy):
    params = (
        ('atr_period', 7),
        ('atr_average_period', 30),
        ('min_process_noise', 1e-4),
        ('max_process_noise', 1e-2),
        ('min_measurement_noise', 1e-2),
        ('max_measurement_noise', 1e-0),
        ('smoothing_factor', 0.1),
        ('trail_percent', 0.02),
    )

    def __init__(self):
        self.kf = AdaptiveKalmanFilterIndicator(
            atr_period=self.p.atr_period,
            atr_average_period=self.p.atr_average_period,
            min_process_noise=self.p.min_process_noise,
            max_process_noise=self.p.max_process_noise,
            min_measurement_noise=self.p.min_measurement_noise,
            max_measurement_noise=self.p.max_measurement_noise,
            smoothing_factor=self.p.smoothing_factor
        )
        self.kf_price = self.kf.lines.kf_price
        self.kf_velocity = self.kf.lines.kf_velocity
        self.order = None
        self.stop_order = None # For the trailing stop

    def next(self):
        if self.order: # If an order is pending, do nothing
            return
        if len(self.kf_velocity) == 0 or len(self.kf.avg_atr) == 0: # Ensure indicators have warmed up
            return

        estimated_velocity = self.kf_velocity[0]
        current_position_size = self.position.size

        if current_position_size == 0: # No open position, look for entry
            if self.stop_order: # If there was a previous trailing stop, cancel it
                self.cancel(self.stop_order)
                self.stop_order = None
            if estimated_velocity > 0: # Positive velocity suggests upward momentum
                self.order = self.buy()
            elif estimated_velocity < 0: # Negative velocity suggests downward momentum
                self.order = self.sell()
                
    def notify_order(self, order):
        if order.status in [bt.Order.Submitted, bt.Order.Accepted]:
            return # Ignore if order is still processing

        if order.status == bt.Order.Completed:
            if self.order and order.ref == self.order.ref: # If the entry order completed
                exit_func = self.sell if order.isbuy() else self.buy
                if self.p.trail_percent and self.p.trail_percent > 0.0:
                    # Place a trailing stop order
                    self.stop_order = exit_func(exectype=bt.Order.StopTrail, trailpercent=self.p.trail_percent)
                self.order = None # Clear the entry order reference
            elif self.stop_order and order.ref == self.stop_order.ref: # If the trailing stop order completed
                self.stop_order = None # Clear the stop order reference
                self.order = None # Also clear the general order reference, as position is closed
        elif order.status in [bt.Order.Canceled, bt.Order.Margin, bt.Order.Rejected]:
            # Handle failed orders
            if self.order and order.ref == self.order.ref:
                self.order = None
            elif self.stop_order and order.ref == self.stop_order.ref:
                self.stop_order = None

    def stop(self):
        # Ensure any pending stop order is cancelled when the strategy stops
        if self.stop_order:
            self.cancel(self.stop_order)

Parameters

The strategy inherits and uses many parameters from the AdaptiveKalmanFilterIndicator, plus one for its exit logic:

Initialization (__init__)

The __init__ method sets up the AdaptiveKalmanFilterIndicator and stores references to its output lines (kf_price, kf_velocity). It also initializes self.order and self.stop_order to None to manage orders.

Trading Logic (next)

The next method implements the strategy’s trading rules:

  1. Pre-Checks: It first checks if an entry order is already pending (self.order) or if the Kalman Filter indicator has not yet warmed up sufficiently. If either is true, it returns.
  2. Entry Conditions: If there is no open position (current_position_size == 0):
    • Any existing trailing stop order (from a previously closed position) is cancelled to ensure a clean slate before a new entry.
    • If estimated_velocity (from self.kf_velocity[0]) is positive, it indicates upward momentum, and a buy order is placed.
    • If estimated_velocity is negative, it indicates downward momentum, and a sell order is placed.

Order Notification (notify_order)

This method is critical for handling order lifecycle and managing the trailing stop:

Strategy Stop (stop)

The stop method is called by backtrader when the backtest ends. It ensures that any lingering trailing stop order is cancelled cleanly.

Rolling Backtesting Setup

To comprehensively evaluate the strategy’s performance, a rolling backtest is used. This method assesses the strategy over multiple, successive time windows, providing a more robust view of its consistency compared to a single, fixed backtest.

from collections import deque
import backtrader as bt
import numpy as np
import pandas as pd
import yfinance as yf
import dateutil.relativedelta as rd

# Assuming AdaptiveKalmanFilterIndicator and AdaptiveKalmanFilterStrategy classes are defined above this section.

def run_rolling_backtest(
    ticker,
    start,
    end,
    window_months,
    strategy_class,
    strategy_params=None
):
    strategy_params = strategy_params or {}
    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            current_end = end_dt
            if current_start >= current_end:
                break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)

        if data.empty or len(data) < 90:
            print("Not enough data for this period. Skipping.")
            current_start += rd.relativedelta(months=window_months)
            continue

        if isinstance(data.columns, pd.MultiIndex):
            data = data.droplevel(1, 1)

        start_price = data['Close'].iloc[0]
        end_price = data['Close'].iloc[-1]
        benchmark_ret = (end_price - start_price) / start_price * 100

        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        
        cerebro.addstrategy(strategy_class, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)

        start_val = cerebro.broker.getvalue()
        try:
            cerebro.run()
        except Exception as e:
            print(f"Error running backtest for {current_start.date()} to {current_end.date()}: {e}")
            current_start += rd.relativedelta(months=window_months)
            continue

        final_val = cerebro.broker.getvalue()
        strategy_ret = (final_val - start_val) / start_val * 100

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'strategy_return_pct': strategy_ret,
            'benchmark_return_pct': benchmark_ret,
            'final_value': final_val,
        })

        print(f"Strategy Return: {strategy_ret:.2f}% | Buy & Hold Return: {benchmark_ret:.2f}%")

        current_start += rd.relativedelta(months=window_months)

        if current_start > end_dt:
            break

    return pd.DataFrame(all_results)

How the Rolling Backtest Works:

Pasted image 20250722012026.png Pasted image 20250722012033.png Pasted image 20250722012040.png

Conclusion

The AdaptiveKalmanFilterStrategy offers a sophisticated and adaptable approach to quantitative trading by leveraging the power of the Kalman Filter and dynamically adjusting its noise parameters based on market volatility. This allows for a more robust estimation of underlying price and momentum, which forms the basis for directional trading signals. The strategy’s straightforward entry logic based on estimated velocity, combined with a crucial trailing stop-loss for risk management, aims to capitalize on trends while protecting capital. The use of a rolling backtest is essential for thoroughly evaluating the consistency and robustness of such an adaptive strategy across various market conditions, providing a more reliable assessment of its performance.