TTM挤压策略
TTM挤压策略寻找价格波动被压缩的阶段,即布林带收缩至肯特纳通道内。当出现这种“挤压”时,可能预示即将发生的波动扩张。策略在挤压期间使用线性回归动量振荡器和RSI判断方向,一旦挤压解除且动量开始转向,就按走势方向建立头寸。
该方法旨在捕捉平静区间后的强势突破。多头信号需要动量在零值下方回升且RSI高于30,空头信号则要求动量在正区间下降并且RSI低于70。可选的止盈参数可以在达到预设收益时自动平仓。
细节
- 入场条件:
- 无挤压(布林带在肯特纳通道外)。
- 多头:动量 < 0 且上升,RSI > 30。
- 空头:动量 > 0 且下降,RSI < 70。
- 方向:双向。
- 出场条件:
- 反向信号或触发止盈(若启用)。
- 止损:默认无,可选止盈。
- 默认参数:
SqueezeLength= 20RsiLength= 14UseTP= FalseTpPercent= 1.2
- 过滤器:
- 类型:波动性突破
- 方向:双向
- 指标:布林带、肯特纳通道、RSI、线性回归
- 止损:可选
- 复杂度:中等
- 时间框架:任意
- 季节性:否
- 神经网络:否
- 背离:否
- 风险等级:中等
namespace StockSharp.Samples.Strategies;
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
/// <summary>
/// TTM Squeeze Strategy.
/// Detects volatility squeeze using BB width narrowing, then trades breakouts.
/// Uses RSI for momentum confirmation.
/// Buys when BB width expands from narrow and RSI > 50.
/// Sells when BB width expands from narrow and RSI less than 50.
/// </summary>
public class TtmSqueezeStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<int> _bbLength;
private readonly StrategyParam<int> _rsiLength;
private readonly StrategyParam<int> _cooldownBars;
private BollingerBands _bb;
private RelativeStrengthIndex _rsi;
private ExponentialMovingAverage _ema;
private decimal _prevBbWidth;
private decimal _minBbWidth;
private int _narrowBars;
private int _cooldownRemaining;
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public int BbLength
{
get => _bbLength.Value;
set => _bbLength.Value = value;
}
public int RsiLength
{
get => _rsiLength.Value;
set => _rsiLength.Value = value;
}
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
public TtmSqueezeStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(30).TimeFrame())
.SetDisplay("Candle Type", "Type of candles to use", "General");
_bbLength = Param(nameof(BbLength), 20)
.SetGreaterThanZero()
.SetDisplay("BB Length", "Bollinger Bands period", "Indicators");
_rsiLength = Param(nameof(RsiLength), 14)
.SetGreaterThanZero()
.SetDisplay("RSI Length", "RSI period", "Indicators");
_cooldownBars = Param(nameof(CooldownBars), 15)
.SetDisplay("Cooldown Bars", "Bars to wait between trades", "Risk");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
=> [(Security, CandleType)];
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_bb = null;
_rsi = null;
_ema = null;
_prevBbWidth = 0;
_minBbWidth = decimal.MaxValue;
_narrowBars = 0;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_bb = new BollingerBands { Length = BbLength, Width = 2.0m };
_rsi = new RelativeStrengthIndex { Length = RsiLength };
_ema = new ExponentialMovingAverage { Length = BbLength };
var subscription = SubscribeCandles(CandleType);
subscription
.BindEx(_bb, _rsi, _ema, OnProcess)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, _bb);
DrawOwnTrades(area);
}
}
private void OnProcess(ICandleMessage candle, IIndicatorValue bbValue, IIndicatorValue rsiValue, IIndicatorValue emaValue)
{
if (candle.State != CandleStates.Finished)
return;
if (!_bb.IsFormed || !_rsi.IsFormed || !_ema.IsFormed)
return;
if (bbValue.IsEmpty || rsiValue.IsEmpty || emaValue.IsEmpty)
return;
var bb = (BollingerBandsValue)bbValue;
if (bb.UpBand is not decimal upper || bb.LowBand is not decimal lower || bb.MovingAverage is not decimal mid)
return;
var rsiVal = rsiValue.ToDecimal();
var emaVal = emaValue.ToDecimal();
// Calculate BB width as percentage
var bbWidth = mid > 0 ? (upper - lower) / mid * 100 : 0;
if (!IsFormedAndOnlineAndAllowTrading())
{
_prevBbWidth = bbWidth;
_minBbWidth = Math.Min(_minBbWidth, bbWidth);
return;
}
if (_cooldownRemaining > 0)
{
_cooldownRemaining--;
_prevBbWidth = bbWidth;
_minBbWidth = Math.Min(_minBbWidth, bbWidth);
return;
}
if (_prevBbWidth == 0)
{
_prevBbWidth = bbWidth;
_minBbWidth = bbWidth;
return;
}
// Track narrow BB (squeeze)
if (bbWidth <= _minBbWidth * 1.1m)
{
_narrowBars++;
_minBbWidth = Math.Min(_minBbWidth, bbWidth);
}
else if (bbWidth > _prevBbWidth && _narrowBars >= 3)
{
// BB is expanding after squeeze - breakout
if (rsiVal > 50 && candle.ClosePrice > emaVal && Position <= 0)
{
if (Position < 0)
BuyMarket(Math.Abs(Position));
BuyMarket(Volume);
_cooldownRemaining = CooldownBars;
_narrowBars = 0;
_minBbWidth = bbWidth;
}
else if (rsiVal < 50 && candle.ClosePrice < emaVal && Position >= 0)
{
if (Position > 0)
SellMarket(Math.Abs(Position));
SellMarket(Volume);
_cooldownRemaining = CooldownBars;
_narrowBars = 0;
_minBbWidth = bbWidth;
}
else
{
_narrowBars = 0;
_minBbWidth = bbWidth;
}
}
else
{
_narrowBars = 0;
_minBbWidth = bbWidth;
}
// Exit long: price falls below lower BB
if (Position > 0 && candle.ClosePrice < lower)
{
SellMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
// Exit short: price rises above upper BB
else if (Position < 0 && candle.ClosePrice > upper)
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_prevBbWidth = bbWidth;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math, Decimal
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import BollingerBands, RelativeStrengthIndex, ExponentialMovingAverage, IndicatorHelper
from StockSharp.Algo.Strategies import Strategy
import sys
class ttm_squeeze_strategy(Strategy):
"""TTM Squeeze Strategy."""
def __init__(self):
super(ttm_squeeze_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(30))) \
.SetDisplay("Candle Type", "Type of candles to use", "General")
self._bb_length = self.Param("BbLength", 20) \
.SetDisplay("BB Length", "Bollinger Bands period", "Indicators")
self._rsi_length = self.Param("RsiLength", 14) \
.SetDisplay("RSI Length", "RSI period", "Indicators")
self._cooldown_bars = self.Param("CooldownBars", 15) \
.SetDisplay("Cooldown Bars", "Bars to wait between trades", "Risk")
self._bb = None
self._rsi = None
self._ema = None
self._prev_bb_width = 0.0
self._min_bb_width = float(sys.maxsize)
self._narrow_bars = 0
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def OnReseted(self):
super(ttm_squeeze_strategy, self).OnReseted()
self._bb = None
self._rsi = None
self._ema = None
self._prev_bb_width = 0.0
self._min_bb_width = float(sys.maxsize)
self._narrow_bars = 0
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(ttm_squeeze_strategy, self).OnStarted2(time)
bb_len = int(self._bb_length.Value)
self._bb = BollingerBands()
self._bb.Length = bb_len
self._bb.Width = 2.0
self._rsi = RelativeStrengthIndex()
self._rsi.Length = int(self._rsi_length.Value)
self._ema = ExponentialMovingAverage()
self._ema.Length = bb_len
subscription = self.SubscribeCandles(self.candle_type)
subscription.BindEx(self._bb, self._rsi, self._ema, self._on_process).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, self._bb)
self.DrawOwnTrades(area)
def _on_process(self, candle, bb_value, rsi_value, ema_value):
if candle.State != CandleStates.Finished:
return
if not self._bb.IsFormed or not self._rsi.IsFormed or not self._ema.IsFormed:
return
if bb_value.IsEmpty or rsi_value.IsEmpty or ema_value.IsEmpty:
return
if bb_value.UpBand is None or bb_value.LowBand is None or bb_value.MovingAverage is None:
return
upper = float(bb_value.UpBand)
lower = float(bb_value.LowBand)
mid = float(bb_value.MovingAverage)
rsi_val = float(IndicatorHelper.ToDecimal(rsi_value))
ema_val = float(IndicatorHelper.ToDecimal(ema_value))
bb_width = (upper - lower) / mid * 100 if mid > 0 else 0.0
if not self.IsFormedAndOnlineAndAllowTrading():
self._prev_bb_width = bb_width
self._min_bb_width = min(self._min_bb_width, bb_width)
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
self._prev_bb_width = bb_width
self._min_bb_width = min(self._min_bb_width, bb_width)
return
if self._prev_bb_width == 0.0:
self._prev_bb_width = bb_width
self._min_bb_width = bb_width
return
close = float(candle.ClosePrice)
cooldown = int(self._cooldown_bars.Value)
if bb_width <= self._min_bb_width * 1.1:
self._narrow_bars += 1
self._min_bb_width = min(self._min_bb_width, bb_width)
elif bb_width > self._prev_bb_width and self._narrow_bars >= 3:
if rsi_val > 50 and close > ema_val and self.Position <= 0:
if self.Position < 0:
self.BuyMarket(Math.Abs(self.Position))
self.BuyMarket(self.Volume)
self._cooldown_remaining = cooldown
self._narrow_bars = 0
self._min_bb_width = bb_width
elif rsi_val < 50 and close < ema_val and self.Position >= 0:
if self.Position > 0:
self.SellMarket(Math.Abs(self.Position))
self.SellMarket(self.Volume)
self._cooldown_remaining = cooldown
self._narrow_bars = 0
self._min_bb_width = bb_width
else:
self._narrow_bars = 0
self._min_bb_width = bb_width
else:
self._narrow_bars = 0
self._min_bb_width = bb_width
if self.Position > 0 and close < lower:
self.SellMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
elif self.Position < 0 and close > upper:
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._prev_bb_width = bb_width
def CreateClone(self):
return ttm_squeeze_strategy()