This article describes an advanced trading strategy implemented in Backtrader that uses an adaptive Kalman Filter to track price trends and velocity, incorporating volatility-adjusted position sizing, stop-loss, take-profit, and trend confirmation with an Exponential Moving Average (EMA). The strategy trades based on significant velocity signals, aligning with market trends, and includes robust risk management and performance tracking.
The Enhanced Adaptive Kalman Filter Trading Strategy includes the following components:
Below is the complete Backtrader code for the strategy, including a custom LogReturns indicator and the enhanced Kalman Filter strategy:
import backtrader as bt
import yfinance as yf
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
# Custom LogReturns Indicator
class LogReturns(bt.Indicator):
    """Log return of the input line over ``period`` bars.

    Output line ``logret`` holds ``ln(data[0] / data[-period])``. When either
    price is non-positive or NaN the output is NaN, so downstream indicators
    never receive ``-inf`` or trigger NumPy runtime warnings.

    Params:
        period: look-back distance, in bars, between the two prices compared.
    """

    lines = ('logret',)
    params = (('period', 1),)

    def __init__(self):
        # One extra bar is needed so that data[-period] exists on first call.
        self.addminperiod(self.p.period + 1)

    def next(self):
        price_now = self.data[0]
        price_prev = self.data[-self.p.period]
        # Comparisons against NaN are False, so NaN inputs fall through to
        # the else branch together with zero/negative prices.
        if price_prev > 0 and price_now > 0:
            self.lines.logret[0] = np.log(price_now / price_prev)
        else:
            self.lines.logret[0] = np.nan
# Adaptive Kalman Filter Strategy
class AdaptiveKalmanFilterStrategy(bt.Strategy):
    """Trend-following strategy driven by an adaptive constant-velocity Kalman filter.

    The filter state is ``[price, velocity]``. On every bar the measurement
    noise ``R`` and process noise ``Q`` are rescaled from the rolling standard
    deviation of log returns. Entries require a filtered velocity whose
    magnitude exceeds ``velocity_threshold`` and agreement of both the close
    and the filtered price with an EMA. Open positions are closed on a
    percentage stop loss, a percentage take profit, or an opposing velocity
    signal.

    Lines:
        kf_price, kf_velocity: filtered state after the update step.
        adaptive_R: measurement noise used on this bar.
        adaptive_Q0, adaptive_Q1: diagonal of the process noise matrix.
    """

    lines = ('kf_price', 'kf_velocity', 'adaptive_R', 'adaptive_Q0', 'adaptive_Q1')

    params = dict(
        vol_period=20,
        delta=1e-4,
        R_base=0.1,
        R_scale=1.0,
        Q_scale_factor=0.5,
        initial_cov=1.0,
        stop_loss_pct=0.05,  # 5% stop loss
        take_profit_pct=0.2,  # 20% take profit
        min_volatility_threshold=0.01,  # Minimum vol for position sizing
        max_position_pct=0.9,  # Max 90% of cash
        min_position_pct=0.5,  # Min 50% of cash
        ema_period=30,  # EMA for trend confirmation
        velocity_threshold=0.01,  # Minimum velocity for entry
        printlog=False,
    )

    def __init__(self):
        self.data_close = self.datas[0].close

        # Kalman state & matrices
        self.x = np.zeros(2)
        self.P = np.eye(2) * self.params.initial_cov
        self.F = np.array([[1., 1.], [0., 1.]])
        self.H = np.array([[1., 0.]])
        self.I = np.eye(2)
        self.initialized = False
        self.Q = np.eye(2) * self.params.delta
        self.R = self.params.R_base

        # Indicators
        self.log_returns = LogReturns(self.data_close, period=1)
        self.volatility = bt.indicators.StandardDeviation(
            self.log_returns.logret, period=self.params.vol_period
        )
        self.ema = bt.indicators.EMA(period=self.params.ema_period)

        # Position tracking
        self.entry_price = None
        self.stop_price = None
        self.target_price = None
        self.order = None

    def log(self, txt, dt=None, doprint=False):
        """Print ``txt`` prefixed with the bar date when logging is enabled."""
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()} {txt}')

    def _initialize_kalman(self, price):
        """Seed the state with the current price and zero velocity."""
        self.x[:] = [price, 0.0]
        self.P = np.eye(2) * self.params.initial_cov
        self.initialized = True

    def _calculate_position_size(self, volatility):
        """Volatility-adjusted position sizing.

        Returns the fraction of cash to commit. At or below
        ``min_volatility_threshold`` the result is ``max_position_pct``; as
        volatility rises the fraction shrinks towards ``min_position_pct``.
        """
        if volatility < self.params.min_volatility_threshold:
            vol_factor = 1.0
        else:
            vol_factor = self.params.min_volatility_threshold / volatility
            vol_factor = min(1.0, max(0.1, vol_factor))

        # Linear interpolation between the two bounds; the span must be
        # parenthesised so the result can never exceed max_position_pct.
        position_pct = self.params.min_position_pct + (
            self.params.max_position_pct - self.params.min_position_pct
        ) * vol_factor
        return position_pct

    def _set_stops_and_targets(self, entry_price, is_long):
        """Set stop loss and take profit levels"""
        if is_long:
            self.stop_price = entry_price * (1 - self.params.stop_loss_pct)
            self.target_price = entry_price * (1 + self.params.take_profit_pct)
        else:
            self.stop_price = entry_price * (1 + self.params.stop_loss_pct)
            self.target_price = entry_price * (1 - self.params.take_profit_pct)

    def _clear_levels(self):
        """Forget entry, stop and target prices."""
        self.entry_price = None
        self.stop_price = None
        self.target_price = None

    def notify_order(self, order):
        """Track fills and release the pending-order guard on terminal statuses."""
        if order.status in (order.Submitted, order.Accepted, order.Partial):
            # Still working: keep self.order set so next() does not stack orders.
            return

        if order.status == order.Completed:
            fill_price = order.executed.price
            held = self.position.size
            if order.isbuy() and held > 0:
                self.entry_price = fill_price
                self._set_stops_and_targets(fill_price, True)
                self.log(f'LONG ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
            elif not order.isbuy() and held < 0:
                self.entry_price = fill_price
                self._set_stops_and_targets(fill_price, False)
                self.log(f'SHORT ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
            else:
                # The fill flattened the position: it was an exit, not an
                # entry, so no new stop/target levels must be armed.
                self._clear_levels()
                self.log(f'POSITION CLOSED: ${fill_price:.2f}')
        else:
            self.log(f'ORDER NOT FILLED: {order.getstatusname()}')

        self.order = None

    def _record_lines(self, kf_price, kf_velocity, r, q0, q1):
        """Write one value to each output line for the current bar."""
        self.lines.kf_price[0] = kf_price
        self.lines.kf_velocity[0] = kf_velocity
        self.lines.adaptive_R[0] = r
        self.lines.adaptive_Q0[0] = q0
        self.lines.adaptive_Q1[0] = q1

    def _kalman_step(self, price, vol):
        """Run predict, adapt noise, and update for one observation."""
        # Predict
        self.x = self.F.dot(self.x)
        self.P = self.F.dot(self.P).dot(self.F.T) + self.Q

        # Adapt Q & R
        vol = max(vol, 1e-8)
        self.R = self.params.R_base * (1 + self.params.R_scale * vol)
        qvar = self.params.delta * (1 + self.params.Q_scale_factor * vol**2)
        self.Q = np.diag([qvar, qvar])

        # Update
        y = price - (self.H.dot(self.x))[0]
        S = (self.H.dot(self.P).dot(self.H.T))[0, 0] + self.R
        K = self.P.dot(self.H.T) / S
        self.x = self.x + (K.flatten() * y)
        self.P = (self.I - K.dot(self.H)).dot(self.P)

    def _manage_open_position(self, current_price):
        """Close on stop loss / take profit. Returns True if an exit was sent."""
        if self.stop_price is None or self.target_price is None:
            return False

        is_long = self.position.size > 0
        if is_long:
            stop_hit = current_price <= self.stop_price
            target_hit = current_price >= self.target_price
        else:
            stop_hit = current_price >= self.stop_price
            target_hit = current_price <= self.target_price

        if stop_hit:
            self.order = self.close()
            self.log(f'STOP LOSS HIT: ${current_price:.2f}')
            return True
        if target_hit:
            self.order = self.close()
            self.log(f'TAKE PROFIT HIT: ${current_price:.2f}')
            return True
        return False

    def _evaluate_signals(self, current_price, vol):
        """Open a position on a confirmed velocity signal or close on reversal."""
        vel = self.x[1]
        kf_price = self.x[0]
        ema_price = self.ema[0]

        velocity_significant = abs(vel) > self.params.velocity_threshold

        price_above_ema = current_price > ema_price
        price_below_ema = current_price < ema_price
        kf_above_ema = kf_price > ema_price
        kf_below_ema = kf_price < ema_price

        position_pct = self._calculate_position_size(vol)
        cash = self.broker.getcash()
        # A non-positive price cannot be sized against; treat it as "no trade".
        if current_price > 0:
            position_size = int(cash * position_pct / current_price)
        else:
            position_size = 0

        if not self.position and velocity_significant and position_size > 0:
            if vel > 0 and price_above_ema and kf_above_ema:
                self.log(f'BUY SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
                self.order = self.buy(size=position_size)
            elif vel < 0 and price_below_ema and kf_below_ema:
                self.log(f'SELL SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
                self.order = self.sell(size=position_size)
        elif self.position:
            if self.position.size > 0 and vel < -self.params.velocity_threshold and price_below_ema:
                self.log(f'REVERSE TO SHORT: vel={vel:.4f}')
                self.order = self.close()
            elif self.position.size < 0 and vel > self.params.velocity_threshold and price_above_ema:
                self.log(f'REVERSE TO LONG: vel={vel:.4f}')
                self.order = self.close()

    def next(self):
        if self.order:
            return

        price = self.data_close[0]

        # Wait for initialization
        if not self.initialized:
            if len(self) > self.params.vol_period and not np.isnan(self.volatility[0]):
                self._initialize_kalman(price)
            return

        vol = self.volatility[0]
        if np.isnan(vol) or np.isnan(price):
            self._record_lines(np.nan, np.nan, np.nan, np.nan, np.nan)
            return

        # Kalman Filter Update
        self._kalman_step(price, vol)

        # Record lines
        self._record_lines(self.x[0], self.x[1], self.R, self.Q[0, 0], self.Q[1, 1])

        # Risk management for existing positions
        if self.position and self._manage_open_position(price):
            return

        # Trading logic
        self._evaluate_signals(price, vol)

    def stop(self):
        """Report the final portfolio value and reset position tracking."""
        final_value = self.broker.getvalue()
        self.log(f'Ending Portfolio Value: ${final_value:.2f}', doprint=True)
        self._clear_levels()
The strategy uses a Kalman Filter with enhanced features for trend-following and risk management:
Indicators:
- Log returns are computed with the custom `LogReturns` indicator.

Kalman Filter Setup:
- State transition matrix (`F`): Assumes price evolves with constant velocity.
- Measurement matrix (`H`): Observes only the price level.
- Initial covariance (`P`): Set to a diagonal matrix with `initial_cov` (1.0).
- Process noise (`Q`): Starts with a small constant (`delta` = 1e-4) and adjusts based on volatility squared.
- Measurement noise (`R`): Starts at `R_base` (0.1) and scales with volatility.

Kalman Filter Logic (`next`):
- Adapts `R` based on volatility: `R = R_base * (1 + R_scale * volatility)`.
- Adapts `Q` based on volatility squared: `Q = delta * (1 + Q_scale_factor * volatility^2)`.

Position Sizing (`_calculate_position_size`):
Risk Management (`_set_stops_and_targets` and `next`):
Trading Logic (`next`):
Logging:
Tune `ema_period`, `velocity_threshold`, `stop_loss_pct`, or `take_profit_pct` for specific assets.

This strategy is designed for trending markets with varying volatility, such as cryptocurrencies or equities, and can be backtested to assess its effectiveness across different timeframes and assets.