Стратегия Adaptive Squeeze Momentum
Стратегия Adaptive Squeeze Momentum ищет периоды сжатия волатильности, когда полосы Боллинджера находятся внутри каналов Кельтнера, и ждёт пробоя, сопровождающегося сильным импульсом. Сила импульса оценивается через порог на основе стандартного отклонения. Дополнительные фильтры RSI и EMA тренда уточняют точки входа. ATR может использоваться для динамических стоп-лоссов и тейк-профитов, а позиции закрываются по истечении заданного времени удержания.
Подробности
- Условия входа:
- Выход из сжатия (полосы Боллинджера вне каналов Кельтнера).
- Лонг: импульс > динамического порога, RSI пересекает снизу уровень перепроданности, EMA растёт (опц.).
- Шорт: импульс < -динамического порога, RSI пересекает сверху уровень перекупленности, EMA падает (опц.).
- Направление: обе стороны.
- Условия выхода:
- Противоположный сигнал, ATR-стоп/тейк-профит или выход по времени.
- Стопы: опциональные ATR стоп-лосс и тейк-профит.
- Параметры по умолчанию:
- BollingerPeriod = 20
- BollingerMultiplier = 2.0
- KeltnerPeriod = 20
- KeltnerMultiplier = 1.5
- MomentumLength = 12
- TrendMaLength = 50
- UseAtrStops = True
- AtrMultiplierSl = 1.5
- AtrMultiplierTp = 2.5
- AtrLength = 14
- MinVolatility = 0.5
- HoldingPeriodMultiplier = 1.5
- UseTrendFilter = True
- UseRsiFilter = True
- RsiLength = 14
- RsiOversold = 40
- RsiOverbought = 60
- MomentumMultiplier = 1.5
- AllowLong = True
- AllowShort = True
- Фильтры:
- Категория: Волатильностный пробой
- Направление: Обе
- Индикаторы: Полосы Боллинджера, каналы Кельтнера, импульс, RSI, EMA, ATR
- Стопы: Опционально
- Сложность: Высокая
- Таймфрейм: Любой
- Сезонность: Нет
- Нейросети: Нет
- Дивергенция: Нет
- Уровень риска: Средний
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Adaptive Squeeze Momentum strategy.
/// Detects squeeze release with Bollinger Bands and Keltner Channels
/// and confirms breakout using momentum.
/// </summary>
/// <remarks>
/// On every finished candle the strategy records whether both Bollinger Bands lie outside the
/// Keltner Channels. If that was the case on the previously processed candle, it goes long when
/// momentum is positive and the close is above the trend EMA, or short when momentum is negative
/// and the close is below the trend EMA. ATR-based stop-loss and take-profit levels are compared
/// against candle closes, and a bar-count cooldown follows every entry and protective exit.
/// </remarks>
public class AdaptiveSqueezeMomentumStrategy : Strategy
{
	private readonly StrategyParam<int> _bollingerPeriod;
	private readonly StrategyParam<decimal> _bollingerMultiplier;
	private readonly StrategyParam<int> _keltnerPeriod;
	private readonly StrategyParam<decimal> _keltnerMultiplier;
	private readonly StrategyParam<int> _momentumLength;
	private readonly StrategyParam<int> _trendMaLength;
	private readonly StrategyParam<decimal> _atrMultiplierSl;
	private readonly StrategyParam<decimal> _atrMultiplierTp;
	private readonly StrategyParam<int> _atrLength;
	private readonly StrategyParam<DataType> _candleType;
	private readonly StrategyParam<int> _cooldownBars;

	// Squeeze-release flag computed on the previously processed candle.
	private bool _squeezeOffPrev;

	// Protective levels of the current position; null means no levels are armed.
	// A nullable is used instead of a "0 = none" sentinel because a long stop
	// (close - ATR * multiplier) can legitimately be zero or negative, which
	// previously disabled both the stop-loss and the take-profit.
	private decimal? _stopPrice;
	private decimal? _profitTarget;

	// Number of candles that must still pass before a new entry is allowed.
	private int _cooldownRemaining;

	/// <summary>
	/// Periods for Bollinger Bands.
	/// </summary>
	public int BollingerPeriod { get => _bollingerPeriod.Value; set => _bollingerPeriod.Value = value; }

	/// <summary>
	/// Deviation multiplier for Bollinger Bands.
	/// </summary>
	public decimal BollingerMultiplier { get => _bollingerMultiplier.Value; set => _bollingerMultiplier.Value = value; }

	/// <summary>
	/// EMA period for Keltner Channels.
	/// </summary>
	public int KeltnerPeriod { get => _keltnerPeriod.Value; set => _keltnerPeriod.Value = value; }

	/// <summary>
	/// ATR multiplier for Keltner Channels.
	/// </summary>
	public decimal KeltnerMultiplier { get => _keltnerMultiplier.Value; set => _keltnerMultiplier.Value = value; }

	/// <summary>
	/// Periods for momentum.
	/// </summary>
	public int MomentumLength { get => _momentumLength.Value; set => _momentumLength.Value = value; }

	/// <summary>
	/// EMA period for the trend filter.
	/// </summary>
	public int TrendMaLength { get => _trendMaLength.Value; set => _trendMaLength.Value = value; }

	/// <summary>
	/// ATR multiplier for stop-loss.
	/// </summary>
	public decimal AtrMultiplierSl { get => _atrMultiplierSl.Value; set => _atrMultiplierSl.Value = value; }

	/// <summary>
	/// ATR multiplier for take-profit.
	/// </summary>
	public decimal AtrMultiplierTp { get => _atrMultiplierTp.Value; set => _atrMultiplierTp.Value = value; }

	/// <summary>
	/// Period for ATR.
	/// </summary>
	public int AtrLength { get => _atrLength.Value; set => _atrLength.Value = value; }

	/// <summary>
	/// Type of candles to use.
	/// </summary>
	public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

	/// <summary>
	/// Bars between trades.
	/// </summary>
	public int CooldownBars { get => _cooldownBars.Value; set => _cooldownBars.Value = value; }

	/// <summary>
	/// Initializes a new instance of <see cref="AdaptiveSqueezeMomentumStrategy"/>
	/// and registers its parameters with their default values.
	/// </summary>
	public AdaptiveSqueezeMomentumStrategy()
	{
		_bollingerPeriod = Param(nameof(BollingerPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("Bollinger Period", "Periods for Bollinger Bands", "Indicators");

		// Validated like every other multiplier of this strategy.
		_bollingerMultiplier = Param(nameof(BollingerMultiplier), 2.0m)
			.SetGreaterThanZero()
			.SetDisplay("Bollinger Multiplier", "Deviation multiplier", "Indicators");

		_keltnerPeriod = Param(nameof(KeltnerPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("Keltner Period", "EMA period for Keltner Channels", "Indicators");

		_keltnerMultiplier = Param(nameof(KeltnerMultiplier), 1.5m)
			.SetGreaterThanZero()
			.SetDisplay("Keltner Multiplier", "ATR multiplier for Keltner Channels", "Indicators");

		_momentumLength = Param(nameof(MomentumLength), 12)
			.SetGreaterThanZero()
			.SetDisplay("Momentum Length", "Periods for momentum", "Indicators");

		_trendMaLength = Param(nameof(TrendMaLength), 50)
			.SetGreaterThanZero()
			.SetDisplay("Trend EMA Length", "EMA period for trend filter", "Indicators");

		_atrMultiplierSl = Param(nameof(AtrMultiplierSl), 1.5m)
			.SetGreaterThanZero()
			.SetDisplay("ATR Stop Mult", "ATR multiplier for stop-loss", "Risk");

		_atrMultiplierTp = Param(nameof(AtrMultiplierTp), 2.5m)
			.SetGreaterThanZero()
			.SetDisplay("ATR Take Mult", "ATR multiplier for take-profit", "Risk");

		_atrLength = Param(nameof(AtrLength), 14)
			.SetGreaterThanZero()
			.SetDisplay("ATR Length", "Period for ATR", "Indicators");

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(30).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles to use", "General");

		_cooldownBars = Param(nameof(CooldownBars), 20)
			.SetDisplay("Cooldown Bars", "Bars between trades", "Risk");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
		=> [(Security, CandleType)];

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_squeezeOffPrev = false;
		_stopPrice = null;
		_profitTarget = null;
		_cooldownRemaining = 0;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		var bollinger = new BollingerBands { Length = BollingerPeriod, Width = BollingerMultiplier };
		var keltner = new KeltnerChannels { Length = KeltnerPeriod, Multiplier = KeltnerMultiplier };
		var momentum = new Momentum { Length = MomentumLength };
		var trendEma = new ExponentialMovingAverage { Length = TrendMaLength };
		var atr = new AverageTrueRange { Length = AtrLength };

		var subscription = SubscribeCandles(CandleType);

		subscription
			.BindEx(bollinger, keltner, momentum, trendEma, atr, ProcessCandle)
			.Start();

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, bollinger);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Handles one candle together with the indicator values bound in <see cref="OnStarted2"/>:
	/// updates the squeeze state, runs the protective exits and, outside the cooldown,
	/// opens or reverses a position on a confirmed breakout.
	/// </summary>
	/// <param name="candle">Candle being processed; only finished candles are acted upon.</param>
	/// <param name="bollingerValue">Bollinger Bands value.</param>
	/// <param name="keltnerValue">Keltner Channels value.</param>
	/// <param name="momentumValue">Momentum value.</param>
	/// <param name="emaValue">Trend EMA value.</param>
	/// <param name="atrValue">ATR value used to size the stop-loss and take-profit distances.</param>
	private void ProcessCandle(ICandleMessage candle, IIndicatorValue bollingerValue, IIndicatorValue keltnerValue, IIndicatorValue momentumValue, IIndicatorValue emaValue, IIndicatorValue atrValue)
	{
		if (candle.State != CandleStates.Finished)
			return;

		if (!IsFormedAndOnlineAndAllowTrading())
			return;

		if (bollingerValue.IsEmpty || keltnerValue.IsEmpty || momentumValue.IsEmpty || emaValue.IsEmpty || atrValue.IsEmpty)
			return;

		var bb = (BollingerBandsValue)bollingerValue;
		var kc = (KeltnerChannelsValue)keltnerValue;

		if (bb.UpBand is not decimal bbUpper || bb.LowBand is not decimal bbLower ||
			kc.Upper is not decimal kcUpper || kc.Lower is not decimal kcLower)
			return;

		var mom = momentumValue.ToDecimal();
		var trend = emaValue.ToDecimal();
		var atrVal = atrValue.ToDecimal();

		// Squeeze released: both Bollinger Bands are outside the Keltner Channels.
		var squeezeOff = bbLower < kcLower && bbUpper > kcUpper;

		if (_cooldownRemaining > 0)
		{
			_cooldownRemaining--;

			// Protective exits stay active during the cooldown; only new entries are blocked.
			CheckStops(candle);
			_squeezeOffPrev = squeezeOff;
			return;
		}

		// A protective exit on this candle takes priority over any new entry.
		if (CheckStops(candle))
		{
			_squeezeOffPrev = squeezeOff;
			return;
		}

		var bullishTrend = candle.ClosePrice > trend;
		var bearishTrend = candle.ClosePrice < trend;

		// Signals use the squeeze state of the previous candle, so it is read before being overwritten.
		var buySignal = _squeezeOffPrev && mom > 0 && bullishTrend;
		var sellSignal = _squeezeOffPrev && mom < 0 && bearishTrend;
		_squeezeOffPrev = squeezeOff;

		if (buySignal && Position <= 0)
		{
			// Cover an existing short first, then open the long.
			if (Position < 0)
				BuyMarket(Math.Abs(Position));

			BuyMarket(Volume);

			_stopPrice = candle.ClosePrice - atrVal * AtrMultiplierSl;
			_profitTarget = candle.ClosePrice + atrVal * AtrMultiplierTp;
			_cooldownRemaining = CooldownBars;
		}
		else if (sellSignal && Position >= 0)
		{
			// Close an existing long first, then open the short.
			if (Position > 0)
				SellMarket(Math.Abs(Position));

			SellMarket(Volume);

			_stopPrice = candle.ClosePrice + atrVal * AtrMultiplierSl;
			_profitTarget = candle.ClosePrice - atrVal * AtrMultiplierTp;
			_cooldownRemaining = CooldownBars;
		}
	}

	/// <summary>
	/// Closes the open position when the candle close has reached the armed
	/// stop-loss or take-profit level, then restarts the cooldown and disarms the levels.
	/// </summary>
	/// <param name="candle">Candle whose close price is compared with the protective levels.</param>
	/// <returns><c>true</c> if an exit order was sent; otherwise <c>false</c>.</returns>
	private bool CheckStops(ICandleMessage candle)
	{
		if (_stopPrice is not decimal stop || _profitTarget is not decimal target)
			return false;

		var close = candle.ClosePrice;

		if (Position > 0)
		{
			// Long: stop is below the entry, target above.
			if (close > stop && close < target)
				return false;

			SellMarket(Math.Abs(Position));
		}
		else if (Position < 0)
		{
			// Short: stop is above the entry, target below.
			if (close < stop && close > target)
				return false;

			BuyMarket(Math.Abs(Position));
		}
		else
		{
			return false;
		}

		_cooldownRemaining = CooldownBars;

		// Disarm both levels so the exit is not re-sent on the following candles.
		_stopPrice = null;
		_profitTarget = null;
		return true;
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import BollingerBands, KeltnerChannels, Momentum, ExponentialMovingAverage, AverageTrueRange, IndicatorHelper
from StockSharp.Algo.Strategies import Strategy
class adaptive_squeeze_momentum_strategy(Strategy):
    """Adaptive Squeeze Momentum strategy.

    On every finished candle the strategy records whether both Bollinger Bands
    lie outside the Keltner Channels. If that was the case on the previously
    processed candle, it goes long when momentum is positive and the close is
    above the trend EMA, or short when momentum is negative and the close is
    below the trend EMA. ATR-based stop-loss and take-profit levels are compared
    against candle closes, and a bar-count cooldown follows every entry and
    protective exit.
    """

    def __init__(self):
        super(adaptive_squeeze_momentum_strategy, self).__init__()

        self._bollinger_period = self.Param("BollingerPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Bollinger Period", "Periods for Bollinger Bands", "Indicators")
        # Validated like every other multiplier of this strategy.
        self._bollinger_multiplier = self.Param("BollingerMultiplier", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Bollinger Multiplier", "Deviation multiplier", "Indicators")
        self._keltner_period = self.Param("KeltnerPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Keltner Period", "EMA period for Keltner Channels", "Indicators")
        self._keltner_multiplier = self.Param("KeltnerMultiplier", 1.5) \
            .SetGreaterThanZero() \
            .SetDisplay("Keltner Multiplier", "ATR multiplier for Keltner Channels", "Indicators")
        self._momentum_length = self.Param("MomentumLength", 12) \
            .SetGreaterThanZero() \
            .SetDisplay("Momentum Length", "Periods for momentum", "Indicators")
        self._trend_ma_length = self.Param("TrendMaLength", 50) \
            .SetGreaterThanZero() \
            .SetDisplay("Trend EMA Length", "EMA period for trend filter", "Indicators")
        self._atr_multiplier_sl = self.Param("AtrMultiplierSl", 1.5) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Stop Mult", "ATR multiplier for stop-loss", "Risk")
        self._atr_multiplier_tp = self.Param("AtrMultiplierTp", 2.5) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Take Mult", "ATR multiplier for take-profit", "Risk")
        self._atr_length = self.Param("AtrLength", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Length", "Period for ATR", "Indicators")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(30))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")
        self._cooldown_bars = self.Param("CooldownBars", 20) \
            .SetDisplay("Cooldown Bars", "Bars between trades", "Risk")

        # Squeeze-release flag computed on the previously processed candle.
        self._squeeze_off_prev = False
        # Protective levels of the current position; None means no levels are armed.
        # None is used instead of a "0 = none" sentinel because a long stop
        # (close - ATR * multiplier) can legitimately be zero or negative, which
        # previously disabled both the stop-loss and the take-profit.
        self._stop_price = None
        self._profit_target = None
        # Number of candles that must still pass before a new entry is allowed.
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """Type of candles to use."""
        return self._candle_type.Value

    def OnReseted(self):
        """Clear the squeeze flag, protective levels and cooldown counter."""
        super(adaptive_squeeze_momentum_strategy, self).OnReseted()
        self._squeeze_off_prev = False
        self._stop_price = None
        self._profit_target = None
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, bind them to the candle subscription and set up the chart."""
        super(adaptive_squeeze_momentum_strategy, self).OnStarted2(time)

        bb = BollingerBands()
        bb.Length = int(self._bollinger_period.Value)
        bb.Width = self._bollinger_multiplier.Value

        kc = KeltnerChannels()
        kc.Length = int(self._keltner_period.Value)
        kc.Multiplier = self._keltner_multiplier.Value

        mom = Momentum()
        mom.Length = int(self._momentum_length.Value)

        trend_ema = ExponentialMovingAverage()
        trend_ema.Length = int(self._trend_ma_length.Value)

        atr = AverageTrueRange()
        atr.Length = int(self._atr_length.Value)

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(bb, kc, mom, trend_ema, atr, self._on_process).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, bb)
            self.DrawOwnTrades(area)

    def _on_process(self, candle, bb_value, kc_value, mom_value, ema_value, atr_value):
        """Handle one candle with the indicator values bound in OnStarted2.

        Updates the squeeze state, runs the protective exits and, outside the
        cooldown, opens or reverses a position on a confirmed breakout.
        """
        if candle.State != CandleStates.Finished:
            return
        if not self.IsFormedAndOnlineAndAllowTrading():
            return
        if bb_value.IsEmpty or kc_value.IsEmpty or mom_value.IsEmpty or ema_value.IsEmpty or atr_value.IsEmpty:
            return
        if bb_value.UpBand is None or bb_value.LowBand is None:
            return
        if kc_value.Upper is None or kc_value.Lower is None:
            return

        bb_upper = float(bb_value.UpBand)
        bb_lower = float(bb_value.LowBand)
        kc_upper = float(kc_value.Upper)
        kc_lower = float(kc_value.Lower)
        mom_v = float(IndicatorHelper.ToDecimal(mom_value))
        trend = float(IndicatorHelper.ToDecimal(ema_value))
        atr_v = float(IndicatorHelper.ToDecimal(atr_value))
        close = float(candle.ClosePrice)

        # Squeeze released: both Bollinger Bands are outside the Keltner Channels.
        squeeze_off = bb_lower < kc_lower and bb_upper > kc_upper
        cooldown = int(self._cooldown_bars.Value)

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            # Protective exits stay active during the cooldown; only new entries are blocked.
            self._check_stops(candle, cooldown)
            self._squeeze_off_prev = squeeze_off
            return

        # A protective exit on this candle takes priority over any new entry.
        if self._check_stops(candle, cooldown):
            self._squeeze_off_prev = squeeze_off
            return

        bullish_trend = close > trend
        bearish_trend = close < trend

        # Signals use the squeeze state of the previous candle, so read it before overwriting.
        buy_signal = self._squeeze_off_prev and mom_v > 0 and bullish_trend
        sell_signal = self._squeeze_off_prev and mom_v < 0 and bearish_trend
        self._squeeze_off_prev = squeeze_off

        sl_mult = float(self._atr_multiplier_sl.Value)
        tp_mult = float(self._atr_multiplier_tp.Value)

        if buy_signal and self.Position <= 0:
            # Cover an existing short first, then open the long.
            if self.Position < 0:
                self.BuyMarket(Math.Abs(self.Position))
            self.BuyMarket(self.Volume)
            self._stop_price = close - atr_v * sl_mult
            self._profit_target = close + atr_v * tp_mult
            self._cooldown_remaining = cooldown
        elif sell_signal and self.Position >= 0:
            # Close an existing long first, then open the short.
            if self.Position > 0:
                self.SellMarket(Math.Abs(self.Position))
            self.SellMarket(self.Volume)
            self._stop_price = close + atr_v * sl_mult
            self._profit_target = close - atr_v * tp_mult
            self._cooldown_remaining = cooldown

    def _check_stops(self, candle, cooldown):
        """Close the open position when the candle close reached an armed level.

        Args:
            candle: Candle whose close price is compared with the protective levels.
            cooldown: Number of bars to wait after a protective exit.

        Returns:
            True if an exit order was sent, otherwise False.
        """
        if self._stop_price is None or self._profit_target is None:
            return False

        close = float(candle.ClosePrice)

        if self.Position > 0:
            # Long: stop is below the entry, target above.
            if close > self._stop_price and close < self._profit_target:
                return False
            self.SellMarket(Math.Abs(self.Position))
        elif self.Position < 0:
            # Short: stop is above the entry, target below.
            if close < self._stop_price and close > self._profit_target:
                return False
            self.BuyMarket(Math.Abs(self.Position))
        else:
            return False

        self._cooldown_remaining = cooldown
        # Disarm both levels so the exit is not re-sent on the following candles.
        self._stop_price = None
        self._profit_target = None
        return True

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return adaptive_squeeze_momentum_strategy()