Mawreez' RSI Divergence Detector Strategy
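This strategy scans a range of lookback lengths for divergence between the candle close price and RSI. It buys on a bullish divergence (the close is lower than it was N bars ago while RSI is higher) and sells on the opposite, bearish divergence (higher close, lower RSI). A divergence counts only when both the price move (in percent) and the RSI move exceed their minimum thresholds, and a cooldown of several bars must pass after an entry before the next one.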
Parameters
- Candle Type
- RSI Length
- Minimum Divergence Length
- Maximum Divergence Length
- Minimum Price Move %
- Minimum RSI Move
- Signal Cooldown Bars
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that trades divergences between the candle close price and RSI.
/// For every lookback length between <see cref="MinDivLength"/> and <see cref="MaxDivLength"/>
/// the current close/RSI pair is compared with the pair recorded that many bars ago.
/// A lower close with a higher RSI produces a buy, a higher close with a lower RSI produces a sell.
/// </summary>
public class MawreezRsiDivergenceDetectorStrategy : Strategy
{
    private readonly StrategyParam<DataType> _candleType;
    private readonly StrategyParam<int> _rsiLength;
    private readonly StrategyParam<int> _minDivLength;
    private readonly StrategyParam<int> _maxDivLength;
    private readonly StrategyParam<decimal> _minPriceMovePercent;
    private readonly StrategyParam<decimal> _minRsiMove;
    private readonly StrategyParam<int> _signalCooldownBars;

    private RelativeStrengthIndex _rsi;

    // Ring buffers holding the last (MaxDivLength + 1) close prices and RSI values.
    private decimal[] _priceHistory;
    private decimal[] _rsiHistory;

    // Number of bars written to the ring buffers so far.
    private int _index;

    // Bars counted since the last entry; compared against SignalCooldownBars.
    private int _barsFromSignal;

    /// <summary>
    /// Candle type (timeframe) the strategy subscribes to.
    /// </summary>
    public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

    /// <summary>
    /// RSI period.
    /// </summary>
    public int RsiLength { get => _rsiLength.Value; set => _rsiLength.Value = value; }

    /// <summary>
    /// Shortest lookback (in bars) checked for divergence.
    /// </summary>
    public int MinDivLength { get => _minDivLength.Value; set => _minDivLength.Value = value; }

    /// <summary>
    /// Longest lookback (in bars) checked for divergence. Also defines the history buffer size at start.
    /// </summary>
    public int MaxDivLength { get => _maxDivLength.Value; set => _maxDivLength.Value = value; }

    /// <summary>
    /// Minimum absolute close-price change, in percent of the current close, for a divergence to count.
    /// </summary>
    public decimal MinPriceMovePercent { get => _minPriceMovePercent.Value; set => _minPriceMovePercent.Value = value; }

    /// <summary>
    /// Minimum absolute RSI change for a divergence to count.
    /// </summary>
    public decimal MinRsiMove { get => _minRsiMove.Value; set => _minRsiMove.Value = value; }

    /// <summary>
    /// Minimum number of bars between entries.
    /// </summary>
    public int SignalCooldownBars { get => _signalCooldownBars.Value; set => _signalCooldownBars.Value = value; }

    /// <summary>
    /// Initializes the strategy parameters with their default values.
    /// </summary>
    public MawreezRsiDivergenceDetectorStrategy()
    {
        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(15).TimeFrame())
            .SetDisplay("Candle Type", "Candles timeframe", "General");

        _rsiLength = Param(nameof(RsiLength), 14)
            .SetGreaterThanZero()
            .SetDisplay("RSI Length", "RSI period", "General");

        _minDivLength = Param(nameof(MinDivLength), 5)
            .SetGreaterThanZero()
            .SetDisplay("Min Div Length", "Minimum divergence length", "General");

        _maxDivLength = Param(nameof(MaxDivLength), 30)
            .SetGreaterThanZero()
            .SetDisplay("Max Div Length", "Maximum divergence length", "General");

        _minPriceMovePercent = Param(nameof(MinPriceMovePercent), 0.35m)
            .SetGreaterThanZero()
            .SetDisplay("Min Price Move %", "Minimum price distance for divergence", "General");

        _minRsiMove = Param(nameof(MinRsiMove), 6m)
            .SetGreaterThanZero()
            .SetDisplay("Min RSI Move", "Minimum RSI distance for divergence", "General");

        _signalCooldownBars = Param(nameof(SignalCooldownBars), 12)
            .SetGreaterThanZero()
            .SetDisplay("Signal Cooldown Bars", "Minimum bars between entries", "General");
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _rsi = null;
        _priceHistory = null;
        _rsiHistory = null;
        _index = 0;
        _barsFromSignal = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        StartProtection(null, null);

        _rsi = new RelativeStrengthIndex { Length = RsiLength };

        // One extra slot so the current bar and the bar MaxDivLength bars back coexist.
        _priceHistory = new decimal[MaxDivLength + 1];
        _rsiHistory = new decimal[MaxDivLength + 1];
        _index = 0;

        // Start with the cooldown already elapsed so the first valid signal is not delayed.
        _barsFromSignal = SignalCooldownBars;

        var subscription = SubscribeCandles(CandleType);
        subscription
            .Bind(_rsi, ProcessCandle)
            .Start();
    }

    /// <summary>
    /// Records the finished candle in the history buffers and, when trading is allowed
    /// and the cooldown has elapsed, enters on the first divergence found.
    /// </summary>
    /// <param name="candle">Incoming candle; only finished candles are processed.</param>
    /// <param name="rsi">RSI value bound to the candle.</param>
    private void ProcessCandle(ICandleMessage candle, decimal rsi)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (!_rsi.IsFormed)
            return;

        var price = candle.ClosePrice;
        var capacity = _priceHistory.Length;
        var pos = _index % capacity;

        // History is recorded on every formed bar, even while trading is not allowed,
        // so that "l bars ago" always refers to consecutive bars.
        _priceHistory[pos] = price;
        _rsiHistory[pos] = rsi;
        _index++;

        // The buffers were sized at start; clamp so a later parameter change cannot
        // make the lookback wrap around and read unrelated slots.
        var maxLookback = Math.Min(MaxDivLength, capacity - 1);

        if (_index <= maxLookback)
            return;

        _barsFromSignal++;

        if (_barsFromSignal < SignalCooldownBars)
            return;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        var winner = FindDivergence(price, rsi, pos, maxLookback);

        if (winner > 0 && Position <= 0)
        {
            BuyMarket();
            _barsFromSignal = 0;
        }
        else if (winner < 0 && Position >= 0)
        {
            SellMarket();
            _barsFromSignal = 0;
        }
    }

    /// <summary>
    /// Scans lookbacks from <see cref="MinDivLength"/> up to <paramref name="maxLookback"/> and
    /// returns the direction of the first divergence that passes both move thresholds.
    /// </summary>
    /// <param name="price">Current close price.</param>
    /// <param name="rsi">Current RSI value.</param>
    /// <param name="pos">Ring-buffer slot holding the current bar.</param>
    /// <param name="maxLookback">Largest lookback that fits the buffer.</param>
    /// <returns>1 for bullish divergence, -1 for bearish divergence, 0 when none is found.</returns>
    private int FindDivergence(decimal price, decimal rsi, int pos, int maxLookback)
    {
        var capacity = _priceHistory.Length;

        for (var l = MinDivLength; l <= maxLookback; l++)
        {
            // Slot written l bars before the current one.
            var idx = (pos - l + capacity) % capacity;

            var dsrc = price - _priceHistory[idx];
            var dosc = rsi - _rsiHistory[idx];

            var priceMovePercent = price > 0m ? Math.Abs(dsrc) / price * 100m : 0m;

            if (priceMovePercent < MinPriceMovePercent || Math.Abs(dosc) < MinRsiMove)
                continue;

            // Price down, oscillator up: bullish divergence.
            if (dsrc < 0m && dosc > 0m)
                return 1;

            // Price up, oscillator down: bearish divergence.
            if (dsrc > 0m && dosc < 0m)
                return -1;
        }

        return 0;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import RelativeStrengthIndex
from StockSharp.Algo.Strategies import Strategy
class mawreez_rsi_divergence_detector_strategy(Strategy):
    """Trades divergences between the candle close price and RSI.

    For every lookback length between MinDivLength and MaxDivLength the current
    close/RSI pair is compared with the pair recorded that many bars ago.
    A lower close with a higher RSI produces a buy; a higher close with a lower
    RSI produces a sell.
    """

    def __init__(self):
        super(mawreez_rsi_divergence_detector_strategy, self).__init__()

        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(15))) \
            .SetDisplay("Candle Type", "Candles timeframe", "General")
        self._rsi_length = self.Param("RsiLength", 14) \
            .SetGreaterThanZero() \
            .SetDisplay("RSI Length", "RSI period", "General")
        self._min_div_length = self.Param("MinDivLength", 5) \
            .SetGreaterThanZero() \
            .SetDisplay("Min Div Length", "Minimum divergence length", "General")
        self._max_div_length = self.Param("MaxDivLength", 30) \
            .SetGreaterThanZero() \
            .SetDisplay("Max Div Length", "Maximum divergence length", "General")
        self._min_price_move_percent = self.Param("MinPriceMovePercent", 0.35) \
            .SetGreaterThanZero() \
            .SetDisplay("Min Price Move %", "Minimum price distance for divergence", "General")
        self._min_rsi_move = self.Param("MinRsiMove", 6.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Min RSI Move", "Minimum RSI distance for divergence", "General")
        self._signal_cooldown_bars = self.Param("SignalCooldownBars", 12) \
            .SetGreaterThanZero() \
            .SetDisplay("Signal Cooldown Bars", "Minimum bars between entries", "General")

        # Indicator instance; created in OnStarted2.
        self._rsi = None
        # Ring buffers with the last (MaxDivLength + 1) closes and RSI values.
        self._price_history = []
        self._rsi_history = []
        # Number of bars written to the ring buffers so far.
        self._index = 0
        # Bars counted since the last entry; compared against SignalCooldownBars.
        self._bars_from_signal = 0

    @property
    def candle_type(self):
        """Candle type (timeframe) the strategy subscribes to."""
        return self._candle_type.Value

    @candle_type.setter
    def candle_type(self, value):
        self._candle_type.Value = value

    def OnReseted(self):
        """Drop the indicator, history buffers and counters."""
        super(mawreez_rsi_divergence_detector_strategy, self).OnReseted()
        self._rsi = None
        self._price_history = []
        self._rsi_history = []
        self._index = 0
        self._bars_from_signal = 0

    def OnStarted2(self, time):
        """Allocate the history buffers, create the RSI and subscribe to candles."""
        super(mawreez_rsi_divergence_detector_strategy, self).OnStarted2(time)

        # One extra slot so the current bar and the bar MaxDivLength bars back coexist.
        buf_size = self._max_div_length.Value + 1
        self._price_history = [0.0] * buf_size
        self._rsi_history = [0.0] * buf_size
        self._index = 0
        # Start with the cooldown already elapsed so the first valid signal is not delayed.
        self._bars_from_signal = self._signal_cooldown_bars.Value

        self._rsi = RelativeStrengthIndex()
        self._rsi.Length = self._rsi_length.Value

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self._rsi, self.OnProcess).Start()

    def OnProcess(self, candle, rsi):
        """Record the finished candle and enter on the first divergence found.

        candle: incoming candle; only finished candles are processed.
        rsi: RSI value bound to the candle.
        """
        if candle.State != CandleStates.Finished:
            return
        if not self._rsi.IsFormed:
            return

        price = float(candle.ClosePrice)
        rv = float(rsi)

        buf_size = len(self._price_history)
        pos = self._index % buf_size
        self._price_history[pos] = price
        self._rsi_history[pos] = rv
        self._index += 1

        # The buffers were sized at start; clamp so a later parameter change cannot
        # make the lookback wrap around and read unrelated slots.
        max_div = min(self._max_div_length.Value, buf_size - 1)
        if self._index <= max_div:
            return

        self._bars_from_signal += 1
        if self._bars_from_signal < self._signal_cooldown_bars.Value:
            return

        # History above is always recorded; only order placement is gated,
        # matching the check the C# version of this strategy performs.
        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        winner = self._find_divergence(price, rv, pos, max_div)

        if winner > 0 and self.Position <= 0:
            self.BuyMarket()
            self._bars_from_signal = 0
        elif winner < 0 and self.Position >= 0:
            self.SellMarket()
            self._bars_from_signal = 0

    def _find_divergence(self, price, rv, pos, max_div):
        """Return 1 (bullish), -1 (bearish) or 0 for the first qualifying lookback.

        price, rv: current close and RSI as floats.
        pos: ring-buffer slot holding the current bar.
        max_div: largest lookback that fits the buffer.
        """
        buf_size = len(self._price_history)
        min_price_pct = float(self._min_price_move_percent.Value)
        min_rsi_mv = float(self._min_rsi_move.Value)

        for l in range(self._min_div_length.Value, max_div + 1):
            # Slot written l bars before the current one (Python % is non-negative here).
            idx = (pos - l) % buf_size
            dsrc = price - self._price_history[idx]
            dosc = rv - self._rsi_history[idx]

            price_move_pct = abs(dsrc) / price * 100.0 if price > 0.0 else 0.0
            if price_move_pct < min_price_pct or abs(dosc) < min_rsi_mv:
                continue

            # Price down, oscillator up: bullish divergence.
            if dsrc < 0 and dosc > 0:
                return 1
            # Price up, oscillator down: bearish divergence.
            if dsrc > 0 and dosc < 0:
                return -1

        return 0

    def CreateClone(self):
        """Return a fresh instance of this strategy with default parameters."""
        return mawreez_rsi_divergence_detector_strategy()