← Back to Home
Keltner Channels and ADX Trend-Following Trading Strategy with Adaptive Stops A Rolling Backtest

Keltner Channels and ADX Trend-Following Trading Strategy with Adaptive Stops A Rolling Backtest

This article introduces a trading strategy that combines Keltner Channels for identifying price breakouts and the Average Directional Movement Index (ADX) for trend strength confirmation. A key enhancement is the implementation of dynamic, ATR-based trailing stops for robust risk management. The strategy is evaluated using a comprehensive rolling backtesting framework.

1. The Keltner + ADX Strategy Concept

The KeltnerADXStrategy aims to capture significant price movements when a strong trend is confirmed. It leverages two popular technical indicators:

Entry Logic:

Exit Logic:

2. The KeltnerChannels Indicator

The KeltnerChannels indicator is a prerequisite for the strategy. It calculates the middle band (EMA), upper band, and lower band.

import backtrader as bt

class KeltnerChannels(bt.Indicator):
    lines = ('middle', 'upper', 'lower',)
    params = (
        ('ema_period', 20),
        ('atr_period', 10),
        ('atr_mult', 2.0),
    )

    def __init__(self):
        self.ema = bt.indicators.EMA(self.data.close, period=self.p.ema_period)
        self.atr = bt.indicators.ATR(self.data, period=self.p.atr_period)

        self.lines.middle = self.ema
        self.lines.upper = self.ema + (self.atr * self.p.atr_mult)
        self.lines.lower = self.ema - (self.atr * self.p.atr_mult)

        # Plotting information for backtrader visuals
        self.plotinfo.plotname = 'Keltner Channels'
        self.plotinfo.subplot = False # Plot on the same subplot as the price
        self.plotlines.middle.subplot = False
        self.plotlines.upper.subplot = False
        self.plotlines.lower.subplot = False

Explanation of KeltnerChannels:

3. The KeltnerADXStrategy Implementation

import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
import dateutil.relativedelta as rd
import matplotlib.pyplot as plt
import seaborn as sns

# Make sure KeltnerChannels class is defined or imported above this
# from keltner_channels_indicator import KeltnerChannels

class KeltnerADXStrategy(bt.Strategy):
    """
    Improved Keltner + ADX Strategy with Dynamic Trailing Stops.
    """
    params = (
        ('kc_ema_period', 20),
        ('kc_atr_period', 10),
        ('kc_atr_mult', 2.0),
        ('dmi_period', 14),
        ('adx_threshold', 25),
        ('sl_atr_period', 14),
        ('sl_atr_mult', 3.0),
    )

    def __init__(self):
        # Initialize Keltner Channels indicator
        self.keltner = KeltnerChannels(
            self.data,
            ema_period=self.p.kc_ema_period,
            atr_period=self.p.kc_atr_period,
            atr_mult=self.p.kc_atr_mult
        )
        # Initialize Directional Movement Index (ADX, +DI, -DI)
        self.dmi = bt.indicators.DirectionalMovementIndex(
            self.data, period=self.p.dmi_period
        )
        # Initialize Average True Range for stop loss calculation
        self.atr = bt.indicators.AverageTrueRange(
            self.data, period=self.p.sl_atr_period
        )

        # Order tracking
        self.entry_order = None  # Tracks the current entry order
        self.sl_order = None     # Tracks the current stop loss order
        self.current_stop_price = None # Stores the current trailing stop price

    def log(self, txt, dt=None):
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{dt.isoformat()} - {txt}')

    def notify_order(self, order):
        # Ignore submitted/accepted orders, wait for completion or failure
        if order.status in [order.Submitted, order.Accepted]:
            return

        # Handle completed orders
        if order.status == order.Completed:
            if order == self.entry_order: # Our entry order has filled
                if order.isbuy(): # Long entry
                    self.log(f'Long entry executed at {order.executed.price:.2f}')
                    # Calculate initial stop loss for long
                    initial_stop = self.data.close[0] - self.p.sl_atr_mult * self.atr[0]
                    self.current_stop_price = initial_stop
                    # Place a stop sell order
                    self.sl_order = self.sell(
                        exectype=bt.Order.Stop,
                        price=self.current_stop_price,
                        size=order.executed.size # Size matches the entry order
                    )
                    self.log(f'Initial stop loss for long at {self.current_stop_price:.2f}')
                elif order.issell(): # Short entry
                    self.log(f'Short entry executed at {order.executed.price:.2f}')
                    # Calculate initial stop loss for short
                    initial_stop = self.data.close[0] + self.p.sl_atr_mult * self.atr[0]
                    self.current_stop_price = initial_stop
                    # Place a stop buy order
                    self.sl_order = self.buy(
                        exectype=bt.Order.Stop,
                        price=self.current_stop_price,
                        size=abs(order.executed.size) # Size matches the entry order
                    )
                    self.log(f'Initial stop loss for short at {self.current_stop_price:.2f}')
                self.entry_order = None # Clear entry order reference

            else: # Must be our stop loss order that filled
                if order.exectype == bt.Order.Stop:
                    self.log('Stop order executed – trade closed')
                    self.sl_order = None # Clear stop order reference
                    self.current_stop_price = None # Clear current stop price

        # Handle failed orders (canceled, rejected, margin)
        elif order.status in [order.Canceled, order.Rejected, order.Margin]:
            self.log(f'Order {order.getstatusname()}')
            if order == self.entry_order:
                self.entry_order = None
            elif order == self.sl_order:
                self.sl_order = None # If SL fails, we should re-evaluate or manage manually

    def next(self):
        # Prevent new orders if an entry order is pending or a stop order is alive
        if self.entry_order or (self.sl_order and self.sl_order.alive()):
            return

        # Check if indicators have enough data to be valid
        if len(self.data) < max(self.p.kc_ema_period, self.p.kc_atr_period, self.p.dmi_period, self.p.sl_atr_period) + 1:
            return # Not enough data for indicators to be stable

        # Entry Logic
        if not self.position: # We are currently flat (no position)
            # Long Entry Condition: Price > Upper Keltner, ADX > Threshold, +DI > -DI
            if (self.data.close[0] > self.keltner.l.upper[0] and
                self.dmi.adx[0] > self.p.adx_threshold and
                self.dmi.plusDI[0] > self.dmi.minusDI[0]):
                self.log(f'Long entry signal at {self.data.close[0]:.2f}')
                self.entry_order = self.buy() # Place a buy order
            # Short Entry Condition: Price < Lower Keltner, ADX > Threshold, -DI > +DI
            elif (self.data.close[0] < self.keltner.l.lower[0] and
                  self.dmi.adx[0] > self.p.adx_threshold and
                  self.dmi.minusDI[0] > self.dmi.plusDI[0]):
                self.log(f'Short entry signal at {self.data.close[0]:.2f}')
                self.entry_order = self.sell() # Place a sell (short) order

        # Position Management (Trailing Stop and Keltner Exit)
        else: # We have an open position
            if self.position.size > 0: # Currently long
                # Calculate new candidate trailing stop price
                candidate_stop = self.data.close[0] - self.p.sl_atr_mult * self.atr[0]
                
                # Update stop if it moves favorably (upwards)
                if self.current_stop_price is None or candidate_stop > self.current_stop_price:
                    self.current_stop_price = candidate_stop
                    self.log(f'Updating long trailing stop to {self.current_stop_price:.2f}')
                    # Cancel existing stop order if active and place a new one at the updated price
                    if self.sl_order and self.sl_order.alive():
                        self.cancel(self.sl_order)
                    self.sl_order = self.sell(
                        exectype=bt.Order.Stop,
                        price=self.current_stop_price,
                        size=self.position.size
                    )
                
                # Additional Exit Condition for Long: Price crosses below Lower Keltner Channel
                if self.data.close[0] < self.keltner.l.lower[0]:
                    self.log(f'Long exit signal (channel cross) at {self.data.close[0]:.2f}')
                    self.close() # Close the current position

            elif self.position.size < 0: # Currently short
                # Calculate new candidate trailing stop price
                candidate_stop = self.data.close[0] + self.p.sl_atr_mult * self.atr[0]
                
                # Update stop if it moves favorably (downwards)
                if self.current_stop_price is None or candidate_stop < self.current_stop_price:
                    self.current_stop_price = candidate_stop
                    self.log(f'Updating short trailing stop to {self.current_stop_price:.2f}')
                    # Cancel existing stop order if active and place a new one at the updated price
                    if self.sl_order and self.sl_order.alive():
                        self.cancel(self.sl_order)
                    self.sl_order = self.buy(
                        exectype=bt.Order.Stop,
                        price=self.current_stop_price,
                        size=abs(self.position.size)
                    )
                
                # Additional Exit Condition for Short: Price crosses above Upper Keltner Channel
                if self.data.close[0] > self.keltner.l.upper[0]:
                    self.log(f'Short exit signal (channel cross) at {self.data.close[0]:.2f}')
                    self.close() # Close the current position

    def stop(self):
        self.log(f'Final Portfolio Value: {self.broker.getvalue():.2f}')

Explanation of KeltnerADXStrategy:

4. Rolling Backtesting Framework

To assess the strategy’s robustness across different market conditions, a rolling backtest is employed. This method runs the strategy over multiple, sequential time windows, providing insights into its consistent performance rather than just a single historical period.

# ... (imports and strategy/KeltnerChannels definitions as above) ...

def run_rolling_backtest(
    ticker="BTC-USD",
    start="2018-01-01",
    end="2025-12-31",
    window_months=3,
    strategy_params=None
):
    strategy_params = strategy_params or {}
    all_results = []
    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)
    current_start = start_dt

    while True:
        current_end = current_start + rd.relativedelta(months=window_months)
        if current_end > end_dt:
            # Ensure the last window doesn't exceed the end date
            current_end = end_dt
            if current_start >= current_end: # No valid period left
                break

        print(f"\nROLLING BACKTEST: {current_start.date()} to {current_end.date()}")

        # Data download using yfinance, respecting the user's preference for auto_adjust=False and droplevel
        # Using the saved preference: yfinance download with auto_adjust=False and droplevel(axis=1, level=1)
        data = yf.download(ticker, start=current_start, end=current_end, auto_adjust=False, progress=False)
        
        # Apply droplevel if data is a MultiIndex, as per user's preference
        if isinstance(data.columns, pd.MultiIndex):
            data = data.droplevel(1, axis=1)

        # Check for sufficient data after droplevel
        if data.empty or len(data) < 90: # Ensure enough data for indicators to warm up
            print(f"Not enough data for period {current_start.date()} to {current_end.date()}. Skipping.")
            current_start += rd.relativedelta(months=window_months)
            if current_start >= end_dt: # If moving to next window makes us pass overall end
                 break
            continue

        feed = bt.feeds.PandasData(dataname=data)
        cerebro = bt.Cerebro()
        cerebro.addstrategy(strategy, **strategy_params)
        cerebro.adddata(feed)
        cerebro.broker.setcash(100000)
        cerebro.broker.setcommission(commission=0.001)
        cerebro.addsizer(bt.sizers.PercentSizer, percents=95)

        start_val = cerebro.broker.getvalue()
        cerebro.run() # Run the backtest for the current window
        final_val = cerebro.broker.getvalue()
        ret = (final_val - start_val) / start_val * 100

        all_results.append({
            'start': current_start.date(),
            'end': current_end.date(),
            'return_pct': ret,
            'final_value': final_val,
        })

        print(f"Return: {ret:.2f}% | Final Value: {final_val:.2f}")
        
        # Increment current_start for the next window
        # If current_end reached overall end_dt, break
        if current_end == end_dt:
            break
        current_start = current_end
        
    return pd.DataFrame(all_results)

Explanation of run_rolling_backtest:

5. Reporting and Visualization

# ... (all code as above) ...

def report_stats(df):
    returns = df['return_pct']
    stats = {
        'Mean Return %': np.mean(returns),
        'Median Return %': np.median(returns),
        'Std Dev %': np.std(returns),
        'Min Return %': np.min(returns),
        'Max Return %': np.max(returns),
        'Sharpe Ratio': np.mean(returns) / np.std(returns) if np.std(returns) > 0 else np.nan
    }
    print("\n=== ROLLING BACKTEST STATISTICS ===")
    for k, v in stats.items():
        print(f"{k}: {v:.2f}")
    return stats

def plot_return_distribution(df):
    sns.set(style="whitegrid")
    plt.figure(figsize=(10, 5))
    sns.histplot(df['return_pct'], bins=20, kde=True, color='dodgerblue')
    plt.axvline(df['return_pct'].mean(), color='black', linestyle='--', label='Mean')
    plt.title('Rolling Backtest Return Distribution')
    plt.xlabel('Return %')
    plt.ylabel('Frequency')
    plt.legend()
    plt.tight_layout()
    plt.show()

def plot_four_charts(df, rolling_sharpe_window=4):
    """
    Generates four analytical plots for rolling backtest results.
    """
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8)) # Increased figsize for better readability
    
    # Calculate period numbers (0, 1, 2, 3, ...)
    periods = list(range(len(df)))
    returns = df['return_pct']
    
    # 1. Period Returns (Top Left)
    colors = ['green' if r >= 0 else 'red' for r in returns]
    ax1.bar(periods, returns, color=colors, alpha=0.7)
    ax1.set_title('Period Returns', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Period')
    ax1.set_ylabel('Return %')
    ax1.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax1.grid(True, alpha=0.3)
    
    # 2. Cumulative Returns (Top Right)
    cumulative_returns = (1 + returns / 100).cumprod() * 100 - 100
    ax2.plot(periods, cumulative_returns, marker='o', linewidth=2, markersize=4, color='blue')
    ax2.set_title('Cumulative Returns', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Period')
    ax2.set_ylabel('Cumulative Return %')
    ax2.grid(True, alpha=0.3)
    
    # 3. Rolling Sharpe Ratio (Bottom Left)
    rolling_sharpe = returns.rolling(window=rolling_sharpe_window).apply(
        lambda x: x.mean() / x.std() if x.std() > 0 else np.nan, raw=False
    )
    # Only plot where we have valid rolling calculations
    valid_mask = ~rolling_sharpe.isna()
    valid_periods = [i for i, valid in enumerate(valid_mask) if valid]
    valid_sharpe = rolling_sharpe[valid_mask]
    
    ax3.plot(valid_periods, valid_sharpe, marker='o', linewidth=2, markersize=4, color='orange')
    ax3.axhline(y=0, color='red', linestyle='--', alpha=0.5)
    ax3.set_title(f'Rolling Sharpe Ratio ({rolling_sharpe_window}-period)', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Period')
    ax3.set_ylabel('Sharpe Ratio')
    ax3.grid(True, alpha=0.3)
    
    # 4. Return Distribution (Bottom Right)
    bins = min(15, max(5, len(returns)//2))
    ax4.hist(returns, bins=bins, alpha=0.7, color='steelblue', edgecolor='black')
    mean_return = returns.mean()
    ax4.axvline(mean_return, color='red', linestyle='--', linewidth=2, 
                label=f'Mean: {mean_return:.2f}%')
    ax4.set_title('Return Distribution', fontsize=14, fontweight='bold')
    ax4.set_xlabel('Return %')
    ax4.set_ylabel('Frequency')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

if __name__ == '__main__':
    df = run_rolling_backtest(ticker="AAPL", start="2010-01-01", end="2025-06-20", window_months=6)

    print("\n=== ROLLING BACKTEST RESULTS ===")
    print(df)

    stats = report_stats(df)
    plot_four_charts(df)
Pasted image 20250620154617.png

Explanation of Reporting and Visualization:

6. Conclusion

The Keltner + ADX strategy offers a method for trend-following that combines breakout signals with trend strength confirmation. The dynamic ATR-based trailing stop is a significant feature, providing adaptive risk management by moving the stop loss to protect profits as the trade progresses. The rolling backtest framework is an indispensable tool for thoroughly evaluating such a strategy, allowing for the analysis of performance consistency and resilience across various market phases. By examining the detailed statistics and visualizations, traders can gain deeper insights into the strategy’s potential and areas for further optimization, such as fine-tuning the Keltner Channel and ADX parameters or exploring alternative exit conditions.