Keltner Width Mean Reversion
Keltner Width Mean Reversion 策略关注指标的极端读数以捕捉均值回归。远离正常水平的情况通常不会持续太久。
测试表明年均收益约为 160%,该策略在外汇市场表现最佳。
当指标大幅偏离均值后开始反转时产生交易信号,可做多也可做空,并带有保护性止损。
适合预期震荡行情的交易者,当指标回归平衡时平仓。初始参数 EmaPeriod = 20.
详细信息
- 入场条件: Indicator crosses back toward mean.
- 多空: Both directions.
- 出场条件: Indicator reverts to average.
- 止损: Yes.
- 默认值:
  - EmaPeriod = 20
  - AtrPeriod = 14
  - KeltnerMultiplier = 2.0m
  - WidthLookbackPeriod = 20
  - WidthDeviationMultiplier = 2.0m
  - AtrStopMultiplier = 2.0m
  - CandleType = TimeSpan.FromMinutes(5)
- 过滤器:
- 分类: Mean Reversion
- 方向: Both
- 指标: Keltner
- 止损: Yes
- 复杂度: Intermediate
- 时间框架: Short-term
- 季节性: No
- 神经网络: No
- 背离: No
- 风险级别: Medium
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Keltner width mean reversion strategy.
/// Trades contractions and expansions of Keltner Channel width around its recent average.
/// </summary>
/// <remarks>
/// The channel width is computed as <c>2 * KeltnerMultiplier * ATR</c> on every finished candle and stored
/// in a fixed-size circular buffer. Once the buffer is full, a long position is opened when the width falls
/// below <c>average - WidthDeviationMultiplier * stdDev</c> and a short position when it rises above
/// <c>average + WidthDeviationMultiplier * stdDev</c>. Open positions are closed when the width crosses
/// back over the average. Every order starts a cooldown of <see cref="CooldownBars"/> candles.
/// </remarks>
public class KeltnerWidthMeanReversionStrategy : Strategy
{
	private readonly StrategyParam<int> _emaPeriod;
	private readonly StrategyParam<int> _atrPeriod;
	private readonly StrategyParam<decimal> _keltnerMultiplier;
	private readonly StrategyParam<decimal> _widthDeviationMultiplier;
	private readonly StrategyParam<int> _widthLookbackPeriod;
	private readonly StrategyParam<decimal> _stopLossPercent;
	private readonly StrategyParam<int> _cooldownBars;
	private readonly StrategyParam<DataType> _candleType;

	private ExponentialMovingAverage _ema;
	private AverageTrueRange _atr;

	// Circular buffer of the most recent channel widths.
	private decimal[] _widthHistory;

	// Next write position inside _widthHistory.
	private int _currentIndex;

	// Number of slots of _widthHistory that have been written so far (capped at its length).
	private int _filledCount;

	// Remaining candles to skip before the next order may be sent.
	private int _cooldown;

	/// <summary>
	/// Period for EMA calculation.
	/// </summary>
	public int EmaPeriod
	{
		get => _emaPeriod.Value;
		set => _emaPeriod.Value = value;
	}

	/// <summary>
	/// Period for ATR calculation.
	/// </summary>
	public int AtrPeriod
	{
		get => _atrPeriod.Value;
		set => _atrPeriod.Value = value;
	}

	/// <summary>
	/// Multiplier for Keltner Channel bands.
	/// </summary>
	public decimal KeltnerMultiplier
	{
		get => _keltnerMultiplier.Value;
		set => _keltnerMultiplier.Value = value;
	}

	/// <summary>
	/// Multiplier for width standard deviation thresholds.
	/// </summary>
	public decimal WidthDeviationMultiplier
	{
		get => _widthDeviationMultiplier.Value;
		set => _widthDeviationMultiplier.Value = value;
	}

	/// <summary>
	/// Lookback period for width statistics.
	/// The history buffer is sized from this value when the strategy is reset or started.
	/// </summary>
	public int WidthLookbackPeriod
	{
		get => _widthLookbackPeriod.Value;
		set => _widthLookbackPeriod.Value = value;
	}

	/// <summary>
	/// Stop loss percentage.
	/// </summary>
	public decimal StopLossPercent
	{
		get => _stopLossPercent.Value;
		set => _stopLossPercent.Value = value;
	}

	/// <summary>
	/// Cooldown bars between orders.
	/// </summary>
	public int CooldownBars
	{
		get => _cooldownBars.Value;
		set => _cooldownBars.Value = value;
	}

	/// <summary>
	/// Candle type.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initializes a new instance of <see cref="KeltnerWidthMeanReversionStrategy"/>.
	/// </summary>
	public KeltnerWidthMeanReversionStrategy()
	{
		_emaPeriod = Param(nameof(EmaPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("EMA Period", "Period for EMA calculation", "Indicators");

		_atrPeriod = Param(nameof(AtrPeriod), 14)
			.SetGreaterThanZero()
			.SetDisplay("ATR Period", "Period for ATR calculation", "Indicators");

		_keltnerMultiplier = Param(nameof(KeltnerMultiplier), 2m)
			.SetGreaterThanZero()
			.SetDisplay("Keltner Multiplier", "Multiplier for Keltner Channel bands", "Indicators");

		_widthDeviationMultiplier = Param(nameof(WidthDeviationMultiplier), 1m)
			.SetGreaterThanZero()
			.SetDisplay("Width Dev Multiplier", "Multiplier for width deviation threshold", "Strategy Parameters");

		_widthLookbackPeriod = Param(nameof(WidthLookbackPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("Width Lookback", "Lookback period for width statistics", "Strategy Parameters");

		_stopLossPercent = Param(nameof(StopLossPercent), 2m)
			.SetGreaterThanZero()
			.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk Management");

		_cooldownBars = Param(nameof(CooldownBars), 1200)
			.SetRange(1, 5000)
			.SetDisplay("Cooldown Bars", "Bars to wait between orders", "Risk Management");

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles to use", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_ema = null;
		_atr = null;
		_currentIndex = default;
		_filledCount = default;
		_cooldown = default;
		_widthHistory = new decimal[WidthLookbackPeriod];
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		_ema = new ExponentialMovingAverage { Length = EmaPeriod };
		_atr = new AverageTrueRange { Length = AtrPeriod };

		_widthHistory = new decimal[WidthLookbackPeriod];
		_currentIndex = 0;
		_filledCount = 0;
		_cooldown = 0;

		var subscription = SubscribeCandles(CandleType);

		subscription
			.Bind(_ema, _atr, ProcessCandle)
			.Start();

		var area = CreateChartArea();

		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, _ema);
			DrawIndicator(area, _atr);
			DrawOwnTrades(area);
		}

		StartProtection(new(), new Unit(StopLossPercent, UnitTypes.Percent));
	}

	/// <summary>
	/// Handles a candle together with the bound EMA and ATR values: records the channel width,
	/// and once enough history is collected, opens or closes positions based on the width thresholds.
	/// </summary>
	/// <param name="candle">Candle received from the subscription; only finished candles are processed.</param>
	/// <param name="emaValue">Current EMA value (not used in the width calculation).</param>
	/// <param name="atrValue">Current ATR value used to compute the channel width.</param>
	private void ProcessCandle(ICandleMessage candle, decimal emaValue, decimal atrValue)
	{
		if (candle.State != CandleStates.Finished)
			return;

		if (!_ema.IsFormed || !_atr.IsFormed)
			return;

		// Use the size of the allocated buffer rather than the live parameter value:
		// if WidthLookbackPeriod is changed after start, indexing by the parameter would
		// run past the end of the array (or average only a part of it).
		var lookback = _widthHistory.Length;

		var width = 2m * KeltnerMultiplier * atrValue;

		_widthHistory[_currentIndex] = width;
		_currentIndex = (_currentIndex + 1) % lookback;

		if (_filledCount < lookback)
			_filledCount++;

		// Statistics are meaningful only after the whole buffer has been written once.
		if (_filledCount < lookback)
			return;

		var (avgWidth, stdWidth) = CalculateWidthStats();

		if (!IsFormedAndOnlineAndAllowTrading())
			return;

		// The cooldown blocks both entries and exits until it runs out.
		if (_cooldown > 0)
		{
			_cooldown--;
			return;
		}

		var lowerThreshold = avgWidth - WidthDeviationMultiplier * stdWidth;
		var upperThreshold = avgWidth + WidthDeviationMultiplier * stdWidth;

		if (Position == 0)
		{
			if (width < lowerThreshold)
			{
				BuyMarket();
				_cooldown = CooldownBars;
			}
			else if (width > upperThreshold)
			{
				SellMarket();
				_cooldown = CooldownBars;
			}
		}
		else if (Position > 0 && width >= avgWidth)
		{
			// Long exit: width has returned to (or above) its average.
			SellMarket(Math.Abs(Position));
			_cooldown = CooldownBars;
		}
		else if (Position < 0 && width <= avgWidth)
		{
			// Short exit: width has returned to (or below) its average.
			BuyMarket(Math.Abs(Position));
			_cooldown = CooldownBars;
		}
	}

	/// <summary>
	/// Computes the mean and population standard deviation of all values in the width history buffer.
	/// </summary>
	/// <returns>The average width and its standard deviation.</returns>
	private (decimal Average, decimal StdDev) CalculateWidthStats()
	{
		var count = _widthHistory.Length;

		var sum = 0m;

		foreach (var value in _widthHistory)
			sum += value;

		var average = sum / count;

		var sumSq = 0m;

		foreach (var value in _widthHistory)
		{
			var diff = value - average;
			sumSq += diff * diff;
		}

		var stdDev = (decimal)Math.Sqrt((double)(sumSq / count));

		return (average, stdDev);
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
import math
from System import TimeSpan, Math
from StockSharp.Messages import DataType, Unit, UnitTypes, CandleStates
from StockSharp.Algo.Indicators import ExponentialMovingAverage, AverageTrueRange
from StockSharp.Algo.Strategies import Strategy
class keltner_width_mean_reversion_strategy(Strategy):
    """
    Keltner width mean reversion strategy.
    Trades contractions and expansions of Keltner Channel width around its recent average.

    The channel width is computed as 2 * KeltnerMultiplier * ATR on every finished candle
    and stored in a fixed-size circular buffer. Once the buffer is full, a long position is
    opened when the width falls below (average - WidthDeviationMultiplier * std) and a short
    position when it rises above (average + WidthDeviationMultiplier * std). Open positions
    are closed when the width crosses back over the average. Every order starts a cooldown
    of CooldownBars candles.
    """

    def __init__(self):
        super(keltner_width_mean_reversion_strategy, self).__init__()

        self._ema_period = self.Param("EmaPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("EMA Period", "Period for EMA calculation", "Indicators")

        self._atr_period = self.Param("AtrPeriod", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Period", "Period for ATR calculation", "Indicators")

        self._keltner_multiplier = self.Param("KeltnerMultiplier", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Keltner Multiplier", "Multiplier for Keltner Channel bands", "Indicators")

        self._width_dev_mult = self.Param("WidthDeviationMultiplier", 1.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Width Dev Multiplier", "Multiplier for width deviation threshold", "Strategy Parameters")

        self._width_lookback = self.Param("WidthLookbackPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Width Lookback", "Lookback period for width statistics", "Strategy Parameters")

        self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk Management")

        # Same 1..5000 validation range as the C# version of this strategy.
        self._cooldown_bars = self.Param("CooldownBars", 1200) \
            .SetRange(1, 5000) \
            .SetDisplay("Cooldown Bars", "Bars to wait between orders", "Risk Management")

        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        self._ema = None
        self._atr = None
        # Circular buffer of the most recent channel widths (allocated on reset/start).
        self._width_history = None
        # Next write position inside the buffer.
        self._current_index = 0
        # Number of buffer slots written so far (capped at the buffer length).
        self._filled_count = 0
        # Remaining candles to skip before the next order may be sent.
        self._cooldown = 0

    @property
    def candle_type(self):
        """Candle type used for the subscription."""
        return self._candle_type.Value

    def OnReseted(self):
        """Drops the indicators and re-creates the empty width buffer and counters."""
        super(keltner_width_mean_reversion_strategy, self).OnReseted()
        self._ema = None
        self._atr = None
        lb = int(self._width_lookback.Value)
        self._width_history = [0.0] * lb
        self._current_index = 0
        self._filled_count = 0
        self._cooldown = 0

    def OnStarted2(self, time):
        """Creates indicators, subscribes to candles, sets up the chart and the stop-loss protection."""
        super(keltner_width_mean_reversion_strategy, self).OnStarted2(time)

        lb = int(self._width_lookback.Value)
        self._width_history = [0.0] * lb
        self._current_index = 0
        self._filled_count = 0
        self._cooldown = 0

        self._ema = ExponentialMovingAverage()
        self._ema.Length = int(self._ema_period.Value)
        self._atr = AverageTrueRange()
        self._atr.Length = int(self._atr_period.Value)

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self._ema, self._atr, self._process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, self._ema)
            self.DrawIndicator(area, self._atr)
            self.DrawOwnTrades(area)

        self.StartProtection(Unit(), Unit(self._stop_loss_percent.Value, UnitTypes.Percent))

    def _process_candle(self, candle, ema_value, atr_value):
        """
        Handles a candle together with the bound EMA and ATR values.

        Records the channel width and, once the history buffer is full, opens or closes
        positions based on the width thresholds. ``ema_value`` is not used in the
        width calculation.
        """
        if candle.State != CandleStates.Finished:
            return

        if not self._ema.IsFormed or not self._atr.IsFormed:
            return

        km = float(self._keltner_multiplier.Value)
        width = 2.0 * km * float(atr_value)

        # Use the size of the allocated buffer rather than the live parameter value:
        # if WidthLookbackPeriod is changed after start, indexing by the parameter would
        # run past the end of the list (or average only a part of it).
        lb = len(self._width_history)

        self._width_history[self._current_index] = width
        self._current_index = (self._current_index + 1) % lb

        if self._filled_count < lb:
            self._filled_count += 1

        # Statistics are meaningful only after the whole buffer has been written once.
        if self._filled_count < lb:
            return

        avg_width = 0.0
        for i in range(lb):
            avg_width += self._width_history[i]
        avg_width /= float(lb)

        sum_sq = 0.0
        for i in range(lb):
            diff = self._width_history[i] - avg_width
            sum_sq += diff * diff
        # Population standard deviation of the stored widths.
        std_width = math.sqrt(sum_sq / float(lb))

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        # The cooldown blocks both entries and exits until it runs out.
        if self._cooldown > 0:
            self._cooldown -= 1
            return

        wdm = float(self._width_dev_mult.Value)
        lower_threshold = avg_width - wdm * std_width
        upper_threshold = avg_width + wdm * std_width

        if self.Position == 0:
            if width < lower_threshold:
                self.BuyMarket()
                self._cooldown = int(self._cooldown_bars.Value)
            elif width > upper_threshold:
                self.SellMarket()
                self._cooldown = int(self._cooldown_bars.Value)
        elif self.Position > 0 and width >= avg_width:
            # Long exit: width has returned to (or above) its average.
            self.SellMarket(Math.Abs(self.Position))
            self._cooldown = int(self._cooldown_bars.Value)
        elif self.Position < 0 and width <= avg_width:
            # Short exit: width has returned to (or below) its average.
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown = int(self._cooldown_bars.Value)

    def CreateClone(self):
        """Returns a new instance of this strategy class."""
        return keltner_width_mean_reversion_strategy()