Pairs Trading Strategy
该策略监控两只相关资产之间的价差,并与其历史均值与标准差比较,捕捉暂时性偏离。
测试表明年均收益约为 88%,该策略在股票市场表现最佳。
当价差低于均值减去乘数倍标准差时做多价差(买入第一只资产卖出第二只资产);当价差高于均值加上乘数倍标准差时做空价差。价差回到均值附近时平仓。
此方法适合追求市场中性、擅长平衡两只合约敞口的交易者。若价差继续扩大,止损可以限制回撤。
细节
- 入场条件:
- 多头: Spread < Mean - Multiplier * StdDev
- 空头: Spread > Mean + Multiplier * StdDev
- 多/空: 双向
- 离场条件:
- 多头: 价差回到均值时平仓
- 空头: 价差回到均值时平仓
- 止损: 百分比止损,基于价差
- 默认值:
- LookbackPeriod = 20
- DeviationMultiplier = 2.0m
- StopLossPercent = 2m
- CandleType = TimeSpan.FromMinutes(5)
- 过滤器:
- 类别: Arbitrage
- 方向: 双向
- 指标: Spread statistics
- 止损: 是
- 复杂度: 中等
- 时间框架: 日内
- 季节性: 否
- 神经网络: 否
- 背离: 是
- 风险等级: 中等
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Statistical Pairs Trading strategy.
/// Trades the spread between two correlated assets, entering positions when
/// the spread deviates significantly from its mean.
/// </summary>
/// <remarks>
/// The spread is computed as the close price of the main security minus the last
/// known close price of <see cref="SecondSecurity"/>. Only the main security is
/// actually traded by this class; the opposite leg on the second security is not
/// placed here (see the notes in the trading logic).
/// </remarks>
public class PairsTradingStrategy : Strategy
{
	private readonly StrategyParam<int> _lookbackPeriod;
	private readonly StrategyParam<decimal> _deviationMultiplier;
	private readonly StrategyParam<decimal> _stopLossPercent;
	private readonly StrategyParam<DataType> _candleType;
	private readonly StrategyParam<Security> _secondSecurity;

	// Rolling mean and standard deviation of the spread; created in OnStarted2.
	private SimpleMovingAverage _spreadMA;
	private StandardDeviation _spreadStdDev;

	// Most recently computed spread (first close - second close).
	private decimal _spread;

	// Last finished close of the second security; 0 means "not received yet".
	private decimal _lastSecondPrice;

	/// <summary>
	/// Period for calculating mean and standard deviation of the spread.
	/// </summary>
	public int LookbackPeriod
	{
		get => _lookbackPeriod.Value;
		set => _lookbackPeriod.Value = value;
	}

	/// <summary>
	/// Number of standard deviations for entry signals.
	/// </summary>
	public decimal DeviationMultiplier
	{
		get => _deviationMultiplier.Value;
		set => _deviationMultiplier.Value = value;
	}

	/// <summary>
	/// Stop-loss percentage parameter.
	/// </summary>
	public decimal StopLossPercent
	{
		get => _stopLossPercent.Value;
		set => _stopLossPercent.Value = value;
	}

	/// <summary>
	/// Candle type parameter.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Second security in the pair.
	/// </summary>
	public Security SecondSecurity
	{
		get => _secondSecurity.Value;
		set => _secondSecurity.Value = value;
	}

	/// <summary>
	/// Constructor. Registers the strategy parameters with their defaults,
	/// display metadata and optimization ranges.
	/// </summary>
	public PairsTradingStrategy()
	{
		_lookbackPeriod = Param(nameof(LookbackPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("Lookback Period", "Period for calculating spread mean and standard deviation", "Parameters")
			.SetOptimize(10, 40, 5);

		_deviationMultiplier = Param(nameof(DeviationMultiplier), 2.0m)
			.SetGreaterThanZero()
			.SetDisplay("Deviation Multiplier", "Number of standard deviations for entry signals", "Parameters")
			.SetOptimize(1.5m, 3.0m, 0.5m);

		_stopLossPercent = Param(nameof(StopLossPercent), 2m)
			.SetGreaterThanZero()
			.SetDisplay("Stop-loss %", "Stop-loss as percentage of spread at entry", "Risk Management")
			.SetOptimize(1m, 3m, 0.5m);

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles to use", "General");

		_secondSecurity = Param<Security>(nameof(SecondSecurity))
			.SetDisplay("Second Security", "Second security in the pair", "General")
			.SetRequired();
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return
		[
			(Security, CandleType),
			(SecondSecurity, CandleType)
		];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		// Drop indicators and cached prices so a restart begins from a clean state.
		_spreadMA = null;
		_spreadStdDev = null;
		_spread = 0;
		_lastSecondPrice = 0;
	}

	/// <inheritdoc />
	/// <exception cref="InvalidOperationException">
	/// Thrown when <see cref="SecondSecurity"/> is not set.
	/// </exception>
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		if (SecondSecurity == null)
			throw new InvalidOperationException("Second security is not specified.");

		// Initialize indicators
		_spreadMA = new() { Length = LookbackPeriod };
		_spreadStdDev = new() { Length = LookbackPeriod };

		// Create subscriptions for both securities
		var firstSecuritySubscription = SubscribeCandles(CandleType);
		var secondSecuritySubscription = SubscribeCandles(CandleType, security: SecondSecurity);

		// Bind to first security candles
		firstSecuritySubscription
			.Bind(ProcessFirstSecurityCandle)
			.Start();

		// Bind to second security candles
		secondSecuritySubscription
			.Bind(ProcessSecondSecurityCandle)
			.Start();

		// Enable position protection with stop-loss
		StartProtection(
			takeProfit: new Unit(0, UnitTypes.Absolute), // No take-profit
			stopLoss: new Unit(StopLossPercent, UnitTypes.Percent) // Stop-loss as percentage
		);

		// Setup chart if available
		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, firstSecuritySubscription);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Handles a candle of the main security: updates the spread statistics and
	/// evaluates entry/exit signals.
	/// </summary>
	/// <param name="candle">Candle of the main security.</param>
	private void ProcessFirstSecurityCandle(ICandleMessage candle)
	{
		// Skip if we don't have price for the second security yet
		if (_lastSecondPrice == 0)
			return;

		// Skip unfinished candles: the spread is fed to the indicators as a final
		// value below, so a partial close must never reach them.
		if (candle.State != CandleStates.Finished)
			return;

		// Calculate the spread: Asset1 - Asset2
		_spread = candle.ClosePrice - _lastSecondPrice;

		// Process the spread through indicators
		var maValue = _spreadMA.Process(new DecimalIndicatorValue(_spreadMA, _spread, candle.ServerTime) { IsFinal = true });
		var stdDevValue = _spreadStdDev.Process(new DecimalIndicatorValue(_spreadStdDev, _spread, candle.ServerTime) { IsFinal = true });

		// Skip until indicators are formed
		if (!_spreadMA.IsFormed || !_spreadStdDev.IsFormed)
			return;

		decimal spreadMean = maValue.ToDecimal();
		decimal spreadStdDev = stdDevValue.ToDecimal();

		// Calculate entry thresholds
		decimal upperThreshold = spreadMean + (spreadStdDev * DeviationMultiplier);
		decimal lowerThreshold = spreadMean - (spreadStdDev * DeviationMultiplier);

		// Trading logic
		if (_spread < lowerThreshold)
		{
			// Spread is below lower threshold:
			// Buy Asset1 (Security), Sell Asset2 (SecondSecurity)
			if (Position <= 0)
			{
				// Close any existing position and enter new position
				BuyMarket(Volume + Math.Abs(Position));
				LogInfo($"Long Signal: Spread({_spread:F4}) < Lower Threshold({lowerThreshold:F4})");
				// Note: In a real implementation, you would also place a sell order
				// for the second security here, using a different strategy instance or connector
			}
		}
		else if (_spread > upperThreshold)
		{
			// Spread is above upper threshold:
			// Sell Asset1 (Security), Buy Asset2 (SecondSecurity)
			if (Position >= 0)
			{
				// Close any existing position and enter new position
				SellMarket(Volume + Math.Abs(Position));
				LogInfo($"Short Signal: Spread({_spread:F4}) > Upper Threshold({upperThreshold:F4})");
				// Note: In a real implementation, you would also place a buy order
				// for the second security here, using a different strategy instance or connector
			}
		}
		else if ((_spread > spreadMean && Position > 0) ||
			(_spread < spreadMean && Position < 0))
		{
			// Exit signals: Spread returned to the mean
			if (Position > 0)
			{
				SellMarket(Math.Abs(Position));
				LogInfo($"Exit Long: Spread({_spread:F4}) > Mean({spreadMean:F4})");
			}
			else if (Position < 0)
			{
				BuyMarket(Math.Abs(Position));
				LogInfo($"Exit Short: Spread({_spread:F4}) < Mean({spreadMean:F4})");
			}
		}
	}

	/// <summary>
	/// Handles a candle of the second security by caching its close price for
	/// the next spread calculation.
	/// </summary>
	/// <param name="candle">Candle of the second security.</param>
	private void ProcessSecondSecurityCandle(ICandleMessage candle)
	{
		// Only a finished candle has a definitive close price.
		if (candle.State != CandleStates.Finished)
			return;

		// Store the close price of the second security for spread calculation
		_lastSecondPrice = candle.ClosePrice;
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, UnitTypes, Unit, CandleStates
from StockSharp.Algo.Indicators import SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security, Subscription
from datatype_extensions import *
from indicator_extensions import *
class pairs_trading_strategy(Strategy):
    """
    Statistical Pairs Trading strategy.

    Trades the spread between two correlated assets, entering positions when
    the spread deviates significantly from its mean.

    The spread is the close price of the main security minus the last finished
    close price of the second security. Only the main security is traded here;
    the opposite leg on the second security is not placed by this class.
    """

    def __init__(self):
        super(pairs_trading_strategy, self).__init__()

        # Strategy parameters
        self._lookback_period = self.Param("LookbackPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Lookback Period", "Period for calculating spread mean and standard deviation", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(10, 40, 5)

        self._deviation_multiplier = self.Param("DeviationMultiplier", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Deviation Multiplier", "Number of standard deviations for entry signals", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(1.5, 3.0, 0.5)

        self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stop-loss %", "Stop-loss as percentage of spread at entry", "Risk Management") \
            .SetCanOptimize(True) \
            .SetOptimize(1.0, 3.0, 0.5)

        self._candle_type = self.Param("CandleType", tf(5)) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        self._second_security = self.Param[Security]("SecondSecurity", None) \
            .SetDisplay("Second Security", "Second security in the pair", "General") \
            .SetRequired()

        # Internal state
        self._spread_ma = None        # rolling mean of the spread, created in OnStarted2
        self._spread_std_dev = None   # rolling standard deviation of the spread
        self._spread = 0              # most recently computed spread
        self._last_second_price = 0   # last finished close of the second security; 0 = none yet

    @property
    def lookback_period(self):
        """Period for calculating mean and standard deviation of the spread."""
        return self._lookback_period.Value

    @lookback_period.setter
    def lookback_period(self, value):
        self._lookback_period.Value = value

    @property
    def deviation_multiplier(self):
        """Number of standard deviations for entry signals."""
        return self._deviation_multiplier.Value

    @deviation_multiplier.setter
    def deviation_multiplier(self, value):
        self._deviation_multiplier.Value = value

    @property
    def stop_loss_percent(self):
        """Stop-loss percentage parameter."""
        return self._stop_loss_percent.Value

    @stop_loss_percent.setter
    def stop_loss_percent(self, value):
        self._stop_loss_percent.Value = value

    @property
    def candle_type(self):
        """Candle type parameter."""
        return self._candle_type.Value

    @candle_type.setter
    def candle_type(self, value):
        self._candle_type.Value = value

    @property
    def second_security(self):
        """Second security in the pair."""
        return self._second_security.Value

    @second_security.setter
    def second_security(self, value):
        self._second_security.Value = value

    def GetWorkingSecurities(self):
        """Return the securities and candle type this strategy works with."""
        return [
            (self.Security, self.candle_type),
            (self.second_security, self.candle_type)
        ]

    def OnReseted(self):
        """Drop indicators and cached prices so a restart begins from a clean state."""
        super(pairs_trading_strategy, self).OnReseted()
        self._spread_ma = None
        self._spread_std_dev = None
        self._spread = 0
        self._last_second_price = 0

    def OnStarted2(self, time):
        """
        Create the indicators, subscribe to candles of both securities,
        enable stop-loss protection and set up the chart if one is available.

        Raises:
            Exception: if the second security is not specified.
        """
        super(pairs_trading_strategy, self).OnStarted2(time)

        if self.second_security is None:
            raise Exception("Second security is not specified.")

        # Initialize indicators
        self._spread_ma = SimpleMovingAverage()
        self._spread_ma.Length = self.lookback_period

        self._spread_std_dev = StandardDeviation()
        self._spread_std_dev.Length = self.lookback_period

        # Create subscriptions for both securities.
        # The security is passed by keyword so it binds to the `security`
        # parameter regardless of its position in the method signature.
        first_subscription = self.SubscribeCandles(self.candle_type)
        second_subscription = self.SubscribeCandles(self.candle_type, security=self.second_security)

        # Bind to first security candles
        first_subscription.Bind(self.ProcessFirstSecurityCandle).Start()

        # Bind to second security candles
        second_subscription.Bind(self.ProcessSecondSecurityCandle).Start()

        # Enable position protection with stop-loss (zero take-profit = no take-profit)
        self.StartProtection(
            takeProfit=Unit(0, UnitTypes.Absolute),
            stopLoss=Unit(self.stop_loss_percent, UnitTypes.Percent)
        )

        # Setup chart if available
        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, first_subscription)
            self.DrawOwnTrades(area)

    def ProcessFirstSecurityCandle(self, candle):
        """
        Handle a candle of the main security: update the spread statistics
        and evaluate entry/exit signals.
        """
        # Skip if we don't have price for the second security yet
        if self._last_second_price == 0:
            return

        # Skip unfinished candles
        if candle.State != CandleStates.Finished:
            return

        # Calculate the spread: Asset1 - Asset2.
        # The close price is converted to float first because the cached
        # second-leg price is already a Python float.
        self._spread = float(candle.ClosePrice) - self._last_second_price

        # Process the spread through indicators
        ma_value = process_float(self._spread_ma, self._spread, candle.ServerTime, True)
        std_dev_value = process_float(self._spread_std_dev, self._spread, candle.ServerTime, True)

        # Skip until indicators are formed
        if not self._spread_ma.IsFormed or not self._spread_std_dev.IsFormed:
            return

        spread_mean = float(ma_value)
        spread_std_dev = float(std_dev_value)

        # Calculate entry thresholds
        upper_threshold = spread_mean + (spread_std_dev * self.deviation_multiplier)
        lower_threshold = spread_mean - (spread_std_dev * self.deviation_multiplier)

        # Trading logic
        if self._spread < lower_threshold:
            # Spread is below lower threshold:
            # Buy Asset1 (Security), Sell Asset2 (SecondSecurity)
            if self.Position <= 0:
                # Close any existing position and enter new position
                self.BuyMarket(self.Volume + Math.Abs(self.Position))
                # Python format spec for four decimals is ".4f" (".NET-style "F4" raises ValueError)
                self.LogInfo("Long Signal: Spread({0:.4f}) < Lower Threshold({1:.4f})".format(
                    self._spread, lower_threshold))
                # Note: In a real implementation, you would also place a sell order
                # for the second security here, using a different strategy instance or connector
        elif self._spread > upper_threshold:
            # Spread is above upper threshold:
            # Sell Asset1 (Security), Buy Asset2 (SecondSecurity)
            if self.Position >= 0:
                # Close any existing position and enter new position
                self.SellMarket(self.Volume + Math.Abs(self.Position))
                self.LogInfo("Short Signal: Spread({0:.4f}) > Upper Threshold({1:.4f})".format(
                    self._spread, upper_threshold))
                # Note: In a real implementation, you would also place a buy order
                # for the second security here, using a different strategy instance or connector
        elif (self._spread > spread_mean and self.Position > 0) or \
                (self._spread < spread_mean and self.Position < 0):
            # Exit signals: Spread returned to the mean
            if self.Position > 0:
                self.SellMarket(Math.Abs(self.Position))
                self.LogInfo("Exit Long: Spread({0:.4f}) > Mean({1:.4f})".format(
                    self._spread, spread_mean))
            elif self.Position < 0:
                self.BuyMarket(Math.Abs(self.Position))
                self.LogInfo("Exit Short: Spread({0:.4f}) < Mean({1:.4f})".format(
                    self._spread, spread_mean))

    def ProcessSecondSecurityCandle(self, candle):
        """Cache the close price of a finished second-security candle for the spread calculation."""
        # Store the close price of the second security for spread calculation
        if candle.State == CandleStates.Finished:
            self._last_second_price = float(candle.ClosePrice)

    def CreateClone(self):
        """!! REQUIRED!! Creates a new instance of the strategy."""
        return pairs_trading_strategy()