← Back to Home
Adaptive Kalman Filter Trading Strategy with Volatility-Based Sizing and Risk Management

Adaptive Kalman Filter Trading Strategy with Volatility-Based Sizing and Risk Management

This article describes an advanced trading strategy implemented in Backtrader that uses an adaptive Kalman Filter to track price trends and velocity, incorporating volatility-adjusted position sizing, stop-loss, take-profit, and trend confirmation with an Exponential Moving Average (EMA). The strategy trades based on significant velocity signals, aligning with market trends, and includes robust risk management and performance tracking.

Strategy Overview

The Enhanced Adaptive Kalman Filter Trading Strategy includes the following components:

Code Implementation

Below is the complete Backtrader code for the strategy, including a custom LogReturns indicator and the enhanced Kalman Filter strategy:

import backtrader as bt
import yfinance as yf
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt

# Custom LogReturns Indicator
class LogReturns(bt.Indicator):
    lines = ('logret',)
    params = (('period', 1),)

    def __init__(self):
        self.addminperiod(self.p.period + 1)

    def next(self):
        price_now = self.data[0]
        price_prev = self.data[-self.p.period]
        if price_prev and not np.isnan(price_prev):
            self.lines.logret[0] = np.log(price_now / price_prev)
        else:
            self.lines.logret[0] = np.nan

# Adaptive Kalman Filter Strategy
class AdaptiveKalmanFilterStrategy(bt.Strategy):
    lines = ('kf_price', 'kf_velocity', 'adaptive_R', 'adaptive_Q0', 'adaptive_Q1')
    
    params = dict(
        vol_period=20,
        delta=1e-4,
        R_base=0.1,
        R_scale=1.0,
        Q_scale_factor=0.5,
        initial_cov=1.0,
        stop_loss_pct=0.05,          # 5% stop loss
        take_profit_pct=0.2,         # 20% take profit
        min_volatility_threshold=0.01, # Minimum vol for position sizing
        max_position_pct=0.9,        # Max 90% of cash
        min_position_pct=0.5,        # Min 50% of cash
        ema_period=30,               # EMA for trend confirmation
        velocity_threshold=0.01,     # Minimum velocity for entry
        printlog=False,
    )

    def __init__(self):
        self.data_close = self.datas[0].close
        
        # Kalman state & matrices
        self.x = np.zeros(2)
        self.P = np.eye(2) * self.params.initial_cov
        self.F = np.array([[1., 1.], [0., 1.]])
        self.H = np.array([[1., 0.]])
        self.I = np.eye(2)
        self.initialized = False
        self.Q = np.eye(2) * self.params.delta
        self.R = self.params.R_base
        
        # Indicators
        self.log_returns = LogReturns(self.data_close, period=1)
        self.volatility = bt.indicators.StandardDeviation(
            self.log_returns.logret, period=self.params.vol_period
        )
        self.ema = bt.indicators.EMA(period=self.params.ema_period)
        
        # Position tracking
        self.entry_price = None
        self.stop_price = None
        self.target_price = None
        self.order = None

    def log(self, txt, dt=None, doprint=False):
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()} {txt}')

    def _initialize_kalman(self, price):
        self.x[:] = [price, 0.0]
        self.P = np.eye(2) * self.params.initial_cov
        self.initialized = True

    def _calculate_position_size(self, volatility):
        """Volatility-adjusted position sizing"""
        if volatility < self.params.min_volatility_threshold:
            vol_factor = 1.0
        else:
            vol_factor = self.params.min_volatility_threshold / volatility
            vol_factor = min(1.0, max(0.1, vol_factor))
        
        position_pct = self.params.min_position_pct + (
            self.params.max_position_pct - self.params.min_position_pct
        ) * vol_factor
        
        return position_pct

    def _set_stops_and_targets(self, entry_price, is_long):
        """Set stop loss and take profit levels"""
        if is_long:
            self.stop_price = entry_price * (1 - self.params.stop_loss_pct)
            self.target_price = entry_price * (1 + self.params.take_profit_pct)
        else:
            self.stop_price = entry_price * (1 + self.params.stop_loss_pct)
            self.target_price = entry_price * (1 - self.params.take_profit_pct)

    def notify_order(self, order):
        if order.status in [order.Completed]:
            if order.isbuy():
                self.entry_price = order.executed.price
                self._set_stops_and_targets(self.entry_price, True)
                self.log(f'LONG ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
            else:
                self.entry_price = order.executed.price
                self._set_stops_and_targets(self.entry_price, False)
                self.log(f'SHORT ENTRY: ${self.entry_price:.2f}, Stop: ${self.stop_price:.2f}, Target: ${self.target_price:.2f}')
        self.order = None

    def next(self):
        if self.order:
            return
            
        price = self.data_close[0]
        
        # Wait for initialization
        if not self.initialized:
            if len(self) > self.params.vol_period and not np.isnan(self.volatility[0]):
                self._initialize_kalman(price)
            return

        vol = self.volatility[0]
        if np.isnan(vol) or np.isnan(price):
            for ln in self.lines:
                getattr(self.lines, ln)[0] = np.nan
            return

        # Kalman Filter Update
        # Predict
        self.x = self.F.dot(self.x)
        self.P = self.F.dot(self.P).dot(self.F.T) + self.Q
        
        # Adapt Q & R
        vol = max(vol, 1e-8)
        self.R = self.params.R_base * (1 + self.params.R_scale * vol)
        qvar = self.params.delta * (1 + self.params.Q_scale_factor * vol**2)
        self.Q = np.diag([qvar, qvar])
        
        # Update
        y = price - (self.H.dot(self.x))[0]
        S = (self.H.dot(self.P).dot(self.H.T))[0, 0] + self.R
        K = self.P.dot(self.H.T) / S
        self.x = self.x + (K.flatten() * y)
        self.P = (self.I - K.dot(self.H)).dot(self.P)
        
        # Record lines
        self.lines.kf_price[0] = self.x[0]
        self.lines.kf_velocity[0] = self.x[1]
        self.lines.adaptive_R[0] = self.R
        self.lines.adaptive_Q0[0] = self.Q[0, 0]
        self.lines.adaptive_Q1[0] = self.Q[1, 1]

        # Risk management for existing positions
        if self.position:
            current_price = self.data_close[0]
            
            if self.position.size > 0:  # Long position
                if current_price <= self.stop_price:
                    self.order = self.close()
                    self.log(f'STOP LOSS HIT: ${current_price:.2f}')
                    return
                elif current_price >= self.target_price:
                    self.order = self.close()
                    self.log(f'TAKE PROFIT HIT: ${current_price:.2f}')
                    return
            else:  # Short position
                if current_price >= self.stop_price:
                    self.order = self.close()
                    self.log(f'STOP LOSS HIT: ${current_price:.2f}')
                    return
                elif current_price <= self.target_price:
                    self.order = self.close()
                    self.log(f'TAKE PROFIT HIT: ${current_price:.2f}')
                    return

        # Trading logic
        vel = self.x[1]
        kf_price = self.x[0]
        current_price = self.data_close[0]
        ema_price = self.ema[0]
        
        velocity_significant = abs(vel) > self.params.velocity_threshold
        
        price_above_ema = current_price > ema_price
        price_below_ema = current_price < ema_price
        kf_above_ema = kf_price > ema_price
        kf_below_ema = kf_price < ema_price
        
        position_pct = self._calculate_position_size(vol)
        cash = self.broker.getcash()
        position_size = int(cash * position_pct / current_price)
        
        if not self.position and velocity_significant and position_size > 0:
            if vel > 0 and price_above_ema and kf_above_ema:
                self.log(f'BUY SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
                self.order = self.buy(size=position_size)
            elif vel < 0 and price_below_ema and kf_below_ema:
                self.log(f'SELL SIGNAL: vel={vel:.4f}, vol={vol:.4f}, size_pct={position_pct:.2f}')
                self.order = self.sell(size=position_size)
                
        elif self.position:
            if self.position.size > 0 and vel < -self.params.velocity_threshold and price_below_ema:
                self.log(f'REVERSE TO SHORT: vel={vel:.4f}')
                self.order = self.close()
            elif self.position.size < 0 and vel > self.params.velocity_threshold and price_above_ema:
                self.log(f'REVERSE TO LONG: vel={vel:.4f}')
                self.order = self.close()

    def stop(self):
        final_value = self.broker.getvalue()
        self.log(f'Ending Portfolio Value: ${final_value:.2f}', doprint=True)
        
        self.entry_price = None
        self.stop_price = None
        self.target_price = None

Strategy Explanation

1. LogReturns Indicator

2. AdaptiveKalmanFilterStrategy

The strategy uses a Kalman Filter with enhanced features for trend-following and risk management:

Key Features

Pasted image 20250716034008.png Pasted image 20250716034016.png

Potential Improvements

This strategy is designed for trending markets with varying volatility, such as cryptocurrencies or equities, and can be backtested to assess its effectiveness across different timeframes and assets.