← Back to Home
Market Structure Rotation Breakout Strategy with Volatility-Weighted Portfolio Allocation

Market Structure Rotation Breakout Strategy with Volatility-Weighted Portfolio Allocation

Strategy Description:

The strategy rotates across several crypto assets using three simple layers:

1. Market structure signal

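Lower swing high + lower swing low (the latest swing high is below the previous swing high, and the latest swing low is below the previous swing low) = downtrend.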

Then:

Close > Last Swing High = bullish structure break

So the entry idea is:

Buy only after price breaks above resistance following a downtrend.

2. Position sizing

Capital is split across active assets using inverse volatility:

\[w_i=\frac{1/\sigma_i}{\sum_{j=1}^{n}1/\sigma_j}\]

Meaning:

Lower volatility asset → larger weight
Higher volatility asset → smaller weight

3. Risk management

Each open position uses an ATR trailing stop:

\[\text{Stop}=\text{Highest Price Since Entry}-3\times \text{ATR}\]

Meaning:

As price rises, the stop moves up.
If price closes below the stop, the position exits.

Python Backtesting Script

import math
import backtrader as bt
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from collections import deque


class SwingPoint(bt.Indicator):
    """Flag confirmed swing highs and swing lows.

    The bar ``period`` bars back is treated as the pivot of a window that
    spans the latest ``2 * period + 1`` bars. When the pivot's high equals the
    window maximum, ``swing_high`` receives that high on the current bar;
    otherwise it receives NaN. ``swing_low`` works the same way with the
    window minimum of the lows.
    """

    lines = ("swing_high", "swing_low")
    params = (("period", 10),)
    plotinfo = dict(subplot=False)

    def __init__(self):
        # The full window must exist before the first value is produced.
        self.addminperiod(2 * self.p.period + 1)

    def next(self):
        half = self.p.period
        offsets = range(2 * half + 1)
        nan = float("nan")

        pivot_high = self.data.high[-half]
        pivot_low = self.data.low[-half]

        window_top = max([self.data.high[-k] for k in offsets])
        window_bottom = min([self.data.low[-k] for k in offsets])

        self.lines.swing_high[0] = pivot_high if pivot_high == window_top else nan
        self.lines.swing_low[0] = pivot_low if pivot_low == window_bottom else nan


class MultiAssetMSSInvVolATR(bt.Strategy):
    """Long-only rotation across data feeds on bullish market-structure breaks.

    On every bar the strategy records confirmed swing points and maintains an
    ATR trailing stop for each open position. Once at least ``rebalance_days``
    bars have passed since the previous rebalance, it collects the "active"
    feeds (already held, or closing above the last swing high after a
    lower-high / lower-low sequence), weights them by inverse return
    volatility and orders the difference between target and current size.

    Params:
        swing_period: half-window handed to ``SwingPoint``.
        atr_period: ATR look-back.
        atr_stop_multiplier: stop distance below the high-water mark, in ATRs.
        vol_lookback: number of close-to-close returns used for volatility.
        rebalance_days: minimum number of bars between two rebalances.
        max_gross_exposure: fraction of equity spread over the weights.
    """

    params = dict(
        swing_period=10,
        atr_period=14,
        atr_stop_multiplier=3.0,
        vol_lookback=30,
        rebalance_days=7,
        max_gross_exposure=1.00,
    )

    def __init__(self):
        """Create per-feed indicators and bookkeeping containers."""
        self.ind = {}
        self.weights = {}
        # Far in the past so that the first call to next() rebalances.
        self.last_rebalance = -999
        # Feed -> order still in flight; such feeds are left alone.
        self.order_refs = {}
        self.stop_price = {}
        self.highest_since_entry = {}
        # Two most recent confirmed swing highs / lows per feed (oldest first).
        self.swing_highs = {}
        self.swing_lows = {}

        # Per-bar history used for plotting after the run.
        self.date_history = []
        self.value_history = []
        self.cash_history = []
        self.asset_value_history = {d._name: [] for d in self.datas}

        for d in self.datas:
            swing = SwingPoint(d, period=self.p.swing_period)
            atr = bt.ind.ATR(d, period=self.p.atr_period)

            self.ind[d] = dict(swing=swing, atr=atr)
            self.weights[d] = 0.0
            self.stop_price[d] = None
            self.highest_since_entry[d] = None
            self.swing_highs[d] = deque(maxlen=2)
            self.swing_lows[d] = deque(maxlen=2)

    def next(self):
        """Track swings, trail stops, rebalance when due, and log history."""
        for d in self.datas:
            self._track_swings(d)

            if d in self.order_refs:
                continue

            if self.getposition(d).size > 0:
                self._trail_stop(d)

        if len(self) - self.last_rebalance >= self.p.rebalance_days:
            self.last_rebalance = len(self)
            self._rebalance()

        self._snapshot()

    def _track_swings(self, d):
        """Append a newly confirmed swing high / low of feed ``d``."""
        swing = self.ind[d]["swing"]

        if pd.notna(swing.swing_high[0]):
            self.swing_highs[d].append(float(swing.swing_high[0]))

        if pd.notna(swing.swing_low[0]):
            self.swing_lows[d].append(float(swing.swing_low[0]))

    def _trail_stop(self, d):
        """Ratchet the ATR stop of an open position and exit on a close below it."""
        atr_dist = self.ind[d]["atr"][0] * self.p.atr_stop_multiplier

        # The marks may still be unset if the position appeared without a
        # Completed notification having seeded them; start from this bar.
        prev_high = self.highest_since_entry[d]
        high_mark = d.high[0] if prev_high is None else max(prev_high, d.high[0])
        self.highest_since_entry[d] = high_mark

        new_stop = high_mark - atr_dist
        prev_stop = self.stop_price[d]
        # The stop only ever moves up.
        self.stop_price[d] = new_stop if prev_stop is None else max(prev_stop, new_stop)

        if d.close[0] < self.stop_price[d]:
            self.order_refs[d] = self.close(data=d)

    def _is_active(self, d):
        """Return True when ``d`` is held or shows a bullish structure break."""
        highs, lows = self.swing_highs[d], self.swing_lows[d]

        if len(highs) < 2 or len(lows) < 2:
            return False

        prior_high, last_high = highs
        prior_low, last_low = lows

        downtrend = last_high < prior_high and last_low < prior_low
        long_signal = downtrend and d.close[0] > last_high

        return self.getposition(d).size > 0 or long_signal

    def _realized_vol(self, d):
        """Return the std-dev of recent close-to-close returns, or None.

        None is returned when fewer than 10 returns are available or the
        result is zero or not finite.
        """
        # Never reach further back than the bars this feed has delivered;
        # an offset past the first bar would not refer to real history.
        lookback = min(self.p.vol_lookback, len(d) - 1)

        rets = [
            d.close[-i + 1] / d.close[-i] - 1
            for i in range(1, lookback + 1)
            if d.close[-i] != 0
        ]

        if len(rets) < 10:
            return None

        vol = np.std(rets)

        if vol > 0 and np.isfinite(vol):
            return vol

        return None

    def _rebalance(self):
        """Recompute inverse-volatility weights and order towards the targets."""
        vols = {}

        for d in self.datas:
            if d in self.order_refs or not self._is_active(d):
                continue

            vol = self._realized_vol(d)

            if vol is not None:
                vols[d] = vol

        for d in self.datas:
            self.weights[d] = 0.0

        inv_sum = sum(1 / v for v in vols.values())

        if inv_sum > 0:
            for d, vol in vols.items():
                self.weights[d] = (1 / vol) / inv_sum

        equity = self.broker.getvalue()
        deltas = []

        for d in self.datas:
            if d in self.order_refs:
                continue

            price = d.close[0]

            # Also skips NaN prices, which would make math.floor raise.
            if not price > 0:
                continue

            target_value = equity * self.weights[d] * self.p.max_gross_exposure
            # Whole units only: a feed whose price exceeds its target value
            # ends up with a target size of 0.
            target_size = math.floor(target_value / price)
            delta = target_size - self.getposition(d).size

            if delta != 0:
                deltas.append((d, delta))

        # Submit the sells before the buys so that the cash they release is
        # accounted for ahead of the purchases it is meant to fund.
        for d, delta in deltas:
            if delta < 0:
                self.order_refs[d] = self.sell(data=d, size=abs(delta))

        for d, delta in deltas:
            if delta > 0:
                self.order_refs[d] = self.buy(data=d, size=delta)

    def _snapshot(self):
        """Append the current date, equity, cash and per-feed position value."""
        self.date_history.append(self.datas[0].datetime.datetime(0))
        self.value_history.append(self.broker.getvalue())
        self.cash_history.append(self.broker.getcash())

        for d in self.datas:
            pos = self.getposition(d)
            self.asset_value_history[d._name].append(max(0.0, pos.size * d.close[0]))

    def notify_order(self, order):
        """Release the pending marker and keep the stop state in sync.

        Submitted, Accepted and Partial notifications are ignored because the
        order is still alive. On completion, a newly opened position gets its
        high-water mark and stop seeded from the current bar; an existing
        position keeps its marks so that a rebalance fill cannot lower the
        trailing stop. A flat position clears all stop state.
        """
        d = order.data

        if order.status in (order.Submitted, order.Accepted, order.Partial):
            return

        if d in self.order_refs and order.ref == self.order_refs[d].ref:
            del self.order_refs[d]

        if order.status != order.Completed:
            return

        pos = self.getposition(d)

        if pos.size > 0:
            if self.highest_since_entry[d] is None:
                atr_dist = self.ind[d]["atr"][0] * self.p.atr_stop_multiplier
                self.highest_since_entry[d] = d.high[0]
                self.stop_price[d] = d.high[0] - atr_dist

        elif pos.size == 0:
            self.highest_since_entry[d] = None
            self.stop_price[d] = None
            self.weights[d] = 0.0


cerebro = bt.Cerebro()

assets = [
    "BTC-USD", "ETH-USD", "BNB-USD", "SOL-USD",
    "XRP-USD", "ADA-USD", "DOGE-USD",
]

START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
INIT_CASH = 100_000

# Bars a feed must have before it is worth adding: the longest look-back used
# by the strategy plus a margin of 10 bars.
min_bars = max(
    MultiAssetMSSInvVolATR.params.swing_period * 2 + 1,
    MultiAssetMSSInvVolATR.params.atr_period,
    MultiAssetMSSInvVolATR.params.vol_lookback,
) + 10

# Ticker -> cleaned OHLCV frame, kept for the benchmark curves.
raw_data = {}

for t in assets:
    df = yf.download(t, start=START_DATE, end=END_DATE, interval="1d", progress=False)

    if df is None:
        df = pd.DataFrame()

    # Drop the second column level only when the columns actually are a
    # MultiIndex; a flat column index has no second level to drop.
    if isinstance(df.columns, pd.MultiIndex):
        df = df.droplevel(1, axis=1)

    df = df.dropna()

    if df.empty or len(df) < min_bars:
        print(f"Skipping {t}: only {len(df)} bars.")
        continue

    raw_data[t] = df.copy()
    data = bt.feeds.PandasData(dataname=df, name=t)
    cerebro.adddata(data)

# Fail here with a clear message rather than later inside the run/report code.
if not raw_data:
    raise SystemExit("No asset has enough history to run the backtest.")

cerebro.addstrategy(MultiAssetMSSInvVolATR)

cerebro.broker.setcash(INIT_CASH)
cerebro.broker.setcommission(commission=0.001)
cerebro.broker.set_slippage_perc(perc=0.0005)

# Compute the Sharpe ratio from daily returns and annualise it. A yearly
# timeframe would see a single return for a one-year window, which leaves no
# dispersion to divide by. The factor is 365 because the feeds are daily
# bars of assets that trade every calendar day.
cerebro.addanalyzer(
    bt.analyzers.SharpeRatio,
    _name="sharpe",
    timeframe=bt.TimeFrame.Days,
    annualize=True,
    factor=365,
)
cerebro.addanalyzer(bt.analyzers.DrawDown, _name="dd")
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")

start_value = cerebro.broker.getvalue()
results = cerebro.run()
strat = results[0]
end_value = cerebro.broker.getvalue()

print(f"Start Value: {start_value:,.2f}")
print(f"End Value:   {end_value:,.2f}")
print(f"Return:      {(end_value / start_value - 1) * 100:.2f}%")
print(f"Sharpe:      {strat.analyzers.sharpe.get_analysis()}")
print(f"DrawDown:    {strat.analyzers.dd.get_analysis()}")
print(f"Trades:      {strat.analyzers.trades.get_analysis()}")

# Equity curve and per-asset position values recorded by the strategy.
eq = pd.Series(strat.value_history, index=pd.to_datetime(strat.date_history), name="Strategy")
asset_values = pd.DataFrame(strat.asset_value_history, index=eq.index)

asset_values = asset_values.clip(lower=0)
# Whatever equity is not held in assets is shown as cash.
cash_plot = (eq - asset_values.sum(axis=1)).clip(lower=0)

# Closing prices of every loaded asset, aligned to the strategy's bars.
bench_close = pd.DataFrame(
    {t: df["Close"].reindex(eq.index, method="ffill") for t, df in raw_data.items()},
    index=eq.index,
)

# Equal-weight buy & hold: the same amount goes into every asset on the first
# bar and is never rebalanced, so the curve is the average of the individual
# price paths, each normalised to its first available value. Bars before an
# asset's first price count as "not yet moved".
relative_close = (bench_close / bench_close.bfill().iloc[0]).fillna(1.0)
equal_bh = INIT_CASH * relative_close.mean(axis=1)

# BTC may have been skipped while loading; only build its benchmark if present.
has_btc = "BTC-USD" in bench_close.columns

if has_btc:
    btc_close = bench_close["BTC-USD"].dropna()
    btc_bh = INIT_CASH * (btc_close / btc_close.iloc[0])
    btc_bh = btc_bh.reindex(eq.index, method="ffill")

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 9), sharex=True)

ax1.plot(eq.index, eq.values, color="black", linewidth=2, label="MSS InvVol ATR")
ax1.plot(equal_bh.index, equal_bh.values, linestyle="--", label="Equal-Weight Buy & Hold")

if has_btc:
    ax1.plot(btc_bh.index, btc_bh.values, linestyle="--", label="BTC Buy & Hold")

pf_ret = eq.iloc[-1] / eq.iloc[0] - 1
ew_ret = equal_bh.iloc[-1] / equal_bh.iloc[0] - 1

x_last = eq.index[-1]

# Total return of each curve, written just right of its last point.
ax1.annotate(f"PF: {pf_ret * 100:.2f}%", xy=(x_last, eq.iloc[-1]), xytext=(8, 0),
             textcoords="offset points", va="center", ha="left", color="black", clip_on=False)

ax1.annotate(f"EW: {ew_ret * 100:.2f}%", xy=(x_last, equal_bh.iloc[-1]), xytext=(8, 0),
             textcoords="offset points", va="center", ha="left", clip_on=False)

if has_btc:
    btc_ret = btc_bh.iloc[-1] / btc_bh.iloc[0] - 1
    ax1.annotate(f"BTC: {btc_ret * 100:.2f}%", xy=(x_last, btc_bh.iloc[-1]), xytext=(8, 0),
                 textcoords="offset points", va="center", ha="left", clip_on=False)

ax1.grid(True, alpha=0.3)
ax1.legend()

labels = ["Cash"] + list(asset_values.columns)
colors = ["#d3d3d3"] + list(plt.cm.tab10.colors)[:len(asset_values.columns)]

ax2.stackplot(
    eq.index,
    cash_plot,
    *[asset_values[col] for col in asset_values.columns],
    labels=labels,
    colors=colors,
    alpha=0.8,
)

ax2.set_title("Portfolio Allocation Breakdown")
ax2.set_ylabel("Value ($)")
ax2.grid(True, alpha=0.3)
ax2.legend(loc="upper left", bbox_to_anchor=(1.0, 1), frameon=True)

plt.tight_layout()
plt.show()