Kloss Strategy
The Kloss strategy combines a weighted moving average (WMA), the Commodity Channel Index (CCI), and the Stochastic oscillator. All indicators are evaluated on shifted historical values, allowing signals to be based on past market context. A long position is opened when CCI drops below a negative threshold, the Stochastic main line falls under a deviation from the neutral 50 level, and the shifted price is above the shifted WMA. A short position is opened on the opposite conditions. Optional reverse closing exits an existing position when the opposite signal appears. Stop loss and take profit are set in points from the entry price.
Details
- Entry Criteria:
  - Long: Shifted CCI below `-CciDiffer`, shifted Stochastic below `50 - StochDiffer`, and shifted price above shifted WMA.
  - Short: Shifted CCI above `CciDiffer`, shifted Stochastic above `50 + StochDiffer`, and shifted price below shifted WMA.
- Long/Short: Both.
- Exit Criteria:
  - Reverse signal if `RevClose` is enabled, or stop loss / take profit levels.
- Stops: Absolute stop loss and take profit in points.
- Filters:
  - Indicator and price shifts via `CommonShift` allow signal generation from past bars.
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that combines Weighted Moving Average, CCI and Stochastic oscillator.
/// </summary>
/// <remarks>
/// A long entry requires CCI below <c>-CciLevel</c>, both Stochastic lines below
/// <c>50 - StochLevel</c> and the candle close above the weighted moving average.
/// A short entry requires the mirrored conditions. Orders are only sent while the
/// strategy is flat and the cooldown counter has reached zero.
/// </remarks>
public class KlossStrategy : Strategy
{
	private readonly StrategyParam<int> _maPeriod;
	private readonly StrategyParam<int> _cciPeriod;
	private readonly StrategyParam<decimal> _cciLevel;
	private readonly StrategyParam<decimal> _stochLevel;
	private readonly StrategyParam<decimal> _stopLoss;
	private readonly StrategyParam<decimal> _takeProfit;
	private readonly StrategyParam<DataType> _candleType;
	private readonly StrategyParam<int> _cooldownBars;

	// Direction of the most recent non-neutral signal: 1 = buy, -1 = sell, 0 = none seen yet.
	private int _previousSignal;

	// Number of finished candles that still have to pass before a new entry is allowed.
	private int _cooldownRemaining;

	/// <summary>Moving Average period.</summary>
	public int MaPeriod { get => _maPeriod.Value; set => _maPeriod.Value = value; }

	/// <summary>CCI calculation period.</summary>
	public int CciPeriod { get => _cciPeriod.Value; set => _cciPeriod.Value = value; }

	/// <summary>CCI level for signals.</summary>
	public decimal CciLevel { get => _cciLevel.Value; set => _cciLevel.Value = value; }

	/// <summary>Stochastic level offset from 50.</summary>
	public decimal StochLevel { get => _stochLevel.Value; set => _stochLevel.Value = value; }

	/// <summary>Stop loss in price steps.</summary>
	public decimal StopLoss { get => _stopLoss.Value; set => _stopLoss.Value = value; }

	/// <summary>Take profit in price steps.</summary>
	public decimal TakeProfit { get => _takeProfit.Value; set => _takeProfit.Value = value; }

	/// <summary>Candle type used for calculations.</summary>
	public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

	/// <summary>Completed candles to wait after a position change.</summary>
	public int CooldownBars { get => _cooldownBars.Value; set => _cooldownBars.Value = value; }

	/// <summary>
	/// Initialize <see cref="KlossStrategy"/>.
	/// </summary>
	public KlossStrategy()
	{
		_maPeriod = Param(nameof(MaPeriod), 10)
			.SetGreaterThanZero()
			.SetDisplay("MA Period", "Length of weighted MA", "Indicators")
			.SetOptimize(5, 50, 5);

		_cciPeriod = Param(nameof(CciPeriod), 10)
			.SetGreaterThanZero()
			.SetDisplay("CCI Period", "Length of CCI", "Indicators")
			.SetOptimize(5, 30, 5);

		_cciLevel = Param(nameof(CciLevel), 50m)
			.SetGreaterThanZero()
			.SetDisplay("CCI Level", "Distance from zero to trigger signal", "Indicators")
			.SetOptimize(50m, 200m, 10m);

		_stochLevel = Param(nameof(StochLevel), 10m)
			.SetGreaterThanZero()
			.SetDisplay("Stochastic Level", "Distance from 50 to trigger", "Indicators")
			.SetOptimize(5m, 40m, 5m);

		_stopLoss = Param(nameof(StopLoss), 550m)
			.SetNotNegative()
			.SetDisplay("Stop Loss", "Stop loss in price steps", "Risk");

		_takeProfit = Param(nameof(TakeProfit), 550m)
			.SetNotNegative()
			.SetDisplay("Take Profit", "Take profit in price steps", "Risk");

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
			.SetDisplay("Candle Type", "Candles for calculations", "General");

		// A negative cooldown would leave _cooldownRemaining below zero after the first
		// trade, so the "== 0" entry gate in ProcessCandle could never pass again.
		_cooldownBars = Param(nameof(CooldownBars), 3)
			.SetNotNegative()
			.SetDisplay("Cooldown Bars", "Completed candles to wait after a position change", "Trading");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_previousSignal = 0;
		_cooldownRemaining = 0;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		var ma = new WeightedMovingAverage { Length = MaPeriod };
		var cci = new CommodityChannelIndex { Length = CciPeriod };
		var stoch = new StochasticOscillator();

		var subscription = SubscribeCandles(CandleType);
		subscription.BindEx(ma, cci, stoch, ProcessCandle).Start();

		// StopLoss / TakeProfit are expressed in price steps, so scale them by the
		// instrument's step to get absolute price distances. Fall back to 1 when the
		// step is unknown or non-positive.
		// NOTE(review): a zero distance is expected to leave that side of the
		// protection unset - confirm against StartProtection.
		var priceStep = Security?.PriceStep is decimal step && step > 0m ? step : 1m;

		StartProtection(
			takeProfit: new Unit(TakeProfit * priceStep, UnitTypes.Absolute),
			stopLoss: new Unit(StopLoss * priceStep, UnitTypes.Absolute));

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, ma);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Evaluates one candle together with its indicator values and opens a position when a fresh signal appears.
	/// </summary>
	/// <param name="candle">Candle delivered by the subscription; only finished candles are processed.</param>
	/// <param name="maValue">Weighted moving average value for the candle.</param>
	/// <param name="cciValue">CCI value for the candle.</param>
	/// <param name="stochValue">Stochastic oscillator value for the candle (read as %K and %D).</param>
	private void ProcessCandle(ICandleMessage candle, IIndicatorValue maValue, IIndicatorValue cciValue, IIndicatorValue stochValue)
	{
		if (candle.State != CandleStates.Finished || !maValue.IsFinal || !cciValue.IsFinal || !stochValue.IsFinal)
			return;

		// The cooldown is counted in finished candles, including ones that produce no usable stochastic value.
		if (_cooldownRemaining > 0)
			_cooldownRemaining--;

		var ma = maValue.ToDecimal();
		var cci = cciValue.ToDecimal();

		var stoch = (StochasticOscillatorValue)stochValue;
		if (stoch.K is not decimal stochK || stoch.D is not decimal stochD)
			return;

		var price = candle.ClosePrice;

		var buySignal = cci < -CciLevel && stochK < 50m - StochLevel && stochD < 50m - StochLevel && price > ma;
		var sellSignal = cci > CciLevel && stochK > 50m + StochLevel && stochD > 50m + StochLevel && price < ma;
		var currentSignal = buySignal ? 1 : sellSignal ? -1 : 0;

		if (_cooldownRemaining == 0 && Position == 0)
		{
			// Enter only when the direction differs from the last remembered signal.
			if (currentSignal > 0 && _previousSignal <= 0)
			{
				BuyMarket();
				_cooldownRemaining = CooldownBars;
			}
			else if (currentSignal < 0 && _previousSignal >= 0)
			{
				SellMarket();
				_cooldownRemaining = CooldownBars;
			}
		}

		// Neutral candles do not overwrite the remembered direction.
		if (currentSignal != 0)
			_previousSignal = currentSignal;
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import WeightedMovingAverage, CommodityChannelIndex, StochasticOscillator
from StockSharp.Algo.Strategies import Strategy
class kloss_strategy(Strategy):
    """Strategy that combines a weighted moving average, CCI and the Stochastic oscillator.

    A long entry requires CCI below ``-CciLevel``, both Stochastic lines below
    ``50 - StochLevel`` and the candle close above the weighted moving average.
    A short entry requires the mirrored conditions. Orders are only sent while
    the strategy is flat and the cooldown counter has reached zero.
    """

    def __init__(self):
        super(kloss_strategy, self).__init__()

        self._ma_period = self.Param("MaPeriod", 10) \
            .SetGreaterThanZero() \
            .SetDisplay("MA Period", "Length of weighted MA", "Indicators") \
            .SetOptimize(5, 50, 5)
        self._cci_period = self.Param("CciPeriod", 10) \
            .SetGreaterThanZero() \
            .SetDisplay("CCI Period", "Length of CCI", "Indicators") \
            .SetOptimize(5, 30, 5)
        self._cci_level = self.Param("CciLevel", 50.0) \
            .SetGreaterThanZero() \
            .SetDisplay("CCI Level", "Distance from zero to trigger signal", "Indicators") \
            .SetOptimize(50.0, 200.0, 10.0)
        self._stoch_level = self.Param("StochLevel", 10.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stochastic Level", "Distance from 50 to trigger", "Indicators") \
            .SetOptimize(5.0, 40.0, 5.0)
        self._stop_loss = self.Param("StopLoss", 550.0) \
            .SetNotNegative() \
            .SetDisplay("Stop Loss", "Stop loss in price steps", "Risk")
        self._take_profit_param = self.Param("TakeProfit", 550.0) \
            .SetNotNegative() \
            .SetDisplay("Take Profit", "Take profit in price steps", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Candles for calculations", "General")
        # A negative cooldown would leave _cooldown_remaining below zero after the
        # first trade, so the "== 0" entry gate in process_candle could never pass again.
        self._cooldown_bars = self.Param("CooldownBars", 3) \
            .SetNotNegative() \
            .SetDisplay("Cooldown Bars", "Completed candles to wait after a position change", "Trading")

        # Direction of the most recent non-neutral signal: 1 = buy, -1 = sell, 0 = none seen yet.
        self._previous_signal = 0
        # Number of finished candles that still have to pass before a new entry is allowed.
        self._cooldown_remaining = 0

    @property
    def ma_period(self):
        """Moving average period."""
        return self._ma_period.Value

    @property
    def cci_period(self):
        """CCI calculation period."""
        return self._cci_period.Value

    @property
    def cci_level(self):
        """CCI distance from zero that triggers a signal."""
        return self._cci_level.Value

    @property
    def stoch_level(self):
        """Stochastic distance from 50 that triggers a signal."""
        return self._stoch_level.Value

    @property
    def stop_loss(self):
        """Stop loss in price steps."""
        return self._stop_loss.Value

    @property
    def take_profit(self):
        """Take profit in price steps."""
        return self._take_profit_param.Value

    @property
    def candle_type(self):
        """Candle type used for calculations."""
        return self._candle_type.Value

    @property
    def cooldown_bars(self):
        """Completed candles to wait after a position change."""
        return self._cooldown_bars.Value

    def OnReseted(self):
        """Clear the remembered signal direction and the cooldown counter."""
        super(kloss_strategy, self).OnReseted()
        self._previous_signal = 0
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, subscribe to candles, configure protection and set up the chart."""
        super(kloss_strategy, self).OnStarted2(time)

        ma = WeightedMovingAverage()
        ma.Length = self.ma_period
        cci = CommodityChannelIndex()
        cci.Length = self.cci_period
        stoch = StochasticOscillator()

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(ma, cci, stoch, self.process_candle).Start()

        # StopLoss / TakeProfit are expressed in price steps, so scale them by the
        # instrument's step to get absolute price distances. Fall back to 1 when the
        # step is unknown or non-positive.
        # NOTE(review): a zero distance is expected to leave that side of the
        # protection unset - confirm against StartProtection.
        security = self.Security
        raw_step = security.PriceStep if security is not None else None
        price_step = float(raw_step) if raw_step is not None else 0.0
        if price_step <= 0.0:
            price_step = 1.0

        self.StartProtection(
            Unit(float(self.take_profit) * price_step, UnitTypes.Absolute),
            Unit(float(self.stop_loss) * price_step, UnitTypes.Absolute))

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, ma)
            self.DrawOwnTrades(area)

    def process_candle(self, candle, ma_value, cci_value, stoch_value):
        """Evaluate one candle with its indicator values and open a position on a fresh signal.

        Only finished candles with final indicator values are processed.
        """
        if candle.State != CandleStates.Finished:
            return
        if not ma_value.IsFinal or not cci_value.IsFinal or not stoch_value.IsFinal:
            return

        # The cooldown is counted in finished candles, including ones that
        # produce no usable stochastic value.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        ma = float(ma_value)
        cci = float(cci_value)

        stoch_k = stoch_value.K
        stoch_d = stoch_value.D
        if stoch_k is None or stoch_d is None:
            return

        k = float(stoch_k)
        d = float(stoch_d)
        price = float(candle.ClosePrice)
        cci_lvl = float(self.cci_level)
        stoch_lvl = float(self.stoch_level)

        buy_signal = cci < -cci_lvl and k < 50.0 - stoch_lvl and d < 50.0 - stoch_lvl and price > ma
        sell_signal = cci > cci_lvl and k > 50.0 + stoch_lvl and d > 50.0 + stoch_lvl and price < ma

        if buy_signal:
            current_signal = 1
        elif sell_signal:
            current_signal = -1
        else:
            current_signal = 0

        if self._cooldown_remaining == 0 and self.Position == 0:
            # Enter only when the direction differs from the last remembered signal.
            if current_signal > 0 and self._previous_signal <= 0:
                self.BuyMarket()
                self._cooldown_remaining = self.cooldown_bars
            elif current_signal < 0 and self._previous_signal >= 0:
                self.SellMarket()
                self._cooldown_remaining = self.cooldown_bars

        # Neutral candles do not overwrite the remembered direction.
        if current_signal != 0:
            self._previous_signal = current_signal

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return kloss_strategy()