← Back to Home
Adaptive Kalman Filter Crypto Portfolio With Backtrader, CCXT, and QuantStats

Adaptive Kalman Filter Crypto Portfolio With Backtrader, CCXT, and QuantStats

Kalman filters are well known in control theory and signal processing for extracting a latent “true” state from noisy observations. Applied to financial time series, a Kalman filter can be used to estimate a smoothed price and the hidden velocity of the price process. That velocity can serve as a dynamic trend indicator which adapts to changing volatility regimes.

This article presents a complete multi-asset cryptocurrency portfolio strategy that uses:

The final implementation produces results over 90 days such as:

The backtest runs on ten USDC pairs and generates the cumulative equity curve and drawdown shown in the figure at the start. Everything is implemented in plain Python using the code blocks detailed below.

Data pipeline: Binance OHLCV into pandas

The first component is a robust data loader that pulls OHLCV candles from Binance spot markets via CCXT, handles pagination, and returns a clean pandas DataFrame indexed by datetime.

import backtrader as bt
import ccxt
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
from collections import deque
from itertools import product
import warnings
warnings.filterwarnings(action='ignore')

Data fetching function:

def fetch_binance_data(symbol, timeframe='1h', limit_days=90):
    """
    Fetches historical OHLCV data from Binance via CCXT.
    Handles pagination to get full history.
    """
    exchange = ccxt.binance({
        'enableRateLimit': True,
        'options': {'defaultType': 'spot'}  # Change to 'future' if derivatives needed
    })

    # Calculate start time (milliseconds)
    since = exchange.parse8601((datetime.utcnow() - timedelta(days=limit_days)).isoformat())
    
    all_ohlcv = []
    
    print(f"Fetching {symbol} ({timeframe}) data for last {limit_days} days...")
    
    while True:
        try:
            ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit=1000)
            if not ohlcv:
                break
            
            all_ohlcv.extend(ohlcv)
            
            # Update 'since' to the last timestamp + 1 timeframe duration (approx)
            # Binance returns [timestamp, open, high, low, close, volume]
            last_timestamp = ohlcv[-1][0]
            since = last_timestamp + 1
            
            # Break if we reached current time close enough
            if len(ohlcv) < 1000:
                break
                
        except Exception as e:
            print(f"Error fetching data: {e}")
            break

    # Convert to DataFrame
    df = pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('datetime', inplace=True)
    df.drop(columns=['timestamp'], inplace=True)
    
    # Ensure numeric types
    cols = ['open', 'high', 'low', 'close', 'volume']
    df[cols] = df[cols].apply(pd.to_numeric)
    
    return df

Key points:

This function acts as a generic binance-to-pandas adapter for any symbol and timeframe.

Kalman filter formulation

The indicator models the price process as a two-dimensional state:

where

Prediction step:

Update step:

The indicator outputs:

Adaptive Kalman Filter indicator in Backtrader

The following class implements the model above. It also performs periodic re-tuning of the noise parameters using historical observation data.

class AdaptiveKalmanFilterIndicator(bt.Indicator):
    lines = ('kf_price', 'kf_velocity', 'prediction_error', 'current_q', 'current_r')
    params = (
        ('initial_process_noise', 1e-3),
        ('initial_measurement_noise', 1e-1),
        ('retune_frequency', 100),
        ('error_buffer_size', 200),
        ('grid_size', 5),
        ('min_q', 1e-6),
        ('max_q', 1e-2),
        ('min_r', 1e-3),
        ('max_r', 1e0),
    )
    plotinfo = dict(subplot=False, plotlinelabels=True)

    def __init__(self):
        self.dataclose = self.data.close
        self.F = np.array([[1.0, 1.0], [0.0, 1.0]]) 
        self.H = np.array([[1.0, 0.0]])
        self.I = np.eye(2)
        self.current_q = self.p.initial_process_noise
        self.current_r = self.p.initial_measurement_noise
        self._update_noise_matrices()
        self.x = None
        self.P = None
        self.initialized = False
        self.error_buffer = deque(maxlen=self.p.error_buffer_size)
        self.prediction_buffer = deque(maxlen=self.p.error_buffer_size)
        self.observation_buffer = deque(maxlen=self.p.error_buffer_size)
        self.last_tune_bar = 0
        self.tune_count = 0

Noise matrix setup:

    def _update_noise_matrices(self):
        q_val = self.current_q
        self.Q = np.array([[(q_val**2)/4, (q_val**2)/2],
                           [(q_val**2)/2,  q_val**2]])
        self.R = np.array([[self.current_r**2]])

The process noise covariance is defined in terms of q_val^2. A higher q_val lets the filter adjust more quickly; a lower q_val forces more smoothing.

Lazy initialization uses the first available close price:

    def _lazyinit(self):
        try:
            initial_price = float(self.dataclose[0])
        except (IndexError, TypeError, ValueError):
            return
        self.x = np.array([initial_price, 0.0])
        self.P = np.array([[1.0, 0.0], [0.0, 100.0]])
        self.initialized = True
        self.last_tune_bar = len(self)

The core adaptation logic uses a small grid search of candidate (q, r) pairs. For each pair, a short Kalman simulation is run over the stored observations and mean squared prediction error is measured.

    def _grid_search_optimal_noise(self):
        if len(self.error_buffer) < 50:
            return self.current_q, self.current_r

        q_values = np.logspace(np.log10(self.p.min_q), np.log10(self.p.max_q), self.p.grid_size)
        r_values = np.logspace(np.log10(self.p.min_r), np.log10(self.p.max_r), self.p.grid_size)

        best_mse = float('inf')
        best_q, best_r = self.current_q, self.current_r
        observations = np.array(list(self.observation_buffer))

        for q_test, r_test in product(q_values, r_values):
            try:
                mse = self._evaluate_parameters(q_test, r_test, observations)
            except Exception:
                mse = float('inf')
            if mse < best_mse:
                best_mse = mse
                best_q, best_r = q_test, r_test

        return best_q, best_r

Evaluation function:

    def _evaluate_parameters(self, q_val, r_val, observations):
        if len(observations) < 10:
            return float('inf')
        F, H = self.F.copy(), self.H.copy()
        Q = np.array([[(q_val**2)/4, (q_val**2)/2],
                      [(q_val**2)/2,  q_val**2]])
        R = np.array([[r_val**2]])
        x = np.array([observations[0], 0.0])
        P = np.array([[1.0, 0.0], [0.0, 100.0]])
        errs = []
        for i in range(1, min(len(observations), 100)):
            z = observations[i]
            x_pred = F @ x
            P_pred = (F @ P @ F.T) + Q
            errs.append((z - x_pred[0])**2)
            y = z - (H @ x_pred)
            S = (H @ P_pred @ H.T) + R
            S_inv = 1.0 / S[0, 0] if np.abs(S[0, 0]) > 1e-8 else 1.0
            K = P_pred @ H.T * S_inv
            x = x_pred + (K * y)
            P = (self.I - (K @ H)) @ P_pred
        return np.mean(errs) if errs else float('inf')

Retuning condition:

    def _should_retune(self):
        return (len(self) - self.last_tune_bar) >= self.p.retune_frequency

Full next method with prediction, update, buffering and retuning:

    def next(self):
        if not self.initialized:
            self._lazyinit()
            if not self.initialized:
                return

        current_observation = float(self.dataclose[0])
        self.observation_buffer.append(current_observation)

        # Prediction
        x_pred = self.F @ self.x
        P_pred = (self.F @ self.P @ self.F.T) + self.Q

        try:
            predicted_price = float(x_pred[0])
        except Exception:
            predicted_price = current_observation

        self.prediction_buffer.append(predicted_price)
        prediction_error = current_observation - predicted_price
        self.error_buffer.append(prediction_error)

        # Update
        z = current_observation
        y = z - (self.H @ x_pred).flatten()[0]
        S = (self.H @ P_pred @ self.H.T) + self.R
        S_inv = 1.0 / S[0, 0] if np.abs(S[0, 0]) > 1e-8 else 1.0
        K = P_pred @ self.H.T * S_inv
        self.x = x_pred + (K * y).flatten()
        self.P = (self.I - (K @ self.H)) @ P_pred

        if len(self.x) != 2:
            self.x = np.array([float(self.x[0]), 0.0]) if len(self.x) > 0 else np.array([current_observation, 0.0])

        # Adaptive retuning
        if self._should_retune() and len(self.error_buffer) >= 50:
            old_q, old_r = self.current_q, self.current_r
            new_q, new_r = self._grid_search_optimal_noise()
            if abs(np.log10(new_q) - np.log10(old_q)) > 0.1 or abs(np.log10(new_r) - np.log10(old_r)) > 0.1:
                self.current_q, self.current_r = new_q, new_r
                self._update_noise_matrices()
            self.last_tune_bar = len(self)
            self.tune_count += 1

        # Output lines
        self.lines.kf_price[0] = float(self.x[0])
        self.lines.kf_velocity[0] = float(self.x[1])
        self.lines.prediction_error[0] = float(prediction_error)
        self.lines.current_q[0] = float(self.current_q)
        self.lines.current_r[0] = float(self.current_r)

The indicator’s velocity line becomes the primary trend signal in the trading strategy.

Strategy design: Adaptive Kalman Crypto Portfolio

The portfolio strategy trades multiple USDC pairs simultaneously. For each asset it:

Strategy parameters:

class AdaptiveKalmanCryptoPortfolio(bt.Strategy):
    params = dict(
        initial_process_noise=1e-3,
        initial_measurement_noise=1e-1,
        retune_frequency=100,
        velocity_threshold=0.001,
        adx_period=14,
        atr_period=14,
        adx_trend_threshold=20.0,  # threshold for trend strength
        trail_percent=0.02,
        risk_per_trade=0.01,
        max_gross_leverage=2.0,
        printlog=True,
    )

Logging helper:

    def log(self, txt, dt=None, doprint=False):
        if self.p.printlog or doprint:
            dt = dt or self.datas[0].datetime.datetime(0)
            print(f"{dt.isoformat()} - {txt}")

Per-asset state initialization:

    def __init__(self):
        self.assets = {}
        for data in self.datas:
            state = {}
            state['data'] = data

            state['kf'] = AdaptiveKalmanFilterIndicator(
                data,
                initial_process_noise=self.p.initial_process_noise,
                initial_measurement_noise=self.p.initial_measurement_noise,
                retune_frequency=self.p.retune_frequency,
            )
            state['kf_price'] = state['kf'].lines.kf_price
            state['kf_velocity'] = state['kf'].lines.kf_velocity

            state['adx'] = bt.indicators.ADX(data, period=self.p.adx_period)
            state['plusdi'] = bt.indicators.PlusDI(data, period=self.p.adx_period)
            state['minusdi'] = bt.indicators.MinusDI(data, period=self.p.adx_period)
            state['atr'] = bt.indicators.ATR(data, period=self.p.atr_period)

            state['order'] = None
            state['stop_order'] = None
            state['last_signal'] = None

            self.assets[data._name] = state

Each asset has its own Kalman filter, ADX, DI, ATR, and state for working orders and trailing stops.

Order handling and trailing stops

The notify_order hook reacts to order states and attaches trailing stops on successful entries.

    def notify_order(self, order):
        data = order.data
        name = data._name
        state = self.assets.get(name)
        if state is None:
            return

        if order.status in [order.Submitted, order.Accepted]:
            return

        if order.status == order.Completed:
            side = 'BUY' if order.isbuy() else 'SELL'
            cost = order.executed.value
            price = order.executed.price
            self.log(f"{name} {side} EXECUTED @ {price:.2f}, Size: {order.executed.size:.4f}, Cost: {cost:.2f}")

            if self.p.trail_percent > 0:
                if order.isbuy():
                    state['stop_order'] = self.sell(
                        data=data,
                        exectype=bt.Order.StopTrail,
                        trailpercent=self.p.trail_percent
                    )
                else:
                    state['stop_order'] = self.buy(
                        data=data,
                        exectype=bt.Order.StopTrail,
                        trailpercent=self.p.trail_percent
                    )
            state['order'] = None

        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log(f"{name} ORDER {order.getstatusname()}")
            if state['order'] is order:
                state['order'] = None
            if state['stop_order'] is order:
                state['stop_order'] = None

For long entries, a StopTrail sell order is placed; for short entries, a StopTrail buy order is placed. The trailing distance is defined as a fraction of the entry price via trail_percent.

Volatility-scaled position sizing

Position size is calculated from ATR, portfolio equity, and a risk budget per trade.

Risk logic in plain text:

This number of units ensures that a move of magnitude ATR risks approximately risk_per_trade of the equity. A leverage cap is then applied:

Final position size is:

Implementation:

    def _compute_target_size(self, data, atr_value, signal_direction):
        if atr_value <= 0 or signal_direction == 0:
            return 0

        portfolio_value = self.broker.getvalue()
        risk_budget = self.p.risk_per_trade * portfolio_value
        dollar_vol_per_unit = float(atr_value)
        if dollar_vol_per_unit == 0:
            return 0
        
        units = risk_budget / dollar_vol_per_unit
        price = data.close[0]

        if price > 0:
            max_units_by_leverage = (self.p.max_gross_leverage * portfolio_value) / price
            units = min(units, max_units_by_leverage)
        
        return int(signal_direction * units)

The cast to int ensures discrete position sizes; Backtrader can handle floats as well, but integer sizes are often adequate for crypto contracts measured in units.

Trading logic using Kalman velocity and ADX

The main decision logic resides in next.

    def next(self):
        active_assets = []
        min_bars_needed = max(self.p.adx_period, self.p.atr_period) + 1
        for name, state in self.assets.items():
            if len(state['data']) >= min_bars_needed:
                active_assets.append(name)
        
        if not active_assets:
            return

Only assets with sufficient history for ADX and ATR are considered. For each asset:

        for name, state in self.assets.items():
            if name not in active_assets:
                continue
            data = state['data']
            if state['order']:
                continue  # wait for pending order to finish

            kf_vel = state['kf_velocity'][0]
            adx = state['adx'][0]
            plusdi = state['plusdi'][0]
            minusdi = state['minusdi'][0]
            atr = state['atr'][0]
            pos = self.getposition(data).size

            # Trend filter
            if adx < self.p.adx_trend_threshold:
                if pos != 0 and state['stop_order']:
                    self.cancel(state['stop_order'])
                    state['stop_order'] = None
                continue

            strong_bullish = (kf_vel > self.p.velocity_threshold and plusdi > minusdi)
            strong_bearish = (kf_vel < -self.p.velocity_threshold and minusdi > plusdi)

            desired_dir = 1 if strong_bullish else (-1 if strong_bearish else 0)

Interpretation:

Entry conditions:

            if pos == 0:
                if desired_dir != 0:
                    sig = 'buy' if desired_dir > 0 else 'sell'
                    if state['last_signal'] != sig:
                        size = self._compute_target_size(data, atr, desired_dir)
                        if size != 0:
                            self.log(f"{name} ENTRY Signal: {sig} | Vel: {kf_vel:.5f} | ADX: {adx:.1f}")
                            if desired_dir > 0:
                                state['order'] = self.buy(data=data, size=abs(size))
                            else:
                                state['order'] = self.sell(data=data, size=abs(size))
                            state['last_signal'] = sig

The strategy only takes a new position if:

Exit and reversal logic:

            else:
                current_dir = 1 if pos > 0 else -1
                if desired_dir == 0:
                    # Exit to flat
                    if state['stop_order']: 
                        self.cancel(state['stop_order'])
                        state['stop_order'] = None
                    self.log(f"{name} EXIT Signal (Trend Lost)")
                    if current_dir > 0:
                        state['order'] = self.sell(data=data, size=abs(pos))
                    else:
                        state['order'] = self.buy(data=data, size=abs(pos))
                    state['last_signal'] = None

                elif desired_dir != current_dir:
                    # Reversal
                    if state['stop_order']: 
                        self.cancel(state['stop_order'])
                        state['stop_order'] = None
                    size = self._compute_target_size(data, atr, desired_dir)
                    total_size = abs(pos) + abs(size)
                    self.log(f"{name} REVERSAL Signal")
                    if current_dir > 0:
                        state['order'] = self.sell(data=data, size=total_size)
                    else:
                        state['order'] = self.buy(data=data, size=total_size)
                    state['last_signal'] = 'buy' if desired_dir > 0 else 'sell'

If the trend disappears (desired direction zero), the strategy closes the position and cancels trailing stops. If the trend reverses direction, a single reversal order is submitted whose size equals existing position plus new target size.

At the end of the backtest, the strategy reports final portfolio value:

    def stop(self):
        self.log(f"Strategy Ended. Value: {self.broker.getvalue():.2f}", doprint=True)

Main script: universe, analyzers and execution

The main block sets up the Backtrader engine, data feeds, analyzers and reporting.

if __name__ == '__main__':
    # A. Setup
    cerebro = bt.Cerebro()
    cerebro.broker.setcash(100000.0)
    cerebro.broker.setcommission(commission=0.001)  # 0.1% Standard Crypto Fee

Universe of liquid USDC pairs:

    # B. Define Universe
    symbols = ['BTC/USDC', 'ETH/USDC', 'SOL/USDC', 'BNB/USDC', 'XRP/USDC', 
               'ADA/USDC', 'DOGE/USDC', 'AVAX/USDC', 'LINK/USDC', 'LTC/USDC']

Data ingestion loop (hourly bars, 90 days):

    # C. Data Ingestion
    print("--- Starting Data Download ---")
    for sym in symbols:
        try:
            df = fetch_binance_data(sym, timeframe='1h', limit_days=90)
            if df.empty:
                print(f"Skipping {sym}: No data found.")
                continue
                
            data_feed = bt.feeds.PandasData(
                dataname=df,
                name=sym,
                timeframe=bt.TimeFrame.Minutes,
                compression=60
            )
            cerebro.adddata(data_feed)
            print(f"Loaded {sym}: {len(df)} bars")
            
        except Exception as e:
            print(f"Failed to load {sym}: {e}")

Strategy injection with a more conservative configuration than the default parameters:

    # D. Strategy Injection (Conservative Tuning)
    cerebro.addstrategy(
        AdaptiveKalmanCryptoPortfolio,
        risk_per_trade=0.005,      # 0.5% risk per trade
        max_gross_leverage=1.0,    # no leverage, cash-only exposure
        trail_percent=0.02,        # 2% trailing stop
        velocity_threshold=0.0005, 
        adx_trend_threshold=25.0   # require stronger trends
    )

Analyzers for performance metrics:

    # E. Analytics
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe',
                        timeframe=bt.TimeFrame.Days, annualize=True)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

    cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='time_return',
                        timeframe=bt.TimeFrame.Days)

Execution and core console report:

    # F. Run
    print(f"\nStarting Portfolio Value: {cerebro.broker.getvalue():,.2f}")
    results = cerebro.run()
    strat = results[0]

    # G. Report
    final_val = cerebro.broker.getvalue()
    print(f"Final Portfolio Value: {final_val:,.2f}")
    
    sharpe = strat.analyzers.sharpe.get_analysis()
    dd = strat.analyzers.drawdown.get_analysis()
    trades = strat.analyzers.trades.get_analysis()
    
    print("\n--- Performance Metrics ---")
    print(f"Sharpe Ratio: {sharpe['sharperatio']:.3f}" if sharpe['sharperatio'] else "Sharpe Ratio: N/A")
    print(f"Max Drawdown: {dd['max']['drawdown']:.2f}%")
    print(f"Total Trades: {trades.get('total', {}).get('total', 0)}")

For the sample run, these lines produce:

Advanced reporting and benchmarking

The remaining block extracts daily returns, cleans them, and produces both Matplotlib plots and a QuantStats HTML report.

    # H. ADVANCED REPORTING & BENCHMARKING
    import quantstats as qs
    import pandas as pd
    import matplotlib.pyplot as plt
    import matplotlib
    matplotlib.use('inline')

    print("\n--- Generating Professional Report ---")

    # 1. Extract Strategy Returns
    return_analyzer = strat.analyzers.time_return.get_analysis()
    returns_series = pd.Series(return_analyzer)

Index and value cleaning:

    returns_series.index = pd.to_datetime(returns_series.index)
    returns_series = returns_series[returns_series.index.year < 2100]
    returns_series.index = returns_series.index.tz_localize(None)
    returns_series = returns_series.astype(float)
    returns_series.name = "Portfolio_Strategy"
    returns_series = returns_series.sort_index()

Benchmark using daily BTC-USD from yfinance, aligned to the same date index:

    if returns_series.empty:
        print("Error: No return data found. Double check Section E has TimeFrame.Days")
    else:
        start_date = returns_series.index[0]
        end_date = returns_series.index[-1]
        
        print(f"Fetching Benchmark for: {start_date.date()} to {end_date.date()}")
        try:
            import yfinance as yf
            bench_df = yf.download("BTC-USD", start=start_date,
                                   end=end_date + pd.Timedelta(days=1),
                                   progress=False)
            benchmark = bench_df['Close'].pct_change().dropna()
            benchmark.index = pd.to_datetime(benchmark.index).tz_localize(None)
            benchmark = benchmark.reindex(returns_series.index).fillna(0.0)
        except Exception as e:
            print(f"Could not fetch benchmark: {e}")
            benchmark = None

Manual Matplotlib equity curve and drawdown:

        try:
            plt.style.use('seaborn-v0_8-darkgrid')
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8),
                                           gridspec_kw={'height_ratios': [2, 1]})

            strat_cum = (1 + returns_series).cumprod()
            bench_cum = (1 + benchmark).cumprod() if benchmark is not None else None

            # Equity curve
            ax1.plot(strat_cum.index, strat_cum.values,
                     label='Strategy (Kalman)', linewidth=2, color='#2ca02c')
            if bench_cum is not None:
                ax1.plot(bench_cum.index, bench_cum.values,
                         label='Bitcoin Benchmark', linewidth=1.5,
                         color='#7f7f7f', alpha=0.7)
            ax1.set_title('Cumulative Performance', fontsize=12, fontweight='bold')
            ax1.legend()
            ax1.set_ylabel('Growth (1.0 = Start)')

            # Drawdown
            running_max = strat_cum.cummax()
            drawdown = (strat_cum - running_max) / running_max
            ax2.fill_between(drawdown.index, drawdown.values, 0,
                             color='#d62728', alpha=0.3)
            ax2.plot(drawdown.index, drawdown.values,
                     color='#d62728', linewidth=1)
            ax2.set_title('Portfolio Drawdown', fontsize=12, fontweight='bold')
            ax2.set_ylabel('Drawdown %')
            
            plt.tight_layout()
            plt.savefig('portfolio_performance_chart.png')
            print(">> Plot saved as 'portfolio_performance_chart.png'")
            plt.show()
        except Exception as e:
            print(f"Manual Plotting Error: {e}")
Pasted image 20251203185952.png

QuantStats HTML report and additional terminal stats:

        try:
            qs.reports.html(
                returns_series, 
                benchmark=benchmark, 
                output="portfolio_metrics_report.html", 
                title='Crypto Kalman Portfolio Audit'
            )
            print(">> Full HTML Report saved as 'portfolio_metrics_report.html'")
        except Exception as e:
            print(f"HTML Report Error: {e}")
            print("(Note: Try installing: `pip install --upgrade pandas quantstats`)")

        print("\n[Strategy Risk Metrics]")
        try:
            print(f"Sortino Ratio: {qs.stats.sortino(returns_series):.2f}")
            print(f"CAGR:          {qs.stats.cagr(returns_series) * 100:.2f}%")
            print(f"Win Rate:      {qs.stats.win_rate(returns_series) * 100:.2f}%")
        except:
            print("Metrics calculation failed.")

From this run, QuantStats reports:

The figure at the top shows the cumulative performance of the strategy versus the Bitcoin benchmark, and the corresponding drawdown profile over the backtest period (2025-09-04 to 2025-12-03). The adaptive Kalman portfolio substantially outperforms buy-and-hold Bitcoin over this window while maintaining a manageable drawdown profile.

Discussion and extensions

The presented framework demonstrates that an adaptive Kalman filter can serve as a powerful trend component in a multi-asset crypto portfolio:

Several extensions are natural:

Despite these limitations, the current implementation already provides a strong starting point for an adaptive, statistically grounded crypto portfolio strategy, along with a full, production-grade pipeline from data acquisition to comprehensive performance reporting.