Traditional technical indicators often suffer from lag, providing signals after a significant price move has already occurred. The Kalman Filter, a sophisticated algorithm originating from engineering, offers an adaptive solution by estimating the true state of a system (e.g., price and its velocity) amidst noise and uncertainty. This article presents a KalmanFilterTrendVelocityTrailingStopStrategy that leverages the Kalman Filter’s real-time estimations for trend direction and incorporates robust risk management through adaptive trailing stops.
1. The Kalman Filter Indicator
(KalmanFilterIndicator)
The core of this strategy is a custom backtrader
indicator that implements a simplified Kalman Filter. It models the
price as a system with two states: its current value and its velocity
(rate of change).
import backtrader as bt
import numpy as np
class KalmanFilterIndicator(bt.Indicator):
    """Estimate price and velocity with a two-state Kalman filter.

    The state vector is ``[price, velocity]`` and only the close price is
    observed. Every bar runs one predict/update cycle and writes the result
    to the ``kf_price`` and ``kf_velocity`` lines.

    Params:
        process_noise: ``q``; the process noise covariance ``Q`` is built
            from ``q ** 2``.
        measurement_noise: ``r``; the measurement noise variance ``R`` is
            ``r ** 2``.
    """

    lines = ('kf_price', 'kf_velocity',)  # Output lines: estimated price and velocity

    params = (
        ('process_noise', 1e-5),      # Q: uncertainty in system dynamics (how much the true state changes)
        ('measurement_noise', 1e-1),  # R: uncertainty in measurements (how noisy the observed price is)
    )

    plotinfo = dict(
        subplot=False,  # Plot on the main price chart
        plotlinelabels=True,
    )

    def __init__(self):
        self.dataclose = self.data.close

        # State transition matrix (F):
        #   price_t = price_{t-1} + velocity_{t-1},  velocity_t = velocity_{t-1}
        self.F = np.array([[1.0, 1.0], [0.0, 1.0]])

        # Measurement matrix (H): only the price (first state variable) is observed.
        self.H = np.array([[1.0, 0.0]])

        # Process noise covariance (Q): uncertainty introduced by the system model.
        q_val = self.p.process_noise
        self.Q = np.array([[(q_val**2) / 4, (q_val**2) / 2],
                           [(q_val**2) / 2, q_val**2]])

        # Measurement noise covariance (R): uncertainty in the observations.
        self.R = np.array([[self.p.measurement_noise**2]])

        self.I = np.eye(2)        # Identity matrix
        self.x = None             # State vector (price, velocity)
        self.P = None             # Error covariance matrix
        self.initialized = False  # Set once the state has been seeded

    def _lazyinit(self):
        """Seed the state and covariance from the first usable close."""
        try:
            initial_price = self.dataclose[0]
        except IndexError:
            return  # Not enough data yet

        # A NaN/inf seed would propagate through every later estimate, so
        # wait for the first finite close instead.
        if not np.isfinite(initial_price):
            return

        self.x = np.array([initial_price, 0.0])        # Initial price, zero initial velocity
        self.P = np.array([[1.0, 0.0], [0.0, 100.0]])  # Initial error covariance
        self.initialized = True

    def next(self):
        """Run one predict/update cycle and publish the new estimates."""
        if not self.initialized:
            self._lazyinit()
            if not self.initialized:
                return  # Still no usable data; wait for the next bar

        z = self.dataclose[0]  # Current observed price (measurement)

        # 1. Prediction step: project state and covariance one bar forward.
        x_pred = self.F @ self.x
        P_pred = (self.F @ self.P @ self.F.T) + self.Q

        if not np.isfinite(z):
            # Missing observation: skip the correction and carry the
            # prediction forward so one bad bar cannot poison the state.
            self.x = x_pred
            self.P = P_pred
        else:
            # 2. Update step: correct the prediction with the measurement.
            y = z - (self.H @ x_pred)                  # Measurement residual
            S = (self.H @ P_pred @ self.H.T) + self.R  # Residual covariance (1x1)

            # S is 1x1, so its inverse is a scalar reciprocal. The previous
            # fallback produced a Python scalar and then used it with `@`,
            # which NumPy rejects; scaling with `*` works on every path.
            # When S is exactly zero the reciprocal is replaced by 1.0, the
            # same fallback value the original intended.
            s_val = float(S[0, 0])
            s_inv = 1.0 / s_val if s_val != 0.0 else 1.0

            K = (P_pred @ self.H.T) * s_inv            # Kalman gain, shape (2, 1)
            self.x = x_pred + (K @ y)                  # Updated state estimate
            self.P = (self.I - (K @ self.H)) @ P_pred  # Updated error covariance

        # Output the estimated price and velocity.
        self.lines.kf_price[0] = self.x[0]
        self.lines.kf_velocity[0] = self.x[1]


# The filter continuously performs two steps:
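1. Predict: project the previous price/velocity estimate and its error covariance one bar forward using the state transition matrix.
2. Update: correct that prediction with the newly observed close, weighted by the Kalman gain (which is governed by
process_noise and measurement_noise). The
kf_price line provides a smoothed price, while
kf_velocity indicates the estimated rate of change.

2. Kalman Filter Trend Velocity Trailing Stop Strategy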
(KalmanFilterTrendWithTrail)
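This strategy, implemented below as the class
KalmanFilterTrendWithTrail (and referred to in this article as
KalmanFilterTrendVelocityTrailingStopStrategy to
describe its functionality), uses the sign of the estimated kf_velocity
to determine trend direction.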
class KalmanFilterTrendWithTrail(bt.Strategy):
    """Enter on the sign of the Kalman velocity estimate; exit via a trailing stop.

    While flat, a positive ``kf_velocity`` opens a long and a negative one
    opens a short. Once the entry fills, a ``StopTrail`` order sized to the
    filled quantity is placed on the opposite side. ``next`` contains no
    other exit logic.

    Params:
        process_noise: forwarded to ``KalmanFilterIndicator``.
        measurement_noise: forwarded to ``KalmanFilterIndicator``.
        trail_percent: trailing distance as a fraction (0.02 = 2%). A falsy
            or non-positive value disables the stop, in which case this
            class never exits a position.
    """

    params = (
        ('process_noise', 1e-3),
        ('measurement_noise', 1e-1),
        ('trail_percent', 0.02),  # Trailing stop percentage (e.g., 2%)
    )

    def __init__(self):
        self.kf = KalmanFilterIndicator(  # Custom Kalman Filter indicator
            process_noise=self.p.process_noise,
            measurement_noise=self.p.measurement_noise,
        )
        self.kf_price = self.kf.lines.kf_price
        self.kf_velocity = self.kf.lines.kf_velocity
        self.order = None       # Pending entry order, if any
        self.stop_order = None  # Active trailing stop order, if any

    def next(self):
        """Evaluate entry signals while flat; do nothing while in a position."""
        if self.order:  # An entry order is pending
            return

        # Ensure the Kalman Filter has produced a value for this bar.
        if len(self.kf_velocity) == 0 or np.isnan(self.kf_velocity[0]):
            return

        estimated_velocity = self.kf_velocity[0]

        # The position is held until the trailing stop is hit; a velocity
        # reversal does not trigger an exit here.
        if self.position.size != 0:
            return

        # Flat: drop any stop left over from a previous trade.
        if self.stop_order:
            self.cancel(self.stop_order)
            self.stop_order = None

        if estimated_velocity > 0:    # Positive velocity: enter long
            self.order = self.buy()
        elif estimated_velocity < 0:  # Negative velocity: enter short
            self.order = self.sell()

    def notify_order(self, order):
        """Track order lifecycle and attach the trailing stop after an entry fill."""
        if order.status in [order.Submitted, order.Accepted]:
            return  # Still pending

        if order.status == order.Completed:
            if self.order and order.ref == self.order.ref:
                # Entry filled: protect it with a trailing stop on the opposite side.
                exit_func = self.sell if order.isbuy() else self.buy
                if self.p.trail_percent and self.p.trail_percent > 0.0:
                    # Size the stop from the fill itself rather than leaving
                    # it to the sizer, which may return a quantity different
                    # from the open position. executed.size is negative for
                    # sells, hence abs().
                    self.stop_order = exit_func(
                        exectype=bt.Order.StopTrail,
                        trailpercent=self.p.trail_percent,
                        size=abs(order.executed.size),
                    )
                self.order = None  # Entry is no longer pending
            elif self.stop_order and order.ref == self.stop_order.ref:
                # Trailing stop filled: the position has been exited.
                self.stop_order = None
                self.order = None

        # Orders that ended without filling.
        elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]:
            if self.order and order.ref == self.order.ref:
                self.order = None
            elif self.stop_order and order.ref == self.stop_order.ref:
                self.stop_order = None

    def stop(self):
        """Cancel any trailing stop still working at the end of the run."""
        if self.stop_order:
            self.cancel(self.stop_order)
            self.stop_order = None


# Execution Logic:
- Trend signal: the strategy reads the kf_velocity output from the Kalman Filter.
- Long entry: when the strategy is flat and kf_velocity is positive, it signals an upward trend,
  prompting a long entry.
- Short entry: when the strategy is flat and kf_velocity is negative, it signals a downward
  trend, prompting a short entry.
- Holding: once in a position, next() does not act on kf_velocity; a change in kf_velocity direction does not close or reverse the trade.
- Exit: positions are closed by a bt.Order.StopTrail order. notify_order ensures that immediately
  upon a successful entry, a trailing stop is placed with a
  trail_percent (e.g., 2%). This adaptive stop allows for
  riding trends while protecting accumulated profits.
- Order management: the notify_order
  method tracks the lifecycle of orders, ensuring that pending
  orders are managed correctly and that the trailing stop is established
  promptly upon trade execution.

1. Parameter Optimization: Finding the Best Fit
Parameter optimization systematically tests various combinations of a strategy’s input parameters to find those that yield the best historical performance according to a chosen metric (e.g., Sharpe Ratio, total return). This process helps in identifying the most effective settings for a given strategy on a specific dataset.
import backtrader as bt
import numpy as np
import pandas as pd
import yfinance as yf # Assuming yfinance is used for data fetching
def optimize_parameters(strategy_class, opt_params, ticker, start_date, end_date):
    """Grid-search strategy parameters and rank the runs by Sharpe ratio.

    Args:
        strategy_class: backtrader strategy class to optimize.
        opt_params: mapping of parameter name -> iterable of candidate
            values, passed to ``cerebro.optstrategy``.
        ticker: symbol passed to ``yf.download``.
        start_date: start of the optimization period.
        end_date: end of the optimization period.

    Returns:
        A dict for the best combination (by Sharpe ratio, among runs with at
        least one trade) holding ``sharpe_ratio``, ``final_value``,
        ``return_pct``, ``total_trades`` and the parameter values, or
        ``None`` when no data was fetched or no run produced a trade.
    """
    print("="*60)
    print(f"OPTIMIZING: {strategy_class.__name__} on {ticker}")
    print("="*60)

    # Fetch data for optimization.
    print(f"Fetching data from {start_date} to {end_date}...")
    # Unadjusted prices; flatten the (field, ticker) column MultiIndex if present.
    df = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False, progress=False)
    if isinstance(df.columns, pd.MultiIndex):
        df = df.droplevel(1, axis=1)
    if df.empty:
        print("No data fetched for optimization. Exiting.")
        return None
    print(f"Data shape: {df.shape}")
    print(f"Date range: {df.index[0].date()} to {df.index[-1].date()}")

    # Set up optimization.
    cerebro = bt.Cerebro()
    data = bt.feeds.PandasData(dataname=df)
    cerebro.adddata(data)
    start_cash = 10000.0
    cerebro.broker.setcash(start_cash)
    cerebro.broker.setcommission(commission=0.001)

    # Analyzers for performance metrics.
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', timeframe=bt.TimeFrame.Days, annualize=True, riskfreerate=0.0)
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

    print("Testing parameter combinations...")
    cerebro.optstrategy(strategy_class, **opt_params)
    stratruns = cerebro.run()
    print(f"Optimization complete! Tested {len(stratruns)} combinations.")

    # Collect and analyze results.
    results = []
    for run in stratruns:
        strategy = run[0]
        sharpe_analysis = strategy.analyzers.sharpe.get_analysis()
        returns_analysis = strategy.analyzers.returns.get_analysis()
        trades_analysis = strategy.analyzers.trades.get_analysis()

        # The Returns analyzer reports `rtot` as a logarithmic return
        # (log(end_value / start_value)), so it must be exponentiated to
        # recover the growth factor; `1 + rtot` is only a first-order
        # approximation and misstates both final value and percent return.
        rtot = returns_analysis.get('rtot', 0.0)
        growth = float(np.exp(rtot))
        final_value = start_cash * growth

        sharpe_ratio = sharpe_analysis.get('sharperatio', -999.0)  # Default to a low number
        if sharpe_ratio is None or np.isnan(sharpe_ratio):
            sharpe_ratio = -999.0
        total_trades = trades_analysis.get('total', {}).get('total', 0)

        result = {
            'sharpe_ratio': sharpe_ratio,
            'final_value': final_value,
            'return_pct': (growth - 1.0) * 100,
            'total_trades': total_trades,
        }
        # Record the parameter values that produced this run.
        result.update({p: getattr(strategy.p, p) for p in opt_params})
        results.append(result)

    # Keep only runs that traded at least once, then rank by Sharpe ratio.
    valid_results = [r for r in results if r['total_trades'] > 0]
    if not valid_results:
        print("\nNo combinations resulted in any trades. Cannot determine best parameters.")
        return None
    results_sorted = sorted(valid_results, key=lambda x: x['sharpe_ratio'], reverse=True)

    print(f"\n{'='*120}")
    print("TOP 5 PARAMETER COMBINATIONS BY SHARPE RATIO")
    print(f"{'='*120}")
    top_5_df = pd.DataFrame(results_sorted[:5])
    print(top_5_df.to_string())

    best_params = results_sorted[0]
    print(f"\nBest Parameters Found: {best_params}")
    return best_params


# Key Features of
optimize_parameters:
- Uses yfinance to
  download historical data, ensuring auto_adjust=False and
  droplevel(axis=1, level=1) for consistency.
- Uses backtrader’s SharpeRatio,
  Returns, and TradeAnalyzer to evaluate each
  parameter set comprehensively.

2. Generalized Rolling Backtesting: Assessing Out-of-Sample Performance
Once optimal parameters are identified from an in-sample optimization period, a rolling backtest assesses the strategy’s stability and performance on unseen data. The function below splits the test range into consecutive, non-overlapping windows of window_months months and runs the strategy with the same fixed parameters on each window, comparing every window against a buy-and-hold benchmark. Unlike full walk-forward optimization, it does not re-optimize the parameters between windows.
import dateutil.relativedelta as rd # Needed for date calculations in rolling backtest
def run_rolling_backtest(strategy_class, strategy_params, ticker, start, end, window_months):
    """Backtest a strategy over consecutive, non-overlapping windows.

    The range ``start``..``end`` is cut into windows of ``window_months``
    months. Each window gets its own data download, a fresh Cerebro with the
    same ``strategy_params``, and a buy-and-hold benchmark. A trailing
    partial window that would extend past ``end`` is not run.

    Args:
        strategy_class: backtrader strategy class to run.
        strategy_params: keyword parameters passed to the strategy.
        ticker: symbol passed to ``yf.download``.
        start: start of the overall range (anything ``pd.to_datetime`` accepts).
        end: end of the overall range.
        window_months: window length in months; must be positive.

    Returns:
        A ``pandas.DataFrame`` with one row per executed window and columns
        ``start``, ``end``, ``return_pct``, ``benchmark_pct`` and ``trades``.

    Raises:
        ValueError: if ``window_months`` is not positive.
    """
    # A zero or negative window never advances past `end`, which previously
    # made the loop below spin forever while re-downloading data.
    if window_months <= 0:
        raise ValueError(f"window_months must be positive, got {window_months!r}")

    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Fetch unadjusted prices for the current window.
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
        if data.empty or len(data) < 30:  # Need at least some data for indicators to warm up
            print("Not enough data for this period. Skipping window.")
            current_start += rd.relativedelta(months=window_months)
            continue
        # Flatten the (field, ticker) column MultiIndex if present.
        if isinstance(data.columns, pd.MultiIndex):
            data = data.droplevel(1, axis=1)

        # Buy & Hold return for the window, used as the benchmark.
        start_price = data['Close'].iloc[0]
        end_price = data['Close'].iloc[-1]
        benchmark_ret = (end_price - start_price) / start_price * 100

        # Set up and run Cerebro for the current window.
        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        cerebro.addstrategy(strategy_class, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)  # Initial cash for the window
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)  # Allocate 95% of capital per trade
        cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

        start_val = cerebro.broker.getvalue()
        results_run = cerebro.run()
        final_val = cerebro.broker.getvalue()
        strategy_ret = (final_val - start_val) / start_val * 100

        # Trade statistics.
        trades_analysis = results_run[0].analyzers.trades.get_analysis()
        total_trades = trades_analysis.get('total', {}).get('total', 0)

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': strategy_ret,
            'benchmark_pct': benchmark_ret,
            'trades': total_trades,
        })
        print(f"Strategy Return: {strategy_ret:.2f}% | Buy & Hold Return: {benchmark_ret:.2f}% | Trades: {total_trades}")

        current_start = current_end  # Move to the next window

    return pd.DataFrame(all_results)


# Key Features of
run_rolling_backtest:
Conclusion
The KalmanFilterTrendVelocityTrailingStopStrategy demonstrates an innovative application of the Kalman Filter in quantitative trading. By leveraging the filter’s adaptive noise reduction to estimate price velocity as a trend signal, and combining it with dynamic trailing stops, the strategy offers a robust framework for capturing trends and managing risk in a constantly changing market environment.