Month12 Cycle Strategy
This Python strategy implements the 12-month cycle anomaly. Stocks are ranked by the return they earned one year ago over the corresponding calendar month. Each month the top decile is bought and the bottom decile is sold short, creating a market-neutral portfolio based on lagged annual performance.
The system uses daily data to approximate monthly closes and rebalances at the start of every month. Position sizes are scaled to keep dollar exposure balanced across long and short sides.
Details
- Universe: user defined list of securities.
- Signal: sort by percentage change from the same month one year earlier.
- Portfolio: long top decile, short bottom decile with leverage per leg set by Leverage.
- Rebalance: monthly.
- Data: daily candles aggregated into month-end prices.
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// 12-month cycle strategy that trades the primary instrument when its 12-month minus 1-month seasonal return outperforms a benchmark.
/// </summary>
/// <remarks>
/// For each instrument the signal is the long-lookback rate of change minus the short-lookback rate of change.
/// The spread (primary signal minus benchmark signal) is normalized into a z-score using a moving average and
/// standard deviation of length <see cref="NormalizationPeriod"/>. A flat position is opened when the absolute
/// z-score reaches <see cref="EntryThreshold"/>; an open position is closed when the z-score falls back to
/// <see cref="ExitThreshold"/>.
/// </remarks>
public class Month12CycleStrategy : Strategy
{
    /// <summary>
    /// Percentage passed as the first argument of <c>StartProtection</c> (the take-profit distance).
    /// </summary>
    private const decimal TakeProfitPercent = 2m;

    private readonly StrategyParam<string> _security2Id;
    private readonly StrategyParam<int> _annualPeriod;
    private readonly StrategyParam<int> _recentPeriod;
    private readonly StrategyParam<int> _normalizationPeriod;
    private readonly StrategyParam<decimal> _entryThreshold;
    private readonly StrategyParam<decimal> _exitThreshold;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<decimal> _stopLoss;
    private readonly StrategyParam<DataType> _candleType;

    // Runtime state; created in OnStarted2 and cleared in OnReseted.
    private Security _benchmark = null!;
    private RateOfChange _primaryAnnualMomentum = null!;
    private RateOfChange _benchmarkAnnualMomentum = null!;
    private RateOfChange _primaryRecentMomentum = null!;
    private RateOfChange _benchmarkRecentMomentum = null!;
    private SimpleMovingAverage _spreadAverage = null!;
    private StandardDeviation _spreadDeviation = null!;

    // Latest per-instrument signals and flags telling whether each one is fresh
    // (i.e. has been updated since the spread was last evaluated).
    private decimal _latestPrimarySignal;
    private decimal _latestBenchmarkSignal;
    private bool _primaryUpdated;
    private bool _benchmarkUpdated;

    // Number of spread evaluations that must still pass before a new entry is allowed.
    private int _cooldownRemaining;

    /// <summary>
    /// Benchmark security identifier.
    /// </summary>
    public string Security2Id
    {
        get => _security2Id.Value;
        set => _security2Id.Value = value;
    }

    /// <summary>
    /// Long lookback period used to approximate the prior 12-month cycle.
    /// </summary>
    public int AnnualPeriod
    {
        get => _annualPeriod.Value;
        set => _annualPeriod.Value = value;
    }

    /// <summary>
    /// Short lookback period used to remove the most recent month.
    /// </summary>
    public int RecentPeriod
    {
        get => _recentPeriod.Value;
        set => _recentPeriod.Value = value;
    }

    /// <summary>
    /// Lookback period used to normalize the seasonal spread.
    /// </summary>
    public int NormalizationPeriod
    {
        get => _normalizationPeriod.Value;
        set => _normalizationPeriod.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to open a position.
    /// </summary>
    public decimal EntryThreshold
    {
        get => _entryThreshold.Value;
        set => _entryThreshold.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to close a position.
    /// </summary>
    public decimal ExitThreshold
    {
        get => _exitThreshold.Value;
        set => _exitThreshold.Value = value;
    }

    /// <summary>
    /// Closed candles to wait before another position change.
    /// </summary>
    /// <remarks>
    /// The counter is restarted after every entry and exit, but only new entries are gated by it;
    /// exits are evaluated regardless of the remaining cooldown.
    /// </remarks>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Stop loss percentage.
    /// </summary>
    public decimal StopLoss
    {
        get => _stopLoss.Value;
        set => _stopLoss.Value = value;
    }

    /// <summary>
    /// Candle type used for calculations.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Initializes a new instance of <see cref="Month12CycleStrategy"/> and registers its parameters
    /// with their default values, allowed ranges and display metadata.
    /// </summary>
    public Month12CycleStrategy()
    {
        _security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");

        _annualPeriod = Param(nameof(AnnualPeriod), 90)
            .SetRange(30, 400)
            .SetDisplay("Annual Period", "Long lookback period used to approximate the prior 12-month cycle", "Indicators");

        _recentPeriod = Param(nameof(RecentPeriod), 10)
            .SetRange(2, 60)
            .SetDisplay("Recent Period", "Short lookback period used to remove the most recent month", "Indicators");

        _normalizationPeriod = Param(nameof(NormalizationPeriod), 12)
            .SetRange(5, 120)
            .SetDisplay("Normalization Period", "Lookback period used to normalize the seasonal spread", "Indicators");

        _entryThreshold = Param(nameof(EntryThreshold), 0.65m)
            .SetRange(0.1m, 5m)
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");

        _exitThreshold = Param(nameof(ExitThreshold), 0.15m)
            .SetRange(0m, 2m)
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");

        _cooldownBars = Param(nameof(CooldownBars), 8)
            .SetRange(0, 120)
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");

        _stopLoss = Param(nameof(StopLoss), 4m)
            .SetRange(0.5m, 15m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Candle type used for calculations", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        if (Security != null)
            yield return (Security, CandleType);

        // The benchmark is reported by identifier only; it is resolved in OnStarted2.
        if (!Security2Id.IsEmpty())
            yield return (new Security { Id = Security2Id }, CandleType);
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _benchmark = null!;
        _primaryAnnualMomentum = null!;
        _benchmarkAnnualMomentum = null!;
        _primaryRecentMomentum = null!;
        _benchmarkRecentMomentum = null!;
        _spreadAverage = null!;
        _spreadDeviation = null!;

        _latestPrimarySignal = 0m;
        _latestBenchmarkSignal = 0m;
        _primaryUpdated = false;
        _benchmarkUpdated = false;
        _cooldownRemaining = 0;
    }

    /// <inheritdoc />
    /// <exception cref="InvalidOperationException">
    /// The primary security or the benchmark security identifier is not specified.
    /// </exception>
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
            throw new InvalidOperationException("Primary security is not specified.");

        if (Security2Id.IsEmpty())
            throw new InvalidOperationException("Benchmark security identifier is not specified.");

        // Fall back to a bare security carrying only the identifier when the lookup finds nothing.
        _benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };

        _primaryAnnualMomentum = new RateOfChange { Length = AnnualPeriod };
        _benchmarkAnnualMomentum = new RateOfChange { Length = AnnualPeriod };
        _primaryRecentMomentum = new RateOfChange { Length = RecentPeriod };
        _benchmarkRecentMomentum = new RateOfChange { Length = RecentPeriod };
        _spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
        _spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };

        var primarySubscription = SubscribeCandles(CandleType, security: Security);
        var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);

        primarySubscription
            .Bind(ProcessPrimaryCandle)
            .Start();

        benchmarkSubscription
            .Bind(ProcessBenchmarkCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, primarySubscription);
            DrawCandles(area, benchmarkSubscription);
            DrawOwnTrades(area);
        }

        StartProtection(
            new Unit(TakeProfitPercent, UnitTypes.Percent),
            new Unit(StopLoss, UnitTypes.Percent));
    }

    /// <summary>
    /// Handles a candle of the primary security: refreshes the primary signal and,
    /// when both signals are fresh, evaluates the spread.
    /// </summary>
    /// <param name="candle">Candle received from the primary subscription.</param>
    private void ProcessPrimaryCandle(ICandleMessage candle)
    {
        if (!TryComputeSignal(_primaryAnnualMomentum, _primaryRecentMomentum, candle, out var signal))
            return;

        _latestPrimarySignal = signal;
        _primaryUpdated = true;
        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Handles a candle of the benchmark security: refreshes the benchmark signal and,
    /// when both signals are fresh, evaluates the spread.
    /// </summary>
    /// <param name="candle">Candle received from the benchmark subscription.</param>
    private void ProcessBenchmarkCandle(ICandleMessage candle)
    {
        if (!TryComputeSignal(_benchmarkAnnualMomentum, _benchmarkRecentMomentum, candle, out var signal))
            return;

        _latestBenchmarkSignal = signal;
        _benchmarkUpdated = true;
        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Feeds a finished candle into both momentum indicators of one instrument and
    /// computes the signal as the long-lookback value minus the short-lookback value.
    /// </summary>
    /// <param name="annual">Long-lookback rate-of-change indicator of the instrument.</param>
    /// <param name="recent">Short-lookback rate-of-change indicator of the instrument.</param>
    /// <param name="candle">Candle to process; unfinished candles are ignored without touching the indicators.</param>
    /// <param name="signal">Computed signal when the method returns <c>true</c>; otherwise zero.</param>
    /// <returns>
    /// <c>true</c> when the candle is finished, both indicators produced a non-empty value and both are formed.
    /// </returns>
    private static bool TryComputeSignal(RateOfChange annual, RateOfChange recent, ICandleMessage candle, out decimal signal)
    {
        signal = 0m;

        if (candle.State != CandleStates.Finished)
            return false;

        // Both indicators are always fed, even while one of them is still warming up.
        var annualValue = annual.Process(candle);
        var recentValue = recent.Process(candle);

        if (annualValue.IsEmpty || recentValue.IsEmpty || !annual.IsFormed || !recent.IsFormed)
            return false;

        signal = annualValue.ToDecimal() - recentValue.ToDecimal();
        return true;
    }

    /// <summary>
    /// Evaluates the primary-versus-benchmark spread once both signals have been refreshed,
    /// and opens or closes a position according to the resulting z-score.
    /// </summary>
    /// <param name="time">Timestamp attached to the values fed into the normalization indicators.</param>
    private void TryProcessSpread(DateTime time)
    {
        // Wait until each instrument has delivered a fresh signal since the last evaluation.
        if (!_primaryUpdated || !_benchmarkUpdated)
            return;

        _primaryUpdated = false;
        _benchmarkUpdated = false;

        var spread = _latestPrimarySignal - _latestBenchmarkSignal;
        var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
        var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();

        // A non-positive deviation would make the z-score undefined.
        if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
            return;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        var zScore = (spread - mean) / deviation;
        var bullishEntry = zScore >= EntryThreshold;
        var bearishEntry = zScore <= -EntryThreshold;

        if (_cooldownRemaining == 0 && Position == 0)
        {
            if (bullishEntry)
            {
                BuyMarket();
                _cooldownRemaining = CooldownBars;
            }
            else if (bearishEntry)
            {
                SellMarket();
                _cooldownRemaining = CooldownBars;
            }
        }
        else if (Position > 0 && zScore <= ExitThreshold)
        {
            // Long exit: the z-score has dropped back to the exit level.
            SellMarket(Position);
            _cooldownRemaining = CooldownBars;
        }
        else if (Position < 0 && zScore >= -ExitThreshold)
        {
            // Short exit: the z-score has risen back to the mirrored exit level.
            BuyMarket(Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class month12_cycle_strategy(Strategy):
    """12-month cycle strategy that trades the primary instrument when its 12-month minus 1-month seasonal return outperforms a benchmark.

    For each instrument the signal is the long-lookback rate of change minus
    the short-lookback rate of change. The spread (primary signal minus
    benchmark signal) is turned into a z-score using a moving average and a
    standard deviation of the same length. A flat position is opened when the
    absolute z-score reaches the entry threshold; an open position is closed
    when the z-score falls back to the exit threshold.
    """

    def __init__(self):
        """Register strategy parameters and initialize runtime state."""
        super(month12_cycle_strategy, self).__init__()

        self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
        self._annual_period = self.Param("AnnualPeriod", 90) \
            .SetRange(30, 400) \
            .SetDisplay("Annual Period", "Long lookback period used to approximate the prior 12-month cycle", "Indicators")
        self._recent_period = self.Param("RecentPeriod", 10) \
            .SetRange(2, 60) \
            .SetDisplay("Recent Period", "Short lookback period used to remove the most recent month", "Indicators")
        self._normalization_period = self.Param("NormalizationPeriod", 12) \
            .SetRange(5, 120) \
            .SetDisplay("Normalization Period", "Lookback period used to normalize the seasonal spread", "Indicators")
        self._entry_threshold = self.Param("EntryThreshold", 0.65) \
            .SetRange(0.1, 5.0) \
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
        self._exit_threshold = self.Param("ExitThreshold", 0.15) \
            .SetRange(0.0, 2.0) \
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
        self._cooldown_bars = self.Param("CooldownBars", 8) \
            .SetRange(0, 120) \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
        self._stop_loss = self.Param("StopLoss", 4.0) \
            .SetRange(0.5, 15.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Candle type used for calculations", "General")

        # Runtime state; created in OnStarted2 and cleared in OnReseted.
        self._benchmark = None
        self._primary_annual_momentum = None
        self._benchmark_annual_momentum = None
        self._primary_recent_momentum = None
        self._benchmark_recent_momentum = None
        self._spread_average = None
        self._spread_deviation = None
        # Latest per-instrument signals and flags telling whether each one
        # has been refreshed since the spread was last evaluated.
        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._primary_updated = False
        self._benchmark_updated = False
        # Spread evaluations that must still pass before a new entry is allowed.
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """Candle type used for calculations."""
        return self._candle_type.Value

    def GetWorkingSecurities(self):
        """Return the (security, candle type) pairs this strategy works with."""
        result = []
        if self.Security is not None:
            result.append((self.Security, self.candle_type))
        sec2_id = str(self._security2_id.Value)
        if sec2_id:
            # The benchmark is reported by identifier only.
            s = Security()
            s.Id = sec2_id
            result.append((s, self.candle_type))
        return result

    def OnReseted(self):
        """Drop indicators and signal state so the strategy can be restarted cleanly."""
        super(month12_cycle_strategy, self).OnReseted()
        self._benchmark = None
        self._primary_annual_momentum = None
        self._benchmark_annual_momentum = None
        self._primary_recent_momentum = None
        self._benchmark_recent_momentum = None
        self._spread_average = None
        self._spread_deviation = None
        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create indicators, subscribe to both candle streams and enable protection.

        Raises:
            Exception: the primary security or the benchmark security
                identifier is not specified.
        """
        super(month12_cycle_strategy, self).OnStarted2(time)

        # Fail fast with a clear message instead of subscribing with no security.
        if self.Security is None:
            raise Exception("Primary security is not specified.")

        sec2_id = str(self._security2_id.Value)
        if not sec2_id:
            raise Exception("Benchmark security identifier is not specified.")

        s = Security()
        s.Id = sec2_id
        self._benchmark = s

        annual = int(self._annual_period.Value)
        recent = int(self._recent_period.Value)
        norm_period = int(self._normalization_period.Value)

        self._primary_annual_momentum = RateOfChange()
        self._primary_annual_momentum.Length = annual
        self._benchmark_annual_momentum = RateOfChange()
        self._benchmark_annual_momentum.Length = annual
        self._primary_recent_momentum = RateOfChange()
        self._primary_recent_momentum.Length = recent
        self._benchmark_recent_momentum = RateOfChange()
        self._benchmark_recent_momentum.Length = recent
        self._spread_average = SimpleMovingAverage()
        self._spread_average.Length = norm_period
        self._spread_deviation = StandardDeviation()
        self._spread_deviation.Length = norm_period

        primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
        benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)

        primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
        benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, primary_subscription)
            self.DrawCandles(area, benchmark_subscription)
            self.DrawOwnTrades(area)

        # First unit is a fixed 2% level; second is the configurable stop loss.
        self.StartProtection(
            Unit(2, UnitTypes.Percent),
            Unit(float(self._stop_loss.Value), UnitTypes.Percent)
        )

    def _try_compute_signal(self, annual, recent, candle):
        """Feed a finished candle into both momentum indicators of one instrument.

        Args:
            annual: long-lookback rate-of-change indicator of the instrument.
            recent: short-lookback rate-of-change indicator of the instrument.
            candle: candle to process; unfinished candles are ignored without
                touching the indicators.

        Returns:
            The long-lookback value minus the short-lookback value as a float,
            or None when the candle is unfinished, either result is empty, or
            either indicator is not yet formed.
        """
        if candle.State != CandleStates.Finished:
            return None

        # Both indicators are always fed, even while one is still warming up.
        annual_iv = CandleIndicatorValue(annual, candle)
        annual_iv.IsFinal = True
        annual_result = annual.Process(annual_iv)

        recent_iv = CandleIndicatorValue(recent, candle)
        recent_iv.IsFinal = True
        recent_result = recent.Process(recent_iv)

        if annual_result.IsEmpty or recent_result.IsEmpty or not annual.IsFormed or not recent.IsFormed:
            return None

        return float(annual_result) - float(recent_result)

    def ProcessPrimaryCandle(self, candle):
        """Refresh the primary signal and evaluate the spread when both signals are fresh."""
        signal = self._try_compute_signal(
            self._primary_annual_momentum, self._primary_recent_momentum, candle)
        # Compare against None explicitly: a signal of 0.0 is a valid value.
        if signal is None:
            return
        self._latest_primary_signal = signal
        self._primary_updated = True
        self.TryProcessSpread(candle.OpenTime)

    def ProcessBenchmarkCandle(self, candle):
        """Refresh the benchmark signal and evaluate the spread when both signals are fresh."""
        signal = self._try_compute_signal(
            self._benchmark_annual_momentum, self._benchmark_recent_momentum, candle)
        # Compare against None explicitly: a signal of 0.0 is a valid value.
        if signal is None:
            return
        self._latest_benchmark_signal = signal
        self._benchmark_updated = True
        self.TryProcessSpread(candle.OpenTime)

    def TryProcessSpread(self, time):
        """Evaluate the primary-versus-benchmark spread and trade on its z-score.

        Args:
            time: timestamp attached to the values fed into the normalization
                indicators.
        """
        # Wait until each instrument has delivered a fresh signal.
        if not self._primary_updated or not self._benchmark_updated:
            return

        self._primary_updated = False
        self._benchmark_updated = False

        spread = self._latest_primary_signal - self._latest_benchmark_signal
        mean = float(process_float(self._spread_average, spread, time, True))
        deviation = float(process_float(self._spread_deviation, spread, time, True))

        # A non-positive deviation would make the z-score undefined.
        if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
            return

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        z_score = (spread - mean) / deviation
        entry_thresh = float(self._entry_threshold.Value)
        exit_thresh = float(self._exit_threshold.Value)
        cooldown = int(self._cooldown_bars.Value)

        bullish_entry = z_score >= entry_thresh
        bearish_entry = z_score <= -entry_thresh

        # Only entries are gated by the cooldown; exits are always evaluated.
        if self._cooldown_remaining == 0 and self.Position == 0:
            if bullish_entry:
                self.BuyMarket()
                self._cooldown_remaining = cooldown
            elif bearish_entry:
                self.SellMarket()
                self._cooldown_remaining = cooldown
        elif self.Position > 0 and z_score <= exit_thresh:
            self.SellMarket(self.Position)
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and z_score >= -exit_thresh:
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return month12_cycle_strategy()