离散交易策略
离散交易策略利用指数与成分股之间的分化。当成分股之间的平均相关性低于阈值时,策略买入这些股票并做空指数,押注相关性回归。
策略基于日K线维护滚动相关性窗口。当相关性回到阈值以上时,所有仓位被关闭。最小交易金额限制用于避免过小的订单。
细节
- 投资范围:一个股指及其成分股。
- 信号:当成分股与指数的平均相关性低于 `CorrThreshold` 时开仓。
- 再平衡:每天检查相关性。
- 仓位:信号存在期间做多成分股、做空指数。
- 参数:
  - `Constituents` – 指数成分列表。
  - `LookbackDays` – 相关性计算窗口长度。
  - `CorrThreshold` – 触发交易的相关性阈值。
  - `MinTradeUsd` – 最小订单金额(美元)。
  - `CandleType` – 使用的K线周期。策略按日K线设计(1天);注意示例代码中的默认值为5分钟,使用时应设置为1天。
- 注意:示例未考虑交易成本,并假设等权分配。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Dispersion trading strategy.
/// Trades an equity index against its constituent securities when the average correlation falls below a threshold.
/// </summary>
/// <remarks>
/// The index is the strategy's own <c>Security</c>; the constituents are supplied through <see cref="Constituents"/>.
/// A rolling window of <see cref="LookbackDays"/> + 1 closing prices is kept per security. Once per calendar day,
/// after every tracked security has delivered a finished candle for that day, the average Pearson correlation
/// between each constituent's returns and the index returns is compared with <see cref="CorrThreshold"/>.
/// </remarks>
public class DispersionTradingStrategy : Strategy
{
	private readonly StrategyParam<IEnumerable<Security>> _constituents;
	private readonly StrategyParam<int> _lookbackDays;
	private readonly StrategyParam<decimal> _corrThreshold;
	private readonly StrategyParam<decimal> _minTradeUsd;
	private readonly StrategyParam<DataType> _candleType;

	// Rolling close-price history per tracked security (index and constituents).
	private readonly Dictionary<Security, RollingWindow<decimal>> _windows = [];

	// Close price of the most recent finished candle per security; used for order sizing.
	private readonly Dictionary<Security, decimal> _latestPrices = [];

	// Calendar day of the most recent finished candle per security.
	private readonly Dictionary<Security, DateTime> _lastCandleDays = [];

	// Calendar day for which the signal check has already been performed.
	private DateTime _lastDay = DateTime.MinValue;

	// True while the strategy considers the dispersion spread to be open.
	private bool _open;

	/// <summary>
	/// Securities representing index constituents.
	/// </summary>
	public IEnumerable<Security> Constituents
	{
		get => _constituents.Value;
		set => _constituents.Value = value;
	}

	/// <summary>
	/// Number of days used for correlation calculation.
	/// </summary>
	public int LookbackDays
	{
		get => _lookbackDays.Value;
		set => _lookbackDays.Value = value;
	}

	/// <summary>
	/// Correlation threshold for opening dispersion.
	/// </summary>
	public decimal CorrThreshold
	{
		get => _corrThreshold.Value;
		set => _corrThreshold.Value = value;
	}

	/// <summary>
	/// Minimum trade value in USD.
	/// </summary>
	public decimal MinTradeUsd
	{
		get => _minTradeUsd.Value;
		set => _minTradeUsd.Value = value;
	}

	/// <summary>
	/// Candle type used for analysis.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initializes a new instance of <see cref="DispersionTradingStrategy"/>.
	/// </summary>
	public DispersionTradingStrategy()
	{
		_constituents = Param<IEnumerable<Security>>(nameof(Constituents), [])
			.SetDisplay("Constituents", "Index constituent securities", "General");

		_lookbackDays = Param(nameof(LookbackDays), 60)
			.SetDisplay("Lookback Days", "Days for rolling correlation", "Parameters");

		_corrThreshold = Param(nameof(CorrThreshold), 0.4m)
			.SetDisplay("Correlation Threshold", "Average correlation threshold", "Parameters");

		_minTradeUsd = Param(nameof(MinTradeUsd), 100m)
			.SetDisplay("Minimum Trade USD", "Minimal order value", "Risk");

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
			.SetDisplay("Candle Type", "Time frame for analysis", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		// The property has a public setter, so it can be null here; treat that as "no constituents"
		// instead of failing with a NullReferenceException.
		var constituents = Constituents ?? Enumerable.Empty<Security>();

		return constituents.Append(Security).Select(s => (s, CandleType));
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_windows.Clear();
		_latestPrices.Clear();
		_lastCandleDays.Clear();
		_lastDay = default;
		_open = default;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		if (Security == null)
			throw new InvalidOperationException("IndexSec is not set.");

		if (Constituents == null || !Constituents.Any())
			throw new InvalidOperationException("Constituents collection is empty.");

		// A non-positive lookback would produce an empty return series (or an unusable window).
		if (LookbackDays <= 0)
			throw new InvalidOperationException("LookbackDays must be positive.");

		foreach (var (sec, dt) in GetWorkingSecurities())
		{
			// N returns require N + 1 prices.
			_windows[sec] = new RollingWindow<decimal>(LookbackDays + 1);

			SubscribeCandles(dt, true, sec)
				.Bind(c => ProcessCandle(c, sec))
				.Start();
		}
	}

	/// <summary>
	/// Records the close of a finished candle and, once per calendar day, runs the signal check.
	/// </summary>
	/// <param name="candle">Incoming candle.</param>
	/// <param name="security">Security the candle belongs to.</param>
	private void ProcessCandle(ICandleMessage candle, Security security)
	{
		// Skip unfinished candles.
		if (candle.State != CandleStates.Finished)
			return;

		// Store the latest closing price for this security.
		_latestPrices[security] = candle.ClosePrice;
		_windows[security].Add(candle.ClosePrice);

		var day = candle.OpenTime.Date;
		_lastCandleDays[security] = day;

		// This day has already been evaluated.
		if (day == _lastDay)
			return;

		// Evaluate only after every tracked security has reported for this day. Evaluating on the first
		// candle of a new day would compare windows that end on different days, i.e. return series
		// shifted by one bar against each other, which distorts the correlation.
		if (_lastCandleDays.Count < _windows.Count || _lastCandleDays.Values.Any(d => d != day))
			return;

		_lastDay = day;

		if (_windows.Values.Any(w => !w.IsFull()))
			return;

		// Daily check after windows are full.
		EvaluateSignal();
	}

	/// <summary>
	/// Computes the average constituent-to-index correlation and opens or closes the spread accordingly.
	/// </summary>
	private void EvaluateSignal()
	{
		var indexRet = Returns(_windows[Security]);

		var corrs = new List<decimal>();

		foreach (var s in Constituents)
			corrs.Add(Corr(Returns(_windows[s]), indexRet));

		var avg = corrs.Average();

		if (avg < CorrThreshold && !_open)
			OpenDispersion();
		else if (avg >= CorrThreshold && _open)
			CloseAll();
	}

	/// <summary>
	/// Opens the spread: half of the portfolio value is split equally across long constituent
	/// positions and the same notional is sold short in the index.
	/// </summary>
	private void OpenDispersion()
	{
		var count = Constituents.Count();
		var portfolioValue = Portfolio.CurrentValue ?? 0m;
		var capLeg = portfolioValue * 0.5m;
		var eachLong = capLeg / count;

		foreach (var s in Constituents)
		{
			var price = GetLatestPrice(s);

			if (price > 0)
				TradeToTarget(s, eachLong / price);
		}

		var indexPrice = GetLatestPrice(Security);

		if (indexPrice > 0)
			TradeToTarget(Security, -capLeg / indexPrice); // short index

		_open = true;
		LogInfo("Opened dispersion spread");
	}

	/// <summary>
	/// Targets a zero position for every security in <c>Positions</c> and marks the spread as closed.
	/// </summary>
	private void CloseAll()
	{
		foreach (var position in Positions)
			TradeToTarget(position.Security, 0m);

		_open = false;
		LogInfo("Closed dispersion spread");
	}

	/// <summary>
	/// Returns the last stored close for <paramref name="security"/>, or 0 when none has been seen yet.
	/// </summary>
	private decimal GetLatestPrice(Security security)
	{
		return _latestPrices.TryGetValue(security, out var price) ? price : 0m;
	}

	#region Helper math / trading

	/// <summary>
	/// Converts a window of prices into simple period-over-period returns.
	/// </summary>
	/// <param name="win">Price window, oldest value first.</param>
	/// <returns>Array with one element fewer than the window; empty when the window holds fewer than two prices.</returns>
	private decimal[] Returns(RollingWindow<decimal> win)
	{
		var arr = win.ToArray();

		if (arr.Length < 2)
			return [];

		var r = new decimal[arr.Length - 1];

		for (var i = 1; i < arr.Length; i++)
		{
			var prev = arr[i - 1];

			// decimal division by zero throws; a zero reference price yields no meaningful return.
			r[i - 1] = prev != 0 ? (arr[i] - prev) / prev : 0m;
		}

		return r;
	}

	/// <summary>
	/// Pearson correlation of two series over their common leading length.
	/// </summary>
	/// <returns>The correlation, or 0 when there is no data or either series has zero variance.</returns>
	private decimal Corr(decimal[] x, decimal[] y)
	{
		var n = Math.Min(x.Length, y.Length);

		// Average() throws on an empty sequence.
		if (n == 0)
			return 0m;

		var meanX = x.Take(n).Average();
		var meanY = y.Take(n).Average();

		decimal num = 0, dx = 0, dy = 0;

		for (var i = 0; i < n; i++)
		{
			var a = x[i] - meanX;
			var b = y[i] - meanY;

			num += a * b;
			dx += a * a;
			dy += b * b;
		}

		if (dx <= 0 || dy <= 0)
			return 0m;

		// The square root is taken in double precision; guard against it collapsing to zero.
		var denom = (decimal)Math.Sqrt((double)(dx * dy));

		return denom > 0 ? num / denom : 0m;
	}

	/// <summary>
	/// Sends a market order that moves the position in <paramref name="s"/> to <paramref name="tgtQty"/>.
	/// Nothing is sent when no price is known or the order value is below <see cref="MinTradeUsd"/>.
	/// </summary>
	private void TradeToTarget(Security s, decimal tgtQty)
	{
		var diff = tgtQty - PositionBy(s);
		var price = GetLatestPrice(s);

		if (price <= 0 || Math.Abs(diff) * price < MinTradeUsd)
			return;

		RegisterOrder(new Order
		{
			Security = s,
			Portfolio = Portfolio,
			Side = diff > 0 ? Sides.Buy : Sides.Sell,
			Volume = Math.Abs(diff),
			Type = OrderTypes.Market,
			Comment = "Dispersion"
		});
	}

	// Current position in the given security, 0 when none is reported.
	private decimal PositionBy(Security s) => GetPositionValue(s, Portfolio) ?? 0;

	#endregion

	#region RollingWindow

	/// <summary>
	/// Fixed-capacity FIFO buffer: adding to a full window drops the oldest value.
	/// </summary>
	private class RollingWindow<T>
	{
		private readonly Queue<T> _queue = [];
		private readonly int _size;

		public RollingWindow(int size)
		{
			_size = size;
		}

		public void Add(T value)
		{
			if (_queue.Count == _size)
				_queue.Dequeue();

			_queue.Enqueue(value);
		}

		public bool IsFull() => _queue.Count == _size;

		public T Last() => _queue.Last();

		// Snapshot of the contents, oldest value first.
		public T[] ToArray() => [.. _queue];
	}

	#endregion
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import DateTime, TimeSpan, Math, Array
from System.Collections.Generic import IEnumerable
from StockSharp.Messages import DataType, CandleStates, Sides, OrderTypes
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Order, Security
from datatype_extensions import *
class dispersion_trading_strategy(Strategy):
    """Dispersion trading strategy.

    Trades an equity index against its constituent securities when the average
    correlation falls below a threshold.

    The index is the strategy's own ``Security``; constituents come from the
    ``Constituents`` parameter. A rolling window of ``LookbackDays + 1`` closes is
    kept per security. Once per calendar day, after every tracked security has
    delivered a finished candle for that day, the average correlation between
    each constituent's returns and the index returns is compared with
    ``CorrThreshold``.
    """

    def __init__(self):
        super(dispersion_trading_strategy, self).__init__()

        self._constituents = self.Param[IEnumerable[Security]]("Constituents", None) \
            .SetDisplay("Constituents", "Index constituent securities", "General")
        self._lookback_days = self.Param("LookbackDays", 60) \
            .SetDisplay("Lookback Days", "Days for rolling correlation", "Parameters")
        self._corr_threshold = self.Param("CorrThreshold", 0.4) \
            .SetDisplay("Correlation Threshold", "Average correlation threshold", "Parameters")
        self._min_trade_usd = self.Param("MinTradeUsd", 100.0) \
            .SetDisplay("Minimum Trade USD", "Minimal order value", "Risk")
        self._candle_type = self.Param("CandleType", tf(5)) \
            .SetDisplay("Candle Type", "Time frame for analysis", "General")

        # Rolling close-price history per tracked security.
        self._windows = {}
        # Close of the most recent finished candle per security (used for sizing).
        self._latest_prices = {}
        # Calendar day of the most recent finished candle per security.
        self._last_candle_days = {}
        # Calendar day for which the signal check has already run.
        self._last_day = DateTime.MinValue
        # True while the spread is considered open.
        self._open = False

    # region Properties
    @property
    def Constituents(self):
        return self._constituents.Value

    @Constituents.setter
    def Constituents(self, value):
        self._constituents.Value = value

    @property
    def LookbackDays(self):
        return self._lookback_days.Value

    @LookbackDays.setter
    def LookbackDays(self, value):
        self._lookback_days.Value = value

    @property
    def CorrThreshold(self):
        return self._corr_threshold.Value

    @CorrThreshold.setter
    def CorrThreshold(self, value):
        self._corr_threshold.Value = value

    @property
    def MinTradeUsd(self):
        return self._min_trade_usd.Value

    @MinTradeUsd.setter
    def MinTradeUsd(self, value):
        self._min_trade_usd.Value = value

    @property
    def CandleType(self):
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value
    # endregion

    def _constituents_list(self):
        """Return the constituents as a list; an unset (None) parameter gives []."""
        constituents = self.Constituents
        return list(constituents) if constituents is not None else []

    def GetWorkingSecurities(self):
        """Return (security, candle type) pairs for the constituents followed by the index."""
        securities = self._constituents_list() + [self.Security]
        return [(s, self.CandleType) for s in securities]

    def OnReseted(self):
        super(dispersion_trading_strategy, self).OnReseted()
        self._windows.clear()
        self._latest_prices.clear()
        self._last_candle_days.clear()
        self._last_day = DateTime.MinValue
        self._open = False

    def OnStarted2(self, time):
        """Validate the configuration, then create a window and a candle subscription per security."""
        super(dispersion_trading_strategy, self).OnStarted2(time)

        if self.Security is None:
            raise Exception("IndexSec is not set.")

        if len(self._constituents_list()) == 0:
            raise Exception("Constituents collection is empty.")

        # A non-positive lookback would leave no returns to correlate.
        if self.LookbackDays <= 0:
            raise Exception("LookbackDays must be positive.")

        for sec, dt in self.GetWorkingSecurities():
            # N returns require N + 1 prices.
            self._windows[sec] = RollingWindow(self.LookbackDays + 1)
            # The default argument pins the current `sec` for this subscription.
            self.SubscribeCandles(dt, True, sec) \
                .Bind(lambda candle, security=sec: self._process_candle(candle, security)) \
                .Start()

    def _process_candle(self, candle, security):
        """Record a finished candle's close and run the signal check once per calendar day."""
        if candle.State != CandleStates.Finished:
            return

        close = float(candle.ClosePrice)
        self._latest_prices[security] = close
        self._windows[security].add(close)

        day = candle.OpenTime.Date
        self._last_candle_days[security] = day

        # This day has already been evaluated.
        if day == self._last_day:
            return

        # Evaluate only after every tracked security has reported for this day.
        # Evaluating on the first candle of a new day would compare windows that
        # end on different days, i.e. return series shifted by one bar against
        # each other, which distorts the correlation.
        if len(self._last_candle_days) < len(self._windows):
            return
        if any(d != day for d in self._last_candle_days.values()):
            return

        self._last_day = day

        if not all(w.is_full() for w in self._windows.values()):
            return

        self._evaluate_signal()

    def _evaluate_signal(self):
        """Open or close the spread based on the average constituent/index correlation."""
        index_returns = _returns(self._windows[self.Security])

        corrs = []
        for s in self._constituents_list():
            corrs.append(_corr(_returns(self._windows[s]), index_returns))

        if len(corrs) == 0:
            return

        avg = sum(corrs) / len(corrs)

        if avg < self.CorrThreshold and not self._open:
            self._open_dispersion()
        elif avg >= self.CorrThreshold and self._open:
            self._close_all()

    def _open_dispersion(self):
        """Go long each constituent with an equal share of half the portfolio value and short the index for the same notional."""
        constituents_list = self._constituents_list()
        count = len(constituents_list)
        if count == 0:
            return

        portfolio_value = self.Portfolio.CurrentValue if self.Portfolio.CurrentValue is not None else 0.0
        cap_leg = float(portfolio_value) * 0.5
        each_long = cap_leg / count

        for s in constituents_list:
            price = self._get_latest_price(s)
            if price > 0:
                self._trade_to_target(s, each_long / price)

        index_price = self._get_latest_price(self.Security)
        if index_price > 0:
            # Negative target quantity: short the index.
            self._trade_to_target(self.Security, -cap_leg / index_price)

        self._open = True
        self.LogInfo("Opened dispersion spread")

    def _close_all(self):
        """Target a zero position for every entry in ``Positions`` and mark the spread closed."""
        for position in self.Positions:
            self._trade_to_target(position.Security, 0.0)

        self._open = False
        self.LogInfo("Closed dispersion spread")

    def _get_latest_price(self, security):
        """Return the last stored close for the security, or 0.0 if none has been seen."""
        return self._latest_prices.get(security, 0.0)

    def _trade_to_target(self, sec, tgt):
        """Send a market order moving the position in `sec` to `tgt`.

        Nothing is sent when no price is known or the order value is below MinTradeUsd.
        """
        diff = tgt - self._position_by(sec)
        price = self._get_latest_price(sec)

        if price <= 0 or abs(diff) * price < self.MinTradeUsd:
            return

        order = Order()
        order.Security = sec
        order.Portfolio = self.Portfolio
        order.Side = Sides.Buy if diff > 0 else Sides.Sell
        order.Volume = abs(diff)
        order.Type = OrderTypes.Market
        order.Comment = "Dispersion"
        self.RegisterOrder(order)

    def _position_by(self, sec):
        """Return the current position in `sec` as a float, 0.0 when none is reported."""
        val = self.GetPositionValue(sec, self.Portfolio)
        return float(val) if val is not None else 0.0

    def CreateClone(self):
        return dispersion_trading_strategy()
class RollingWindow:
    """Fixed-capacity FIFO buffer: adding to a full window evicts the oldest value."""

    def __init__(self, size):
        self._size = size
        self._queue = []

    def is_full(self):
        """True when the buffer holds exactly `size` values."""
        return len(self._queue) == self._size

    def add(self, value):
        """Append `value`, dropping the oldest entry first if the buffer is full."""
        if self.is_full():
            self._queue.pop(0)
        self._queue.append(value)

    def last(self):
        """Most recently added value."""
        return self._queue[-1]

    def to_array(self):
        """Copy of the contents, oldest value first."""
        return self._queue[:]
def _returns(win):
"""Compute simple returns from a rolling window of prices."""
arr = win.to_array()
r = []
for i in range(1, len(arr)):
r.append((arr[i] - arr[i - 1]) / arr[i - 1])
return r
def _corr(x, y):
"""Compute Pearson correlation between two return arrays."""
n = min(len(x), len(y))
if n == 0:
return 0.0
mx = sum(x[:n]) / n
my = sum(y[:n]) / n
num = 0.0
dx = 0.0
dy = 0.0
for i in range(n):
a = x[i] - mx
b = y[i] - my
num += a * b
dx += a * a
dy += b * b
if dx <= 0 or dy <= 0:
return 0.0
return num / math.sqrt(dx * dy)