This article explores an advanced quantitative trading strategy that leverages an Adaptive Kalman Filter to estimate market price and velocity, and then uses these estimations for directional trading. The core innovation lies in the dynamic adjustment of the Kalman Filter’s noise parameters based on prevailing market volatility, as measured by the Average True Range (ATR). This adaptability allows the filter to better track market movements and reduce lag or overshooting in varying market conditions.
The Kalman Filter is a powerful algorithm for estimating the state of
a dynamic system from a series of noisy measurements. In financial
markets, it can be used to estimate the underlying “true” price and its
velocity (momentum) by filtering out market noise. The
AdaptiveKalmanFilterIndicator
enhances this by making its
internal noise parameters responsive to real-time market volatility.
import backtrader as bt
import numpy as np
class AdaptiveKalmanFilterIndicator(bt.Indicator):
"""
Calculates the Kalman Filter estimated price and velocity with adaptive noise.
Noise parameters adjust based on market volatility (e.g., ATR).
"""
= ('kf_price', 'kf_velocity',)
lines = (
params 'atr_period', 14),
('atr_average_period', 50), # New parameter for averaging ATR
('min_process_noise', 1e-6),
('max_process_noise', 1e-4),
('min_measurement_noise', 1e-2),
('max_measurement_noise', 1e-0),
('smoothing_factor', 0.1), # How much current ATR influences noise
(
)= dict(subplot=False, plotlinelabels=True)
plotinfo
def __init__(self):
self.dataclose = self.data.close
self.atr = bt.indicators.ATR(self.data, period=self.p.atr_period)
# Calculate the Simple Moving Average of the ATR
self.avg_atr = bt.indicators.SMA(self.atr, period=self.p.atr_average_period)
self.F = np.array([[1.0, 1.0], [0.0, 1.0]])
self.H = np.array([[1.0, 0.0]])
self.I = np.eye(2)
self.x = None
self.P = None
self.initialized = False
def _lazyinit(self):
try:
= self.dataclose[0]
initial_price self.x = np.array([initial_price, 0.0])
self.P = np.array([[1.0, 0.0], [0.0, 100.0]])
self.initialized = True
except IndexError:
pass
def next(self):
if not self.initialized:
self._lazyinit()
if not self.initialized: return
# Ensure ATR and its average have enough data
if len(self.atr) < self.p.atr_period or len(self.avg_atr) < self.p.atr_average_period:
return
# Adapt noise based on current ATR relative to its average
= self.atr[0] / self.avg_atr[0] if self.avg_atr[0] else 1.0
current_atr_normalized
# Calculate process and measurement noise using a smoothing factor
= self.p.min_process_noise + (self.p.max_process_noise - self.p.min_process_noise) * (1 - np.exp(-self.p.smoothing_factor * current_atr_normalized))
process_noise = self.p.min_measurement_noise + (self.p.max_measurement_noise - self.p.min_measurement_noise) * (1 - np.exp(-self.p.smoothing_factor * current_atr_normalized))
measurement_noise
= process_noise
q_val # Process noise covariance matrix (Q)
self.Q = np.array([[(q_val**2)/4, (q_val**2)/2],
**2)/2, q_val**2]])
[(q_val# Measurement noise covariance (R)
self.R = np.array([[measurement_noise**2]])
# Kalman Filter steps
= self.dataclose[0] # Current measurement (close price)
z = self.F @ self.x # Predict state
x_pred = (self.F @ self.P @ self.F.T) + self.Q # Predict error covariance
P_pred = z - (self.H @ x_pred) # Innovation (measurement residual)
y = (self.H @ P_pred @ self.H.T) + self.R # Innovation covariance
S
try:
= np.linalg.inv(S)
S_inv except np.linalg.LinAlgError: # Handle singular matrix case
= 1.0 / S[0,0] if np.abs(S[0,0]) > 1e-8 else 1.0
S_inv = P_pred @ self.H.T @ S_inv # Kalman Gain
K self.x = x_pred + (K @ y) # Update state estimate
self.P = (self.I - (K @ self.H)) @ P_pred # Update error covariance
self.lines.kf_price[0] = self.x[0] # Estimated price
self.lines.kf_velocity[0] = self.x[1] # Estimated velocity (momentum)
atr_period
: Period for the Average
True Range (ATR) calculation, used to gauge current volatility.atr_average_period
: Period for the
Simple Moving Average (SMA) of the ATR, providing a baseline for typical
volatility.min_process_noise
,
max_process_noise
: The minimum and maximum
bounds for the process noise. This parameter reflects
uncertainty in the model’s prediction of the system’s next state (e.g.,
price and velocity). Higher process noise means the model trusts its
predictions less and relies more on new measurements.min_measurement_noise
,
max_measurement_noise
: The minimum and
maximum bounds for the measurement noise. This
parameter reflects uncertainty in the observed data (e.g., the current
closing price). Higher measurement noise means the filter trusts the
current measurement less and relies more on its internal state
estimate.smoothing_factor
: Controls how
strongly current ATR influences the adaptive noise parameters. A higher
factor makes the adaptation more responsive to recent volatility
changes.__init__
and _lazyinit
)The __init__
method sets up the dataclose
and the ATR indicators (self.atr
,
self.avg_atr
). It also initializes the fixed matrices used
in the Kalman Filter:
F
(State Transition Matrix): Describes how the state
evolves from one time step to the next. Here, it represents a constant
velocity model where
price_next = price_current + velocity_current
and
velocity_next = velocity_current
.H
(Observation Matrix): Relates the state to the
measurement. Here, only the price component of the state is directly
observed.I
(Identity Matrix): Used in the update step.The _lazyinit
method is a workaround to initialize
self.x
(state vector) and self.P
(error
covariance matrix) only when the first data point is available.
self.x
initially contains the first price and zero
velocity. self.P
reflects the initial uncertainty in these
estimates.
next
)Before performing the Kalman Filter calculations in the
next
method, the noise parameters are adapted:
current_atr_normalized
: The current
ATR is normalized by its average over atr_average_period
.
This gives a relative measure of how volatile the market is compared to
its recent history.process_noise
and
measurement_noise
: These values are
dynamically scaled between their min_
and max_
bounds. The formula
(1 - np.exp(-smoothing_factor * current_atr_normalized))
ensures a non-linear adaptation: as volatility (normalized ATR)
increases, both process and measurement noise increase. This means in
high volatility, the filter becomes more responsive to new data but also
potentially less stable, reflecting the increased uncertainty.
Conversely, in low volatility, the noise values decrease, leading to a
smoother estimate.Q
(Process Noise Covariance) and
R
(Measurement Noise Covariance): These
matrices are then constructed using the calculated
process_noise
and measurement_noise
values.
Q
quantifies the uncertainty of the system model, and
R
quantifies the uncertainty of the measurements.next
)The next
method then proceeds with the standard Kalman
Filter recursive steps:
x_pred
: The next state is predicted based on the
current state (self.x
) and the state transition model
(F
).P_pred
: The error covariance matrix is predicted,
incorporating the uncertainty from the state transition (P
)
and the process noise (Q
).y
: The “innovation” or measurement residual is
calculated as the difference between the actual measurement
(z
, current closing price) and the predicted measurement
(H @ x_pred
).S
: The innovation covariance is calculated,
representing the uncertainty in the innovation.K
(Kalman Gain): This is the crucial factor that
weights the innovation, determining how much the predicted state should
be corrected by the new measurement. It balances the certainty of the
prediction (P_pred
) versus the certainty of the measurement
(R
).self.x
: The state estimate is updated by adding a
weighted portion of the innovation (Kalman Gain times innovation) to the
predicted state.self.P
: The error covariance matrix is updated to
reflect the reduced uncertainty after incorporating the new
measurement.Finally, the estimated price (self.x[0]
) and velocity
(self.x[1]
) are outputted as indicator lines.
The AdaptiveKalmanFilterStrategy
utilizes the estimated
velocity from the AdaptiveKalmanFilterIndicator
to generate
trading signals and manage positions with a trailing stop.
import backtrader as bt
import numpy as np
# ... (AdaptiveKalmanFilterIndicator class remains the same as above) ...
class AdaptiveKalmanFilterStrategy(bt.Strategy):
= (
params 'atr_period', 7),
('atr_average_period', 30),
('min_process_noise', 1e-4),
('max_process_noise', 1e-2),
('min_measurement_noise', 1e-2),
('max_measurement_noise', 1e-0),
('smoothing_factor', 0.1),
('trail_percent', 0.02),
(
)
def __init__(self):
self.kf = AdaptiveKalmanFilterIndicator(
=self.p.atr_period,
atr_period=self.p.atr_average_period,
atr_average_period=self.p.min_process_noise,
min_process_noise=self.p.max_process_noise,
max_process_noise=self.p.min_measurement_noise,
min_measurement_noise=self.p.max_measurement_noise,
max_measurement_noise=self.p.smoothing_factor
smoothing_factor
)self.kf_price = self.kf.lines.kf_price
self.kf_velocity = self.kf.lines.kf_velocity
self.order = None
self.stop_order = None # For the trailing stop
def next(self):
if self.order: # If an order is pending, do nothing
return
if len(self.kf_velocity) == 0 or len(self.kf.avg_atr) == 0: # Ensure indicators have warmed up
return
= self.kf_velocity[0]
estimated_velocity = self.position.size
current_position_size
if current_position_size == 0: # No open position, look for entry
if self.stop_order: # If there was a previous trailing stop, cancel it
self.cancel(self.stop_order)
self.stop_order = None
if estimated_velocity > 0: # Positive velocity suggests upward momentum
self.order = self.buy()
elif estimated_velocity < 0: # Negative velocity suggests downward momentum
self.order = self.sell()
def notify_order(self, order):
if order.status in [bt.Order.Submitted, bt.Order.Accepted]:
return # Ignore if order is still processing
if order.status == bt.Order.Completed:
if self.order and order.ref == self.order.ref: # If the entry order completed
= self.sell if order.isbuy() else self.buy
exit_func if self.p.trail_percent and self.p.trail_percent > 0.0:
# Place a trailing stop order
self.stop_order = exit_func(exectype=bt.Order.StopTrail, trailpercent=self.p.trail_percent)
self.order = None # Clear the entry order reference
elif self.stop_order and order.ref == self.stop_order.ref: # If the trailing stop order completed
self.stop_order = None # Clear the stop order reference
self.order = None # Also clear the general order reference, as position is closed
elif order.status in [bt.Order.Canceled, bt.Order.Margin, bt.Order.Rejected]:
# Handle failed orders
if self.order and order.ref == self.order.ref:
self.order = None
elif self.stop_order and order.ref == self.stop_order.ref:
self.stop_order = None
def stop(self):
# Ensure any pending stop order is cancelled when the strategy stops
if self.stop_order:
self.cancel(self.stop_order)
The strategy inherits and uses many parameters from the
AdaptiveKalmanFilterIndicator
, plus one for its exit
logic:
atr_period
,
atr_average_period
,
min_process_noise
,
max_process_noise
,
min_measurement_noise
,
max_measurement_noise
,
smoothing_factor
: These are passed
directly to the AdaptiveKalmanFilterIndicator
instance.trail_percent
: The percentage at which
the dynamic trailing stop-loss trails the market price.__init__
)The __init__
method sets up the
AdaptiveKalmanFilterIndicator
and stores references to its
output lines (kf_price
, kf_velocity
). It also
initializes self.order
and self.stop_order
to
None
to manage orders.
next
)The next
method implements the strategy’s trading
rules:
self.order
) or if the Kalman Filter
indicator has not yet warmed up sufficiently. If either is true, it
returns.current_position_size == 0
):
estimated_velocity
(from
self.kf_velocity[0]
) is positive, it
indicates upward momentum, and a buy
order is placed.estimated_velocity
is
negative, it indicates downward momentum, and a
sell
order is placed.notify_order
)This method is critical for handling order lifecycle and managing the trailing stop:
bt.Order.StopTrail
) for the current position using the
specified trail_percent
. The self.order
reference is then cleared.self.stop_order
and
self.order
references are cleared.stop
)The stop
method is called by backtrader
when the backtest ends. It ensures that any lingering trailing stop
order is cancelled cleanly.
To comprehensively evaluate the strategy’s performance, a rolling backtest is used. This method assesses the strategy over multiple, successive time windows, providing a more robust view of its consistency compared to a single, fixed backtest.
from collections import deque
import backtrader as bt
import numpy as np
import pandas as pd
import yfinance as yf
import dateutil.relativedelta as rd
# Assuming AdaptiveKalmanFilterIndicator and AdaptiveKalmanFilterStrategy classes are defined above this section.
def run_rolling_backtest(
ticker,
start,
end,
window_months,
strategy_class,=None
strategy_params
):= strategy_params or {}
strategy_params = []
all_results = pd.to_datetime(start)
start_dt = pd.to_datetime(end)
end_dt = start_dt
current_start
while True:
= current_start + rd.relativedelta(months=window_months)
current_end if current_end > end_dt:
= end_dt
current_end if current_start >= current_end:
break
print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")
= yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
data
if data.empty or len(data) < 90:
print("Not enough data for this period. Skipping.")
+= rd.relativedelta(months=window_months)
current_start continue
if isinstance(data.columns, pd.MultiIndex):
= data.droplevel(1, 1)
data
= data['Close'].iloc[0]
start_price = data['Close'].iloc[-1]
end_price = (end_price - start_price) / start_price * 100
benchmark_ret
= bt.feeds.PandasData(dataname=data)
feed = bt.Cerebro()
cerebro
**strategy_params)
cerebro.addstrategy(strategy_class,
cerebro.adddata(feed)100000)
cerebro.broker.setcash(=0.001)
cerebro.broker.setcommission(commission=95)
cerebro.addsizer(bt.sizers.PercentSizer, percents
= cerebro.broker.getvalue()
start_val try:
cerebro.run()except Exception as e:
print(f"Error running backtest for {current_start.date()} to {current_end.date()}: {e}")
+= rd.relativedelta(months=window_months)
current_start continue
= cerebro.broker.getvalue()
final_val = (final_val - start_val) / start_val * 100
strategy_ret
all_results.append({'start': current_start.date(),
'end': current_end.date(),
'strategy_return_pct': strategy_ret,
'benchmark_return_pct': benchmark_ret,
'final_value': final_val,
})
print(f"Strategy Return: {strategy_ret:.2f}% | Buy & Hold Return: {benchmark_ret:.2f}%")
+= rd.relativedelta(months=window_months)
current_start
if current_start > end_dt:
break
return pd.DataFrame(all_results)
ticker
,
start
, end
:
The asset symbol and the overall historical period for the
backtest.window_months
: The duration of each
individual backtesting window in months.strategy_class
: The
backtrader.Strategy
class to be tested (e.g.,
AdaptiveKalmanFilterStrategy
).strategy_params
: A dictionary to pass
specific parameters to the chosen strategy for each run.yfinance
with
auto_adjust=False
and droplevel
applied.backtrader.Cerebro
instance.strategy_class
and its
parameters.Pandas DataFrame
containing the
comprehensive results for each rolling window.
The AdaptiveKalmanFilterStrategy
offers a sophisticated
and adaptable approach to quantitative trading by leveraging the power
of the Kalman Filter and dynamically adjusting its
noise parameters based on market volatility. This allows for a more
robust estimation of underlying price and momentum, which forms the
basis for directional trading signals. The strategy’s straightforward
entry logic based on estimated velocity, combined with a crucial
trailing stop-loss for risk management, aims to
capitalize on trends while protecting capital. The use of a
rolling backtest is essential for thoroughly evaluating
the consistency and robustness of such an adaptive strategy across
various market conditions, providing a more reliable assessment of its
performance.