Dispersion Trading Strategy
The dispersion trading strategy exploits periods when an equity index and its constituents diverge. When the average pairwise correlation between index members drops below a threshold, the strategy buys the individual stocks and shorts the index, betting that correlations will mean‑revert.
Daily candles feed a rolling correlation window. If correlations recover above the threshold, all positions are closed. A minimum trade value is enforced to avoid tiny orders.
Details
- Universe: One index security plus its constituent stocks.
- Signal: Open a dispersion trade when the average correlation of constituents is below
CorrThreshold. - Rebalance: Correlation checked every day.
- Positioning: Long constituents and short the index while the signal is active.
- Parameters:
Constituents– list of component securities.LookbackDays– window size for correlation calculation.CorrThreshold– correlation level that triggers trades.MinTradeUsd– minimum order value in USD.CandleType– timeframe for candles (default: 1 day).
- Note: The example omits transaction costs and assumes equal weighting.
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Dispersion trading strategy.
/// Trades an equity index against its constituent securities when the average correlation falls below a threshold.
/// </summary>
public class DispersionTradingStrategy : Strategy
{
private readonly StrategyParam<IEnumerable<Security>> _constituents;
private readonly StrategyParam<int> _lookbackDays;
private readonly StrategyParam<decimal> _corrThreshold;
private readonly StrategyParam<decimal> _minTradeUsd;
private readonly StrategyParam<DataType> _candleType;
private readonly Dictionary<Security, RollingWindow<decimal>> _windows = [];
private readonly Dictionary<Security, decimal> _latestPrices = [];
private DateTime _lastDay = DateTime.MinValue;
private bool _open;
/// <summary>
/// Securities representing index constituents.
/// </summary>
public IEnumerable<Security> Constituents
{
get => _constituents.Value;
set => _constituents.Value = value;
}
/// <summary>
/// Number of days used for correlation calculation.
/// </summary>
public int LookbackDays
{
get => _lookbackDays.Value;
set => _lookbackDays.Value = value;
}
/// <summary>
/// Correlation threshold for opening dispersion.
/// </summary>
public decimal CorrThreshold
{
get => _corrThreshold.Value;
set => _corrThreshold.Value = value;
}
/// <summary>
/// Minimum trade value in USD.
/// </summary>
public decimal MinTradeUsd
{
get => _minTradeUsd.Value;
set => _minTradeUsd.Value = value;
}
/// <summary>
/// Candle type used for analysis.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Initializes a new instance of <see cref="DispersionTradingStrategy"/>.
/// </summary>
public DispersionTradingStrategy()
{
_constituents = Param<IEnumerable<Security>>(nameof(Constituents), [])
.SetDisplay("Constituents", "Index constituent securities", "General");
_lookbackDays = Param(nameof(LookbackDays), 60)
.SetDisplay("Lookback Days", "Days for rolling correlation", "Parameters");
_corrThreshold = Param(nameof(CorrThreshold), 0.4m)
.SetDisplay("Correlation Threshold", "Average correlation threshold", "Parameters");
_minTradeUsd = Param(nameof(MinTradeUsd), 100m)
.SetDisplay("Minimum Trade USD", "Minimal order value", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Time frame for analysis", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return Constituents.Append(Security).Select(s => (s, CandleType));
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_windows.Clear();
_latestPrices.Clear();
_lastDay = default;
_open = default;
}
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("IndexSec is not set.");
if (Constituents == null || !Constituents.Any())
throw new InvalidOperationException("Constituents collection is empty.");
foreach (var (sec, dt) in GetWorkingSecurities())
{
_windows[sec] = new RollingWindow<decimal>(LookbackDays + 1);
SubscribeCandles(dt, true, sec)
.Bind(c => ProcessCandle(c, sec))
.Start();
}
}
private void ProcessCandle(ICandleMessage candle, Security security)
{
// Skip unfinished candles.
if (candle.State != CandleStates.Finished)
return;
// Store the latest closing price for this security.
_latestPrices[security] = candle.ClosePrice;
_windows[security].Add(candle.ClosePrice);
var day = candle.OpenTime.Date;
if (day == _lastDay)
return;
_lastDay = day;
if (_windows.Values.Any(w => !w.IsFull()))
return;
// Daily check after windows are full.
EvaluateSignal();
}
private void EvaluateSignal()
{
var indexRet = Returns(_windows[Security]);
var corrs = new List<decimal>();
foreach (var s in Constituents)
corrs.Add(Corr(Returns(_windows[s]), indexRet));
var avg = corrs.Average();
if (avg < CorrThreshold && !_open)
OpenDispersion();
else if (avg >= CorrThreshold && _open)
CloseAll();
}
private void OpenDispersion()
{
var count = Constituents.Count();
var portfolioValue = Portfolio.CurrentValue ?? 0m;
var capLeg = portfolioValue * 0.5m;
var eachLong = capLeg / count;
foreach (var s in Constituents)
{
var price = GetLatestPrice(s);
if (price > 0)
TradeToTarget(s, eachLong / price);
}
var indexPrice = GetLatestPrice(Security);
if (indexPrice > 0)
TradeToTarget(Security, -capLeg / indexPrice); // short index
_open = true;
LogInfo("Opened dispersion spread");
}
private void CloseAll()
{
foreach (var position in Positions)
TradeToTarget(position.Security, 0m);
_open = false;
LogInfo("Closed dispersion spread");
}
private decimal GetLatestPrice(Security security)
{
return _latestPrices.TryGetValue(security, out var price) ? price : 0m;
}
#region Helper math / trading
private decimal[] Returns(RollingWindow<decimal> win)
{
var arr = win.ToArray();
var r = new decimal[arr.Length - 1];
for (var i = 1; i < arr.Length; i++)
r[i - 1] = (arr[i] - arr[i - 1]) / arr[i - 1];
return r;
}
private decimal Corr(decimal[] x, decimal[] y)
{
var n = Math.Min(x.Length, y.Length);
var meanX = x.Take(n).Average();
var meanY = y.Take(n).Average();
decimal num = 0, dx = 0, dy = 0;
for (var i = 0; i < n; i++)
{
var a = x[i] - meanX;
var b = y[i] - meanY;
num += a * b;
dx += a * a;
dy += b * b;
}
return dx > 0 && dy > 0 ? num / (decimal)Math.Sqrt((double)(dx * dy)) : 0m;
}
private void TradeToTarget(Security s, decimal tgtQty)
{
var diff = tgtQty - PositionBy(s);
var price = GetLatestPrice(s);
if (price <= 0 || Math.Abs(diff) * price < MinTradeUsd)
return;
RegisterOrder(new Order
{
Security = s,
Portfolio = Portfolio,
Side = diff > 0 ? Sides.Buy : Sides.Sell,
Volume = Math.Abs(diff),
Type = OrderTypes.Market,
Comment = "Dispersion"
});
}
private decimal PositionBy(Security s) => GetPositionValue(s, Portfolio) ?? 0;
#endregion
#region RollingWindow
private class RollingWindow<T>
{
private readonly Queue<T> _queue = [];
private readonly int _size;
public RollingWindow(int size)
{
_size = size;
}
public void Add(T value)
{
if (_queue.Count == _size)
_queue.Dequeue();
_queue.Enqueue(value);
}
public bool IsFull() => _queue.Count == _size;
public T Last() => _queue.Last();
public T[] ToArray() => [.. _queue];
}
#endregion
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import DateTime, TimeSpan, Math, Array
from System.Collections.Generic import IEnumerable
from StockSharp.Messages import DataType, CandleStates, Sides, OrderTypes
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Order, Security
from datatype_extensions import *
class dispersion_trading_strategy(Strategy):
"""Dispersion trading strategy.
Trades an equity index against its constituent securities when the average correlation falls below a threshold.
"""
def __init__(self):
super(dispersion_trading_strategy, self).__init__()
self._constituents = self.Param[IEnumerable[Security]]("Constituents", None) \
.SetDisplay("Constituents", "Index constituent securities", "General")
self._lookback_days = self.Param("LookbackDays", 60) \
.SetDisplay("Lookback Days", "Days for rolling correlation", "Parameters")
self._corr_threshold = self.Param("CorrThreshold", 0.4) \
.SetDisplay("Correlation Threshold", "Average correlation threshold", "Parameters")
self._min_trade_usd = self.Param("MinTradeUsd", 100.0) \
.SetDisplay("Minimum Trade USD", "Minimal order value", "Risk")
self._candle_type = self.Param("CandleType", tf(5)) \
.SetDisplay("Candle Type", "Time frame for analysis", "General")
self._windows = {}
self._latest_prices = {}
self._last_day = DateTime.MinValue
self._open = False
# region Properties
@property
def Constituents(self):
return self._constituents.Value
@Constituents.setter
def Constituents(self, value):
self._constituents.Value = value
@property
def LookbackDays(self):
return self._lookback_days.Value
@LookbackDays.setter
def LookbackDays(self, value):
self._lookback_days.Value = value
@property
def CorrThreshold(self):
return self._corr_threshold.Value
@CorrThreshold.setter
def CorrThreshold(self, value):
self._corr_threshold.Value = value
@property
def MinTradeUsd(self):
return self._min_trade_usd.Value
@MinTradeUsd.setter
def MinTradeUsd(self, value):
self._min_trade_usd.Value = value
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
# endregion
def GetWorkingSecurities(self):
constituents_list = list(self.Constituents) if self.Constituents is not None else []
securities = constituents_list + [self.Security]
return [(s, self.CandleType) for s in securities]
def OnReseted(self):
super(dispersion_trading_strategy, self).OnReseted()
self._windows.clear()
self._latest_prices.clear()
self._last_day = DateTime.MinValue
self._open = False
def OnStarted2(self, time):
super(dispersion_trading_strategy, self).OnStarted2(time)
if self.Security is None:
raise Exception("IndexSec is not set.")
constituents_list = list(self.Constituents) if self.Constituents is not None else []
if len(constituents_list) == 0:
raise Exception("Constituents collection is empty.")
for sec, dt in self.GetWorkingSecurities():
self._windows[sec] = RollingWindow(self.LookbackDays + 1)
self.SubscribeCandles(dt, True, sec) \
.Bind(lambda candle, security=sec: self._process_candle(candle, security)) \
.Start()
def _process_candle(self, candle, security):
if candle.State != CandleStates.Finished:
return
self._latest_prices[security] = float(candle.ClosePrice)
self._windows[security].add(float(candle.ClosePrice))
day = candle.OpenTime.Date
if day == self._last_day:
return
self._last_day = day
if not all(w.is_full() for w in self._windows.values()):
return
self._evaluate_signal()
def _evaluate_signal(self):
index_returns = _returns(self._windows[self.Security])
constituents_list = list(self.Constituents) if self.Constituents is not None else []
corrs = []
for s in constituents_list:
corrs.append(_corr(_returns(self._windows[s]), index_returns))
if len(corrs) == 0:
return
avg = sum(corrs) / len(corrs)
if avg < self.CorrThreshold and not self._open:
self._open_dispersion()
elif avg >= self.CorrThreshold and self._open:
self._close_all()
def _open_dispersion(self):
constituents_list = list(self.Constituents) if self.Constituents is not None else []
count = len(constituents_list)
if count == 0:
return
portfolio_value = self.Portfolio.CurrentValue if self.Portfolio.CurrentValue is not None else 0.0
cap_leg = float(portfolio_value) * 0.5
each_long = cap_leg / count
for s in constituents_list:
price = self._get_latest_price(s)
if price > 0:
self._trade_to_target(s, each_long / price)
index_price = self._get_latest_price(self.Security)
if index_price > 0:
self._trade_to_target(self.Security, -cap_leg / index_price)
self._open = True
self.LogInfo("Opened dispersion spread")
def _close_all(self):
for position in self.Positions:
self._trade_to_target(position.Security, 0.0)
self._open = False
self.LogInfo("Closed dispersion spread")
def _get_latest_price(self, security):
return self._latest_prices.get(security, 0.0)
def _trade_to_target(self, sec, tgt):
diff = tgt - self._position_by(sec)
price = self._get_latest_price(sec)
if price <= 0 or abs(diff) * price < self.MinTradeUsd:
return
order = Order()
order.Security = sec
order.Portfolio = self.Portfolio
order.Side = Sides.Buy if diff > 0 else Sides.Sell
order.Volume = abs(diff)
order.Type = OrderTypes.Market
order.Comment = "Dispersion"
self.RegisterOrder(order)
def _position_by(self, sec):
val = self.GetPositionValue(sec, self.Portfolio)
return float(val) if val is not None else 0.0
def CreateClone(self):
return dispersion_trading_strategy()
class RollingWindow:
"""Rolling window of fixed size, mimicking the C# RollingWindow inner class."""
def __init__(self, size):
self._size = size
self._queue = []
def add(self, value):
if len(self._queue) == self._size:
self._queue.pop(0)
self._queue.append(value)
def is_full(self):
return len(self._queue) == self._size
def last(self):
return self._queue[-1]
def to_array(self):
return list(self._queue)
def _returns(win):
"""Compute simple returns from a rolling window of prices."""
arr = win.to_array()
r = []
for i in range(1, len(arr)):
r.append((arr[i] - arr[i - 1]) / arr[i - 1])
return r
def _corr(x, y):
"""Compute Pearson correlation between two return arrays."""
n = min(len(x), len(y))
if n == 0:
return 0.0
mx = sum(x[:n]) / n
my = sum(y[:n]) / n
num = 0.0
dx = 0.0
dy = 0.0
for i in range(n):
a = x[i] - mx
b = y[i] - my
num += a * b
dx += a * a
dy += b * b
if dx <= 0 or dy <= 0:
return 0.0
return num / math.sqrt(dx * dy)