Keltner Kalman Filter
Keltner Kalman Filter 策略基于 combining Keltner Channels with a Kalman Filter to identify trends and trade opportunities。
测试表明年均收益约为 73%,该策略在加密市场表现最佳。
当 Keltner confirms filtered entries 在日内(15m)数据上得到确认时触发信号,适合积极交易者。
止损依赖于 ATR 倍数以及 EmaPeriod, AtrPeriod 等参数,可根据需要调整以平衡风险与收益。
详情
- 入场条件:参见指标条件实现.
- 多空方向:双向.
- 退出条件:反向信号或止损逻辑.
- 止损:是,基于指标计算.
- 默认值:
EmaPeriod = 20AtrPeriod = 14AtrMultiplier = 2.0mKalmanProcessNoise = 0.01mKalmanMeasurementNoise = 0.1mCandleType = TimeSpan.FromMinutes(15).TimeFrame()
- 过滤器:
- 分类: 趋势跟随
- 方向: 双向
- 指标: Keltner
- 止损: 是
- 复杂度: 中等
- 时间框架: 日内 (15m)
- 季节性: 否
- 神经网络: 否
- 背离: 否
- 风险等级: 中等
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy combining Keltner Channels with a Kalman Filter to identify trends and trade opportunities.
/// </summary>
public class KeltnerKalmanStrategy : Strategy
{
private readonly StrategyParam<int> _emaPeriod;
private readonly StrategyParam<int> _atrPeriod;
private readonly StrategyParam<decimal> _atrMultiplier;
private readonly StrategyParam<decimal> _kalmanProcessNoise;
private readonly StrategyParam<decimal> _kalmanMeasurementNoise;
private readonly StrategyParam<DataType> _candleType;
private ExponentialMovingAverage _ema;
private AverageTrueRange _atr;
// Kalman filter parameters
private decimal _kalmanEstimate;
private decimal _kalmanError;
private readonly SynchronizedList<decimal> _prices = [];
// Saved values for decision making
private decimal _emaValue;
private decimal _atrValue;
private decimal _upperBand;
private decimal _lowerBand;
/// <summary>
/// EMA period for Keltner Channel.
/// </summary>
public int EmaPeriod
{
get => _emaPeriod.Value;
set => _emaPeriod.Value = value;
}
/// <summary>
/// ATR period for Keltner Channel.
/// </summary>
public int AtrPeriod
{
get => _atrPeriod.Value;
set => _atrPeriod.Value = value;
}
/// <summary>
/// ATR multiplier for Keltner Channel.
/// </summary>
public decimal AtrMultiplier
{
get => _atrMultiplier.Value;
set => _atrMultiplier.Value = value;
}
/// <summary>
/// Kalman filter process noise parameter (Q).
/// </summary>
public decimal KalmanProcessNoise
{
get => _kalmanProcessNoise.Value;
set => _kalmanProcessNoise.Value = value;
}
/// <summary>
/// Kalman filter measurement noise parameter (R).
/// </summary>
public decimal KalmanMeasurementNoise
{
get => _kalmanMeasurementNoise.Value;
set => _kalmanMeasurementNoise.Value = value;
}
/// <summary>
/// Candle type to use for the strategy.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Initializes a new instance of the <see cref="KeltnerKalmanStrategy"/>.
/// </summary>
public KeltnerKalmanStrategy()
{
_emaPeriod = Param(nameof(EmaPeriod), 20)
.SetDisplay("EMA Period", "EMA period for Keltner Channel", "Keltner Channel")
.SetOptimize(10, 30, 5);
_atrPeriod = Param(nameof(AtrPeriod), 14)
.SetDisplay("ATR Period", "ATR period for Keltner Channel", "Keltner Channel")
.SetOptimize(10, 20, 2);
_atrMultiplier = Param(nameof(AtrMultiplier), 2.0m)
.SetDisplay("ATR Multiplier", "ATR multiplier for Keltner Channel", "Keltner Channel")
.SetOptimize(1.5m, 3.0m, 0.5m);
_kalmanProcessNoise = Param(nameof(KalmanProcessNoise), 0.01m)
.SetDisplay("Kalman Process Noise (Q)", "Kalman filter process noise parameter", "Kalman Filter")
.SetOptimize(0.001m, 0.1m, 0.005m);
_kalmanMeasurementNoise = Param(nameof(KalmanMeasurementNoise), 0.1m)
.SetDisplay("Kalman Measurement Noise (R)", "Kalman filter measurement noise parameter", "Kalman Filter")
.SetOptimize(0.01m, 1.0m, 0.05m);
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles to use", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_kalmanEstimate = 0;
_kalmanError = 1;
_prices.Clear();
_emaValue = 0;
_atrValue = 0;
_upperBand = 0;
_lowerBand = 0;
_ema = null;
_atr = null;
}
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
// Create indicators
_ema = new ExponentialMovingAverage
{
Length = EmaPeriod
};
_atr = new AverageTrueRange
{
Length = AtrPeriod
};
// Create subscription and bind indicators
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(_ema, _atr, ProcessCandle)
.Start();
// Setup chart visualization if available
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, _ema);
DrawOwnTrades(area);
}
// Setup position protection
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(2, UnitTypes.Percent)
);
}
private void ProcessCandle(ICandleMessage candle, decimal emaValue, decimal atrValue)
{
// Skip unfinished candles
if (candle.State != CandleStates.Finished)
return;
// Save indicator values
_emaValue = emaValue;
_atrValue = atrValue;
// Calculate Keltner Channels
_upperBand = _emaValue + (_atrValue * AtrMultiplier);
_lowerBand = _emaValue - (_atrValue * AtrMultiplier);
// Update Kalman filter
UpdateKalmanFilter(candle.ClosePrice);
// Store prices for slope calculation
_prices.Add(candle.ClosePrice);
if (_prices.Count > 10)
_prices.RemoveAt(0);
// Calculate Kalman slope (trend direction)
decimal kalmanSlope = CalculateKalmanSlope();
// Trading logic
if (Position == 0)
{
// Buy when price breaks above upper band and Kalman slope is positive
if (candle.ClosePrice > _upperBand && kalmanSlope > 0)
{
BuyMarket();
}
// Sell when price breaks below lower band and Kalman slope is negative
else if (candle.ClosePrice < _lowerBand && kalmanSlope < 0)
{
SellMarket();
}
}
}
private void UpdateKalmanFilter(decimal price)
{
// Kalman filter implementation (one-dimensional)
// Prediction step
decimal predictedEstimate = _kalmanEstimate;
decimal predictedError = _kalmanError + KalmanProcessNoise;
// Update step
decimal kalmanGain = predictedError / (predictedError + KalmanMeasurementNoise);
_kalmanEstimate = predictedEstimate + kalmanGain * (price - predictedEstimate);
_kalmanError = (1 - kalmanGain) * predictedError;
LogInfo($"Kalman Filter: Price {price:F2}, Estimate {_kalmanEstimate:F2}, Error {_kalmanError:F6}, Gain {kalmanGain:F6}");
}
private decimal CalculateKalmanSlope()
{
var prices = _prices.ToArray();
// Need at least a few points to calculate a slope
if (prices.Length < 3)
return 0;
// Simple linear regression slope calculation
int n = prices.Length;
decimal sumX = 0;
decimal sumY = 0;
decimal sumXY = 0;
decimal sumX2 = 0;
for (int i = 0; i < n; i++)
{
decimal x = i;
decimal y = prices[i];
sumX += x;
sumY += y;
sumXY += x * y;
sumX2 += x * x;
}
decimal denominator = n * sumX2 - sumX * sumX;
if (denominator == 0)
return 0;
decimal slope = (n * sumXY - sumX * sumY) / denominator;
return slope;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import ExponentialMovingAverage, AverageTrueRange
from StockSharp.Algo.Strategies import Strategy
class keltner_kalman_strategy(Strategy):
"""
Strategy combining Keltner Channels with a Kalman Filter to identify trends and trade opportunities.
"""
def __init__(self):
super(keltner_kalman_strategy, self).__init__()
self._ema_period = self.Param("EmaPeriod", 20) \
.SetDisplay("EMA Period", "EMA period for Keltner Channel", "Keltner Channel") \
.SetCanOptimize(True) \
.SetOptimize(10, 30, 5)
self._atr_period = self.Param("AtrPeriod", 14) \
.SetDisplay("ATR Period", "ATR period for Keltner Channel", "Keltner Channel") \
.SetCanOptimize(True) \
.SetOptimize(10, 20, 2)
self._atr_multiplier = self.Param("AtrMultiplier", 2.0) \
.SetDisplay("ATR Multiplier", "ATR multiplier for Keltner Channel", "Keltner Channel") \
.SetCanOptimize(True) \
.SetOptimize(1.5, 3.0, 0.5)
self._kalman_process_noise = self.Param("KalmanProcessNoise", 0.01) \
.SetDisplay("Kalman Process Noise (Q)", "Kalman filter process noise parameter", "Kalman Filter") \
.SetCanOptimize(True) \
.SetOptimize(0.001, 0.1, 0.005)
self._kalman_measurement_noise = self.Param("KalmanMeasurementNoise", 0.1) \
.SetDisplay("Kalman Measurement Noise (R)", "Kalman filter measurement noise parameter", "Kalman Filter") \
.SetCanOptimize(True) \
.SetOptimize(0.01, 1.0, 0.05)
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Type of candles to use", "General")
self._ema = None
self._atr = None
self._kalman_estimate = 0.0
self._kalman_error = 1.0
self._prices = []
self._ema_value = 0.0
self._atr_value = 0.0
self._upper_band = 0.0
self._lower_band = 0.0
@property
def EmaPeriod(self):
return self._ema_period.Value
@EmaPeriod.setter
def EmaPeriod(self, value):
self._ema_period.Value = value
@property
def AtrPeriod(self):
return self._atr_period.Value
@AtrPeriod.setter
def AtrPeriod(self, value):
self._atr_period.Value = value
@property
def AtrMultiplier(self):
return self._atr_multiplier.Value
@AtrMultiplier.setter
def AtrMultiplier(self, value):
self._atr_multiplier.Value = value
@property
def KalmanProcessNoise(self):
return self._kalman_process_noise.Value
@KalmanProcessNoise.setter
def KalmanProcessNoise(self, value):
self._kalman_process_noise.Value = value
@property
def KalmanMeasurementNoise(self):
return self._kalman_measurement_noise.Value
@KalmanMeasurementNoise.setter
def KalmanMeasurementNoise(self, value):
self._kalman_measurement_noise.Value = value
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
def GetWorkingSecurities(self):
return [(self.Security, self.CandleType)]
def OnReseted(self):
super(keltner_kalman_strategy, self).OnReseted()
self._kalman_estimate = 0.0
self._kalman_error = 1.0
self._prices = []
self._ema_value = 0.0
self._atr_value = 0.0
self._upper_band = 0.0
self._lower_band = 0.0
self._ema = None
self._atr = None
def OnStarted2(self, time):
super(keltner_kalman_strategy, self).OnStarted2(time)
self._ema = ExponentialMovingAverage()
self._ema.Length = self.EmaPeriod
self._atr = AverageTrueRange()
self._atr.Length = self.AtrPeriod
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(self._ema, self._atr, self.ProcessCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, self._ema)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(2, UnitTypes.Percent)
)
def ProcessCandle(self, candle, ema_value, atr_value):
if candle.State != CandleStates.Finished:
return
self._ema_value = float(ema_value)
self._atr_value = float(atr_value)
self._upper_band = self._ema_value + (self._atr_value * self.AtrMultiplier)
self._lower_band = self._ema_value - (self._atr_value * self.AtrMultiplier)
price = float(candle.ClosePrice)
self.UpdateKalmanFilter(price)
self._prices.append(price)
if len(self._prices) > 10:
self._prices.pop(0)
kalman_slope = self.CalculateKalmanSlope()
if self.Position != 0:
return
if price > self._upper_band and kalman_slope > 0:
self.BuyMarket()
elif price < self._lower_band and kalman_slope < 0:
self.SellMarket()
def UpdateKalmanFilter(self, price):
predicted_estimate = self._kalman_estimate
predicted_error = self._kalman_error + self.KalmanProcessNoise
kalman_gain = predicted_error / (predicted_error + self.KalmanMeasurementNoise)
self._kalman_estimate = predicted_estimate + kalman_gain * (price - predicted_estimate)
self._kalman_error = (1.0 - kalman_gain) * predicted_error
def CalculateKalmanSlope(self):
if len(self._prices) < 3:
return 0.0
n = len(self._prices)
sum_x = 0.0
sum_y = 0.0
sum_xy = 0.0
sum_x2 = 0.0
for i in range(n):
x = float(i)
y = self._prices[i]
sum_x += x
sum_y += y
sum_xy += x * y
sum_x2 += x * x
denominator = n * sum_x2 - sum_x * sum_x
if denominator == 0:
return 0.0
slope = (n * sum_xy - sum_x * sum_y) / denominator
return slope
def CreateClone(self):
return keltner_kalman_strategy()