Donchian Volatility Contraction
Donchian Volatility Contraction 策略基于波动率收缩之后的 Donchian 通道突破。
测试表明年均收益约为 187%,该策略在股票市场表现最佳。
当 Donchian 通道在日内(5m)数据上确认波动率收缩形态时触发信号,适合积极交易者。
止损按百分比(StopLossPercent)设置;DonchianPeriod、AtrPeriod 等参数可根据需要调整以平衡风险与收益。
详情
- 入场条件:参见指标条件实现.
- 多空方向:双向.
- 退出条件:反向信号或止损逻辑.
- 止损:是,基于指标计算.
- 默认值:
  - DonchianPeriod = 20
  - AtrPeriod = 14
  - VolatilityFactor = 0.8m
  - CooldownBars = 72
  - StopLossPercent = 2m
  - CandleType = TimeSpan.FromMinutes(5).TimeFrame()
- 过滤器:
- 分类: 趋势跟随
- 方向: 双向
- 指标: Donchian
- 止损: 是
- 复杂度: 中等
- 时间框架: 日内 (5m)
- 季节性: 否
- 神经网络: 否
- 背离: 否
- 风险等级: 中等
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Breakout strategy that waits for Donchian channel contraction before trading a break of the previous channel.
/// </summary>
/// <remarks>
/// The channel width (highest high minus lowest low) is tracked with a moving average and a
/// standard deviation. While flat, a position is opened only when the previous channel width is at or
/// below <c>average - VolatilityFactor * stdDev</c> (floored at the security price step, or 1 when the
/// step is not set) and the candle closes beyond the previous channel boundary by a small ATR buffer.
/// An open position is closed when the close crosses the midpoint of the previous channel.
/// After every order the strategy skips <see cref="CooldownBars"/> finished candles; exit checks are
/// skipped during that time as well, only the channel statistics keep updating.
/// </remarks>
public class DonchianWithVolatilityContractionStrategy : Strategy
{
    /// <summary>
    /// Fraction of the ATR by which the close must clear the previous channel boundary to count as a breakout.
    /// </summary>
    private const decimal BreakoutAtrFraction = 0.05m;

    private readonly StrategyParam<int> _donchianPeriod;
    private readonly StrategyParam<int> _atrPeriod;
    private readonly StrategyParam<decimal> _volatilityFactor;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<decimal> _stopLossPercent;
    private readonly StrategyParam<DataType> _candleType;

    // Indicators bound to the candle subscription.
    private Highest _donchianHigh;
    private Lowest _donchianLow;
    private AverageTrueRange _atr;

    // Indicators fed manually with the channel width on every processed candle.
    private SimpleMovingAverage _widthAverage;
    private StandardDeviation _widthStdDev;

    // Channel snapshot and width statistics as of the previously processed candle.
    private decimal _previousHigh;
    private decimal _previousLow;
    private decimal _previousWidth;
    private decimal _widthAverageValue;
    private decimal _widthStdDevValue;

    // True once the first channel snapshot has been stored.
    private bool _isInitialized;

    // Remaining finished candles to skip before signals are evaluated again.
    private int _cooldown;

    /// <summary>
    /// Donchian channel period. Also used as the length of the width average and width standard deviation.
    /// </summary>
    public int DonchianPeriod
    {
        get => _donchianPeriod.Value;
        set => _donchianPeriod.Value = value;
    }

    /// <summary>
    /// ATR period.
    /// </summary>
    public int AtrPeriod
    {
        get => _atrPeriod.Value;
        set => _atrPeriod.Value = value;
    }

    /// <summary>
    /// Standard deviation multiplier for contraction detection.
    /// </summary>
    public decimal VolatilityFactor
    {
        get => _volatilityFactor.Value;
        set => _volatilityFactor.Value = value;
    }

    /// <summary>
    /// Bars to wait after each order.
    /// </summary>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Stop loss percentage.
    /// </summary>
    public decimal StopLossPercent
    {
        get => _stopLossPercent.Value;
        set => _stopLossPercent.Value = value;
    }

    /// <summary>
    /// Candle type.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Initializes strategy parameters.
    /// </summary>
    public DonchianWithVolatilityContractionStrategy()
    {
        _donchianPeriod = Param(nameof(DonchianPeriod), 20)
            .SetRange(2, 100)
            .SetDisplay("Donchian Period", "Period for the Donchian channel", "Indicators");

        _atrPeriod = Param(nameof(AtrPeriod), 14)
            .SetRange(2, 100)
            .SetDisplay("ATR Period", "Period for the ATR", "Indicators");

        _volatilityFactor = Param(nameof(VolatilityFactor), 0.8m)
            .SetRange(0.1m, 5m)
            .SetDisplay("Volatility Factor", "Standard deviation multiplier for contraction detection", "Signals");

        _cooldownBars = Param(nameof(CooldownBars), 72)
            .SetRange(1, 500)
            .SetDisplay("Cooldown Bars", "Bars to wait after each order", "Risk");

        _stopLossPercent = Param(nameof(StopLossPercent), 2m)
            .SetRange(0.5m, 10m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        if (Security != null)
            yield return (Security, CandleType);
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _donchianHigh = null;
        _donchianLow = null;
        _atr = null;
        _widthAverage = null;
        _widthStdDev = null;

        _previousHigh = 0m;
        _previousLow = 0m;
        _previousWidth = 0m;
        _widthAverageValue = 0m;
        _widthStdDevValue = 0m;

        _isInitialized = false;
        _cooldown = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
            throw new InvalidOperationException("Security is not specified.");

        _donchianHigh = new Highest { Length = DonchianPeriod };
        _donchianLow = new Lowest { Length = DonchianPeriod };
        _atr = new AverageTrueRange { Length = AtrPeriod };

        // Width statistics share the Donchian period as their look-back length.
        _widthAverage = new SimpleMovingAverage { Length = DonchianPeriod };
        _widthStdDev = new StandardDeviation { Length = DonchianPeriod };

        _isInitialized = false;
        _cooldown = 0;

        var subscription = SubscribeCandles(CandleType);

        subscription
            .Bind(_donchianHigh, _donchianLow, _atr, ProcessCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, _atr);
            DrawOwnTrades(area);
        }

        StartProtection(new Unit(0, UnitTypes.Absolute), new Unit(StopLossPercent, UnitTypes.Percent), false);
    }

    /// <summary>
    /// Handles one candle together with the bound indicator values: warms up the width statistics,
    /// honours the cooldown, then evaluates entry and exit rules against the previous channel.
    /// </summary>
    /// <param name="candle">Candle received from the subscription; only finished candles are processed.</param>
    /// <param name="donchianHigh">Current value of the highest-high indicator.</param>
    /// <param name="donchianLow">Current value of the lowest-low indicator.</param>
    /// <param name="atrValue">Current ATR value.</param>
    private void ProcessCandle(ICandleMessage candle, decimal donchianHigh, decimal donchianLow, decimal atrValue)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (!_donchianHigh.IsFormed || !_donchianLow.IsFormed || !_atr.IsFormed)
            return;

        // First usable candle: only record the channel so later candles have a "previous" snapshot.
        if (!_isInitialized)
        {
            UpdateChannelStatistics(candle, donchianHigh, donchianLow);
            _isInitialized = true;
            return;
        }

        // Keep feeding widths until both width statistics are formed.
        if (!_widthAverage.IsFormed || !_widthStdDev.IsFormed)
        {
            UpdateChannelStatistics(candle, donchianHigh, donchianLow);
            return;
        }

        if (ProcessState != ProcessStates.Started)
            return;

        if (_cooldown > 0)
        {
            _cooldown--;
            UpdateChannelStatistics(candle, donchianHigh, donchianLow);
            return;
        }

        var price = candle.ClosePrice;
        var channelMiddle = (_previousHigh + _previousLow) / 2m;

        // The threshold never drops below one price step (1 when the step is not set).
        var volatilityThreshold = Math.Max(_widthAverageValue - VolatilityFactor * _widthStdDevValue, Security.PriceStep ?? 1m);
        var isVolatilityContracted = _previousWidth <= volatilityThreshold;

        if (Position == 0)
        {
            if (isVolatilityContracted)
            {
                var breakoutBuffer = atrValue * BreakoutAtrFraction;

                if (price >= _previousHigh + breakoutBuffer)
                {
                    BuyMarket();
                    _cooldown = CooldownBars;
                }
                else if (price <= _previousLow - breakoutBuffer)
                {
                    SellMarket();
                    _cooldown = CooldownBars;
                }
            }
        }
        else if (Position > 0)
        {
            // Long exit: close fell back to the previous channel midpoint or below.
            if (price <= channelMiddle)
            {
                SellMarket(Math.Abs(Position));
                _cooldown = CooldownBars;
            }
        }
        else if (Position < 0)
        {
            // Short exit: close rose back to the previous channel midpoint or above.
            if (price >= channelMiddle)
            {
                BuyMarket(Math.Abs(Position));
                _cooldown = CooldownBars;
            }
        }

        UpdateChannelStatistics(candle, donchianHigh, donchianLow);
    }

    /// <summary>
    /// Stores the current channel as the "previous" snapshot and feeds its width into the
    /// width average and width standard deviation indicators.
    /// </summary>
    /// <param name="candle">Candle whose open time stamps the indicator inputs.</param>
    /// <param name="donchianHigh">Current channel upper boundary.</param>
    /// <param name="donchianLow">Current channel lower boundary.</param>
    private void UpdateChannelStatistics(ICandleMessage candle, decimal donchianHigh, decimal donchianLow)
    {
        _previousHigh = donchianHigh;
        _previousLow = donchianLow;
        _previousWidth = donchianHigh - donchianLow;

        _widthAverageValue = _widthAverage.Process(_previousWidth, candle.OpenTime, true).ToDecimal();
        _widthStdDevValue = _widthStdDev.Process(_previousWidth, candle.OpenTime, true).ToDecimal();
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math, Decimal
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import Highest, Lowest, AverageTrueRange, SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
from indicator_extensions import *
class donchian_volatility_contraction_strategy(Strategy):
    """
    Breakout strategy that waits for Donchian channel contraction before trading a break of the previous channel.

    The channel width (highest high minus lowest low) is tracked with a moving average and a
    standard deviation. While flat, a position is opened only when the previous width is at or below
    ``average - VolatilityFactor * std_dev`` (floored at the price step, or 1.0 when the step is missing
    or non-positive) and the candle closes beyond the previous channel boundary by 5% of the ATR.
    An open position is closed when the close crosses the midpoint of the previous channel.
    After every order, signal evaluation (exits included) is skipped for ``CooldownBars`` finished candles.
    """

    def __init__(self):
        super(donchian_volatility_contraction_strategy, self).__init__()

        self._donchian_period = self.Param("DonchianPeriod", 20) \
            .SetRange(2, 100) \
            .SetDisplay("Donchian Period", "Period for the Donchian channel", "Indicators")
        self._atr_period = self.Param("AtrPeriod", 14) \
            .SetRange(2, 100) \
            .SetDisplay("ATR Period", "Period for the ATR", "Indicators")
        self._volatility_factor = self.Param("VolatilityFactor", 0.8) \
            .SetRange(0.1, 5.0) \
            .SetDisplay("Volatility Factor", "Standard deviation multiplier for contraction detection", "Signals")
        self._cooldown_bars = self.Param("CooldownBars", 72) \
            .SetRange(1, 500) \
            .SetDisplay("Cooldown Bars", "Bars to wait after each order", "Risk")
        self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
            .SetRange(0.5, 10.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        # Width indicators are created in OnStarted2; declared here so the attributes always exist.
        self._width_average = None
        self._width_std_dev = None

        # Channel snapshot and width statistics as of the previously processed candle.
        self._previous_high = 0.0
        self._previous_low = 0.0
        self._previous_width = 0.0
        self._width_average_value = 0.0
        self._width_std_dev_value = 0.0

        # True once the first channel snapshot has been stored.
        self._is_initialized = False
        # Remaining finished candles to skip before signals are evaluated again.
        self._cooldown = 0

    @property
    def candle_type(self):
        """Candle data type used for the subscription."""
        return self._candle_type.Value

    def OnReseted(self):
        """Drops the width indicators and clears all cached channel state."""
        super(donchian_volatility_contraction_strategy, self).OnReseted()
        self._width_average = None
        self._width_std_dev = None
        self._previous_high = 0.0
        self._previous_low = 0.0
        self._previous_width = 0.0
        self._width_average_value = 0.0
        self._width_std_dev_value = 0.0
        self._is_initialized = False
        self._cooldown = 0

    def OnStarted2(self, time):
        """Creates the indicators, subscribes to candles, sets up the chart and the percent stop loss."""
        super(donchian_volatility_contraction_strategy, self).OnStarted2(time)

        period = int(self._donchian_period.Value)

        donchian_high = Highest()
        donchian_high.Length = period
        donchian_low = Lowest()
        donchian_low.Length = period
        atr = AverageTrueRange()
        atr.Length = int(self._atr_period.Value)

        # Width statistics share the Donchian period as their look-back length.
        self._width_average = SimpleMovingAverage()
        self._width_average.Length = period
        self._width_std_dev = StandardDeviation()
        self._width_std_dev.Length = period

        self._is_initialized = False
        self._cooldown = 0

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(donchian_high, donchian_low, atr, self._process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, atr)
            self.DrawOwnTrades(area)

        self.StartProtection(
            Unit(0, UnitTypes.Absolute),
            Unit(self._stop_loss_percent.Value, UnitTypes.Percent),
            False
        )

    def _process_width(self, width, open_time):
        """
        Feeds one channel width into the width average and width standard deviation
        and caches both results as floats.

        NOTE(review): process_float comes from indicator_extensions; presumably it pushes the value
        into the indicator and returns the indicator output - confirm against that module.
        """
        self._width_average_value = float(process_float(self._width_average, Decimal(width), open_time, True))
        self._width_std_dev_value = float(process_float(self._width_std_dev, Decimal(width), open_time, True))

    def _process_candle(self, candle, donchian_high_val, donchian_low_val, atr_value):
        """
        Handles one candle with the bound indicator values: warms up the width statistics,
        honours the cooldown, then evaluates entry and exit rules against the previous channel.
        """
        if candle.State != CandleStates.Finished:
            return

        # Not ready to trade, or first snapshot not yet taken: only record the channel.
        if not self.IsFormedAndOnlineAndAllowTrading() or not self._is_initialized:
            self._update_channel_stats(candle, float(donchian_high_val), float(donchian_low_val))
            self._is_initialized = True
            return

        dh = float(donchian_high_val)
        dl = float(donchian_low_val)
        atr_val = float(atr_value)

        # Keep feeding widths until both width statistics are formed.
        if not self._width_average.IsFormed or not self._width_std_dev.IsFormed:
            self._update_channel_stats(candle, dh, dl)
            return

        if self._cooldown > 0:
            self._cooldown -= 1
            self._update_channel_stats(candle, dh, dl)
            return

        price = float(candle.ClosePrice)
        channel_middle = (self._previous_high + self._previous_low) / 2.0

        # Floor for the contraction threshold: the price step, or 1.0 when it is missing or non-positive.
        step = 1.0
        if self.Security is not None and self.Security.PriceStep is not None:
            step = float(self.Security.PriceStep)
        if step <= 0:
            step = 1.0

        vf = float(self._volatility_factor.Value)
        vol_threshold = max(self._width_average_value - vf * self._width_std_dev_value, step)
        is_contracted = self._previous_width <= vol_threshold
        cd = int(self._cooldown_bars.Value)

        if self.Position == 0:
            if is_contracted:
                # The close must clear the previous boundary by 5% of the ATR.
                if price >= self._previous_high + atr_val * 0.05:
                    self.BuyMarket()
                    self._cooldown = cd
                elif price <= self._previous_low - atr_val * 0.05:
                    self.SellMarket()
                    self._cooldown = cd
        elif self.Position > 0:
            # Long exit: close fell back to the previous channel midpoint or below.
            if price <= channel_middle:
                self.SellMarket(Math.Abs(self.Position))
                self._cooldown = cd
        elif self.Position < 0:
            # Short exit: close rose back to the previous channel midpoint or above.
            if price >= channel_middle:
                self.BuyMarket(Math.Abs(self.Position))
                self._cooldown = cd

        self._update_channel_stats(candle, dh, dl)

    def _update_channel_stats(self, candle, dh, dl):
        """Stores the current channel as the previous snapshot and feeds its width into the width statistics."""
        self._previous_high = dh
        self._previous_low = dl
        self._previous_width = dh - dl
        self._process_width(self._previous_width, candle.OpenTime)

    def CreateClone(self):
        """Returns a fresh strategy instance of the same type."""
        return donchian_volatility_contraction_strategy()