Kalman filters are well known in control theory and signal processing for extracting a latent “true” state from noisy observations. Applied to financial time series, a Kalman filter can be used to estimate a smoothed price and the hidden velocity of the price process. That velocity can serve as a dynamic trend indicator which adapts to changing volatility regimes.
This article presents a complete multi-asset cryptocurrency portfolio strategy that uses:
Backtrader for backtesting
CCXT for fetching hourly OHLCV data from Binance
A custom adaptive Kalman filter indicator that re-tunes its noise parameters on the fly
ADX, DI and ATR to filter trends and size positions
QuantStats and Matplotlib for performance reporting and visualization
Over a 90-day backtest, the final implementation produces the following results:
Final portfolio value: 155,373.74 from an initial 100,000
Sharpe ratio: 1.74
Sortino ratio: 3.73
Maximum drawdown: 32.05%
CAGR (annualized): 238.82%
Win rate: 48.89%
Trade count: 328
The backtest runs on ten USDC pairs and generates the cumulative equity curve and drawdown shown in the figure at the start. Everything is implemented in plain Python using the code blocks detailed below.
The first component is a robust data loader that pulls OHLCV candles from Binance spot markets via CCXT, handles pagination, and returns a clean pandas DataFrame indexed by datetime.
import backtrader as bt
import ccxt
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
from collections import deque
from itertools import product
import warnings
warnings.filterwarnings(action='ignore')

Data fetching function:
def fetch_binance_data(symbol, timeframe='1h', limit_days=90):
"""
Fetches historical OHLCV data from Binance via CCXT.
Handles pagination to get full history.
"""
exchange = ccxt.binance({
'enableRateLimit': True,
'options': {'defaultType': 'spot'} # Change to 'future' if derivatives needed
})
# Calculate start time (milliseconds)
since = exchange.parse8601((datetime.utcnow() - timedelta(days=limit_days)).isoformat())
all_ohlcv = []
print(f"Fetching {symbol} ({timeframe}) data for last {limit_days} days...")
while True:
try:
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit=1000)
if not ohlcv:
break
all_ohlcv.extend(ohlcv)
# Advance 'since' just past the last returned timestamp to avoid refetching it
# Binance returns [timestamp, open, high, low, close, volume]
last_timestamp = ohlcv[-1][0]
since = last_timestamp + 1
# Break if we reached current time close enough
if len(ohlcv) < 1000:
break
except Exception as e:
print(f"Error fetching data: {e}")
break
# Convert to DataFrame
df = pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('datetime', inplace=True)
df.drop(columns=['timestamp'], inplace=True)
# Ensure numeric types
cols = ['open', 'high', 'low', 'close', 'volume']
df[cols] = df[cols].apply(pd.to_numeric)
return df

Key points:
limit_days defines the lookback window; the function computes since as the current UTC time minus that many days.
CCXT's fetch_ohlcv is called repeatedly with limit=1000 until the returned batch is smaller than 1000 or empty.
Each call returns [timestamp, open, high, low, close, volume]. The batches are accumulated and converted to a DataFrame, with the timestamp converted to a timezone-naive datetime index.
All OHLCV columns are cast to numeric types to avoid downstream type issues in Backtrader.
This function acts as a generic Binance-to-pandas adapter for any symbol and timeframe.
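As a quick sanity check, a hypothetical usage might look like this (the symbol and window are arbitrary choices, not part of the strategy itself):

df = fetch_binance_data('BTC/USDC', timeframe='1h', limit_days=90)
print(df.shape)    # roughly 90 * 24 = 2160 hourly rows
print(df.tail(3))  # most recent candles, indexed by datetime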
The indicator models the price process as a two-dimensional state:
State vector:
x_t = [price_t, velocity_t]^T
Linear state transition:
x_t = F * x_{t-1} + w_t
F = [[1, 1],
[0, 1]]
Observation model:
z_t = H * x_t + v_t
H = [1, 0]
where
w_t is process noise with covariance Q
v_t is measurement noise with covariance R
Prediction step:
x_pred = F * x_prev
P_pred = F * P_prev * F^T + Q
Update step:
y = z_t - H * x_pred (innovation)
S = H * P_pred * H^T + R (innovation covariance)
K = P_pred * H^T * (1 / S) (Kalman gain; scalar division here)
x_new = x_pred + K * y
P_new = (I - K * H) * P_pred
The indicator outputs:
kf_price = first element of x_new
kf_velocity = second element of x_new
prediction_error = z_t - x_pred[0]
current_q and current_r, exposing the noise parameters used at each bar.
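Before turning to the Backtrader class, here is a minimal standalone NumPy sketch of one predict/update cycle; the q and r values are illustrative only:

import numpy as np

F = np.array([[1.0, 1.0], [0.0, 1.0]])  # constant-velocity transition
H = np.array([[1.0, 0.0]])              # only price is observed
Q = np.diag([1e-6, 1e-6])               # illustrative process noise
R = np.array([[0.01]])                  # illustrative measurement noise

x = np.array([100.0, 0.0])              # state: [price, velocity]
P = np.eye(2)
z = 100.5                               # new observed close

# Prediction
x_pred = F @ x
P_pred = F @ P @ F.T + Q

# Update (S is 1x1, so inversion reduces to a scalar division)
y = z - (H @ x_pred)[0]
S = (H @ P_pred @ H.T + R)[0, 0]
K = (P_pred @ H.T / S).flatten()
x = x_pred + K * y
P = (np.eye(2) - np.outer(K, H[0])) @ P_pred
print(x)  # filtered [price, velocity]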
The following class implements the model above. It also performs periodic re-tuning of the noise parameters using historical observation data.
class AdaptiveKalmanFilterIndicator(bt.Indicator):
lines = ('kf_price', 'kf_velocity', 'prediction_error', 'current_q', 'current_r')
params = (
('initial_process_noise', 1e-3),
('initial_measurement_noise', 1e-1),
('retune_frequency', 100),
('error_buffer_size', 200),
('grid_size', 5),
('min_q', 1e-6),
('max_q', 1e-2),
('min_r', 1e-3),
('max_r', 1e0),
)
plotinfo = dict(subplot=False, plotlinelabels=True)
def __init__(self):
self.dataclose = self.data.close
self.F = np.array([[1.0, 1.0], [0.0, 1.0]])
self.H = np.array([[1.0, 0.0]])
self.I = np.eye(2)
self.current_q = self.p.initial_process_noise
self.current_r = self.p.initial_measurement_noise
self._update_noise_matrices()
self.x = None
self.P = None
self.initialized = False
self.error_buffer = deque(maxlen=self.p.error_buffer_size)
self.prediction_buffer = deque(maxlen=self.p.error_buffer_size)
self.observation_buffer = deque(maxlen=self.p.error_buffer_size)
self.last_tune_bar = 0
self.tune_count = 0

Noise matrix setup:
def _update_noise_matrices(self):
q_val = self.current_q
self.Q = np.array([[(q_val**2)/4, (q_val**2)/2],
[(q_val**2)/2, q_val**2]])
self.R = np.array([[self.current_r**2]])

The process noise covariance is defined in terms of q_val^2. A higher q_val lets the filter adjust more quickly; a lower q_val forces more smoothing.
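To make the effect of q_val tangible, compare the covariance magnitudes for two settings (values chosen purely for illustration):

import numpy as np

def make_q(q_val):
    # Same structure as _update_noise_matrices above
    return np.array([[(q_val**2) / 4, (q_val**2) / 2],
                     [(q_val**2) / 2, q_val**2]])

print(make_q(1e-3))  # tiny entries -> heavy smoothing, slow adaptation
print(make_q(1e-2))  # entries 100x larger -> faster tracking, noisier velocity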
Lazy initialization uses the first available close price:
def _lazyinit(self):
try:
initial_price = float(self.dataclose[0])
except (IndexError, TypeError, ValueError):
return
self.x = np.array([initial_price, 0.0])
self.P = np.array([[1.0, 0.0], [0.0, 100.0]])
self.initialized = True
self.last_tune_bar = len(self)

The core adaptation logic uses a small grid search of candidate (q, r) pairs. For each pair, a short Kalman simulation is run over the stored observations and the mean squared prediction error is measured.
def _grid_search_optimal_noise(self):
if len(self.error_buffer) < 50:
return self.current_q, self.current_r
q_values = np.logspace(np.log10(self.p.min_q), np.log10(self.p.max_q), self.p.grid_size)
r_values = np.logspace(np.log10(self.p.min_r), np.log10(self.p.max_r), self.p.grid_size)
best_mse = float('inf')
best_q, best_r = self.current_q, self.current_r
observations = np.array(list(self.observation_buffer))
for q_test, r_test in product(q_values, r_values):
try:
mse = self._evaluate_parameters(q_test, r_test, observations)
except Exception:
mse = float('inf')
if mse < best_mse:
best_mse = mse
best_q, best_r = q_test, r_test
return best_q, best_r

Evaluation function:
def _evaluate_parameters(self, q_val, r_val, observations):
if len(observations) < 10:
return float('inf')
F, H = self.F.copy(), self.H.copy()
Q = np.array([[(q_val**2)/4, (q_val**2)/2],
[(q_val**2)/2, q_val**2]])
R = np.array([[r_val**2]])
x = np.array([observations[0], 0.0])
P = np.array([[1.0, 0.0], [0.0, 100.0]])
errs = []
for i in range(1, min(len(observations), 100)):
z = observations[i]
x_pred = F @ x
P_pred = (F @ P @ F.T) + Q
errs.append((z - x_pred[0])**2)
y = z - (H @ x_pred)[0]  # scalar innovation, matching the shape handling in next()
S = (H @ P_pred @ H.T) + R
S_inv = 1.0 / S[0, 0] if np.abs(S[0, 0]) > 1e-8 else 1.0
K = P_pred @ H.T * S_inv
x = x_pred + (K * y).flatten()  # flatten keeps x a length-2 vector
P = (self.I - (K @ H)) @ P_pred
return np.mean(errs) if errs else float('inf')

Retuning condition:
def _should_retune(self):
return (len(self) - self.last_tune_bar) >= self.p.retune_frequency

The full next method with prediction, update, buffering and retuning:
def next(self):
if not self.initialized:
self._lazyinit()
if not self.initialized:
return
current_observation = float(self.dataclose[0])
self.observation_buffer.append(current_observation)
# Prediction
x_pred = self.F @ self.x
P_pred = (self.F @ self.P @ self.F.T) + self.Q
try:
predicted_price = float(x_pred[0])
except Exception:
predicted_price = current_observation
self.prediction_buffer.append(predicted_price)
prediction_error = current_observation - predicted_price
self.error_buffer.append(prediction_error)
# Update
z = current_observation
y = z - (self.H @ x_pred).flatten()[0]
S = (self.H @ P_pred @ self.H.T) + self.R
S_inv = 1.0 / S[0, 0] if np.abs(S[0, 0]) > 1e-8 else 1.0
K = P_pred @ self.H.T * S_inv
self.x = x_pred + (K * y).flatten()
self.P = (self.I - (K @ self.H)) @ P_pred
if len(self.x) != 2:
self.x = np.array([float(self.x[0]), 0.0]) if len(self.x) > 0 else np.array([current_observation, 0.0])
# Adaptive retuning
if self._should_retune() and len(self.error_buffer) >= 50:
old_q, old_r = self.current_q, self.current_r
new_q, new_r = self._grid_search_optimal_noise()
if abs(np.log10(new_q) - np.log10(old_q)) > 0.1 or abs(np.log10(new_r) - np.log10(old_r)) > 0.1:
self.current_q, self.current_r = new_q, new_r
self._update_noise_matrices()
self.last_tune_bar = len(self)
self.tune_count += 1
# Output lines
self.lines.kf_price[0] = float(self.x[0])
self.lines.kf_velocity[0] = float(self.x[1])
self.lines.prediction_error[0] = float(prediction_error)
self.lines.current_q[0] = float(self.current_q)
self.lines.current_r[0] = float(self.current_r)

The indicator's velocity line becomes the primary trend signal in the trading strategy.
The portfolio strategy trades multiple USDC pairs simultaneously. For each asset it:
Computes Kalman velocity
Uses ADX and directional indicators to confirm trend strength
Uses ATR to scale position size relative to volatility and risk budget
Uses a trailing stop to lock in profits and define exits
Strategy parameters:
class AdaptiveKalmanCryptoPortfolio(bt.Strategy):
params = dict(
initial_process_noise=1e-3,
initial_measurement_noise=1e-1,
retune_frequency=100,
velocity_threshold=0.001,
adx_period=14,
atr_period=14,
adx_trend_threshold=20.0, # threshold for trend strength
trail_percent=0.02,
risk_per_trade=0.01,
max_gross_leverage=2.0,
printlog=True,
)

Logging helper:
def log(self, txt, dt=None, doprint=False):
if self.p.printlog or doprint:
dt = dt or self.datas[0].datetime.datetime(0)
print(f"{dt.isoformat()} - {txt}")Per-asset state initialization:
def __init__(self):
self.assets = {}
for data in self.datas:
state = {}
state['data'] = data
state['kf'] = AdaptiveKalmanFilterIndicator(
data,
initial_process_noise=self.p.initial_process_noise,
initial_measurement_noise=self.p.initial_measurement_noise,
retune_frequency=self.p.retune_frequency,
)
state['kf_price'] = state['kf'].lines.kf_price
state['kf_velocity'] = state['kf'].lines.kf_velocity
state['adx'] = bt.indicators.ADX(data, period=self.p.adx_period)
state['plusdi'] = bt.indicators.PlusDI(data, period=self.p.adx_period)
state['minusdi'] = bt.indicators.MinusDI(data, period=self.p.adx_period)
state['atr'] = bt.indicators.ATR(data, period=self.p.atr_period)
state['order'] = None
state['stop_order'] = None
state['last_signal'] = None
self.assets[data._name] = state

Each asset has its own Kalman filter, ADX, DI, ATR, and state for working orders and trailing stops.
The notify_order hook reacts to order states and attaches trailing stops on successful entries.
def notify_order(self, order):
data = order.data
name = data._name
state = self.assets.get(name)
if state is None:
return
if order.status in [order.Submitted, order.Accepted]:
return
if order.status == order.Completed:
side = 'BUY' if order.isbuy() else 'SELL'
cost = order.executed.value
price = order.executed.price
self.log(f"{name} {side} EXECUTED @ {price:.2f}, Size: {order.executed.size:.4f}, Cost: {cost:.2f}")
if self.p.trail_percent > 0:
if order.isbuy():
state['stop_order'] = self.sell(
data=data,
exectype=bt.Order.StopTrail,
trailpercent=self.p.trail_percent
)
else:
state['stop_order'] = self.buy(
data=data,
exectype=bt.Order.StopTrail,
trailpercent=self.p.trail_percent
)
state['order'] = None
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log(f"{name} ORDER {order.getstatusname()}")
if state['order'] is order:
state['order'] = None
if state['stop_order'] is order:
state['stop_order'] = None

For long entries, a StopTrail sell order is placed; for short entries, a StopTrail buy order is placed. The trailing distance is defined as a fraction of the entry price via trail_percent.
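A small numeric sketch of the ratcheting behavior for a long position (this mimics the idea, not Backtrader's internal implementation):

trail_percent = 0.02
highest = 0.0
for price in [100.0, 103.0, 101.0, 105.0, 102.5]:
    highest = max(highest, price)
    stop = highest * (1 - trail_percent)  # the stop only ratchets upward
    print(f"price={price:.1f}  stop={stop:.2f}")
# The stop rises from 98.00 to 102.90 and never moves back down.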
Position size is calculated from ATR, portfolio equity, and a risk budget per trade.
Risk logic in plain text:
portfolio_value = current broker value
risk_budget = risk_per_trade * portfolio_value
dollar_vol_per_unit = ATR
units = risk_budget / dollar_vol_per_unit
This number of units ensures that a move of magnitude ATR risks approximately risk_per_trade of the equity. A leverage cap is then applied:

max_units_by_leverage = (max_gross_leverage * portfolio_value) / price

The final position size is:

size = signal_direction * min(units, max_units_by_leverage)

Implementation:
def _compute_target_size(self, data, atr_value, signal_direction):
if atr_value <= 0 or signal_direction == 0:
return 0
portfolio_value = self.broker.getvalue()
risk_budget = self.p.risk_per_trade * portfolio_value
dollar_vol_per_unit = float(atr_value)
if dollar_vol_per_unit == 0:
return 0
units = risk_budget / dollar_vol_per_unit
price = data.close[0]
if price > 0:
max_units_by_leverage = (self.p.max_gross_leverage * portfolio_value) / price
units = min(units, max_units_by_leverage)
return int(signal_direction * units)

The cast to int ensures discrete position sizes; Backtrader can handle floats as well, but integer sizes are often adequate for crypto contracts measured in units.
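Plugging in round numbers (all hypothetical) shows both the risk budget and the leverage cap in action:

portfolio_value = 100_000.0
risk_per_trade = 0.01            # 1% of equity at risk per ATR-sized move
max_gross_leverage = 1.0
atr = 250.0                      # dollar ATR, e.g. an hourly BTC bar
price = 60_000.0

units = (risk_per_trade * portfolio_value) / atr            # 1000 / 250 = 4.0
max_units = (max_gross_leverage * portfolio_value) / price  # ~1.67
size = int(min(units, max_units))                           # capped, then truncated to 1
print(units, max_units, size)

Here the leverage cap, not the risk budget, binds, and the int cast truncates 1.67 units to 1; for high-priced coins, fractional sizing may therefore be preferable.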
The main decision logic resides in next.
def next(self):
active_assets = []
min_bars_needed = max(self.p.adx_period, self.p.atr_period) + 1
for name, state in self.assets.items():
if len(state['data']) >= min_bars_needed:
active_assets.append(name)
if not active_assets:
return

Only assets with sufficient history for ADX and ATR are considered. For each asset:
for name, state in self.assets.items():
if name not in active_assets:
continue
data = state['data']
if state['order']:
continue # wait for pending order to finish
kf_vel = state['kf_velocity'][0]
adx = state['adx'][0]
plusdi = state['plusdi'][0]
minusdi = state['minusdi'][0]
atr = state['atr'][0]
pos = self.getposition(data).size
# Trend filter
if adx < self.p.adx_trend_threshold:
if pos != 0 and state['stop_order']:
self.cancel(state['stop_order'])
state['stop_order'] = None
continue
strong_bullish = (kf_vel > self.p.velocity_threshold and plusdi > minusdi)
strong_bearish = (kf_vel < -self.p.velocity_threshold and minusdi > plusdi)
desired_dir = 1 if strong_bullish else (-1 if strong_bearish else 0)

Interpretation:
ADX below adx_trend_threshold indicates a weak trend; positions are not opened and any existing trailing stops are cancelled.
A long bias is created when the Kalman velocity is above velocity_threshold and +DI exceeds -DI.
A short bias is created when the Kalman velocity is below -velocity_threshold and -DI exceeds +DI.
Otherwise, the desired direction is flat.
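Condensed into a standalone check, the classification looks like this (the bar values are made up for illustration):

velocity_threshold = 0.001
kf_vel, plusdi, minusdi = 0.0032, 28.0, 14.0  # sample bar values

strong_bullish = kf_vel > velocity_threshold and plusdi > minusdi
strong_bearish = kf_vel < -velocity_threshold and minusdi > plusdi
desired_dir = 1 if strong_bullish else (-1 if strong_bearish else 0)
print(desired_dir)  # 1 -> long bias on this bar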
Entry conditions:
if pos == 0:
if desired_dir != 0:
sig = 'buy' if desired_dir > 0 else 'sell'
if state['last_signal'] != sig:
size = self._compute_target_size(data, atr, desired_dir)
if size != 0:
self.log(f"{name} ENTRY Signal: {sig} | Vel: {kf_vel:.5f} | ADX: {adx:.1f}")
if desired_dir > 0:
state['order'] = self.buy(data=data, size=abs(size))
else:
state['order'] = self.sell(data=data, size=abs(size))
state['last_signal'] = sig

The strategy only takes a new position if:
There is no current position,
The desired direction is non-zero, and
The new signal is not the same as the last signal, to avoid duplicate orders.
Exit and reversal logic:
else:
current_dir = 1 if pos > 0 else -1
if desired_dir == 0:
# Exit to flat
if state['stop_order']:
self.cancel(state['stop_order'])
state['stop_order'] = None
self.log(f"{name} EXIT Signal (Trend Lost)")
if current_dir > 0:
state['order'] = self.sell(data=data, size=abs(pos))
else:
state['order'] = self.buy(data=data, size=abs(pos))
state['last_signal'] = None
elif desired_dir != current_dir:
# Reversal
if state['stop_order']:
self.cancel(state['stop_order'])
state['stop_order'] = None
size = self._compute_target_size(data, atr, desired_dir)
total_size = abs(pos) + abs(size)
self.log(f"{name} REVERSAL Signal")
if current_dir > 0:
state['order'] = self.sell(data=data, size=total_size)
else:
state['order'] = self.buy(data=data, size=total_size)
state['last_signal'] = 'buy' if desired_dir > 0 else 'sell'

If the trend disappears (desired direction zero), the strategy closes the position and cancels trailing stops. If the trend reverses direction, a single reversal order is submitted whose size equals the existing position plus the new target size; for example, flipping from 3 units long to a 2-unit short target submits one sell order for 5 units.
At the end of the backtest, the strategy reports final portfolio value:
def stop(self):
self.log(f"Strategy Ended. Value: {self.broker.getvalue():.2f}", doprint=True)The main block sets up the Backtrader engine, data feeds, analyzers and reporting.
if __name__ == '__main__':
# A. Setup
cerebro = bt.Cerebro()
cerebro.broker.setcash(100000.0)
cerebro.broker.setcommission(commission=0.001)  # 0.1% standard crypto fee

Universe of liquid USDC pairs:
# B. Define Universe
symbols = ['BTC/USDC', 'ETH/USDC', 'SOL/USDC', 'BNB/USDC', 'XRP/USDC',
'ADA/USDC', 'DOGE/USDC', 'AVAX/USDC', 'LINK/USDC', 'LTC/USDC']

Data ingestion loop (hourly bars, 90 days):
# C. Data Ingestion
print("--- Starting Data Download ---")
for sym in symbols:
try:
df = fetch_binance_data(sym, timeframe='1h', limit_days=90)
if df.empty:
print(f"Skipping {sym}: No data found.")
continue
data_feed = bt.feeds.PandasData(
dataname=df,
name=sym,
timeframe=bt.TimeFrame.Minutes,
compression=60
)
cerebro.adddata(data_feed)
print(f"Loaded {sym}: {len(df)} bars")
except Exception as e:
print(f"Failed to load {sym}: {e}")Strategy injection with a more conservative configuration than the default parameters:
# D. Strategy Injection (Conservative Tuning)
cerebro.addstrategy(
AdaptiveKalmanCryptoPortfolio,
risk_per_trade=0.005, # 0.5% risk per trade
max_gross_leverage=1.0, # no leverage, cash-only exposure
trail_percent=0.02, # 2% trailing stop
velocity_threshold=0.0005,
adx_trend_threshold=25.0 # require stronger trends
)

Analyzers for performance metrics:
# E. Analytics
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe',
timeframe=bt.TimeFrame.Days, annualize=True)
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='time_return',
timeframe=bt.TimeFrame.Days)

Execution and core console report:
# F. Run
print(f"\nStarting Portfolio Value: {cerebro.broker.getvalue():,.2f}")
results = cerebro.run()
strat = results[0]
# G. Report
final_val = cerebro.broker.getvalue()
print(f"Final Portfolio Value: {final_val:,.2f}")
sharpe = strat.analyzers.sharpe.get_analysis()
dd = strat.analyzers.drawdown.get_analysis()
trades = strat.analyzers.trades.get_analysis()
print("\n--- Performance Metrics ---")
print(f"Sharpe Ratio: {sharpe['sharperatio']:.3f}" if sharpe['sharperatio'] else "Sharpe Ratio: N/A")
print(f"Max Drawdown: {dd['max']['drawdown']:.2f}%")
print(f"Total Trades: {trades.get('total', {}).get('total', 0)}")For the sample run, these lines produce:
Final Portfolio Value: 155,373.74
Sharpe Ratio: 1.741
Max Drawdown: 32.05%
Total Trades: 328
The remaining block extracts daily returns, cleans them, and produces both Matplotlib plots and a QuantStats HTML report.
# H. ADVANCED REPORTING & BENCHMARKING
import quantstats as qs
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')  # 'inline' is an IPython magic, not a valid backend; Agg renders to file
print("\n--- Generating Professional Report ---")
# 1. Extract Strategy Returns
return_analyzer = strat.analyzers.time_return.get_analysis()
returns_series = pd.Series(return_analyzer)

Index and value cleaning:
returns_series.index = pd.to_datetime(returns_series.index)
returns_series = returns_series[returns_series.index.year < 2100]
returns_series.index = returns_series.index.tz_localize(None)
returns_series = returns_series.astype(float)
returns_series.name = "Portfolio_Strategy"
returns_series = returns_series.sort_index()

Benchmark using daily BTC-USD from yfinance, aligned to the same date index:
if returns_series.empty:
print("Error: No return data found. Double check Section E has TimeFrame.Days")
else:
start_date = returns_series.index[0]
end_date = returns_series.index[-1]
print(f"Fetching Benchmark for: {start_date.date()} to {end_date.date()}")
try:
import yfinance as yf
bench_df = yf.download("BTC-USD", start=start_date,
end=end_date + pd.Timedelta(days=1),
progress=False)
benchmark = bench_df['Close'].squeeze().pct_change().dropna()  # squeeze() guards against yfinance returning a one-column DataFrame
benchmark.index = pd.to_datetime(benchmark.index).tz_localize(None)
benchmark = benchmark.reindex(returns_series.index).fillna(0.0)
except Exception as e:
print(f"Could not fetch benchmark: {e}")
benchmark = None

Manual Matplotlib equity curve and drawdown:
try:
plt.style.use('seaborn-v0_8-darkgrid')
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8),
gridspec_kw={'height_ratios': [2, 1]})
strat_cum = (1 + returns_series).cumprod()
bench_cum = (1 + benchmark).cumprod() if benchmark is not None else None
# Equity curve
ax1.plot(strat_cum.index, strat_cum.values,
label='Strategy (Kalman)', linewidth=2, color='#2ca02c')
if bench_cum is not None:
ax1.plot(bench_cum.index, bench_cum.values,
label='Bitcoin Benchmark', linewidth=1.5,
color='#7f7f7f', alpha=0.7)
ax1.set_title('Cumulative Performance', fontsize=12, fontweight='bold')
ax1.legend()
ax1.set_ylabel('Growth (1.0 = Start)')
# Drawdown
running_max = strat_cum.cummax()
drawdown = (strat_cum - running_max) / running_max
ax2.fill_between(drawdown.index, drawdown.values, 0,
color='#d62728', alpha=0.3)
ax2.plot(drawdown.index, drawdown.values,
color='#d62728', linewidth=1)
ax2.set_title('Portfolio Drawdown', fontsize=12, fontweight='bold')
ax2.set_ylabel('Drawdown')  # values are fractions, not percentages
plt.tight_layout()
plt.savefig('portfolio_performance_chart.png')
print(">> Plot saved as 'portfolio_performance_chart.png'")
plt.show()
except Exception as e:
print(f"Manual Plotting Error: {e}")QuantStats HTML report and additional terminal stats:
try:
qs.reports.html(
returns_series,
benchmark=benchmark,
output="portfolio_metrics_report.html",
title='Crypto Kalman Portfolio Audit'
)
print(">> Full HTML Report saved as 'portfolio_metrics_report.html'")
except Exception as e:
print(f"HTML Report Error: {e}")
print("(Note: Try installing: `pip install --upgrade pandas quantstats`)")
print("\n[Strategy Risk Metrics]")
try:
print(f"Sortino Ratio: {qs.stats.sortino(returns_series):.2f}")
print(f"CAGR: {qs.stats.cagr(returns_series) * 100:.2f}%")
print(f"Win Rate: {qs.stats.win_rate(returns_series) * 100:.2f}%")
except Exception:
print("Metrics calculation failed.")From this run, QuantStats reports:
Sortino Ratio: 3.73
CAGR: 238.82%
Win Rate: 48.89%
The figure at the top shows the cumulative performance of the strategy versus the Bitcoin benchmark, and the corresponding drawdown profile over the backtest period (2025-09-04 to 2025-12-03). The adaptive Kalman portfolio substantially outperforms buy-and-hold Bitcoin over this window while maintaining a manageable drawdown profile.
The presented framework demonstrates that an adaptive Kalman filter can serve as a powerful trend component in a multi-asset crypto portfolio:
Kalman velocity provides a smooth, dynamically tuned measure of trend direction and strength.
ADX and DI serve as a regime filter, restricting trades to periods with sufficiently strong directional movement.
ATR-based position sizing and trailing stops link risk directly to volatility and account equity.
Several extensions are natural:
Include transaction cost modeling beyond a fixed commission to account for slippage and spread.
Add regime detection based on volatility or volume and allow Kalman parameters or thresholds to vary across regimes.
Integrate position-level take-profit rules in addition to trailing stops.
Extend the universe to futures contracts and allow shorting via derivatives with explicit funding rate modeling.
Use Bayesian or EM-based approaches to adapt Q and R instead of grid search to reduce computational burden.
Even without these extensions, the current implementation provides a strong starting point for an adaptive, statistically grounded crypto portfolio strategy, together with a complete pipeline from data acquisition to performance reporting.