ESG因子动量策略
该策略在具有环境、社会和公司治理评分的证券之间轮动。每月初按回测窗口的收益率对所有标的排序,只持有表现最强者。假设获得ESG资金关注的资产能保持动量。只有当交易金额超过最小阈值时才会执行。
在再平衡时策略平掉现有仓位并转入动量最高的标的。组合不使用杠杆或做空;始终全仓持有一只资产。
细节
- 入场条件:
- 每月第一个交易日计算每个标的在
LookbackDays期间的总收益。 - 若订单规模不小于
MinTradeUsd,买入收益率最高的标的。
- 每月第一个交易日计算每个标的在
- 多空方向:仅做多。
- 出场条件:每次月度再平衡前平掉所有仓位。
- 止损:无。
- 默认参数:
Universe– ESG关注的标的列表。LookbackDays= 252。CandleType= 1天。MinTradeUsd– 最小交易金额。
- 筛选:
- 类型:动量。
- 方向:仅多头。
- 周期:中期。
- 再平衡:每月。
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// ESG factor momentum strategy that trades the primary instrument when its ESG-adjusted momentum is stronger or weaker than a benchmark.
/// </summary>
public class EsgFactorMomentumStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _momentumLength;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private RateOfChange _primaryMomentum = null!;
private RateOfChange _benchmarkMomentum = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private decimal _latestPrimaryScore;
private decimal _latestBenchmarkScore;
private decimal? _previousZScore;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark security identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Momentum lookback period.
/// </summary>
public int MomentumLength
{
get => _momentumLength.Value;
set => _momentumLength.Value = value;
}
/// <summary>
/// Lookback period used to normalize the ESG momentum spread.
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for price data.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Initializes a new instance of <see cref="EsgFactorMomentumStrategy"/>.
/// </summary>
public EsgFactorMomentumStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");
_momentumLength = Param(nameof(MomentumLength), 40)
.SetRange(5, 200)
.SetDisplay("Momentum Length", "Momentum lookback period", "Indicators");
_lookbackPeriod = Param(nameof(LookbackPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Lookback Period", "Lookback period used to normalize the ESG momentum spread", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1.2m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.3m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 8)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle TF", "Time-frame", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryMomentum = null!;
_benchmarkMomentum = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_latestPrimaryScore = 0m;
_latestBenchmarkScore = 0m;
_previousZScore = null;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark security identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryMomentum = new RateOfChange { Length = MomentumLength };
_benchmarkMomentum = new RateOfChange { Length = MomentumLength };
_spreadAverage = new SimpleMovingAverage { Length = LookbackPeriod };
_spreadDeviation = new StandardDeviation { Length = LookbackPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var momentumValue = _primaryMomentum.Process(candle);
if (!momentumValue.IsEmpty && _primaryMomentum.IsFormed)
{
_latestPrimaryScore = momentumValue.ToDecimal() + CalculateEsgBias(candle);
_primaryUpdated = true;
TryProcessSpread(candle.OpenTime);
}
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var momentumValue = _benchmarkMomentum.Process(candle);
if (!momentumValue.IsEmpty && _benchmarkMomentum.IsFormed)
{
_latestBenchmarkScore = momentumValue.ToDecimal() + CalculateEsgBias(candle);
_benchmarkUpdated = true;
TryProcessSpread(candle.OpenTime);
}
}
private decimal CalculateEsgBias(ICandleMessage candle)
{
var priceBase = Math.Max(candle.OpenPrice, 1m);
var range = Math.Max(candle.HighPrice - candle.LowPrice, Security?.PriceStep ?? 1m);
var stability = 1m - Math.Min(0.2m, range / priceBase);
var bodyBias = (candle.ClosePrice - candle.OpenPrice) / priceBase;
return (stability * 5m) + (bodyBias * 100m);
}
private void TryProcessSpread(DateTime time)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var spread = _latestPrimaryScore - _latestBenchmarkScore;
var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();
if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishEntry = _previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
var bearishEntry = _previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && zScore <= ExitThreshold)
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && zScore >= -ExitThreshold)
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_previousZScore = zScore;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class esg_factor_momentum_strategy(Strategy):
"""ESG factor momentum strategy that trades the primary instrument when its ESG-adjusted momentum is stronger or weaker than a benchmark."""
def __init__(self):
super(esg_factor_momentum_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
self._momentum_length = self.Param("MomentumLength", 40) \
.SetRange(5, 200) \
.SetDisplay("Momentum Length", "Momentum lookback period", "Indicators")
self._lookback_period = self.Param("LookbackPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Lookback Period", "Lookback period used to normalize the ESG momentum spread", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.2) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.3) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 8) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle TF", "Time-frame", "General")
self._benchmark = None
self._primary_momentum = None
self._benchmark_momentum = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_score = 0.0
self._latest_benchmark_score = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(esg_factor_momentum_strategy, self).OnReseted()
self._benchmark = None
self._primary_momentum = None
self._benchmark_momentum = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_score = 0.0
self._latest_benchmark_score = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(esg_factor_momentum_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark security identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
mom_len = int(self._momentum_length.Value)
lookback = int(self._lookback_period.Value)
self._primary_momentum = RateOfChange()
self._primary_momentum.Length = mom_len
self._benchmark_momentum = RateOfChange()
self._benchmark_momentum.Length = mom_len
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = lookback
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = lookback
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
civ = CandleIndicatorValue(self._primary_momentum, candle)
civ.IsFinal = True
momentum_value = self._primary_momentum.Process(civ)
if not momentum_value.IsEmpty and self._primary_momentum.IsFormed:
self._latest_primary_score = float(momentum_value) + self.CalculateEsgBias(candle)
self._primary_updated = True
self.TryProcessSpread(candle.OpenTime)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
civ = CandleIndicatorValue(self._benchmark_momentum, candle)
civ.IsFinal = True
momentum_value = self._benchmark_momentum.Process(civ)
if not momentum_value.IsEmpty and self._benchmark_momentum.IsFormed:
self._latest_benchmark_score = float(momentum_value) + self.CalculateEsgBias(candle)
self._benchmark_updated = True
self.TryProcessSpread(candle.OpenTime)
def CalculateEsgBias(self, candle):
price_base = max(float(candle.OpenPrice), 1.0)
price_step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
range_val = max(float(candle.HighPrice) - float(candle.LowPrice), price_step)
stability = 1.0 - min(0.2, range_val / price_base)
body_bias = (float(candle.ClosePrice) - float(candle.OpenPrice)) / price_base
return (stability * 5.0) + (body_bias * 100.0)
def TryProcessSpread(self, time):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
spread = self._latest_primary_score - self._latest_benchmark_score
mean_result = process_float(self._spread_average, spread, time, True)
mean = float(mean_result)
dev_result = process_float(self._spread_deviation, spread, time, True)
deviation = float(dev_result)
if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
bullish_entry = self._previous_z_score is not None and self._previous_z_score < entry_thresh and z_score >= entry_thresh
bearish_entry = self._previous_z_score is not None and self._previous_z_score > -entry_thresh and z_score <= -entry_thresh
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and z_score <= exit_thresh:
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and z_score >= -exit_thresh:
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._previous_z_score = z_score
def CreateClone(self):
return esg_factor_momentum_strategy()