Correlation Mean Reversion
Correlation Mean Reversion 策略关注指标的极端读数以捕捉均值回归。远离正常水平的情况通常不会持续太久。
当指标大幅偏离均值后开始反转时产生交易信号，可做多也可做空，并带有保护性止损。
适合预期震荡行情的交易者，当指标回归平衡时平仓。初始参数 CorrelationPeriod = 20。
详细信息
- 入场条件: Indicator crosses back toward mean.
- 多空: Both directions.
- 出场条件: Indicator reverts to average.
- 止损: Yes.
- 默认值:
  - CorrelationPeriod = 20
  - LookbackPeriod = 20
  - DeviationThreshold = 2.0m
  - StopLossPercent = 2.0m
  - CandleType = TimeSpan.FromMinutes(5)
- 过滤器:
- 分类: Mean Reversion
- 方向: Both
- 指标: Correlation
- 止损: Yes
- 复杂度: Intermediate
- 时间框架: Short-term
- 季节性: No
- 神经网络: No
- 背离: No
- 风险级别: Medium
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Two-instrument mean-reversion strategy that treats rolling correlation as a regime filter.
/// Orders are placed on the primary security when the correlation Z-score falls to an unusually low level
/// while the latest one-bar returns of the primary and secondary securities diverge.
/// </summary>
public class CorrelationMeanReversionStrategy : Strategy
{
    // Strategy parameters; all of them are created in the constructor.
    private readonly StrategyParam<string> _security2Id;
    private readonly StrategyParam<int> _correlationPeriod;
    private readonly StrategyParam<int> _lookbackPeriod;
    private readonly StrategyParam<decimal> _deviationThreshold;
    private readonly StrategyParam<decimal> _exitThreshold;
    private readonly StrategyParam<decimal> _divergenceThreshold;
    private readonly StrategyParam<decimal> _stopLossPercent;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<DataType> _candleType;

    // Runtime state; assigned on start and cleared on reset.
    private Security _security2;
    private Correlation _correlation;
    private SimpleMovingAverage _correlationSma;
    private StandardDeviation _correlationStdDev;
    private decimal _latestPrice1;
    private decimal _latestPrice2;
    private decimal _previousPrice1;
    private decimal _previousPrice2;
    private bool _primaryUpdated;
    private bool _secondaryUpdated;
    private int _cooldown;

    /// <summary>
    /// Identifier of the second (paired) security.
    /// </summary>
    public string Security2Id
    {
        get => _security2Id.Value;
        set => _security2Id.Value = value;
    }

    /// <summary>
    /// Length of the rolling correlation indicator.
    /// </summary>
    public int CorrelationPeriod
    {
        get => _correlationPeriod.Value;
        set => _correlationPeriod.Value = value;
    }

    /// <summary>
    /// Length of the moving average and standard deviation applied to the correlation values.
    /// </summary>
    public int LookbackPeriod
    {
        get => _lookbackPeriod.Value;
        set => _lookbackPeriod.Value = value;
    }

    /// <summary>
    /// Magnitude of the negative Z-score at which correlation is considered dislocated (entry filter).
    /// </summary>
    public decimal DeviationThreshold
    {
        get => _deviationThreshold.Value;
        set => _deviationThreshold.Value = value;
    }

    /// <summary>
    /// Z-score magnitude at which correlation is considered normalized again (exit trigger).
    /// </summary>
    public decimal ExitThreshold
    {
        get => _exitThreshold.Value;
        set => _exitThreshold.Value = value;
    }

    /// <summary>
    /// Smallest one-bar return spread between the two instruments that allows an entry.
    /// </summary>
    public decimal DivergenceThreshold
    {
        get => _divergenceThreshold.Value;
        set => _divergenceThreshold.Value = value;
    }

    /// <summary>
    /// Protective stop distance, in percent.
    /// </summary>
    public decimal StopLossPercent
    {
        get => _stopLossPercent.Value;
        set => _stopLossPercent.Value = value;
    }

    /// <summary>
    /// Number of processed bar pairs to skip after an order is sent.
    /// </summary>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Candle data type subscribed for both securities.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Creates the strategy and registers its parameters with their defaults, ranges and display metadata.
    /// </summary>
    public CorrelationMeanReversionStrategy()
    {
        _security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
            .SetDisplay("Second Security Id", "Identifier of the secondary security", "General");

        _correlationPeriod = Param(nameof(CorrelationPeriod), 20)
            .SetRange(5, 100)
            .SetDisplay("Correlation Period", "Rolling period for the correlation indicator", "Indicators");

        _lookbackPeriod = Param(nameof(LookbackPeriod), 30)
            .SetRange(10, 150)
            .SetDisplay("Lookback Period", "Lookback period for correlation statistics", "Indicators");

        _deviationThreshold = Param(nameof(DeviationThreshold), 1.1m)
            .SetRange(0.25m, 3m)
            .SetDisplay("Deviation Threshold", "Absolute Z-score required for entry", "Signals");

        _exitThreshold = Param(nameof(ExitThreshold), 0.15m)
            .SetRange(0m, 2m)
            .SetDisplay("Exit Threshold", "Z-score threshold used for exit", "Signals");

        _divergenceThreshold = Param(nameof(DivergenceThreshold), 0.003m)
            .SetRange(0.0005m, 0.05m)
            .SetDisplay("Divergence Threshold", "Minimum one-bar divergence between the two instruments", "Signals");

        _stopLossPercent = Param(nameof(StopLossPercent), 2m)
            .SetRange(0.5m, 10m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _cooldownBars = Param(nameof(CooldownBars), 120)
            .SetRange(1, 500)
            .SetDisplay("Cooldown Bars", "Bars to wait after each order", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
            .SetDisplay("Candle Type", "Candle series for both instruments", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        var primary = Security;

        if (primary != null)
        {
            yield return (primary, CandleType);
        }

        var secondaryId = Security2Id;

        if (!secondaryId.IsEmpty())
        {
            yield return (new Security { Id = secondaryId }, CandleType);
        }
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        // Drop the secondary security and every indicator instance.
        _security2 = null;
        _correlation = null;
        _correlationSma = null;
        _correlationStdDev = null;

        // Clear cached prices, pairing flags and the cooldown counter.
        _latestPrice1 = _latestPrice2 = 0m;
        _previousPrice1 = _previousPrice2 = 0m;
        _primaryUpdated = _secondaryUpdated = false;
        _cooldown = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
        {
            throw new InvalidOperationException("Primary security is not specified.");
        }

        var secondaryId = Security2Id;

        if (secondaryId.IsEmpty())
        {
            throw new InvalidOperationException("Secondary security identifier is not specified.");
        }

        // Use the looked-up security when available, otherwise a stub that carries only the identifier.
        _security2 = this.LookupById(secondaryId) ?? new Security { Id = secondaryId };

        var statisticsLength = LookbackPeriod;
        _correlation = new Correlation { Length = CorrelationPeriod };
        _correlationSma = new SimpleMovingAverage { Length = statisticsLength };
        _correlationStdDev = new StandardDeviation { Length = statisticsLength };
        _cooldown = 0;

        var primaryCandles = SubscribeCandles(CandleType, security: Security);
        var secondaryCandles = SubscribeCandles(CandleType, security: _security2);

        primaryCandles.Bind(ProcessPrimaryCandle).Start();
        secondaryCandles.Bind(ProcessSecondaryCandle).Start();

        var chartArea = CreateChartArea();

        if (chartArea is not null)
        {
            DrawCandles(chartArea, primaryCandles);
            DrawCandles(chartArea, secondaryCandles);
            DrawOwnTrades(chartArea);
        }

        // Zero absolute take-profit unit, percentage stop-loss unit, trailing flag off.
        StartProtection(new Unit(0, UnitTypes.Absolute), new Unit(StopLossPercent, UnitTypes.Percent), false);
    }

    // Stores the close of a finished primary candle and attempts to process the pair.
    private void ProcessPrimaryCandle(ICandleMessage candle)
    {
        if (candle.State == CandleStates.Finished)
        {
            _latestPrice1 = candle.ClosePrice;
            _primaryUpdated = true;
            TryProcessPair(candle.OpenTime);
        }
    }

    // Stores the close of a finished secondary candle and attempts to process the pair.
    private void ProcessSecondaryCandle(ICandleMessage candle)
    {
        if (candle.State == CandleStates.Finished)
        {
            _latestPrice2 = candle.ClosePrice;
            _secondaryUpdated = true;
            TryProcessPair(candle.OpenTime);
        }
    }

    // Runs one evaluation step once both instruments have reported a finished candle:
    // feeds the indicators, computes the one-bar divergence and delegates to entry/exit handling.
    private void TryProcessPair(DateTimeOffset time)
    {
        var pairComplete = _primaryUpdated && _secondaryUpdated;

        if (!pairComplete)
        {
            return;
        }

        _primaryUpdated = _secondaryUpdated = false;

        if (_latestPrice1 <= 0 || _latestPrice2 <= 0)
        {
            return;
        }

        // The very first valid pair only seeds the "previous" prices used for returns.
        var hasBaseline = _previousPrice1 > 0 && _previousPrice2 > 0;

        if (!hasBaseline)
        {
            RememberPrices();
            return;
        }

        var stamp = time.UtcDateTime;
        var correlationValue = _correlation.Process((_latestPrice1, _latestPrice2), stamp, true).ToDecimal();
        var averageCorrelation = _correlationSma.Process(correlationValue, stamp, true).ToDecimal();
        var stdCorrelation = _correlationStdDev.Process(correlationValue, stamp, true).ToDecimal();

        // Positive divergence: the primary outperformed the secondary over the last bar.
        var divergence = RelativeChange(_latestPrice1, _previousPrice1) - RelativeChange(_latestPrice2, _previousPrice2);

        RememberPrices();

        var indicatorsReady = _correlation.IsFormed && _correlationSma.IsFormed && _correlationStdDev.IsFormed;

        if (!indicatorsReady || ProcessState != ProcessStates.Started)
        {
            return;
        }

        // While cooling down, one pair consumes one unit of cooldown and nothing else happens.
        if (_cooldown > 0)
        {
            _cooldown--;
            return;
        }

        // A non-positive deviation cannot be used as a Z-score denominator.
        if (stdCorrelation <= 0)
        {
            return;
        }

        var zScore = (correlationValue - averageCorrelation) / stdCorrelation;

        if (Position == 0)
        {
            OpenOnDislocation(zScore, divergence);
        }
        else
        {
            CloseOnNormalization(zScore, divergence);
        }
    }

    // Copies the latest closes into the "previous" slots for the next return calculation.
    private void RememberPrices()
    {
        _previousPrice1 = _latestPrice1;
        _previousPrice2 = _latestPrice2;
    }

    // One-bar relative price change.
    private static decimal RelativeChange(decimal current, decimal previous)
    {
        return (current - previous) / previous;
    }

    // Flat state: buys when the primary lagged, sells when it led, but only in a low-correlation regime.
    private void OpenOnDislocation(decimal zScore, decimal divergence)
    {
        if (zScore > -DeviationThreshold)
        {
            return;
        }

        var trigger = DivergenceThreshold;

        if (divergence <= -trigger)
        {
            BuyMarket();
        }
        else if (divergence >= trigger)
        {
            SellMarket();
        }
        else
        {
            return;
        }

        _cooldown = CooldownBars;
    }

    // In-position state: closes when correlation has recovered or the divergence shrank to half the entry threshold.
    private void CloseOnNormalization(decimal zScore, decimal divergence)
    {
        var correlationRecovered = zScore >= -ExitThreshold;
        var fadeLevel = DivergenceThreshold * 0.5m;

        if (Position > 0 && (correlationRecovered || divergence >= -fadeLevel))
        {
            SellMarket(Math.Abs(Position));
        }
        else if (Position < 0 && (correlationRecovered || divergence <= fadeLevel))
        {
            BuyMarket(Math.Abs(Position));
        }
        else
        {
            return;
        }

        _cooldown = CooldownBars;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from System import Decimal, ValueTuple
from StockSharp.Algo.Indicators import Correlation, SimpleMovingAverage, StandardDeviation, PairIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class correlation_mean_reversion_strategy(Strategy):
    """
    Mean-reversion strategy that uses rolling inter-market correlation as a regime filter.

    Trades the primary security when a low-correlation regime (negative Z-score of the
    rolling correlation) coincides with a short-term divergence between the one-bar
    returns of the primary and the secondary security.
    """

    def __init__(self):
        """Registers the strategy parameters and initializes the runtime state."""
        super(correlation_mean_reversion_strategy, self).__init__()

        self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
            .SetDisplay("Second Security Id", "Identifier of the secondary security", "General")
        self._correlation_period = self.Param("CorrelationPeriod", 20) \
            .SetDisplay("Correlation Period", "Rolling period for the correlation indicator", "Indicators")
        self._lookback_period = self.Param("LookbackPeriod", 30) \
            .SetDisplay("Lookback Period", "Lookback period for correlation statistics", "Indicators")
        self._deviation_threshold = self.Param("DeviationThreshold", 1.1) \
            .SetDisplay("Deviation Threshold", "Absolute Z-score required for entry", "Signals")
        self._exit_threshold = self.Param("ExitThreshold", 0.15) \
            .SetDisplay("Exit Threshold", "Z-score threshold used for exit", "Signals")
        self._divergence_threshold = self.Param("DivergenceThreshold", 0.003) \
            .SetDisplay("Divergence Threshold", "Minimum one-bar divergence between the two instruments", "Signals")
        self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._cooldown_bars = self.Param("CooldownBars", 120) \
            .SetDisplay("Cooldown Bars", "Bars to wait after each order", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Candle series for both instruments", "General")

        # Runtime state; (re)created in OnStarted2 and cleared in OnReseted.
        self._security2 = None
        self._correlation = None
        self._correlation_sma = None
        self._correlation_std_dev = None
        self._latest_price1 = Decimal(0)
        self._latest_price2 = Decimal(0)
        self._previous_price1 = Decimal(0)
        self._previous_price2 = Decimal(0)
        self._primary_updated = False
        self._secondary_updated = False
        self._cooldown = 0

    @property
    def candle_type(self):
        """Candle data type subscribed for both instruments."""
        return self._candle_type.Value

    def OnReseted(self):
        """Clears indicators, cached prices, pairing flags and the cooldown counter."""
        super(correlation_mean_reversion_strategy, self).OnReseted()
        self._security2 = None
        self._correlation = None
        self._correlation_sma = None
        self._correlation_std_dev = None
        self._latest_price1 = Decimal(0)
        self._latest_price2 = Decimal(0)
        self._previous_price1 = Decimal(0)
        self._previous_price2 = Decimal(0)
        self._primary_updated = False
        self._secondary_updated = False
        self._cooldown = 0

    def OnStarted2(self, time):
        """
        Validates the configuration, creates the indicators, subscribes to candles of both
        securities and enables the percentage stop-loss protection.

        Raises:
            Exception: if the primary security or the secondary security identifier is missing.
        """
        super(correlation_mean_reversion_strategy, self).OnStarted2(time)

        if self.Security is None:
            raise Exception("Primary security is not specified.")

        # str(None) would yield the truthy text "None" and slip past the emptiness check,
        # so a missing identifier is mapped to an empty string explicitly.
        raw_id = self._security2_id.Value
        sec2_id = "" if raw_id is None else str(raw_id)
        if not sec2_id:
            raise Exception("Secondary security identifier is not specified.")

        s = Security()
        s.Id = sec2_id
        self._security2 = s

        self._correlation = Correlation()
        self._correlation.Length = int(self._correlation_period.Value)
        self._correlation_sma = SimpleMovingAverage()
        self._correlation_sma.Length = int(self._lookback_period.Value)
        self._correlation_std_dev = StandardDeviation()
        self._correlation_std_dev.Length = int(self._lookback_period.Value)
        self._cooldown = 0

        primary_subscription = self.SubscribeCandles(self.candle_type, False, self.Security)
        secondary_subscription = self.SubscribeCandles(self.candle_type, False, self._security2)

        primary_subscription.Bind(self._process_primary_candle).Start()
        secondary_subscription.Bind(self._process_secondary_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, primary_subscription)
            self.DrawCandles(area, secondary_subscription)
            self.DrawOwnTrades(area)

        self.StartProtection(Unit(0, UnitTypes.Absolute), Unit(self._stop_loss_percent.Value, UnitTypes.Percent), False)

    def _process_primary_candle(self, candle):
        """Stores the close of a finished primary candle and tries to process the pair."""
        if candle.State != CandleStates.Finished:
            return
        self._latest_price1 = candle.ClosePrice
        self._primary_updated = True
        self._try_process_pair(candle.OpenTime)

    def _process_secondary_candle(self, candle):
        """Stores the close of a finished secondary candle and tries to process the pair."""
        if candle.State != CandleStates.Finished:
            return
        self._latest_price2 = candle.ClosePrice
        self._secondary_updated = True
        self._try_process_pair(candle.OpenTime)

    def _try_process_pair(self, time):
        """
        Evaluates one bar pair once both instruments have delivered a finished candle.

        Feeds the correlation indicator and its mean/deviation statistics, computes the
        one-bar return divergence, then opens or closes a position on the primary security.
        """
        if not self._primary_updated or not self._secondary_updated:
            return

        self._primary_updated = False
        self._secondary_updated = False

        # Use the UTC DateTime when the value exposes one (DateTimeOffset); a plain DateTime
        # is passed through unchanged. getattr with a default only covers the missing-attribute
        # case instead of silencing every exception.
        time = getattr(time, "UtcDateTime", time)

        if self._latest_price1 <= 0 or self._latest_price2 <= 0:
            return

        # The first valid pair only seeds the previous prices used for return calculation.
        if self._previous_price1 <= 0 or self._previous_price2 <= 0:
            self._previous_price1 = self._latest_price1
            self._previous_price2 = self._latest_price2
            return

        pair_val = ValueTuple[Decimal, Decimal](self._latest_price1, self._latest_price2)
        pair_input = PairIndicatorValue[Decimal](self._correlation, pair_val, time)
        pair_input.IsFinal = True
        corr_result = self._correlation.Process(pair_input)
        correlation_value = float(corr_result)

        avg_result = process_float(self._correlation_sma, Decimal(correlation_value), time, True)
        average_correlation = float(avg_result)
        std_result = process_float(self._correlation_std_dev, Decimal(correlation_value), time, True)
        std_correlation = float(std_result)

        # Positive divergence: the primary outperformed the secondary over the last bar.
        primary_return = float(self._latest_price1 - self._previous_price1) / float(self._previous_price1)
        secondary_return = float(self._latest_price2 - self._previous_price2) / float(self._previous_price2)
        divergence = primary_return - secondary_return

        self._previous_price1 = self._latest_price1
        self._previous_price2 = self._latest_price2

        if not self._correlation.IsFormed or not self._correlation_sma.IsFormed or not self._correlation_std_dev.IsFormed:
            return

        from StockSharp.Algo import ProcessStates as PS
        if self.ProcessState != PS.Started:
            return

        # While cooling down, each processed pair consumes one unit and nothing else happens.
        if self._cooldown > 0:
            self._cooldown -= 1
            return

        # A non-positive deviation cannot serve as a Z-score denominator.
        if std_correlation <= 0:
            return

        z_score = (correlation_value - average_correlation) / std_correlation
        dev_threshold = float(self._deviation_threshold.Value)
        is_low_correlation = z_score <= -dev_threshold
        exit_threshold = float(self._exit_threshold.Value)
        div_threshold = float(self._divergence_threshold.Value)
        cd = int(self._cooldown_bars.Value)

        if self.Position == 0:
            if not is_low_correlation:
                return
            if divergence <= -div_threshold:
                # Primary lagged the secondary: buy.
                self.BuyMarket()
                self._cooldown = cd
            elif divergence >= div_threshold:
                # Primary led the secondary: sell.
                self.SellMarket()
                self._cooldown = cd
            return

        # Exit when correlation has normalized or the divergence shrank to half the entry threshold.
        correlation_recovered = z_score >= -exit_threshold

        if self.Position > 0 and (correlation_recovered or divergence >= -div_threshold * 0.5):
            self.SellMarket(Math.Abs(self.Position))
            self._cooldown = cd
        elif self.Position < 0 and (correlation_recovered or divergence <= div_threshold * 0.5):
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown = cd

    def CreateClone(self):
        """Returns a new instance of this strategy class."""
        return correlation_mean_reversion_strategy()