← Back to Home
Dynamic Trend Capture Rolling Backtest of Polynomial Channel Breakout Strategy with Adaptive Trailing Stops

Dynamic Trend Capture Rolling Backtest of Polynomial Channel Breakout Strategy with Adaptive Trailing Stops

This article introduces a sophisticated quantitative trading strategy, PolynomialChannelBreakoutStrategy, which leverages polynomial regression to dynamically identify price channels. Instead of fixed-width bands, these channels adapt to the non-linear trends of the market. The strategy aims to capitalize on channel breakouts and employs an Average True Range (ATR)-based trailing stop for robust risk management.

1. The Polynomial Channel Breakout Strategy Concept

Traditional trading channels often rely on simple linear regression or fixed-width bands (like Bollinger Bands). However, real-world price movements are rarely linear. This strategy addresses this by fitting a polynomial curve to historical price data, allowing the channel to better capture the actual shape of the trend.

Key Components:

2. The PolynomialChannelIndicator

This custom indicator is fundamental to the strategy. It leverages scikit-learn for polynomial regression.

import backtrader as bt
import numpy as np
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
import warnings

# Suppress sklearn warnings if they occur during fitting
warnings.filterwarnings('ignore')

class PolynomialChannelIndicator(bt.Indicator):
    lines = ('upper_channel', 'lower_channel', 'regression_line')
    
    params = (
        ('degree', 3),           # Polynomial degree
        ('channel_width', 2.0),  # Channel width in standard deviations
        ('lookback', 50),        # Lookback period for regression
    )
    
    plotinfo = dict(
        plot=True,
        subplot=False, # Plot on the main price chart
        plotlinelabels=True
    )
    
    plotlines = dict(
        upper_channel=dict(color='red', ls='--', alpha=0.7),
        lower_channel=dict(color='green', ls='--', alpha=0.7),
        regression_line=dict(color='blue', ls='-', alpha=0.8)
    )
    
    def __init__(self):
        self.addminperiod(self.params.lookback) # Ensure enough data for lookback
            
    def next(self):
        """Calculate polynomial channels for current bar."""
        if len(self.data) < self.params.lookback:
            return
            
        # Get price data for the lookback period, reversed to get newest last
        prices = np.array([self.data.close[-i] for i in range(self.params.lookback-1, -1, -1)])
            
        try:
            # Create x values (time points from 0 to lookback-1)
            x = np.arange(len(prices)).reshape(-1, 1)
            
            # Create a pipeline for polynomial features and linear regression
            poly_reg = make_pipeline(PolynomialFeatures(self.params.degree), LinearRegression())
            poly_reg.fit(x, prices) # Fit the model to prices
                
            # Predict values using the fitted model
            y_pred = poly_reg.predict(x)
                
            # Calculate residuals (difference between actual and predicted prices)
            residuals = prices - y_pred
            std_residuals = np.std(residuals) # Standard deviation of residuals (error)
                
            # The current regression value is the last predicted point
            current_regression = y_pred[-1]
                
            # Set channel boundaries for the current bar
            self.lines.upper_channel[0] = current_regression + (self.params.channel_width * std_residuals)
            self.lines.lower_channel[0] = current_regression - (self.params.channel_width * std_residuals)
            self.lines.regression_line[0] = current_regression
                
        except Exception as e:
            # Fallback in case of calculation error (e.g., singular matrix)
            # Use previous values if available, otherwise NaN
            if len(self) > 1:
                self.lines.upper_channel[0] = self.lines.upper_channel[-1]
                self.lines.lower_channel[0] = self.lines.lower_channel[-1]
                self.lines.regression_line[0] = self.lines.regression_line[-1]
            else:
                self.lines.upper_channel[0] = float('nan')
                self.lines.lower_channel[0] = float('nan')
                self.lines.regression_line[0] = float('nan')

Explanation of PolynomialChannelIndicator:

3. The PolynomialChannelBreakoutStrategy Implementation

import backtrader as bt
import backtrader.indicators as btind # Used for ATR
import numpy as np
# Import PolynomialChannelIndicator here or ensure it's defined above

class PolynomialChannelBreakoutStrategy(bt.Strategy):
    params = (
        ('degree', 3),             # Polynomial degree for the channel
        ('channel_width', 2.0),    # Channel width in standard deviations
        ('lookback', 30),          # Lookback period for polynomial regression
        ('trail_atr_mult', 3.0),   # ATR multiple for trailing stop
        ('atr_period', 14),        # ATR period for trailing stop
        ('use_regression_exit', False), # Option to exit on regression line cross
        ('printlog', True),        # Enable/disable logging
    )
    
    def __init__(self):
        self.dataclose = self.datas[0].close
        
        # Instantiate our custom Polynomial Channel Indicator
        self.poly_channel = PolynomialChannelIndicator(
            self.datas[0], # Pass the data feed to the indicator
            degree=self.params.degree,
            channel_width=self.params.channel_width,
            lookback=self.params.lookback
        )
        
        # ATR for trailing stops
        self.atr = btind.ATR(period=self.params.atr_period)
        
        # Trailing stop variables
        self.trail_stop = None       # The current price level of the trailing stop
        self.entry_price = None      # Price at which the current position was entered
        self.position_type = 0       # 0: no position, 1: long, -1: short
        self.order = None            # To track active entry/exit orders
        
        # Counters for logging strategy activity
        self.signal_count = 0
        self.long_signals = 0
        self.short_signals = 0
        self.exit_signals = 0        # Exits from regression line
        self.trail_exits = 0         # Exits from trailing stop hit
            
    def log(self, txt, dt=None):
        """Logging function for strategy actions."""
        if self.params.printlog:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}: {txt}')
            
    def notify_order(self, order):
        """Handles order notifications and sets initial trailing stop."""
        if order.status in [order.Submitted, order.Accepted]:
            return # Order is pending, nothing to do yet
            
        if order.status in [order.Completed]:
            if order.isbuy(): # A buy order has completed (either entry long or cover short)
                self.log(f'BUY EXECUTED: Price: {order.executed.price:.2f}, '
                         f'Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}')
                # If we are now in a long position, set initial trailing stop
                if self.position.size > 0:
                    self.position_type = 1
                    self.entry_price = order.executed.price
                    # Calculate initial trailing stop
                    self.trail_stop = self.entry_price - (self.atr[0] * self.params.trail_atr_mult)
                    self.log(f'INITIAL LONG STOP set at: {self.trail_stop:.2f}')
                    
            elif order.issell(): # A sell order has completed (either entry short or exit long)
                self.log(f'SELL EXECUTED: Price: {order.executed.price:.2f}, '
                         f'Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}')
                # If we are now in a short position, set initial trailing stop
                if self.position.size < 0: # This means it was an opening short position
                    self.position_type = -1
                    self.entry_price = order.executed.price
                    # Calculate initial trailing stop
                    self.trail_stop = self.entry_price + (self.atr[0] * self.params.trail_atr_mult)
                    self.log(f'INITIAL SHORT STOP set at: {self.trail_stop:.2f}')
                else: # This means it was a closing order for a long position
                    self.position_type = 0 # Position closed
                    self.trail_stop = None # Reset trailing stop tracking
                    self.entry_price = None
                    
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log(f'Order Failed: Status {order.getstatusname()}')
            # Reset order reference if it failed
            
        self.order = None # Clear general order reference after processing

    def notify_trade(self, trade):
        """Handle trade notifications (when a position is fully closed)."""
        if not trade.isclosed:
            return # Only interested in closed trades
            
        self.log(f'TRADE CLOSED: Gross P&L: {trade.pnl:.2f}, Net P&L: {trade.pnlcomm:.2f}')
        # Reset position type and trailing stop after trade closure
        self.position_type = 0
        self.trail_stop = None
        self.entry_price = None
            
    def update_trailing_stop(self, current_price):
        """Dynamically updates the ATR-based trailing stop."""
        if self.trail_stop is None or self.position_type == 0:
            return # No active position or stop
            
        # Ensure ATR has a valid value
        if np.isnan(self.atr[0]):
            return
            
        stop_distance = self.atr[0] * self.params.trail_atr_mult
            
        if self.position_type == 1: # Long position
            new_stop = current_price - stop_distance
            # Trail stop up with price, never down (only raise the stop)
            if new_stop > self.trail_stop:
                old_stop = self.trail_stop
                self.trail_stop = new_stop
                # Log only if the stop moved significantly
                if abs(new_stop - old_stop) / old_stop > 0.005: # > 0.5% move
                    self.log(f'LONG STOP UPDATED: {old_stop:.2f} -> {new_stop:.2f}')
                    
        elif self.position_type == -1: # Short position
            new_stop = current_price + stop_distance
            # Trail stop down with price, never up (only lower the stop)
            if new_stop < self.trail_stop:
                old_stop = self.trail_stop
                self.trail_stop = new_stop
                # Log only if the stop moved significantly
                if abs(new_stop - old_stop) / old_stop > 0.005: # > 0.5% move
                    self.log(f'SHORT STOP UPDATED: {old_stop:.2f} -> {new_stop:.2f}')
            
    def next(self):
        """Main strategy logic executed on each bar."""
        
        # 1. Skip if indicators not ready or pending order
        # Ensure enough data for PolynomialChannelIndicator and ATR
        min_indicator_period = max(self.params.lookback, self.params.atr_period)
        if len(self) < min_indicator_period + 1: # +1 for current bar's close
            return
            
        if self.order: # Prevent new orders if one is already pending
            return
            
        # Check for NaN values from indicators
        if (np.isnan(self.poly_channel.upper_channel[0]) or 
            np.isnan(self.poly_channel.lower_channel[0]) or 
            np.isnan(self.poly_channel.regression_line[0]) or
            np.isnan(self.atr[0])):
            self.log("Indicators not ready (NaN values). Waiting for more data.")
            return
            
        current_price = self.dataclose[0]
        # Get previous prices and indicator values for crossover checks
        prev_price = self.dataclose[-1]
        upper_channel = self.poly_channel.upper_channel[0]
        lower_channel = self.poly_channel.lower_channel[0]
        regression_line = self.poly_channel.regression_line[0]
        
        prev_upper = self.poly_channel.upper_channel[-1]
        prev_lower = self.poly_channel.lower_channel[-1]
        prev_regression = self.poly_channel.regression_line[-1]
        
        # 2. Handle existing positions (Trailing Stop & Optional Regression Exit)
        if self.position:
            # Update trailing stop (this just updates the price, doesn't place order)
            self.update_trailing_stop(current_price)
            
            # Check if trailing stop has been hit (current price crosses the trail_stop)
            # For backtrader, this logic is usually handled by the actual stop order
            # but here we manage it manually for clear logging and control.
            if self.position_type == 1: # Long position
                if current_price <= self.trail_stop:
                    self.trail_exits += 1
                    self.log(f'LONG TRAILING STOP HIT: Price {current_price:.2f} <= Stop {self.trail_stop:.2f}')
                    self.order = self.close() # Close the position
                    return # Exit after placing close order
                    
            elif self.position_type == -1: # Short position
                if current_price >= self.trail_stop:
                    self.trail_exits += 1
                    self.log(f'SHORT TRAILING STOP HIT: Price {current_price:.2f} >= Stop {self.trail_stop:.2f}')
                    self.order = self.close() # Close the position
                    return # Exit after placing close order
                    
            # Optional: Exit if price crosses the regression line
            if self.params.use_regression_exit:
                # Exit long if price breaks below regression line
                if (self.position_type == 1 and 
                    current_price < regression_line and 
                    prev_price >= prev_regression): # Crossover check
                    
                    self.exit_signals += 1
                    self.log(f'LONG REGRESSION EXIT: Price {current_price:.2f} < Regression {regression_line:.2f}')
                    self.order = self.close()
                    return
                    
                # Exit short if price breaks above regression line
                elif (self.position_type == -1 and 
                      current_price > regression_line and 
                      prev_price <= prev_regression): # Crossover check
                    
                    self.exit_signals += 1
                    self.log(f'SHORT REGRESSION EXIT: Price {current_price:.2f} > Regression {regression_line:.2f}')
                    self.order = self.close()
                    return
        
        # 3. Entry Logic - only if currently no position
        else:
            # Long signal: Current price breaks above upper channel, and prev price was below or at upper channel
            if (current_price > upper_channel and 
                prev_price <= prev_upper):
                
                self.signal_count += 1
                self.long_signals += 1
                self.log(f'LONG ENTRY SIGNAL #{self.signal_count}: Price {current_price:.2f} > Upper Channel {upper_channel:.2f}')
                self.order = self.buy() # Place buy order
                    
            # Short signal: Current price breaks below lower channel, and prev price was above or at lower channel
            elif (current_price < lower_channel and 
                  prev_price >= prev_lower):
                
                self.signal_count += 1
                self.short_signals += 1
                self.log(f'SHORT ENTRY SIGNAL #{self.signal_count}: Price {current_price:.2f} < Lower Channel {lower_channel:.2f}')
                self.order = self.sell() # Place sell (short) order
                
    def stop(self):
        """Called at the very end of the backtest to provide a summary."""
        self.log(f'\n=== STRATEGY SUMMARY ===')
        self.log(f'Total Entry Signals Generated: {self.signal_count}')
        self.log(f'Total Long Entry Signals: {self.long_signals}')
        self.log(f'Total Short Entry Signals: {self.short_signals}')
        self.log(f'Total Trailing Stop Exits: {self.trail_exits}')
        if self.params.use_regression_exit:
            self.log(f'Total Regression Line Exits: {self.exit_signals}')
        self.log(f'Final Portfolio Value: ${self.broker.getvalue():,.2f}')

Explanation of PolynomialChannelBreakoutStrategy:

4. Backtesting and Analysis

The provided script includes a dedicated run_backtest() function for single, direct backtests and also leverages your robust rolling backtesting framework for comprehensive performance evaluation.

# ... (imports from the general rolling backtest script) ...
import dateutil.relativedelta as rd # Already present
import seaborn as sns # Already present
from datetime import datetime # For current date

# Define the strategy for the rolling backtest
strategy = PolynomialChannelBreakoutStrategy

def run_rolling_backtest(
    ticker="BTC-USD",
    start="2018-01-01",
    end="2025-06-24", # Current date in Luxembourg
    window_months=3,
    strategy_params=None
):
    strategy_params = strategy_params or {}
    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            current_end = end_dt 
            if current_start >= current_end:
                break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Data download using yfinance, respecting user's preference
        # Using the saved preference: yfinance download with auto_adjust=False and droplevel(axis=1, level=1)
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
        
        # Apply droplevel if data is a MultiIndex, as per user's preference
        if isinstance(data.columns, pd.MultiIndex):
            data = data.droplevel(1, axis=1)

        # Check for sufficient data after droplevel for strategy warm-up
        # Requires enough bars for PolynomialChannelIndicator's lookback and ATR period
        lookback = strategy_params.get('lookback', PolynomialChannelBreakoutStrategy.params.lookback)
        atr_period = strategy_params.get('atr_period', PolynomialChannelBreakoutStrategy.params.atr_period)
        min_bars_needed = max(lookback, atr_period) + 1 # +1 for current bar's data
        
        if data.empty or len(data) < min_bars_needed:
            print(f"Not enough data for period {current_start.date()} to {current_end.date()} (requires at least {min_bars_needed} bars). Skipping.")
            if current_end == end_dt:
                break
            current_start = current_end
            continue

        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        cerebro.addstrategy(strategy, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)

        start_val = cerebro.broker.getvalue()
        cerebro.run()
        final_val = cerebro.broker.getvalue()
        ret = (final_val - start_val) / start_val * 100

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': ret,
            'final_value': final_val,
        })

        print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
        
        if current_end == end_dt:
            break
        current_start = current_end

    return pd.DataFrame(all_results)


def report_stats(df):
    returns = df['return_pct']
    stats = {
        'Mean Return %': np.mean(returns),
        'Median Return %': np.median(returns),
        'Std Dev %': np.std(returns),
        'Min Return %': np.min(returns),
        'Max Return %': np.max(returns),
        'Sharpe Ratio': np.mean(returns) / np.std(returns) if np.std(returns) > 0 else np.nan
    }
    print("\n=== ROLLING BACKTEST STATISTICS ===")
    for k, v in stats.items():
        print(f"{k}: {v:.2f}")
    return stats

def plot_four_charts(df, rolling_sharpe_window=4):
    """
    Generates four analytical plots for rolling backtest results.
    """
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8)) # Adjusted figsize for clarity
    
    periods = list(range(len(df)))
    returns = df['return_pct']
    
    # 1. Period Returns (Top Left)
    colors = ['green' if r >= 0 else 'red' for r in returns]
    ax1.bar(periods, returns, color=colors, alpha=0.7)
    ax1.set_title('Period Returns', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Period')
    ax1.set_ylabel('Return %')
    ax1.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax1.grid(True, alpha=0.3)
    
    # 2. Cumulative Returns (Top Right)
    cumulative_returns = (1 + returns / 100).cumprod() * 100 - 100
    ax2.plot(periods, cumulative_returns, marker='o', linewidth=2, markersize=4, color='blue')
    ax2.set_title('Cumulative Returns', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Period')
    ax2.set_ylabel('Cumulative Return %')
    ax2.grid(True, alpha=0.3)
    
    # 3. Rolling Sharpe Ratio (Bottom Left)
    rolling_sharpe = returns.rolling(window=rolling_sharpe_window).apply(
        lambda x: x.mean() / x.std() if x.std() > 0 else np.nan, raw=False
    )
    valid_mask = ~rolling_sharpe.isna()
    valid_periods = [i for i, valid in enumerate(valid_mask) if valid]
    valid_sharpe = rolling_sharpe[valid_mask]
    
    ax3.plot(valid_periods, valid_sharpe, marker='o', linewidth=2, markersize=4, color='orange')
    ax3.axhline(y=0, color='red', linestyle='--', alpha=0.5)
    ax3.set_title(f'Rolling Sharpe Ratio ({rolling_sharpe_window}-period)', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Period')
    ax3.set_ylabel('Sharpe Ratio')
    ax3.grid(True, alpha=0.3)
    
    # 4. Return Distribution (Bottom Right)
    bins = min(15, max(5, len(returns)//2))
    ax4.hist(returns, bins=bins, alpha=0.7, color='steelblue', edgecolor='black')
    mean_return = returns.mean()
    ax4.axvline(mean_return, color='red', linestyle='--', linewidth=2, 
                label=f'Mean: {mean_return:.2f}%')
    ax4.set_title('Return Distribution', fontsize=14, fontweight='bold')
    ax4.set_xlabel('Return %')
    ax4.set_ylabel('Frequency')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

if __name__ == '__main__':
    # Run a single backtest example for illustration
    run_backtest()

    # Then run the rolling backtest for a more robust evaluation
    current_date = datetime.now().date()
    df = run_rolling_backtest(
        ticker="BTC-USD", # Default ticker for article's example
        start="2018-01-01",
        end=current_date, # Use the current date
        window_months=3,
        # strategy_params={ # Example of how to override default parameters
        #     'degree': 4,
        #     'channel_width': 2.5,
        #     'lookback': 40,
        #     'trail_atr_mult': 4.0,
        #     'atr_period': 20,
        #     'use_regression_exit': True, # Enable optional exit
        #     'printlog': False,
        # }
    )

    print("\n=== ROLLING BACKTEST RESULTS ===")
    print(df)

    stats = report_stats(df)
    plot_four_charts(df)
Pasted image 20250624203145.png

5. Conclusion

The PolynomialChannelBreakoutStrategy offers a sophisticated approach to trend trading by dynamically adapting to the non-linear nature of market price movements through polynomial regression. Its ability to identify strong breakouts from these adaptive channels, combined with a robust ATR-based trailing stop, provides a comprehensive framework for managing both entry and exit points. The optional regression line exit adds another layer of responsiveness to trend shifts. The rigorous use of both single and rolling backtests is crucial for evaluating such a complex strategy, offering deeper insights into its performance consistency and resilience across various market conditions. Further research could focus on optimizing the polynomial degree and channel_width parameters, or integrating additional filters to enhance profitability and reduce whipsaws in less trending environments.