Momentum Style Rotation Strategy
This Python strategy rotates among a set of factor exchange-traded funds (ETFs) and a broad market ETF. At the end of each month the ETFs are ranked by their trailing three-month total return. The portfolio then invests entirely in the top ranked fund for the following month to harvest medium-term momentum.
The approach always holds a single ETF and re-evaluates monthly. Daily candles are used for calculations and all rebalancing trades are executed at the market price.
Details
- Universe: list of factor ETFs and a benchmark ETF.
- Signal: compute 63-day (three-month) total return and select the strongest instrument.
- Rebalance: first trading day of each month.
- Positioning: fully long the selected ETF, all others flat.
- Risk control: orders skipped when the required trade value falls below
MinTradeUsd.
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Momentum style rotation strategy that trades the primary instrument when its relative strength versus a benchmark style ETF rotates into or out of favor.
/// </summary>
public class MomentumStyleRotationStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<int> _normalizationPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private RateOfChange _primaryMomentum = null!;
private RateOfChange _benchmarkMomentum = null!;
private ExponentialMovingAverage _relativeStrengthAverage = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private decimal _latestPrimaryMomentum;
private decimal _latestBenchmarkMomentum;
private decimal? _previousZScore;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark style ETF identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Momentum lookback period.
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Lookback period used to normalize the rotation spread.
/// </summary>
public int NormalizationPeriod
{
get => _normalizationPeriod.Value;
set => _normalizationPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for calculations.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public MomentumStyleRotationStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark style ETF", "General");
_lookbackPeriod = Param(nameof(LookbackPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Lookback Period", "Momentum lookback period", "Indicators");
_normalizationPeriod = Param(nameof(NormalizationPeriod), 20)
.SetRange(5, 120)
.SetDisplay("Normalization Period", "Lookback period used to normalize the rotation spread", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.2m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 6)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Time frame for candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryMomentum = null!;
_benchmarkMomentum = null!;
_relativeStrengthAverage = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_latestPrimaryMomentum = 0m;
_latestBenchmarkMomentum = 0m;
_previousZScore = null;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark security identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryMomentum = new RateOfChange { Length = LookbackPeriod };
_benchmarkMomentum = new RateOfChange { Length = LookbackPeriod };
_relativeStrengthAverage = new ExponentialMovingAverage { Length = Math.Max(2, NormalizationPeriod / 2) };
_spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
_spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var momentumValue = _primaryMomentum.Process(candle);
if (momentumValue.IsEmpty || !_primaryMomentum.IsFormed)
return;
_latestPrimaryMomentum = momentumValue.ToDecimal();
_primaryUpdated = true;
TryProcessRotation(candle.OpenTime);
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var momentumValue = _benchmarkMomentum.Process(candle);
if (momentumValue.IsEmpty || !_benchmarkMomentum.IsFormed)
return;
_latestBenchmarkMomentum = momentumValue.ToDecimal();
_benchmarkUpdated = true;
TryProcessRotation(candle.OpenTime);
}
private void TryProcessRotation(DateTime time)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var relativeStrength = _latestPrimaryMomentum - _latestBenchmarkMomentum;
var smoothedStrength = _relativeStrengthAverage.Process(relativeStrength, time, true).ToDecimal();
var spread = relativeStrength - smoothedStrength;
var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();
if (!_relativeStrengthAverage.IsFormed || !_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishEntry = _previousZScore is decimal previousBullish &&
previousBullish < EntryThreshold &&
_latestPrimaryMomentum > 0m &&
zScore >= EntryThreshold;
var bearishEntry = _previousZScore is decimal previousBearish &&
previousBearish > -EntryThreshold &&
_latestPrimaryMomentum < 0m &&
_latestBenchmarkMomentum > _latestPrimaryMomentum &&
zScore <= -EntryThreshold;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && (zScore <= ExitThreshold || bearishEntry))
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && (zScore >= -ExitThreshold || bullishEntry))
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_previousZScore = zScore;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, ExponentialMovingAverage, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class momentum_style_rotation_strategy(Strategy):
"""Momentum style rotation strategy that trades the primary instrument when its relative strength versus a benchmark style ETF rotates into or out of favor."""
def __init__(self):
super(momentum_style_rotation_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark style ETF", "General")
self._lookback_period = self.Param("LookbackPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Lookback Period", "Momentum lookback period", "Indicators")
self._normalization_period = self.Param("NormalizationPeriod", 20) \
.SetRange(5, 120) \
.SetDisplay("Normalization Period", "Lookback period used to normalize the rotation spread", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.0) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.2) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 6) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Time frame for candles", "General")
self._benchmark = None
self._primary_momentum = None
self._benchmark_momentum = None
self._relative_strength_average = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_momentum = 0.0
self._latest_benchmark_momentum = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(momentum_style_rotation_strategy, self).OnReseted()
self._benchmark = None
self._primary_momentum = None
self._benchmark_momentum = None
self._relative_strength_average = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_momentum = 0.0
self._latest_benchmark_momentum = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(momentum_style_rotation_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark security identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
lookback = int(self._lookback_period.Value)
norm_period = int(self._normalization_period.Value)
self._primary_momentum = RateOfChange()
self._primary_momentum.Length = lookback
self._benchmark_momentum = RateOfChange()
self._benchmark_momentum.Length = lookback
self._relative_strength_average = ExponentialMovingAverage()
self._relative_strength_average.Length = max(2, norm_period // 2)
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = norm_period
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = norm_period
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
mom_iv = CandleIndicatorValue(self._primary_momentum, candle)
mom_iv.IsFinal = True
mom_result = self._primary_momentum.Process(mom_iv)
if mom_result.IsEmpty or not self._primary_momentum.IsFormed:
return
self._latest_primary_momentum = float(mom_result)
self._primary_updated = True
self.TryProcessRotation(candle.OpenTime)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
mom_iv = CandleIndicatorValue(self._benchmark_momentum, candle)
mom_iv.IsFinal = True
mom_result = self._benchmark_momentum.Process(mom_iv)
if mom_result.IsEmpty or not self._benchmark_momentum.IsFormed:
return
self._latest_benchmark_momentum = float(mom_result)
self._benchmark_updated = True
self.TryProcessRotation(candle.OpenTime)
def TryProcessRotation(self, time):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
relative_strength = self._latest_primary_momentum - self._latest_benchmark_momentum
smoothed_strength = float(process_float(self._relative_strength_average, relative_strength, time, True))
spread = relative_strength - smoothed_strength
mean = float(process_float(self._spread_average, spread, time, True))
deviation = float(process_float(self._spread_deviation, spread, time, True))
if not self._relative_strength_average.IsFormed or not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
bullish_entry = (self._previous_z_score is not None and
self._previous_z_score < entry_thresh and
self._latest_primary_momentum > 0 and
z_score >= entry_thresh)
bearish_entry = (self._previous_z_score is not None and
self._previous_z_score > -entry_thresh and
self._latest_primary_momentum < 0 and
self._latest_benchmark_momentum > self._latest_primary_momentum and
z_score <= -entry_thresh)
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and (z_score <= exit_thresh or bearish_entry):
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and (z_score >= -exit_thresh or bullish_entry):
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._previous_z_score = z_score
def CreateClone(self):
return momentum_style_rotation_strategy()