Keltner Reinforcement Learning Signal
The Keltner Reinforcement Learning Signal strategy trades Keltner Channel breakouts that are confirmed by a simplified, rule-based reinforcement-learning-style (RL) signal.
Testing indicates an average annual return of about 118%. It performs best in the stocks market.
Signals trigger when Keltner confirms trend changes on intraday data (4-hour candles by default, configurable through CandleType). This makes the method suitable for active traders.
Stops rely on ATR multiples and factors like EmaPeriod, AtrPeriod. Adjust these defaults to balance risk and reward.
Details
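- Entry Criteria: long when a finished candle closes above the upper Keltner band (after not being above it on the previous candle) and the RL signal is Buy; short when it closes below the lower band and the RL signal is Sell. Entries are allowed only after the cooldown has elapsed. See the implementation for the exact indicator conditions.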
- Long/Short: Both directions.
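- Exit Criteria: a long is closed when the close falls below the EMA (middle band) and a short when the close rises above it; an opposite entry signal reverses the position, and ATR-based stop logic is evaluated as well.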
- Stops: Yes, using indicator-based calculations.
- Default Values:
  - EmaPeriod = 20
  - AtrPeriod = 14
  - AtrMultiplier = 1.25m
  - CooldownBars = 48
  - StopLossAtr = 2m
  - CandleType = TimeSpan.FromHours(4).TimeFrame()
- Filters:
- Category: Trend following
- Direction: Both
- Indicators: Keltner, Reinforcement
- Stops: Yes
- Complexity: Intermediate
- Timeframe: Intraday (4h by default)
- Seasonality: No
- Neural Networks: Yes
- Divergence: No
- Risk Level: Medium
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Keltner channel breakout strategy filtered by a simplified, rule-based
/// "reinforcement learning" (RL) signal. All decisions are made on finished candles.
/// Entry condition (only when the cooldown counter is zero):
/// Long: close crosses above the upper band (EMA + k*ATR) and RL signal is Buy.
/// Short: close crosses below the lower band (EMA - k*ATR) and RL signal is Sell.
/// Exit condition:
/// Long: close is below the EMA, or below the entry signal price minus StopLossAtr * ATR.
/// Short: close is above the EMA, or above the entry signal price plus StopLossAtr * ATR.
/// At most one market order is sent per candle.
/// </summary>
public class KeltnerWithRLSignalStrategy : Strategy
{
	private readonly StrategyParam<int> _emaPeriod;
	private readonly StrategyParam<int> _atrPeriod;
	private readonly StrategyParam<decimal> _atrMultiplier;
	private readonly StrategyParam<decimal> _stopLossAtr;
	private readonly StrategyParam<int> _cooldownBars;
	private readonly StrategyParam<DataType> _candleType;

	private enum RLSignals
	{
		None,
		Buy,
		Sell
	}

	private RLSignals _currentSignal = RLSignals.None;

	// State variables for RL
	private decimal _lastPrice;           // close of the most recently processed candle
	private decimal _previousEma;         // middle band seen on the previous processed candle
	private decimal _previousAtr;         // ATR estimate from the previous processed candle
	private decimal _previousPrice;       // close of the previous processed candle
	private decimal _previousSignalPrice; // close at the moment of the last entry signal (0 = no entry yet)
	private int _consecutiveWins;
	private int _consecutiveLosses;
	private int _cooldownRemaining;       // candles left before another entry is allowed
	private bool _previousAboveUpperBand;
	private bool _previousBelowLowerBand;

	/// <summary>
	/// EMA period.
	/// </summary>
	public int EmaPeriod
	{
		get => _emaPeriod.Value;
		set => _emaPeriod.Value = value;
	}

	/// <summary>
	/// ATR period.
	/// </summary>
	public int AtrPeriod
	{
		get => _atrPeriod.Value;
		set => _atrPeriod.Value = value;
	}

	/// <summary>
	/// ATR multiplier for Keltner channel.
	/// </summary>
	public decimal AtrMultiplier
	{
		get => _atrMultiplier.Value;
		set => _atrMultiplier.Value = value;
	}

	/// <summary>
	/// Stop loss in ATR multiples, measured from the entry signal price.
	/// </summary>
	public decimal StopLossAtr
	{
		get => _stopLossAtr.Value;
		set => _stopLossAtr.Value = value;
	}

	/// <summary>
	/// Closed candles to wait between position changes.
	/// </summary>
	public int CooldownBars
	{
		get => _cooldownBars.Value;
		set => _cooldownBars.Value = value;
	}

	/// <summary>
	/// Type of candles to use.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Constructor with default parameters.
	/// </summary>
	public KeltnerWithRLSignalStrategy()
	{
		_emaPeriod = Param(nameof(EmaPeriod), 20)
			.SetGreaterThanZero()
			.SetDisplay("EMA Period", "Period for the exponential moving average", "Keltner Settings")
			.SetOptimize(10, 30, 5);

		_atrPeriod = Param(nameof(AtrPeriod), 14)
			.SetGreaterThanZero()
			.SetDisplay("ATR Period", "Period for the average true range", "Keltner Settings")
			.SetOptimize(7, 21, 7);

		_atrMultiplier = Param(nameof(AtrMultiplier), 1.25m)
			.SetGreaterThanZero()
			.SetDisplay("ATR Multiplier", "Multiplier for ATR in Keltner Channels", "Keltner Settings")
			.SetOptimize(1.5m, 3m, 0.5m);

		_cooldownBars = Param(nameof(CooldownBars), 48)
			.SetNotNegative()
			.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "General");

		_stopLossAtr = Param(nameof(StopLossAtr), 2m)
			.SetGreaterThanZero()
			.SetDisplay("Stop Loss (ATR)", "Stop Loss in multiples of ATR", "Risk Management")
			.SetOptimize(1m, 3m, 0.5m);

		_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles to use", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_currentSignal = default;
		_consecutiveWins = _consecutiveLosses = default;
		_lastPrice = _previousEma = _previousAtr = _previousPrice = _previousSignalPrice = default;
		_cooldownRemaining = default;
		_previousAboveUpperBand = default;
		_previousBelowLowerBand = default;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		// Create Keltner Channels using EMA and ATR.
		// NOTE(review): AtrPeriod is declared as a parameter but is not applied here;
		// only Length and Multiplier are set on the indicator. Confirm whether the
		// indicator exposes a separate ATR length before wiring it in.
		var keltner = new KeltnerChannels
		{
			Length = EmaPeriod,
			Multiplier = AtrMultiplier
		};

		// Subscribe to candles and bind indicators
		var subscription = SubscribeCandles(CandleType);

		subscription
			.BindEx(keltner, ProcessCandle)
			.Start();

		// Create chart visualization if available
		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, keltner);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Process each candle and Keltner Channel values.
	/// Updates the RL signal, then takes at most one action for the candle:
	/// long entry, short entry, long exit, short exit or ATR stop, in that priority.
	/// </summary>
	/// <param name="candle">Candle delivered by the subscription; only finished candles are used.</param>
	/// <param name="keltnerValue">Indicator value, cast to <see cref="KeltnerChannelsValue"/>.</param>
	private void ProcessCandle(ICandleMessage candle, IIndicatorValue keltnerValue)
	{
		// Skip unfinished candles
		if (candle.State != CandleStates.Finished)
			return;

		// Check if strategy is ready to trade
		if (!IsFormedAndOnlineAndAllowTrading())
			return;

		// Extract Keltner Channel values
		var keltnerTyped = (KeltnerChannelsValue)keltnerValue;

		if (keltnerTyped.Upper is not decimal upperBand)
			return;

		if (keltnerTyped.Lower is not decimal lowerBand)
			return;

		if (keltnerTyped.Middle is not decimal middleBand)
			return;

		// Recover the ATR from the band width: (upper - middle) / multiplier.
		// AtrMultiplier is constrained to be greater than zero, so the division is safe.
		var currentAtr = (upperBand - middleBand) / AtrMultiplier;

		var price = candle.ClosePrice;

		// Remember the close for the trade feedback in OnOwnTradeReceived
		_lastPrice = price;

		// Generate RL signal based on current state
		UpdateRLSignal(candle, middleBand, currentAtr);

		if (_cooldownRemaining > 0)
			_cooldownRemaining--;

		var priceAboveUpperBand = price > upperBand;
		var priceBelowLowerBand = price < lowerBand;

		// A breakout is a transition: outside the band now, not outside on the previous candle
		var bullishBreakout = !_previousAboveUpperBand && priceAboveUpperBand;
		var bearishBreakout = !_previousBelowLowerBand && priceBelowLowerBand;
		var cooldownElapsed = _cooldownRemaining == 0;

		// Single decision chain: Position may not yet reflect an order sent a few lines
		// earlier, so evaluating entries and exits independently could send a second
		// order for the same stale position (e.g. reversal + "exit short" on one candle).
		if (cooldownElapsed && bullishBreakout && _currentSignal == RLSignals.Buy && Position <= 0)
		{
			// Long entry: breakout above upper band and RL signal is Buy.
			// The volume also covers an existing short so the position is reversed.
			LogInfo($"Long signal: Price {price} > Upper Band {upperBand}, RL Signal = Buy");
			BuyMarket(Volume + (Position < 0 ? Math.Abs(Position) : 0m));
			_previousSignalPrice = price;
			_cooldownRemaining = CooldownBars;
		}
		else if (cooldownElapsed && bearishBreakout && _currentSignal == RLSignals.Sell && Position >= 0)
		{
			// Short entry: breakout below lower band and RL signal is Sell
			LogInfo($"Short signal: Price {price} < Lower Band {lowerBand}, RL Signal = Sell");
			SellMarket(Volume + (Position > 0 ? Math.Abs(Position) : 0m));
			_previousSignalPrice = price;
			_cooldownRemaining = CooldownBars;
		}
		else if (Position > 0 && price < middleBand)
		{
			// Exit long: price drops below EMA (middle band)
			LogInfo($"Exit long: Price {price} < EMA {middleBand}");
			SellMarket(Math.Abs(Position));
			_cooldownRemaining = CooldownBars;
		}
		else if (Position < 0 && price > middleBand)
		{
			// Exit short: price rises above EMA (middle band)
			LogInfo($"Exit short: Price {price} > EMA {middleBand}");
			BuyMarket(Math.Abs(Position));
			_cooldownRemaining = CooldownBars;
		}
		else
		{
			// No entry or band exit on this candle: check the ATR stop
			ApplyAtrStopLoss(price, currentAtr);
		}

		// Update previous values for next iteration
		_previousEma = middleBand;
		_previousAtr = currentAtr;
		_previousPrice = price;
		_previousAboveUpperBand = priceAboveUpperBand;
		_previousBelowLowerBand = priceBelowLowerBand;
	}

	/// <summary>
	/// Update the RL signal from the current candle.
	/// This is a simplified, rule-based stand-in for an RL model, for demonstration.
	/// The signal becomes Buy or Sell when the candle direction, the position of the close
	/// relative to the EMA and momentum (or the win/loss streak) agree. When they disagree,
	/// the signal is reset to None only if the ATR rose; otherwise the previous signal is kept.
	/// </summary>
	/// <param name="candle">Finished candle being processed.</param>
	/// <param name="ema">Current middle band value.</param>
	/// <param name="atr">Current ATR estimate.</param>
	private void UpdateRLSignal(ICandleMessage candle, decimal ema, decimal atr)
	{
		// Features for RL decision:
		// 1. Price position relative to EMA
		var priceAboveEma = candle.ClosePrice > ema;

		// 2. Recent momentum (on the first processed candle _previousPrice is still 0)
		var priceIncreasing = candle.ClosePrice > _previousPrice;

		// 3. Volatility
		var volatilityIncreasing = atr > _previousAtr;

		// 4. Candle pattern (bullish/bearish)
		var bullishCandle = candle.ClosePrice > candle.OpenPrice;

		// 5. Previous trade outcome: aggressive after wins, conservative after losses
		var aggressiveMode = _consecutiveWins > _consecutiveLosses;

		if (bullishCandle && priceAboveEma && (priceIncreasing || aggressiveMode))
		{
			_currentSignal = RLSignals.Buy;
			LogInfo("RL Signal: Buy");
		}
		else if (!bullishCandle && !priceAboveEma && (!priceIncreasing || aggressiveMode))
		{
			_currentSignal = RLSignals.Sell;
			LogInfo("RL Signal: Sell");
		}
		else if (volatilityIncreasing)
		{
			// Mixed conditions with rising volatility: go neutral
			_currentSignal = RLSignals.None;
			LogInfo("RL Signal: None (high volatility)");
		}
		// Otherwise keep the current signal
	}

	/// <summary>
	/// Process own trades for reinforcement learning feedback.
	/// Updates the consecutive win/loss counters that drive the aggressive mode of the RL signal.
	/// </summary>
	/// <param name="trade">Own trade reported for this strategy.</param>
	protected override void OnOwnTradeReceived(MyTrade trade)
	{
		base.OnOwnTradeReceived(trade);

		// Skip if we don't have a previous signal price (no entry signal yet)
		if (_previousSignalPrice == 0)
			return;

		// NOTE(review): this compares the fill price with the last candle close for every
		// trade (entries and exits alike); it is a heuristic, not realized PnL.
		var profitable = trade.Order.Side == Sides.Buy
			? _lastPrice > trade.Trade.Price   // buy: last close above the fill price
			: _lastPrice < trade.Trade.Price;  // sell: last close below the fill price

		// Update consecutive win/loss counters for RL state
		if (profitable)
		{
			_consecutiveWins++;
			_consecutiveLosses = 0;
			LogInfo($"Profitable trade: Win streak = {_consecutiveWins}");
		}
		else
		{
			_consecutiveLosses++;
			_consecutiveWins = 0;
			LogInfo($"Unprofitable trade: Loss streak = {_consecutiveLosses}");
		}
	}

	/// <summary>
	/// Apply ATR-based stop loss.
	/// The stop level is anchored to the entry signal price: a long is closed when the close
	/// falls below entry - StopLossAtr * ATR, a short when the close rises above
	/// entry + StopLossAtr * ATR. Anchoring to the current close (as a stop level derived
	/// from the same price being tested) could never trigger.
	/// </summary>
	/// <param name="price">Close of the candle being processed.</param>
	/// <param name="atr">Current ATR estimate.</param>
	private void ApplyAtrStopLoss(decimal price, decimal atr)
	{
		// Without an entry reference or a positive ATR there is no meaningful stop level
		if (_previousSignalPrice == 0 || atr <= 0)
			return;

		var stopDistance = StopLossAtr * atr;

		if (Position > 0) // Long position
		{
			var stopLevel = _previousSignalPrice - stopDistance;

			if (price < stopLevel)
			{
				LogInfo($"ATR Stop Loss triggered for long position: Current {price} < Stop {stopLevel}");
				SellMarket(Math.Abs(Position));
				// A stop-out is a position change, so it starts the cooldown like other exits
				_cooldownRemaining = CooldownBars;
			}
		}
		else if (Position < 0) // Short position
		{
			var stopLevel = _previousSignalPrice + stopDistance;

			if (price > stopLevel)
			{
				LogInfo($"ATR Stop Loss triggered for short position: Current {price} > Stop {stopLevel}");
				BuyMarket(Math.Abs(Position));
				_cooldownRemaining = CooldownBars;
			}
		}
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math, Decimal
from StockSharp.Messages import DataType, CandleStates, Sides
from StockSharp.Algo.Indicators import KeltnerChannels
from StockSharp.Algo.Strategies import Strategy
class keltner_with_rl_signal_strategy(Strategy):
    """
    Keltner with Reinforcement Learning Signal strategy.

    Trades Keltner channel breakouts on finished candles, filtered by a
    simplified rule-based "RL" signal:

    * Long entry: close crosses above the upper band and the signal is RL_BUY.
    * Short entry: close crosses below the lower band and the signal is RL_SELL.
    * Exit long / short: close falls below / rises above the middle band (EMA),
      or moves StopLossAtr * ATR against the entry signal price.

    At most one market order is sent per candle.
    """

    # RL signal constants
    RL_NONE = 0
    RL_BUY = 1
    RL_SELL = 2

    def __init__(self):
        super(keltner_with_rl_signal_strategy, self).__init__()

        self._ema_period = self.Param("EmaPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("EMA Period", "Period for the exponential moving average", "Keltner Settings")

        self._atr_period = self.Param("AtrPeriod", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Period", "Period for the average true range", "Keltner Settings")

        self._atr_multiplier = self.Param("AtrMultiplier", 1.25) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Multiplier", "Multiplier for ATR in Keltner Channels", "Keltner Settings")

        self._cooldown_bars = self.Param("CooldownBars", 48) \
            .SetNotNegative() \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "General")

        self._stop_loss_atr = self.Param("StopLossAtr", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stop Loss (ATR)", "Stop Loss in multiples of ATR", "Risk Management")

        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        self._reset_state()

    def _reset_state(self):
        """Set every piece of per-run state back to its initial value."""
        self._current_signal = self.RL_NONE
        self._last_price = 0.0             # close of the most recently processed candle
        self._previous_ema = 0.0           # middle band from the previous processed candle
        self._previous_atr = 0.0           # ATR estimate from the previous processed candle
        self._previous_price = 0.0         # close of the previous processed candle
        self._previous_signal_price = 0.0  # close at the last entry signal (0 = no entry yet)
        self._consecutive_wins = 0
        self._consecutive_losses = 0
        self._cooldown_remaining = 0       # candles left before another entry is allowed
        self._previous_above_upper = False
        self._previous_below_lower = False

    @property
    def candle_type(self):
        """Candle data type the strategy subscribes to."""
        return self._candle_type.Value

    def GetWorkingSecurities(self):
        return [(self.Security, self.candle_type)]

    def OnReseted(self):
        super(keltner_with_rl_signal_strategy, self).OnReseted()
        self._reset_state()

    def OnStarted2(self, time):
        super(keltner_with_rl_signal_strategy, self).OnStarted2(time)

        # NOTE(review): AtrPeriod is declared as a parameter but is not applied here;
        # only Length and Multiplier are set on the indicator.
        keltner = KeltnerChannels()
        keltner.Length = int(self._ema_period.Value)
        keltner.Multiplier = Decimal(float(self._atr_multiplier.Value))

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(keltner, self.ProcessCandle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, keltner)
            self.DrawOwnTrades(area)

    def ProcessCandle(self, candle, keltner_value):
        """
        Handle one candle together with its Keltner channel value.

        Updates the RL signal, then takes at most one action for the candle:
        long entry, short entry, long exit, short exit or ATR stop, in that
        priority.
        """
        if candle.State != CandleStates.Finished:
            return

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        upper_val = keltner_value.Upper
        lower_val = keltner_value.Lower
        middle_val = keltner_value.Middle
        if upper_val is None or lower_val is None or middle_val is None:
            return

        upper_band = float(upper_val)
        lower_band = float(lower_val)
        middle_band = float(middle_val)

        # Recover the ATR from the band width: (upper - middle) / multiplier.
        # The multiplier parameter is constrained to be greater than zero.
        atr_mult = float(self._atr_multiplier.Value)
        current_atr = (upper_band - middle_band) / atr_mult

        price = float(candle.ClosePrice)
        # Remember the close for the trade feedback in OnOwnTradeReceived
        self._last_price = price

        self.UpdateRLSignal(candle, middle_band, current_atr)

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        price_above_upper = price > upper_band
        price_below_lower = price < lower_band
        # A breakout is a transition: outside the band now, not outside on the previous candle
        bullish_breakout = (not self._previous_above_upper) and price_above_upper
        bearish_breakout = (not self._previous_below_lower) and price_below_lower
        cooldown = int(self._cooldown_bars.Value)
        cooldown_elapsed = self._cooldown_remaining == 0

        # Single decision chain: Position may not yet reflect an order sent a few
        # lines earlier, so evaluating entries and exits independently could send
        # a second order for the same stale position.
        if cooldown_elapsed and bullish_breakout and self._current_signal == self.RL_BUY and self.Position <= 0:
            vol = self.Volume
            if self.Position < 0:
                # Also cover the existing short so the position is reversed
                vol = self.Volume + Math.Abs(self.Position)
            self.BuyMarket(vol)
            self._previous_signal_price = price
            self._cooldown_remaining = cooldown
        elif cooldown_elapsed and bearish_breakout and self._current_signal == self.RL_SELL and self.Position >= 0:
            vol = self.Volume
            if self.Position > 0:
                vol = self.Volume + Math.Abs(self.Position)
            self.SellMarket(vol)
            self._previous_signal_price = price
            self._cooldown_remaining = cooldown
        elif self.Position > 0 and price < middle_band:
            # Exit long: close dropped below the middle band
            self.SellMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and price > middle_band:
            # Exit short: close rose above the middle band
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown
        else:
            # No entry or band exit on this candle: check the ATR stop
            self.ApplyAtrStopLoss(price, current_atr)

        self._previous_ema = middle_band
        self._previous_atr = current_atr
        self._previous_price = price
        self._previous_above_upper = price_above_upper
        self._previous_below_lower = price_below_lower

    def UpdateRLSignal(self, candle, ema, atr):
        """
        Update the RL signal from the current candle.

        The signal becomes RL_BUY or RL_SELL when candle direction, the position
        of the close relative to the EMA and momentum (or the win/loss streak)
        agree. When they disagree, it is reset to RL_NONE only if the ATR rose;
        otherwise the previous signal is kept.
        """
        close = float(candle.ClosePrice)
        price_above_ema = close > ema
        # On the first processed candle _previous_price is still 0
        price_increasing = close > self._previous_price
        volatility_increasing = atr > self._previous_atr
        bullish_candle = candle.ClosePrice > candle.OpenPrice
        # Aggressive after wins, conservative after losses
        aggressive_mode = self._consecutive_wins > self._consecutive_losses

        if bullish_candle and price_above_ema and (price_increasing or aggressive_mode):
            self._current_signal = self.RL_BUY
        elif not bullish_candle and not price_above_ema and (not price_increasing or aggressive_mode):
            self._current_signal = self.RL_SELL
        elif volatility_increasing:
            self._current_signal = self.RL_NONE

    def OnOwnTradeReceived(self, trade):
        """Update the win/loss streak counters from an own trade."""
        super(keltner_with_rl_signal_strategy, self).OnOwnTradeReceived(trade)

        # No entry signal yet: nothing to evaluate
        if self._previous_signal_price == 0:
            return

        # NOTE(review): this compares the fill price with the last candle close for
        # every trade (entries and exits alike); it is a heuristic, not realized PnL.
        trade_price = float(trade.Trade.Price)
        if trade.Order.Side == Sides.Buy:
            profitable = self._last_price > trade_price
        else:
            profitable = self._last_price < trade_price

        if profitable:
            self._consecutive_wins += 1
            self._consecutive_losses = 0
        else:
            self._consecutive_losses += 1
            self._consecutive_wins = 0

    def ApplyAtrStopLoss(self, price, atr):
        """
        Close the position when the close has moved StopLossAtr * ATR against
        the entry signal price.

        The stop is anchored to the entry signal price; a level derived from
        the same close that is being tested could never trigger.
        """
        entry_price = self._previous_signal_price
        # Without an entry reference or a positive ATR there is no meaningful stop level
        if entry_price == 0 or atr <= 0:
            return

        stop_distance = float(self._stop_loss_atr.Value) * atr

        if self.Position > 0:
            if price < entry_price - stop_distance:
                self.SellMarket(Math.Abs(self.Position))
                # A stop-out is a position change, so it starts the cooldown like other exits
                self._cooldown_remaining = int(self._cooldown_bars.Value)
        elif self.Position < 0:
            if price > entry_price + stop_distance:
                self.BuyMarket(Math.Abs(self.Position))
                self._cooldown_remaining = int(self._cooldown_bars.Value)

    def CreateClone(self):
        return keltner_with_rl_signal_strategy()