Trend Following with Kalman Filter and Trailing Stops: A Robust Backtesting Approach

This article explores a quantitative trading strategy built on the backtrader framework that uses a Kalman Filter for trend estimation and a trailing stop for risk management. We’ll cover the strategy’s logic, the implementation details, and a rolling backtesting methodology for assessing its performance over time.

1. Introduction to the Strategy

The core idea of this strategy, KalmanFilterTrendWithTrail, is to identify the underlying trend of an asset using a Kalman Filter and then enter positions in the direction of that trend. A crucial element for risk management is the inclusion of a trailing stop loss, which automatically adjusts to protect profits as the trade moves favorably.
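
To make the trailing stop behaviour concrete, here is a minimal conceptual sketch in plain Python. It is not backtrader's internal implementation: the helper name update_trailing_stop and the price series are assumptions chosen for illustration. For a long position the stop only ever moves up, and for a short position it only ever moves down.

```python
def update_trailing_stop(stop, price, trail_percent, is_long):
    """Return the new stop level after observing `price` (hypothetical helper)."""
    if is_long:
        # Long: trail below the price, and never lower an existing stop
        candidate = price * (1.0 - trail_percent)
        return candidate if stop is None else max(stop, candidate)
    # Short: trail above the price, and never raise an existing stop
    candidate = price * (1.0 + trail_percent)
    return candidate if stop is None else min(stop, candidate)


# Illustrative prices for a long trade with a 5% trail (made-up numbers)
prices = [100.0, 104.0, 110.0, 107.0, 103.0]
stop = None
exit_price = None
for price in prices:
    if stop is not None and price <= stop:
        exit_price = price  # price fell through the stop: the trade is closed
        print(f"stopped out at {price:.2f} (stop was {stop:.2f})")
        break
    stop = update_trailing_stop(stop, price, 0.05, is_long=True)
```

In this example the stop ratchets up as the price climbs to 110 and then stays put while the price falls, so the exit is triggered on the drop to 103.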

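Key Components: a Kalman Filter indicator that estimates price and velocity, entry signals taken from the sign of the estimated velocity, a percentage-based trailing stop that handles every exit, and a rolling backtest that evaluates the strategy window by window.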

2. The Kalman Filter Indicator (Assumed Structure)

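For the strategy to function, a KalmanFilterIndicator is essential. While its full implementation is not provided, its purpose within this strategy is to output two critical lines: kf_price, the filtered estimate of the price, and kf_velocity, the estimated rate of change of the price per bar, whose sign serves as the trend signal.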

Here’s an assumed minimal structure for the KalmanFilterIndicator for context:

import backtrader as bt
import numpy as np

class KalmanFilterIndicator(bt.Indicator):
    lines = ('kf_price', 'kf_velocity',)
    params = (
        ('process_noise', 1e-5),    # Q: Covariance of the process noise
        ('measurement_noise', 1e-1), # R: Covariance of the measurement noise
    )

    def __init__(self):
        # self.data.close is available directly, so no placeholder indicator is needed here
        
        # Initial state (price, velocity) and covariance
        self.state = np.zeros(2) 
        self.P = np.identity(2) * 1e-1 # Initial error covariance matrix

        # State transition matrix (assuming constant velocity)
        self.F = np.array([[1, 1], 
                           [0, 1]]) # dt=1 for daily data

        # Measurement matrix (we only observe price, not velocity directly)
        self.H = np.array([[1, 0]])

        # Process noise covariance
        self.Q = np.identity(2) * self.p.process_noise

        # Measurement noise covariance
        self.R = np.identity(1) * self.p.measurement_noise
        
    def next(self):
        if len(self.data.close) < 2: # First bar: seed the state so the filter does not start from a price of zero
            self.state = np.array([self.data.close[0], 0.0])
            self.lines.kf_price[0] = self.data.close[0]
            self.lines.kf_velocity[0] = 0.0
            return

        # Prediction Step
        x_pred = np.dot(self.F, self.state)
        P_pred = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q

        # Update Step
        y = self.data.close[0] - np.dot(self.H, x_pred)
        S = np.dot(np.dot(self.H, P_pred), self.H.T) + self.R
        K = np.dot(np.dot(P_pred, self.H.T), np.linalg.inv(S))

        self.state = x_pred + np.dot(K, y)
        self.P = P_pred - np.dot(np.dot(K, self.H), P_pred)

        self.lines.kf_price[0] = self.state[0]
        self.lines.kf_velocity[0] = self.state[1]

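Explanation:

The indicator tracks a two-element state, price and velocity, under a constant-velocity model: the transition matrix F advances the price by one bar of velocity, and the measurement matrix H says that only the price is observed. The process_noise parameter (Q) controls how quickly the state is allowed to drift, and measurement_noise (R) controls how much each new close is trusted. On every bar, next() runs a prediction step followed by an update step and writes the resulting estimates to kf_price and kf_velocity.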
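
The predict/update cycle can also be tried outside backtrader. The sketch below is a plain-Python version with the 2x2 matrix algebra written out by hand; the function name kalman_cv, seeding the state with the first price, and the synthetic price series are all assumptions made for illustration, not part of the original indicator.

```python
def kalman_cv(prices, q=1e-5, r=1e-1, p0=1e-1):
    """Constant-velocity Kalman filter; returns a list of (price, velocity) estimates.

    Hypothetical helper: q, r and p0 mirror process_noise, measurement_noise
    and the initial covariance scale used in the indicator.
    """
    price, velocity = prices[0], 0.0        # assumption: seed with the first observation
    c00, c01, c10, c11 = p0, 0.0, 0.0, p0   # entries of the error covariance P
    estimates = [(price, velocity)]
    for z in prices[1:]:
        # Predict: x_pred = F x and P_pred = F P F^T + Q, with F = [[1, 1], [0, 1]]
        price_pred, velocity_pred = price + velocity, velocity
        m00 = c00 + c01 + c10 + c11 + q
        m01 = c01 + c11
        m10 = c10 + c11
        m11 = c11 + q
        # Update: only the price is observed (H = [1, 0]), so S is a scalar
        s = m00 + r
        k0, k1 = m00 / s, m10 / s
        innovation = z - price_pred
        price = price_pred + k0 * innovation
        velocity = velocity_pred + k1 * innovation
        # P = P_pred - K H P_pred
        c00, c01 = m00 - k0 * m00, m01 - k0 * m01
        c10, c11 = m10 - k1 * m00, m11 - k1 * m01
        estimates.append((price, velocity))
    return estimates


# Synthetic series: 100 bars rising by 1 per bar, then 100 bars falling by 1 per bar
prices = [100.0 + i for i in range(100)] + [199.0 - i for i in range(1, 101)]
estimates = kalman_cv(prices)
print(f"velocity at end of up leg:   {estimates[99][1]:+.3f}")
print(f"velocity at end of down leg: {estimates[-1][1]:+.3f}")
```

The estimated velocity is positive at the end of the rising leg and negative at the end of the falling leg, which is the sign information the strategy trades on. How quickly the sign flips after a reversal depends on the ratio of q to r.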

3. The KalmanFilterTrendWithTrail Strategy

This is the main backtrader strategy that integrates the Kalman Filter and applies the trading logic.

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import dateutil.relativedelta as rd
import matplotlib.pyplot as plt
import seaborn as sns

# Assuming KalmanFilterIndicator is defined as above or imported from another file
# from kalman_filter_indicator import KalmanFilterIndicator 


class KalmanFilterTrendWithTrail(bt.Strategy):
    """
    Uses KalmanFilterIndicator for signals and includes a Trailing Stop.
    """
    params = (
        ('process_noise', 1e-5),    # Passed to indicator
        ('measurement_noise', 1e-1),# Passed to indicator
        ('trail_percent', 0.05),    # Trailing stop percentage
        ('printlog', True),         # Enable/disable logging
    )

    def __init__(self):
        # Instantiate the Kalman Filter Indicator
        self.kf = KalmanFilterIndicator(
            process_noise=self.p.process_noise,
            measurement_noise=self.p.measurement_noise
        )

        # Keep references to the indicator lines for convenience
        self.kf_price = self.kf.lines.kf_price
        self.kf_velocity = self.kf.lines.kf_velocity

        self.order = None # Tracks the entry order
        self.stop_order = None # Tracks the trailing stop order

        if self.params.printlog:
            print(f"Strategy Parameters: Process Noise={self.params.process_noise}, "
                  f"Measurement Noise={self.params.measurement_noise}, "
                  f"Trail Percent={self.params.trail_percent * 100:.2f}%")

    def log(self, txt, dt=None, doprint=False):
        # Logging function
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()} {txt}')

    def notify_order(self, order):
        # Handles order execution and placement of trailing stops
        if order.status in [order.Submitted, order.Accepted]:
            return # Order submitted/accepted, nothing to do yet

        if order.status == order.Completed:
            if self.order and order.ref == self.order.ref: # Our entry order completed
                entry_type = "BUY" if order.isbuy() else "SELL"
                # Determine the correct exit function (sell for buy, buy for sell)
                exit_func = self.sell if order.isbuy() else self.buy 
                
                self.log(f'{entry_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
                
                # Place a trailing stop if trail_percent is valid
                if self.p.trail_percent and self.p.trail_percent > 0.0:
                    self.stop_order = exit_func(exectype=bt.Order.StopTrail, trailpercent=self.p.trail_percent)
                    self.log(f'Trailing Stop Placed for {entry_type} order ref {self.stop_order.ref} at {self.p.trail_percent * 100:.2f}% trail', doprint=True)
                else:
                    self.log(f'No Trailing Stop Placed (trail_percent={self.p.trail_percent})', doprint=True)
                self.order = None # Clear entry order reference

            elif self.stop_order and order.ref == self.stop_order.ref: # Our trailing stop order completed
                exit_type = "STOP BUY (Cover)" if order.isbuy() else "STOP SELL (Exit Long)"
                self.log(f'{exit_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
                self.stop_order = None # Clear stop order reference
                self.order = None # Clear any lingering entry order reference (should be none)
        
        # Handle failed orders
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log(f'Order Failed: Status {order.getstatusname()}, Ref: {order.ref}', doprint=True)
            if self.order and order.ref == self.order.ref: self.order = None
            elif self.stop_order and order.ref == self.stop_order.ref:
                self.log('WARNING: Trailing Stop Order Failed!', doprint=True)
                self.stop_order = None

    def notify_trade(self, trade):
        # Reports profit/loss on closed trades
        if not trade.isclosed: return
        self.log(f'OPERATION PROFIT, GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}', doprint=True)

    def next(self):
        # Prevents new orders if an entry order is already active
        if self.order:
            return

        # Ensure Kalman Filter indicator has enough data to produce output
        if len(self.kf_velocity) == 0:
            return 

        estimated_velocity = self.kf_velocity[0]
        current_position_size = self.position.size
        current_close = self.data.close[0] 

        # --- Trading Logic ---
        if current_position_size == 0: # If we are currently flat (no open position)
            if self.stop_order: # Safety check: If flat but a stop order exists, cancel it
                self.log("Warning: Position flat but stop order exists. Cancelling.", doprint=True)
                self.cancel(self.stop_order)
                self.stop_order = None

            if estimated_velocity > 0: # If Kalman Filter velocity indicates an uptrend
                self.log(f'BUY CREATE (KF Vel > 0), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
                self.order = self.buy() # Place a buy order
            elif estimated_velocity < 0: # If Kalman Filter velocity indicates a downtrend
                self.log(f'SELL CREATE (KF Vel < 0 - Short Entry), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
                self.order = self.sell() # Place a sell (short) order
        else: # If we are already in a position (long or short)
            pass # We rely on the trailing stop to close the position

    def stop(self):
        # Called when the backtest ends, cancels any pending stop orders
        if self.stop_order:
            self.log(f"Strategy stopped. Cancelling pending stop order ref: {self.stop_order.ref}", doprint=True)
            self.cancel(self.stop_order)

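Explanation:

In __init__ the strategy creates the Kalman Filter indicator and keeps references to its two lines. In next(), when there is no open position and no pending entry order, it buys if the estimated velocity is positive and sells short if it is negative. Once the entry order completes, notify_order() places a StopTrail order in the opposite direction using trail_percent. There is no signal-based exit: an open position is closed only by its trailing stop. When the backtest ends, stop() cancels any stop order that is still pending.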
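
The entry rule in next() can be summarised as a small pure function. This is only a restatement of the logic for illustration; the name entry_signal and its return values are hypothetical and do not appear in the strategy.

```python
def entry_signal(velocity, position_size, has_pending_entry):
    """Return 'buy', 'sell' or None, mirroring the entry checks in next()."""
    if has_pending_entry or position_size != 0:
        return None  # an entry order is pending, or the trailing stop owns the exit
    if velocity > 0:
        return "buy"   # estimated uptrend: go long
    if velocity < 0:
        return "sell"  # estimated downtrend: go short
    return None        # velocity exactly zero: stay flat


for velocity, size, pending in [(0.4, 0, False), (-0.2, 0, False), (0.4, 1.5, False)]:
    print(velocity, size, pending, "->", entry_signal(velocity, size, pending))
```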

4. Rolling Backtesting Framework

To get a more realistic and robust assessment of the strategy’s performance, a rolling backtesting approach is used. Instead of testing over one continuous period, the backtest is run over consecutive, non-overlapping time windows. This helps in understanding the strategy’s consistency and adaptability across different market regimes.
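
The window arithmetic can be sketched on its own using only the standard library. The helpers add_months and rolling_windows are hypothetical names; add_months stands in for dateutil's relativedelta(months=...) and clamps the day to the end of the month in the same spirit.

```python
import calendar
from datetime import date


def add_months(d, months):
    """Shift a date forward by whole months, clamping the day to the month's length."""
    month_index = d.year * 12 + (d.month - 1) + months
    year, month0 = divmod(month_index, 12)
    day = min(d.day, calendar.monthrange(year, month0 + 1)[1])
    return date(year, month0 + 1, day)


def rolling_windows(start, end, window_months):
    """Return consecutive, non-overlapping (window_start, window_end) pairs."""
    windows = []
    current = start
    while True:
        window_end = add_months(current, window_months)
        if window_end > end:
            break  # a window that would run past `end` is dropped entirely
        windows.append((current, window_end))
        current = window_end
    return windows


windows = rolling_windows(date(2018, 1, 1), date(2018, 12, 31), 3)
for window_start, window_end in windows:
    print(window_start, "to", window_end)
```

With an end date of December 31, the final quarter is skipped, because its window would end on January 1 of the following year. Passing January 1 as the end date would include it.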

# ... (imports and strategy definition as above) ...

def run_rolling_backtest(
    ticker="ETH-USD",
    start="2018-01-01",
    end="2025-12-31",
    window_months=3,
    strategy_params=None
):
    strategy_params = strategy_params or {}
    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Download daily bars for this window (yfinance treats `end` as exclusive).
        # auto_adjust=False keeps the raw OHLC columns alongside 'Adj Close'.
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
        if data.empty or len(data) < 90: # Skip short windows; 90 bars suits 3-month windows of daily crypto data
            print("Not enough data.")
            current_start += rd.relativedelta(months=window_months)
            continue
        
        # yfinance may return MultiIndex columns (field, ticker); keep only the field level
        data = data.droplevel(1, 1) if isinstance(data.columns, pd.MultiIndex) else data

        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        cerebro.addstrategy(KalmanFilterTrendWithTrail, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000) # Starting cash
        cerebro.broker.setcommission(commission=0.001) # Commission
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95) # Allocate 95% of capital

        start_val = cerebro.broker.getvalue()
        cerebro.run() # Execute the backtest for the current window
        final_val = cerebro.broker.getvalue()
        ret = (final_val - start_val) / start_val * 100

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': ret,
            'final_value': final_val,
        })

        print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
        current_start += rd.relativedelta(months=window_months) # Move to the next window

    return pd.DataFrame(all_results)

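Explanation:

The function steps through the date range in blocks of window_months. For each window it downloads the price data, builds a fresh Cerebro instance with 100,000 in starting cash, a 0.1% commission and a 95% PercentSizer, runs the strategy, and records the percentage return and final portfolio value. Windows with fewer than 90 rows of data are skipped. Because every window starts from scratch, the Kalman Filter is re-initialised each time and no position carries over from one window to the next. The results are returned as a pandas DataFrame with one row per window.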

5. Running the Backtest and Analysis

The if __name__ == '__main__': block demonstrates how to execute the rolling backtest and generate the reports.

# ... (all code as above) ...

if __name__ == '__main__':
    # You can adjust ticker, start, end dates, and window_months here
    # Example: Test with custom parameters for the strategy
    # custom_params = {'process_noise': 1e-4, 'measurement_noise': 1e-2, 'trail_percent': 0.03}
    # df = run_rolling_backtest(ticker="SPY", start="2000-01-01", end="2024-12-31", window_months=6, strategy_params=custom_params)
    
    df = run_rolling_backtest() # Runs with default parameters (ETH-USD, 3-month windows)

    print("\n=== ROLLING BACKTEST RESULTS ===")
    print(df) # Prints the DataFrame of results for each window

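    # report_stats and plot_four_charts are helper functions that are not shown in this article;
    # they must be defined before this block will run.
    stats = report_stats(df) # Prints the aggregated statistics
    plot_four_charts(df) # Displays the four analytical plots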

When you run this script, it will perform a rolling backtest for ETH-USD (Ethereum) from 2018 to the end of 2025, using 3-month windows. You’ll see printouts for each window’s results, followed by aggregated statistics and the four performance charts.
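
The report_stats helper is not shown in the article, so its exact output is unknown. As a rough idea of what aggregated statistics over the per-window returns might look like, here is a hypothetical stand-in named summarize_returns that uses only the standard library. The input values are made-up numbers for illustration, not backtest results.

```python
import statistics


def summarize_returns(returns_pct):
    """Aggregate per-window percentage returns (hypothetical stand-in for report_stats)."""
    n = len(returns_pct)
    mean = statistics.mean(returns_pct)
    std = statistics.stdev(returns_pct) if n > 1 else 0.0
    wins = sum(1 for r in returns_pct if r > 0)
    # Assumption: compound the windows as if capital were reinvested,
    # even though each backtest window restarts from the same cash.
    growth = 1.0
    for r in returns_pct:
        growth *= 1.0 + r / 100.0
    return {
        "periods": n,
        "mean_pct": mean,
        "median_pct": statistics.median(returns_pct),
        "std_pct": std,
        "win_rate_pct": 100.0 * wins / n,
        "sharpe_per_period": mean / std if std > 0 else float("nan"),
        "compounded_pct": (growth - 1.0) * 100.0,
    }


# Made-up window returns in percent, for illustration only
summary = summarize_returns([10.0, -5.0, 20.0, -10.0])
for key, value in summary.items():
    print(f"{key}: {value:.2f}")
```

With the DataFrame returned by run_rolling_backtest, the same function could be called as summarize_returns(df['return_pct'].tolist()).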

6. Conclusion

This strategy combines a Kalman Filter for signal generation with a trailing stop for dynamic risk management. The rolling backtesting methodology shows how consistently the strategy performs across different market conditions, which a single long backtest can hide. By analyzing the period returns, cumulative returns, rolling Sharpe ratio, and the distribution of returns, traders can identify the strategy’s strengths and weaknesses. Tuning the Kalman Filter noise parameters and the trailing stop percentage may improve results.