Understanding the current market environment, or “regime,” is crucial for effective trading and investment strategies. Markets constantly shift between periods of strong trends, consolidation, and high volatility. This article explores how Empirical Mode Decomposition (EMD) can be leveraged to effectively identify these distinct market regimes using Bitcoin price data.
EMD is a data-driven signal processing technique that decomposes complex, non-linear, and non-stationary signals into a finite and often small number of Intrinsic Mode Functions (IMFs) plus a residue. Each IMF represents a simple oscillatory mode, and the final component (the residue) typically captures the overall trend of the data. This decomposition lets us isolate different frequency components, from high-frequency noise to long-term trends, and so gives a granular view of market movements.
EMD breaks a signal \(x(t)\) into:
\[ x(t) = \sum_{i=1}^{n} \text{IMF}_i(t) + r_n(t) \]
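Each IMF must satisfy two conditions:
1. The number of extrema and the number of zero crossings are equal or differ by at most one.
2. The mean of the upper envelope (through the local maxima) and the lower envelope (through the local minima) is zero at every point.
To extract one IMF:
1. Identify the local maxima and minima of \(x(t)\).
2. Interpolate the maxima to form the upper envelope \(e_{\text{upper}}(t)\) and the minima to form the lower envelope \(e_{\text{lower}}(t)\), typically with cubic splines.
3. Compute the mean envelope:
\[ m(t) = \frac{e_{\text{upper}}(t) + e_{\text{lower}}(t)}{2} \]
4. Subtract it from the signal:
\[ h(t) = x(t) - m(t) \]
5. Check whether \(h(t)\) is an IMF. If not, repeat steps 1–4 on \(h(t)\) (sifting).
Once an IMF is obtained, subtract it from \(x(t)\) to get the residue:
\[ r(t) = x(t) - \text{IMF}_1(t) \]
Repeat the process on \(r(t)\) to extract the next IMF, and stop when the residue has too few extrema to build envelopes.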
In the end you get:
\[ x(t) = \text{IMF}_1(t) + \text{IMF}_2(t) + \cdots + \text{IMF}_n(t) + r_n(t) \]
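The sifting procedure can be sketched from scratch with NumPy and SciPy. This is a minimal illustration, not the implementation PyEMD uses: the function names (`spline_envelope`, `mean_envelope`, `sift_one_imf`), the stopping tolerance `tol`, and the choice to pin both envelopes to the signal's endpoints are all assumptions made for this example.

```python
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.signal import argrelextrema


def spline_envelope(t, x, extrema_idx):
    """Cubic spline through the given extrema, pinned to the signal's endpoints."""
    # Pinning to the endpoints is a crude boundary treatment (assumption of this sketch).
    knots = np.concatenate(([0], extrema_idx, [len(x) - 1]))
    return CubicSpline(t[knots], x[knots])(t)


def mean_envelope(t, x):
    """Mean of the upper and lower envelopes, or None if there are too few extrema."""
    max_idx = argrelextrema(x, np.greater)[0]
    min_idx = argrelextrema(x, np.less)[0]
    if len(max_idx) < 3 or len(min_idx) < 3:
        return None
    upper = spline_envelope(t, x, max_idx)
    lower = spline_envelope(t, x, min_idx)
    return (upper + lower) / 2.0


def sift_one_imf(t, x, max_iter=20, tol=1e-3):
    """Repeat h <- h - m(t) until the mean envelope is small relative to h."""
    h = x.astype(float).copy()
    for _ in range(max_iter):
        m = mean_envelope(t, h)
        if m is None:
            break
        # Simplified stopping rule (assumption): energy of m is tiny compared to h.
        small_update = np.sum(m ** 2) <= tol * np.sum(h ** 2)
        h = h - m
        if small_update:
            break
    return h


# Synthetic test signal: a fast oscillation riding on a linear trend.
t = np.linspace(0.0, 1.0, 500)
x = np.sin(2 * np.pi * 20 * t) + 2.0 * t
imf = sift_one_imf(t, x)
residue = x - imf  # r(t) = x(t) - IMF_1(t)
```

By construction `imf + residue` reproduces `x` exactly. Repeating `sift_one_imf` on `residue` would extract the next, slower component. Production implementations typically treat the signal boundaries and the stopping criterion more carefully than this sketch does.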
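EMD is often used in time-frequency analysis, particularly as the first stage of the Hilbert-Huang Transform (HHT), which applies the Hilbert transform to each IMF to obtain its instantaneous frequency.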
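To make the Hilbert step concrete, here is a small sketch that estimates the instantaneous frequency of a single oscillatory component with `scipy.signal.hilbert`. The helper name `instantaneous_frequency` and the sampling rate `fs` are illustrative assumptions, and a pure 5 Hz tone stands in for an IMF.

```python
import numpy as np
from scipy.signal import hilbert


def instantaneous_frequency(component, fs=1.0):
    """Instantaneous frequency (in cycles per unit time) from the analytic signal."""
    analytic = hilbert(component)          # component + j * HilbertTransform(component)
    phase = np.unwrap(np.angle(analytic))  # continuous phase in radians
    return np.diff(phase) * fs / (2.0 * np.pi)


fs = 200.0                      # samples per second (assumed for the demo)
t = np.arange(400) / fs         # two seconds of data
tone = np.sin(2 * np.pi * 5.0 * t)
freq = instantaneous_frequency(tone, fs)
print(f"Median instantaneous frequency: {np.median(freq):.2f} Hz")
```

For daily price data one would set `fs=1.0`, so the result is in cycles per day, and apply the function to each IMF separately.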
We’ll apply EMD to historical Bitcoin (BTC-USD) daily closing prices.
First, we download Bitcoin historical data using yfinance.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from PyEMD import EMD
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
# Set style for better plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def download_bitcoin_data(start_date='2018-01-01', end_date='2025-01-01'):
    """Download Bitcoin price data from Yahoo Finance"""
    # auto_adjust=False keeps the raw Close column. droplevel removes the ticker level,
    # assuming a yfinance version that returns MultiIndex (field, ticker) columns.
    btc = yf.download('BTC-USD', start=start_date, end=end_date, auto_adjust=False).droplevel(1, axis=1)
    return btc
btc_data = download_bitcoin_data('2018-01-01', '2025-01-01')
prices = btc_data['Close'].dropna()
dates = prices.index
The PyEMD library is used to perform the decomposition.
Each IMF captures oscillations at a different frequency scale.
def perform_emd(price_series):
    """Perform Empirical Mode Decomposition on price series"""
    emd = EMD()
    price_array = price_series.dropna().values
    imfs = emd(price_array)
    return imfs, price_array
imfs, price_array = perform_emd(prices)
print(f"EMD decomposed the signal into {len(imfs)} components (IMFs)")
The output shows the number of components extracted, ordered from high-frequency (noise) to low-frequency (trend).
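A quick sanity check on any decomposition is to confirm that the components sum back to the input and that they are ordered from fast to slow. The sketch below does this on a synthetic stack of components rather than on the real Bitcoin output. The helper names are hypothetical, and it assumes (as the article's code does when it treats `imfs[-1]` as the trend) that the last row of the array holds the residue.

```python
import numpy as np


def count_zero_crossings(component):
    """Number of sign changes in a 1-D component."""
    return int(np.sum(np.diff(np.sign(component)) != 0))


def summarize_components(components, signal):
    """Return (max reconstruction error, zero crossings per component)."""
    components = np.asarray(components, dtype=float)
    max_error = float(np.max(np.abs(components.sum(axis=0) - signal)))
    crossings = [count_zero_crossings(row) for row in components]
    return max_error, crossings


# Synthetic stand-in for an EMD result: fast mode, slow mode, monotonic trend.
t = np.linspace(0.0, 1.0, 400, endpoint=False)
fast = np.sin(2 * np.pi * 20 * t + 0.3)
slow = np.sin(2 * np.pi * 3 * t + 0.3)
trend = 1.0 + t
demo_components = np.vstack([fast, slow, trend])
demo_signal = fast + slow + trend

max_error, crossings = summarize_components(demo_components, demo_signal)
print(f"Max reconstruction error: {max_error:.2e}")
print(f"Zero crossings per component: {crossings}")
```

With the real data, `summarize_components(imfs, price_array)` would play the same role: a near-zero error confirms the decomposition is complete, and a decreasing crossing count confirms the fast-to-slow ordering.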
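We define market regimes (Bull, Bear, Sideways, High Volatility) based on two EMD-derived indicators: the slope of an enhanced trend (the final component plus the lowest-frequency IMF before it) and a volatility proxy (the absolute value of the first, highest-frequency IMF). Both are smoothed with a 30-day rolling mean.
Thresholds for these indicators are computed from percentiles of the full sample rather than fixed in advance: the 70th and 30th percentiles of the smoothed slope separate up-trends from down-trends, and the 60th percentile of the smoothed volatility marks the high-volatility state.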
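The percentile rule can be illustrated in isolation on a handful of made-up values. The sketch below is a vectorized variant using `np.select`, not the article's function. The name `classify_regimes` and the toy inputs are assumptions for this example, while the regime codes (0 = Bear, 1 = Sideways, 2 = Bull, 3 = High Volatility) and the percentile levels match the ones used in this article.

```python
import numpy as np


def classify_regimes(trend_slope, vol, trend_pcts=(30, 70), vol_pct=60):
    """Label each point 0=Bear, 1=Sideways, 2=Bull, 3=High Volatility."""
    trend_slope = np.asarray(trend_slope, dtype=float)
    vol = np.asarray(vol, dtype=float)
    lo, hi = np.nanpercentile(trend_slope, trend_pcts)
    vol_cut = np.nanpercentile(vol, vol_pct)
    calm = vol < vol_cut
    conditions = [
        (trend_slope > hi) & calm,   # rising trend, low volatility
        (trend_slope < lo) & calm,   # falling trend, low volatility
        vol > vol_cut,               # volatility dominates
    ]
    # NaN inputs fail every comparison and fall through to the Sideways default.
    return np.select(conditions, [2, 0, 3], default=1)


toy_slope = [-2.0, -1.0, 0.0, 1.0, 2.0, np.nan]
toy_vol = [1.0, 5.0, 2.0, 3.0, 4.0, np.nan]
labels = classify_regimes(toy_slope, toy_vol)
print(labels.tolist())  # → [0, 3, 1, 2, 3, 1]
```

Here the slope thresholds come out at -0.8 and 0.8 and the volatility cutoff at 3.4, so the second and fifth points are flagged as high volatility even though their slopes are extreme.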
def detect_market_regimes(imfs, price_data, window=30):
    """Detect market regimes based on EMD components - SIMPLIFIED VERSION"""
    trend = imfs[-1]
    if len(imfs) > 1:
        trend_enhanced = imfs[-1] + imfs[-2]  # Combine trend + one low-frequency component
    else:
        trend_enhanced = trend
    trend_slope = np.gradient(trend_enhanced)
    volatility = np.abs(imfs[0])
    trend_smooth = pd.Series(trend_slope).rolling(window=window).mean().values
    vol_smooth = pd.Series(volatility).rolling(window=window).mean().values
    regimes = np.ones(len(trend))  # Default to sideways
    trend_thresh_pos = np.nanpercentile(trend_smooth, 70)
    trend_thresh_neg = np.nanpercentile(trend_smooth, 30)
    vol_thresh = np.nanpercentile(vol_smooth, 60)
    for i in range(len(regimes)):
        if not (np.isnan(trend_smooth[i]) or np.isnan(vol_smooth[i])):
            if trend_smooth[i] > trend_thresh_pos and vol_smooth[i] < vol_thresh:
                regimes[i] = 2  # Bull market
            elif trend_smooth[i] < trend_thresh_neg and vol_smooth[i] < vol_thresh:
                regimes[i] = 0  # Bear market
            elif vol_smooth[i] > vol_thresh:
                regimes[i] = 3  # High volatility
            else:
                regimes[i] = 1  # Sideways
    return regimes, trend_smooth, vol_smooth, trend_enhanced
regimes, trend_smooth, vol_smooth, trend_enhanced = detect_market_regimes(imfs, price_array)
The visualizations show the Bitcoin price colored by detected regime, the individual IMFs representing different market cycles, and the enhanced trend. The statistical analysis then summarizes the characteristics of each regime, such as average daily return, volatility, and duration.
# Align dates with decomposed data
aligned_dates = dates[-len(price_array):]
def plot_emd_analysis(price_data, imfs, regimes, dates, trend_smooth, vol_smooth, trend_enhanced):
    """Create comprehensive plots for EMD analysis"""
    fig, axes = plt.subplots(4, 2, figsize=(20, 16))
    fig.suptitle('Bitcoin Market Regime Detection using EMD - SIMPLIFIED & IMPROVED', fontsize=16, fontweight='bold')
    regime_colors = {0: 'red', 1: 'gray', 2: 'green', 3: 'orange'}
    regime_labels = {0: 'Bear Market', 1: 'Sideways', 2: 'Bull Market', 3: 'High Volatility'}
    # Plot 1: Original price with regimes
    ax1 = axes[0, 0]
    ax1.plot(dates, price_data, 'k-', alpha=0.7, linewidth=1)
    for regime in [0, 1, 2, 3]:
        mask = regimes == regime
        if np.any(mask):
            ax1.scatter(dates[mask], price_data[mask], c=regime_colors[regime], alpha=0.6, s=10, label=regime_labels[regime])
    ax1.set_title('Bitcoin Price with Market Regimes')
    ax1.set_ylabel('Price (USD)')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    # Plot 2: Original vs Enhanced trend
    ax2 = axes[0, 1]
    ax2.plot(dates, imfs[-1], 'b-', linewidth=2, label='Original Trend (too smooth)', alpha=0.7)
    ax2.plot(dates, trend_enhanced, 'r-', linewidth=2, label='Enhanced Trend (more responsive)')
    ax2.set_title('Trend Comparison: Original vs Enhanced')
    ax2.set_ylabel('Trend Value')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    # Plot 3: High frequency IMFs
    ax3 = axes[1, 0]
    for i in range(min(3, len(imfs)-1)):
        ax3.plot(dates, imfs[i], alpha=0.7, label=f'IMF {i+1}')
    ax3.set_title('High Frequency IMFs (Market Noise)')
    ax3.set_ylabel('IMF Value')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    # Plot 4: Medium frequency IMFs
    ax4 = axes[1, 1]
    start_idx = min(3, len(imfs)-2)
    end_idx = min(6, len(imfs)-1)
    for i in range(start_idx, end_idx):
        if i < len(imfs)-1:
            ax4.plot(dates, imfs[i], alpha=0.7, label=f'IMF {i+1}')
    ax4.set_title('Medium Frequency IMFs (Market Cycles)')
    ax4.set_ylabel('IMF Value')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    # Plot 5: Trend slope
    ax5 = axes[2, 0]
    ax5.plot(dates, trend_smooth, 'g-', linewidth=2, label='Trend Slope (Enhanced)')
    ax5.axhline(y=0, color='k', linestyle='--', alpha=0.5)
    ax5.set_title('Market Trend Direction (From Enhanced Trend)')
    ax5.set_ylabel('Trend Slope')
    ax5.legend()
    ax5.grid(True, alpha=0.3)
    # Plot 6: Volatility (reuse the smoothed series from detect_market_regimes)
    ax6 = axes[2, 1]
    ax6.plot(dates, vol_smooth, 'r-', linewidth=2, label='Smoothed Volatility')
    ax6.set_title('Market Volatility (EMD-based)')
    ax6.set_ylabel('Volatility')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    # Plot 7: Regime distribution
    ax7 = axes[3, 0]
    regime_counts = pd.Series(regimes).value_counts().sort_index()
    colors = [regime_colors[i] for i in regime_counts.index]
    labels = [regime_labels[i] for i in regime_counts.index]
    ax7.pie(regime_counts.values, labels=labels, colors=colors, autopct='%1.1f%%')
    ax7.set_title('Market Regime Distribution')
    # Plot 8: Cumulative returns by regime
    ax8 = axes[3, 1]
    returns = np.diff(np.log(price_data))
    for regime in [0, 1, 2, 3]:
        if regime in regimes:
            regime_mask = regimes[1:] == regime  # returns[i] belongs to day i+1
            if np.any(regime_mask):
                regime_returns = returns[regime_mask]
                cumulative_returns = np.cumsum(regime_returns)
                ax8.plot(cumulative_returns, color=regime_colors[regime],
                         label=f'{regime_labels[regime]} (μ: {np.mean(regime_returns):.4f})',
                         alpha=0.8, linewidth=2)
    ax8.set_title('Cumulative Returns by Market Regime')
    ax8.set_ylabel('Cumulative Log Returns')
    ax8.set_xlabel('Time Steps in Regime')
    ax8.legend()
    ax8.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
plot_emd_analysis(price_array, imfs, regimes, aligned_dates, trend_smooth, vol_smooth, trend_enhanced)
def analyze_regime_statistics(price_data, regimes, dates):
    """Analyze statistics for each market regime"""
    returns = np.diff(np.log(price_data))  # daily log returns
    print("\nMarket Regime Analysis")
    print("=" * 50)
    regime_labels = {0: 'Bear Market', 1: 'Sideways', 2: 'Bull Market', 3: 'High Volatility'}
    for regime in [0, 1, 2, 3]:
        if regime in regimes:
            regime_mask = regimes[1:] == regime  # returns[i] belongs to day i+1
            if np.any(regime_mask):
                regime_returns = returns[regime_mask]
                regime_duration = np.sum(regime_mask)  # total days spent in this regime
                print(f"\n{regime_labels[regime]}:")
                print(f" Duration: {regime_duration} days ({regime_duration/len(regimes)*100:.1f}%)")
                print(f" Mean Daily Return: {np.mean(regime_returns)*100:.3f}%")
                print(f" Volatility (Std): {np.std(regime_returns)*100:.3f}%")
                # Daily mean/std ratio, not annualized and with no risk-free rate
                print(f" Sharpe Ratio (daily): {np.mean(regime_returns)/np.std(regime_returns):.3f}")
                print(f" Total Return: {np.sum(regime_returns)*100:.2f}%")
analyze_regime_statistics(price_array, regimes, aligned_dates)
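The duration reported above is the total number of days spent in each regime. To see how long individual episodes last, the label sequence can be split into contiguous runs. This is a small sketch with hypothetical helper names (`regime_runs`, `mean_run_length`), shown on a toy label sequence rather than the real output.

```python
import numpy as np


def regime_runs(regimes):
    """Split a label sequence into (label, run_length) pairs for each contiguous run."""
    regimes = np.asarray(regimes)
    if regimes.size == 0:
        return []
    change = np.flatnonzero(np.diff(regimes) != 0) + 1  # indices where the label changes
    starts = np.concatenate(([0], change))
    ends = np.concatenate((change, [regimes.size]))
    return [(int(regimes[s]), int(e - s)) for s, e in zip(starts, ends)]


def mean_run_length(runs):
    """Average episode length per regime label."""
    lengths = {}
    for label, length in runs:
        lengths.setdefault(label, []).append(length)
    return {label: float(np.mean(vals)) for label, vals in lengths.items()}


toy_regimes = [1, 1, 2, 2, 2, 0, 3, 3, 2]
runs = regime_runs(toy_regimes)
print(runs)                   # → [(1, 2), (2, 3), (0, 1), (3, 2), (2, 1)]
print(mean_run_length(runs))  # → {1: 2.0, 2: 2.0, 0: 1.0, 3: 2.0}
```

Calling `regime_runs(regimes)` on the detected labels would give episode lengths in days. Many very short runs would suggest the classification is flickering between states and that a longer smoothing window may be worth trying.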
print("\nEMD Component Analysis:")
print("=" * 30)
for i, imf in enumerate(imfs):
    if i < len(imfs) - 1:
        # Each full oscillation crosses zero twice, so period ≈ length / (crossings / 2)
        zero_crossings = np.sum(np.diff(np.sign(imf)) != 0)
        if zero_crossings > 0:
            period = len(imf) / (zero_crossings / 2)
            print(f"IMF {i+1}: Average period ≈ {period:.1f} days")
        else:
            print(f"IMF {i+1}: No clear periodicity")
    else:
        print(f"IMF {i+1}: Trend component (non-oscillatory)")
Empirical Mode Decomposition offers a data-driven way to split a complex, non-stationary financial time series into components at different time scales. By combining the slope of the low-frequency trend with the amplitude of the highest-frequency IMF, we can label bull, bear, sideways, and high-volatility regimes in Bitcoin's price history. Traders and analysts can use these labels to adapt their strategies to prevailing market conditions and make more informed decisions.