Smart Factors Momentum Market
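The Smart Factors Momentum Market strategy compares the momentum of the traded instrument with the momentum of a benchmark market proxy. For each instrument it blends a fast and a slow rate of change (weighted 65% / 35%), takes the spread between the two blended values, and normalizes that spread into a z-score over a rolling window. The system goes long when the z-score crosses above the entry threshold, goes short when it crosses below the negative entry threshold, and closes the position once the z-score moves back inside the exit threshold.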
Details
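- Entry Criteria: While flat and outside the cooldown, the z-score of the momentum spread crosses above +EntryThreshold (long) or below -EntryThreshold (short).
- Long/Short: Both.
- Exit Criteria: A long is closed when the z-score is at or below ExitThreshold; a short is closed when the z-score is at or above -ExitThreshold.
- Stops: Percentage stop loss (StopLoss, default 3%) together with a fixed 2% take profit.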
- Default Values:
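CandleType = TimeSpan.FromHours(4).TimeFrame()
FastPeriod = 8
SlowPeriod = 24
NormalizationPeriod = 20
EntryThreshold = 1
ExitThreshold = 0.2
CooldownBars = 8
StopLoss = 3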
- Filters:
- Category: Momentum
- Direction: Both
- Indicators: Multiple
- Stops: Yes
- Complexity: Intermediate
- Timeframe: Medium-term
- Seasonality: No
- Neural Networks: No
- Divergence: No
- Risk Level: Medium
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Smart-factors momentum market strategy that trades the primary instrument when its blended fast and slow momentum outperforms a benchmark market proxy.
/// </summary>
/// <remarks>
/// For both the primary instrument and the benchmark a fast and a slow <see cref="RateOfChange"/> are blended
/// into a single momentum signal. The difference between the two signals (the spread) is normalized into a
/// z-score using a moving average and a standard deviation over <see cref="NormalizationPeriod"/> values.
/// A position is opened when the z-score crosses <see cref="EntryThreshold"/> (long) or its negative (short)
/// and closed when the z-score returns inside <see cref="ExitThreshold"/>.
/// </remarks>
public class SmartFactorsMomentumMarketStrategy : Strategy
{
    // Weights used to blend the fast and the slow rate of change into one momentum signal.
    private const decimal FastWeight = 0.65m;
    private const decimal SlowWeight = 0.35m;

    // Take-profit distance (in percent) passed to the position protection.
    private const decimal TakeProfitPercent = 2m;

    private readonly StrategyParam<string> _security2Id;
    private readonly StrategyParam<int> _fastPeriod;
    private readonly StrategyParam<int> _slowPeriod;
    private readonly StrategyParam<int> _normalizationPeriod;
    private readonly StrategyParam<decimal> _entryThreshold;
    private readonly StrategyParam<decimal> _exitThreshold;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<decimal> _stopLoss;
    private readonly StrategyParam<DataType> _candleType;

    private Security _benchmark = null!;
    private RateOfChange _primaryFastMomentum = null!;
    private RateOfChange _primarySlowMomentum = null!;
    private RateOfChange _benchmarkFastMomentum = null!;
    private RateOfChange _benchmarkSlowMomentum = null!;
    private SimpleMovingAverage _spreadAverage = null!;
    private StandardDeviation _spreadDeviation = null!;

    private decimal _latestPrimarySignal;
    private decimal _latestBenchmarkSignal;
    private decimal? _previousZScore;
    private bool _primaryUpdated;
    private bool _benchmarkUpdated;
    private int _cooldownRemaining;

    /// <summary>
    /// Identifier of the benchmark market proxy.
    /// </summary>
    public string Security2Id
    {
        get => _security2Id.Value;
        set => _security2Id.Value = value;
    }

    /// <summary>
    /// Fast momentum lookback period.
    /// </summary>
    public int FastPeriod
    {
        get => _fastPeriod.Value;
        set => _fastPeriod.Value = value;
    }

    /// <summary>
    /// Slow momentum lookback period.
    /// </summary>
    public int SlowPeriod
    {
        get => _slowPeriod.Value;
        set => _slowPeriod.Value = value;
    }

    /// <summary>
    /// Lookback period used to normalize the blended momentum spread.
    /// </summary>
    public int NormalizationPeriod
    {
        get => _normalizationPeriod.Value;
        set => _normalizationPeriod.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to open a position.
    /// </summary>
    public decimal EntryThreshold
    {
        get => _entryThreshold.Value;
        set => _entryThreshold.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to close a position.
    /// </summary>
    public decimal ExitThreshold
    {
        get => _exitThreshold.Value;
        set => _exitThreshold.Value = value;
    }

    /// <summary>
    /// Closed candles to wait before another position change.
    /// </summary>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Stop loss percentage.
    /// </summary>
    public decimal StopLoss
    {
        get => _stopLoss.Value;
        set => _stopLoss.Value = value;
    }

    /// <summary>
    /// Time frame for candles.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Initializes a new instance of <see cref="SmartFactorsMomentumMarketStrategy"/> and registers its parameters.
    /// </summary>
    public SmartFactorsMomentumMarketStrategy()
    {
        _security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark market proxy", "General");

        _fastPeriod = Param(nameof(FastPeriod), 8)
            .SetRange(2, 80)
            .SetDisplay("Fast Period", "Fast momentum lookback period", "Indicators");

        _slowPeriod = Param(nameof(SlowPeriod), 24)
            .SetRange(5, 200)
            .SetDisplay("Slow Period", "Slow momentum lookback period", "Indicators");

        _normalizationPeriod = Param(nameof(NormalizationPeriod), 20)
            .SetRange(5, 120)
            .SetDisplay("Normalization Period", "Lookback period used to normalize the blended momentum spread", "Indicators");

        _entryThreshold = Param(nameof(EntryThreshold), 1m)
            .SetRange(0.2m, 5m)
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");

        _exitThreshold = Param(nameof(ExitThreshold), 0.2m)
            .SetRange(0m, 2m)
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");

        _cooldownBars = Param(nameof(CooldownBars), 8)
            .SetRange(0, 120)
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");

        _stopLoss = Param(nameof(StopLoss), 3m)
            .SetRange(0.5m, 10m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Time frame for candles", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        if (Security != null)
            yield return (Security, CandleType);

        if (!Security2Id.IsEmpty())
            yield return (new Security { Id = Security2Id }, CandleType);
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _benchmark = null!;
        _primaryFastMomentum = null!;
        _primarySlowMomentum = null!;
        _benchmarkFastMomentum = null!;
        _benchmarkSlowMomentum = null!;
        _spreadAverage = null!;
        _spreadDeviation = null!;

        _latestPrimarySignal = 0m;
        _latestBenchmarkSignal = 0m;
        _previousZScore = null;
        _primaryUpdated = false;
        _benchmarkUpdated = false;
        _cooldownRemaining = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
            throw new InvalidOperationException("Primary security is not specified.");

        if (Security2Id.IsEmpty())
            throw new InvalidOperationException("Benchmark security identifier is not specified.");

        // Fall back to a bare security carrying only the identifier when the lookup finds nothing.
        _benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };

        _primaryFastMomentum = new RateOfChange { Length = FastPeriod };
        _primarySlowMomentum = new RateOfChange { Length = SlowPeriod };
        _benchmarkFastMomentum = new RateOfChange { Length = FastPeriod };
        _benchmarkSlowMomentum = new RateOfChange { Length = SlowPeriod };
        _spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
        _spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };

        var primarySubscription = SubscribeCandles(CandleType, security: Security);
        var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);

        primarySubscription.Bind(ProcessPrimaryCandle).Start();
        benchmarkSubscription.Bind(ProcessBenchmarkCandle).Start();

        StartProtection(new Unit(TakeProfitPercent, UnitTypes.Percent), new Unit(StopLoss, UnitTypes.Percent));
    }

    /// <summary>
    /// Handles a candle of the primary instrument and refreshes its blended momentum signal.
    /// </summary>
    /// <param name="candle">Candle received from the primary subscription.</param>
    private void ProcessPrimaryCandle(ICandleMessage candle)
    {
        if (!TryBlendMomentum(_primaryFastMomentum, _primarySlowMomentum, candle, out var signal))
            return;

        _latestPrimarySignal = signal;
        _primaryUpdated = true;

        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Handles a candle of the benchmark instrument and refreshes its blended momentum signal.
    /// </summary>
    /// <param name="candle">Candle received from the benchmark subscription.</param>
    private void ProcessBenchmarkCandle(ICandleMessage candle)
    {
        if (!TryBlendMomentum(_benchmarkFastMomentum, _benchmarkSlowMomentum, candle, out var signal))
            return;

        _latestBenchmarkSignal = signal;
        _benchmarkUpdated = true;

        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Feeds a finished candle into a fast/slow momentum pair and blends the two readings.
    /// </summary>
    /// <param name="fastMomentum">Fast rate-of-change indicator.</param>
    /// <param name="slowMomentum">Slow rate-of-change indicator.</param>
    /// <param name="candle">Candle to process; unfinished candles are ignored.</param>
    /// <param name="signal">Weighted blend of the fast and slow readings, or zero when no signal is available.</param>
    /// <returns><c>true</c> when the candle is finished and both indicators produced a formed, non-empty value.</returns>
    private static bool TryBlendMomentum(RateOfChange fastMomentum, RateOfChange slowMomentum, ICandleMessage candle, out decimal signal)
    {
        signal = 0m;

        if (candle.State != CandleStates.Finished)
            return false;

        // Both indicators are always fed, even if one of them is not formed yet.
        var fast = fastMomentum.Process(candle);
        var slow = slowMomentum.Process(candle);

        if (fast.IsEmpty || slow.IsEmpty || !fastMomentum.IsFormed || !slowMomentum.IsFormed)
            return false;

        signal = (fast.ToDecimal() * FastWeight) + (slow.ToDecimal() * SlowWeight);
        return true;
    }

    /// <summary>
    /// Normalizes the primary-versus-benchmark momentum spread into a z-score and acts on threshold crossings.
    /// </summary>
    /// <param name="time">Open time of the candle that triggered the evaluation; used as the indicator input time.</param>
    private void TryProcessSpread(DateTime time)
    {
        // Wait until both instruments delivered a fresh signal since the last evaluation.
        if (!_primaryUpdated || !_benchmarkUpdated)
            return;

        _primaryUpdated = false;
        _benchmarkUpdated = false;

        var spread = _latestPrimarySignal - _latestBenchmarkSignal;
        var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
        var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();

        // A non-positive deviation would make the z-score undefined.
        if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
            return;

        var zScore = (spread - mean) / deviation;

        // Remember the z-score on every formed bar, including bars where trading is not allowed,
        // so that the crossing checks below always compare against the immediately preceding bar
        // instead of a stale reading.
        var previousZScore = _previousZScore;
        _previousZScore = zScore;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        var bullishEntry = previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
        var bearishEntry = previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;

        if (_cooldownRemaining == 0 && Position == 0)
        {
            if (bullishEntry)
            {
                BuyMarket();
                _cooldownRemaining = CooldownBars;
            }
            else if (bearishEntry)
            {
                SellMarket();
                _cooldownRemaining = CooldownBars;
            }
        }
        else if (Position > 0 && zScore <= ExitThreshold)
        {
            // Exits are not blocked by the cooldown.
            SellMarket(Position);
            _cooldownRemaining = CooldownBars;
        }
        else if (Position < 0 && zScore >= -ExitThreshold)
        {
            BuyMarket(Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class smart_factors_momentum_market_strategy(Strategy):
    """Smart-factors momentum market strategy that trades the primary instrument when its blended fast and slow momentum outperforms a benchmark market proxy.

    For the primary instrument and the benchmark a fast and a slow RateOfChange
    are blended into one momentum signal each. The difference between the two
    signals is normalized into a z-score with a moving average and a standard
    deviation; entries happen when the z-score crosses the entry threshold and
    exits when it returns inside the exit threshold.
    """

    def __init__(self):
        """Register the strategy parameters and initialise the runtime state."""
        super(smart_factors_momentum_market_strategy, self).__init__()

        self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark market proxy", "General")
        self._fast_period = self.Param("FastPeriod", 8) \
            .SetRange(2, 80) \
            .SetDisplay("Fast Period", "Fast momentum lookback period", "Indicators")
        self._slow_period = self.Param("SlowPeriod", 24) \
            .SetRange(5, 200) \
            .SetDisplay("Slow Period", "Slow momentum lookback period", "Indicators")
        self._normalization_period = self.Param("NormalizationPeriod", 20) \
            .SetRange(5, 120) \
            .SetDisplay("Normalization Period", "Lookback period used to normalize the blended momentum spread", "Indicators")
        self._entry_threshold = self.Param("EntryThreshold", 1.0) \
            .SetRange(0.2, 5.0) \
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
        self._exit_threshold = self.Param("ExitThreshold", 0.2) \
            .SetRange(0.0, 2.0) \
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
        self._cooldown_bars = self.Param("CooldownBars", 8) \
            .SetRange(0, 120) \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
        self._stop_loss = self.Param("StopLoss", 3.0) \
            .SetRange(0.5, 10.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Time frame for candles", "General")

        self._benchmark = None
        self._primary_fast_momentum = None
        self._primary_slow_momentum = None
        self._benchmark_fast_momentum = None
        self._benchmark_slow_momentum = None
        self._spread_average = None
        self._spread_deviation = None

        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._previous_z_score = None
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """Candle data type used for both subscriptions."""
        return self._candle_type.Value

    def OnReseted(self):
        """Drop the indicators and clear all runtime state."""
        super(smart_factors_momentum_market_strategy, self).OnReseted()

        self._benchmark = None
        self._primary_fast_momentum = None
        self._primary_slow_momentum = None
        self._benchmark_fast_momentum = None
        self._benchmark_slow_momentum = None
        self._spread_average = None
        self._spread_deviation = None

        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._previous_z_score = None
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, subscribe to both candle streams and enable position protection.

        Raises:
            Exception: if the primary security or the benchmark identifier is not specified.
        """
        super(smart_factors_momentum_market_strategy, self).OnStarted2(time)

        # Fail early with a clear message instead of passing None to SubscribeCandles.
        if self.Security is None:
            raise Exception("Primary security is not specified.")

        sec2_id = str(self._security2_id.Value)
        if not sec2_id:
            raise Exception("Benchmark security identifier is not specified.")

        benchmark = Security()
        benchmark.Id = sec2_id
        self._benchmark = benchmark

        fast = int(self._fast_period.Value)
        slow = int(self._slow_period.Value)
        norm_period = int(self._normalization_period.Value)

        self._primary_fast_momentum = RateOfChange()
        self._primary_fast_momentum.Length = fast
        self._primary_slow_momentum = RateOfChange()
        self._primary_slow_momentum.Length = slow
        self._benchmark_fast_momentum = RateOfChange()
        self._benchmark_fast_momentum.Length = fast
        self._benchmark_slow_momentum = RateOfChange()
        self._benchmark_slow_momentum.Length = slow

        self._spread_average = SimpleMovingAverage()
        self._spread_average.Length = norm_period
        self._spread_deviation = StandardDeviation()
        self._spread_deviation.Length = norm_period

        primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
        benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)

        primary_subscription.Bind(self._process_primary_candle).Start()
        benchmark_subscription.Bind(self._process_benchmark_candle).Start()

        # First unit is the take profit (fixed 2%), second the configurable stop loss.
        self.StartProtection(
            Unit(2, UnitTypes.Percent),
            Unit(float(self._stop_loss.Value), UnitTypes.Percent)
        )

    def _blend_momentum(self, fast_indicator, slow_indicator, candle):
        """Feed a finished candle into a fast/slow momentum pair and blend the readings.

        Returns the weighted blend (65% fast, 35% slow) as a float, or None when
        the candle is not finished or either indicator has no formed value yet.
        """
        if candle.State != CandleStates.Finished:
            return None

        # Both indicators are always fed, even if one of them is not formed yet.
        fast_input = CandleIndicatorValue(fast_indicator, candle)
        fast_input.IsFinal = True
        fast = fast_indicator.Process(fast_input)

        slow_input = CandleIndicatorValue(slow_indicator, candle)
        slow_input.IsFinal = True
        slow = slow_indicator.Process(slow_input)

        if fast.IsEmpty or slow.IsEmpty or not fast_indicator.IsFormed or not slow_indicator.IsFormed:
            return None

        return (float(fast) * 0.65) + (float(slow) * 0.35)

    def _process_primary_candle(self, candle):
        """Refresh the primary momentum signal from a candle of the traded instrument."""
        signal = self._blend_momentum(self._primary_fast_momentum, self._primary_slow_momentum, candle)
        if signal is None:
            return

        self._latest_primary_signal = signal
        self._primary_updated = True
        self._try_process_spread(candle.OpenTime)

    def _process_benchmark_candle(self, candle):
        """Refresh the benchmark momentum signal from a candle of the benchmark instrument."""
        signal = self._blend_momentum(self._benchmark_fast_momentum, self._benchmark_slow_momentum, candle)
        if signal is None:
            return

        self._latest_benchmark_signal = signal
        self._benchmark_updated = True
        self._try_process_spread(candle.OpenTime)

    def _try_process_spread(self, time):
        """Normalize the momentum spread into a z-score and act on threshold crossings.

        `time` is the open time of the candle that triggered the evaluation and
        is used as the input time for the normalization indicators.
        """
        # Wait until both instruments delivered a fresh signal since the last evaluation.
        if not self._primary_updated or not self._benchmark_updated:
            return

        self._primary_updated = False
        self._benchmark_updated = False

        spread = self._latest_primary_signal - self._latest_benchmark_signal
        mean = float(process_float(self._spread_average, spread, time, True))
        deviation = float(process_float(self._spread_deviation, spread, time, True))

        # A non-positive deviation would make the z-score undefined.
        if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
            return

        z_score = (spread - mean) / deviation

        # Remember the z-score on every formed bar, including bars where trading is
        # not allowed, so the crossing checks below always compare against the
        # immediately preceding bar instead of a stale reading.
        previous_z_score = self._previous_z_score
        self._previous_z_score = z_score

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        entry_thresh = float(self._entry_threshold.Value)
        exit_thresh = float(self._exit_threshold.Value)
        cooldown = int(self._cooldown_bars.Value)

        bullish_entry = previous_z_score is not None and previous_z_score < entry_thresh and z_score >= entry_thresh
        bearish_entry = previous_z_score is not None and previous_z_score > -entry_thresh and z_score <= -entry_thresh

        if self._cooldown_remaining == 0 and self.Position == 0:
            if bullish_entry:
                self.BuyMarket()
                self._cooldown_remaining = cooldown
            elif bearish_entry:
                self.SellMarket()
                self._cooldown_remaining = cooldown
        elif self.Position > 0 and z_score <= exit_thresh:
            # Exits are not blocked by the cooldown.
            self.SellMarket(self.Position)
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and z_score >= -exit_thresh:
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown

    def CreateClone(self):
        """Return a fresh strategy instance with default parameters."""
        return smart_factors_momentum_market_strategy()