← Back to Home
Quantile Regression Channel Breakout Trading Strategy with a Rolling Backtest

Quantile Regression Channel Breakout Trading Strategy with a Rolling Backtest

This article presents a trading strategy implemented in Backtrader that uses quantile regression to construct dynamic price channels for breakout trading. The strategy identifies upper and lower price boundaries and a trend line, entering trades on confirmed breakouts and exiting on mean reversion or stop-loss triggers. It incorporates robust risk management and confidence-based filtering to enhance trade reliability.

Strategy Overview

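The Quantile Regression Channel Breakout Strategy leverages the following components: three linear quantile regressions of closing price on time over a 60-bar lookback (80th percentile for the upper channel, 20th percentile for the lower channel, and the median as the trend line); a breakout rule that requires price to clear a channel boundary by a 2% confirmation threshold; a channel-confidence score that must exceed 0.3 before a trade is taken; a minimum channel width of 2%; channel-based stop levels; and an exit when price returns to the trend line inside the channel.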

Code Implementation

Below is the complete Backtrader code for the strategy, including the quantile regression implementation and rolling backtest function:

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.stats import norm
import dateutil.relativedelta as rd

# Set matplotlib's default figure size (width, height) to 12 x 8 for every
# figure created after this point; rcParams is a process-wide default.
plt.rcParams['figure.figsize'] = (12, 8)

class QuantileRegression:
    """Linear quantile regression used for channel estimation.

    The fitted model is ``y ~ coef_[0] + X @ coef_[1:]``, where the
    coefficients minimise the pinball (quantile) loss for level ``tau``.

    Attributes:
        tau: Quantile level strictly between 0 and 1 (0.5 = median).
        coef_: 1-D array ``[intercept, slope_1, ..., slope_k]``; only present
            after :meth:`fit` has been called.
    """

    def __init__(self, tau=0.5):
        """Store the quantile level.

        Args:
            tau: Quantile level, strictly between 0 and 1.

        Raises:
            ValueError: If ``tau`` is not inside the open interval (0, 1).
        """
        if not 0.0 < tau < 1.0:
            raise ValueError(f"tau must lie strictly between 0 and 1, got {tau!r}")
        self.tau = tau  # Quantile level (0.5 = median)

    def quantile_loss(self, y_true, y_pred):
        """Return the mean pinball loss of ``y_pred`` against ``y_true``.

        Under-predictions (positive residuals) are weighted by ``tau`` and
        over-predictions by ``1 - tau``.
        """
        residual = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
        return np.mean(np.maximum(self.tau * residual, (self.tau - 1) * residual))

    def fit(self, X, y):
        """Fit the quantile regression and return ``self``.

        The pinball loss is piecewise linear, so instead of running a
        gradient-based optimiser on a non-differentiable objective the exact
        linear-programming formulation is solved:

            minimise   tau * sum(u_plus) + (1 - tau) * sum(u_minus)
            subject to intercept + X @ slopes + u_plus - u_minus = y,
                       u_plus >= 0, u_minus >= 0.

        Args:
            X: Feature values, shape ``(n_samples,)`` or
                ``(n_samples, n_features)``.
            y: Target values, shape ``(n_samples,)``.

        Returns:
            The fitted estimator (``self``), with ``coef_`` set.

        Raises:
            ValueError: If the input is empty or ``X`` and ``y`` disagree on
                the number of samples.
        """
        # Local import: the file-level imports only bring in ``minimize``.
        from scipy.optimize import linprog

        features = np.asarray(X, dtype=float)
        targets = np.asarray(y, dtype=float).ravel()
        if features.ndim == 1:
            features = features.reshape(-1, 1)
        n_samples, n_features = features.shape
        if n_samples == 0 or targets.shape[0] != n_samples:
            raise ValueError(
                "X and y must be non-empty and contain the same number of samples"
            )

        n_coef = n_features + 1  # +1 for intercept

        # Constant-quantile model, used whenever the solver cannot deliver.
        # Sized from the actual feature count so predict() stays consistent.
        fallback = np.zeros(n_coef)
        fallback[0] = np.quantile(targets, self.tau)

        design = np.column_stack([np.ones(n_samples), features])
        identity = np.eye(n_samples)
        a_eq = np.hstack([design, identity, -identity])
        cost = np.concatenate([
            np.zeros(n_coef),
            np.full(n_samples, self.tau),
            np.full(n_samples, 1.0 - self.tau),
        ])

        # A feature with no variation is collinear with the intercept, so its
        # slope is not identifiable; pin it to zero rather than letting the
        # solver return an arbitrary value.
        is_constant = np.ptp(features, axis=0) == 0
        coef_bounds = [(None, None)] + [
            (0.0, 0.0) if constant else (None, None) for constant in is_constant
        ]
        bounds = coef_bounds + [(0.0, None)] * (2 * n_samples)

        try:
            result = linprog(
                cost, A_eq=a_eq, b_eq=targets, bounds=bounds, method="highs"
            )
        except ValueError:
            # linprog rejects non-finite inputs (NaN/inf prices).
            self.coef_ = fallback
            return self

        if result.success and np.all(np.isfinite(result.x[:n_coef])):
            self.coef_ = np.asarray(result.x[:n_coef], dtype=float)
        else:
            self.coef_ = fallback
        return self

    def predict(self, X):
        """Predict quantile levels for ``X`` using the fitted coefficients.

        Args:
            X: Feature values, shape ``(n_samples,)`` for a single feature or
                ``(n_samples, n_features)``.

        Returns:
            Array of predicted values, one per sample.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not hasattr(self, 'coef_'):
            raise ValueError("Model must be fitted before prediction")

        features = np.asarray(X, dtype=float)
        if features.ndim == 1:
            return self.coef_[0] + self.coef_[1] * features
        return self.coef_[0] + np.dot(features, self.coef_[1:])

class QuantileChannelStrategy(bt.Strategy):
    """Breakout strategy trading around a quantile-regression price channel.

    On every bar three quantile regressions of close price on (normalised)
    time are refitted over the last ``lookback_period`` closes, giving an
    upper channel, a lower channel and a trend line. A close beyond a channel
    boundary by ``breakout_threshold`` opens a position in the breakout
    direction when the channel confidence exceeds 0.3. Positions are closed
    when the stop level is hit, when an opposite breakout occurs, or when
    price returns to within 2% of the trend line inside the channel.

    Parameters (``params``):
        lookback_period: Number of bars used for the channel regressions.
        upper_quantile / lower_quantile / trend_quantile: Quantile levels of
            the three regressions.
        breakout_threshold: Multiplicative confirmation factor; price must
            exceed ``upper * threshold`` or fall below ``lower / threshold``.
        stop_loss_pct: Maximum adverse move from the signal price tolerated
            by the initial stop.
        rebalance_period: Entry/exit signals are evaluated every this many
            bars; stops are evaluated on every bar.
        min_channel_width: Floor for the relative channel width.
        volume_confirm: Declared for configuration compatibility; not read
            by the current logic.
    """

    params = (
        ('lookback_period', 60),      # Lookback for channel estimation
        ('upper_quantile', 0.8),      # Upper channel quantile (80th percentile)
        ('lower_quantile', 0.2),      # Lower channel quantile (20th percentile)
        ('trend_quantile', 0.5),      # Trend line quantile (median)
        ('breakout_threshold', 1.02), # Breakout confirmation (2% above/below)
        ('stop_loss_pct', 0.08),      # 8% stop loss
        ('rebalance_period', 1),      # Daily rebalancing
        ('min_channel_width', 0.02),  # Minimum 2% channel width
        ('volume_confirm', False),    # Volume confirmation (if available)
    )

    def __init__(self):
        """Initialise price history, regression models and bookkeeping."""
        # Price and time data
        self.prices = []
        self.time_indices = []
        # Monotonic bar counter. Using len(self.prices) as the time index
        # stops increasing once the history is trimmed, which collapses the
        # regression's time axis.
        self.bar_index = 0

        # Channel estimates
        self.upper_channel = []
        self.lower_channel = []
        self.trend_line = []
        self.channel_width = []

        # Quantile regression models
        self.upper_qr = QuantileRegression(tau=self.params.upper_quantile)
        self.lower_qr = QuantileRegression(tau=self.params.lower_quantile)
        self.trend_qr = QuantileRegression(tau=self.params.trend_quantile)

        # Trading variables
        self.rebalance_counter = 0
        self.stop_price = 0
        self.trade_count = 0
        self.breakout_direction = 0  # 1=upper, -1=lower, 0=none
        self.channel_confidence = 0

        # Track breakouts
        self.upper_breakouts = 0
        self.lower_breakouts = 0
        self.false_breakouts = 0

        # Entry notional per open trade (keyed by trade.ref). A closed trade
        # reports size 0, so the notional has to be captured at open time.
        self.entry_values = {}

    def estimate_channels(self):
        """Estimate the channel levels at the current bar.

        Returns:
            Tuple ``(upper_level, lower_level, trend_level, confidence)``.
            The first three are ``None`` (and confidence ``0``) until
            ``lookback_period`` prices have been collected.
        """
        lookback = self.params.lookback_period
        if len(self.prices) < lookback:
            return None, None, None, 0

        # Get recent data
        recent_prices = np.array(self.prices[-lookback:])
        recent_times = np.array(self.time_indices[-lookback:])

        # Normalise time to [0, 1] for better numerical stability; the
        # epsilon guards against a zero span.
        time_span = recent_times[-1] - recent_times[0]
        time_normalized = (recent_times - recent_times[0]) / (time_span + 1e-8)

        try:
            # Fit quantile regressions
            self.upper_qr.fit(time_normalized, recent_prices)
            self.lower_qr.fit(time_normalized, recent_prices)
            self.trend_qr.fit(time_normalized, recent_prices)

            # Predict at the end of the normalised window (the current bar)
            query = np.array([1.0])
            upper_level = self.upper_qr.predict(query)[0]
            lower_level = self.lower_qr.predict(query)[0]
            trend_level = self.trend_qr.predict(query)[0]

            # A non-finite or non-positive level makes the relative width
            # meaningless; hand over to the plain-quantile fallback below.
            levels = (upper_level, lower_level, trend_level)
            if not np.all(np.isfinite(levels)) or trend_level <= 0:
                raise ValueError("degenerate channel estimate")

            # Relative channel width
            channel_width = (upper_level - lower_level) / trend_level

            # Ensure minimum channel width. This also repairs crossed
            # quantile lines (upper below lower), whose width is negative.
            if channel_width < self.params.min_channel_width:
                mid_price = (upper_level + lower_level) / 2
                half_width = mid_price * self.params.min_channel_width / 2
                upper_level = mid_price + half_width
                lower_level = mid_price - half_width
                channel_width = self.params.min_channel_width

            # Channel confidence based on data dispersion: ratio of a
            # 2-sigma relative width to the channel width, capped at 1.
            price_std = np.std(recent_prices)
            expected_width = 2 * price_std / np.mean(recent_prices)
            confidence = min(1.0, expected_width / (channel_width + 1e-8))

            return upper_level, lower_level, trend_level, confidence

        except Exception:
            # Deliberate best-effort: any failure in fitting or prediction
            # falls back to simple quantiles of the window.
            upper_level = np.quantile(recent_prices, self.params.upper_quantile)
            lower_level = np.quantile(recent_prices, self.params.lower_quantile)
            trend_level = np.quantile(recent_prices, self.params.trend_quantile)
            confidence = 0.5

            return upper_level, lower_level, trend_level, confidence

    def detect_breakout(self, current_price, upper_channel, lower_channel):
        """Detect a confirmed channel breakout and count it.

        Returns:
            ``1`` for an upper breakout, ``-1`` for a lower breakout, ``0``
            otherwise. The matching breakout counter is incremented.
        """
        breakout = 0

        # Upper breakout
        if current_price > upper_channel * self.params.breakout_threshold:
            breakout = 1
            self.upper_breakouts += 1

        # Lower breakout
        elif current_price < lower_channel / self.params.breakout_threshold:
            breakout = -1
            self.lower_breakouts += 1

        return breakout

    def next(self):
        """Process one bar: update channels, enforce stops, trade signals."""
        # Collect price and time data
        current_price = self.data.close[0]
        self.prices.append(current_price)
        self.time_indices.append(self.bar_index)
        self.bar_index += 1

        # Keep only recent history
        max_history = self.params.lookback_period * 2
        if len(self.prices) > max_history:
            self.prices = self.prices[-max_history:]
            self.time_indices = self.time_indices[-max_history:]

        # Estimate channels
        upper_channel, lower_channel, trend_line, confidence = self.estimate_channels()

        if upper_channel is None:
            return  # Not enough data yet

        # Store channel estimates
        self.upper_channel.append(upper_channel)
        self.lower_channel.append(lower_channel)
        self.trend_line.append(trend_line)
        self.channel_confidence = confidence

        # Calculate channel width
        width = (upper_channel - lower_channel) / trend_line
        self.channel_width.append(width)

        # Rebalancing schedule
        self.rebalance_counter += 1
        is_rebalance_bar = self.rebalance_counter >= self.params.rebalance_period
        if is_rebalance_bar:
            self.rebalance_counter = 0

        # Stops are checked on EVERY bar. Checking them only on
        # non-rebalance bars would disable them entirely when
        # rebalance_period is 1.
        if self.position.size > 0 and current_price <= self.stop_price:
            self.close()
            self.log(f'STOP LOSS - Long closed at {current_price:.2f}')
            return
        if self.position.size < 0 and current_price >= self.stop_price:
            self.close()
            self.log(f'STOP LOSS - Short closed at {current_price:.2f}')
            return

        if not is_rebalance_bar:
            return

        # Detect breakout
        breakout = self.detect_breakout(current_price, upper_channel, lower_channel)

        # Current position
        current_pos = 0
        if self.position.size > 0:
            current_pos = 1
        elif self.position.size < 0:
            current_pos = -1

        # Trading logic with channel confirmation
        if breakout != 0 and confidence > 0.3:  # Require minimum confidence
            # Close existing position if direction changed
            if current_pos != 0 and current_pos != breakout:
                self.close()
                current_pos = 0

            # Open new position on breakout
            if current_pos == 0:
                if breakout == 1:  # Upper breakout - go long
                    self.buy()
                    # Stop at the lower channel, but never further away
                    # than stop_loss_pct below the signal price.
                    self.stop_price = max(
                        lower_channel,
                        current_price * (1 - self.params.stop_loss_pct),
                    )
                    self.trade_count += 1
                    self.breakout_direction = 1
                    self.log(f'UPPER BREAKOUT LONG - Price: {current_price:.2f}, '
                            f'Upper: {upper_channel:.2f}, Confidence: {confidence:.3f}')

                elif breakout == -1:  # Lower breakout - go short
                    self.sell()
                    # Stop at the upper channel, but never further away
                    # than stop_loss_pct above the signal price.
                    self.stop_price = min(
                        upper_channel,
                        current_price * (1 + self.params.stop_loss_pct),
                    )
                    self.trade_count += 1
                    self.breakout_direction = -1
                    self.log(f'LOWER BREAKOUT SHORT - Price: {current_price:.2f}, '
                            f'Lower: {lower_channel:.2f}, Confidence: {confidence:.3f}')

        # Exit on return to channel (mean reversion)
        elif self.position.size != 0:
            in_channel = lower_channel <= current_price <= upper_channel

            if in_channel and abs(current_price - trend_line) / trend_line < 0.02:
                self.close()
                self.log(f'RETURN TO CHANNEL - Position closed at {current_price:.2f}')

        # Update trailing stops: the stop only ever moves in the
        # position's favour.
        if self.position.size > 0:  # Long position
            self.stop_price = max(self.stop_price, lower_channel)
        elif self.position.size < 0:  # Short position
            self.stop_price = min(self.stop_price, upper_channel)

    def log(self, txt, dt=None):
        """Print ``txt`` prefixed with ``dt`` or the current bar's date."""
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{dt.isoformat()}: {txt}')

    def notify_order(self, order):
        """Log completed orders and orders the broker could not fill."""
        if order.status in [order.Canceled, order.Margin, order.Rejected]:
            # Surface failed orders instead of dropping them silently.
            self.log(f'ORDER NOT FILLED - Status: {order.getstatusname()}')
            return

        if order.status in [order.Completed]:
            if self.position.size == 0:
                # Either side can flatten a position (a buy covers a short).
                self.log(f'POSITION CLOSED - Price: {order.executed.price:.2f}')
            elif order.isbuy():
                self.log(f'LONG EXECUTED - Price: {order.executed.price:.2f}')
            elif order.issell():
                self.log(f'SHORT EXECUTED - Price: {order.executed.price:.2f}')

    def notify_trade(self, trade):
        """Record entry notional on open; log PnL and score the trade on close.

        A closed trade counts as a false breakout when its profit is below
        1% of the notional it was opened with (losing trades included).
        """
        if trade.justopened:
            self.entry_values[trade.ref] = abs(trade.size * trade.price)

        if trade.isclosed:
            self.log(f'TRADE CLOSED - PnL: {trade.pnl:.2f}')
            # trade.size is 0 once the trade is closed, so the threshold
            # must come from the notional stored when it was opened.
            entry_value = self.entry_values.pop(trade.ref, 0.0)
            if trade.pnl < 0.01 * entry_value:  # Less than 1% profit
                self.false_breakouts += 1

    def stop(self):
        """Print summary statistics at the end of the run."""
        print(f'\n=== QUANTILE REGRESSION CHANNEL BREAKOUT RESULTS ===')
        print(f'Total Trades: {self.trade_count}')
        print(f'Upper Breakouts: {self.upper_breakouts}')
        print(f'Lower Breakouts: {self.lower_breakouts}')
        print(f'False Breakouts: {self.false_breakouts}')

        if len(self.channel_width) > 0:
            avg_width = np.mean(self.channel_width)
            width_std = np.std(self.channel_width)
            print(f'Average Channel Width: {avg_width:.3f}')
            print(f'Channel Width Std: {width_std:.3f}')
            print(f'Final Confidence: {self.channel_confidence:.3f}')

        if len(self.upper_channel) > 0:
            print(f'Final Upper Channel: {self.upper_channel[-1]:.2f}')
            print(f'Final Lower Channel: {self.lower_channel[-1]:.2f}')
            print(f'Final Trend Line: {self.trend_line[-1]:.2f}')

        # max(1, ...) avoids division by zero when no trade was placed.
        success_rate = 1 - (self.false_breakouts / max(1, self.trade_count))
        print(f'Breakout Success Rate: {success_rate:.2%}')

def run_rolling_backtest(
    ticker="SOL-USD",
    start="2020-01-01",
    end="2025-01-01",
    window_months=12,
    strategy_params=None
):
    """Backtest the strategy over consecutive, non-overlapping windows.

    Starting at ``start``, windows of ``window_months`` months are processed
    one after another for as long as a full window ends on or before ``end``.
    Each window downloads its own price data and runs in a fresh Cerebro
    instance with 100,000 starting cash, 0.1% commission and a 95% percent
    sizer. Windows with no data or fewer than 90 rows are reported and
    skipped.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start: First window's start date (anything ``pd.to_datetime`` accepts).
        end: Latest date a window may end on.
        window_months: Window length and step, in months.
        strategy_params: Optional keyword overrides for
            ``QuantileChannelStrategy``.

    Returns:
        DataFrame with one row per executed window and the columns
        ``start``, ``end``, ``return_pct`` and ``final_value``.
    """
    if not strategy_params:
        strategy_params = {}

    window_step = rd.relativedelta(months=window_months)
    window_start = pd.to_datetime(start)
    latest_end = pd.to_datetime(end)
    summaries = []

    window_end = window_start + window_step
    while window_end <= latest_end:
        print(f"\nROLLING BACKTEST: {window_start.date()} to {window_end.date()}")

        prices = yf.download(ticker, start=window_start, end=window_end, progress=False)
        if not prices.empty and len(prices) >= 90:
            # Flatten the two-level column index some downloads return.
            if isinstance(prices.columns, pd.MultiIndex):
                prices = prices.droplevel(1, 1)

            price_feed = bt.feeds.PandasData(dataname=prices)
            engine = bt.Cerebro()
            engine.addstrategy(QuantileChannelStrategy, **strategy_params)
            engine.adddata(price_feed)
            engine.broker.setcash(100000)
            engine.broker.setcommission(commission=0.001)
            engine.addsizer(bt.sizers.PercentSizer, percents=95)

            opening_value = engine.broker.getvalue()
            engine.run()
            closing_value = engine.broker.getvalue()
            pct_return = (closing_value - opening_value) / opening_value * 100

            summaries.append({
                'start': window_start.date(),
                'end': window_end.date(),
                'return_pct': pct_return,
                'final_value': closing_value,
            })

            print(f"Return: {pct_return:.2f}% | Final Value: {closing_value:.2f}")
        else:
            print("Not enough data.")

        # Advance to the next window whether or not this one was run.
        window_start += window_step
        window_end = window_start + window_step

    return pd.DataFrame(summaries)

Strategy Explanation

1. QuantileRegression Class

The QuantileRegression class implements quantile regression to estimate price channels. It uses the pinball loss function for a specified quantile \(\tau\) (e.g., 0.8 for upper channel):

\[\text{Loss}=\frac{1}{n}\sum_{i=1}^n\max(\tau(y_i-\hat{y}_i),(\tau-1)(y_i-\hat{y}_i))\]

2. QuantileChannelStrategy

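The strategy constructs channels and manages trades: on each bar it refits the three quantile regressions over the last `lookback_period` closes and evaluates them at the current bar to obtain the upper channel, lower channel, and trend line. It goes long when the close exceeds the upper channel multiplied by `breakout_threshold`, and short when the close falls below the lower channel divided by `breakout_threshold`, provided the channel confidence is above 0.3. An open position is closed when an opposite breakout occurs or when price is back inside the channel and within 2% of the trend line; the opposite channel boundary serves as a trailing stop level.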

3. Rolling Backtest

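The run_rolling_backtest function tests the strategy on SOL-USD from 2020 to 2025 in consecutive 12-month windows. For each window it downloads price data with yfinance, skips the window if fewer than 90 rows are returned, runs the strategy in a fresh Cerebro instance (100,000 starting cash, 0.1% commission, 95% percent sizer), and records the window's start, end, percentage return, and final portfolio value. The results are returned as a pandas DataFrame with one row per window.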

Key Features

Pasted image 20250714072049.png
Pasted image 20250714072055.png

Potential Improvements

This strategy is suited for volatile assets like cryptocurrencies and can be backtested to evaluate performance across different markets and timeframes.