Divergence Expert
Strategy trading RSI price divergences. Detects bullish divergence when price makes a lower low but RSI forms a higher low, and bearish divergence when price makes a higher high but RSI forms a lower high. Enters long or short positions accordingly and uses a percentage stop loss.
Details
- Entry Criteria:
- Long: price makes a new low and RSI makes a higher low (bullish divergence)
- Short: price makes a new high and RSI makes a lower high (bearish divergence)
- Long/Short: Both
- Exit Criteria:
- Long: price hits stop loss or bearish divergence appears
- Short: price hits stop loss or bullish divergence appears
- Stops: Percent from entry price
- Default Values:
RsiPeriod= 14StopLossPercent= 2mCandleType= TimeSpan.FromMinutes(5).TimeFrame()
- Filters:
- Category: Divergence
- Direction: Both
- Indicators: RSI
- Stops: Yes
- Complexity: Basic
- Timeframe: Mid-term
- Seasonality: No
- Neural Networks: No
- Divergence: Yes
- Risk Level: Medium
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy trading RSI price divergences.
/// </summary>
public class DivergenceExpertStrategy : Strategy
{
private readonly StrategyParam<int> _rsiPeriod;
private readonly StrategyParam<decimal> _stopLossPercent;
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<DateTimeOffset> _startDate;
private readonly StrategyParam<DateTimeOffset> _endDate;
private decimal _entryPrice;
private decimal _stopPrice;
private decimal _lastPriceHigh;
private decimal _lastPriceLow;
private decimal _lastRsiHigh;
private decimal _lastRsiLow;
public int RsiPeriod
{
get => _rsiPeriod.Value;
set => _rsiPeriod.Value = value;
}
public decimal StopLossPercent
{
get => _stopLossPercent.Value;
set => _stopLossPercent.Value = value;
}
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
public DateTimeOffset StartDate
{
get => _startDate.Value;
set => _startDate.Value = value;
}
public DateTimeOffset EndDate
{
get => _endDate.Value;
set => _endDate.Value = value;
}
public DivergenceExpertStrategy()
{
_rsiPeriod = Param(nameof(RsiPeriod), 14)
.SetDisplay("RSI Period", "RSI calculation period", "Parameters")
.SetOptimize(5, 30, 5);
_stopLossPercent = Param(nameof(StopLossPercent), 2m)
.SetDisplay("Stop Loss (%)", "Max risk per trade in percent", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Type of candles", "General");
_startDate = Param(nameof(StartDate), new DateTimeOffset(new DateTime(2017, 1, 1), TimeSpan.Zero))
.SetDisplay("Start Date", "Backtest start date", "General");
_endDate = Param(nameof(EndDate), new DateTimeOffset(new DateTime(2024, 7, 1), TimeSpan.Zero))
.SetDisplay("End Date", "Backtest end date", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_entryPrice = default;
_stopPrice = default;
_lastPriceHigh = default;
_lastPriceLow = default;
_lastRsiHigh = default;
_lastRsiLow = default;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var rsi = new RelativeStrengthIndex
{
Length = RsiPeriod
};
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(rsi, ProcessCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, decimal rsi)
{
if (candle.State != CandleStates.Finished)
return;
var time = candle.OpenTime;
var inRange = time >= StartDate && time <= EndDate;
if (!inRange)
{
if (Position != 0)
if (Position > 0) SellMarket(); else if (Position < 0) BuyMarket();
return;
}
var close = candle.ClosePrice;
// Track new highs for bearish divergence detection
if (candle.HighPrice > _lastPriceHigh)
{
if (_lastPriceHigh != 0m && rsi < _lastRsiHigh && Position >= 0)
{
if (Position > 0)
if (Position > 0) SellMarket(); else if (Position < 0) BuyMarket();
SellMarket();
_entryPrice = close;
_stopPrice = _entryPrice * (1m + StopLossPercent / 100m);
}
_lastPriceHigh = candle.HighPrice;
_lastRsiHigh = rsi;
}
// Track new lows for bullish divergence detection
if (_lastPriceLow == 0m || candle.LowPrice < _lastPriceLow)
{
if (_lastPriceLow != 0m && rsi > _lastRsiLow && Position <= 0)
{
if (Position < 0)
if (Position > 0) SellMarket(); else if (Position < 0) BuyMarket();
BuyMarket();
_entryPrice = close;
_stopPrice = _entryPrice * (1m - StopLossPercent / 100m);
}
_lastPriceLow = candle.LowPrice;
_lastRsiLow = rsi;
}
if (Position > 0 && candle.LowPrice <= _stopPrice)
if (Position > 0) SellMarket(); else if (Position < 0) BuyMarket();
else if (Position < 0 && candle.HighPrice >= _stopPrice)
if (Position > 0) SellMarket(); else if (Position < 0) BuyMarket();
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, DateTimeOffset, DateTime, TimeSpan as SysTimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import RelativeStrengthIndex
from StockSharp.Algo.Strategies import Strategy
class divergence_expert_strategy(Strategy):
def __init__(self):
super(divergence_expert_strategy, self).__init__()
self._rsi_period = self.Param("RsiPeriod", 14) \
.SetDisplay("RSI Period", "RSI calculation period", "Parameters")
self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
.SetDisplay("Stop Loss (%)", "Max risk per trade in percent", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Type of candles", "General")
self._start_date = self.Param("StartDate", DateTimeOffset(DateTime(2017, 1, 1), SysTimeSpan.Zero)) \
.SetDisplay("Start Date", "Backtest start date", "General")
self._end_date = self.Param("EndDate", DateTimeOffset(DateTime(2024, 7, 1), SysTimeSpan.Zero)) \
.SetDisplay("End Date", "Backtest end date", "General")
self._entry_price = 0.0
self._stop_price = 0.0
self._last_price_high = 0.0
self._last_price_low = 0.0
self._last_rsi_high = 0.0
self._last_rsi_low = 0.0
@property
def rsi_period(self):
return self._rsi_period.Value
@property
def stop_loss_percent(self):
return self._stop_loss_percent.Value
@property
def candle_type(self):
return self._candle_type.Value
@property
def start_date(self):
return self._start_date.Value
@property
def end_date(self):
return self._end_date.Value
def OnReseted(self):
super(divergence_expert_strategy, self).OnReseted()
self._entry_price = 0.0
self._stop_price = 0.0
self._last_price_high = 0.0
self._last_price_low = 0.0
self._last_rsi_high = 0.0
self._last_rsi_low = 0.0
def OnStarted2(self, time):
super(divergence_expert_strategy, self).OnStarted2(time)
rsi = RelativeStrengthIndex()
rsi.Length = self.rsi_period
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(rsi, self.process_candle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawOwnTrades(area)
def process_candle(self, candle, rsi):
if candle.State != CandleStates.Finished:
return
t = DateTimeOffset(candle.OpenTime) if not isinstance(candle.OpenTime, DateTimeOffset) else candle.OpenTime
in_range = t >= self.start_date and t <= self.end_date
if not in_range:
if self.Position > 0:
self.SellMarket()
elif self.Position < 0:
self.BuyMarket()
return
rsi = float(rsi)
close = float(candle.ClosePrice)
high = float(candle.HighPrice)
low = float(candle.LowPrice)
sl_pct = float(self.stop_loss_percent)
# Track new highs for bearish divergence detection
if high > self._last_price_high:
if self._last_price_high != 0.0 and rsi < self._last_rsi_high and self.Position >= 0:
if self.Position > 0:
self.SellMarket()
self.SellMarket()
self._entry_price = close
self._stop_price = self._entry_price * (1.0 + sl_pct / 100.0)
self._last_price_high = high
self._last_rsi_high = rsi
# Track new lows for bullish divergence detection
if self._last_price_low == 0.0 or low < self._last_price_low:
if self._last_price_low != 0.0 and rsi > self._last_rsi_low and self.Position <= 0:
if self.Position < 0:
self.BuyMarket()
self.BuyMarket()
self._entry_price = close
self._stop_price = self._entry_price * (1.0 - sl_pct / 100.0)
self._last_price_low = low
self._last_rsi_low = rsi
if self.Position > 0 and low <= self._stop_price:
self.SellMarket()
elif self.Position < 0 and high >= self._stop_price:
self.BuyMarket()
def CreateClone(self):
return divergence_expert_strategy()