Keltner Reinforcement Learning Signal
Keltner Reinforcement Learning Signal 策略将 Keltner 通道突破与简化的强化学习(RL)信号相结合。
测试表明年均收益约为 118%,该策略在股票市场表现最佳。
当收盘价突破 Keltner 通道且 RL 信号同向确认时触发信号(默认使用 4 小时 K 线),适合积极交易者。
止损依赖于 ATR 倍数以及 EmaPeriod, AtrPeriod 等参数,可根据需要调整以平衡风险与收益。
详情
- 入场条件:做多——收盘价向上突破 Keltner 上轨且 RL 信号为 Buy;做空——收盘价向下突破 Keltner 下轨且 RL 信号为 Sell.
- 多空方向:双向.
- 退出条件:反向信号或止损逻辑.
- 止损:是,基于指标计算.
- 默认值:
  - EmaPeriod = 20
  - AtrPeriod = 14
  - AtrMultiplier = 1.25m
  - CooldownBars = 48
  - StopLossAtr = 2m
  - CandleType = TimeSpan.FromHours(4).TimeFrame()
- 过滤器:
- 分类: 趋势跟随
- 方向: 双向
- 指标: Keltner, Reinforcement
- 止损: 是
- 复杂度: 中等
- 时间框架: 4 小时 (4h)
- 季节性: 否
- 神经网络: 是
- 背离: 否
- 风险等级: 中等
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Keltner with Reinforcement Learning Signal strategy.
/// Entry condition:
/// Long: Price > EMA + k*ATR && RL_Signal = Buy
/// Short: Price < EMA - k*ATR && RL_Signal = Sell
/// Exit condition:
/// Long: Price < EMA
/// Short: Price > EMA
/// </summary>
public class KeltnerWithRLSignalStrategy : Strategy
{
// Backing stores for the public strategy parameters (created in the constructor).
private readonly StrategyParam<int> _emaPeriod;
// NOTE(review): exposed through AtrPeriod but never passed to the indicator in this class — confirm intent.
private readonly StrategyParam<int> _atrPeriod;
private readonly StrategyParam<decimal> _atrMultiplier;
private readonly StrategyParam<decimal> _stopLossAtr;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<DataType> _candleType;
// Direction currently suggested by the rule-based "RL" signal.
private enum RLSignals
{
// No directional bias.
None,
// Long entries are allowed.
Buy,
// Short entries are allowed.
Sell
}
// Latest signal produced by UpdateRLSignal.
private RLSignals _currentSignal = RLSignals.None;
// State variables for RL
// Close price of the most recently processed candle.
private decimal _lastPrice;
// Middle band of the previously processed candle (written in ProcessCandle, not read anywhere in this class).
private decimal _previousEma;
// ATR estimate of the previously processed candle; used to detect rising volatility.
private decimal _previousAtr;
// Close price of the previously processed candle; used to detect rising price.
private decimal _previousPrice;
// Close price at which the last entry order was sent; 0 until the first entry.
private decimal _previousSignalPrice;
// Win / loss streak counters maintained in OnOwnTradeReceived.
private int _consecutiveWins;
private int _consecutiveLosses;
// Finished candles that still have to pass before a new entry is allowed.
private int _cooldownRemaining;
// Whether the previous close was outside the upper / lower band (used to detect a fresh breakout).
private bool _previousAboveUpperBand;
private bool _previousBelowLowerBand;
/// <summary>
/// Length assigned to the Keltner channel indicator when the strategy starts.
/// </summary>
public int EmaPeriod
{
	get { return _emaPeriod.Value; }
	set { _emaPeriod.Value = value; }
}
/// <summary>
/// ATR period parameter.
/// NOTE(review): nothing in this class passes the value to the indicator — confirm whether it should be wired in.
/// </summary>
public int AtrPeriod
{
	get { return _atrPeriod.Value; }
	set { _atrPeriod.Value = value; }
}
/// <summary>
/// Band-width multiplier of the Keltner channel; also used to derive the ATR estimate from the band distance.
/// </summary>
public decimal AtrMultiplier
{
	get { return _atrMultiplier.Value; }
	set { _atrMultiplier.Value = value; }
}
/// <summary>
/// Stop-loss distance expressed as a multiple of the ATR estimate.
/// </summary>
public decimal StopLossAtr
{
	get { return _stopLossAtr.Value; }
	set { _stopLossAtr.Value = value; }
}
/// <summary>
/// Number of finished candles that must pass after an order before a new entry is allowed.
/// </summary>
public int CooldownBars
{
	get { return _cooldownBars.Value; }
	set { _cooldownBars.Value = value; }
}
/// <summary>
/// Candle data type the strategy subscribes to.
/// </summary>
public DataType CandleType
{
	get { return _candleType.Value; }
	set { _candleType.Value = value; }
}
/// <summary>
/// Creates the strategy and registers every parameter with its default value,
/// display metadata and, where declared, an optimization range.
/// </summary>
public KeltnerWithRLSignalStrategy()
{
	const string keltnerGroup = "Keltner Settings";
	const string generalGroup = "General";
	const string riskGroup = "Risk Management";

	// Keltner channel inputs.
	_emaPeriod = Param(nameof(EmaPeriod), 20)
		.SetGreaterThanZero()
		.SetDisplay("EMA Period", "Period for the exponential moving average", keltnerGroup)
		.SetOptimize(10, 30, 5);

	_atrPeriod = Param(nameof(AtrPeriod), 14)
		.SetGreaterThanZero()
		.SetDisplay("ATR Period", "Period for the average true range", keltnerGroup)
		.SetOptimize(7, 21, 7);

	_atrMultiplier = Param(nameof(AtrMultiplier), 1.25m)
		.SetGreaterThanZero()
		.SetDisplay("ATR Multiplier", "Multiplier for ATR in Keltner Channels", keltnerGroup)
		.SetOptimize(1.5m, 3m, 0.5m);

	// Trade pacing.
	_cooldownBars = Param(nameof(CooldownBars), 48)
		.SetNotNegative()
		.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", generalGroup);

	// Risk control.
	_stopLossAtr = Param(nameof(StopLossAtr), 2m)
		.SetGreaterThanZero()
		.SetDisplay("Stop Loss (ATR)", "Stop Loss in multiples of ATR", riskGroup)
		.SetOptimize(1m, 3m, 0.5m);

	// Market data.
	_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
		.SetDisplay("Candle Type", "Type of candles to use", generalGroup);
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	=> [(Security, CandleType)];
/// <inheritdoc />
protected override void OnReseted()
{
	base.OnReseted();

	// Signal and streak state.
	_currentSignal = RLSignals.None;
	_consecutiveWins = 0;
	_consecutiveLosses = 0;

	// Price and indicator history.
	_lastPrice = 0m;
	_previousEma = 0m;
	_previousAtr = 0m;
	_previousPrice = 0m;
	_previousSignalPrice = 0m;

	// Entry pacing and breakout memory.
	_cooldownRemaining = 0;
	_previousAboveUpperBand = false;
	_previousBelowLowerBand = false;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
	base.OnStarted2(time);

	// The channel takes its length from EmaPeriod and its width from AtrMultiplier;
	// AtrPeriod is not referenced here.
	var channel = new KeltnerChannels();
	channel.Length = EmaPeriod;
	channel.Multiplier = AtrMultiplier;

	// Feed candles of the configured type through the indicator into ProcessCandle.
	var candles = SubscribeCandles(CandleType);
	candles.BindEx(channel, ProcessCandle).Start();

	// Charting is optional: nothing to draw when no chart area is available.
	var chartArea = CreateChartArea();
	if (chartArea == null)
		return;

	DrawCandles(chartArea, candles);
	DrawIndicator(chartArea, channel);
	DrawOwnTrades(chartArea);
}
/// <summary>
/// Handles one candle together with the Keltner channel value calculated for it:
/// refreshes the RL signal and then submits at most one order
/// (breakout entry, middle-band exit, or ATR stop measured from the entry price).
/// </summary>
/// <param name="candle">Candle delivered by the subscription; unfinished candles are ignored.</param>
/// <param name="keltnerValue">Indicator value; cast to <see cref="KeltnerChannelsValue"/>.</param>
private void ProcessCandle(ICandleMessage candle, IIndicatorValue keltnerValue)
{
	// Skip unfinished candles
	if (candle.State != CandleStates.Finished)
		return;

	// Check if strategy is ready to trade
	if (!IsFormedAndOnlineAndAllowTrading())
		return;

	// Extract Keltner Channel values; bail out while any band is still missing
	var keltnerTyped = (KeltnerChannelsValue)keltnerValue;

	if (keltnerTyped.Upper is not decimal upperBand)
		return;

	if (keltnerTyped.Lower is not decimal lowerBand)
		return;

	if (keltnerTyped.Middle is not decimal middleBand)
		return;

	// Back out the ATR from the band distance: (upper - middle) / multiplier
	var currentAtr = (upperBand - middleBand) / AtrMultiplier;

	var price = candle.ClosePrice;
	_lastPrice = price;

	// Generate RL signal based on current state
	UpdateRLSignal(candle, middleBand, currentAtr);

	if (_cooldownRemaining > 0)
		_cooldownRemaining--;

	// A breakout is the first close outside a band after a close inside it
	var priceAboveUpperBand = price > upperBand;
	var priceBelowLowerBand = price < lowerBand;
	var bullishBreakout = !_previousAboveUpperBand && priceAboveUpperBand;
	var bearishBreakout = !_previousBelowLowerBand && priceBelowLowerBand;

	// The branches below are mutually exclusive so that only one order is sent per candle.
	// Otherwise a reversal entry (Volume + |Position|) could be followed by an exit or stop
	// order for the same |Position| in case Position has not been updated yet.
	if (_cooldownRemaining == 0 && bullishBreakout && _currentSignal == RLSignals.Buy && Position <= 0)
	{
		// Long entry: breakout above the upper band confirmed by the RL signal
		LogInfo($"Long signal: Price {price} > Upper Band {upperBand}, RL Signal = Buy");
		BuyMarket(Volume + (Position < 0 ? Math.Abs(Position) : 0m));
		_previousSignalPrice = price;
		_cooldownRemaining = CooldownBars;
	}
	else if (_cooldownRemaining == 0 && bearishBreakout && _currentSignal == RLSignals.Sell && Position >= 0)
	{
		// Short entry: breakout below the lower band confirmed by the RL signal
		LogInfo($"Short signal: Price {price} < Lower Band {lowerBand}, RL Signal = Sell");
		SellMarket(Volume + (Position > 0 ? Math.Abs(Position) : 0m));
		_previousSignalPrice = price;
		_cooldownRemaining = CooldownBars;
	}
	else if (Position > 0 && price < middleBand)
	{
		// Exit long: price dropped below the EMA (middle band)
		LogInfo($"Exit long: Price {price} < EMA {middleBand}");
		SellMarket(Math.Abs(Position));
		_cooldownRemaining = CooldownBars;
	}
	else if (Position < 0 && price > middleBand)
	{
		// Exit short: price rose above the EMA (middle band)
		LogInfo($"Exit short: Price {price} > EMA {middleBand}");
		BuyMarket(Math.Abs(Position));
		_cooldownRemaining = CooldownBars;
	}
	else if (_previousSignalPrice != 0m)
	{
		// ATR stop measured from the recorded entry price. Passing the current close here
		// would compare the close against itself minus a positive offset and never trigger.
		// Skipped while no entry price has been recorded.
		ApplyAtrStopLoss(_previousSignalPrice, currentAtr);
	}

	// Update previous values for next iteration
	_previousEma = middleBand;
	_previousAtr = currentAtr;
	_previousPrice = price;
	_previousAboveUpperBand = priceAboveUpperBand;
	_previousBelowLowerBand = priceBelowLowerBand;
}
/// <summary>
/// Recomputes <c>_currentSignal</c> from a fixed rule table that stands in for an RL policy.
/// Nothing is learned here: the only feedback used is the win/loss streak counters.
/// </summary>
/// <param name="candle">Candle whose open and close prices are inspected.</param>
/// <param name="ema">Middle band value for this candle.</param>
/// <param name="atr">ATR estimate for this candle.</param>
private void UpdateRLSignal(ICandleMessage candle, decimal ema, decimal atr)
{
	var close = candle.ClosePrice;

	// Inputs of the rule table.
	var aboveEma = close > ema;                                 // position relative to the EMA
	var rising = close > _previousPrice;                        // momentum against the previous close
	var atrRising = atr > _previousAtr;                         // volatility expanding
	var bullish = close > candle.OpenPrice;                     // candle body direction
	var aggressive = _consecutiveWins > _consecutiveLosses;     // recent results allow skipping the momentum check

	if (bullish && aboveEma && (rising || aggressive))
	{
		_currentSignal = RLSignals.Buy;
		LogInfo("RL Signal: Buy");
		return;
	}

	// Mirror image of the buy rule: non-bullish candle at or below the EMA.
	if (!(bullish || aboveEma) && (!rising || aggressive))
	{
		_currentSignal = RLSignals.Sell;
		LogInfo("RL Signal: Sell");
		return;
	}

	// Mixed picture: the previous signal is kept unless volatility is expanding.
	if (!atrRising)
		return;

	_currentSignal = RLSignals.None;
	LogInfo("RL Signal: None (high volatility)");
}
/// <summary>
/// Updates the win/loss streak counters from an own trade.
/// A trade counts as a win when the last processed close is on the favourable side of the fill price.
/// NOTE(review): this compares against the latest close, not against a matching entry/exit pair — confirm that is intended.
/// </summary>
/// <param name="trade">Own trade reported for this strategy.</param>
protected override void OnOwnTradeReceived(MyTrade trade)
{
	// No entry has been recorded yet, so there is nothing to score.
	if (_previousSignalPrice == 0)
		return;

	// Buys win when the last close is above the fill, sells when it is below.
	var profitable = trade.Order.Side == Sides.Buy
		? _lastPrice > trade.Trade.Price
		: _lastPrice < trade.Trade.Price;

	if (!profitable)
	{
		_consecutiveLosses++;
		_consecutiveWins = 0;
		LogInfo($"Unprofitable trade: Loss streak = {_consecutiveLosses}");
		return;
	}

	_consecutiveWins++;
	_consecutiveLosses = 0;
	LogInfo($"Profitable trade: Win streak = {_consecutiveWins}");
}
/// <summary>
/// Closes the open position with a market order when <c>_lastPrice</c> has moved more than
/// <see cref="StopLossAtr"/> × <paramref name="atr"/> against it, measured from <paramref name="price"/>.
/// The stop can only fire when <paramref name="price"/> differs from <c>_lastPrice</c>.
/// </summary>
/// <param name="price">Reference price the stop distance is measured from.</param>
/// <param name="atr">ATR estimate used to size the stop distance.</param>
private void ApplyAtrStopLoss(decimal price, decimal atr)
{
	// Nothing to protect while flat.
	if (Position == 0)
		return;

	var isLong = Position > 0;

	// The stop sits below the reference for longs and above it for shorts.
	var stopLevel = isLong
		? price - (StopLossAtr * atr)
		: price + (StopLossAtr * atr);

	if (isLong)
	{
		if (_lastPrice >= stopLevel)
			return;

		LogInfo($"ATR Stop Loss triggered for long position: Current {_lastPrice} < Stop {stopLevel}");
		SellMarket(Math.Abs(Position));
		return;
	}

	if (_lastPrice <= stopLevel)
		return;

	LogInfo($"ATR Stop Loss triggered for short position: Current {_lastPrice} > Stop {stopLevel}");
	BuyMarket(Math.Abs(Position));
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math, Decimal
from StockSharp.Messages import DataType, CandleStates, Sides
from StockSharp.Algo.Indicators import KeltnerChannels
from StockSharp.Algo.Strategies import Strategy
class keltner_with_rl_signal_strategy(Strategy):
    """
    Keltner with Reinforcement Learning Signal strategy.

    Entry:
        Long:  close breaks above the upper Keltner band while the RL signal is Buy.
        Short: close breaks below the lower Keltner band while the RL signal is Sell.
    Exit:
        Long:  close falls below the middle band, or the ATR stop is hit.
        Short: close rises above the middle band, or the ATR stop is hit.

    At most one order is submitted per finished candle.
    """

    # RL signal constants
    RL_NONE = 0
    RL_BUY = 1
    RL_SELL = 2

    def __init__(self):
        """Register the strategy parameters and initialise the runtime state."""
        super(keltner_with_rl_signal_strategy, self).__init__()

        self._ema_period = self.Param("EmaPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("EMA Period", "Period for the exponential moving average", "Keltner Settings")
        # NOTE(review): registered but not passed to the indicator anywhere in this class.
        self._atr_period = self.Param("AtrPeriod", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Period", "Period for the average true range", "Keltner Settings")
        self._atr_multiplier = self.Param("AtrMultiplier", 1.25) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Multiplier", "Multiplier for ATR in Keltner Channels", "Keltner Settings")
        self._cooldown_bars = self.Param("CooldownBars", 48) \
            .SetNotNegative() \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "General")
        self._stop_loss_atr = self.Param("StopLossAtr", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stop Loss (ATR)", "Stop Loss in multiples of ATR", "Risk Management")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        # Latest signal produced by UpdateRLSignal.
        self._current_signal = self.RL_NONE
        # Close of the most recently processed candle.
        self._last_price = 0.0
        # Values of the previously processed candle.
        self._previous_ema = 0.0
        self._previous_atr = 0.0
        self._previous_price = 0.0
        # Close at which the last entry order was sent; 0 until the first entry.
        self._previous_signal_price = 0.0
        # Win / loss streak counters maintained in OnOwnTradeReceived.
        self._consecutive_wins = 0
        self._consecutive_losses = 0
        # Finished candles that still have to pass before a new entry is allowed.
        self._cooldown_remaining = 0
        # Whether the previous close was outside the upper / lower band.
        self._previous_above_upper = False
        self._previous_below_lower = False

    @property
    def candle_type(self):
        """Candle data type the strategy subscribes to."""
        return self._candle_type.Value

    def GetWorkingSecurities(self):
        """Return the (security, candle type) pairs this strategy works with."""
        return [(self.Security, self.candle_type)]

    def OnReseted(self):
        """Reset all runtime state to its initial values."""
        super(keltner_with_rl_signal_strategy, self).OnReseted()
        self._current_signal = self.RL_NONE
        self._consecutive_wins = 0
        self._consecutive_losses = 0
        self._last_price = 0.0
        self._previous_ema = 0.0
        self._previous_atr = 0.0
        self._previous_price = 0.0
        self._previous_signal_price = 0.0
        self._cooldown_remaining = 0
        self._previous_above_upper = False
        self._previous_below_lower = False

    def OnStarted2(self, time):
        """Create the Keltner channel, subscribe to candles and set up the chart."""
        super(keltner_with_rl_signal_strategy, self).OnStarted2(time)

        keltner = KeltnerChannels()
        keltner.Length = int(self._ema_period.Value)
        keltner.Multiplier = Decimal(float(self._atr_multiplier.Value))

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(keltner, self.ProcessCandle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, keltner)
            self.DrawOwnTrades(area)

    def ProcessCandle(self, candle, keltner_value):
        """
        Handle one candle with its Keltner channel value.

        Refreshes the RL signal and then submits at most one order: a breakout
        entry, a middle-band exit, or an ATR stop measured from the entry price.
        """
        if candle.State != CandleStates.Finished:
            return
        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        upper_val = keltner_value.Upper
        lower_val = keltner_value.Lower
        middle_val = keltner_value.Middle
        if upper_val is None or lower_val is None or middle_val is None:
            return

        upper_band = float(upper_val)
        lower_band = float(lower_val)
        middle_band = float(middle_val)

        # Back out the ATR from the band distance: (upper - middle) / multiplier.
        atr_mult = float(self._atr_multiplier.Value)
        current_atr = (upper_band - middle_band) / atr_mult

        price = float(candle.ClosePrice)
        self._last_price = price

        self.UpdateRLSignal(candle, middle_band, current_atr)

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        # A breakout is the first close outside a band after a close inside it.
        price_above_upper = price > upper_band
        price_below_lower = price < lower_band
        bullish_breakout = (not self._previous_above_upper) and price_above_upper
        bearish_breakout = (not self._previous_below_lower) and price_below_lower
        cooldown = int(self._cooldown_bars.Value)

        # The branches are mutually exclusive so that only one order is sent per candle.
        # Otherwise a reversal entry (Volume + |Position|) could be followed by an exit or
        # stop order for the same |Position| in case Position has not been updated yet.
        if self._cooldown_remaining == 0 and bullish_breakout and self._current_signal == self.RL_BUY and self.Position <= 0:
            vol = self.Volume
            if self.Position < 0:
                vol = self.Volume + Math.Abs(self.Position)
            self.BuyMarket(vol)
            self._previous_signal_price = price
            self._cooldown_remaining = cooldown
        elif self._cooldown_remaining == 0 and bearish_breakout and self._current_signal == self.RL_SELL and self.Position >= 0:
            vol = self.Volume
            if self.Position > 0:
                vol = self.Volume + Math.Abs(self.Position)
            self.SellMarket(vol)
            self._previous_signal_price = price
            self._cooldown_remaining = cooldown
        elif self.Position > 0 and price < middle_band:
            self.SellMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and price > middle_band:
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown
        elif self._previous_signal_price != 0:
            # ATR stop measured from the recorded entry price. Passing the current close
            # would compare the close against itself minus a positive offset and never
            # trigger. Skipped while no entry price has been recorded.
            self.ApplyAtrStopLoss(self._previous_signal_price, current_atr)

        self._previous_ema = middle_band
        self._previous_atr = current_atr
        self._previous_price = price
        self._previous_above_upper = price_above_upper
        self._previous_below_lower = price_below_lower

    def UpdateRLSignal(self, candle, ema, atr):
        """
        Recompute the current signal from a fixed rule table.

        Nothing is learned here: the only feedback used is the win/loss streak counters.
        """
        price_above_ema = float(candle.ClosePrice) > ema
        price_increasing = float(candle.ClosePrice) > self._previous_price
        volatility_increasing = atr > self._previous_atr
        bullish_candle = candle.ClosePrice > candle.OpenPrice
        # Recent results allow skipping the momentum check.
        aggressive_mode = self._consecutive_wins > self._consecutive_losses

        if bullish_candle and price_above_ema and (price_increasing or aggressive_mode):
            self._current_signal = self.RL_BUY
        elif not bullish_candle and not price_above_ema and (not price_increasing or aggressive_mode):
            self._current_signal = self.RL_SELL
        else:
            # Mixed picture: keep the previous signal unless volatility is expanding.
            if volatility_increasing:
                self._current_signal = self.RL_NONE

    def OnOwnTradeReceived(self, trade):
        """
        Update the win/loss streak counters from an own trade.

        A trade counts as a win when the last processed close is on the
        favourable side of the fill price.
        """
        # No entry has been recorded yet, so there is nothing to score.
        if self._previous_signal_price == 0:
            return

        if trade.Order.Side == Sides.Buy:
            profitable = self._last_price > float(trade.Trade.Price)
        else:
            profitable = self._last_price < float(trade.Trade.Price)

        if profitable:
            self._consecutive_wins += 1
            self._consecutive_losses = 0
        else:
            self._consecutive_losses += 1
            self._consecutive_wins = 0

    def ApplyAtrStopLoss(self, price, atr):
        """
        Close the open position when the last close has moved more than
        StopLossAtr * atr against it, measured from the reference `price`.
        """
        stop_loss_mult = float(self._stop_loss_atr.Value)
        if self.Position > 0:
            stop_level = price - (stop_loss_mult * atr)
            if self._last_price < stop_level:
                self.SellMarket(Math.Abs(self.Position))
        elif self.Position < 0:
            stop_level = price + (stop_loss_mult * atr)
            if self._last_price > stop_level:
                self.BuyMarket(Math.Abs(self.Position))

    def CreateClone(self):
        """Return a fresh instance of this strategy."""
        return keltner_with_rl_signal_strategy()