动量因子股票策略
本策略利用经典的 12‑1 个月动量因子。每月底按过去 12 个月(忽略最近 1 个月)的收益率对股票排序,动量最高的五分位买入,动量最低的五分位卖空,形成市场中性头寸。
每月第一个交易日进行再平衡,仓位等权并持有至下一次调整,期间不设置止损。
大量学术和行业研究表明,动量因子能提供持久的超额收益,并在与其他因子组合时具有良好的分散化效果。
细节
- 入场条件:按 12‑1 个月动量排序,每月再平衡;多头最高五分位、空头最低五分位
- 多/空:双向
- 退出条件:下一次月度再平衡
- 止损:无
- 默认值:
LookbackDays= 252SkipDays= 21Quintile= 5MinTradeUsd= 200CandleType= TimeSpan.FromDays(1)
- 过滤:
- 类别:动量
- 方向:双向
- 指标:价格变化
- 止损:无
- 复杂度:中等
- 时间框架:中期
- 季节性:无
- 神经网络:无
- 背离:无
- 风险级别:中等
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Momentum factor strategy that trades the primary instrument when its medium-term momentum outperforms a benchmark after filtering out the most recent short-term move.
/// </summary>
public class MomentumFactorStocksStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<int> _skipPeriod;
private readonly StrategyParam<int> _normalizationPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private RateOfChange _primaryLongMomentum = null!;
private RateOfChange _benchmarkLongMomentum = null!;
private RateOfChange _primaryShortMomentum = null!;
private RateOfChange _benchmarkShortMomentum = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private decimal _latestPrimarySignal;
private decimal _latestBenchmarkSignal;
private decimal? _previousZScore;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark security identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Long momentum lookback period.
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Short lookback period removed from the long momentum signal.
/// </summary>
public int SkipPeriod
{
get => _skipPeriod.Value;
set => _skipPeriod.Value = value;
}
/// <summary>
/// Lookback period used to normalize the relative factor spread.
/// </summary>
public int NormalizationPeriod
{
get => _normalizationPeriod.Value;
set => _normalizationPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for calculations.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public MomentumFactorStocksStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");
_lookbackPeriod = Param(nameof(LookbackPeriod), 36)
.SetRange(8, 200)
.SetDisplay("Lookback Period", "Long momentum lookback period", "Indicators");
_skipPeriod = Param(nameof(SkipPeriod), 8)
.SetRange(2, 60)
.SetDisplay("Skip Period", "Short lookback period removed from the long momentum signal", "Indicators");
_normalizationPeriod = Param(nameof(NormalizationPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Normalization Period", "Lookback period used to normalize the relative factor spread", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1.15m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.3m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 8)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Time frame for candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryLongMomentum = null!;
_benchmarkLongMomentum = null!;
_primaryShortMomentum = null!;
_benchmarkShortMomentum = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_latestPrimarySignal = 0m;
_latestBenchmarkSignal = 0m;
_previousZScore = null;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark security identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryLongMomentum = new RateOfChange { Length = LookbackPeriod };
_benchmarkLongMomentum = new RateOfChange { Length = LookbackPeriod };
_primaryShortMomentum = new RateOfChange { Length = SkipPeriod };
_benchmarkShortMomentum = new RateOfChange { Length = SkipPeriod };
_spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
_spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var longMomentum = _primaryLongMomentum.Process(candle);
var shortMomentum = _primaryShortMomentum.Process(candle);
if (longMomentum.IsEmpty || shortMomentum.IsEmpty || !_primaryLongMomentum.IsFormed || !_primaryShortMomentum.IsFormed)
return;
_latestPrimarySignal = longMomentum.ToDecimal() - shortMomentum.ToDecimal();
_primaryUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var longMomentum = _benchmarkLongMomentum.Process(candle);
var shortMomentum = _benchmarkShortMomentum.Process(candle);
if (longMomentum.IsEmpty || shortMomentum.IsEmpty || !_benchmarkLongMomentum.IsFormed || !_benchmarkShortMomentum.IsFormed)
return;
_latestBenchmarkSignal = longMomentum.ToDecimal() - shortMomentum.ToDecimal();
_benchmarkUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private void TryProcessSpread(DateTime time)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var spread = _latestPrimarySignal - _latestBenchmarkSignal;
var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();
if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishEntry = _previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
var bearishEntry = _previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && zScore <= ExitThreshold)
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && zScore >= -ExitThreshold)
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_previousZScore = zScore;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class momentum_factor_stocks_strategy(Strategy):
"""Momentum factor strategy that trades the primary instrument when its medium-term momentum outperforms a benchmark after filtering out the most recent short-term move."""
def __init__(self):
super(momentum_factor_stocks_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
self._lookback_period = self.Param("LookbackPeriod", 36) \
.SetRange(8, 200) \
.SetDisplay("Lookback Period", "Long momentum lookback period", "Indicators")
self._skip_period = self.Param("SkipPeriod", 8) \
.SetRange(2, 60) \
.SetDisplay("Skip Period", "Short lookback period removed from the long momentum signal", "Indicators")
self._normalization_period = self.Param("NormalizationPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Normalization Period", "Lookback period used to normalize the relative factor spread", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.15) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.3) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 8) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Time frame for candles", "General")
self._benchmark = None
self._primary_long_momentum = None
self._benchmark_long_momentum = None
self._primary_short_momentum = None
self._benchmark_short_momentum = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_signal = 0.0
self._latest_benchmark_signal = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(momentum_factor_stocks_strategy, self).OnReseted()
self._benchmark = None
self._primary_long_momentum = None
self._benchmark_long_momentum = None
self._primary_short_momentum = None
self._benchmark_short_momentum = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_signal = 0.0
self._latest_benchmark_signal = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(momentum_factor_stocks_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark security identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
lookback = int(self._lookback_period.Value)
skip = int(self._skip_period.Value)
norm_period = int(self._normalization_period.Value)
self._primary_long_momentum = RateOfChange()
self._primary_long_momentum.Length = lookback
self._benchmark_long_momentum = RateOfChange()
self._benchmark_long_momentum.Length = lookback
self._primary_short_momentum = RateOfChange()
self._primary_short_momentum.Length = skip
self._benchmark_short_momentum = RateOfChange()
self._benchmark_short_momentum.Length = skip
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = norm_period
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = norm_period
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
long_iv = CandleIndicatorValue(self._primary_long_momentum, candle)
long_iv.IsFinal = True
long_result = self._primary_long_momentum.Process(long_iv)
short_iv = CandleIndicatorValue(self._primary_short_momentum, candle)
short_iv.IsFinal = True
short_result = self._primary_short_momentum.Process(short_iv)
if long_result.IsEmpty or short_result.IsEmpty or not self._primary_long_momentum.IsFormed or not self._primary_short_momentum.IsFormed:
return
self._latest_primary_signal = float(long_result) - float(short_result)
self._primary_updated = True
self.TryProcessSpread(candle.OpenTime)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
long_iv = CandleIndicatorValue(self._benchmark_long_momentum, candle)
long_iv.IsFinal = True
long_result = self._benchmark_long_momentum.Process(long_iv)
short_iv = CandleIndicatorValue(self._benchmark_short_momentum, candle)
short_iv.IsFinal = True
short_result = self._benchmark_short_momentum.Process(short_iv)
if long_result.IsEmpty or short_result.IsEmpty or not self._benchmark_long_momentum.IsFormed or not self._benchmark_short_momentum.IsFormed:
return
self._latest_benchmark_signal = float(long_result) - float(short_result)
self._benchmark_updated = True
self.TryProcessSpread(candle.OpenTime)
def TryProcessSpread(self, time):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
spread = self._latest_primary_signal - self._latest_benchmark_signal
mean = float(process_float(self._spread_average, spread, time, True))
deviation = float(process_float(self._spread_deviation, spread, time, True))
if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
bullish_entry = self._previous_z_score is not None and self._previous_z_score < entry_thresh and z_score >= entry_thresh
bearish_entry = self._previous_z_score is not None and self._previous_z_score > -entry_thresh and z_score <= -entry_thresh
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and z_score <= exit_thresh:
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and z_score >= -exit_thresh:
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._previous_z_score = z_score
def CreateClone(self):
return momentum_factor_stocks_strategy()