This article explores a quantitative trading strategy built using the
backtrader
framework, enhanced with a Kalman Filter for
trend estimation and robust trailing stops for risk management. We’ll
delve into the strategy’s logic, the implementation details, and a
comprehensive rolling backtesting methodology to assess its performance
over time.
The core idea of this strategy,
KalmanFilterTrendWithTrail
, is to identify the underlying
trend of an asset using a Kalman Filter and then enter positions in the
direction of that trend. A crucial element for risk management is the
inclusion of a trailing stop loss, which automatically adjusts to
protect profits as the trade moves favorably.
Key Components:
For the strategy to function, a KalmanFilterIndicator
is
essential. While its full implementation is not provided, its purpose
within this strategy is to output two critical lines:
kf_price
: The Kalman Filter’s estimate of the “true”
price, smoothed from the raw market data.kf_velocity
: The estimated velocity or “trend” of the
price. This is the primary signal for our strategy.Here’s an assumed minimal structure for the
KalmanFilterIndicator
for context:
import backtrader as bt
import numpy as np
class KalmanFilterIndicator(bt.Indicator):
= ('kf_price', 'kf_velocity',)
lines = (
params 'process_noise', 1e-5), # Q: Covariance of the process noise
('measurement_noise', 1e-1), # R: Covariance of the measurement noise
(
)
def __init__(self):
self.addproduct(bt.indicators.SMA(self.data.close, period=1)) # Placeholder to ensure 'data.close' access
# Initial state (price, velocity) and covariance
self.state = np.zeros(2)
self.P = np.identity(2) * 1e-1 # Initial error covariance matrix
# State transition matrix (assuming constant velocity)
self.F = np.array([[1, 1],
0, 1]]) # dt=1 for daily data
[
# Measurement matrix (we only observe price, not velocity directly)
self.H = np.array([[1, 0]])
# Process noise covariance
self.Q = np.identity(2) * self.p.process_noise
# Measurement noise covariance
self.R = np.identity(1) * self.p.measurement_noise
def next(self):
if len(self.data.close) < 2: # Need at least two data points for velocity
self.lines.kf_price[0] = self.data.close[0]
self.lines.kf_velocity[0] = 0.0
return
# Prediction Step
= np.dot(self.F, self.state)
x_pred = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q
P_pred
# Update Step
= self.data.close[0] - np.dot(self.H, x_pred)
y = np.dot(np.dot(self.H, P_pred), self.H.T) + self.R
S = np.dot(np.dot(P_pred, self.H.T), np.linalg.inv(S))
K
self.state = x_pred + np.dot(K, y)
self.P = P_pred - np.dot(np.dot(K, self.H), P_pred)
self.lines.kf_price[0] = self.state[0]
self.lines.kf_velocity[0] = self.state[1]
Explanation:
lines = ('kf_price', 'kf_velocity',)
:
Defines the outputs of our indicator.params = (...)
:
process_noise (Q)
: Represents the uncertainty in the
system’s dynamics (how much the underlying trend can change
unexpectedly). A smaller value means the model trusts its own
predictions more.measurement_noise (R)
: Represents the uncertainty in
our observations (how noisy the market price is). A smaller value means
the filter trusts the raw price data more.__init__(self)
: Initializes the Kalman
Filter’s state variables (estimated price and velocity) and covariance
matrices (P
, F
, H
,
Q
, R
).next(self)
: This method is called for
each new data point. It performs the two core steps of the Kalman
Filter:
KalmanFilterTrendWithTrail
StrategyThis is the main backtrader
strategy that integrates the
Kalman Filter and applies the trading logic.
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import dateutil.relativedelta as rd
import matplotlib.pyplot as plt
import seaborn as sns
# Assuming KalmanFilterIndicator is defined as above or imported from another file
# from kalman_filter_indicator import KalmanFilterIndicator
class KalmanFilterTrendWithTrail(bt.Strategy):
"""
Uses KalmanFilterIndicator for signals and includes a Trailing Stop.
"""
= (
params 'process_noise', 1e-5), # Passed to indicator
('measurement_noise', 1e-1),# Passed to indicator
('trail_percent', 0.05), # Trailing stop percentage
('printlog', True), # Enable/disable logging
(
)
def __init__(self):
# Instantiate the Kalman Filter Indicator
self.kf = KalmanFilterIndicator(
=self.p.process_noise,
process_noise=self.p.measurement_noise
measurement_noise
)
# Keep references to the indicator lines for convenience
self.kf_price = self.kf.lines.kf_price
self.kf_velocity = self.kf.lines.kf_velocity
self.order = None # Tracks the entry order
self.stop_order = None # Tracks the trailing stop order
if self.params.printlog:
print(f"Strategy Parameters: Process Noise={self.params.process_noise}, "
f"Measurement Noise={self.params.measurement_noise}, "
f"Trail Percent={self.params.trail_percent * 100:.2f}%")
def log(self, txt, dt=None, doprint=False):
# Logging function
if self.params.printlog or doprint:
= dt or self.datas[0].datetime.date(0)
dt print(f'{dt.isoformat()} {txt}')
def notify_order(self, order):
# Handles order execution and placement of trailing stops
if order.status in [order.Submitted, order.Accepted]:
return # Order submitted/accepted, nothing to do yet
if order.status == order.Completed:
if self.order and order.ref == self.order.ref: # Our entry order completed
= "BUY" if order.isbuy() else "SELL"
entry_type # Determine the correct exit function (sell for buy, buy for sell)
= self.sell if order.isbuy() else self.buy
exit_func
self.log(f'{entry_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
# Place a trailing stop if trail_percent is valid
if self.p.trail_percent and self.p.trail_percent > 0.0:
self.stop_order = exit_func(exectype=bt.Order.StopTrail, trailpercent=self.p.trail_percent)
self.log(f'Trailing Stop Placed for {entry_type} order ref {self.stop_order.ref} at {self.p.trail_percent * 100:.2f}% trail', doprint=True)
else:
self.log(f'No Trailing Stop Placed (trail_percent={self.p.trail_percent})', doprint=True)
self.order = None # Clear entry order reference
elif self.stop_order and order.ref == self.stop_order.ref: # Our trailing stop order completed
= "STOP BUY (Cover)" if order.isbuy() else "STOP SELL (Exit Long)"
exit_type self.log(f'{exit_type} EXECUTED, Price: {order.executed.price:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}, Size: {order.executed.size:.4f}', doprint=True)
self.stop_order = None # Clear stop order reference
self.order = None # Clear any lingering entry order reference (should be none)
# Handle failed orders
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log(f'Order Failed: Status {order.getstatusname()}, Ref: {order.ref}', doprint=True)
if self.order and order.ref == self.order.ref: self.order = None
elif self.stop_order and order.ref == self.stop_order.ref:
self.log(f'WARNING: Trailing Stop Order Failed!', doprint=True)
self.stop_order = None
def notify_trade(self, trade):
# Reports profit/loss on closed trades
if not trade.isclosed: return
self.log(f'OPERATION PROFIT, GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}', doprint=True)
def next(self):
# Prevents new orders if an entry order is already active
if self.order:
return
# Ensure Kalman Filter indicator has enough data to produce output
if len(self.kf_velocity) == 0:
return
= self.kf_velocity[0]
estimated_velocity = self.position.size
current_position_size = self.data.close[0]
current_close
# --- Trading Logic ---
if current_position_size == 0: # If we are currently flat (no open position)
if self.stop_order: # Safety check: If flat but a stop order exists, cancel it
self.log("Warning: Position flat but stop order exists. Cancelling.", doprint=True)
self.cancel(self.stop_order)
self.stop_order = None
if estimated_velocity > 0: # If Kalman Filter velocity indicates an uptrend
self.log(f'BUY CREATE (KF Vel > 0), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
self.order = self.buy() # Place a buy order
elif estimated_velocity < 0: # If Kalman Filter velocity indicates a downtrend
self.log(f'SELL CREATE (KF Vel < 0 - Short Entry), Close={current_close:.2f}, KF Vel={estimated_velocity:.4f}', doprint=True)
self.order = self.sell() # Place a sell (short) order
else: # If we are already in a position (long or short)
pass # We rely on the trailing stop to close the position
def stop(self):
# Called when the backtest ends, cancels any pending stop orders
if self.stop_order:
self.log(f"Strategy stopped. Cancelling pending stop order ref: {self.stop_order.ref}", doprint=True)
self.cancel(self.stop_order)
Explanation:
params
: Defines configurable
parameters for the strategy, including the noise parameters for the
Kalman Filter and the trail_percent
for the trailing
stop.__init__(self)
:
KalmanFilterIndicator
,
passing the noise parameters from the strategy’s own parameters.kf_price
and
kf_velocity
lines of the indicator for easy access in
next()
.self.order
and self.stop_order
are used to
track pending orders, preventing multiple orders for the same
position.log(self, ...)
: A simple logging
utility to print information during the backtest.notify_order(self, order)
: This
crucial method is called by backtrader
whenever an order’s
status changes.
self.order
) is
Completed
(executed). Upon completion, it places a
bt.Order.StopTrail
order (trailing stop) using the
specified trail_percent
.stop_order
itself is
Completed
, indicating an exit from the position.Canceled
, Margin
, or
Rejected
orders, clearing the order references.notify_trade(self, trade)
: This method
is called when a trade is closed, reporting the gross and net
profit/loss.next(self)
: This is the core logic
that runs on each new bar (e.g., daily).
self.order
to ensure no entry order is
currently pending.estimated_velocity
from the
kf_velocity
line of the Kalman Filter.self.position.size == 0
):
estimated_velocity > 0
, it places a
self.buy()
order to go long, assuming an uptrend.estimated_velocity < 0
, it places a
self.sell()
order to go short, assuming a downtrend.self.position.size != 0
), it simply pass
es,
as the trailing stop is responsible for managing the exit.stop(self)
: Called at the end of the
backtest to cancel any outstanding stop orders.To get a more realistic and robust assessment of the strategy’s performance, a rolling backtesting approach is used. Instead of testing over one continuous period, the backtest is run over consecutive, non-overlapping time windows. This helps in understanding the strategy’s consistency and adaptability across different market regimes.
# ... (imports and strategy definition as above) ...
def run_rolling_backtest(
="ETH-USD",
ticker="2018-01-01",
start="2025-12-31",
end=3,
window_months=None
strategy_params
):= strategy_params or {}
strategy_params = []
all_results = pd.to_datetime(start)
start_dt = pd.to_datetime(end)
end_dt = start_dt
current_start
while True:
= current_start + rd.relativedelta(months=window_months)
current_end if current_end > end_dt:
break
print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")
# Data download using yfinance, respecting the user's preference for auto_adjust=False and droplevel
= yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
data if data.empty or len(data) < 90: # Ensure enough data for the window
print("Not enough data.")
+= rd.relativedelta(months=window_months)
current_start continue
# Apply droplevel if data is a MultiIndex, as per user's preference
= data.droplevel(1, 1) if isinstance(data.columns, pd.MultiIndex) else data
data
= bt.feeds.PandasData(dataname=data)
feed = bt.Cerebro()
cerebro **strategy_params)
cerebro.addstrategy(strategy,
cerebro.adddata(feed)100000) # Starting cash
cerebro.broker.setcash(=0.001) # Commission
cerebro.broker.setcommission(commission=95) # Allocate 95% of capital
cerebro.addsizer(bt.sizers.PercentSizer, percents
= cerebro.broker.getvalue()
start_val # Execute the backtest for the current window
cerebro.run() = cerebro.broker.getvalue()
final_val = (final_val - start_val) / start_val * 100
ret
all_results.append({'start': current_start.date(),
'end': current_end.date(),
'return_pct': ret,
'final_value': final_val,
})
print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
+= rd.relativedelta(months=window_months) # Move to the next window
current_start
return pd.DataFrame(all_results)
Explanation:
run_rolling_backtest(...)
:
ticker
, start
, end
dates, window_months
for the rolling periods, and
strategy_params
.window_months
long
periods.yfinance
with
auto_adjust=False
and
droplevel(axis=1, level=1)
as per the user’s saved
preference.backtrader.Cerebro
, adds the strategy
and data, sets initial cash, commission, and a sizer (to determine
position size).report_stats(df)
: Calculates and
prints key statistical metrics (mean, median, std dev, min, max return,
Sharpe Ratio) from the collected rolling returns.plot_return_distribution(df)
(deprecated by
plot_four_charts
): Visualizes the distribution of
returns across the rolling windows using a histogram.plot_four_charts(df, rolling_sharpe_window=4)
:
Provides a comprehensive visual analysis of the rolling backtest
results:
plot_return_distribution
.The if __name__ == '__main__':
block demonstrates how to
execute the rolling backtest and generate the reports.
# ... (all code as above) ...
if __name__ == '__main__':
# You can adjust ticker, start, end dates, and window_months here
# Example: Test with custom parameters for the strategy
# custom_params = {'process_noise': 1e-4, 'measurement_noise': 1e-2, 'trail_percent': 0.03}
# df = run_rolling_backtest(ticker="SPY", start="2000-01-01", end="2024-12-31", window_months=6, strategy_params=custom_params)
= run_rolling_backtest() # Runs with default parameters (ETH-USD, 3-month windows)
df
print("\n=== ROLLING BACKTEST RESULTS ===")
print(df) # Prints the DataFrame of results for each window
= report_stats(df) # Prints the aggregated statistics
stats # Displays the four analytical plots plot_four_charts(df)
When you run this script, it will perform a rolling backtest for
ETH-USD
(Ethereum) from 2018 to the end of 2025, using
3-month windows. You’ll see printouts for each window’s results,
followed by aggregated statistics and the four performance charts.
This strategy provides a robust framework for trend following using a Kalman Filter for signal generation and a trailing stop for dynamic risk management. The rolling backtesting methodology is crucial for understanding the strategy’s long-term viability and performance consistency across various market conditions, providing a more reliable assessment than a single, long backtest. By analyzing the period returns, cumulative returns, rolling Sharpe ratio, and the distribution of returns, traders can gain deeper insights into the strategy’s strengths and weaknesses. Further optimization of the Kalman Filter noise parameters and the trailing stop percentage could yield improved results.