In the relentless currents of financial markets, a compelling quest for quantitative traders is to identify and capitalize on persistent trends across diverse assets. While simple momentum strategies often involve buying past winners, their effectiveness can be undermined by sudden market reversals, volatile periods, or illiquid assets. The challenge lies in constructing a strategy resilient enough to navigate these complexities.
This article explores an Enhanced Dual Momentum Strategy, a sophisticated algorithmic approach designed to select and manage a portfolio of assets based on a multi-layered filtering system. It aims to investigate whether combining robust momentum signals with stringent risk management, liquidity checks, and dynamic position sizing can lead to a more adaptive and potentially superior portfolio performance. The ultimate test of such a strategy’s mettle comes through walk-forward analysis, a rigorous backtesting methodology employed to assess its true robustness across unseen market conditions.
The strategy operates on a blend of well-established quantitative principles, layered to create a more discerning approach to asset allocation:
Dual Momentum: The foundational concept involves identifying assets that exhibit strong historical performance. This combines absolute momentum (an asset must deliver a return above a minimum threshold over the lookback period) with relative momentum (among the assets that qualify, capital rotates into the strongest performer).
Multi-Layered Filtering: Beyond pure momentum, the strategy applies several crucial filters to select truly eligible assets: a cap on annualized volatility, a minimum liquidity requirement (current volume relative to its recent average), and an optional trend filter requiring price to sit above a long-term moving average.
Sophisticated Risk Management & Position Sizing: Position sizes are scaled inversely with asset volatility to target a chosen portfolio volatility, and a portfolio-level drawdown circuit breaker moves the strategy entirely to cash when losses from the most recent peak exceed a set threshold.
Systematic Rebalancing & Transaction Costs: The strategy rebalances the portfolio at a fixed interval (e.g., monthly), re-evaluating eligible assets. Realistic transaction costs and slippage are factored into the backtest to provide a more accurate assessment of real-world performance.
Walk-Forward Analysis: The entire framework is designed to be tested using walk-forward analysis. This rigorous backtesting technique involves splitting historical data into sequential “out-of-sample” periods. By simulating how the strategy (with a pre-defined or previously optimized set of parameters) performs on data it has not seen during its development, walk-forward analysis provides a much more robust estimate of its real-world viability and generalizability, guarding against overfitting.
The overarching aim is to investigate whether a comprehensive, rule-based approach to asset selection, combined with dynamic risk management and validated through rigorous testing, can generate superior risk-adjusted returns across evolving market conditions.
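To make the walk-forward mechanics concrete before diving into the strategy itself, here is a minimal, standalone sketch of how a date range can be split into rolling in-sample/out-of-sample windows. The function name and window lengths here are illustrative choices, not part of the strategy code:

```python
import pandas as pd

def walk_forward_windows(start, end, train_days=365, test_days=90):
    """Yield successive (train_start, train_end, test_start, test_end) windows.

    Each test window is out-of-sample: it begins exactly where its training
    window ends, and the whole scheme rolls forward by test_days per step.
    """
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    windows = []
    cursor = start
    while cursor + pd.Timedelta(days=train_days + test_days) <= end:
        train_end = cursor + pd.Timedelta(days=train_days)
        test_end = train_end + pd.Timedelta(days=test_days)
        windows.append((cursor, train_end, train_end, test_end))
        cursor += pd.Timedelta(days=test_days)  # roll forward one test period
    return windows

windows = walk_forward_windows("2021-01-01", "2024-12-31")
```

A full walk-forward run would optimize parameters on each training window, evaluate only on the adjoining test window, and stitch the test-period results together to estimate out-of-sample performance.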
The backtrader Strategy

The following backtrader code provides a concrete implementation of this Enhanced Dual Momentum Strategy. It illustrates how the multi-layered filtering, rebalancing, and risk management concepts are translated into executable code within a multi-asset backtesting environment.

This section sets up the core EnhancedDualMomentumStrategy class, defining its adjustable parameters and initializing the data structures needed to track multiple assets and portfolio performance.
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
from datetime import datetime, timedelta
import itertools

warnings.filterwarnings("ignore")  # Suppress warnings, often from pandas/yfinance
plt.rcParams['figure.figsize'] = (10, 15)  # Set default plot size


class EnhancedDualMomentumStrategy(bt.Strategy):
    params = (
        # Core Momentum Parameters
        ('momentum_lookback', 252),         # 12 months lookback (252 trading days)
        ('short_momentum_lookback', 63),    # 3 months for short-term momentum
        ('abs_momentum_threshold', 0.0),    # Minimum absolute momentum required
        ('rebalance_period', 21),           # Days between rebalancing (monthly)

        # Enhanced Filters
        ('volatility_lookback', 63),        # Period for volatility calculation
        ('max_volatility_threshold', 0.8),  # Max annualized volatility (80%)
        ('min_volume_ratio', 0.5),          # Minimum volume vs average
        ('volume_lookback', 20),            # Period for volume average

        # Risk Management
        ('max_position_pct', 100),          # Maximum position size percentage
        ('drawdown_protection', True),      # Enable drawdown protection
        ('max_drawdown_threshold', 0.15),   # Max portfolio drawdown (15%)
        ('volatility_sizing', True),        # Use volatility for position sizing
        ('target_volatility', 0.20),        # Target portfolio volatility (20%)

        # Market Regime Filters
        ('use_market_filter', True),        # Enable market regime filter
        ('market_sma_period', 200),         # Market trend SMA period
        ('correlation_threshold', 0.7),     # Max correlation with benchmark (not used in the logic shown)

        # Transaction Costs
        ('transaction_cost', 0.001),        # 0.1% transaction cost
        ('slippage_factor', 0.0005),        # 0.05% slippage (not applied by the broker in this snippet)

        ('printlog', False),                # Flag to enable/disable detailed logs
    )

    def __init__(self):
        # Track all data feeds (assets)
        self.asset_data = {}   # Stores metrics for each asset {name: {data, momentum, volatility, ...}}
        self.asset_names = []  # List of asset names

        # Initialize data structure for each asset added to Cerebro
        for i, data in enumerate(self.datas):
            # Extract asset name (from 'name' passed to bt.feeds.PandasData)
            if hasattr(data, '_name'):
                name = data._name
            else:  # Fallback if name is not explicitly set
                name = f"Asset_{i}"

            self.asset_names.append(name)
            self.asset_data[name] = {
                'data': data,
                'returns': None,         # Placeholder for calculated returns
                'volatility': None,      # Placeholder for calculated volatility
                'volume_avg': None,      # Placeholder for calculated average volume
                'momentum_long': None,   # Placeholder for long-term momentum
                'momentum_short': None,  # Placeholder for short-term momentum
                'sma_trend': None,       # Placeholder for asset's own SMA trend (not directly used)
            }

        # Portfolio tracking
        self.current_asset = None          # Name of the asset currently held (or None if in cash)
        self.last_rebalance = 0            # Bar index of the last rebalancing event
        self.portfolio_value_history = []  # Tracks portfolio value for drawdown calculation
        self.drawdown_series = []          # Stores calculated drawdown values
        self.selected_assets_history = []  # Records which asset was selected at each rebalance
        self.rebalance_dates = []          # Dates of rebalancing events

        # Performance tracking (for internal logging)
        self.trade_count = 0
        self.cash_periods = 0  # Count of periods spent in cash

        # Market filter (using first asset's close as benchmark for SMA)
        if self.datas and self.params.use_market_filter:
            # This SMA is for the market trend filter (e.g., SPY or BTC-USD)
            self.market_sma = bt.indicators.SMA(
                self.datas[0].close,  # Uses the first data feed as the market benchmark
                period=self.params.market_sma_period
            )
            # Log this choice, as it's a critical assumption
            self.log("Market filter using first asset as benchmark (potential bias!)")
Analysis of Strategy Foundation:

- params Tuple: This comprehensive set of parameters defines the strategy's operational rules, ranging from momentum lookbacks (momentum_lookback, short_momentum_lookback) to filtering criteria (max_volatility_threshold, min_volume_ratio, market_sma_period), risk management (max_drawdown_threshold, target_volatility), and transaction costs. The sheer number of parameters highlights the strategy's complexity and the extensive tuning required for research.
- __init__(self) Method: The loop for i, data in enumerate(self.datas): is critical. It iterates through all data feeds added to cerebro, allowing the strategy to manage multiple assets. For each asset, a dictionary self.asset_data[name] is created to store its data reference and calculated metrics (momentum, volatility, volume ratio, etc.). self.current_asset, self.last_rebalance, self.portfolio_value_history, and self.drawdown_series are initialized to manage asset rotation, track rebalancing timing, and calculate portfolio-level drawdown. self.market_sma is initialized using the first data feed's closing price, a design choice for the market trend filter; while convenient, the log message flags it as a "potential bias" if this single asset isn't truly representative of the overall market.

This section covers the helper methods responsible for calculating asset-specific metrics, applying filters, selecting the best asset, and then executing the portfolio rebalancing based on the selected asset and calculated position size.
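As a quick sanity check on the volatility-targeting arithmetic that the sizing helper in this strategy implements, the rule can be isolated into a few lines. The function name below is illustrative; the defaults mirror the strategy's target_volatility, max_position_pct, and 10% floor:

```python
def position_size_pct(asset_vol, target_vol=0.20, max_pct=100, min_pct=10):
    """Volatility-targeted allocation: scale inversely with asset volatility."""
    if asset_vol is None or asset_vol <= 0:
        return max_pct  # fallback when volatility is missing or unusable
    size = min(target_vol / asset_vol * 100, max_pct)  # cap at max allocation
    return max(size, min_pct)                          # enforce minimum allocation

position_size_pct(0.40)  # 0.20 / 0.40 * 100 = 50 -> 50% allocation
position_size_pct(0.10)  # 200 -> capped at 100%
```

An asset running at twice the target volatility receives half the capital, so each position contributes roughly the same expected risk to the portfolio.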
    def log(self, txt, dt=None):
        """Custom logging function controlled by the 'printlog' parameter."""
        if self.params.printlog:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}: {txt}')

    def calculate_returns(self, data, period):
        """Calculates simple returns over a specified period."""
        if len(data) < period + 1:  # Ensure enough data for the lookback
            return None
        try:
            current_price = float(data.close[0])
            past_price = float(data.close[-period])
            if past_price != 0 and not pd.isna(past_price) and not pd.isna(current_price):
                return (current_price / past_price) - 1
        except (ValueError, TypeError, IndexError):
            pass  # Handle potential errors gracefully
        return None

    def calculate_volatility(self, data, period):
        """Calculates annualized standard deviation of daily returns (volatility)."""
        if len(data) < period + 1:
            return None
        try:
            returns = []
            for i in range(period):
                if len(data) > i + 1:
                    current = float(data.close[-i])
                    previous = float(data.close[-i - 1])
                    if previous != 0:
                        ret = current / previous - 1
                        if not pd.isna(ret):
                            returns.append(ret)
            if len(returns) > 10:  # Require a minimum number of observations for a reliable std dev
                return np.std(returns) * np.sqrt(252)  # Annualize (assuming 252 trading days)
        except (ValueError, TypeError, IndexError):
            pass
        return None

    def calculate_volume_ratio(self, data, period):
        """Calculates current volume vs. average historical volume ratio."""
        if len(data) < period + 1:
            return None
        try:
            current_volume = float(data.volume[0])
            volumes = []
            for i in range(1, period + 1):
                if len(data) > i:
                    vol = float(data.volume[-i])
                    if not pd.isna(vol) and vol > 0:  # Ensure valid volume
                        volumes.append(vol)
            if len(volumes) > 0 and current_volume > 0:
                avg_volume = np.mean(volumes)
                return current_volume / avg_volume if avg_volume > 0 else None
        except (ValueError, TypeError, IndexError):
            pass
        return None

    def update_asset_metrics(self):
        """Updates momentum, volatility, and volume metrics for all tracked assets."""
        for name, asset_info in self.asset_data.items():
            data = asset_info['data']

            # Skip asset if not enough data for lookbacks
            if len(data) < max(self.params.momentum_lookback, self.params.volatility_lookback) + 1:
                continue

            # Calculate momentum metrics (long and short lookbacks)
            asset_info['momentum_long'] = self.calculate_returns(
                data, self.params.momentum_lookback
            )
            asset_info['momentum_short'] = self.calculate_returns(
                data, self.params.short_momentum_lookback
            )

            # Calculate annualized volatility
            asset_info['volatility'] = self.calculate_volatility(
                data, self.params.volatility_lookback
            )

            # Calculate current volume vs. average volume ratio
            asset_info['volume_ratio'] = self.calculate_volume_ratio(
                data, self.params.volume_lookback
            )

            # Market trend filter (asset's own trend vs its SMA, if market filter is enabled)
            if self.params.use_market_filter:
                try:
                    # Manually calculate SMA for the asset's own trend check
                    if len(data) > self.params.market_sma_period:
                        prices_for_sma = []
                        for i in range(self.params.market_sma_period):
                            if len(data) > i:
                                prices_for_sma.append(float(data.close[-i]))
                        if len(prices_for_sma) == self.params.market_sma_period:
                            sma_value = np.mean(prices_for_sma)
                            current_price = float(data.close[0])
                            asset_info['above_sma'] = current_price > sma_value  # True if above SMA
                        else:  # Not enough data for SMA, default to allowing
                            asset_info['above_sma'] = True
                    else:  # Not enough data for SMA, default to allowing
                        asset_info['above_sma'] = True
                except (ValueError, TypeError):  # Handle potential errors during float conversion
                    asset_info['above_sma'] = True
            else:  # If market filter is disabled, always allow asset to pass this check
                asset_info['above_sma'] = True
    def check_asset_filters(self, asset_name):
        """Checks if a specific asset passes all defined eligibility filters."""
        asset_info = self.asset_data[asset_name]

        # Check absolute momentum threshold
        momentum_long = asset_info.get('momentum_long', None)
        if momentum_long is None or pd.isna(momentum_long) or momentum_long <= self.params.abs_momentum_threshold:
            return False, "Failed absolute momentum"

        # Check maximum volatility threshold
        volatility = asset_info.get('volatility', None)
        if volatility is None or pd.isna(volatility) or volatility > self.params.max_volatility_threshold:
            vol_str = f"{volatility:.2f}" if volatility is not None else "N/A"
            return False, f"Volatility too high ({vol_str})"

        # Check minimum volume ratio
        volume_ratio = asset_info.get('volume_ratio', None)
        if volume_ratio is None or pd.isna(volume_ratio) or volume_ratio < self.params.min_volume_ratio:
            vol_str = f"{volume_ratio:.2f}" if volume_ratio is not None else "N/A"
            return False, f"Volume too low ({vol_str})"

        # Check market trend filter (asset's own trend)
        if self.params.use_market_filter:
            above_sma = asset_info.get('above_sma', False)
            if not above_sma:
                return False, "Below market trend"

        return True, "All filters passed"

    def calculate_position_size(self, selected_asset):
        """Calculates the target position size percentage based on volatility targeting."""
        if not self.params.volatility_sizing:  # If volatility sizing is disabled, use max_position_pct
            return self.params.max_position_pct

        asset_info = self.asset_data[selected_asset]
        volatility = asset_info.get('volatility', None)

        if volatility is None or pd.isna(volatility) or volatility <= 0:
            return self.params.max_position_pct  # Fallback if volatility is invalid

        # Volatility targeting formula: (Target Volatility / Asset Volatility) * Base Allocation
        # This inverse-scales position size by the asset's volatility
        vol_multiplier = self.params.target_volatility / volatility
        position_size = min(vol_multiplier * 100, self.params.max_position_pct)  # Cap at max_position_pct

        return max(position_size, 10)  # Ensure a minimum allocation (10%)

    def check_drawdown_protection(self):
        """Checks if the portfolio drawdown exceeds the threshold, triggering protection."""
        if not self.params.drawdown_protection or len(self.portfolio_value_history) < 50:
            return False  # Not enough history or protection is disabled

        # Calculate rolling maximum portfolio value
        portfolio_values = np.array(self.portfolio_value_history)
        rolling_max = np.maximum.accumulate(portfolio_values)

        # Calculate current drawdown relative to the highest peak seen so far
        current_drawdown = (portfolio_values[-1] - rolling_max[-1]) / rolling_max[-1]
        self.drawdown_series.append(abs(current_drawdown))  # Store drawdown for analysis

        if abs(current_drawdown) > self.params.max_drawdown_threshold:
            self.log(f"DRAWDOWN PROTECTION: {current_drawdown:.2%} > {self.params.max_drawdown_threshold:.2%}")
            return True  # Trigger drawdown protection
        return False
    def select_best_asset(self):
        """Selects the best eligible asset based on highest long-term momentum."""
        self.update_asset_metrics()  # First, update all asset metrics for the current bar

        # Check portfolio-level drawdown protection
        if self.check_drawdown_protection():
            return None, "Drawdown protection activated - move to cash"

        # Find assets that pass all individual asset-level filters
        eligible_assets = {}  # Stores {asset_name: momentum_value} for eligible assets
        all_momentums = {}    # For debugging/logging all calculated momentums

        self.log("=== ASSET EVALUATION DEBUG ===")  # Debugging log for asset selection process
        for asset_name in self.asset_names:
            asset_info = self.asset_data[asset_name]
            momentum = asset_info.get('momentum_long', None)
            volatility = asset_info.get('volatility', None)
            volume_ratio = asset_info.get('volume_ratio', None)
            above_sma = asset_info.get('above_sma', False)

            all_momentums[asset_name] = momentum  # Store all for debug

            # Detailed logging of each asset's metrics and filter status
            self.log(f"{asset_name}:")
            self.log(f"  Momentum: {momentum:.2%}" if momentum else "  Momentum: None")
            self.log(f"  Volatility: {volatility:.2f}" if volatility else "  Volatility: None")
            self.log(f"  Volume Ratio: {volume_ratio:.2f}" if volume_ratio else "  Volume Ratio: None")
            self.log(f"  Above SMA: {above_sma}")

            passed, reason = self.check_asset_filters(asset_name)  # Apply all defined filters
            if passed:
                if momentum is not None:  # Ensure momentum is valid
                    eligible_assets[asset_name] = momentum
                    self.log("  ✓ ELIGIBLE")
            else:
                self.log(f"  ✗ FILTERED OUT: {reason}")
        self.log("=== END DEBUG ===")

        # Select the asset with the highest momentum among those that passed all filters
        if eligible_assets:
            best_asset = max(eligible_assets, key=eligible_assets.get)  # Asset with max momentum
            best_momentum = eligible_assets[best_asset]
            self.log(f"SELECTED: {best_asset} with momentum {best_momentum:.2%}")
            self.log(f"Eligible count: {len(eligible_assets)} out of {len(all_momentums)} evaluated")
            return best_asset, f"Best momentum: {best_momentum:.2%}"
        else:
            # Log why no asset was selected (e.g., all filtered out)
            valid_momentums = {k: v for k, v in all_momentums.items() if v is not None}
            if valid_momentums:
                would_be_best = max(valid_momentums, key=valid_momentums.get)
                self.log(f"NO ELIGIBLE ASSETS: Best unfiltered would be {would_be_best} ({valid_momentums[would_be_best]:.2%})")
            else:
                self.log("NO ELIGIBLE ASSETS: No valid momentum data for any asset")
            return None, "No assets passed filters"
    def rebalance_portfolio(self):
        """Performs the portfolio rebalancing: closes old positions, opens new ones."""
        try:
            # Select the best asset based on defined criteria
            selected_asset, reason = self.select_best_asset()
        except Exception as e:
            self.log(f"Error in asset selection: {e}")
            return

        # Record rebalancing decision for later analysis
        current_date = self.datas[0].datetime.date(0)
        self.rebalance_dates.append(current_date)
        self.selected_assets_history.append(selected_asset or "CASH")  # Store asset name or "CASH"

        # Close current position if switching assets (or moving to cash)
        if self.current_asset != selected_asset:
            if self.current_asset:  # If currently holding an asset
                asset_data = self.asset_data[self.current_asset]['data']
                position_size = self.getposition(asset_data).size
                if position_size != 0:  # If there's an open position
                    self.close(data=asset_data)  # Close it
                    self.log(f"CLOSED: {self.current_asset}")

            # Open new position if an asset was selected
            if selected_asset:
                try:
                    # Calculate position size based on volatility targeting (if enabled)
                    position_size_pct = self.calculate_position_size(selected_asset)
                    asset_data = self.asset_data[selected_asset]['data']

                    # Calculate actual order size in units
                    cash = self.broker.getcash()
                    price = float(asset_data.close[0])
                    target_value = cash * (position_size_pct / 100)           # Target value for new position
                    order_size = int(target_value / price) if price > 0 else 0  # Convert to number of units

                    if order_size > 0:
                        self.buy(data=asset_data, size=order_size)  # Place buy order
                        self.log(f"BOUGHT: {selected_asset}, Size: {order_size}, Position: {position_size_pct:.1f}%")
                        self.trade_count += 1  # Increment total trade count
                    else:
                        selected_asset = None  # If unable to buy (e.g., too small), stay in cash
                        self.log("Could not calculate order size, staying in cash")
                except Exception as e:
                    self.log(f"Error buying {selected_asset}: {e}")
                    selected_asset = None  # On error, fall back to cash

        # If no asset was selected (or purchase failed), increment cash periods counter
        if not selected_asset:
            self.cash_periods += 1
            self.log(f"HOLDING CASH: {reason}")

        self.current_asset = selected_asset  # Update the currently held asset
    def next(self):
        # Check if all data feeds have warmed up for calculations
        # Uses the first data feed's length as a proxy
        if len(self.datas[0]) < self.params.momentum_lookback + 1:
            return  # Skip until we have enough data for the longest lookback

        # Track portfolio value for drawdown calculation
        current_value = self.broker.getvalue()
        self.portfolio_value_history.append(current_value)

        # Check if it's time to rebalance based on the rebalance_period
        current_bar = len(self.datas[0])
        if current_bar - self.last_rebalance >= self.params.rebalance_period:
            self.rebalance_portfolio()         # Trigger the rebalancing process
            self.last_rebalance = current_bar  # Reset rebalance counter for next period
    def stop(self):
        """Called at the very end of the backtest. Logs final strategy summary."""
        final_value = self.broker.getvalue()
        initial_value = 100000.0  # Assuming initial cash was $100,000
        total_return = (final_value / initial_value - 1) * 100

        self.log('=== STRATEGY SUMMARY ===')
        self.log(f'Final Portfolio Value: ${final_value:,.2f}')
        self.log(f'Total Return: {total_return:.2f}%')
        self.log(f'Total Rebalances: {len(self.rebalance_dates)}')
        self.log(f'Total Trades: {self.trade_count}')
        self.log(f'Cash Periods: {self.cash_periods}')

        # Summarize how often each asset was selected
        if self.selected_assets_history:
            self.log('=== ASSET ALLOCATION SUMMARY ===')
            asset_counts = {}
            for asset in self.selected_assets_history:
                asset_counts[asset] = asset_counts.get(asset, 0) + 1
            for asset, count in sorted(asset_counts.items(), key=lambda x: x[1], reverse=True):
                pct = count / len(self.selected_assets_history) * 100
                self.log(f'{asset}: {count}/{len(self.selected_assets_history)} periods ({pct:.1f}%)')
        self.log('=== END SUMMARY ===')
Analysis of Core Logic:

- calculate_returns(), calculate_volatility(), and calculate_volume_ratio() are custom helper methods designed to compute fundamental metrics for each asset over specified lookback periods. These are crucial for the filtering and selection process.
- update_asset_metrics(): Iterates through all tracked assets and updates their momentum (long and short term), volatility, and volume ratio metrics for the current bar. It also calculates each asset's own trend (above_sma) if the market filter is enabled.
- check_asset_filters(): Applies the multi-layered screening process. For a given asset, it verifies the abs_momentum_threshold, max_volatility_threshold, min_volume_ratio, and optionally the market trend filter (checking that the asset's price is above its SMA). An asset must pass all filters to be considered eligible.
- calculate_position_size(): Implements volatility-based position sizing. If enabled (volatility_sizing=True), it calculates the target position size for the selected asset inversely proportional to its volatility, aiming for the portfolio's target_volatility. This adjusts the allocation dynamically based on risk.
- check_drawdown_protection(): Implements a portfolio-level circuit breaker. It continuously tracks the portfolio's drawdown relative to its historical peak. If the drawdown exceeds max_drawdown_threshold, it triggers a signal to move to cash, protecting capital.
- select_best_asset(): The heart of the asset selection process. It first updates all asset metrics, then checks drawdown protection. If protection is not triggered, it applies check_asset_filters() to identify eligible assets and selects the one with the highest momentum_long. If no assets are eligible, it signals a move to cash.
- rebalance_portfolio(): Orchestrates the actual portfolio adjustments. It calls select_best_asset(); if the selected asset differs from self.current_asset, it closes any existing position and opens a new position sized by calculate_position_size(). If no asset is selected or the purchase fails, the strategy moves to cash.
- next() Method: The main bar-by-bar execution loop. It records the portfolio value for drawdown tracking and calls self.rebalance_portfolio() at fixed rebalance_period intervals, setting the rhythm of the strategy's decision-making.
- stop() Method: This special backtrader method is called at the end of the backtest. It logs a comprehensive summary of the strategy's performance, including total return, number of rebalances, total trades, periods spent in cash, and a breakdown of how often each asset was selected.

This final section sets up the environment for running the multi-asset strategy, including downloading data for multiple tickers and implementing the walk-forward optimization framework.
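The running-peak drawdown computation that the protection check relies on can be illustrated in isolation; the sample portfolio values below are made up for demonstration:

```python
import numpy as np

def current_drawdown(values):
    """Drawdown of the latest value relative to the running peak (<= 0)."""
    values = np.asarray(values, dtype=float)
    rolling_max = np.maximum.accumulate(values)  # highest value seen so far
    return (values[-1] - rolling_max[-1]) / rolling_max[-1]

history = [100_000, 118_000, 125_000, 110_000, 102_500]
dd = current_drawdown(history)   # (102500 - 125000) / 125000 = -0.18
protect = abs(dd) > 0.15         # breaches a 15% threshold, so protection fires
```

Because the peak is a running maximum, a recovery above the old high resets the reference point automatically; no explicit "reset" logic is needed.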
def download_crypto_data(tickers, start_date, end_date):
    """Downloads historical data for multiple crypto assets."""
    print(f"Downloading data for {len(tickers)} assets...")
    all_data = {}
    for ticker in tickers:
        try:
            data = yf.download(ticker, start=start_date, end=end_date, progress=False)
            if hasattr(data.columns, 'levels'):
                data.columns = data.columns.droplevel(1)  # Flatten multi-level columns

            if len(data) > 252:  # Ensure at least 1 year of data for long lookbacks
                all_data[ticker] = data
                # Print basic stats for downloaded data for quick verification
                total_return = (data['Close'].iloc[-1] / data['Close'].iloc[0] - 1) * 100
                volatility = data['Close'].pct_change().std() * np.sqrt(252) * 100
                avg_volume = data['Volume'].mean()
                print(f"✓ {ticker}: {len(data)} days, Return: {total_return:.1f}%, Vol: {volatility:.1f}%, Avg Volume: {avg_volume:,.0f}")
            else:
                print(f"✗ {ticker}: Insufficient data ({len(data)} days)")
        except Exception as e:
            print(f"✗ {ticker}: Download failed - {e}")
    return all_data
def run_enhanced_dual_momentum(tickers, start_date, end_date, **strategy_params):
    """
    Runs the Enhanced Dual Momentum strategy for a given period and parameter set.
    Handles data downloading and Cerebro setup.
    """
    # Download data for all assets
    crypto_data = download_crypto_data(tickers, start_date, end_date)

    if len(crypto_data) < 2:  # Need at least two assets to form a universe
        print("Not enough assets with sufficient data to run strategy.")
        return None, None, None

    print(f"\nRunning strategy with {len(crypto_data)} assets...")

    # Setup Cerebro
    cerebro = bt.Cerebro()

    # Add strategy with provided parameters
    cerebro.addstrategy(EnhancedDualMomentumStrategy, **strategy_params)

    # Add data feeds for all downloaded assets
    for ticker, data in crypto_data.items():
        feed = bt.feeds.PandasData(dataname=data, name=ticker)
        cerebro.adddata(feed)

    # Set broker parameters
    cerebro.broker.setcash(100000.0)
    cerebro.broker.setcommission(commission=strategy_params.get('transaction_cost', 0.001))

    # Add analyzers for comprehensive performance analysis (for individual runs)
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

    print(f'Starting Portfolio Value: ${cerebro.broker.getvalue():,.2f}')

    # Run strategy
    try:
        results = cerebro.run()
        strat = results[0] if results else None  # Get the strategy instance
        print(f'Final Portfolio Value: ${cerebro.broker.getvalue():,.2f}')
        return cerebro, strat, crypto_data
    except Exception as e:
        print(f"Error running strategy: {e}")
        return None, None, None
def analyze_performance(strat, crypto_data, benchmark_ticker="BTC-USD"):
    """Analyzes and prints detailed performance metrics of the strategy."""
    print('\n' + '=' * 60)
    print('ENHANCED DUAL MOMENTUM PERFORMANCE ANALYSIS')
    print('=' * 60)
    try:
        # Basic performance metrics from analyzers
        sharpe_analysis = strat.analyzers.sharpe.get_analysis()
        sharpe_ratio = sharpe_analysis.get('sharperatio', None)

        drawdown_analysis = strat.analyzers.drawdown.get_analysis()
        max_drawdown = drawdown_analysis.get('max', {}).get('drawdown', 0)

        returns_analysis = strat.analyzers.returns.get_analysis()
        total_return = returns_analysis.get('rtot', 0) * 100

        trade_analysis = strat.analyzers.trades.get_analysis()
        total_trades = trade_analysis.get('total', {}).get('total', 0)

        print(f'Total Return: {total_return:.2f}%')
        print(f'Sharpe Ratio: {sharpe_ratio:.3f}' if sharpe_ratio else 'Sharpe Ratio: N/A')
        print(f'Max Drawdown: {max_drawdown:.2f}%')
        print(f'Total Trades: {total_trades}')

        # Asset selection frequency analysis
        if hasattr(strat, 'selected_assets_history') and strat.selected_assets_history:
            asset_counts = pd.Series(strat.selected_assets_history).value_counts()
            print('\nAsset Selection Frequency:')
            for asset, count in asset_counts.items():
                pct = count / len(strat.selected_assets_history) * 100
                print(f'  {asset}: {count} times ({pct:.1f}%)')
        else:
            print('\nNo asset selection history available')

        # Compare with benchmark (e.g., BTC-USD Buy & Hold)
        if benchmark_ticker in crypto_data:
            benchmark_data = crypto_data[benchmark_ticker]
            benchmark_return = ((benchmark_data['Close'].iloc[-1] / benchmark_data['Close'].iloc[0]) - 1) * 100
            print(f'\n{benchmark_ticker} Buy & Hold Return: {benchmark_return:.2f}%')
            print(f'Strategy vs {benchmark_ticker}: {total_return - benchmark_return:.2f}% outperformance')

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': total_trades
        }
    except Exception as e:
        print(f"Error in performance analysis: {e}")
        return {
            'total_return': 0,
            'sharpe_ratio': 0,
            'max_drawdown': 0,
            'total_trades': 0
        }
def run_walk_forward_optimization(tickers, start_date, end_date):
    """
    Performs a simplified walk-forward analysis with parameter combinations.
    Runs the strategy with fixed parameter sets over the full period and evaluates each.
    """
    print('\n' + '=' * 60)
    print('WALK-FORWARD OPTIMIZATION')
    print('=' * 60)

    # Predefined parameter combinations to test in walk-forward
    param_combinations = [
        {'momentum_lookback': 126, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.6},
        {'momentum_lookback': 189, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.7},
        {'momentum_lookback': 252, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.8},
        {'momentum_lookback': 252, 'abs_momentum_threshold': 0.05, 'max_volatility_threshold': 0.7},
        {'momentum_lookback': 315, 'abs_momentum_threshold': 0.0, 'max_volatility_threshold': 0.8},
    ]

    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)

    results = []  # To store results for each parameter combination

    # Iterate through each predefined parameter combination
    for i, params in enumerate(param_combinations):
        print(f"\nTesting parameter set {i+1}/{len(param_combinations)}: {params}")
        try:
            # Run the strategy with the current parameter set over the FULL period.
            # Note: this is a fixed-parameter validation across the full history for *each* set.
            # A true rolling walk-forward would split into in-sample/out-of-sample segments.
            cerebro_instance, strat_instance, crypto_data_instance = run_enhanced_dual_momentum(
                tickers, start_date, end_date, **params
            )
            if strat_instance is not None:
                # Analyze performance for this parameter set
                performance = analyze_performance(strat_instance, crypto_data_instance)
                performance['params'] = params  # Store params with results
                results.append(performance)
        except Exception as e:
            print(f"Error with parameter set {i+1}: {e}")

    # Find the best parameter set based on Sharpe Ratio
    if results:
        best_result = max(results, key=lambda x: x.get('sharpe_ratio', 0) or 0)
        print(f"\n{'='*40}")
        print("BEST PARAMETER SET (by Sharpe Ratio):")
        print(f"Parameters: {best_result['params']}")
        print(f"Sharpe Ratio: {best_result['sharpe_ratio']:.3f}")
        print(f"Total Return: {best_result['total_return']:.2f}%")
        print(f"Max Drawdown: {best_result['max_drawdown']:.2f}%")
    return results
# Main execution block
if __name__ == "__main__":
    # Global configuration
    TICKERS = ["BTC-USD", "ETH-USD", "SOL-USD"]  # Universe of assets to consider
    START_DATE = "2021-01-01"
    END_DATE = "2024-12-31"  # End of the backtest window

    print("=" * 80)
    print("ENHANCED DUAL MOMENTUM STRATEGY WITH BACKTRADER")
    print("=" * 80)
    print(f"Universe: {', '.join(TICKERS)}")
    print(f"Period: {START_DATE} to {END_DATE}")
    print("\nEnhancements Included:")
    print("✓ Absolute + Relative Momentum")
    print("✓ Volatility filtering")
    print("✓ Volume filtering")
    print("✓ Market trend filtering (per asset, if enabled)")
    print("✓ Drawdown protection (portfolio-level)")
    print("✓ Volatility-based position sizing")
    print("✓ Transaction costs & slippage simulation")
    print("✓ Walk-forward optimization (parameter validation)")

    # Strategy parameters for the MAIN BACKTEST run
    strategy_params = {
        'momentum_lookback': 90,          # Long-term momentum lookback (days)
        'short_momentum_lookback': 21,    # Short-term momentum lookback (not used in asset selection here)
        'abs_momentum_threshold': 0.0,    # Minimum positive return required
        'rebalance_period': 21,           # Monthly rebalancing (21 trading days)
        'volatility_lookback': 63,        # ~3 months for volatility estimation
        'max_volatility_threshold': 3.0,  # High volatility tolerance for crypto (300% annualized)
        'min_volume_ratio': 0.1,          # Low volume requirement (10% of average)
        'volume_lookback': 20,            # Period for the volume average
        'max_position_pct': 100,          # Max 100% allocation to a single asset
        'drawdown_protection': True,      # Enable drawdown protection
        'max_drawdown_threshold': 0.30,   # 30% max portfolio drawdown (adjusted for crypto volatility)
        'volatility_sizing': True,        # Use volatility targeting for position sizing
        'target_volatility': 0.30,        # Target 30% annualized portfolio volatility
        'use_market_filter': False,       # Disable the overall market filter (each asset uses its own SMA)
        'market_sma_period': 200,         # Still defined for the asset-level SMA if the filter is enabled
        'transaction_cost': 0.001,        # 0.1% transaction cost
        'slippage_factor': 0.0005,        # 0.05% slippage (broker level)
        'printlog': False                 # Turn off detailed logging for the main run
    }

    # --- Execute Main Backtest ---
    print(f"\n{'-'*60}")
    print("RUNNING MAIN BACKTEST (Full Period)")
    print(f"{'-'*60}")

    # Run the strategy on the full historical period.
    # Returns the Cerebro instance, the strategy instance, and the downloaded data.
    cerebro_main, strat_main, crypto_data_main = run_enhanced_dual_momentum(
        TICKERS, START_DATE, END_DATE, **strategy_params
    )

    if strat_main is not None:
        # Analyze performance of the main backtest
        main_performance = analyze_performance(strat_main, crypto_data_main)

        # Plot results of the main backtest
        print(f"\n{'-'*40}")
        print("PLOTTING MAIN BACKTEST RESULTS")
        print(f"{'-'*40}")

        # cerebro.plot returns a list of lists of matplotlib figures
        main_figs = cerebro_main.plot(iplot=False, style='line', volume=False)
        for fig in main_figs:
            fig_obj = fig[0]  # Unpack the figure object from the inner list
            fig_obj.suptitle('Enhanced Dual Momentum Strategy Performance (Main Backtest)', fontsize=16)
            fig_obj.tight_layout(rect=[0, 0.03, 1, 0.95])  # Keep the title clear of the axes
            fig_obj.subplots_adjust(hspace=0.3)            # Add vertical spacing between subplots
            fig_obj.show()                                 # Display the plot

        # --- Execute Walk-Forward Optimization ---
        print(f"\n{'-'*60}")
        print("STARTING WALK-FORWARD OPTIMIZATION (Parameter Validation)")
        print(f"{'-'*60}")

        # Calls run_enhanced_dual_momentum once per candidate parameter set
        wf_results = run_walk_forward_optimization(TICKERS, START_DATE, END_DATE)

        print(f"\n{'='*80}")
        print("COMPREHENSIVE ANALYSIS COMPLETE")
        print(f"{'='*80}")
    else:
        print("Main strategy execution failed! Aborting further analysis.")
The journey through the Enhanced Dual Momentum Strategy reveals a sophisticated approach to algorithmic portfolio management, far transcending simple momentum plays. This exploration underscores the ambition to build trading systems that are not only capable of identifying performance leaders but are also robustly equipped to navigate the inherent complexities and shifting regimes of financial markets.
The strategy’s strength lies in its multi-layered architecture. By incorporating stringent filters for volatility and liquidity, alongside a market trend filter (even if simplified), it attempts to operate only in environments deemed favorable. Its advanced risk management features, such as portfolio-level drawdown protection and volatility-based position sizing, represent critical components designed to preserve capital and smooth equity curves. Furthermore, its application within a walk-forward optimization framework speaks volumes about a commitment to rigorous research, providing a more realistic assessment of its adaptability to unseen market conditions.
Ultimately, this strategy serves as a compelling testament to the ongoing quest in quantitative finance: to create intelligent, adaptive systems that can continually learn and respond to the market’s evolving narrative. It highlights that building truly resilient portfolios is not about finding a single magic indicator, but about meticulously layering a diverse set of principles, constantly testing their interactions, and always maintaining a spirit of continuous exploration and refinement.