Волатильный импульс с динамическим выходом
Стратегия ловит расширение волатильности по ATR и подтверждение по импульсу цены. Вход выполняется в направлении импульса, выход осуществляется после фиксированного числа баров с динамическим стопом и тейк-профитом, рассчитанными по ATR.
Детали
- Критерий входа: расширение ATR и подтверждающий импульс
- Лонг/Шорт: оба направления
- Критерий выхода: стоп и тейк-профит после удержания позиции
- Стопы: по ATR, тейк-профит по соотношению риск/прибыль
- Параметры по умолчанию:
AtrLength= 14MomentumLength= 20VolThreshold= 0.5MinVolatility= 1.0ExitBars= 42RiskReward= 2
- Фильтры:
- Категория: Волатильность
- Направление: Оба
- Индикаторы: ATR, SMA, Momentum
- Стопы: Да
- Сложность: Средняя
- Таймфрейм: Внутридневной
- Сезонность: Нет
- Нейросети: Нет
- Дивергенция: Нет
- Уровень риска: Средний
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy using volatility expansion with momentum confirmation and dynamic exits.
/// Uses StdDev as volatility proxy and manual momentum calculation.
/// Enters on vol expansion + momentum, exits on TP/SL or time-based exit.
/// </summary>
public class VolatilityPulseWithDynamicExitStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<int> _stdLength;
private readonly StrategyParam<int> _momentumLength;
private readonly StrategyParam<decimal> _volThreshold;
private readonly StrategyParam<int> _exitBars;
private readonly StrategyParam<decimal> _riskReward;
private readonly StrategyParam<decimal> _stopPct;
private readonly List<decimal> _closes = new();
private int _barIndex;
private int _entryBarIndex;
private decimal _entryPrice;
private decimal _stopDist;
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public int StdLength { get => _stdLength.Value; set => _stdLength.Value = value; }
public int MomentumLength { get => _momentumLength.Value; set => _momentumLength.Value = value; }
public decimal VolThreshold { get => _volThreshold.Value; set => _volThreshold.Value = value; }
public int ExitBars { get => _exitBars.Value; set => _exitBars.Value = value; }
public decimal RiskReward { get => _riskReward.Value; set => _riskReward.Value = value; }
public decimal StopPct { get => _stopPct.Value; set => _stopPct.Value = value; }
public VolatilityPulseWithDynamicExitStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles", "General");
_stdLength = Param(nameof(StdLength), 14)
.SetGreaterThanZero()
.SetDisplay("StdDev Length", "Volatility period", "Parameters");
_momentumLength = Param(nameof(MomentumLength), 20)
.SetGreaterThanZero()
.SetDisplay("Momentum Length", "Momentum lookback", "Parameters");
_volThreshold = Param(nameof(VolThreshold), 1.2m)
.SetGreaterThanZero()
.SetDisplay("Vol Threshold", "StdDev expansion multiplier", "Parameters");
_exitBars = Param(nameof(ExitBars), 42)
.SetGreaterThanZero()
.SetDisplay("Exit Bars", "Time-based exit after N bars", "Risk");
_riskReward = Param(nameof(RiskReward), 2m)
.SetGreaterThanZero()
.SetDisplay("Risk Reward", "TP to SL ratio", "Risk");
_stopPct = Param(nameof(StopPct), 1m)
.SetGreaterThanZero()
.SetDisplay("Stop %", "Stop loss percent", "Risk");
}
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
protected override void OnReseted()
{
base.OnReseted();
_closes.Clear();
_barIndex = 0;
_entryBarIndex = -1;
_entryPrice = 0;
_stopDist = 0;
}
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var stdDev = new StandardDeviation { Length = StdLength };
var sma = new SimpleMovingAverage { Length = StdLength };
_closes.Clear();
_barIndex = 0;
_entryBarIndex = -1;
_entryPrice = 0;
_stopDist = 0;
var subscription = SubscribeCandles(CandleType);
subscription.Bind(stdDev, sma, ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, stdDev);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, decimal stdVal, decimal smaVal)
{
if (candle.State != CandleStates.Finished)
return;
var close = candle.ClosePrice;
_closes.Add(close);
while (_closes.Count > MomentumLength + 1)
_closes.RemoveAt(0);
_barIndex++;
if (_closes.Count <= MomentumLength || stdVal <= 0 || smaVal <= 0)
return;
// Momentum = current close - close N bars ago
var momentum = close - _closes[0];
// Volatility expansion: stdDev relative to price vs average
var volRatio = stdVal / smaVal;
var volExpansion = volRatio > VolThreshold * 0.01m;
var momentumUp = momentum > 0;
var momentumDown = momentum < 0;
// TP/SL management
if (Position > 0 && _entryPrice > 0 && _stopDist > 0)
{
var sl = _entryPrice - _stopDist;
var tp = _entryPrice + _stopDist * RiskReward;
if (close <= sl || close >= tp)
{
SellMarket();
_entryPrice = 0;
_stopDist = 0;
_entryBarIndex = -1;
}
// Time-based exit
else if (_entryBarIndex >= 0 && _barIndex - _entryBarIndex >= ExitBars)
{
SellMarket();
_entryPrice = 0;
_stopDist = 0;
_entryBarIndex = -1;
}
}
else if (Position < 0 && _entryPrice > 0 && _stopDist > 0)
{
var sl = _entryPrice + _stopDist;
var tp = _entryPrice - _stopDist * RiskReward;
if (close >= sl || close <= tp)
{
BuyMarket();
_entryPrice = 0;
_stopDist = 0;
_entryBarIndex = -1;
}
// Time-based exit
else if (_entryBarIndex >= 0 && _barIndex - _entryBarIndex >= ExitBars)
{
BuyMarket();
_entryPrice = 0;
_stopDist = 0;
_entryBarIndex = -1;
}
}
// Entry signals
if (Position <= 0 && volExpansion && momentumUp)
{
BuyMarket();
_entryPrice = close;
_stopDist = close * StopPct / 100m;
_entryBarIndex = _barIndex;
}
else if (Position >= 0 && volExpansion && momentumDown)
{
SellMarket();
_entryPrice = close;
_stopDist = close * StopPct / 100m;
_entryBarIndex = _barIndex;
}
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
class volatility_pulse_with_dynamic_exit_strategy(Strategy):
def __init__(self):
super(volatility_pulse_with_dynamic_exit_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Type of candles", "General")
self._std_length = self.Param("StdLength", 14) \
.SetDisplay("StdDev Length", "Volatility period", "Parameters")
self._momentum_length = self.Param("MomentumLength", 20) \
.SetDisplay("Momentum Length", "Momentum lookback", "Parameters")
self._vol_threshold = self.Param("VolThreshold", 1.2) \
.SetDisplay("Vol Threshold", "StdDev expansion multiplier", "Parameters")
self._exit_bars = self.Param("ExitBars", 42) \
.SetDisplay("Exit Bars", "Time-based exit after N bars", "Risk")
self._risk_reward = self.Param("RiskReward", 2) \
.SetDisplay("Risk Reward", "TP to SL ratio", "Risk")
self._stop_pct = self.Param("StopPct", 1) \
.SetDisplay("Stop %", "Stop loss percent", "Risk")
self._closes = []
self._bar_index = 0
self._entry_bar_index = -1
self._entry_price = 0.0
self._stop_dist = 0.0
@property
def candle_type(self):
return self._candle_type.Value
@property
def std_length(self):
return self._std_length.Value
@property
def momentum_length(self):
return self._momentum_length.Value
@property
def vol_threshold(self):
return self._vol_threshold.Value
@property
def exit_bars(self):
return self._exit_bars.Value
@property
def risk_reward(self):
return self._risk_reward.Value
@property
def stop_pct(self):
return self._stop_pct.Value
def OnReseted(self):
super(volatility_pulse_with_dynamic_exit_strategy, self).OnReseted()
self._closes = []
self._bar_index = 0
self._entry_bar_index = -1
self._entry_price = 0.0
self._stop_dist = 0.0
def OnStarted2(self, time):
super(volatility_pulse_with_dynamic_exit_strategy, self).OnStarted2(time)
std_dev = StandardDeviation()
std_dev.Length = self.std_length
sma = SimpleMovingAverage()
sma.Length = self.std_length
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(std_dev, sma, self.on_process).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, std_dev)
self.DrawOwnTrades(area)
def on_process(self, candle, std_val, sma_val):
if candle.State != CandleStates.Finished:
return
std_val = float(std_val)
sma_val = float(sma_val)
close = float(candle.ClosePrice)
self._closes.append(close)
while len(self._closes) > self.momentum_length + 1:
self._closes.pop(0)
self._bar_index += 1
if len(self._closes) <= self.momentum_length or std_val <= 0 or sma_val <= 0:
return
# Momentum = current close - close N bars ago
momentum = close - self._closes[0]
# Volatility expansion: stdDev relative to price vs average
vol_ratio = std_val / sma_val
vol_expansion = vol_ratio > float(self.vol_threshold) * 0.01
momentum_up = momentum > 0
momentum_down = momentum < 0
# TP/SL management
if self.Position > 0 and self._entry_price > 0 and self._stop_dist > 0:
sl = self._entry_price - self._stop_dist
tp = self._entry_price + self._stop_dist * float(self.risk_reward)
if close <= sl or close >= tp:
self.SellMarket()
self._entry_price = 0
self._stop_dist = 0
self._entry_bar_index = -1
# Time-based exit
elif self._entry_bar_index >= 0 and self._bar_index - self._entry_bar_index >= self.exit_bars:
self.SellMarket()
self._entry_price = 0
self._stop_dist = 0
self._entry_bar_index = -1
elif self.Position < 0 and self._entry_price > 0 and self._stop_dist > 0:
sl = self._entry_price + self._stop_dist
tp = self._entry_price - self._stop_dist * float(self.risk_reward)
if close >= sl or close <= tp:
self.BuyMarket()
self._entry_price = 0
self._stop_dist = 0
self._entry_bar_index = -1
# Time-based exit
elif self._entry_bar_index >= 0 and self._bar_index - self._entry_bar_index >= self.exit_bars:
self.BuyMarket()
self._entry_price = 0
self._stop_dist = 0
self._entry_bar_index = -1
# Entry signals
if self.Position <= 0 and vol_expansion and momentum_up:
self.BuyMarket()
self._entry_price = close
self._stop_dist = close * float(self.stop_pct) / 100.0
self._entry_bar_index = self._bar_index
elif self.Position >= 0 and vol_expansion and momentum_down:
self.SellMarket()
self._entry_price = close
self._stop_dist = close * float(self.stop_pct) / 100.0
self._entry_bar_index = self._bar_index
def CreateClone(self):
return volatility_pulse_with_dynamic_exit_strategy()