Exploring Non-Linear Dynamics: Rolling Backtest of a Rough Path Momentum Strategy with Adaptive Trailing Stops

This article introduces RoughPathMomentumStrategy, a quantitative trading strategy that draws on rough path theory to measure momentum. Instead of relying on moving averages of price or volume, it builds “path signatures” from price increments and uses them to infer the market’s direction and persistence. A trailing stop-loss that follows price in the position’s favor handles risk management, and a rolling backtest evaluates the strategy across consecutive time windows.

1. The Rough Path Momentum Strategy Concept

The RoughPathMomentumStrategy is inspired by the mathematical theory of “rough paths,” a rigorous framework for analyzing highly oscillatory, non-differentiable paths such as financial time series. The core idea is that the “signature” of a path, its sequence of iterated integrals, summarizes the path’s shape and the order of its movements independently of how the path is parameterized in time. By analyzing truncated signatures of recent returns, the strategy looks for momentum signals at several levels of interaction.
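
For reference, the signature can be written out in standard notation. This is a sketch rather than the author's own derivation: the symbols X, S^{(k)}, x_i and L_k are introduced here for illustration only. The first line is the general level-k term of the signature; the second line gives the discrete sums that the strategy code computes from the one-dimensional return increments x_1, ..., x_n.

```latex
S^{(k)}(X)_{0,T} \;=\; \int_{0 < t_1 < \cdots < t_k < T} dX_{t_1} \otimes \cdots \otimes dX_{t_k},
\qquad k = 1, 2, 3, \dots

L_1 = \sum_{i} x_i, \qquad
L_2 = \sum_{i<j} x_i x_j = \tfrac{1}{2}\Big(L_1^2 - \sum_{i} x_i^2\Big), \qquad
L_3 = \sum_{i<j<k} x_i x_j x_k
```

The identity for L_2 shows that, for a single price series, the second level carries two pieces of information: the squared net move and the sum of squared returns (a realized-variance term).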

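Key Concepts and Components:

Path increments: daily percentage returns over a rolling signature_window (30 bars by default) form the path. Path signature: three levels of iterated sums of those increments are blended into one momentum signature with weights 0.5, 0.3 and 0.2, with the higher levels scaled down by the window length. Invariance check: the momentum signatures of the two halves of the window must differ by less than half of momentum_threshold before a signal is trusted.

Entry Logic:

When flat, the strategy goes long if the momentum signature exceeds momentum_threshold (0.1 by default) and goes short if it falls below the negative threshold, in both cases only when the invariance check passes.

Exit Logic:

An open position is closed when a stable signal of the opposite sign appears, or when the trailing stop (5% by default, moved only in the position’s favor) is hit.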
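
The trailing-stop exit can be illustrated outside backtrader with a small standalone sketch. The function name ratchet_long_stop and its arguments are hypothetical, and the sketch makes one simplifying assumption: it checks the stop against closing prices only, whereas the strategy submits a broker-side stop order that can trigger within a bar.

```python
def ratchet_long_stop(entry_price, closes, trail_pct=0.05):
    """Simulate a percentage trailing stop for a long position.

    Returns (exit_index, stop_price). exit_index is the position in
    `closes` of the first close at or below the stop, or None if the
    stop is never reached.
    """
    highest = entry_price
    stop = entry_price * (1 - trail_pct)
    for i, price in enumerate(closes):
        if price <= stop:
            return i, stop
        if price > highest:
            highest = price
            # The stop only ever moves up, never back down.
            stop = max(stop, highest * (1 - trail_pct))
    return None, stop


if __name__ == "__main__":
    exit_index, stop_price = ratchet_long_stop(100.0, [102.0, 110.0, 108.0, 104.0, 103.0])
    print(exit_index, round(stop_price, 2))
```

A short position mirrors this logic: track the lowest price since entry and keep the stop at that low times (1 + trail_pct), moving it down only.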

2. The RoughPathMomentumStrategy Implementation

Here are the key components of the strategy:

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

class RoughPathMomentumStrategy(bt.Strategy):
    params = (
        ('signature_window', 30),     # Window for path signature calculation (e.g., 30 bars of returns)
        ('signature_depth', 3),       # Signature truncation level (Level 1, Level 2, Level 3)
        ('momentum_threshold', 0.1),  # Momentum signature threshold for entry/exit
        ('trailing_stop_pct', 0.05),  # 5% trailing stop
    )
    
    def __init__(self):
        self.close = self.data.close
        # Calculate daily percentage returns as path increments
        self.returns = bt.indicators.PctChange(self.close, period=1) 
        
        # Path data storage for the rolling window
        self.path_increments = []
        
        # Signature components (for internal tracking/debugging)
        self.level1_signature = 0
        self.level2_signature = 0
        self.level3_signature = 0
        self.momentum_signature = 0 # The combined signature value used for trading
        
        # Trailing stop tracking variables
        self.entry_price = 0
        self.trailing_stop_price = 0
        self.highest_price_since_entry = 0 # For long positions
        self.lowest_price_since_entry = 0  # For short positions
        
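        # Track orders to prevent multiple concurrent orders
        self.order = None
        self.stop_order = None # For the trailing stop order

    def log(self, txt):
        """Print a message prefixed with the current bar's date."""
        print(f'{self.data.datetime.date(0)} {txt}')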

    def calculate_path_signature(self, increments):
        """
        Calculates the iterated integrals (signature components) of a path
        up to a specified depth. This is a simplified implementation.
        """
        if len(increments) < 2: # Need at least 2 increments before higher-level terms exist
            return 0, 0, 0
            
        increments_arr = np.array(increments)
        n = len(increments_arr)
        
        # Level 1 signature: ∫ dX (sum of increments)
        level1 = np.sum(increments_arr)
        
        # Level 2 signature: discrete analogue of ∫∫ dX ⊗ dX.
        # For this 1-D path it is the sum of x_i * x_j over all pairs i < j
        # (an O(n^2) double loop, kept explicit for readability).
        level2 = 0
        for i in range(n):
            for j in range(i + 1, n):
                level2 += increments_arr[i] * increments_arr[j]
        
        # Level 3 signature: discrete analogue of ∫∫∫ dX ⊗ dX ⊗ dX.
        # Sum of x_i * x_j * x_k over all triples i < j < k (O(n^3)).
        level3 = 0
        for i in range(n):
            for j in range(i + 1, n):
                for k in range(j + 1, n):
                    level3 += increments_arr[i] * increments_arr[j] * increments_arr[k]
        
        return level1, level2, level3

    def extract_momentum_signature(self, increments):
        """
        Combines different levels of the path signature into a single
        momentum signature, normalized by path length.
        """
        if len(increments) < self.params.signature_depth: # Not enough increments to compute full depth
            return 0
        
        # Calculate the three signature components (levels 1-3 are always computed)
        level1, level2, level3 = self.calculate_path_signature(increments)
        
        # Store components (for plotting/debugging if needed)
        self.level1_signature = level1
        self.level2_signature = level2
        self.level3_signature = level3
        
        # Weighted combination for the final momentum signature
        # Normalization by path_length is crucial to make levels comparable
        path_length = len(increments)
        if path_length == 0: return 0
        
        # Assigning weights and normalizing by powers of path_length
        # These weights and normalization factors are heuristic for this example.
        momentum_sig = (
            level1 * 0.5 +                     # Direct directional bias (net return over the window)
            level2 * 0.3 / (path_length) +     # Pairwise return products: positive when returns tend to share a sign
            level3 * 0.2 / (path_length**2)    # Triple return products (third-order interaction)
        )
        
        return momentum_sig

    def is_signature_invariant(self, increments):
        """
        Checks for a simplified form of signature invariance (stability)
        by comparing signatures of sub-paths.
        """
        # Needs enough data to split into meaningful sub-paths
        if len(increments) < self.params.signature_window / 2: 
            return False 
        
        # Split increments into two halves
        mid = len(increments) // 2
        if mid == 0 or len(increments) - mid == 0: return False # Ensure halves are not empty

        first_half = increments[:mid]
        second_half = increments[mid:]
        
        # Calculate momentum signatures for each half
        sig1 = self.extract_momentum_signature(first_half)
        sig2 = self.extract_momentum_signature(second_half)
        
        # Check if the absolute difference between the signatures is below a threshold
        # A small difference implies stability/invariance
        return abs(sig1 - sig2) < self.params.momentum_threshold * 0.5

    def notify_order(self, order):
        # Handles order completion and placement of trailing stops
        if order.status in [order.Completed]:
            if order.isbuy() and self.position.size > 0: # Long entry completed
                self.entry_price = order.executed.price
                self.highest_price_since_entry = order.executed.price
                # Place initial trailing stop based on entry price
                self.trailing_stop_price = order.executed.price * (1 - self.params.trailing_stop_pct)
                self.stop_order = self.sell(exectype=bt.Order.Stop, price=self.trailing_stop_price, size=self.position.size)
                self.log(f'BUY EXECUTED at {self.entry_price:.2f}. Initial Trailing Stop: {self.trailing_stop_price:.2f}')
                
            elif order.issell() and self.position.size < 0: # Short entry completed
                self.entry_price = order.executed.price
                self.lowest_price_since_entry = order.executed.price
                # Place initial trailing stop for short position
                self.trailing_stop_price = order.executed.price * (1 + self.params.trailing_stop_pct)
                self.stop_order = self.buy(exectype=bt.Order.Stop, price=self.trailing_stop_price, size=abs(self.position.size))
                self.log(f'SELL SHORT EXECUTED at {self.entry_price:.2f}. Initial Trailing Stop: {self.trailing_stop_price:.2f}')

            else: # Position is flat again: a signal exit or a stop closed the trade
                side = 'SHORT' if order.isbuy() else 'LONG'
                self.log(f'CLOSE {side} EXECUTED at {order.executed.price:.2f}')
            
            # Reset entry order reference
            self.order = None
            # If the completed order was our stop order, reset its reference and tracking variables
            if self.stop_order is not None and order.ref == self.stop_order.ref:
                self.stop_order = None
                self.entry_price = 0
                self.trailing_stop_price = 0
                self.highest_price_since_entry = 0
                self.lowest_price_since_entry = 0
                self.log(f'STOP ORDER EXECUTED at {order.executed.price:.2f}. Position closed.')
        
        elif order.status in [order.Canceled, order.Rejected]:
            self.log(f'Order Canceled/Rejected: Status {order.getstatusname()}')
            if self.order and order.ref == self.order.ref: # If it was our main entry order
                self.order = None
            if self.stop_order and order.ref == self.stop_order.ref: # If it was our stop order
                self.log('WARNING: Trailing Stop Order Failed/Canceled!')
                self.stop_order = None # Position is now unprotected by this stop

    def notify_trade(self, trade):
        if not trade.isclosed:
            return
        self.log(f'OPERATION PROFIT: GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}')

    def update_trailing_stop(self):
        """Manages the adaptive trailing stop-loss."""
        if not self.position or self.stop_order is None or not self.stop_order.alive():
            return # No position or no active stop order
            
        current_price = self.close[0]
        
        if self.position.size > 0: # Long position
            # Update highest price reached since entry
            if current_price > self.highest_price_since_entry:
                self.highest_price_since_entry = current_price
            
            # Calculate new potential stop price
            new_stop_price = self.highest_price_since_entry * (1 - self.params.trailing_stop_pct)
            
            # Move stop up only if it's higher than the current stop
            if new_stop_price > self.trailing_stop_price:
                self.log(f'Updating long stop from {self.trailing_stop_price:.2f} to {new_stop_price:.2f}')
                self.trailing_stop_price = new_stop_price
                self.cancel(self.stop_order) # Cancel old stop order
                self.stop_order = self.sell(exectype=bt.Order.Stop, price=self.trailing_stop_price, size=self.position.size)
        
        elif self.position.size < 0: # Short position
            # Update lowest price reached since entry
            if current_price < self.lowest_price_since_entry:
                self.lowest_price_since_entry = current_price
            
            # Calculate new potential stop price
            new_stop_price = self.lowest_price_since_entry * (1 + self.params.trailing_stop_pct)
            
            # Move stop down only if it's lower than the current stop
            if new_stop_price < self.trailing_stop_price:
                self.log(f'Updating short stop from {self.trailing_stop_price:.2f} to {new_stop_price:.2f}')
                self.trailing_stop_price = new_stop_price
                self.cancel(self.stop_order) # Cancel old stop order
                self.stop_order = self.buy(exectype=bt.Order.Stop, price=self.trailing_stop_price, size=abs(self.position.size))


    def next(self):
        # Always update trailing stop first if in a position
        self.update_trailing_stop()
        
        # Add current return to path increments first, so that no bar is
        # dropped from the path while an order is pending
        if not np.isnan(self.returns[0]):
            self.path_increments.append(self.returns[0])
        
        # Prevent new entry/exit orders if one is already pending
        if self.order is not None:
            return
        
        # Manage the rolling window of increments
        if len(self.path_increments) > self.params.signature_window * 2: # Keep double the window for stability checks
            self.path_increments = self.path_increments[-self.params.signature_window * 2:]
            
        # Ensure enough data for signature calculation
        if len(self.path_increments) < self.params.signature_window:
            return # Not enough historical returns for a full signature window
            
        # Extract momentum signature from the most recent window
        recent_path = self.path_increments[-self.params.signature_window:]
        momentum_sig = self.extract_momentum_signature(recent_path)
        self.momentum_signature = momentum_sig # Store for plotting/debugging
        
        # Check for signature stability/invariance
        is_stable = self.is_signature_invariant(recent_path)
        
        # Trading signals based on momentum signatures and stability
        if abs(momentum_sig) > self.params.momentum_threshold and is_stable:
            
            # Strong positive momentum signature: Go long or close short
            if momentum_sig > self.params.momentum_threshold:
                if self.position.size < 0: # If currently short, close it
                    self.log(f'Rough Path Momentum: CLOSE SHORT. Strong BULLISH Sig: {momentum_sig:.4f}, Stable: {is_stable}')
                    if self.stop_order is not None: self.cancel(self.stop_order)
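                    # self.close is rebound to the price line in __init__, so cover the short explicitly
                    self.order = self.buy(size=abs(self.position.size))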
                elif not self.position: # If flat, open long
                    self.log(f'Rough Path Momentum: BUY. Strong BULLISH Sig: {momentum_sig:.4f}, Stable: {is_stable}')
                    self.order = self.buy()
            
            # Strong negative momentum signature: Go short or close long
            elif momentum_sig < -self.params.momentum_threshold:
                if self.position.size > 0: # If currently long, close it
                    self.log(f'Rough Path Momentum: CLOSE LONG. Strong BEARISH Sig: {momentum_sig:.4f}, Stable: {is_stable}')
                    if self.stop_order is not None: self.cancel(self.stop_order)
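                    # self.close is rebound to the price line in __init__, so sell the long explicitly
                    self.order = self.sell(size=self.position.size)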
                elif not self.position: # If flat, open short
                    self.log(f'Rough Path Momentum: SELL (Short). Strong BEARISH Sig: {momentum_sig:.4f}, Stable: {is_stable}')
                    self.order = self.sell()

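Explanation of RoughPathMomentumStrategy:

calculate_path_signature returns the level 1 to 3 sums of the return increments, and extract_momentum_signature blends them into a single value. is_signature_invariant compares the values from the two halves of the window. notify_order places the initial stop once an entry fills, update_trailing_stop replaces that stop as price moves in the position’s favor, and next appends the latest return, recomputes the signature, and issues entry or exit orders.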
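
The nested loops in calculate_path_signature cost O(n^2) and O(n^3) per bar. Because the level 2 and level 3 terms are the second and third elementary symmetric polynomials of the increments, they can also be obtained in O(n) from power sums (Newton's identities). The sketch below is an illustrative alternative, not part of the original strategy; the function names are hypothetical, and it is written in plain Python so it can be checked against a brute-force version.

```python
from itertools import combinations


def signature_levels_fast(increments):
    """Levels 1-3 of the strategy's discrete signature in O(n).

    Uses the power sums p1 = sum(x), p2 = sum(x^2), p3 = sum(x^3):
        level2 = (p1^2 - p2) / 2
        level3 = (p1^3 - 3*p1*p2 + 2*p3) / 6
    Mirrors the original guard: fewer than 2 increments gives zeros.
    """
    if len(increments) < 2:
        return 0.0, 0.0, 0.0
    p1 = sum(increments)
    p2 = sum(x * x for x in increments)
    p3 = sum(x ** 3 for x in increments)
    level1 = p1
    level2 = (p1 * p1 - p2) / 2.0
    level3 = (p1 ** 3 - 3.0 * p1 * p2 + 2.0 * p3) / 6.0
    return level1, level2, level3


def signature_levels_bruteforce(increments):
    """Same quantities via explicit sums over ordered pairs and triples."""
    if len(increments) < 2:
        return 0.0, 0.0, 0.0
    level1 = sum(increments)
    level2 = sum(a * b for a, b in combinations(increments, 2))
    level3 = sum(a * b * c for a, b, c in combinations(increments, 3))
    return level1, level2, level3


if __name__ == "__main__":
    sample_returns = [0.01, -0.02, 0.015, 0.003, -0.007]
    print(signature_levels_fast(sample_returns))
    print(signature_levels_bruteforce(sample_returns))
```

The closed forms subtract quantities of similar size, so the two versions agree only up to floating-point rounding; for daily returns in double precision the difference is negligible.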

3. Backtesting and Analysis

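The script below evaluates the strategy with a rolling backtest: the date range is split into consecutive windows (three months each by default), and every window is run in a fresh Cerebro instance with 100,000 in starting cash, a 0.1% commission, and a 95% PercentSizer. Per-window returns are then summarized and plotted.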
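
The window-splitting step can be looked at in isolation. The sketch below reproduces the same stepping rule using only the standard library; add_months and rolling_windows are hypothetical helper names, and add_months is a stand-in for the month arithmetic that the script delegates to relativedelta (the day is clamped to the end of shorter months).

```python
import calendar
from datetime import date


def add_months(d, months):
    """Shift a date forward by whole months, clamping the day if needed."""
    month_index = d.month - 1 + months
    year = d.year + month_index // 12
    month = month_index % 12 + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def rolling_windows(start, end, window_months=3):
    """Split [start, end] into consecutive windows of window_months.

    The last window is cut short at `end` instead of being dropped.
    """
    windows = []
    current_start = start
    while current_start < end:
        current_end = min(add_months(current_start, window_months), end)
        windows.append((current_start, current_end))
        current_start = current_end
    return windows


if __name__ == "__main__":
    for window_start, window_end in rolling_windows(date(2018, 1, 1), date(2018, 8, 15)):
        print(window_start, "->", window_end)
```

Each window begins where the previous one ended, so every backtest starts with fresh cash and must rebuild its return history before the strategy can trade.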

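# ... (imports from top of the rolling backtest script) ...
from datetime import datetime        # datetime.now() is used in the __main__ block
import dateutil.relativedelta as rd  # rd.relativedelta steps the rolling window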

# Define the strategy for the rolling backtest
strategy = RoughPathMomentumStrategy

def run_rolling_backtest(
    ticker="BTC-USD",
    start="2018-01-01",
    end="2025-06-23", # Default end date; the __main__ block passes today's date instead
    window_months=3,
    strategy_params=None
):
    strategy_params = strategy_params or {}
    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            current_end = end_dt 
            if current_start >= current_end:
                break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Download daily OHLCV data; auto_adjust=False keeps the raw (unadjusted) prices
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
        
        # yfinance may return MultiIndex columns (field, ticker); drop the ticker
        # level so that backtrader sees plain OHLCV column names
        if isinstance(data.columns, pd.MultiIndex):
            data = data.droplevel(1, axis=1)

        # Check for sufficient data after droplevel for strategy warm-up
        # Requires at least signature_window * 2 bars for path increments and stability check
        sig_window = strategy_params.get('signature_window', RoughPathMomentumStrategy.params.signature_window)
        min_bars_needed = sig_window * 2 + 1 # +1 for current return calculation
        
        if data.empty or len(data) < min_bars_needed:
            print(f"Not enough data for period {current_start.date()} to {current_end.date()} (requires at least {min_bars_needed} bars). Skipping.")
            if current_end == end_dt:
                break
            current_start = current_end
            continue

        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        cerebro.addstrategy(strategy, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)

        start_val = cerebro.broker.getvalue()
        cerebro.run()
        final_val = cerebro.broker.getvalue()
        ret = (final_val - start_val) / start_val * 100

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': ret,
            'final_value': final_val,
        })

        print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
        
        if current_end == end_dt:
            break
        current_start = current_end

    return pd.DataFrame(all_results)


def report_stats(df):
    returns = df['return_pct']
    stats = {
        'Mean Return %': np.mean(returns),
        'Median Return %': np.median(returns),
        'Std Dev %': np.std(returns),
        'Min Return %': np.min(returns),
        'Max Return %': np.max(returns),
        'Sharpe Ratio': np.mean(returns) / np.std(returns) if np.std(returns) > 0 else np.nan
    }
    print("\n=== ROLLING BACKTEST STATISTICS ===")
    for k, v in stats.items():
        print(f"{k}: {v:.2f}")
    return stats

def plot_four_charts(df, rolling_sharpe_window=4):
    """
    Generates four analytical plots for rolling backtest results.
    """
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8)) # Adjusted figsize for clarity
    
    periods = list(range(len(df)))
    returns = df['return_pct']
    
    # 1. Period Returns (Top Left)
    colors = ['green' if r >= 0 else 'red' for r in returns]
    ax1.bar(periods, returns, color=colors, alpha=0.7)
    ax1.set_title('Period Returns', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Period')
    ax1.set_ylabel('Return %')
    ax1.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax1.grid(True, alpha=0.3)
    
    # 2. Cumulative Returns (Top Right)
    cumulative_returns = (1 + returns / 100).cumprod() * 100 - 100
    ax2.plot(periods, cumulative_returns, marker='o', linewidth=2, markersize=4, color='blue')
    ax2.set_title('Cumulative Returns', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Period')
    ax2.set_ylabel('Cumulative Return %')
    ax2.grid(True, alpha=0.3)
    
    # 3. Rolling Sharpe Ratio (Bottom Left)
    rolling_sharpe = returns.rolling(window=rolling_sharpe_window).apply(
        lambda x: x.mean() / x.std() if x.std() > 0 else np.nan, raw=False
    )
    valid_mask = ~rolling_sharpe.isna()
    valid_periods = [i for i, valid in enumerate(valid_mask) if valid]
    valid_sharpe = rolling_sharpe[valid_mask]
    
    ax3.plot(valid_periods, valid_sharpe, marker='o', linewidth=2, markersize=4, color='orange')
    ax3.axhline(y=0, color='red', linestyle='--', alpha=0.5)
    ax3.set_title(f'Rolling Sharpe Ratio ({rolling_sharpe_window}-period)', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Period')
    ax3.set_ylabel('Sharpe Ratio')
    ax3.grid(True, alpha=0.3)
    
    # 4. Return Distribution (Bottom Right)
    bins = min(15, max(5, len(returns)//2))
    ax4.hist(returns, bins=bins, alpha=0.7, color='steelblue', edgecolor='black')
    mean_return = returns.mean()
    ax4.axvline(mean_return, color='red', linestyle='--', linewidth=2, 
                label=f'Mean: {mean_return:.2f}%')
    ax4.set_title('Return Distribution', fontsize=14, fontweight='bold')
    ax4.set_xlabel('Return %')
    ax4.set_ylabel('Frequency')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

if __name__ == '__main__':
    # Use current date for the end of the backtest for a more "live" simulation
    current_date = datetime.now().date() 
    
    df = run_rolling_backtest(
        ticker="BTC-USD", # Default ticker for article's example
        start="2018-01-01",
        end=current_date, # Use the current date
        window_months=3,
    )

    print("\n=== ROLLING BACKTEST RESULTS ===")
    print(df)

    stats = report_stats(df)
    plot_four_charts(df)

4. Conclusion

The RoughPathMomentumStrategy is an ambitious attempt to bring ideas from rough path theory into quantitative trading. By analyzing the “signatures” of price paths, it aims to capture multi-level momentum characteristics that traditional indicators might miss. The signature invariance check filters out signals that are not stable across the window, and the adaptive trailing stop manages risk on each open position. Rolling backtests matter for a strategy this complex because they show how consistent its results are across different market environments rather than over a single period. Signature methods are still an emerging area in practical trading, and the weights, thresholds, and signature approximations used here are heuristic. Further research into signature depth, weighting schemes, and stability criteria, possibly combined with machine learning techniques, could improve on this starting point.