This article describes an advanced trading strategy implemented in Backtrader that uses an adaptive Kalman Filter to track price trends and velocity, incorporating volatility-adjusted position sizing, stop-loss, take-profit, and trend confirmation with an Exponential Moving Average (EMA). The strategy trades based on significant velocity signals, aligning with market trends, and includes robust risk management and performance tracking.
The Enhanced Adaptive Kalman Filter Trading Strategy includes the following components:
Below is the complete Backtrader code for the strategy, including a custom LogReturns indicator and the enhanced Kalman Filter strategy:
import backtrader as bt
import yfinance as yf
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
# Custom LogReturns Indicator
class LogReturns(bt.Indicator):
lines = ('logret',)
params = (('period', 1),)
def __init__(self):
self.addminperiod(self.p.period + 1)
def next(self):
price_now = self.data[0]
price_prev = self.data[-self.p.period]
if price_prev and not np.isnan(price_prev):
self.lines.logret[0] = np.log(price_now / price_prev)
else:
self.lines.logret[0] = np.nan
# Adaptive Kalman Filter Strategy
class AdaptiveKalmanFilterStrategy(bt.Strategy):
lines = ('kf_price', 'kf_velocity', 'adaptive_R', 'adaptive_Q0', 'adaptive_Q1')
params = dict(
vol_period=20,
delta=1e-4,
R_base=0.1,
R_scale=1.0,
Q_scale_factor=0.5,
initial_cov=1.0,
stop_loss_pct=0.05, # 5% stop loss
take_profit_pct=0.2, # 20% take profit
min_volatility_threshold=0.01, # Minimum vol for position sizing
max_position_pct=0.9, # Max 90% of cash
min_position_pct=0.5, # Min 50% of cash
ema_period=30, # EMA for trend confirmation
velocity_threshold=0.01, # Minimum velocity for entry
printlog=False,
)
def __init__(self):
self.data_close = self.datas[0].close
# Kalman state & matrices
self.x = np.zeros(2)
self.P = np.eye(2) * self.params.initial_cov
self.F = np.array([[1., 1.], [0., 1.]])
self.H = np.array([[1., 0.]])
self.I = np.eye(2)
self.initialized = False
self.Q = np.eye(2) * self.params.delta
self.R = self.params.R_base
# Indicators
self.log_returns = LogReturns(self.data_close, period=1)
self.volatility = bt.indicators.StandardDeviation(
self.log_returns.logret, period=self.params.vol_period
)
self.ema = bt.indicators.EMA(period=self.params.ema_period)
# Position tracking
self.entry_price = None
self.stop_price = None
self.target_price = None
self.order = None
def log(self, txt, dt=None, doprint=False):
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()} {txt}')
def _initialize_kalman(self, price):
self.x[:] = [price, 0.0]
self.P = np.eye(2) * self.params.initial_cov
self.initialized = True
def _calculate_position_size(self, volatility):
"""Volatility-adjusted position sizing"""
if volatility < self.params.min_volatility_threshold:
vol_factor = 1.0
else:
vol_factor = self.params.min_volatility_threshold / volatility
vol_factor = min(1.0, max(0.1, vol_factor))
position_pct = self.params.min_position_pct + (
self.params.max_position_pct - self.params.min_position_pct
) * vol_factor
return position_pct
def _set_stops_and_targets(self, entry_price, is_long):
"""Set stop loss and take profit levels"""
if is_long:
self.stop_price = entry_price * (1 - self.params.stop_loss_pct)
self.target_price = entry_price * (1 + self.params.take_profit_pct)
else:
self.stop_price = entry_price * (1 + self.params.stop_loss_pct)
self.target_price = entry_price * (1 - self.params.take_profit_pct)
def notify_order(self, order):
if order.status in [order.Completed]:
if order.isbuy():
self.entry_price = order.executed.price
self._set_stops_and_targets(self.entry_price, True)
self.log(f'LONG ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
else:
self.entry_price = order.executed.price
self._set_stops_and_targets(self.entry_price, False)
self.log(f'SHORT ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
self.order = None
def next(self):
if self.order:
return
price = self.data_close[0]
# Wait for initialization
if not self.initialized:
if len(self) > self.params.vol_period and not np.isnan(self.volatility[0]):
self._initialize_kalman(price)
return
vol = self.volatility[0]
if np.isnan(vol) or np.isnan(price):
for ln in self.lines:
getattr(self.lines, ln)[0] = np.nan
return
# Kalman Filter Update
# Predict
self.x = self.F.dot(self.x)
self.P = self.F.dot(self.P).dot(self.F.T) + self.Q
# Adapt Q & R
vol = max(vol, 1e-8)
self.R = self.params.R_base * (1 + self.params.R_scale * vol)
qvar = self.params.delta * (1 + self.params.Q_scale_factor * vol**2)
self.Q = np.diag([qvar, qvar])
# Update
y = price - (self.H.dot(self.x))[0]
S = (self.H.dot(self.P).dot(self.H.T))[0, 0] + self.R
K = self.P.dot(self.H.T) / S
self.x = self.x + (K.flatten() * y)
self.P = (self.I - K.dot(self.H)).dot(self.P)
# Record lines
self.lines.kf_price[0] = self.x[0]
self.lines.kf_velocity[0] = self.x[1]
self.lines.adaptive_R[0] = self.R
self.lines.adaptive_Q0[0] = self.Q[0, 0]
self.lines.adaptive_Q1[0] = self.Q[1, 1]
# Risk management for existing positions
if self.position:
current_price = self.data_close[0]
if self.position.size > 0: # Long position
if current_price <= self.stop_price:
self.order = self.close()
self.log(f'STOP LOSS HIT: ${current_price:.2f}')
return
elif current_price >= self.target_price:
self.order = self.close()
self.log(f'TAKE PROFIT HIT: ${current_price:.2f}')
return
else: # Short position
if current_price >= self.stop_price:
self.order = self.close()
self.log(f'STOP LOSS HIT: ${current_price:.2f}')
return
elif current_price <= self.target_price:
self.order = self.close()
self.log(f'TAKE PROFIT HIT: ${current_price:.2f}')
return
# Trading logic
vel = self.x[1]
kf_price = self.x[0]
current_price = self.data_close[0]
ema_price = self.ema[0]
velocity_significant = abs(vel) > self.params.velocity_threshold
price_above_ema = current_price > ema_price
price_below_ema = current_price < ema_price
kf_above_ema = kf_price > ema_price
kf_below_ema = kf_price < ema_price
position_pct = self._calculate_position_size(vol)
cash = self.broker.getcash()
position_size = int(cash * position_pct / current_price)
if not self.position and velocity_significant and position_size > 0:
if vel > 0 and price_above_ema and kf_above_ema:
self.log(f'BUY SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
self.order = self.buy(size=position_size)
elif vel < 0 and price_below_ema and kf_below_ema:
self.log(f'SELL SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
self.order = self.sell(size=position_size)
elif self.position:
if self.position.size > 0 and vel < -self.params.velocity_threshold and price_below_ema:
self.log(f'REVERSE TO SHORT: vel={vel:.4f}')
self.order = self.close()
elif self.position.size < 0 and vel > self.params.velocity_threshold and price_above_ema:
self.log(f'REVERSE TO LONG: vel={vel:.4f}')
self.order = self.close()
def stop(self):
final_value = self.broker.getvalue()
self.log(f'Ending Portfolio Value: ${final_value:.2f}', doprint=True)
self.entry_price = None
self.stop_price = None
self.target_price = NoneThe strategy uses a Kalman Filter with enhanced features for trend-following and risk management:
Indicators:
LogReturns indicator.Kalman Filter Setup:
F): Assumes price evolves with
constant velocity.H): Observes only the price
level.P): Set to a diagonal matrix with
initial_cov (1.0).Q): Starts with a small constant
(delta = 1e-4) and adjusts based on volatility
squared.R): Starts at R_base
(0.1) and scales with volatility.Kalman Filter Logic (next):
R based on volatility:
R = R_base * (1 + R_scale * volatility).Q based on volatility squared:
Q = delta * (1 + Q_scale_factor * volatility^2).Position Sizing
(_calculate_position_size):
Risk Management
(_set_stops_and_targets and next):
Trading Logic (next):
Logging:
ema_period, velocity_threshold,
stop_loss_pct, or take_profit_pct for specific
assets.This strategy is designed for trending markets with varying volatility, such as cryptocurrencies or equities, and can be backtested to assess its effectiveness across different timeframes and assets.