This article presents a trading strategy implemented in Backtrader that uses quantile regression to construct dynamic price channels for breakout trading. The strategy identifies upper and lower price boundaries and a trend line, entering trades on confirmed breakouts and exiting on mean reversion or stop-loss triggers. It incorporates robust risk management and confidence-based filtering to enhance trade reliability.
The Quantile Regression Channel Breakout Strategy leverages the following components:
Below is the complete Backtrader code for the strategy, including the quantile regression implementation and rolling backtest function:
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.stats import norm
import dateutil.relativedelta as rd
# Set matplotlib figure size
'figure.figsize'] = (12, 8)
plt.rcParams[
class QuantileRegression:
"""Quantile Regression implementation for channel estimation"""
def __init__(self, tau=0.5):
self.tau = tau # Quantile level (0.5 = median)
def quantile_loss(self, y_true, y_pred):
"""Quantile loss function (pinball loss)"""
= y_true - y_pred
residual return np.mean(np.maximum(self.tau * residual, (self.tau - 1) * residual))
def fit(self, X, y):
"""Fit quantile regression using optimization"""
= X.shape[1] if len(X.shape) > 1 else 1
n_features
# Initialize coefficients
= np.zeros(n_features + 1) # +1 for intercept
initial_params
def objective(params):
"""Objective function to minimize"""
if len(X.shape) == 1:
= params[0] + params[1] * X
y_pred else:
= params[0] + np.dot(X, params[1:])
y_pred return self.quantile_loss(y, y_pred)
# Optimize
try:
= minimize(objective, initial_params, method='L-BFGS-B')
result self.coef_ = result.x
return self
except:
# Fallback to simple quantile
self.coef_ = np.array([np.quantile(y, self.tau), 0])
return self
def predict(self, X):
"""Predict using fitted model"""
if not hasattr(self, 'coef_'):
raise ValueError("Model must be fitted before prediction")
if len(X.shape) == 1:
return self.coef_[0] + self.coef_[1] * X
else:
return self.coef_[0] + np.dot(X, self.coef_[1:])
class QuantileChannelStrategy(bt.Strategy):
= (
params 'lookback_period', 60), # Lookback for channel estimation
('upper_quantile', 0.8), # Upper channel quantile (80th percentile)
('lower_quantile', 0.2), # Lower channel quantile (20th percentile)
('trend_quantile', 0.5), # Trend line quantile (median)
('breakout_threshold', 1.02), # Breakout confirmation (2% above/below)
('stop_loss_pct', 0.08), # 8% stop loss
('rebalance_period', 1), # Daily rebalancing
('min_channel_width', 0.02), # Minimum 2% channel width
('volume_confirm', False), # Volume confirmation (if available)
(
)
def __init__(self):
# Price and time data
self.prices = []
self.time_indices = []
# Channel estimates
self.upper_channel = []
self.lower_channel = []
self.trend_line = []
self.channel_width = []
# Quantile regression models
self.upper_qr = QuantileRegression(tau=self.params.upper_quantile)
self.lower_qr = QuantileRegression(tau=self.params.lower_quantile)
self.trend_qr = QuantileRegression(tau=self.params.trend_quantile)
# Trading variables
self.rebalance_counter = 0
self.stop_price = 0
self.trade_count = 0
self.breakout_direction = 0 # 1=upper, -1=lower, 0=none
self.channel_confidence = 0
# Track breakouts
self.upper_breakouts = 0
self.lower_breakouts = 0
self.false_breakouts = 0
def estimate_channels(self):
"""Estimate quantile regression channels"""
if len(self.prices) < self.params.lookback_period:
return None, None, None, 0
# Get recent data
= np.array(self.prices[-self.params.lookback_period:])
recent_prices = np.array(self.time_indices[-self.params.lookback_period:])
recent_times
# Normalize time for better numerical stability
= (recent_times - recent_times[0]) / (recent_times[-1] - recent_times[0] + 1e-8)
time_normalized
try:
# Fit quantile regressions
self.upper_qr.fit(time_normalized, recent_prices)
self.lower_qr.fit(time_normalized, recent_prices)
self.trend_qr.fit(time_normalized, recent_prices)
# Predict current levels
= 1.0 # Current time (end of normalized period)
current_time_norm
= self.upper_qr.predict(np.array([current_time_norm]))[0]
upper_level = self.lower_qr.predict(np.array([current_time_norm]))[0]
lower_level = self.trend_qr.predict(np.array([current_time_norm]))[0]
trend_level
# Calculate channel width and confidence
= (upper_level - lower_level) / trend_level
channel_width
# Ensure minimum channel width
if channel_width < self.params.min_channel_width:
= (upper_level + lower_level) / 2
mid_price = mid_price * self.params.min_channel_width / 2
half_width = mid_price + half_width
upper_level = mid_price - half_width
lower_level = self.params.min_channel_width
channel_width
# Channel confidence based on data dispersion
= np.std(recent_prices)
price_std = 2 * price_std / np.mean(recent_prices) # 2-sigma as reference
expected_width = min(1.0, expected_width / (channel_width + 1e-8))
confidence
return upper_level, lower_level, trend_level, confidence
except Exception as e:
# Fallback to simple quantiles
= np.quantile(recent_prices, self.params.upper_quantile)
upper_level = np.quantile(recent_prices, self.params.lower_quantile)
lower_level = np.quantile(recent_prices, self.params.trend_quantile)
trend_level = 0.5
confidence
return upper_level, lower_level, trend_level, confidence
def detect_breakout(self, current_price, upper_channel, lower_channel):
"""Detect channel breakout with confirmation"""
= 0
breakout
# Upper breakout
if current_price > upper_channel * self.params.breakout_threshold:
= 1
breakout self.upper_breakouts += 1
# Lower breakout
elif current_price < lower_channel / self.params.breakout_threshold:
= -1
breakout self.lower_breakouts += 1
return breakout
def next(self):
# Collect price and time data
= self.data.close[0]
current_price = len(self.prices)
current_time
self.prices.append(current_price)
self.time_indices.append(current_time)
# Keep only recent history
if len(self.prices) > self.params.lookback_period * 2:
self.prices = self.prices[-self.params.lookback_period * 2:]
self.time_indices = self.time_indices[-self.params.lookback_period * 2:]
# Estimate channels
= self.estimate_channels()
upper_channel, lower_channel, trend_line, confidence
if upper_channel is None:
return # Not enough data yet
# Store channel estimates
self.upper_channel.append(upper_channel)
self.lower_channel.append(lower_channel)
self.trend_line.append(trend_line)
self.channel_confidence = confidence
# Calculate channel width
= (upper_channel - lower_channel) / trend_line
width self.channel_width.append(width)
# Rebalancing logic
self.rebalance_counter += 1
if self.rebalance_counter < self.params.rebalance_period:
# Check stop loss
if self.position.size > 0 and current_price <= self.stop_price:
self.close()
self.log(f'STOP LOSS - Long closed at {current_price:.2f}')
elif self.position.size < 0 and current_price >= self.stop_price:
self.close()
self.log(f'STOP LOSS - Short closed at {current_price:.2f}')
return
# Reset rebalance counter
self.rebalance_counter = 0
# Detect breakout
= self.detect_breakout(current_price, upper_channel, lower_channel)
breakout
# Current position
= 0
current_pos if self.position.size > 0:
= 1
current_pos elif self.position.size < 0:
= -1
current_pos
# Trading logic with channel confirmation
if breakout != 0 and confidence > 0.3: # Require minimum confidence
# Close existing position if direction changed
if current_pos != 0 and current_pos != breakout:
self.close()
= 0
current_pos
# Open new position on breakout
if current_pos == 0:
if breakout == 1: # Upper breakout - go long
self.buy()
self.stop_price = lower_channel # Use lower channel as stop
self.trade_count += 1
self.breakout_direction = 1
self.log(f'UPPER BREAKOUT LONG - Price: {current_price:.2f}, '
f'Upper: {upper_channel:.2f}, Confidence: {confidence:.3f}')
elif breakout == -1: # Lower breakout - go short
self.sell()
self.stop_price = upper_channel # Use upper channel as stop
self.trade_count += 1
self.breakout_direction = -1
self.log(f'LOWER BREAKOUT SHORT - Price: {current_price:.2f}, '
f'Lower: {lower_channel:.2f}, Confidence: {confidence:.3f}')
# Exit on return to channel (mean reversion)
elif self.position.size != 0:
= lower_channel <= current_price <= upper_channel
in_channel
if in_channel and abs(current_price - trend_line) / trend_line < 0.02:
self.close()
self.log(f'RETURN TO CHANNEL - Position closed at {current_price:.2f}')
# Update trailing stops
if self.position.size > 0: # Long position
= max(self.stop_price, lower_channel)
new_stop if new_stop > self.stop_price:
self.stop_price = new_stop
elif self.position.size < 0: # Short position
= min(self.stop_price, upper_channel)
new_stop if new_stop < self.stop_price:
self.stop_price = new_stop
def log(self, txt, dt=None):
= dt or self.datas[0].datetime.date(0)
dt print(f'{dt.isoformat()}: {txt}')
def notify_order(self, order):
if order.status in [order.Completed]:
if order.isbuy():
self.log(f'LONG EXECUTED - Price: {order.executed.price:.2f}')
elif order.issell():
if self.position.size == 0:
self.log(f'POSITION CLOSED - Price: {order.executed.price:.2f}')
else:
self.log(f'SHORT EXECUTED - Price: {order.executed.price:.2f}')
def notify_trade(self, trade):
if trade.isclosed:
self.log(f'TRADE CLOSED - PnL: {trade.pnl:.2f}')
# Check if this was a false breakout
if abs(trade.pnl) < abs(trade.size * trade.price * 0.01): # Less than 1% profit
self.false_breakouts += 1
def stop(self):
print(f'\n=== QUANTILE REGRESSION CHANNEL BREAKOUT RESULTS ===')
print(f'Total Trades: {self.trade_count}')
print(f'Upper Breakouts: {self.upper_breakouts}')
print(f'Lower Breakouts: {self.lower_breakouts}')
print(f'False Breakouts: {self.false_breakouts}')
if len(self.channel_width) > 0:
= np.mean(self.channel_width)
avg_width = np.std(self.channel_width)
width_std print(f'Average Channel Width: {avg_width:.3f}')
print(f'Channel Width Std: {width_std:.3f}')
print(f'Final Confidence: {self.channel_confidence:.3f}')
if len(self.upper_channel) > 0:
print(f'Final Upper Channel: {self.upper_channel[-1]:.2f}')
print(f'Final Lower Channel: {self.lower_channel[-1]:.2f}')
print(f'Final Trend Line: {self.trend_line[-1]:.2f}')
= 1 - (self.false_breakouts / max(1, self.trade_count))
success_rate print(f'Breakout Success Rate: {success_rate:.2%}')
def run_rolling_backtest(
="SOL-USD",
ticker="2020-01-01",
start="2025-01-01",
end=12,
window_months=None
strategy_params
):= strategy_params or {}
strategy_params = []
all_results = pd.to_datetime(start)
start_dt = pd.to_datetime(end)
end_dt = start_dt
current_start
while True:
= current_start + rd.relativedelta(months=window_months)
current_end if current_end > end_dt:
break
print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")
= yf.download(ticker, start=current_start, end=current_end, progress=False)
data if data.empty or len(data) < 90:
print("Not enough data.")
+= rd.relativedelta(months=window_months)
current_start continue
= data.droplevel(1, 1) if isinstance(data.columns, pd.MultiIndex) else data
data
= bt.feeds.PandasData(dataname=data)
feed = bt.Cerebro()
cerebro **strategy_params)
cerebro.addstrategy(QuantileChannelStrategy,
cerebro.adddata(feed)100000)
cerebro.broker.setcash(=0.001)
cerebro.broker.setcommission(commission=95)
cerebro.addsizer(bt.sizers.PercentSizer, percents
= cerebro.broker.getvalue()
start_val
cerebro.run()= cerebro.broker.getvalue()
final_val = (final_val - start_val) / start_val * 100
ret
all_results.append({'start': current_start.date(),
'end': current_end.date(),
'return_pct': ret,
'final_value': final_val,
})
print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
+= rd.relativedelta(months=window_months)
current_start
return pd.DataFrame(all_results)
The QuantileRegression
class implements quantile
regression to estimate price channels. It uses the pinball loss function
for a specified quantile \(\tau\)
(e.g., 0.8 for upper channel):
\[\text{Loss}=\frac{1}{n}\sum_{i=1}^n\max(\tau(y_i-\hat{y}_i),(\tau-1)(y_i-\hat{y}_i))\]
scipy.optimize.minimize
with the L-BFGS-B method, fitting a
linear model to time-normalized price data. Falls back to simple
quantiles if optimization fails.The strategy constructs channels and manages trades:
Channel Estimation
(estimate_channels
):
Breakout Detection
(detect_breakout
):
Trading Logic (next
):
rebalance_period=1
).Performance Tracking (stop
):
The run_rolling_backtest
function tests the strategy on
SOL-USD from 2020 to 2025 in 12-month windows:
yfinance
.lookback_period
, breakout_threshold
, or
stop_loss_pct
.volume_confirm
to filter breakouts with volume surges.upper_quantile
and lower_quantile
based on
volatility.run_rolling_backtest
to test across multiple tickers.This strategy is suited for volatile assets like cryptocurrencies and can be backtested to evaluate performance across different markets and timeframes.