← Back to Home
Kalman Filter Trend Following - An Adaptive Approach with Trailing Stops - Optimization & Rolling Backtest

Kalman Filter Trend Following - An Adaptive Approach with Trailing Stops - Optimization & Rolling Backtest

Traditional technical indicators often suffer from lag, providing signals after a significant price move has already occurred. The Kalman Filter, a sophisticated algorithm originating from engineering, offers an adaptive solution by estimating the true state of a system (e.g., price and its velocity) amidst noise and uncertainty. This article presents a KalmanFilterTrendVelocityTrailingStopStrategy that leverages the Kalman Filter’s real-time estimations for trend direction and incorporates robust risk management through adaptive trailing stops.

Figure_1.png

1. The Kalman Filter Indicator (KalmanFilterIndicator)

The core of this strategy is a custom backtrader indicator that implements a simplified Kalman Filter. It models the price as a system with two states: its current value and its velocity (rate of change).

import backtrader as bt
import numpy as np

class KalmanFilterIndicator(bt.Indicator):
    """
    Estimate price and velocity with a two-state Kalman filter.

    The state vector is ``[price, velocity]`` and only the close price of the
    data feed is observed. On every bar the filter first predicts the state
    with the transition matrix ``F`` and then corrects that prediction with
    the new close.

    Lines:
        kf_price: filtered price estimate (first state component).
        kf_velocity: estimated per-bar price change (second state component).

    Params:
        process_noise: scale ``q`` of the process noise; ``Q`` is built from
            ``q**2``.
        measurement_noise: scale ``r`` of the observation noise; ``R = r**2``.
    """
    lines = ('kf_price', 'kf_velocity',)  # Output lines: estimated price and velocity
    params = (
        ('process_noise', 1e-5),       # q: uncertainty in the system dynamics
        ('measurement_noise', 1e-1),   # r: uncertainty in the observed price
    )
    plotinfo = dict(
        subplot=False,  # Plot on the main price chart
        plotlinelabels=True
    )

    def __init__(self):
        """Build the constant filter matrices; the state itself is set lazily."""
        self.dataclose = self.data.close
        # State transition matrix (F):
        # price_t = price_{t-1} + velocity_{t-1}, velocity_t = velocity_{t-1}
        self.F = np.array([[1.0, 1.0], [0.0, 1.0]])
        # Measurement matrix (H): only the first state component (price) is observed
        self.H = np.array([[1.0, 0.0]])

        # Process noise covariance (Q), scaled by q**2
        q_val = self.p.process_noise
        self.Q = np.array([[(q_val**2)/4, (q_val**2)/2],
                           [(q_val**2)/2,  q_val**2]])
        # Measurement noise covariance (R), a 1x1 matrix
        self.R = np.array([[self.p.measurement_noise**2]])

        self.I = np.eye(2)        # Identity matrix used in the covariance update
        self.x = None             # State vector (price, velocity)
        self.P = None             # Error covariance matrix
        self.initialized = False  # Set to True once x and P have been seeded

    def _lazyinit(self):
        """Seed the state and covariance from the current close, if available."""
        try:
            initial_price = self.dataclose[0]
        except IndexError:
            return  # Not enough data yet; next() will try again on the next bar
        # Velocity starts at 0.0 with a much larger variance (100.0) than the
        # price (1.0).
        self.x = np.array([initial_price, 0.0])
        self.P = np.array([[1.0, 0.0], [0.0, 100.0]])
        self.initialized = True

    def next(self):
        """Run one predict/update cycle and publish the new estimates."""
        if not self.initialized:
            self._lazyinit()
            if not self.initialized:
                return  # Still no data; wait for the next bar

        z = self.dataclose[0]  # Current observed price (measurement)

        # 1. Prediction step: project state and covariance one bar forward
        x_pred = self.F @ self.x
        P_pred = (self.F @ self.P @ self.F.T) + self.Q

        # 2. Update step: correct the prediction with the measurement
        y = z - (self.H @ x_pred)                    # Measurement residual, shape (1,)
        S = (self.H @ P_pred @ self.H.T) + self.R    # Residual covariance, shape (1, 1)

        try:
            S_inv = np.linalg.inv(S)
        except np.linalg.LinAlgError:
            # S is singular. Keep S_inv as a 1x1 array: the matrix product
            # below rejects scalar operands, so a plain float here would raise
            # ValueError instead of providing a fallback.
            s_val = S[0, 0]
            S_inv = np.array([[1.0 / s_val if np.abs(s_val) > 1e-8 else 1.0]])

        K = P_pred @ self.H.T @ S_inv  # Kalman gain, shape (2, 1)

        self.x = x_pred + (K @ y)                      # Updated state estimate
        self.P = (self.I - (K @ self.H)) @ P_pred      # Updated error covariance

        # Output the estimated price and velocity
        self.lines.kf_price[0] = self.x[0]
        self.lines.kf_velocity[0] = self.x[1]

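The filter continuously performs two steps: a prediction step, which projects the previous state and its error covariance one bar forward using the transition matrix F and the process noise Q, and an update step, which corrects that prediction with the newly observed close, weighting the measurement residual by the Kalman gain K.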

2. Kalman Filter Trend Velocity Trailing Stop Strategy (KalmanFilterTrendWithTrail)

This strategy is implemented as the class KalmanFilterTrendWithTrail (referred to elsewhere in this article by the descriptive name KalmanFilterTrendVelocityTrailingStopStrategy). It uses the estimated kf_velocity to determine trend direction.

class KalmanFilterTrendWithTrail(bt.Strategy):
    """
    Trend-following strategy driven by the Kalman velocity estimate.

    When flat, the strategy goes long if ``kf_velocity`` is positive and short
    if it is negative. Once an entry fills, a ``StopTrail`` order is placed in
    the opposite direction; that trailing stop is the only exit. An open
    position is not reversed when the velocity changes sign.

    Params:
        process_noise: forwarded to ``KalmanFilterIndicator``.
        measurement_noise: forwarded to ``KalmanFilterIndicator``.
        trail_percent: trailing distance as a fraction (0.02 is 2%). A falsy
            or non-positive value disables the trailing stop, in which case no
            exit order is ever placed.
    """
    params = (
        ('process_noise', 1e-3),
        ('measurement_noise', 1e-1),
        ('trail_percent', 0.02),  # Trailing stop percentage (e.g., 2%)
    )

    def __init__(self):
        """Create the Kalman indicator and the order-tracking references."""
        self.kf = KalmanFilterIndicator(
            process_noise=self.p.process_noise,
            measurement_noise=self.p.measurement_noise
        )
        self.kf_price = self.kf.lines.kf_price
        self.kf_velocity = self.kf.lines.kf_velocity
        self.order = None       # Pending entry order, if any
        self.stop_order = None  # Active trailing stop order, if any

    def next(self):
        """Enter in the direction of the velocity estimate when flat."""
        if self.order:
            return  # An entry is still pending

        # Ensure the Kalman filter has produced a value for this bar
        if len(self.kf_velocity) == 0 or np.isnan(self.kf_velocity[0]):
            return

        if self.position.size != 0:
            return  # In the market: the trailing stop manages the exit

        # Flat: drop any stop order left over from a previous trade
        if self.stop_order:
            self.cancel(self.stop_order)
            self.stop_order = None

        estimated_velocity = self.kf_velocity[0]
        if estimated_velocity > 0:
            self.order = self.buy()
        elif estimated_velocity < 0:
            self.order = self.sell()
        # A velocity of exactly zero gives no signal; stay flat.

    def notify_order(self, order):
        """Track order lifecycle and attach the trailing stop after an entry fills."""
        if order.status in [order.Submitted, order.Accepted]:
            return  # Still working; nothing to do yet

        is_entry = bool(self.order) and order.ref == self.order.ref
        is_stop = (not is_entry) and bool(self.stop_order) and order.ref == self.stop_order.ref

        if order.status == order.Completed:
            if is_entry:
                if self.p.trail_percent and self.p.trail_percent > 0.0:
                    exit_func = self.sell if order.isbuy() else self.buy
                    # Size the stop from the actual fill so it always closes
                    # exactly what was opened, whatever sizer is installed.
                    self.stop_order = exit_func(
                        exectype=bt.Order.StopTrail,
                        trailpercent=self.p.trail_percent,
                        size=abs(order.executed.size),
                    )
                self.order = None
            elif is_stop:
                # Trailing stop filled: the position has been exited
                self.stop_order = None
                self.order = None
        # Expired is handled together with the other terminal failure states;
        # otherwise an expired tracked order would keep its reference set and
        # next() would refuse to trade again.
        elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]:
            if is_entry:
                self.order = None
            elif is_stop:
                self.stop_order = None

    def stop(self):
        """Cancel a still-active trailing stop at the end of the backtest."""
        if self.stop_order:
            self.cancel(self.stop_order)
            self.stop_order = None

Execution Logic:

  1. Trend Detection: The strategy continuously monitors the kf_velocity output from the Kalman Filter.
    • If kf_velocity is positive, it signals an upward trend, prompting a long entry.
    • If kf_velocity is negative, it signals a downward trend, prompting a short entry.
  2. Position Management:
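    • The strategy aims to be in the market whenever a signal exists: as soon as it is flat, it re-enters in the direction of kf_velocity. It does not reverse an open position when kf_velocity changes sign; an open position is closed only by its trailing stop.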
    • If not currently in a position, it enters based on the kf_velocity direction.
    • Crucially, all position exits (including profit-taking and loss-cutting) are managed by a bt.Order.StopTrail order. The notify_order method ensures that immediately upon a successful entry, a trailing stop is placed with a trail_percent (e.g., 2%). This adaptive stop allows for riding trends while protecting accumulated profits.
  3. Order Handling: The notify_order method diligently tracks the lifecycle of orders, ensuring that pending orders are managed correctly and that the trailing stop is established promptly upon trade execution.

1. Parameter Optimization: Finding the Best Fit

Parameter optimization systematically tests various combinations of a strategy’s input parameters to find those that yield the best historical performance according to a chosen metric (e.g., Sharpe Ratio, total return). This process helps in identifying the most effective settings for a given strategy on a specific dataset.

import backtrader as bt
import numpy as np
import pandas as pd
import yfinance as yf # Assuming yfinance is used for data fetching

def optimize_parameters(strategy_class, opt_params, ticker, start_date, end_date):
    """
    Run a grid optimization and return the best combination by Sharpe ratio.

    Args:
        strategy_class: backtrader strategy class to optimize.
        opt_params: mapping of parameter name to an iterable of candidate
            values; passed as keyword arguments to ``cerebro.optstrategy``.
        ticker: symbol passed to ``yf.download``.
        start_date: start of the optimization period.
        end_date: end of the optimization period.

    Returns:
        A dict for the best run containing the metric keys ``sharpe_ratio``,
        ``final_value``, ``return_pct`` and ``total_trades`` plus one key per
        optimized parameter, or ``None`` when no data was fetched or no
        combination produced a trade. The metric keys are not strategy
        parameters and must be removed before the dict is used as strategy
        keyword arguments.
    """
    print("="*60)
    print(f"OPTIMIZING: {strategy_class.__name__} on {ticker}")
    print("="*60)

    # Fetch data for optimization
    print(f"Fetching data from {start_date} to {end_date}...")
    df = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False, progress=False)
    if isinstance(df.columns, pd.MultiIndex):
        # Drop the second column level so columns are plain field names
        df = df.droplevel(1, axis=1)

    if df.empty:
        print("No data fetched for optimization. Exiting.")
        return None

    print(f"Data shape: {df.shape}")
    print(f"Date range: {df.index[0].date()} to {df.index[-1].date()}")

    # Set up optimization
    cerebro = bt.Cerebro()
    data = bt.feeds.PandasData(dataname=df)
    cerebro.adddata(data)

    start_cash = 10000.0
    cerebro.broker.setcash(start_cash)
    cerebro.broker.setcommission(commission=0.001)

    # Add analyzers for performance metrics
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', timeframe=bt.TimeFrame.Days, annualize=True, riskfreerate=0.0)
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

    print("Testing parameter combinations...")
    cerebro.optstrategy(strategy_class, **opt_params)

    stratruns = cerebro.run()
    print(f"Optimization complete! Tested {len(stratruns)} combinations.")

    # Collect and analyze results
    results = []
    for run in stratruns:
        strategy = run[0]
        sharpe_analysis = strategy.analyzers.sharpe.get_analysis()
        returns_analysis = strategy.analyzers.returns.get_analysis()
        trades_analysis = strategy.analyzers.trades.get_analysis()

        # The Returns analyzer reports 'rtot' as a logarithmic return, so the
        # growth factor is exp(rtot), not 1 + rtot.
        rtot = returns_analysis.get('rtot', 0.0)
        growth = float(np.exp(rtot))
        final_value = start_cash * growth
        return_pct = (growth - 1.0) * 100

        sharpe_ratio = sharpe_analysis.get('sharperatio', -999.0)
        if sharpe_ratio is None or np.isnan(sharpe_ratio):
            sharpe_ratio = -999.0  # Sentinel so undefined ratios sort last

        total_trades = trades_analysis.get('total', {}).get('total', 0)

        result = {
            'sharpe_ratio': sharpe_ratio,
            'final_value': final_value,
            'return_pct': return_pct,
            'total_trades': total_trades,
        }
        # Record the parameter values used by this run
        result.update({name: getattr(strategy.p, name) for name in opt_params})
        results.append(result)

    # Keep only runs that actually traded
    valid_results = [r for r in results if r['total_trades'] > 0]

    if not valid_results:
        print("\nNo combinations resulted in any trades. Cannot determine best parameters.")
        return None

    results_sorted = sorted(valid_results, key=lambda x: x['sharpe_ratio'], reverse=True)

    print(f"\n{'='*120}")
    print("TOP 5 PARAMETER COMBINATIONS BY SHARPE RATIO")
    print(f"{'='*120}")

    top_5_df = pd.DataFrame(results_sorted[:5])
    print(top_5_df.to_string())

    best_params = results_sorted[0]
    print(f"\nBest Parameters Found: {best_params}")

    return best_params

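Key Features of optimize_parameters: it downloads the price history once, runs every parameter combination through cerebro.optstrategy, records the Sharpe ratio, total return, and trade count of each run, discards combinations that never traded, prints the top five by Sharpe ratio, and returns the best one.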

2. Generalized Rolling Backtesting: Assessing Out-of-Sample Performance

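Once optimal parameters are identified from an in-sample optimization period, a rolling backtest (also known as walk-forward optimization) assesses the strategy’s stability and performance on unseen data. This method simulates how a strategy would perform in live trading by iteratively optimizing on one period and testing on a subsequent, out-of-sample period. The run_rolling_backtest function below implements the testing half of this process: it applies one fixed parameter set to consecutive windows and does not re-optimize between them.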

import dateutil.relativedelta as rd # Needed for date calculations in rolling backtest

def run_rolling_backtest(strategy_class, strategy_params, ticker, start, end, window_months):
    """
    Backtest a strategy on consecutive, non-overlapping windows.

    Each window spans ``window_months`` months, starts where the previous one
    ended, and is run in a fresh Cerebro with fresh starting cash. A final
    window that would extend past ``end`` is not run.

    Args:
        strategy_class: backtrader strategy class to run.
        strategy_params: keyword arguments passed verbatim to
            ``cerebro.addstrategy``.
        ticker: symbol passed to ``yf.download``.
        start: start of the overall period (anything ``pd.to_datetime`` accepts).
        end: end of the overall period (anything ``pd.to_datetime`` accepts).
        window_months: window length in months; must be positive.

    Returns:
        A DataFrame with one row per executed window and the columns
        ``start``, ``end``, ``return_pct``, ``benchmark_pct`` and ``trades``.

    Raises:
        ValueError: if ``window_months`` is not positive. Without this check
            the window would never advance and the loop would not terminate.
    """
    if window_months <= 0:
        raise ValueError(f"window_months must be positive, got {window_months!r}")

    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Fetch data for the current window
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)

        if data.empty or len(data) < 30:  # Need at least some data for indicators to warm up
            print("Not enough data for this period. Skipping window.")
            current_start += rd.relativedelta(months=window_months)
            continue

        if isinstance(data.columns, pd.MultiIndex):
            # Drop the second column level so columns are plain field names
            data = data.droplevel(1, axis=1)

        # Buy & hold return over the window, used as the benchmark
        start_price = data['Close'].iloc[0]
        end_price = data['Close'].iloc[-1]
        benchmark_ret = (end_price - start_price) / start_price * 100

        # Set up and run Cerebro for the current window
        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()

        cerebro.addstrategy(strategy_class, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)  # Initial cash for the window
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)  # Allocate 95% of capital per trade
        cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

        start_val = cerebro.broker.getvalue()
        results_run = cerebro.run()
        final_val = cerebro.broker.getvalue()
        strategy_ret = (final_val - start_val) / start_val * 100

        # Trade statistics for the window
        trades_analysis = results_run[0].analyzers.trades.get_analysis()
        total_trades = trades_analysis.get('total', {}).get('total', 0)

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': strategy_ret,
            'benchmark_pct': benchmark_ret,
            'trades': total_trades,
        })

        print(f"Strategy Return: {strategy_ret:.2f}% | Buy & Hold Return: {benchmark_ret:.2f}% | Trades: {total_trades}")
        current_start = current_end  # Move to the next window

    return pd.DataFrame(all_results)

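Key Features of run_rolling_backtest: it splits the period between start and end into consecutive, non-overlapping windows of window_months months, runs the strategy with the supplied parameters on each window with fresh starting capital, and reports the strategy return, the buy-and-hold return, and the number of trades for every window as a DataFrame.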

Conclusion

The KalmanFilterTrendVelocityTrailingStopStrategy demonstrates an innovative application of the Kalman Filter in quantitative trading. By leveraging the filter’s adaptive noise reduction to estimate price velocity as a trend signal, and combining it with dynamic trailing stops, the strategy offers a robust framework for capturing trends and managing risk in a constantly changing market environment.