This article explores a quantitative trading strategy built using the
backtrader framework, enhanced with a Kalman Filter for
trend estimation and robust trailing stops for risk management. We’ll
delve into the strategy’s logic, the implementation details, and a
comprehensive rolling backtesting methodology to assess its performance
over time.
The core idea of this strategy,
KalmanFilterTrendWithTrail, is to identify the underlying
trend of an asset using a Kalman Filter and then enter positions in the
direction of that trend. A crucial element for risk management is the
inclusion of a trailing stop loss, which automatically adjusts to
protect profits as the trade moves favorably.
Key Components:
For the strategy to function, a KalmanFilterIndicator is
essential. While its full implementation is not provided, its purpose
within this strategy is to output two critical lines:
kf_price: The Kalman Filter’s estimate of the “true”
price, smoothed from the raw market data.kf_velocity: The estimated velocity or “trend” of the
price. This is the primary signal for our strategy.Here’s an assumed minimal structure for the
KalmanFilterIndicator for context:
import backtrader as bt
import numpy as np
class KalmanFilterIndicator(bt.Indicator):
lines = ('kf_price', 'kf_velocity',)
params = (
('process_noise', 1e-5), # Q: Covariance of the process noise
('measurement_noise', 1e-1), # R: Covariance of the measurement noise
)
def __init__(self):
self.addproduct(bt.indicators.SMA(self.data.close, period=1)) # Placeholder to ensure 'data.close' access
# Initial state (price, velocity) and covariance
self.state = np.zeros(2)
self.P = np.identity(2) * 1e-1 # Initial error covariance matrix
# State transition matrix (assuming constant velocity)
self.F = np.array([[1, 1],
[0, 1]]) # dt=1 for daily data
# Measurement matrix (we only observe price, not velocity directly)
self.H = np.array([[1, 0]])
# Process noise covariance
self.Q = np.identity(2) * self.p.process_noise
# Measurement noise covariance
self.R = np.identity(1) * self.p.measurement_noise
def next(self):
if len(self.data.close) < 2: # Need at least two data points for velocity
self.lines.kf_price[0] = self.data.close[0]
self.lines.kf_velocity[0] = 0.0
return
# Prediction Step
x_pred = np.dot(self.F, self.state)
P_pred = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q
# Update Step
y = self.data.close[0] - np.dot(self.H, x_pred)
S = np.dot(np.dot(self.H, P_pred), self.H.T) + self.R
K = np.dot(np.dot(P_pred, self.H.T), np.linalg.inv(S))
self.state = x_pred + np.dot(K, y)
self.P = P_pred - np.dot(np.dot(K, self.H), P_pred)
self.lines.kf_price[0] = self.state[0]
self.lines.kf_velocity[0] = self.state[1]Explanation:
lines = ('kf_price', 'kf_velocity',):
Defines the outputs of our indicator.params = (...):
process_noise (Q): Represents the uncertainty in the
system’s dynamics (how much the underlying trend can change
unexpectedly). A smaller value means the model trusts its own
predictions more.measurement_noise (R): Represents the uncertainty in
our observations (how noisy the market price is). A smaller value means
the filter trusts the raw price data more.__init__(self): Initializes the Kalman
Filter’s state variables (estimated price and velocity) and covariance
matrices (P, F, H,
Q, R).next(self): This method is called for
each new data point. It performs the two core steps of the Kalman
Filter:
KalmanFilterTrendWithTrail StrategyThis is the main backtrader strategy that integrates the
Kalman Filter and applies the trading logic.
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import dateutil.relativedelta as rd
import matplotlib.pyplot as plt
import seaborn as sns
# Assuming KalmanFilterIndicator is defined as above or imported from another file
# from kalman_filter_indicator import KalmanFilterIndicator
class KalmanFilterTrendWithTrail(bt.Strategy):
"""
Uses KalmanFilterIndicator for signals and includes a Trailing Stop.
"""
params = (
('process_noise', 1e-5), # Passed to indicator
('measurement_noise', 1e-1),# Passed to indicator
('trail_percent', 0.05), # Trailing stop percentage
('printlog', True), # Enable/disable logging
)
def __init__(self):
# Instantiate the Kalman Filter Indicator
self.kf = KalmanFilterIndicator(
process_noise=self.p.process_noise,
measurement_noise=self.p.measurement_noise
)
# Keep references to the indicator lines for convenience
self.kf_price = self.kf.lines.kf_price
self.kf_velocity = self.kf.lines.kf_velocity
self.order = None # Tracks the entry order
self.stop_order = None # Tracks the trailing stop order
if self.params.printlog:
print(f"Strategy Parameters: Process Noise={self.params.process_noise}, "
f"Measurement Noise={self.params.measurement_noise}, "
f"Trail Percent={self.params.trail_percent * 100:.2f}%")
def log(self, txt, dt=None, doprint=False):
# Logging function
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()} {txt}')
def notify_order(self, order):
# Handles order execution and placement of trailing stops
if order.status in [order.Submitted, order.Accepted]:
return # Order submitted/accepted, nothing to do yet
if order.status == order.Completed:
if self.order and order.ref == self.order.ref: # Our entry order completed
entry_type = "BUY" if order.isbuy() else "SELL"
# Determine the correct exit function (sell for buy, buy for sell)
exit_func = self.sell if order.isbuy() else self.buy
self.log(f'{entry_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
# Place a trailing stop if trail_percent is valid
if self.p.trail_percent and self.p.trail_percent > 0.0:
self.stop_order = exit_func(exectype=bt.Order.StopTrail, trailpercent=self.p.trail_percent)
self.log(f'Trailing Stop Placed for {entry_type} order ref {self.stop_order.ref} at {self.p.trail_percent * 100:.2f}% trail', doprint=True)
else:
self.log(f'No Trailing Stop Placed (trail_percent={self.p.trail_percent})', doprint=True)
self.order = None # Clear entry order reference
elif self.stop_order and order.ref == self.stop_order.ref: # Our trailing stop order completed
exit_type = "STOP BUY (Cover)" if order.isbuy() else "STOP SELL (Exit Long)"
self.log(f'{exit_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
self.stop_order = None # Clear stop order reference
self.order = None # Clear any lingering entry order reference (should be none)
# Handle failed orders
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log(f'Order Failed: Status {order.getstatusname()}, Ref: {order.ref}', doprint=True)
if self.order and order.ref == self.order.ref: self.order = None
elif self.stop_order and order.ref == self.stop_order.ref:
self.log(f'WARNING: Trailing Stop Order Failed!', doprint=True)
self.stop_order = None
def notify_trade(self, trade):
# Reports profit/loss on closed trades
if not trade.isclosed: return
self.log(f'OPERATION PROFIT, GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}', doprint=True)
def next(self):
# Prevents new orders if an entry order is already active
if self.order:
return
# Ensure Kalman Filter indicator has enough data to produce output
if len(self.kf_velocity) == 0:
return
estimated_velocity = self.kf_velocity[0]
current_position_size = self.position.size
current_close = self.data.close[0]
# --- Trading Logic ---
if current_position_size == 0: # If we are currently flat (no open position)
if self.stop_order: # Safety check: If flat but a stop order exists, cancel it
self.log("Warning: Position flat but stop order exists. Cancelling.", doprint=True)
self.cancel(self.stop_order)
self.stop_order = None
if estimated_velocity > 0: # If Kalman Filter velocity indicates an uptrend
self.log(f'BUY CREATE (KF Vel > 0), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
self.order = self.buy() # Place a buy order
elif estimated_velocity < 0: # If Kalman Filter velocity indicates a downtrend
self.log(f'SELL CREATE (KF Vel < 0 - Short Entry), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
self.order = self.sell() # Place a sell (short) order
else: # If we are already in a position (long or short)
pass # We rely on the trailing stop to close the position
def stop(self):
# Called when the backtest ends, cancels any pending stop orders
if self.stop_order:
self.log(f"Strategy stopped. Cancelling pending stop order ref: {self.stop_order.ref}", doprint=True)
self.cancel(self.stop_order)Explanation:
params: Defines configurable
parameters for the strategy, including the noise parameters for the
Kalman Filter and the trail_percent for the trailing
stop.__init__(self):
KalmanFilterIndicator,
passing the noise parameters from the strategy’s own parameters.kf_price and
kf_velocity lines of the indicator for easy access in
next().self.order and self.stop_order are used to
track pending orders, preventing multiple orders for the same
position.log(self, ...): A simple logging
utility to print information during the backtest.notify_order(self, order): This
crucial method is called by backtrader whenever an order’s
status changes.
self.order) is
Completed (executed). Upon completion, it places a
bt.Order.StopTrail order (trailing stop) using the
specified trail_percent.stop_order itself is
Completed, indicating an exit from the position.Canceled, Margin, or
Rejected orders, clearing the order references.notify_trade(self, trade): This method
is called when a trade is closed, reporting the gross and net
profit/loss.next(self): This is the core logic
that runs on each new bar (e.g., daily).
self.order to ensure no entry order is
currently pending.estimated_velocity from the
kf_velocity line of the Kalman Filter.self.position.size == 0):
estimated_velocity > 0, it places a
self.buy() order to go long, assuming an uptrend.estimated_velocity < 0, it places a
self.sell() order to go short, assuming a downtrend.self.position.size != 0), it simply passes,
as the trailing stop is responsible for managing the exit.stop(self): Called at the end of the
backtest to cancel any outstanding stop orders.To get a more realistic and robust assessment of the strategy’s performance, a rolling backtesting approach is used. Instead of testing over one continuous period, the backtest is run over consecutive, non-overlapping time windows. This helps in understanding the strategy’s consistency and adaptability across different market regimes.
# ... (imports and strategy definition as above) ...
def run_rolling_backtest(
ticker="ETH-USD",
start="2018-01-01",
end="2025-12-31",
window_months=3,
strategy_params=None
):
strategy_params = strategy_params or {}
all_results = []
start_dt = pd.to_datetime(start)
end_dt = pd.to_datetime(end)
current_start = start_dt
while True:
current_end = current_start + rd.relativedelta(months=window_months)
if current_end > end_dt:
break
print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")
# Data download using yfinance, respecting the user's preference for auto_adjust=False and droplevel
data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
if data.empty or len(data) < 90: # Ensure enough data for the window
print("Not enough data.")
current_start += rd.relativedelta(months=window_months)
continue
# Apply droplevel if data is a MultiIndex, as per user's preference
data = data.droplevel(1, 1) if isinstance(data.columns, pd.MultiIndex) else data
feed = bt.feeds.PandasData(dataname=data)
cerebro = bt.Cerebro()
cerebro.addstrategy(strategy, **strategy_params)
cerebro.adddata(feed)
cerebro.broker.setcash(100000) # Starting cash
cerebro.broker.setcommission(commission=0.001) # Commission
cerebro.addsizer(bt.sizers.PercentSizer, percents=95) # Allocate 95% of capital
start_val = cerebro.broker.getvalue()
cerebro.run() # Execute the backtest for the current window
final_val = cerebro.broker.getvalue()
ret = (final_val - start_val) / start_val * 100
all_results.append({
'start': current_start.date(),
'end': current_end.date(),
'return_pct': ret,
'final_value': final_val,
})
print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
current_start += rd.relativedelta(months=window_months) # Move to the next window
return pd.DataFrame(all_results)Explanation:
run_rolling_backtest(...):
ticker, start, end
dates, window_months for the rolling periods, and
strategy_params.window_months long
periods.yfinance with
auto_adjust=False and
droplevel(axis=1, level=1) as per the user’s saved
preference.backtrader.Cerebro, adds the strategy
and data, sets initial cash, commission, and a sizer (to determine
position size).report_stats(df): Calculates and
prints key statistical metrics (mean, median, std dev, min, max return,
Sharpe Ratio) from the collected rolling returns.plot_return_distribution(df) (deprecated by
plot_four_charts): Visualizes the distribution of
returns across the rolling windows using a histogram.plot_four_charts(df, rolling_sharpe_window=4):
Provides a comprehensive visual analysis of the rolling backtest
results:
plot_return_distribution.The if __name__ == '__main__': block demonstrates how to
execute the rolling backtest and generate the reports.
# ... (all code as above) ...
if __name__ == '__main__':
# You can adjust ticker, start, end dates, and window_months here
# Example: Test with custom parameters for the strategy
# custom_params = {'process_noise': 1e-4, 'measurement_noise': 1e-2, 'trail_percent': 0.03}
# df = run_rolling_backtest(ticker="SPY", start="2000-01-01", end="2024-12-31", window_months=6, strategy_params=custom_params)
df = run_rolling_backtest() # Runs with default parameters (ETH-USD, 3-month windows)
print("\n=== ROLLING BACKTEST RESULTS ===")
print(df) # Prints the DataFrame of results for each window
stats = report_stats(df) # Prints the aggregated statistics
plot_four_charts(df) # Displays the four analytical plotsWhen you run this script, it will perform a rolling backtest for
ETH-USD (Ethereum) from 2018 to the end of 2025, using
3-month windows. You’ll see printouts for each window’s results,
followed by aggregated statistics and the four performance charts.
This strategy provides a robust framework for trend following using a Kalman Filter for signal generation and a trailing stop for dynamic risk management. The rolling backtesting methodology is crucial for understanding the strategy’s long-term viability and performance consistency across various market conditions, providing a more reliable assessment than a single, long backtest. By analyzing the period returns, cumulative returns, rolling Sharpe ratio, and the distribution of returns, traders can gain deeper insights into the strategy’s strengths and weaknesses. Further optimization of the Kalman Filter noise parameters and the trailing stop percentage could yield improved results.