Navigating Multi-Asset Universes: An Algorithmic Exploration of Enhanced Dual Momentum

In the relentless currents of financial markets, a compelling quest for quantitative traders is to identify and capitalize on persistent trends across diverse assets. While simple momentum strategies often involve buying past winners, their effectiveness can be undermined by sudden market reversals, volatile periods, or illiquid assets. The challenge lies in constructing a strategy resilient enough to navigate these complexities.

This article explores an Enhanced Dual Momentum Strategy, a sophisticated algorithmic approach designed to select and manage a portfolio of assets based on a multi-layered filtering system. It aims to investigate whether combining robust momentum signals with stringent risk management, liquidity checks, and dynamic position sizing can lead to a more adaptive and potentially superior portfolio performance. The ultimate test of such a strategy’s mettle comes through walk-forward analysis, a rigorous backtesting methodology employed to assess its true robustness across unseen market conditions.


The Core Idea: Intelligent Asset Rotation with Robust Filters

The strategy operates on a blend of well-established quantitative principles, layered to create a more discerning approach to asset allocation:

  1. Dual Momentum: The foundational concept involves identifying assets that exhibit strong historical performance. This often combines:

    • Absolute Momentum: Requiring an asset to have a positive return over a lookback period (e.g., 12 months) to ensure it is in an uptrend, preventing participation in bear markets.
    • Relative Momentum: Selecting the best-performing asset from a universe of contenders that pass the absolute momentum screen. The strategy hypothesizes that assets in strong, persistent trends are more likely to continue outperforming.
  2. Multi-Layered Filtering: Beyond pure momentum, the strategy applies several crucial filters to select truly eligible assets:

    • Volatility Filtering: Assets that are excessively volatile may pose undue risk. The strategy includes a maximum volatility threshold to exclude such assets, promoting smoother portfolio returns.
    • Volume Filtering: Illiquid assets can be challenging to trade effectively due to wide bid-ask spreads and difficulty in executing large orders. A minimum volume ratio ensures selection is limited to sufficiently liquid assets.
    • Market Regime Filtering: A critical macro-level filter. By optionally checking if the broader market (or a benchmark asset) is above its long-term moving average, the strategy seeks to avoid participating in overall bear markets, potentially mitigating significant drawdowns.
  3. Sophisticated Risk Management & Position Sizing:

    • Drawdown Protection: A portfolio-level safeguard. If the overall portfolio value experiences a drawdown exceeding a predefined threshold, the strategy moves all capital to cash, acting as a circuit breaker to preserve capital during severe downturns.
    • Volatility-Based Position Sizing: Rather than allocating a fixed percentage to each asset, the strategy dynamically adjusts the position size based on the selected asset’s volatility. This aims to equalize the risk contribution of each asset to the overall portfolio, targeting a specific portfolio volatility. The hypothesis is that this leads to more efficient risk allocation and potentially smoother equity curves.
  4. Systematic Rebalancing & Transaction Costs: The strategy rebalances the portfolio at a fixed interval (e.g., monthly), re-evaluating eligible assets. Realistic transaction costs and slippage are factored into the backtest to provide a more accurate assessment of real-world performance.

  5. Walk-Forward Analysis: The entire framework is designed to be tested using walk-forward analysis. This rigorous backtesting technique involves splitting historical data into sequential “out-of-sample” periods. By simulating how the strategy (with a pre-defined or previously optimized set of parameters) performs on data it has not seen during its development, walk-forward analysis provides a much more robust estimate of its real-world viability and generalizability, guarding against overfitting.
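Under simplifying assumptions (daily closes in a pandas DataFrame, no volume data), the momentum and filtering logic of points 1 and 2 can be sketched in a few lines. `select_asset` and the toy column names are illustrative, not part of the backtrader implementation presented later:

```python
import numpy as np
import pandas as pd

def select_asset(prices, lookback=252, abs_threshold=0.0, max_vol=0.8):
    """Return the name of the asset with the highest lookback return among
    those passing the absolute-momentum and volatility filters, or None
    (i.e. hold cash) when nothing qualifies."""
    # Relative momentum: trailing return over the lookback window
    momentum = prices.iloc[-1] / prices.iloc[-lookback - 1] - 1
    # Annualized volatility of daily returns over the same window
    vol = prices.pct_change().iloc[-lookback:].std() * np.sqrt(252)
    # Absolute momentum + volatility screens
    eligible = momentum[(momentum > abs_threshold) & (vol <= max_vol)]
    return eligible.idxmax() if not eligible.empty else None

# Deterministic toy universe: one uptrend, one downtrend
prices = pd.DataFrame({
    "TREND_UP":   np.linspace(100, 200, 300),
    "TREND_DOWN": np.linspace(100, 90, 300),
})
print(select_asset(prices))  # prints TREND_UP
```

The downtrending asset is rejected by the absolute-momentum screen before relative ranking ever happens, which is exactly the dual-momentum ordering: screen first, rank second.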

The overarching aim is to investigate whether a comprehensive, rule-based approach to asset selection, combined with dynamic risk management and validated through rigorous testing, can generate superior risk-adjusted returns across evolving market conditions.
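The walk-forward splitting described in point 5 amounts to rolling a train/test pair through the data so that each test window is strictly out-of-sample. A minimal sketch, where `walk_forward_windows` is a hypothetical helper and the window lengths are arbitrary:

```python
def walk_forward_windows(n_bars, train_len, test_len):
    """Yield (train_start, train_end, test_start, test_end) bar-index
    ranges that roll forward through the data; each test window follows
    its train window and overlaps no earlier test window."""
    windows = []
    start = 0
    while start + train_len + test_len <= n_bars:
        windows.append((start, start + train_len,
                        start + train_len, start + train_len + test_len))
        start += test_len  # advance by exactly one test window
    return windows

# Three rolling splits over 1000 bars: 2 years in-sample, 6 months out-of-sample
for tr0, tr1, te0, te1 in walk_forward_windows(1000, 504, 126):
    print(f"train [{tr0}:{tr1}]  test [{te0}:{te1}]")
```

Parameters would be chosen on each train slice and evaluated only on the adjacent test slice; stitching the test slices together gives the out-of-sample equity curve that guards against overfitting.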


Algorithmic Implementation: A backtrader Strategy

The following backtrader code provides a concrete implementation of this Enhanced Dual Momentum Strategy. It illustrates how the multi-layered filtering, rebalancing, and risk management concepts are translated into executable code within a multi-asset backtesting environment.

Step 1: Strategy Foundation: Initialization and Parameter Configuration

This section sets up the core EnhancedDualMomentumStrategy class, defining its adjustable parameters and initializing the data structures needed to track multiple assets and portfolio performance.

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
from datetime import datetime, timedelta
import itertools
warnings.filterwarnings("ignore") # Suppress warnings, often from pandas/yfinance

plt.rcParams['figure.figsize'] = (10, 15) # Set default plot size

class EnhancedDualMomentumStrategy(bt.Strategy):
    params = (
        # Core Momentum Parameters
        ('momentum_lookback', 252),      # 12 months lookback (252 trading days)
        ('short_momentum_lookback', 63), # 3 months for short-term momentum
        ('abs_momentum_threshold', 0.0), # Minimum absolute momentum required
        ('rebalance_period', 21),        # Days between rebalancing (monthly)
        
        # Enhanced Filters
        ('volatility_lookback', 63),     # Period for volatility calculation
        ('max_volatility_threshold', 0.8), # Max annualized volatility (80%)
        ('min_volume_ratio', 0.5),       # Minimum volume vs average
        ('volume_lookback', 20),         # Period for volume average
        
        # Risk Management
        ('max_position_pct', 100),       # Maximum position size percentage
        ('drawdown_protection', True),   # Enable drawdown protection
        ('max_drawdown_threshold', 0.15), # Max portfolio drawdown (15%)
        ('volatility_sizing', True),     # Use volatility for position sizing
        ('target_volatility', 0.20),     # Target portfolio volatility (20%)
        
        # Market Regime Filters
        ('use_market_filter', True),     # Enable market regime filter
        ('market_sma_period', 200),      # Market trend SMA period
        ('correlation_threshold', 0.7),  # Max correlation with benchmark (not directly used in logic presented)
        
        # Transaction Costs
        ('transaction_cost', 0.001),     # 0.1% transaction cost
        ('slippage_factor', 0.0005),     # 0.05% slippage (not directly applied by broker in this snippet)
        
        ('printlog', False), # Flag to enable/disable detailed logs
    )
    
    def __init__(self):
        # Track all data feeds (assets)
        self.asset_data = {} # Stores metrics for each asset {name: {data, momentum, volatility, etc.}}
        self.asset_names = [] # List of asset names
        
        # Initialize data structure for each asset added to Cerebro
        for i, data in enumerate(self.datas):
            # Extract asset name (from 'name' passed to bt.feeds.PandasData)
            if hasattr(data, '_name'):
                name = data._name
            else: # Fallback if name is not explicitly set
                name = f"Asset_{i}"
            
            self.asset_names.append(name)
            self.asset_data[name] = {
                'data': data,
                'returns': None, # Placeholder for calculated returns
                'volatility': None, # Placeholder for calculated volatility
                'volume_avg': None, # Placeholder for calculated average volume
                'momentum_long': None, # Placeholder for long-term momentum
                'momentum_short': None, # Placeholder for short-term momentum
                'sma_trend': None, # Placeholder for asset's own SMA trend (not directly used)
            }
        
        # Portfolio tracking
        self.current_asset = None # Name of the asset currently held (or None if in cash)
        self.last_rebalance = 0 # Bar index of the last rebalancing event
        self.portfolio_value_history = [] # Tracks portfolio value for drawdown calculation
        self.drawdown_series = [] # Stores calculated drawdown values
        self.selected_assets_history = [] # Records which asset was selected at each rebalance
        self.rebalance_dates = [] # Dates of rebalancing events
        
        # Performance tracking (for internal logging)
        self.trade_count = 0
        self.cash_periods = 0 # Count of periods spent in cash
        
        # Market filter (using first asset's close as benchmark for SMA)
        if self.datas and self.params.use_market_filter:
            # This SMA is for the market trend filter (e.g., SPY or BTC-USD)
            self.market_sma = bt.indicators.SMA(
                self.datas[0].close, # Uses the first data feed as the market benchmark
                period=self.params.market_sma_period
            )
            # Log this choice, as it's a critical assumption
            self.log("Market filter using first asset as benchmark (potential bias!)")

Analysis of Strategy Foundation:

The params tuple exposes every filter and risk control as a tunable knob, which is what later makes the strategy amenable to walk-forward optimization. __init__ builds a per-asset metrics dictionary keyed by feed name, so all subsequent calculations can iterate over the universe uniformly. Note the deliberately logged caveat: the market-regime SMA uses the first data feed as its benchmark, a convenient but potentially biased choice if that asset is not representative of the broader market.

Step 2: Core Strategy Logic: Asset Selection and Portfolio Rebalancing

This section covers the helper methods responsible for calculating asset-specific metrics, applying filters, selecting the best asset, and then executing the portfolio rebalancing based on the selected asset and calculated position size.

    def log(self, txt, dt=None):
        """Custom logging function controlled by the 'printlog' parameter."""
        if self.params.printlog:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}: {txt}')

    def calculate_returns(self, data, period):
        """Calculates simple returns over a specified period."""
        if len(data) < period + 1: # Ensure enough data for the lookback
            return None
            
        try:
            current_price = float(data.close[0])
            past_price = float(data.close[-period])
            
            if past_price != 0 and not pd.isna(past_price) and not pd.isna(current_price):
                return (current_price / past_price) - 1
        except (ValueError, TypeError, IndexError):
            pass # Handle potential errors gracefully
            
        return None

    def calculate_volatility(self, data, period):
        """Calculates annualized standard deviation of daily returns (volatility)."""
        if len(data) < period + 1:
            return None
            
        try:
            returns = []
            for i in range(period):
                if len(data) > i + 1:
                    current = float(data.close[-i])
                    previous = float(data.close[-i-1])
                    if previous != 0:
                        ret = current / previous - 1
                        if not pd.isna(ret):
                            returns.append(ret)
            
            if len(returns) > 10: # Require a minimum number of observations for a reliable std dev
                return np.std(returns) * np.sqrt(252) # Annualize volatility (assuming 252 trading days)
        except (ValueError, TypeError, IndexError):
            pass
            
        return None

    def calculate_volume_ratio(self, data, period):
        """Calculates current volume vs. average historical volume ratio."""
        if len(data) < period + 1:
            return None
            
        try:
            current_volume = float(data.volume[0])
            volumes = []
            for i in range(1, period + 1):
                if len(data) > i:
                    vol = float(data.volume[-i])
                    if not pd.isna(vol) and vol > 0: # Ensure valid volume
                        volumes.append(vol)
            
            if len(volumes) > 0 and current_volume > 0:
                avg_volume = np.mean(volumes)
                return current_volume / avg_volume if avg_volume > 0 else None
        except (ValueError, TypeError, IndexError):
            pass
            
        return None

    def update_asset_metrics(self):
        """Updates momentum, volatility, and volume metrics for all tracked assets."""
        for name, asset_info in self.asset_data.items():
            data = asset_info['data']
            
            # Skip asset if not enough data for lookbacks
            if len(data) < max(self.params.momentum_lookback, self.params.volatility_lookback) + 1:
                continue
            
            # Calculate momentum metrics (long and short lookbacks)
            asset_info['momentum_long'] = self.calculate_returns(
                data, self.params.momentum_lookback
            )
            asset_info['momentum_short'] = self.calculate_returns(
                data, self.params.short_momentum_lookback
            )
            
            # Calculate annualized volatility
            asset_info['volatility'] = self.calculate_volatility(
                data, self.params.volatility_lookback
            )
            
            # Calculate current volume vs. average volume ratio
            asset_info['volume_ratio'] = self.calculate_volume_ratio(
                data, self.params.volume_lookback
            )
            
            # Market trend filter (asset's own trend vs its SMA, if market filter is enabled)
            if self.params.use_market_filter:
                try:
                    # Manually calculate SMA for the asset's own trend check
                    if len(data) > self.params.market_sma_period:
                        prices_for_sma = []
                        for i in range(self.params.market_sma_period):
                            if len(data) > i:
                                prices_for_sma.append(float(data.close[-i]))
                        
                        if len(prices_for_sma) == self.params.market_sma_period:
                            sma_value = np.mean(prices_for_sma)
                            current_price = float(data.close[0])
                            asset_info['above_sma'] = current_price > sma_value # True if above SMA
                        else: # Not enough data for SMA, default to allowing
                            asset_info['above_sma'] = True
                    else: # Not enough data for SMA, default to allowing
                        asset_info['above_sma'] = True
                except (ValueError, TypeError): # Handle potential errors during float conversion
                    asset_info['above_sma'] = True
            else: # If market filter is disabled, always allow asset to pass this check
                asset_info['above_sma'] = True

    def check_asset_filters(self, asset_name):
        """Checks if a specific asset passes all defined eligibility filters."""
        asset_info = self.asset_data[asset_name]
        
        # Check absolute momentum threshold
        momentum_long = asset_info.get('momentum_long', None)
        if momentum_long is None or pd.isna(momentum_long) or momentum_long <= self.params.abs_momentum_threshold:
            return False, "Failed absolute momentum"
        
        # Check maximum volatility threshold
        volatility = asset_info.get('volatility', None)
        if volatility is None or pd.isna(volatility) or volatility > self.params.max_volatility_threshold:
            vol_str = f"{volatility:.2f}" if volatility is not None else "N/A"
            return False, f"Volatility too high ({vol_str})"
        
        # Check minimum volume ratio
        volume_ratio = asset_info.get('volume_ratio', None)
        if volume_ratio is None or pd.isna(volume_ratio) or volume_ratio < self.params.min_volume_ratio:
            vol_str = f"{volume_ratio:.2f}" if volume_ratio is not None else "N/A"
            return False, f"Volume too low ({vol_str})"
        
        # Check market trend filter (asset's own trend)
        if self.params.use_market_filter:
            above_sma = asset_info.get('above_sma', False)
            if not above_sma:
                return False, "Below market trend"
        
        return True, "All filters passed"

    def calculate_position_size(self, selected_asset):
        """Calculates the target position size percentage based on volatility targeting."""
        if not self.params.volatility_sizing: # If volatility sizing is disabled, use max_position_pct
            return self.params.max_position_pct
        
        asset_info = self.asset_data[selected_asset]
        volatility = asset_info.get('volatility', None)
        
        if volatility is None or pd.isna(volatility) or volatility <= 0:
            return self.params.max_position_pct # Fallback if volatility is invalid
        
        # Volatility targeting formula: (Target Volatility / Asset Volatility) * Base Allocation
        # This aims to inverse-scale position size by asset's volatility
        vol_multiplier = self.params.target_volatility / volatility
        position_size = min(vol_multiplier * 100, self.params.max_position_pct) # Cap at max_position_pct
        
        return max(position_size, 10) # Ensure a minimum allocation (10%)
    
    def check_drawdown_protection(self):
        """Checks if the portfolio drawdown exceeds the threshold, triggering protection."""
        if not self.params.drawdown_protection or len(self.portfolio_value_history) < 50:
            return False # Not enough history or protection is disabled
        
        # Calculate rolling maximum portfolio value
        portfolio_values = np.array(self.portfolio_value_history)
        rolling_max = np.maximum.accumulate(portfolio_values)
        # Calculate current drawdown relative to the highest peak seen so far
        current_drawdown = (portfolio_values[-1] - rolling_max[-1]) / rolling_max[-1]
        
        self.drawdown_series.append(abs(current_drawdown)) # Store drawdown for analysis
        
        if abs(current_drawdown) > self.params.max_drawdown_threshold:
            self.log(f"DRAWDOWN PROTECTION: {current_drawdown:.2%} > {self.params.max_drawdown_threshold:.2%}")
            return True # Trigger drawdown protection
            
        return False

    def select_best_asset(self):
        """Selects the best eligible asset based on highest long-term momentum."""
        self.update_asset_metrics() # First, update all asset metrics for the current bar
        
        # Check portfolio-level drawdown protection
        if self.check_drawdown_protection():
            return None, "Drawdown protection activated - move to cash"
        
        # Find assets that pass all individual asset-level filters
        eligible_assets = {} # Stores {asset_name: momentum_value} for eligible assets
        all_momentums = {} # For debugging/logging all calculated momentums
        
        self.log("=== ASSET EVALUATION DEBUG ===") # Debugging log for asset selection process
        
        for asset_name in self.asset_names:
            asset_info = self.asset_data[asset_name]
            momentum = asset_info.get('momentum_long', None)
            volatility = asset_info.get('volatility', None)
            volume_ratio = asset_info.get('volume_ratio', None)
            above_sma = asset_info.get('above_sma', False)
            
            all_momentums[asset_name] = momentum # Store all for debug
            
            # Detailed logging of each asset's metrics and filter status
            self.log(f"{asset_name}:")
            self.log(f"  Momentum: {momentum:.2%}" if momentum is not None else "  Momentum: None")
            self.log(f"  Volatility: {volatility:.2f}" if volatility is not None else "  Volatility: None")
            self.log(f"  Volume Ratio: {volume_ratio:.2f}" if volume_ratio is not None else "  Volume Ratio: None")
            self.log(f"  Above SMA: {above_sma}")
            
            passed, reason = self.check_asset_filters(asset_name) # Apply all defined filters
            if passed:
                if momentum is not None: # Ensure momentum is valid
                    eligible_assets[asset_name] = momentum
                    self.log(f"  ✓ ELIGIBLE")
            else:
                self.log(f"  ✗ FILTERED OUT: {reason}")
        
        self.log("=== END DEBUG ===")
        
        # Select the asset with the highest momentum among those that passed all filters
        if eligible_assets:
            best_asset = max(eligible_assets, key=eligible_assets.get) # Asset with max momentum
            best_momentum = eligible_assets[best_asset]
            self.log(f"SELECTED: {best_asset} with momentum {best_momentum:.2%}")
            self.log(f"Eligible count: {len(eligible_assets)} out of {len(all_momentums)} evaluated")
            return best_asset, f"Best momentum: {best_momentum:.2%}"
        else:
            # Log why no asset was selected (e.g., all filtered out)
            valid_momentums = {k: v for k, v in all_momentums.items() if v is not None}
            if valid_momentums:
                would_be_best = max(valid_momentums, key=valid_momentums.get)
                self.log(f"NO ELIGIBLE ASSETS: Best unfiltered would be {would_be_best} ({valid_momentums[would_be_best]:.2%})")
            else:
                self.log("NO ELIGIBLE ASSETS: No valid momentum data for any asset")
            return None, "No assets passed filters"

    def rebalance_portfolio(self):
        """Performs the portfolio rebalancing: closes old positions, opens new ones."""
        try:
            # Select the best asset based on defined criteria
            selected_asset, reason = self.select_best_asset()
        except Exception as e:
            self.log(f"Error in asset selection: {e}")
            return
        
        # Record rebalancing decision for later analysis
        current_date = self.datas[0].datetime.date(0)
        self.rebalance_dates.append(current_date)
        self.selected_assets_history.append(selected_asset or "CASH") # Store asset name or "CASH"
        
        # Close current position if switching assets (or moving to cash)
        if self.current_asset != selected_asset:
            if self.current_asset: # If currently holding an asset
                asset_data = self.asset_data[self.current_asset]['data']
                position_size = self.getposition(asset_data).size
                if position_size != 0: # If there's an open position
                    self.close(data=asset_data) # Close it
                    self.log(f"CLOSED: {self.current_asset}")
            
            # Open new position if an asset was selected
            if selected_asset:
                try:
                    # Calculate position size based on volatility targeting (if enabled)
                    position_size_pct = self.calculate_position_size(selected_asset)
                    asset_data = self.asset_data[selected_asset]['data']
                    
                    # Calculate actual order size in units.
                    # Note: sizing uses currently available cash; proceeds from
                    # the close() above only settle when that order executes
                    # (by default, at the next bar's open).
                    cash = self.broker.getcash()
                    price = float(asset_data.close[0])
                    target_value = cash * (position_size_pct / 100) # Target value for new position
                    order_size = int(target_value / price) if price > 0 else 0 # Convert to number of units
                    
                    if order_size > 0:
                        self.buy(data=asset_data, size=order_size) # Place buy order
                        self.log(f"BOUGHT: {selected_asset}, Size: {order_size}, Position: {position_size_pct:.1f}%")
                        self.trade_count += 1 # Increment total trade count
                    else:
                        selected_asset = None # If unable to buy (e.g., too small), stay in cash
                        self.log("Could not calculate order size, staying in cash")
                except Exception as e:
                    self.log(f"Error buying {selected_asset}: {e}")
                    selected_asset = None # On error, fall back to cash
            
            # If no asset was selected (or purchase failed), increment cash periods counter
            if not selected_asset:
                self.cash_periods += 1
                self.log(f"HOLDING CASH: {reason}")
            
            self.current_asset = selected_asset # Update the currently held asset

    def next(self):
        # Check if all data feeds have warmed up for calculations
        # Uses the first data feed's length as a proxy
        if len(self.datas[0]) < self.params.momentum_lookback + 1:
            return # Skip until we have enough data for the longest lookback
            
        # Track portfolio value for drawdown calculation
        current_value = self.broker.getvalue()
        self.portfolio_value_history.append(current_value)
        
        # Check if it's time to rebalance based on the rebalance_period
        current_bar = len(self.datas[0])
        if current_bar - self.last_rebalance >= self.params.rebalance_period:
            self.rebalance_portfolio() # Trigger the rebalancing process
            self.last_rebalance = current_bar # Reset rebalance counter for next period

    def stop(self):
        """Called at the very end of the backtest. Logs final strategy summary."""
        final_value = self.broker.getvalue()
        initial_value = self.broker.startingcash # Starting cash configured via setcash()
        total_return = (final_value / initial_value - 1) * 100
        
        self.log(f'=== STRATEGY SUMMARY ===')
        self.log(f'Final Portfolio Value: ${final_value:,.2f}')
        self.log(f'Total Return: {total_return:.2f}%')
        self.log(f'Total Rebalances: {len(self.rebalance_dates)}')
        self.log(f'Total Trades: {self.trade_count}')
        self.log(f'Cash Periods: {self.cash_periods}')
        
        # Summarize how often each asset was selected
        if self.selected_assets_history:
            self.log(f'=== ASSET ALLOCATION SUMMARY ===')
            asset_counts = {}
            for asset in self.selected_assets_history:
                asset_counts[asset] = asset_counts.get(asset, 0) + 1
            
            for asset, count in sorted(asset_counts.items(), key=lambda x: x[1], reverse=True):
                pct = count / len(self.selected_assets_history) * 100
                self.log(f'{asset}: {count}/{len(self.selected_assets_history)} periods ({pct:.1f}%)')
        
        self.log(f'=== END SUMMARY ===')

Analysis of Core Logic:

The selection pipeline is deliberately layered: update_asset_metrics refreshes momentum, volatility, and volume figures for every asset; check_asset_filters rejects candidates with a human-readable reason; and select_best_asset ranks the survivors by long-term momentum, with portfolio-level drawdown protection able to override everything and force a move to cash. Position sizing then inverts asset volatility against the target, so calmer assets receive larger allocations, capped at max_position_pct and floored at 10%.

Step 3: The Research Framework: Multi-Asset Data Handling and Walk-Forward Optimization

This final section sets up the environment for running the multi-asset strategy, including downloading data for multiple tickers and implementing the walk-forward optimization framework.

def download_crypto_data(tickers, start_date, end_date):
    """Downloads historical data for multiple crypto assets."""
    print(f"Downloading data for {len(tickers)} assets...")
    
    all_data = {}
    for ticker in tickers:
        try:
            data = yf.download(ticker, start=start_date, end=end_date, progress=False)
            if hasattr(data.columns, 'levels'):
                data.columns = data.columns.droplevel(1) # Flatten multi-level columns
            
            if len(data) > 252: # Ensure at least 1 year of data for long lookbacks
                all_data[ticker] = data
                # Print basic stats for downloaded data for quick verification
                total_return = (data['Close'].iloc[-1] / data['Close'].iloc[0] - 1) * 100
                volatility = data['Close'].pct_change().std() * np.sqrt(252) * 100
                avg_volume = data['Volume'].mean()
                print(f"✓ {ticker}: {len(data)} days, Return: {total_return:.1f}%, Vol: {volatility:.1f}%, Avg Volume: {avg_volume:,.0f}")
            else:
                print(f"✗ {ticker}: Insufficient data ({len(data)} days)")
        except Exception as e:
            print(f"✗ {ticker}: Download failed - {e}")
            
    return all_data

def run_enhanced_dual_momentum(tickers, start_date, end_date, **strategy_params):
    """
    Runs the Enhanced Dual Momentum strategy for a given period and parameter set.
    Handles data downloading and Cerebro setup.
    """
    
    # Download data for all assets
    crypto_data = download_crypto_data(tickers, start_date, end_date)
    
    if len(crypto_data) < 2: # Need at least two assets to form a universe
        print("Not enough assets with sufficient data to run strategy.")
        return None, None, None
    
    print(f"\nRunning strategy with {len(crypto_data)} assets...")
    
    # Setup Cerebro
    cerebro = bt.Cerebro()
    
    # Add strategy with provided parameters
    cerebro.addstrategy(EnhancedDualMomentumStrategy, **strategy_params)
    
    # Add data feeds for all downloaded assets
    for ticker, data in crypto_data.items():
        feed = bt.feeds.PandasData(dataname=data, name=ticker)
        cerebro.adddata(feed)
    
    # Set broker parameters
    cerebro.broker.setcash(100000.0)
    cerebro.broker.setcommission(commission=strategy_params.get('transaction_cost', 0.001))
    
    # Add analyzers for comprehensive performance analysis (for individual runs)
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    
    print(f'Starting Portfolio Value: ${cerebro.broker.getvalue():,.2f}')
    
    # Run strategy
    try:
        results = cerebro.run()
        strat = results[0] if results else None # Get the strategy instance
        
        print(f'Final Portfolio Value: ${cerebro.broker.getvalue():,.2f}')
        
        return cerebro, strat, crypto_data
        
    except Exception as e:
        print(f"Error running strategy: {e}")
        return None, None, None

def analyze_performance(strat, crypto_data, benchmark_ticker="BTC-USD"):
    """Analyzes and prints detailed performance metrics of the strategy."""
    print('\n' + '='*60)
    print('ENHANCED DUAL MOMENTUM PERFORMANCE ANALYSIS')
    print('='*60)
    
    try:
        # Basic performance metrics from analyzers
        sharpe_analysis = strat.analyzers.sharpe.get_analysis()
        sharpe_ratio = sharpe_analysis.get('sharperatio', None)
        
        drawdown_analysis = strat.analyzers.drawdown.get_analysis()
        max_drawdown = drawdown_analysis.get('max', {}).get('drawdown', 0)
        
        returns_analysis = strat.analyzers.returns.get_analysis()
        total_return = returns_analysis.get('rtot', 0) * 100
        
        trade_analysis = strat.analyzers.trades.get_analysis()
        total_trades = trade_analysis.get('total', {}).get('total', 0)
        
        print(f'Total Return: {total_return:.2f}%')
        print(f'Sharpe Ratio: {sharpe_ratio:.3f}' if sharpe_ratio is not None else 'Sharpe Ratio: N/A')
        print(f'Max Drawdown: {max_drawdown:.2f}%')
        print(f'Total Trades: {total_trades}')
        
        # Asset selection frequency analysis
        if hasattr(strat, 'selected_assets_history') and strat.selected_assets_history:
            asset_counts = pd.Series(strat.selected_assets_history).value_counts()
            print(f'\nAsset Selection Frequency:')
            for asset, count in asset_counts.items():
                pct = count / len(strat.selected_assets_history) * 100
                print(f'  {asset}: {count} times ({pct:.1f}%)')
        else:
            print('\nNo asset selection history available')
        
        # Compare with benchmark (e.g., BTC-USD Buy & Hold)
        if benchmark_ticker in crypto_data:
            benchmark_data = crypto_data[benchmark_ticker]
            # Use .iloc for positional access; Series[-1] is no longer supported in modern pandas
            benchmark_return = ((benchmark_data['Close'].iloc[-1] / benchmark_data['Close'].iloc[0]) - 1) * 100
            print(f'\n{benchmark_ticker} Buy & Hold Return: {benchmark_return:.2f}%')
            print(f'Strategy vs {benchmark_ticker}: {total_return - benchmark_return:.2f}% outperformance')
        
        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': total_trades
        }
        
    except Exception as e:
        print(f"Error in performance analysis: {e}")
        return {
            'total_return': 0,
            'sharpe_ratio': 0,
            'max_drawdown': 0,
            'total_trades': 0
        }
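# NOTE (illustrative sketch, not the strategy's exact code): the strategy's
# volatility filter compares each asset's annualized volatility against
# max_volatility_threshold. A common way to compute it from daily closes,
# matching the volatility_lookback parameter used below:
import numpy as np  # redundant if already imported at the module top

def annualized_volatility(daily_closes, lookback=63):
    """Annualized volatility of the last `lookback` daily log returns."""
    closes = np.asarray(daily_closes, dtype=float)
    log_returns = np.diff(np.log(closes))[-lookback:]
    # Sample std of daily log returns, scaled by sqrt(252 trading days)
    return log_returns.std(ddof=1) * np.sqrt(252)

# An asset passes the filter when annualized_volatility(...) <= threshold.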

def run_walk_forward_optimization(tickers, start_date, end_date):
    """
    Performs a simplified walk-forward analysis over predefined parameter combinations.
    Each fixed parameter set is run over the full period and its performance evaluated.
    """
    print('\n' + '='*60)
    print('WALK-FORWARD OPTIMIZATION')
    print('='*60)
    
    # Predefined parameter combinations to test in walk-forward
    param_combinations = [
        {'momentum_lookback': 126, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.6},
        {'momentum_lookback': 189, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.7},
        {'momentum_lookback': 252, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.8},
        {'momentum_lookback': 252, 'abs_momentum_threshold': 0.05, 'max_volatility_threshold': 0.7},
        {'momentum_lookback': 315, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.8},
    ]
    
    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)
    
    results = [] # To store results for each parameter combination
    
    # Iterate through each predefined parameter combination
    for i, params in enumerate(param_combinations):
        print(f"\nTesting parameter set {i+1}/{len(param_combinations)}: {params}")
        
        try:
            # Run the strategy with current parameter set over the FULL period for this walk-forward segment
            # Note: This is a fixed-parameter validation across the full historical period for *each* parameter set.
            # A true rolling walk-forward optimization would involve splitting into in-sample/out-of-sample segments.
            cerebro_instance, strat_instance, crypto_data_instance = run_enhanced_dual_momentum(
                tickers, start_date, end_date, **params
            )
            
            if strat_instance is not None:
                # Analyze performance for this parameter set
                performance = analyze_performance(strat_instance, crypto_data_instance)
                performance['params'] = params # Store params with results
                results.append(performance)
        
        except Exception as e:
            print(f"Error with parameter set {i+1}: {e}")
    
    # Find the best parameter set based on Sharpe Ratio
    if results:
        best_result = max(results, key=lambda x: x.get('sharpe_ratio', 0) or 0)
        print(f"\n{'='*40}")
        print(f"BEST PARAMETER SET (by Sharpe Ratio):")
        print(f"Parameters: {best_result['params']}")
        best_sharpe = best_result['sharpe_ratio']
        print(f"Sharpe Ratio: {best_sharpe:.3f}" if best_sharpe is not None else "Sharpe Ratio: N/A")
        print(f"Total Return: {best_result['total_return']:.2f}%")
        print(f"Max Drawdown: {best_result['max_drawdown']:.2f}%")
        
    return results
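
# NOTE (illustrative sketch, not part of the original strategy): the routine
# above validates each fixed parameter set over the FULL period. A true
# rolling walk-forward would split history into in-sample/out-of-sample
# windows; the helper name and window lengths below are my assumptions.
import pandas as pd  # redundant if already imported at the module top

def make_walk_forward_windows(start_date, end_date,
                              in_sample_months=12, out_sample_months=3):
    """Rolling (in-sample, out-of-sample) date windows for walk-forward tests.

    Each window optimizes on `in_sample_months` of data, then tests on the
    following `out_sample_months`; windows roll forward by the out-of-sample
    length so test segments never overlap.
    """
    start, end = pd.to_datetime(start_date), pd.to_datetime(end_date)
    windows = []
    cursor = start
    while True:
        is_end = cursor + pd.DateOffset(months=in_sample_months)
        oos_end = is_end + pd.DateOffset(months=out_sample_months)
        if oos_end > end:
            break
        windows.append({'in_sample': (cursor, is_end),
                        'out_of_sample': (is_end, oos_end)})
        cursor = cursor + pd.DateOffset(months=out_sample_months)
    return windows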

# Main execution block
if __name__ == "__main__":
    # Global Configuration
    TICKERS = ["BTC-USD", "ETH-USD", "SOL-USD"] # Universe of assets to consider
    START_DATE = "2021-01-01"
    END_DATE = "2024-12-31" # End of the backtest period
    
    print("=" * 80)
    print("ENHANCED DUAL MOMENTUM STRATEGY WITH BACKTRADER")
    print("=" * 80)
    print(f"Universe: {', '.join(TICKERS)}")
    print(f"Period: {START_DATE} to {END_DATE}")
    print("\nEnhancements Included:")
    print("✓ Absolute + Relative Momentum")
    print("✓ Volatility filtering")
    print("✓ Volume filtering")
    print("✓ Market trend filtering (per asset, if enabled)")
    print("✓ Drawdown protection (portfolio-level)")
    print("✓ Volatility-based position sizing")
    print("✓ Transaction costs & slippage simulation")
    print("✓ Walk-forward optimization (parameter validation)")
    
    # Strategy parameters for the MAIN BACKTEST run
    strategy_params = {
        'momentum_lookback': 90,           # Long-term momentum lookback (e.g., 90 days)
        'short_momentum_lookback': 21,     # Short-term momentum lookback (e.g., 21 days - not explicitly used in asset selection here)
        'abs_momentum_threshold': 0.0,     # Minimum positive return required
        'rebalance_period': 21,            # Monthly rebalancing
        'volatility_lookback': 63,         # 3 months for volatility
        'max_volatility_threshold': 3.0,   # High volatility tolerance for crypto (300% annualized)
        'min_volume_ratio': 0.1,           # Low volume requirement (10% of avg)
        'volume_lookback': 20,             # Period for volume average
        'max_position_pct': 100,           # Max 100% allocation to single asset
        'drawdown_protection': True,       # Enable drawdown protection
        'max_drawdown_threshold': 0.30,    # 30% max portfolio drawdown (adjusted for crypto volatility)
        'volatility_sizing': True,         # Use volatility targeting for position sizing
        'target_volatility': 0.30,         # Target 30% annualized portfolio volatility
        'use_market_filter': False,        # Disable overall market filter (using asset's own SMA now)
        'market_sma_period': 200,          # (Still defined for asset's own SMA if filter enabled)
        'transaction_cost': 0.001,         # 0.1% transaction cost
        'slippage_factor': 0.0005,         # 0.05% slippage (broker level)
        'printlog': False                  # Turn off detailed logging for main run
    }
    
    # --- Execute Main Backtest ---
    print(f"\n{'-'*60}")
    print("RUNNING MAIN BACKTEST (Full Period)")
    print(f"{'-'*60}")
    
    # Run the strategy on the full historical period
    # Returns cerebro instance, strategy instance, and downloaded data
    cerebro_main, strat_main, crypto_data_main = run_enhanced_dual_momentum(
        TICKERS, START_DATE, END_DATE, **strategy_params
    )
    
    if strat_main is not None:
        # Analyze performance of the main backtest
        main_performance = analyze_performance(strat_main, crypto_data_main)
        
        # Plot results of the main backtest
        print(f"\n{'-'*40}")
        print("PLOTTING MAIN BACKTEST RESULTS")
        print(f"{'-'*40}")
        
        # Plotting the main backtest. cerebro.plot returns a list of figures.
        main_figs = cerebro_main.plot(iplot=False, style='line', volume=False)
        
        # Adjusting layout for each figure in the list
        for fig in main_figs:
            fig_obj = fig[0] # Unpack the figure object from the tuple
            fig_obj.suptitle('Enhanced Dual Momentum Strategy Performance (Main Backtest)', fontsize=16)
            fig_obj.tight_layout(rect=[0, 0.03, 1, 0.95]) # Adjust layout to prevent title overlap
            fig_obj.subplots_adjust(hspace=0.3) # Add vertical spacing between subplots
            fig_obj.show() # Display the plot
        
        # --- Execute Walk-Forward Optimization ---
        print(f"\n{'-'*60}")
        print("STARTING WALK-FORWARD OPTIMIZATION (Parameter Validation)")
        print(f"{'-'*60}")
        
        # Run walk-forward optimization (which itself calls run_enhanced_dual_momentum multiple times)
        wf_results = run_walk_forward_optimization(TICKERS, START_DATE, END_DATE)
        
        print(f"\n{'='*80}")
        print("COMPREHENSIVE ANALYSIS COMPLETE")
        print(f"{'='*80}")
        
    else:
        print("Main strategy execution failed! Aborting further analysis.")

Conclusion: The Continuous Quest for Resilient Portfolios

The journey through the Enhanced Dual Momentum Strategy reveals a sophisticated approach to algorithmic portfolio management, far transcending simple momentum plays. This exploration underscores the ambition to build trading systems that are not only capable of identifying performance leaders but are also robustly equipped to navigate the inherent complexities and shifting regimes of financial markets.

The strategy’s strength lies in its multi-layered architecture. By incorporating stringent filters for volatility and liquidity, alongside a market trend filter (even if simplified), it attempts to operate only in environments deemed favorable. Its advanced risk management features, such as portfolio-level drawdown protection and volatility-based position sizing, are designed to preserve capital and smooth the equity curve. Furthermore, validating it within a walk-forward optimization framework reflects a commitment to rigorous research, providing a more realistic assessment of its adaptability to unseen market conditions.
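
To make the volatility-based position sizing concrete: under a simple volatility-targeting rule (the idea behind the strategy’s `volatility_sizing` and `target_volatility` parameters; this helper is a sketch of the general technique, not the strategy’s exact implementation), exposure scales inversely with realized volatility:

```python
def volatility_target_weight(asset_annual_vol, target_vol=0.30, max_weight=1.0):
    """Position weight targeting `target_vol` annualized volatility.

    weight = target_vol / asset_vol, capped at max_weight (no leverage).
    A calmer asset receives a fuller allocation; a wilder one is scaled down.
    """
    if asset_annual_vol <= 0:
        return max_weight  # no measurable risk: fall back to the cap
    return min(target_vol / asset_annual_vol, max_weight)
```

With the 30% target used in the main backtest, an asset realizing 60% annualized volatility would be sized at half weight, while anything at or below 30% takes the full allocation.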

Ultimately, this strategy serves as a compelling testament to the ongoing quest in quantitative finance: to create intelligent, adaptive systems that can continually learn and respond to the market’s evolving narrative. It highlights that building truly resilient portfolios is not about finding a single magic indicator, but about meticulously layering a diverse set of principles, constantly testing their interactions, and always maintaining a spirit of continuous exploration and refinement.