This article presents a trading strategy implemented in Backtrader that uses quantile regression to construct dynamic price channels for breakout trading. The strategy identifies upper and lower price boundaries and a trend line, entering trades on confirmed breakouts and exiting on mean reversion or stop-loss triggers. It incorporates robust risk management and confidence-based filtering to enhance trade reliability.
The Quantile Regression Channel Breakout Strategy leverages the following components:
Below is the complete Backtrader code for the strategy, including the quantile regression implementation and rolling backtest function:
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.stats import norm
import dateutil.relativedelta as rd
# Set matplotlib figure size
plt.rcParams['figure.figsize'] = (12, 8)
class QuantileRegression:
"""Quantile Regression implementation for channel estimation"""
def __init__(self, tau=0.5):
self.tau = tau # Quantile level (0.5 = median)
def quantile_loss(self, y_true, y_pred):
"""Quantile loss function (pinball loss)"""
residual = y_true - y_pred
return np.mean(np.maximum(self.tau * residual, (self.tau - 1) * residual))
def fit(self, X, y):
"""Fit quantile regression using optimization"""
n_features = X.shape[1] if len(X.shape) > 1 else 1
# Initialize coefficients
initial_params = np.zeros(n_features + 1) # +1 for intercept
def objective(params):
"""Objective function to minimize"""
if len(X.shape) == 1:
y_pred = params[0] + params[1] * X
else:
y_pred = params[0] + np.dot(X, params[1:])
return self.quantile_loss(y, y_pred)
# Optimize
try:
result = minimize(objective, initial_params, method='L-BFGS-B')
self.coef_ = result.x
return self
except:
# Fallback to simple quantile
self.coef_ = np.array([np.quantile(y, self.tau), 0])
return self
def predict(self, X):
"""Predict using fitted model"""
if not hasattr(self, 'coef_'):
raise ValueError("Model must be fitted before prediction")
if len(X.shape) == 1:
return self.coef_[0] + self.coef_[1] * X
else:
return self.coef_[0] + np.dot(X, self.coef_[1:])
class QuantileChannelStrategy(bt.Strategy):
params = (
('lookback_period', 60), # Lookback for channel estimation
('upper_quantile', 0.8), # Upper channel quantile (80th percentile)
('lower_quantile', 0.2), # Lower channel quantile (20th percentile)
('trend_quantile', 0.5), # Trend line quantile (median)
('breakout_threshold', 1.02), # Breakout confirmation (2% above/below)
('stop_loss_pct', 0.08), # 8% stop loss
('rebalance_period', 1), # Daily rebalancing
('min_channel_width', 0.02), # Minimum 2% channel width
('volume_confirm', False), # Volume confirmation (if available)
)
def __init__(self):
# Price and time data
self.prices = []
self.time_indices = []
# Channel estimates
self.upper_channel = []
self.lower_channel = []
self.trend_line = []
self.channel_width = []
# Quantile regression models
self.upper_qr = QuantileRegression(tau=self.params.upper_quantile)
self.lower_qr = QuantileRegression(tau=self.params.lower_quantile)
self.trend_qr = QuantileRegression(tau=self.params.trend_quantile)
# Trading variables
self.rebalance_counter = 0
self.stop_price = 0
self.trade_count = 0
self.breakout_direction = 0 # 1=upper, -1=lower, 0=none
self.channel_confidence = 0
# Track breakouts
self.upper_breakouts = 0
self.lower_breakouts = 0
self.false_breakouts = 0
def estimate_channels(self):
"""Estimate quantile regression channels"""
if len(self.prices) < self.params.lookback_period:
return None, None, None, 0
# Get recent data
recent_prices = np.array(self.prices[-self.params.lookback_period:])
recent_times = np.array(self.time_indices[-self.params.lookback_period:])
# Normalize time for better numerical stability
time_normalized = (recent_times - recent_times[0]) / (recent_times[-1] - recent_times[0] + 1e-8)
try:
# Fit quantile regressions
self.upper_qr.fit(time_normalized, recent_prices)
self.lower_qr.fit(time_normalized, recent_prices)
self.trend_qr.fit(time_normalized, recent_prices)
# Predict current levels
current_time_norm = 1.0 # Current time (end of normalized period)
upper_level = self.upper_qr.predict(np.array([current_time_norm]))[0]
lower_level = self.lower_qr.predict(np.array([current_time_norm]))[0]
trend_level = self.trend_qr.predict(np.array([current_time_norm]))[0]
# Calculate channel width and confidence
channel_width = (upper_level - lower_level) / trend_level
# Ensure minimum channel width
if channel_width < self.params.min_channel_width:
mid_price = (upper_level + lower_level) / 2
half_width = mid_price * self.params.min_channel_width / 2
upper_level = mid_price + half_width
lower_level = mid_price - half_width
channel_width = self.params.min_channel_width
# Channel confidence based on data dispersion
price_std = np.std(recent_prices)
expected_width = 2 * price_std / np.mean(recent_prices) # 2-sigma as reference
confidence = min(1.0, expected_width / (channel_width + 1e-8))
return upper_level, lower_level, trend_level, confidence
except Exception as e:
# Fallback to simple quantiles
upper_level = np.quantile(recent_prices, self.params.upper_quantile)
lower_level = np.quantile(recent_prices, self.params.lower_quantile)
trend_level = np.quantile(recent_prices, self.params.trend_quantile)
confidence = 0.5
return upper_level, lower_level, trend_level, confidence
def detect_breakout(self, current_price, upper_channel, lower_channel):
"""Detect channel breakout with confirmation"""
breakout = 0
# Upper breakout
if current_price > upper_channel * self.params.breakout_threshold:
breakout = 1
self.upper_breakouts += 1
# Lower breakout
elif current_price < lower_channel / self.params.breakout_threshold:
breakout = -1
self.lower_breakouts += 1
return breakout
def next(self):
# Collect price and time data
current_price = self.data.close[0]
current_time = len(self.prices)
self.prices.append(current_price)
self.time_indices.append(current_time)
# Keep only recent history
if len(self.prices) > self.params.lookback_period * 2:
self.prices = self.prices[-self.params.lookback_period * 2:]
self.time_indices = self.time_indices[-self.params.lookback_period * 2:]
# Estimate channels
upper_channel, lower_channel, trend_line, confidence = self.estimate_channels()
if upper_channel is None:
return # Not enough data yet
# Store channel estimates
self.upper_channel.append(upper_channel)
self.lower_channel.append(lower_channel)
self.trend_line.append(trend_line)
self.channel_confidence = confidence
# Calculate channel width
width = (upper_channel - lower_channel) / trend_line
self.channel_width.append(width)
# Rebalancing logic
self.rebalance_counter += 1
if self.rebalance_counter < self.params.rebalance_period:
# Check stop loss
if self.position.size > 0 and current_price <= self.stop_price:
self.close()
self.log(f'STOP LOSS - Long closed at {current_price:.2f}')
elif self.position.size < 0 and current_price >= self.stop_price:
self.close()
self.log(f'STOP LOSS - Short closed at {current_price:.2f}')
return
# Reset rebalance counter
self.rebalance_counter = 0
# Detect breakout
breakout = self.detect_breakout(current_price, upper_channel, lower_channel)
# Current position
current_pos = 0
if self.position.size > 0:
current_pos = 1
elif self.position.size < 0:
current_pos = -1
# Trading logic with channel confirmation
if breakout != 0 and confidence > 0.3: # Require minimum confidence
# Close existing position if direction changed
if current_pos != 0 and current_pos != breakout:
self.close()
current_pos = 0
# Open new position on breakout
if current_pos == 0:
if breakout == 1: # Upper breakout - go long
self.buy()
self.stop_price = lower_channel # Use lower channel as stop
self.trade_count += 1
self.breakout_direction = 1
self.log(f'UPPER BREAKOUT LONG - Price: {current_price:.2f}, '
f'Upper: {upper_channel:.2f}, Confidence: {confidence:.3f}')
elif breakout == -1: # Lower breakout - go short
self.sell()
self.stop_price = upper_channel # Use upper channel as stop
self.trade_count += 1
self.breakout_direction = -1
self.log(f'LOWER BREAKOUT SHORT - Price: {current_price:.2f}, '
f'Lower: {lower_channel:.2f}, Confidence: {confidence:.3f}')
# Exit on return to channel (mean reversion)
elif self.position.size != 0:
in_channel = lower_channel <= current_price <= upper_channel
if in_channel and abs(current_price - trend_line) / trend_line < 0.02:
self.close()
self.log(f'RETURN TO CHANNEL - Position closed at {current_price:.2f}')
# Update trailing stops
if self.position.size > 0: # Long position
new_stop = max(self.stop_price, lower_channel)
if new_stop > self.stop_price:
self.stop_price = new_stop
elif self.position.size < 0: # Short position
new_stop = min(self.stop_price, upper_channel)
if new_stop < self.stop_price:
self.stop_price = new_stop
def log(self, txt, dt=None):
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()}: {txt}')
def notify_order(self, order):
if order.status in [order.Completed]:
if order.isbuy():
self.log(f'LONG EXECUTED - Price: {order.executed.price:.2f}')
elif order.issell():
if self.position.size == 0:
self.log(f'POSITION CLOSED - Price: {order.executed.price:.2f}')
else:
self.log(f'SHORT EXECUTED - Price: {order.executed.price:.2f}')
def notify_trade(self, trade):
if trade.isclosed:
self.log(f'TRADE CLOSED - PnL: {trade.pnl:.2f}')
# Check if this was a false breakout
if abs(trade.pnl) < abs(trade.size * trade.price * 0.01): # Less than 1% profit
self.false_breakouts += 1
def stop(self):
print(f'\n=== QUANTILE REGRESSION CHANNEL BREAKOUT RESULTS ===')
print(f'Total Trades: {self.trade_count}')
print(f'Upper Breakouts: {self.upper_breakouts}')
print(f'Lower Breakouts: {self.lower_breakouts}')
print(f'False Breakouts: {self.false_breakouts}')
if len(self.channel_width) > 0:
avg_width = np.mean(self.channel_width)
width_std = np.std(self.channel_width)
print(f'Average Channel Width: {avg_width:.3f}')
print(f'Channel Width Std: {width_std:.3f}')
print(f'Final Confidence: {self.channel_confidence:.3f}')
if len(self.upper_channel) > 0:
print(f'Final Upper Channel: {self.upper_channel[-1]:.2f}')
print(f'Final Lower Channel: {self.lower_channel[-1]:.2f}')
print(f'Final Trend Line: {self.trend_line[-1]:.2f}')
success_rate = 1 - (self.false_breakouts / max(1, self.trade_count))
print(f'Breakout Success Rate: {success_rate:.2%}')
def run_rolling_backtest(
ticker="SOL-USD",
start="2020-01-01",
end="2025-01-01",
window_months=12,
strategy_params=None
):
strategy_params = strategy_params or {}
all_results = []
start_dt = pd.to_datetime(start)
end_dt = pd.to_datetime(end)
current_start = start_dt
while True:
current_end = current_start + rd.relativedelta(months=window_months)
if current_end > end_dt:
break
print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")
data = yf.download(ticker, start=current_start, end=current_end, progress=False)
if data.empty or len(data) < 90:
print("Not enough data.")
current_start += rd.relativedelta(months=window_months)
continue
data = data.droplevel(1, 1) if isinstance(data.columns, pd.MultiIndex) else data
feed = bt.feeds.PandasData(dataname=data)
cerebro = bt.Cerebro()
cerebro.addstrategy(QuantileChannelStrategy, **strategy_params)
cerebro.adddata(feed)
cerebro.broker.setcash(100000)
cerebro.broker.setcommission(commission=0.001)
cerebro.addsizer(bt.sizers.PercentSizer, percents=95)
start_val = cerebro.broker.getvalue()
cerebro.run()
final_val = cerebro.broker.getvalue()
ret = (final_val - start_val) / start_val * 100
all_results.append({
'start': current_start.date(),
'end': current_end.date(),
'return_pct': ret,
'final_value': final_val,
})
print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
current_start += rd.relativedelta(months=window_months)
return pd.DataFrame(all_results)The QuantileRegression class implements quantile
regression to estimate price channels. It uses the pinball loss function
for a specified quantile \(\tau\)
(e.g., 0.8 for upper channel):
\[\text{Loss}=\frac{1}{n}\sum_{i=1}^n\max(\tau(y_i-\hat{y}_i),(\tau-1)(y_i-\hat{y}_i))\]
scipy.optimize.minimize with the L-BFGS-B method, fitting a
linear model to time-normalized price data. Falls back to simple
quantiles if optimization fails.The strategy constructs channels and manages trades:
Channel Estimation
(estimate_channels):
Breakout Detection
(detect_breakout):
Trading Logic (next):
rebalance_period=1).Performance Tracking (stop):
The run_rolling_backtest function tests the strategy on
SOL-USD from 2020 to 2025 in 12-month windows:
yfinance.lookback_period, breakout_threshold, or
stop_loss_pct.volume_confirm to filter breakouts with volume surges.upper_quantile and lower_quantile based on
volatility.run_rolling_backtest to test across multiple tickers.This strategy is suited for volatile assets like cryptocurrencies and can be backtested to evaluate performance across different markets and timeframes.