using System;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy combining the MACD histogram (OsMA) with Bollinger Bands to trade reversals.
/// Similar to BandOsMa but with customizable moving average filter for exit signals.
/// </summary>
public class BandOsMaCustomStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<int> _macdFastPeriod;
private readonly StrategyParam<int> _macdSlowPeriod;
private readonly StrategyParam<int> _macdSignalPeriod;
private readonly StrategyParam<int> _bollingerPeriod;
private readonly StrategyParam<decimal> _bollingerDeviation;
private readonly StrategyParam<int> _maPeriod;
private BollingerBands _bollinger;
private SMA _osmaMA;
private decimal _prevOsma;
private decimal _prevUpper;
private decimal _prevLower;
private decimal _prevMa;
private bool _hasPrev;
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public int MacdFastPeriod
{
get => _macdFastPeriod.Value;
set => _macdFastPeriod.Value = value;
}
public int MacdSlowPeriod
{
get => _macdSlowPeriod.Value;
set => _macdSlowPeriod.Value = value;
}
public int MacdSignalPeriod
{
get => _macdSignalPeriod.Value;
set => _macdSignalPeriod.Value = value;
}
public int BollingerPeriod
{
get => _bollingerPeriod.Value;
set => _bollingerPeriod.Value = value;
}
public decimal BollingerDeviation
{
get => _bollingerDeviation.Value;
set => _bollingerDeviation.Value = value;
}
public int MaPeriod
{
get => _maPeriod.Value;
set => _maPeriod.Value = value;
}
public BandOsMaCustomStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(60).TimeFrame())
.SetDisplay("Candle Type", "Primary timeframe", "General");
_macdFastPeriod = Param(nameof(MacdFastPeriod), 20)
.SetDisplay("MACD Fast", "Fast EMA length", "Indicators");
_macdSlowPeriod = Param(nameof(MacdSlowPeriod), 50)
.SetDisplay("MACD Slow", "Slow EMA length", "Indicators");
_macdSignalPeriod = Param(nameof(MacdSignalPeriod), 12)
.SetDisplay("MACD Signal", "Signal EMA length", "Indicators");
_bollingerPeriod = Param(nameof(BollingerPeriod), 14)
.SetDisplay("Bollinger Period", "OsMA Bollinger Bands period", "Indicators");
_bollingerDeviation = Param(nameof(BollingerDeviation), 2m)
.SetDisplay("Bollinger Deviation", "Bollinger Bands deviation", "Indicators");
_maPeriod = Param(nameof(MaPeriod), 10)
.SetDisplay("MA Period", "OsMA moving average period for exit filter", "Indicators");
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var macd = new MovingAverageConvergenceDivergenceSignal
{
Macd =
{
ShortMa = { Length = MacdFastPeriod },
LongMa = { Length = MacdSlowPeriod }
},
SignalMa = { Length = MacdSignalPeriod }
};
_bollinger = new BollingerBands
{
Length = BollingerPeriod,
Width = BollingerDeviation
};
_osmaMA = new SMA { Length = MaPeriod };
var subscription = SubscribeCandles(CandleType);
subscription
.BindEx(macd, ProcessCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, macd);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, IIndicatorValue macdValue)
{
if (candle.State != CandleStates.Finished)
return;
var val = (IMovingAverageConvergenceDivergenceSignalValue)macdValue;
if (val.Macd is not decimal macdLine || val.Signal is not decimal signalLine)
return;
var osma = macdLine - signalLine;
var bbResult = (BollingerBandsValue)_bollinger.Process(new DecimalIndicatorValue(_bollinger, osma, candle.CloseTime));
if (bbResult.UpBand is not decimal upper || bbResult.LowBand is not decimal lower)
return;
var maResult = _osmaMA.Process(new DecimalIndicatorValue(_osmaMA, osma, candle.CloseTime));
if (maResult.IsEmpty)
return;
var ma = maResult.GetValue<decimal>();
if (_hasPrev)
{
var buySignal = _prevOsma > _prevLower && osma <= lower && osma < ma;
var sellSignal = _prevOsma < _prevUpper && osma >= upper && osma > ma;
if (buySignal && Position <= 0)
BuyMarket(Position < 0 ? Math.Abs(Position) + 1 : 1);
else if (sellSignal && Position >= 0)
SellMarket(Position > 0 ? Math.Abs(Position) + 1 : 1);
}
_prevOsma = osma;
_prevUpper = upper;
_prevLower = lower;
_prevMa = ma;
_hasPrev = true;
}
/// <inheritdoc />
protected override void OnReseted()
{
_bollinger = null;
_osmaMA = null;
_prevOsma = 0;
_prevUpper = 0;
_prevLower = 0;
_prevMa = 0;
_hasPrev = false;
base.OnReseted();
}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import MovingAverageConvergenceDivergence
from StockSharp.Algo.Strategies import Strategy
class band_os_ma_custom_strategy(Strategy):
def __init__(self):
super(band_os_ma_custom_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(60)))
self._macd_fast_period = self.Param("MacdFastPeriod", 20)
self._macd_slow_period = self.Param("MacdSlowPeriod", 50)
self._macd_signal_period = self.Param("MacdSignalPeriod", 12)
self._bollinger_period = self.Param("BollingerPeriod", 14)
self._bollinger_deviation = self.Param("BollingerDeviation", 2.0)
self._ma_period = self.Param("MaPeriod", 10)
self._macd_history = []
self._osma_history = []
self._osma_ma_history = []
self._prev_osma = None
self._prev_upper = None
self._prev_lower = None
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
@property
def MacdFastPeriod(self):
return self._macd_fast_period.Value
@MacdFastPeriod.setter
def MacdFastPeriod(self, value):
self._macd_fast_period.Value = value
@property
def MacdSlowPeriod(self):
return self._macd_slow_period.Value
@MacdSlowPeriod.setter
def MacdSlowPeriod(self, value):
self._macd_slow_period.Value = value
@property
def MacdSignalPeriod(self):
return self._macd_signal_period.Value
@MacdSignalPeriod.setter
def MacdSignalPeriod(self, value):
self._macd_signal_period.Value = value
@property
def BollingerPeriod(self):
return self._bollinger_period.Value
@BollingerPeriod.setter
def BollingerPeriod(self, value):
self._bollinger_period.Value = value
@property
def BollingerDeviation(self):
return self._bollinger_deviation.Value
@BollingerDeviation.setter
def BollingerDeviation(self, value):
self._bollinger_deviation.Value = value
@property
def MaPeriod(self):
return self._ma_period.Value
@MaPeriod.setter
def MaPeriod(self, value):
self._ma_period.Value = value
def OnReseted(self):
super(band_os_ma_custom_strategy, self).OnReseted()
self._macd_history = []
self._osma_history = []
self._osma_ma_history = []
self._prev_osma = None
self._prev_upper = None
self._prev_lower = None
def OnStarted2(self, time):
super(band_os_ma_custom_strategy, self).OnStarted2(time)
self._macd_history = []
self._osma_history = []
self._osma_ma_history = []
self._prev_osma = None
self._prev_upper = None
self._prev_lower = None
macd = MovingAverageConvergenceDivergence()
macd.ShortMa.Length = self.MacdFastPeriod
macd.LongMa.Length = self.MacdSlowPeriod
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(macd, self._process_candle).Start()
def _process_candle(self, candle, macd_value):
if candle.State != CandleStates.Finished:
return
macd_val = float(macd_value)
signal_period = self.MacdSignalPeriod
bb_period = self.BollingerPeriod
bb_dev = float(self.BollingerDeviation)
ma_period = self.MaPeriod
# Signal line
self._macd_history.append(macd_val)
while len(self._macd_history) > signal_period:
self._macd_history.pop(0)
if len(self._macd_history) < signal_period:
return
signal = sum(self._macd_history) / signal_period
osma = macd_val - signal
# BB on OsMA
self._osma_history.append(osma)
while len(self._osma_history) > bb_period:
self._osma_history.pop(0)
if len(self._osma_history) < bb_period:
return
mean = sum(self._osma_history) / len(self._osma_history)
variance = sum((x - mean) ** 2 for x in self._osma_history) / len(self._osma_history)
std_dev = math.sqrt(variance)
upper = mean + bb_dev * std_dev
lower = mean - bb_dev * std_dev
# MA of OsMA
self._osma_ma_history.append(osma)
while len(self._osma_ma_history) > ma_period:
self._osma_ma_history.pop(0)
if len(self._osma_ma_history) < ma_period:
return
ma = sum(self._osma_ma_history) / ma_period
if self._prev_osma is not None and self._prev_upper is not None and self._prev_lower is not None:
buy_signal = self._prev_osma > self._prev_lower and osma <= lower and osma < ma
sell_signal = self._prev_osma < self._prev_upper and osma >= upper and osma > ma
if buy_signal and self.Position <= 0:
self.BuyMarket()
elif sell_signal and self.Position >= 0:
self.SellMarket()
self._prev_osma = osma
self._prev_upper = upper
self._prev_lower = lower
def CreateClone(self):
return band_os_ma_custom_strategy()