Cointegration Pairs Strategy
This strategy trades two assets that share a long-term cointegration relationship. By calculating the residual between the first asset and a beta-adjusted second asset, it looks for deviations that historically revert back to equilibrium.
Testing indicates an average annual return of about 103%. It performs best in the stock market.
A long position buys the first asset and sells the second when the residual z-score drops below -EntryThreshold. A short position sells the first and buys the second when the z-score rises above the threshold. Positions are closed once the spread normalizes toward zero.
Cointegration pairs trading suits statistical arbitrageurs comfortable managing two instruments simultaneously. The built-in stop-loss protects against extreme moves if the relationship temporarily breaks down.
Details
- Entry Criteria:
- Long: Residual Z-Score < -EntryThreshold
- Short: Residual Z-Score > EntryThreshold
- Long/Short: Both sides.
- Exit Criteria:
- Long: Exit when |Z-Score| < 0.5
- Short: Exit when |Z-Score| < 0.5
- Stops: Yes, percentage stop-loss.
- Default Values:
- `Period` = 20
- `EntryThreshold` = 2.0m
- `Beta` = 1.0m
- `StopLossPercent` = 2.0m
- `CandleType` = TimeSpan.FromMinutes(5)
- Filters:
- Category: Arbitrage
- Direction: Both
- Indicators: Cointegration
- Stops: Yes
- Complexity: Intermediate
- Timeframe: Intraday
- Seasonality: No
- Neural networks: No
- Divergence: Yes
- Risk Level: Medium
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Cointegration pairs trading strategy.
/// Trades the residual <c>Asset1Price - Beta * Asset2Price</c>: when the z-score of the
/// residual (measured over the last <see cref="Period"/> price pairs) leaves the
/// +/-<see cref="EntryThreshold"/> band a hedged pair position is opened, and it is closed
/// once the absolute z-score falls below the exit level.
/// </summary>
/// <remarks>
/// Residuals are only recorded while trading is allowed and no cooldown is active, so the
/// statistics window is built from the pairs that were actually evaluated for signals.
/// NOTE(review): the stop-loss configured through StartProtection is set up without any
/// reference to <see cref="Asset2"/>; confirm whether it also covers the hedge leg.
/// </remarks>
public class CointegrationPairsStrategy : Strategy
{
    /// <summary>
    /// Number of processed price pairs that are skipped after any order has been sent.
    /// </summary>
    private const int _tradeCooldownTicks = 30;

    /// <summary>
    /// Absolute z-score below which an open pair position is closed.
    /// </summary>
    private const decimal _exitZScore = 0.5m;

    /// <summary>
    /// Substitute standard deviation used when the computed variance is not positive.
    /// </summary>
    private const decimal _minStdDev = 0.0001m;

    private readonly StrategyParam<int> _periodParam;
    private readonly StrategyParam<decimal> _entryThresholdParam;
    private readonly StrategyParam<decimal> _betaParam;
    private readonly StrategyParam<Security> _asset2Param;
    private readonly StrategyParam<decimal> _stopLossPercentParam;
    private readonly StrategyParam<DataType> _candleTypeParam;

    // Rolling statistics of the residual over the last Period pairs.
    private decimal _residualMean;
    private decimal _residualStdDev;
    private decimal _residualSum;
    private decimal _squaredResidualSum;
    private readonly Queue<decimal> _residuals = [];

    // Latest finished close of each leg; 0 means "not received yet for the current pair".
    private decimal _asset1Price;
    private decimal _asset2Price;

    private int _cooldownTicksLeft;
    private Portfolio _asset2Portfolio;

    /// <summary>
    /// Period for calculation of residual mean and standard deviation.
    /// </summary>
    public int Period
    {
        get => _periodParam.Value;
        set => _periodParam.Value = value;
    }

    /// <summary>
    /// Entry threshold as a multiple of standard deviation.
    /// </summary>
    public decimal EntryThreshold
    {
        get => _entryThresholdParam.Value;
        set => _entryThresholdParam.Value = value;
    }

    /// <summary>
    /// Beta coefficient for calculation of residual.
    /// It is also used as the volume ratio of the second leg.
    /// </summary>
    public decimal Beta
    {
        get => _betaParam.Value;
        set => _betaParam.Value = value;
    }

    /// <summary>
    /// Second asset for pair trading.
    /// </summary>
    public Security Asset2
    {
        get => _asset2Param.Value;
        set => _asset2Param.Value = value;
    }

    /// <summary>
    /// Stop loss percentage.
    /// </summary>
    public decimal StopLossPercent
    {
        get => _stopLossPercentParam.Value;
        set => _stopLossPercentParam.Value = value;
    }

    /// <summary>
    /// Candle type for strategy.
    /// </summary>
    public DataType CandleType
    {
        get => _candleTypeParam.Value;
        set => _candleTypeParam.Value = value;
    }

    /// <summary>
    /// Constructor. Registers the strategy parameters with their defaults and optimization ranges.
    /// </summary>
    public CointegrationPairsStrategy()
    {
        _periodParam = Param(nameof(Period), 20)
            .SetGreaterThanZero()
            .SetDisplay("Period", "Period for residual calculations", "Parameters")
            .SetOptimize(10, 50, 10);

        _entryThresholdParam = Param(nameof(EntryThreshold), 2.0m)
            .SetRange(0.1m, decimal.MaxValue)
            .SetDisplay("Entry Threshold", "Entry threshold as multiple of standard deviation", "Parameters")
            .SetOptimize(1.0m, 3.0m, 0.5m);

        _betaParam = Param(nameof(Beta), 1.0m)
            .SetRange(0.01m, decimal.MaxValue)
            .SetDisplay("Beta", "Coefficient of cointegration", "Parameters")
            .SetOptimize(0.5m, 2.0m, 0.1m);

        _asset2Param = Param<Security>(nameof(Asset2))
            .SetDisplay("Asset 2", "Second asset for pair trading", "Parameters");

        _stopLossPercentParam = Param(nameof(StopLossPercent), 2.0m)
            .SetRange(0.1m, decimal.MaxValue)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Parameters")
            .SetOptimize(1.0m, 5.0m, 1.0m);

        _candleTypeParam = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
            .SetDisplay("Candle Type", "Candle type for strategy", "Common");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return
        [
            (Security, CandleType),
            (Asset2, CandleType)
        ];
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _residualMean = 0;
        _residualStdDev = 0;
        _residualSum = 0;
        _squaredResidualSum = 0;
        _residuals.Clear();

        ResetPrices();
        _cooldownTicksLeft = 0;

        // The portfolio is resolved in OnStarted2, once the strategy is configured for a run.
        _asset2Portfolio = null;
    }

    /// <inheritdoc />
    /// <exception cref="InvalidOperationException">Thrown when <see cref="Asset2"/> is not specified.</exception>
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Asset2 == null)
            throw new InvalidOperationException("Second asset is not specified.");

        // Use the same portfolio for the second asset.
        _asset2Portfolio = Portfolio;

        // Subscribe to Asset1 candles
        var asset1Subscription = SubscribeCandles(CandleType)
            .Bind(ProcessAsset1Candle)
            .Start();

        // Subscribe to Asset2 candles
        SubscribeCandles(CandleType, security: Asset2)
            .Bind(ProcessAsset2Candle)
            .Start();

        // Setup chart visualization if available
        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, asset1Subscription);
            DrawOwnTrades(area);
        }

        // Enable position protection with stop loss
        StartProtection(
            takeProfit: new Unit(0, UnitTypes.Absolute), // No take profit
            stopLoss: new Unit(StopLossPercent, UnitTypes.Percent) // Stop loss percentage
        );
    }

    /// <summary>
    /// Stores the close of a finished candle of the first asset and tries to process the pair.
    /// </summary>
    /// <param name="candle">Candle of the strategy's main security.</param>
    private void ProcessAsset1Candle(ICandleMessage candle)
    {
        if (candle.State != CandleStates.Finished)
            return;

        _asset1Price = candle.ClosePrice;
        ProcessPair();
    }

    /// <summary>
    /// Stores the close of a finished candle of the second asset and tries to process the pair.
    /// </summary>
    /// <param name="candle">Candle of <see cref="Asset2"/>.</param>
    private void ProcessAsset2Candle(ICandleMessage candle)
    {
        if (candle.State != CandleStates.Finished)
            return;

        _asset2Price = candle.ClosePrice;
        ProcessPair();
    }

    /// <summary>
    /// Evaluates one price pair: updates the rolling residual statistics, computes the
    /// z-score and, when the window is full, acts on entry/exit signals.
    /// </summary>
    private void ProcessPair()
    {
        // A pair is complete only when both legs delivered a close since the last reset.
        if (_asset1Price == 0 || _asset2Price == 0)
            return;

        // Prices are intentionally kept here so the pair can be evaluated once trading is allowed.
        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (_cooldownTicksLeft > 0)
        {
            _cooldownTicksLeft--;
            ResetPrices();
            return;
        }

        // Calculate residual = Asset1Price - Beta * Asset2Price
        var residual = _asset1Price - Beta * _asset2Price;
        TrackResidual(residual);

        var hasTraded = false;

        if (_residuals.Count == Period)
        {
            // Population variance via E[x^2] - E[x]^2; rounding can push it to zero or below.
            _residualMean = _residualSum / Period;
            var variance = (_squaredResidualSum / Period) - (_residualMean * _residualMean);
            _residualStdDev = variance <= 0 ? _minStdDev : (decimal)Math.Sqrt((double)variance);

            var zScore = _residualStdDev == 0 ? 0 : (residual - _residualMean) / _residualStdDev;
            hasTraded = TradeOnZScore(zScore);
        }

        if (hasTraded)
            _cooldownTicksLeft = _tradeCooldownTicks;

        // Reset prices for next update
        ResetPrices();
    }

    /// <summary>
    /// Adds a residual to the rolling window and keeps the running sums in step with it.
    /// </summary>
    /// <param name="residual">Residual of the current price pair.</param>
    private void TrackResidual(decimal residual)
    {
        _residuals.Enqueue(residual);
        _residualSum += residual;
        _squaredResidualSum += residual * residual;

        // A loop (not a single dequeue) so the window also shrinks when Period was reduced.
        while (_residuals.Count > Period)
        {
            var oldResidual = _residuals.Dequeue();
            _residualSum -= oldResidual;
            _squaredResidualSum -= oldResidual * oldResidual;
        }
    }

    /// <summary>
    /// Sends the orders implied by the current z-score.
    /// </summary>
    /// <param name="zScore">Z-score of the current residual.</param>
    /// <returns><c>true</c> when at least one order was sent.</returns>
    private bool TradeOnZScore(decimal zScore)
    {
        // Read the position once so both legs are derived from the same value.
        var position = Position;

        if (zScore < -EntryThreshold && position <= 0)
        {
            // Long Asset1, Short Asset2. The volume includes closing an existing short,
            // and the hedge leg is scaled by the same amount so it reverses as well.
            var volume = Volume + Math.Abs(position);
            BuyMarket(volume);
            SendAsset2Order(Sides.Sell, volume * Beta);
            return true;
        }

        if (zScore > EntryThreshold && position >= 0)
        {
            // Short Asset1, Long Asset2 (mirror of the branch above).
            var volume = Volume + Math.Abs(position);
            SellMarket(volume);
            SendAsset2Order(Sides.Buy, volume * Beta);
            return true;
        }

        if (Math.Abs(zScore) < _exitZScore && position != 0)
        {
            // Close positions when spread reverts to mean
            if (position > 0)
                SellMarket(position);
            else
                BuyMarket(Math.Abs(position));

            // The hedge leg is opposite to Asset1, so it is closed with the same side
            // as the Asset1 position that is being closed.
            SendAsset2Order(position > 0 ? Sides.Buy : Sides.Sell, Volume * Beta);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Registers an order for <see cref="Asset2"/> on the second-leg portfolio.
    /// Does nothing when no portfolio is available.
    /// </summary>
    /// <param name="side">Order direction.</param>
    /// <param name="volume">Order volume.</param>
    private void SendAsset2Order(Sides side, decimal volume)
    {
        if (_asset2Portfolio == null)
            return;

        RegisterOrder(new Order
        {
            Side = side,
            Security = Asset2,
            Portfolio = _asset2Portfolio,
            Volume = volume
        });
    }

    /// <summary>
    /// Marks both legs as "no price received" so the next pair needs fresh closes from both.
    /// </summary>
    private void ResetPrices()
    {
        _asset1Price = 0;
        _asset2Price = 0;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from System.Collections.Generic import Queue
from StockSharp.Messages import DataType, Unit, UnitTypes, CandleStates, Sides
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Order, Security
from datatype_extensions import *
class cointegration_pairs_strategy(Strategy):
    """
    Cointegration pairs trading strategy.
    Trades based on cointegration relationship between two assets.

    The residual ``Asset1Price - Beta * Asset2Price`` is tracked over the last
    ``Period`` price pairs. When its z-score leaves the +/-``EntryThreshold`` band
    a hedged pair position is opened; it is closed once the absolute z-score
    drops below the exit level.

    Residuals are only recorded while trading is allowed and no cooldown is active.
    """

    # Number of processed price pairs that are skipped after any order has been sent.
    _TRADE_COOLDOWN_TICKS = 30
    # Absolute z-score below which an open pair position is closed.
    _EXIT_ZSCORE = 0.5
    # Substitute standard deviation used when the computed variance is not positive.
    _MIN_STD_DEV = 0.0001

    def __init__(self):
        """Registers the strategy parameters and initializes the internal state."""
        super(cointegration_pairs_strategy, self).__init__()

        # Period for calculation of residual mean and standard deviation.
        self._period = self.Param("Period", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Period", "Period for residual calculations", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(10, 50, 10)

        # Entry threshold as a multiple of standard deviation.
        self._entryThreshold = self.Param("EntryThreshold", 2.0) \
            .SetRange(0.1, 100.0) \
            .SetDisplay("Entry Threshold", "Entry threshold as multiple of standard deviation", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(1.0, 3.0, 0.5)

        # Beta coefficient for calculation of residual.
        self._beta = self.Param("Beta", 1.0) \
            .SetDisplay("Beta", "Coefficient of cointegration", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(0.5, 2.0, 0.1)

        # Second asset for pair trading.
        self._asset2 = self.Param[Security]("Asset2", None) \
            .SetDisplay("Asset 2", "Second asset for pair trading", "Parameters")

        # Stop loss percentage.
        self._stopLossPercent = self.Param("StopLossPercent", 2.0) \
            .SetRange(0.1, 100.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(1.0, 5.0, 1.0)

        # Candle type for strategy.
        self._candleType = self.Param("CandleType", tf(5)) \
            .SetDisplay("Candle Type", "Candle type for strategy", "Common")

        # Rolling statistics of the residual over the last Period pairs.
        self._residualMean = 0.0
        self._residualStdDev = 0.0
        self._residualSum = 0.0
        self._squaredResidualSum = 0.0
        self._residuals = Queue[float]()

        # Latest finished close of each leg; 0 means "not received yet for the current pair".
        self._asset1Price = 0.0
        self._asset2Price = 0.0

        self._cooldownTicksLeft = 0
        self._asset2Portfolio = None

    @property
    def Period(self):
        """Period for calculation of residual mean and standard deviation."""
        return self._period.Value

    @Period.setter
    def Period(self, value):
        self._period.Value = value

    @property
    def EntryThreshold(self):
        """Entry threshold as a multiple of standard deviation."""
        return self._entryThreshold.Value

    @EntryThreshold.setter
    def EntryThreshold(self, value):
        self._entryThreshold.Value = value

    @property
    def Beta(self):
        """Beta coefficient of the residual; also used as the volume ratio of the second leg."""
        return self._beta.Value

    @Beta.setter
    def Beta(self, value):
        self._beta.Value = value

    @property
    def Asset2(self):
        """Second asset for pair trading."""
        return self._asset2.Value

    @Asset2.setter
    def Asset2(self, value):
        self._asset2.Value = value

    @property
    def StopLossPercent(self):
        """Stop loss percentage."""
        return self._stopLossPercent.Value

    @StopLossPercent.setter
    def StopLossPercent(self, value):
        self._stopLossPercent.Value = value

    @property
    def CandleType(self):
        """Candle type for strategy."""
        return self._candleType.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candleType.Value = value

    def GetWorkingSecurities(self):
        """Returns both legs of the pair together with the candle type they are traded on."""
        return [(self.Security, self.CandleType), (self.Asset2, self.CandleType)]

    def OnReseted(self):
        """Clears the rolling statistics, pending prices, cooldown and cached portfolio."""
        super(cointegration_pairs_strategy, self).OnReseted()
        self._residualMean = 0
        self._residualStdDev = 0
        self._residualSum = 0
        self._squaredResidualSum = 0
        self._residuals.Clear()
        self._reset_prices()
        self._cooldownTicksLeft = 0
        # Resolved again in OnStarted2.
        self._asset2Portfolio = None

    def OnStarted2(self, time):
        """
        Subscribes to the candles of both assets, sets up the chart and enables the stop-loss.

        Raises:
            Exception: when Asset2 is not specified.
        """
        super(cointegration_pairs_strategy, self).OnStarted2(time)

        if self.Asset2 is None:
            raise Exception("Second asset is not specified.")

        # Use the same portfolio for second asset or find another portfolio
        self._asset2Portfolio = self.Portfolio

        # Create subscriptions for both assets
        asset1Subscription = self.SubscribeCandles(self.CandleType)
        asset2Subscription = self.SubscribeCandles(self.CandleType, self.Asset2)

        # Subscribe to Asset1 candles
        asset1Subscription.Bind(self.ProcessAsset1Candle).Start()

        # Subscribe to Asset2 candles
        asset2Subscription.Bind(self.ProcessAsset2Candle).Start()

        # Setup chart visualization if available
        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, asset1Subscription)
            self.DrawOwnTrades(area)

        # Enable position protection with stop loss
        self.StartProtection(
            takeProfit=Unit(0, UnitTypes.Absolute),
            stopLoss=Unit(self.StopLossPercent, UnitTypes.Percent)
        )

    def ProcessAsset1Candle(self, candle):
        """Stores the close of a finished candle of the first asset and tries to process the pair."""
        if candle.State != CandleStates.Finished:
            return

        self._asset1Price = float(candle.ClosePrice)
        self.ProcessPair()

    def ProcessAsset2Candle(self, candle):
        """Stores the close of a finished candle of the second asset and tries to process the pair."""
        if candle.State != CandleStates.Finished:
            return

        self._asset2Price = float(candle.ClosePrice)
        self.ProcessPair()

    def ProcessPair(self):
        """
        Evaluates one price pair: updates the rolling residual statistics, computes
        the z-score and, when the window is full, acts on entry/exit signals.
        """
        # A pair is complete only when both legs delivered a close since the last reset.
        if self._asset1Price == 0 or self._asset2Price == 0:
            return

        # Prices are intentionally kept here so the pair can be evaluated once trading is allowed.
        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        if self._cooldownTicksLeft > 0:
            self._cooldownTicksLeft -= 1
            self._reset_prices()
            return

        # Calculate residual = Asset1Price - Beta * Asset2Price
        residual = self._asset1Price - self.Beta * self._asset2Price

        # Track residual statistics over period
        self._residuals.Enqueue(residual)
        self._residualSum += residual
        self._squaredResidualSum += residual * residual

        # A loop (not a single dequeue) so the window also shrinks when Period was reduced.
        while self._residuals.Count > self.Period:
            oldResidual = self._residuals.Dequeue()
            self._residualSum -= oldResidual
            self._squaredResidualSum -= oldResidual * oldResidual

        hasTraded = False

        if self._residuals.Count == self.Period:
            # Population variance via E[x^2] - E[x]^2; rounding can push it to zero or below.
            self._residualMean = self._residualSum / self.Period
            variance = (self._squaredResidualSum / self.Period) - (self._residualMean * self._residualMean)
            self._residualStdDev = self._MIN_STD_DEV if variance <= 0 else Math.Sqrt(float(variance))

            # Calculate z-score of current residual
            zScore = 0 if self._residualStdDev == 0 else (residual - self._residualMean) / self._residualStdDev
            hasTraded = self._trade_on_zscore(zScore)

        if hasTraded:
            self._cooldownTicksLeft = self._TRADE_COOLDOWN_TICKS

        # Reset prices for next update
        self._reset_prices()

    def _trade_on_zscore(self, zScore):
        """
        Sends the orders implied by the current z-score.

        Returns:
            True when at least one order was sent, otherwise False.
        """
        # Read the position once so both legs are derived from the same value.
        position = self.Position

        if zScore < -self.EntryThreshold and position <= 0:
            # Long Asset1, Short Asset2. The volume includes closing an existing short,
            # and the hedge leg is scaled by the same amount so it reverses as well.
            volume = self.Volume + Math.Abs(position)
            self.BuyMarket(volume)
            self._send_asset2_order(Sides.Sell, volume * self.Beta)
            return True

        if zScore > self.EntryThreshold and position >= 0:
            # Short Asset1, Long Asset2 (mirror of the branch above).
            volume = self.Volume + Math.Abs(position)
            self.SellMarket(volume)
            self._send_asset2_order(Sides.Buy, volume * self.Beta)
            return True

        if Math.Abs(zScore) < self._EXIT_ZSCORE and position != 0:
            # Close positions when spread reverts to mean
            if position > 0:
                self.SellMarket(position)
            else:
                self.BuyMarket(Math.Abs(position))

            # The hedge leg is opposite to Asset1, so it is closed with the same side
            # as the Asset1 position that is being closed.
            side = Sides.Buy if position > 0 else Sides.Sell
            self._send_asset2_order(side, self.Volume * self.Beta)
            return True

        return False

    def _send_asset2_order(self, side, volume):
        """Registers an order for Asset2 on the second-leg portfolio; no-op without a portfolio."""
        if self._asset2Portfolio is None:
            return

        asset2Order = Order()
        asset2Order.Side = side
        asset2Order.Security = self.Asset2
        asset2Order.Portfolio = self._asset2Portfolio
        asset2Order.Volume = volume
        self.RegisterOrder(asset2Order)

    def _reset_prices(self):
        """Marks both legs as 'no price received' so the next pair needs fresh closes from both."""
        self._asset1Price = 0
        self._asset2Price = 0

    def CreateClone(self):
        """
        !! REQUIRED!! Creates a new instance of the strategy.
        """
        return cointegration_pairs_strategy()