低波动率股票策略
该防御型因子利用“低波动率异常”——波动较小的股票往往拥有更佳风险调整后收益。波动率通过过去一段时间(日内默认为 60 个交易日)的日收益率标准差计算。
每月第一个交易日按历史波动率对股票进行排序,在波动率最低的十分位做多,在波动率最高的十分位做空,仓位等权。持仓持续到下一次月度再平衡,期间不设止损。
回测显示相较大盘,该策略的权益曲线更平滑,回撤更小,适合寻求低风险股票敞口的投资者。
细节
- 入场条件:按波动率排序后每月再平衡,多头最低十分位、空头最高十分位
- 多/空:双向
- 退出条件:下一次月度再平衡
- 止损:无
- 默认值:
VolWindowDays= 60Deciles= 10MinTradeUsd= 200CandleType= TimeSpan.FromDays(1)
- 过滤:
- 类别:波动率
- 方向:双向
- 指标:标准差
- 止损:无
- 复杂度:中等
- 时间框架:中期
- 季节性:无
- 神经网络:无
- 背离:无
- 风险级别:低
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Low volatility anomaly strategy that trades the primary instrument when its realized volatility diverges from a benchmark instrument.
/// </summary>
public class LowVolatilityStocksStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _volatilityPeriod;
private readonly StrategyParam<int> _normalizationPeriod;
private readonly StrategyParam<int> _trendPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private StandardDeviation _primaryVolatility = null!;
private StandardDeviation _benchmarkVolatility = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private SimpleMovingAverage _primaryTrend = null!;
private decimal? _previousPrimaryClose;
private decimal? _previousBenchmarkClose;
private decimal _latestPrimaryVolatility;
private decimal _latestBenchmarkVolatility;
private decimal _latestPrimaryTrend;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark security identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Lookback period used to estimate realized volatility.
/// </summary>
public int VolatilityPeriod
{
get => _volatilityPeriod.Value;
set => _volatilityPeriod.Value = value;
}
/// <summary>
/// Lookback period used to normalize the volatility spread.
/// </summary>
public int NormalizationPeriod
{
get => _normalizationPeriod.Value;
set => _normalizationPeriod.Value = value;
}
/// <summary>
/// Trend period used to align entries with the primary instrument direction.
/// </summary>
public int TrendPeriod
{
get => _trendPeriod.Value;
set => _trendPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for calculations.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public LowVolatilityStocksStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");
_volatilityPeriod = Param(nameof(VolatilityPeriod), 18)
.SetRange(5, 120)
.SetDisplay("Volatility Period", "Lookback period used to estimate realized volatility", "Indicators");
_normalizationPeriod = Param(nameof(NormalizationPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Normalization Period", "Lookback period used to normalize the volatility spread", "Indicators");
_trendPeriod = Param(nameof(TrendPeriod), 30)
.SetRange(5, 200)
.SetDisplay("Trend Period", "Trend period used to align entries with the primary instrument direction", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1.1m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.25m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 6)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Time frame for candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryVolatility = null!;
_benchmarkVolatility = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_primaryTrend = null!;
_previousPrimaryClose = null;
_previousBenchmarkClose = null;
_latestPrimaryVolatility = 0m;
_latestBenchmarkVolatility = 0m;
_latestPrimaryTrend = 0m;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark security identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryVolatility = new StandardDeviation { Length = VolatilityPeriod };
_benchmarkVolatility = new StandardDeviation { Length = VolatilityPeriod };
_spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
_spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };
_primaryTrend = new SimpleMovingAverage { Length = TrendPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var trendValue = _primaryTrend.Process(candle);
if (!trendValue.IsEmpty && _primaryTrend.IsFormed)
_latestPrimaryTrend = trendValue.ToDecimal();
var ret = CalculateReturn(candle.ClosePrice, ref _previousPrimaryClose);
if (ret is null)
return;
var volatilityValue = _primaryVolatility.Process(Math.Abs(ret.Value), candle.OpenTime, true);
if (!volatilityValue.IsEmpty && _primaryVolatility.IsFormed)
{
_latestPrimaryVolatility = volatilityValue.ToDecimal();
_primaryUpdated = true;
TryProcessSpread(candle);
}
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var ret = CalculateReturn(candle.ClosePrice, ref _previousBenchmarkClose);
if (ret is null)
return;
var volatilityValue = _benchmarkVolatility.Process(Math.Abs(ret.Value), candle.OpenTime, true);
if (!volatilityValue.IsEmpty && _benchmarkVolatility.IsFormed)
{
_latestBenchmarkVolatility = volatilityValue.ToDecimal();
_benchmarkUpdated = true;
TryProcessSpread(candle);
}
}
private static decimal? CalculateReturn(decimal closePrice, ref decimal? previousClose)
{
if (previousClose is not decimal previous || previous <= 0m)
{
previousClose = closePrice;
return null;
}
var ret = (closePrice - previous) / previous;
previousClose = closePrice;
return ret;
}
private void TryProcessSpread(ICandleMessage candle)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var spread = _latestBenchmarkVolatility - _latestPrimaryVolatility;
var mean = _spreadAverage.Process(spread, candle.OpenTime, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, candle.OpenTime, true).ToDecimal();
if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m || !_primaryTrend.IsFormed)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishTrend = candle.ClosePrice >= _latestPrimaryTrend;
var bearishTrend = candle.ClosePrice <= _latestPrimaryTrend;
var bullishEntry = zScore >= EntryThreshold && bullishTrend;
var bearishEntry = zScore <= -EntryThreshold && bearishTrend;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && (zScore <= ExitThreshold || bearishEntry))
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && (zScore >= -ExitThreshold || bullishEntry))
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import StandardDeviation, SimpleMovingAverage, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class low_volatility_stocks_strategy(Strategy):
"""Low volatility anomaly strategy that trades the primary instrument when its realized volatility diverges from a benchmark instrument."""
def __init__(self):
super(low_volatility_stocks_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
self._volatility_period = self.Param("VolatilityPeriod", 18) \
.SetRange(5, 120) \
.SetDisplay("Volatility Period", "Lookback period used to estimate realized volatility", "Indicators")
self._normalization_period = self.Param("NormalizationPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Normalization Period", "Lookback period used to normalize the volatility spread", "Indicators")
self._trend_period = self.Param("TrendPeriod", 30) \
.SetRange(5, 200) \
.SetDisplay("Trend Period", "Trend period used to align entries with the primary instrument direction", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.1) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.25) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 6) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Time frame for candles", "General")
self._benchmark = None
self._primary_volatility = None
self._benchmark_volatility = None
self._spread_average = None
self._spread_deviation = None
self._primary_trend = None
self._previous_primary_close = None
self._previous_benchmark_close = None
self._latest_primary_volatility = 0.0
self._latest_benchmark_volatility = 0.0
self._latest_primary_trend = 0.0
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(low_volatility_stocks_strategy, self).OnReseted()
self._benchmark = None
self._primary_volatility = None
self._benchmark_volatility = None
self._spread_average = None
self._spread_deviation = None
self._primary_trend = None
self._previous_primary_close = None
self._previous_benchmark_close = None
self._latest_primary_volatility = 0.0
self._latest_benchmark_volatility = 0.0
self._latest_primary_trend = 0.0
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(low_volatility_stocks_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark security identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
vol_period = int(self._volatility_period.Value)
norm_period = int(self._normalization_period.Value)
trend_period = int(self._trend_period.Value)
self._primary_volatility = StandardDeviation()
self._primary_volatility.Length = vol_period
self._benchmark_volatility = StandardDeviation()
self._benchmark_volatility.Length = vol_period
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = norm_period
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = norm_period
self._primary_trend = SimpleMovingAverage()
self._primary_trend.Length = trend_period
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
trend_iv = CandleIndicatorValue(self._primary_trend, candle)
trend_iv.IsFinal = True
trend_result = self._primary_trend.Process(trend_iv)
if not trend_result.IsEmpty and self._primary_trend.IsFormed:
self._latest_primary_trend = float(trend_result)
close_price = float(candle.ClosePrice)
if self._previous_primary_close is None or self._previous_primary_close <= 0.0:
self._previous_primary_close = close_price
return
ret = (close_price - self._previous_primary_close) / self._previous_primary_close
self._previous_primary_close = close_price
abs_ret = abs(ret)
vol_result = process_float(self._primary_volatility, abs_ret, candle.OpenTime, True)
if not vol_result.IsEmpty and self._primary_volatility.IsFormed:
self._latest_primary_volatility = float(vol_result)
self._primary_updated = True
self.TryProcessSpread(candle)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
close_price = float(candle.ClosePrice)
if self._previous_benchmark_close is None or self._previous_benchmark_close <= 0.0:
self._previous_benchmark_close = close_price
return
ret = (close_price - self._previous_benchmark_close) / self._previous_benchmark_close
self._previous_benchmark_close = close_price
abs_ret = abs(ret)
vol_result = process_float(self._benchmark_volatility, abs_ret, candle.OpenTime, True)
if not vol_result.IsEmpty and self._benchmark_volatility.IsFormed:
self._latest_benchmark_volatility = float(vol_result)
self._benchmark_updated = True
self.TryProcessSpread(candle)
def TryProcessSpread(self, candle):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
spread = self._latest_benchmark_volatility - self._latest_primary_volatility
mean_result = process_float(self._spread_average, spread, candle.OpenTime, True)
mean = float(mean_result)
dev_result = process_float(self._spread_deviation, spread, candle.OpenTime, True)
deviation = float(dev_result)
if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self._primary_trend.IsFormed:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
close_price = float(candle.ClosePrice)
bullish_trend = close_price >= self._latest_primary_trend
bearish_trend = close_price <= self._latest_primary_trend
bullish_entry = z_score >= entry_thresh and bullish_trend
bearish_entry = z_score <= -entry_thresh and bearish_trend
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and (z_score <= exit_thresh or bearish_entry):
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and (z_score >= -exit_thresh or bullish_entry):
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
def CreateClone(self):
return low_volatility_stocks_strategy()