Adaptive Squeeze Momentum 策略
Adaptive Squeeze Momentum 策略在布林带进入肯特纳通道时识别波动性收缩,并等待伴随强动量的突破。动量强度通过标准差阈值衡量。可选的 RSI 和 EMA 趋势过滤器帮助优化进场。ATR 可用于设置动态止损和止盈,持仓在预定的时间周期后自动关闭。
细节
- 入场条件:
- 挤压结束(布林带在肯特纳通道外)。
- 多头:动量 > 动态阈值,RSI 上穿超卖位,EMA 上升(可选)。
- 空头:动量 < -动态阈值,RSI 下穿超买位,EMA 下降(可选)。
- 方向:双向。
- 出场条件:
- 反向信号、ATR 止损/止盈或时间退出。
- 止损:可选的 ATR 止损和止盈。
- 默认参数:
  - BollingerPeriod = 20
  - BollingerMultiplier = 2.0
  - KeltnerPeriod = 20
  - KeltnerMultiplier = 1.5
  - MomentumLength = 12
  - TrendMaLength = 50
  - UseAtrStops = True
  - AtrMultiplierSl = 1.5
  - AtrMultiplierTp = 2.5
  - AtrLength = 14
  - MinVolatility = 0.5
  - HoldingPeriodMultiplier = 1.5
  - UseTrendFilter = True
  - UseRsiFilter = True
  - RsiLength = 14
  - RsiOversold = 40
  - RsiOverbought = 60
  - MomentumMultiplier = 1.5
  - AllowLong = True
  - AllowShort = True
- 过滤器:
- 类型:波动性突破
- 方向:双向
- 指标:布林带、肯特纳通道、动量、RSI、EMA、ATR
- 止损:可选
- 复杂度:高
- 时间框架:任意
- 季节性:否
- 神经网络:否
- 背离:否
- 风险等级:中等
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Adaptive Squeeze Momentum strategy.
/// Compares Bollinger Bands with Keltner Channels to track the squeeze state and
/// enters in the direction of momentum when the previous finished candle had the
/// bands outside the channel, filtered by the close relative to a trend EMA.
/// Exits use ATR-based stop and target levels evaluated on the candle close,
/// and a bar-count cooldown separates trades.
/// </summary>
public class AdaptiveSqueezeMomentumStrategy : Strategy
{
    private readonly StrategyParam<int> _bollingerPeriod;
    private readonly StrategyParam<decimal> _bollingerMultiplier;
    private readonly StrategyParam<int> _keltnerPeriod;
    private readonly StrategyParam<decimal> _keltnerMultiplier;
    private readonly StrategyParam<int> _momentumLength;
    private readonly StrategyParam<int> _trendMaLength;
    private readonly StrategyParam<decimal> _atrMultiplierSl;
    private readonly StrategyParam<decimal> _atrMultiplierTp;
    private readonly StrategyParam<int> _atrLength;
    private readonly StrategyParam<DataType> _candleType;
    private readonly StrategyParam<int> _cooldownBars;

    // Squeeze state (bands outside the channel) observed on the previous processed candle.
    private bool _squeezeOffPrev;

    // Exit levels of the current trade; null while no levels are armed.
    // Nullable is used instead of a "> 0" sentinel because a long stop
    // (close - ATR * multiplier) can legitimately be zero or negative.
    private decimal? _stopPrice;
    private decimal? _profitTarget;

    // Finished candles left before a new entry is allowed.
    private int _cooldownRemaining;

    /// <summary>
    /// Bollinger Bands period.
    /// </summary>
    public int BollingerPeriod { get => _bollingerPeriod.Value; set => _bollingerPeriod.Value = value; }

    /// <summary>
    /// Bollinger Bands deviation multiplier.
    /// </summary>
    public decimal BollingerMultiplier { get => _bollingerMultiplier.Value; set => _bollingerMultiplier.Value = value; }

    /// <summary>
    /// Keltner Channels period.
    /// </summary>
    public int KeltnerPeriod { get => _keltnerPeriod.Value; set => _keltnerPeriod.Value = value; }

    /// <summary>
    /// Keltner Channels multiplier.
    /// </summary>
    public decimal KeltnerMultiplier { get => _keltnerMultiplier.Value; set => _keltnerMultiplier.Value = value; }

    /// <summary>
    /// Momentum indicator length.
    /// </summary>
    public int MomentumLength { get => _momentumLength.Value; set => _momentumLength.Value = value; }

    /// <summary>
    /// Trend filter EMA length.
    /// </summary>
    public int TrendMaLength { get => _trendMaLength.Value; set => _trendMaLength.Value = value; }

    /// <summary>
    /// ATR multiplier used for the stop-loss distance.
    /// </summary>
    public decimal AtrMultiplierSl { get => _atrMultiplierSl.Value; set => _atrMultiplierSl.Value = value; }

    /// <summary>
    /// ATR multiplier used for the take-profit distance.
    /// </summary>
    public decimal AtrMultiplierTp { get => _atrMultiplierTp.Value; set => _atrMultiplierTp.Value = value; }

    /// <summary>
    /// ATR period.
    /// </summary>
    public int AtrLength { get => _atrLength.Value; set => _atrLength.Value = value; }

    /// <summary>
    /// Candle type the strategy subscribes to.
    /// </summary>
    public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

    /// <summary>
    /// Number of finished candles to wait after an entry or a stop/target exit.
    /// </summary>
    public int CooldownBars { get => _cooldownBars.Value; set => _cooldownBars.Value = value; }

    /// <summary>
    /// Initializes a new instance of <see cref="AdaptiveSqueezeMomentumStrategy"/>.
    /// </summary>
    public AdaptiveSqueezeMomentumStrategy()
    {
        _bollingerPeriod = Param(nameof(BollingerPeriod), 20)
            .SetGreaterThanZero()
            .SetDisplay("Bollinger Period", "Periods for Bollinger Bands", "Indicators");

        // Validated like the Keltner multiplier: a non-positive band width is meaningless.
        _bollingerMultiplier = Param(nameof(BollingerMultiplier), 2.0m)
            .SetGreaterThanZero()
            .SetDisplay("Bollinger Multiplier", "Deviation multiplier", "Indicators");

        _keltnerPeriod = Param(nameof(KeltnerPeriod), 20)
            .SetGreaterThanZero()
            .SetDisplay("Keltner Period", "EMA period for Keltner Channels", "Indicators");

        _keltnerMultiplier = Param(nameof(KeltnerMultiplier), 1.5m)
            .SetGreaterThanZero()
            .SetDisplay("Keltner Multiplier", "ATR multiplier for Keltner Channels", "Indicators");

        _momentumLength = Param(nameof(MomentumLength), 12)
            .SetGreaterThanZero()
            .SetDisplay("Momentum Length", "Periods for momentum", "Indicators");

        _trendMaLength = Param(nameof(TrendMaLength), 50)
            .SetGreaterThanZero()
            .SetDisplay("Trend EMA Length", "EMA period for trend filter", "Indicators");

        _atrMultiplierSl = Param(nameof(AtrMultiplierSl), 1.5m)
            .SetGreaterThanZero()
            .SetDisplay("ATR Stop Mult", "ATR multiplier for stop-loss", "Risk");

        _atrMultiplierTp = Param(nameof(AtrMultiplierTp), 2.5m)
            .SetGreaterThanZero()
            .SetDisplay("ATR Take Mult", "ATR multiplier for take-profit", "Risk");

        _atrLength = Param(nameof(AtrLength), 14)
            .SetGreaterThanZero()
            .SetDisplay("ATR Length", "Period for ATR", "Indicators");

        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(30).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");

        _cooldownBars = Param(nameof(CooldownBars), 20)
            .SetDisplay("Cooldown Bars", "Bars between trades", "Risk");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
        => [(Security, CandleType)];

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _squeezeOffPrev = false;
        _stopPrice = null;
        _profitTarget = null;
        _cooldownRemaining = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        var bollinger = new BollingerBands { Length = BollingerPeriod, Width = BollingerMultiplier };
        var keltner = new KeltnerChannels { Length = KeltnerPeriod, Multiplier = KeltnerMultiplier };
        var momentum = new Momentum { Length = MomentumLength };
        var trendEma = new ExponentialMovingAverage { Length = TrendMaLength };
        var atr = new AverageTrueRange { Length = AtrLength };

        var subscription = SubscribeCandles(CandleType);
        subscription
            .BindEx(bollinger, keltner, momentum, trendEma, atr, ProcessCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, bollinger);
            DrawOwnTrades(area);
        }
    }

    /// <summary>
    /// Handles one candle together with the bound indicator values: manages exits,
    /// the cooldown counter and new entries.
    /// </summary>
    private void ProcessCandle(ICandleMessage candle, IIndicatorValue bollingerValue, IIndicatorValue keltnerValue, IIndicatorValue momentumValue, IIndicatorValue emaValue, IIndicatorValue atrValue)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (bollingerValue.IsEmpty || keltnerValue.IsEmpty || momentumValue.IsEmpty || emaValue.IsEmpty || atrValue.IsEmpty)
            return;

        var bb = (BollingerBandsValue)bollingerValue;
        var kc = (KeltnerChannelsValue)keltnerValue;

        if (bb.UpBand is not decimal bbUpper || bb.LowBand is not decimal bbLower ||
            kc.Upper is not decimal kcUpper || kc.Lower is not decimal kcLower)
            return;

        var mom = momentumValue.ToDecimal();
        var trend = emaValue.ToDecimal();
        var atrVal = atrValue.ToDecimal();

        // Squeeze is OFF when the Bollinger Bands extend beyond the Keltner Channel on both sides.
        var squeezeOff = bbLower < kcLower && bbUpper > kcUpper;

        if (_cooldownRemaining > 0)
        {
            _cooldownRemaining--;

            // Entries are blocked during the cooldown, but protective exits stay active.
            CheckStops(candle);
            _squeezeOffPrev = squeezeOff;
            return;
        }

        // Exits take priority over new entries on the same candle.
        if (CheckStops(candle))
        {
            _squeezeOffPrev = squeezeOff;
            return;
        }

        var bullishTrend = candle.ClosePrice > trend;
        var bearishTrend = candle.ClosePrice < trend;

        // Signals use the squeeze state of the previous candle, so the flag is updated only afterwards.
        var buySignal = _squeezeOffPrev && mom > 0 && bullishTrend;
        var sellSignal = _squeezeOffPrev && mom < 0 && bearishTrend;
        _squeezeOffPrev = squeezeOff;

        if (buySignal && Position <= 0)
        {
            // Cover an existing short before opening the long.
            if (Position < 0)
                BuyMarket(Math.Abs(Position));

            BuyMarket(Volume);
            _stopPrice = candle.ClosePrice - atrVal * AtrMultiplierSl;
            _profitTarget = candle.ClosePrice + atrVal * AtrMultiplierTp;
            _cooldownRemaining = CooldownBars;
        }
        else if (sellSignal && Position >= 0)
        {
            // Close an existing long before opening the short.
            if (Position > 0)
                SellMarket(Math.Abs(Position));

            SellMarket(Volume);
            _stopPrice = candle.ClosePrice + atrVal * AtrMultiplierSl;
            _profitTarget = candle.ClosePrice - atrVal * AtrMultiplierTp;
            _cooldownRemaining = CooldownBars;
        }
    }

    /// <summary>
    /// Closes the open position when the candle close has reached the stop or the target.
    /// </summary>
    /// <param name="candle">Finished candle whose close price is tested against the levels.</param>
    /// <returns><c>true</c> if an exit order was sent; otherwise <c>false</c>.</returns>
    private bool CheckStops(ICandleMessage candle)
    {
        if (Position == 0 || _stopPrice is not decimal stop || _profitTarget is not decimal target)
            return false;

        var close = candle.ClosePrice;
        var isLong = Position > 0;

        var exitHit = isLong
            ? close <= stop || close >= target
            : close >= stop || close <= target;

        if (!exitHit)
            return false;

        if (isLong)
            SellMarket(Math.Abs(Position));
        else
            BuyMarket(Math.Abs(Position));

        _cooldownRemaining = CooldownBars;

        // Clear both levels so a stale target cannot leak into a later trade.
        _stopPrice = null;
        _profitTarget = null;
        return true;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import BollingerBands, KeltnerChannels, Momentum, ExponentialMovingAverage, AverageTrueRange, IndicatorHelper
from StockSharp.Algo.Strategies import Strategy
class adaptive_squeeze_momentum_strategy(Strategy):
    """Adaptive Squeeze Momentum strategy.

    Compares Bollinger Bands with Keltner Channels to track the squeeze state and
    enters in the direction of momentum when the previous finished candle had the
    bands outside the channel, filtered by the close relative to a trend EMA.
    Exits use ATR-based stop and target levels evaluated on the candle close,
    and a bar-count cooldown separates trades.
    """

    def __init__(self):
        super(adaptive_squeeze_momentum_strategy, self).__init__()

        self._bollinger_period = self.Param("BollingerPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Bollinger Period", "Periods for Bollinger Bands", "Indicators")
        # Validated like the Keltner multiplier: a non-positive band width is meaningless.
        self._bollinger_multiplier = self.Param("BollingerMultiplier", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Bollinger Multiplier", "Deviation multiplier", "Indicators")
        self._keltner_period = self.Param("KeltnerPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Keltner Period", "EMA period for Keltner Channels", "Indicators")
        self._keltner_multiplier = self.Param("KeltnerMultiplier", 1.5) \
            .SetGreaterThanZero() \
            .SetDisplay("Keltner Multiplier", "ATR multiplier for Keltner Channels", "Indicators")
        self._momentum_length = self.Param("MomentumLength", 12) \
            .SetGreaterThanZero() \
            .SetDisplay("Momentum Length", "Periods for momentum", "Indicators")
        self._trend_ma_length = self.Param("TrendMaLength", 50) \
            .SetGreaterThanZero() \
            .SetDisplay("Trend EMA Length", "EMA period for trend filter", "Indicators")
        self._atr_multiplier_sl = self.Param("AtrMultiplierSl", 1.5) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Stop Mult", "ATR multiplier for stop-loss", "Risk")
        self._atr_multiplier_tp = self.Param("AtrMultiplierTp", 2.5) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Take Mult", "ATR multiplier for take-profit", "Risk")
        self._atr_length = self.Param("AtrLength", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Length", "Period for ATR", "Indicators")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(30))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")
        self._cooldown_bars = self.Param("CooldownBars", 20) \
            .SetDisplay("Cooldown Bars", "Bars between trades", "Risk")

        # Squeeze state (bands outside the channel) seen on the previous processed candle.
        self._squeeze_off_prev = False
        # Exit levels of the current trade; None while no levels are armed.
        # None is used instead of a "> 0" sentinel because a long stop
        # (close - ATR * multiplier) can legitimately be zero or negative.
        self._stop_price = None
        self._profit_target = None
        # Finished candles left before a new entry is allowed.
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """Candle type the strategy subscribes to."""
        return self._candle_type.Value

    def OnReseted(self):
        """Clear all per-run state."""
        super(adaptive_squeeze_momentum_strategy, self).OnReseted()
        self._squeeze_off_prev = False
        self._stop_price = None
        self._profit_target = None
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, bind them to the candle subscription and set up the chart."""
        super(adaptive_squeeze_momentum_strategy, self).OnStarted2(time)

        bb = BollingerBands()
        bb.Length = int(self._bollinger_period.Value)
        bb.Width = self._bollinger_multiplier.Value

        kc = KeltnerChannels()
        kc.Length = int(self._keltner_period.Value)
        kc.Multiplier = self._keltner_multiplier.Value

        mom = Momentum()
        mom.Length = int(self._momentum_length.Value)

        trend_ema = ExponentialMovingAverage()
        trend_ema.Length = int(self._trend_ma_length.Value)

        atr = AverageTrueRange()
        atr.Length = int(self._atr_length.Value)

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(bb, kc, mom, trend_ema, atr, self._on_process).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, bb)
            self.DrawOwnTrades(area)

    def _on_process(self, candle, bb_value, kc_value, mom_value, ema_value, atr_value):
        """Handle one candle with its indicator values: exits, cooldown and new entries."""
        if candle.State != CandleStates.Finished:
            return
        if not self.IsFormedAndOnlineAndAllowTrading():
            return
        if bb_value.IsEmpty or kc_value.IsEmpty or mom_value.IsEmpty or ema_value.IsEmpty or atr_value.IsEmpty:
            return
        if bb_value.UpBand is None or bb_value.LowBand is None:
            return
        if kc_value.Upper is None or kc_value.Lower is None:
            return

        bb_upper = float(bb_value.UpBand)
        bb_lower = float(bb_value.LowBand)
        kc_upper = float(kc_value.Upper)
        kc_lower = float(kc_value.Lower)
        mom_v = float(IndicatorHelper.ToDecimal(mom_value))
        trend = float(IndicatorHelper.ToDecimal(ema_value))
        atr_v = float(IndicatorHelper.ToDecimal(atr_value))
        close = float(candle.ClosePrice)

        # Squeeze is OFF when the Bollinger Bands extend beyond the Keltner Channel on both sides.
        squeeze_off = bb_lower < kc_lower and bb_upper > kc_upper
        cooldown = int(self._cooldown_bars.Value)

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            # Entries are blocked during the cooldown, but protective exits stay active.
            self._check_stops(candle, cooldown)
            self._squeeze_off_prev = squeeze_off
            return

        # Exits take priority over new entries on the same candle.
        if self._check_stops(candle, cooldown):
            self._squeeze_off_prev = squeeze_off
            return

        bullish_trend = close > trend
        bearish_trend = close < trend

        # Signals use the previous candle's squeeze state, so the flag is updated only afterwards.
        buy_signal = self._squeeze_off_prev and mom_v > 0 and bullish_trend
        sell_signal = self._squeeze_off_prev and mom_v < 0 and bearish_trend
        self._squeeze_off_prev = squeeze_off

        sl_mult = float(self._atr_multiplier_sl.Value)
        tp_mult = float(self._atr_multiplier_tp.Value)

        if buy_signal and self.Position <= 0:
            # Cover an existing short before opening the long.
            if self.Position < 0:
                self.BuyMarket(Math.Abs(self.Position))
            self.BuyMarket(self.Volume)
            self._stop_price = close - atr_v * sl_mult
            self._profit_target = close + atr_v * tp_mult
            self._cooldown_remaining = cooldown
        elif sell_signal and self.Position >= 0:
            # Close an existing long before opening the short.
            if self.Position > 0:
                self.SellMarket(Math.Abs(self.Position))
            self.SellMarket(self.Volume)
            self._stop_price = close + atr_v * sl_mult
            self._profit_target = close - atr_v * tp_mult
            self._cooldown_remaining = cooldown

    def _check_stops(self, candle, cooldown):
        """Close the open position when the candle close has reached the stop or the target.

        Args:
            candle: Finished candle whose close price is tested against the levels.
            cooldown: Number of bars to wait after an exit.

        Returns:
            True if an exit order was sent, otherwise False.
        """
        if self._stop_price is None or self._profit_target is None:
            return False

        close = float(candle.ClosePrice)

        if self.Position > 0:
            exit_hit = close <= self._stop_price or close >= self._profit_target
            if exit_hit:
                self.SellMarket(Math.Abs(self.Position))
        elif self.Position < 0:
            exit_hit = close >= self._stop_price or close <= self._profit_target
            if exit_hit:
                self.BuyMarket(Math.Abs(self.Position))
        else:
            return False

        if not exit_hit:
            return False

        self._cooldown_remaining = cooldown
        # Clear both levels so a stale target cannot leak into a later trade.
        self._stop_price = None
        self._profit_target = None
        return True

    def CreateClone(self):
        """Return a fresh strategy instance with default parameters."""
        return adaptive_squeeze_momentum_strategy()