namespace StockSharp.Samples.Strategies;
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
/// <summary>
/// RSI threshold strategy. Goes long when RSI is below <see cref="BuyLevel"/>, goes short when RSI is
/// above <see cref="SellLevel"/>, and exits through a stop that is armed once price has travelled
/// <see cref="TrailingDistanceSteps"/> price steps in favour of the position.
/// </summary>
/// <remarks>
/// The stop is placed once, at the activation price, and is not moved afterwards.
/// NOTE(review): the sizing helpers (<c>CalculateOrderVolume</c>, <c>HasCapacityForNewPosition</c>) are
/// not called by the entry logic, so <see cref="RiskPercentage"/> and <see cref="MaxOpenPositions"/>
/// currently have no effect on orders - confirm whether that is intended.
/// </remarks>
public class RsiTestStrategy : Strategy
{
    private readonly StrategyParam<int> _rsiPeriod;
    private readonly StrategyParam<decimal> _buyLevel;
    private readonly StrategyParam<decimal> _sellLevel;
    private readonly StrategyParam<decimal> _riskPercentage;
    private readonly StrategyParam<int> _trailingDistanceSteps;
    private readonly StrategyParam<int> _maxOpenPositions;
    private readonly StrategyParam<DataType> _candleType;

    private RelativeStrengthIndex _rsi;

    // RSI of the previous finished candle; null until the first finished candle has been seen.
    private decimal? _previousRsi;

    // Reference price recorded when an entry order is sent (the signal candle's close).
    private decimal? _entryPrice;

    // Active stop level; null while the stop has not been armed.
    private decimal? _stopPrice;

    // True once the stop has been placed for the current position.
    private bool _trailingArmed;

    // Security price step captured on start; 0 when the security does not provide one.
    private decimal _priceStep;

    /// <summary>
    /// Initialize <see cref="RsiTestStrategy"/>.
    /// </summary>
    public RsiTestStrategy()
    {
        _rsiPeriod = Param(nameof(RsiPeriod), 7)
            .SetGreaterThanZero()
            .SetDisplay("RSI Period", "Lookback period for RSI", "Indicators")
            .SetOptimize(7, 28, 1);

        _buyLevel = Param(nameof(BuyLevel), 40m)
            .SetDisplay("RSI Buy Level", "Oversold threshold for long entries", "Trading");

        _sellLevel = Param(nameof(SellLevel), 60m)
            .SetDisplay("RSI Sell Level", "Overbought threshold for short entries", "Trading");

        _riskPercentage = Param(nameof(RiskPercentage), 10m)
            .SetDisplay("Risk Percentage", "Portfolio percentage used for sizing", "Risk");

        _trailingDistanceSteps = Param(nameof(TrailingDistanceSteps), 50)
            .SetDisplay("Trailing Distance Steps", "Steps before activating trailing stop", "Risk");

        _maxOpenPositions = Param(nameof(MaxOpenPositions), 1)
            .SetDisplay("Max Open Positions", "Maximum simultaneous positions. 0 disables the limit.", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
            .SetDisplay("Candle Type", "Primary timeframe for calculations", "Data");
    }

    /// <summary>
    /// Lookback length assigned to the RSI indicator on start.
    /// </summary>
    public int RsiPeriod
    {
        get => _rsiPeriod.Value;
        set => _rsiPeriod.Value = value;
    }

    /// <summary>
    /// RSI value below which a long entry is attempted.
    /// </summary>
    public decimal BuyLevel
    {
        get => _buyLevel.Value;
        set => _buyLevel.Value = value;
    }

    /// <summary>
    /// RSI value above which a short entry is attempted.
    /// </summary>
    public decimal SellLevel
    {
        get => _sellLevel.Value;
        set => _sellLevel.Value = value;
    }

    /// <summary>
    /// Percentage of the portfolio value used by the volume calculation helper.
    /// </summary>
    public decimal RiskPercentage
    {
        get => _riskPercentage.Value;
        set => _riskPercentage.Value = value;
    }

    /// <summary>
    /// Distance, in price steps, that price must move in favour of the position before the stop is armed.
    /// Zero or a negative value disables the stop.
    /// </summary>
    public int TrailingDistanceSteps
    {
        get => _trailingDistanceSteps.Value;
        set => _trailingDistanceSteps.Value = value;
    }

    /// <summary>
    /// Maximum number of simultaneous positions used by the capacity check helper. 0 disables the limit.
    /// </summary>
    public int MaxOpenPositions
    {
        get => _maxOpenPositions.Value;
        set => _maxOpenPositions.Value = value;
    }

    /// <summary>
    /// Candle series the strategy subscribes to.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities() => [(Security, CandleType)];

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _previousRsi = null;
        ResetPositionState();
        _priceStep = 0m;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        _rsi = new RelativeStrengthIndex { Length = RsiPeriod };
        _priceStep = Security?.PriceStep ?? 0m;

        var subscription = SubscribeCandles(CandleType);

        subscription
            .Bind(_rsi, ProcessCandle)
            .Start();

        var area = CreateChartArea();

        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, _rsi);
            DrawOwnTrades(area);
        }

        StartProtection(null, null);
    }

    /// <summary>
    /// Candle handler: manages the open position first, then evaluates RSI entry signals.
    /// </summary>
    /// <param name="candle">Candle delivered by the subscription.</param>
    /// <param name="rsiValue">RSI value calculated for <paramref name="candle"/>.</param>
    private void ProcessCandle(ICandleMessage candle, decimal rsiValue)
    {
        // Only react to fully formed candles to match the MQL implementation.
        if (candle.State != CandleStates.Finished)
        {
            return;
        }

        // Manage trailing logic and exits before attempting fresh entries.
        ManagePosition(candle);

        // Entries need a formed indicator and at least one previously recorded RSI value.
        var canTrade = _rsi.IsFormed && _previousRsi is not null;

        if (canTrade)
        {
            if (rsiValue < BuyLevel && Position <= 0)
            {
                TryEnterLong(candle);
            }
            else if (rsiValue > SellLevel && Position >= 0)
            {
                TryEnterShort(candle);
            }
        }

        _previousRsi = rsiValue;
    }

    /// <summary>
    /// Closes an open short (if any) and opens a long when the strategy is flat.
    /// </summary>
    /// <param name="candle">Signal candle; its close is stored as the entry reference price.</param>
    private void TryEnterLong(ICandleMessage candle)
    {
        // Close short position first if needed.
        if (Position < 0)
        {
            BuyMarket(Math.Abs(Position));
            ResetPositionState();
        }

        // Position is re-read here: the entry is sent only if the strategy reports being flat.
        if (Position == 0)
        {
            BuyMarket();
            _entryPrice = candle.ClosePrice;
            _stopPrice = null;
            _trailingArmed = false;
        }
    }

    /// <summary>
    /// Closes an open long (if any) and opens a short when the strategy is flat.
    /// </summary>
    /// <param name="candle">Signal candle; its close is stored as the entry reference price.</param>
    private void TryEnterShort(ICandleMessage candle)
    {
        // Close long position first if needed.
        if (Position > 0)
        {
            SellMarket(Math.Abs(Position));
            ResetPositionState();
        }

        // Position is re-read here: the entry is sent only if the strategy reports being flat.
        if (Position == 0)
        {
            SellMarket();
            _entryPrice = candle.ClosePrice;
            _stopPrice = null;
            _trailingArmed = false;
        }
    }

    /// <summary>
    /// Arms the stop and checks it against the candle for the current position.
    /// Clears the tracked entry/stop state when there is no position.
    /// </summary>
    /// <param name="candle">Finished candle used for the activation and stop checks.</param>
    private void ManagePosition(ICandleMessage candle)
    {
        if (Position == 0)
        {
            ResetPositionState();
            return;
        }

        if (Position > 0)
        {
            UpdateTrailingForLong(candle);
            TryExitLong(candle);
        }
        else
        {
            UpdateTrailingForShort(candle);
            TryExitShort(candle);
        }
    }

    /// <summary>
    /// Arms the long stop once the candle high reaches entry price plus the trailing distance.
    /// </summary>
    /// <param name="candle">Finished candle whose high is compared with the activation price.</param>
    private void UpdateTrailingForLong(ICandleMessage candle)
    {
        // Once armed the stop is left where it is, so nothing more to do.
        if (TrailingDistanceSteps <= 0 || _entryPrice is null || _trailingArmed)
        {
            return;
        }

        var trailingDistance = GetPriceOffset(TrailingDistanceSteps);

        if (trailingDistance <= 0m)
        {
            return;
        }

        var activationPrice = _entryPrice.Value + trailingDistance;

        if (candle.HighPrice < activationPrice)
        {
            return;
        }

        // The stop is placed at the activation price itself.
        _stopPrice = activationPrice;
        _trailingArmed = true;

        LogInfo($"Activated long trailing stop at {_stopPrice:0.#####}.");
    }

    /// <summary>
    /// Arms the short stop once the candle low reaches entry price minus the trailing distance.
    /// </summary>
    /// <param name="candle">Finished candle whose low is compared with the activation price.</param>
    private void UpdateTrailingForShort(ICandleMessage candle)
    {
        // Once armed the stop is left where it is, so nothing more to do.
        if (TrailingDistanceSteps <= 0 || _entryPrice is null || _trailingArmed)
        {
            return;
        }

        var trailingDistance = GetPriceOffset(TrailingDistanceSteps);

        if (trailingDistance <= 0m)
        {
            return;
        }

        var activationPrice = _entryPrice.Value - trailingDistance;

        if (candle.LowPrice > activationPrice)
        {
            return;
        }

        // The stop is placed at the activation price itself.
        _stopPrice = activationPrice;
        _trailingArmed = true;

        LogInfo($"Activated short trailing stop at {_stopPrice:0.#####}.");
    }

    /// <summary>
    /// Sells the whole long position when the candle low is at or below the armed stop.
    /// </summary>
    /// <param name="candle">Finished candle whose low is compared with the stop.</param>
    private void TryExitLong(ICandleMessage candle)
    {
        if (_stopPrice is null)
        {
            return;
        }

        var volume = Math.Abs(Position);

        if (volume <= 0m)
        {
            return;
        }

        if (candle.LowPrice > _stopPrice.Value)
        {
            return;
        }

        SellMarket(volume);
        ResetPositionState();
    }

    /// <summary>
    /// Buys back the whole short position when the candle high is at or above the armed stop.
    /// </summary>
    /// <param name="candle">Finished candle whose high is compared with the stop.</param>
    private void TryExitShort(ICandleMessage candle)
    {
        if (_stopPrice is null)
        {
            return;
        }

        var volume = Math.Abs(Position);

        if (volume <= 0m)
        {
            return;
        }

        if (candle.HighPrice < _stopPrice.Value)
        {
            return;
        }

        BuyMarket(volume);
        ResetPositionState();
    }

    /// <summary>
    /// Derives an order volume from <see cref="RiskPercentage"/> of the portfolio value, divided by the
    /// security margin when one is available and by <paramref name="referencePrice"/> otherwise, then
    /// rounds it and clamps it to the security's min/max volume.
    /// </summary>
    /// <param name="referencePrice">Price used as the divisor when no margin value is available.</param>
    /// <returns>The calculated volume, or the strategy <c>Volume</c> when the calculation yields nothing positive.</returns>
    /// <remarks>NOTE(review): not called anywhere in this class; entries are sent without an explicit volume.</remarks>
    private decimal CalculateOrderVolume(decimal referencePrice)
    {
        var volume = Volume;

        if (RiskPercentage > 0m)
        {
            var portfolioValue = Portfolio?.CurrentValue ?? Portfolio?.BeginValue ?? 0m;
            var riskCapital = portfolioValue * RiskPercentage / 100m;

            if (riskCapital > 0m)
            {
                var margin = GetSecurityValue<decimal?>(Level1Fields.MarginBuy) ?? GetSecurityValue<decimal?>(Level1Fields.MarginSell) ?? 0m;

                if (margin > 0m)
                {
                    volume = riskCapital / margin;
                }
                else if (referencePrice > 0m)
                {
                    volume = riskCapital / referencePrice;
                }
            }
        }

        volume = RoundVolume(volume);

        // Ensure volume is at least the base Volume when calculation produces too small a value.
        if (volume <= 0m)
        {
            volume = Volume;
        }

        var minVolume = Security?.MinVolume;

        if (minVolume != null && minVolume.Value > 0m && volume < minVolume.Value)
        {
            volume = minVolume.Value;
        }

        var maxVolume = Security?.MaxVolume;

        if (maxVolume != null && maxVolume.Value > 0m && volume > maxVolume.Value)
        {
            volume = maxVolume.Value;
        }

        return volume;
    }

    /// <summary>
    /// Rounds a volume down to a whole number of volume steps (at least one step), or truncates it to
    /// two decimals when the security has no volume step.
    /// </summary>
    /// <param name="volume">Raw volume.</param>
    /// <returns>0 for non-positive input, otherwise the rounded volume.</returns>
    private decimal RoundVolume(decimal volume)
    {
        if (volume <= 0m)
        {
            return 0m;
        }

        var step = Security?.VolumeStep ?? 0m;

        if (step > 0m)
        {
            var steps = Math.Floor(volume / step);

            // Less than one full step still yields a single step rather than zero.
            if (steps <= 0m)
            {
                return step;
            }

            return steps * step;
        }

        return Math.Round(volume, 2, MidpointRounding.ToZero);
    }

    /// <summary>
    /// Checks whether adding <paramref name="volume"/> keeps the absolute position within
    /// <see cref="MaxOpenPositions"/> times that volume.
    /// </summary>
    /// <param name="volume">Volume of the prospective order.</param>
    /// <returns><c>true</c> when the limit is disabled or would not be exceeded.</returns>
    /// <remarks>NOTE(review): not called anywhere in this class.</remarks>
    private bool HasCapacityForNewPosition(decimal volume)
    {
        if (MaxOpenPositions <= 0)
        {
            return true;
        }

        if (volume <= 0m)
        {
            return false;
        }

        var exposure = Math.Abs(Position);
        var maxExposure = MaxOpenPositions * volume;

        // Small relative tolerance so that exposure exactly at the limit is still accepted.
        return exposure + volume <= maxExposure + volume * 0.0001m;
    }

    /// <summary>
    /// Converts a number of price steps into a price distance.
    /// </summary>
    /// <param name="steps">Number of steps.</param>
    /// <returns>
    /// 0 for non-positive <paramref name="steps"/>; <paramref name="steps"/> times the price step when
    /// one is known; otherwise <paramref name="steps"/> itself.
    /// </returns>
    private decimal GetPriceOffset(int steps)
    {
        if (steps <= 0)
        {
            return 0m;
        }

        if (_priceStep > 0m)
        {
            return steps * _priceStep;
        }

        return steps;
    }

    /// <summary>
    /// Clears the entry price, the stop level and the armed flag.
    /// </summary>
    private void ResetPositionState()
    {
        _entryPrice = null;
        _stopPrice = null;
        _trailingArmed = false;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import RelativeStrengthIndex
from StockSharp.Algo.Strategies import Strategy
class rsi_test_strategy(Strategy):
    """RSI threshold strategy with a one-shot protective stop.

    Buys when RSI is below the buy level, sells short when RSI is above the
    sell level. Once price has moved ``TrailingDistanceSteps`` price steps in
    favour of the position, a stop is placed at that activation price and the
    position is closed when a later (or the same) finished candle touches it.
    """

    def __init__(self):
        super(rsi_test_strategy, self).__init__()
        self._rsi_period = self.Param("RsiPeriod", 7).SetGreaterThanZero().SetDisplay("RSI Period", "Lookback period for RSI", "Indicators")
        self._buy_level = self.Param("BuyLevel", 40.0).SetDisplay("RSI Buy Level", "Oversold threshold for long entries", "Trading")
        self._sell_level = self.Param("SellLevel", 60.0).SetDisplay("RSI Sell Level", "Overbought threshold for short entries", "Trading")
        self._trailing_distance_steps = self.Param("TrailingDistanceSteps", 50).SetDisplay("Trailing Distance Steps", "Steps before activating trailing stop", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))).SetDisplay("Candle Type", "Primary timeframe", "Data")

        # Runtime state is created here so that no helper can hit a missing
        # attribute if it runs before OnStarted2/OnReseted.
        self._prev_rsi = None          # RSI of the previous finished candle
        self._entry_price = None       # close of the signal candle, as float
        self._stop_price = None        # armed stop level, None while not armed
        self._trailing_armed = False   # True once the stop has been placed
        self._price_step = 1.0         # fallback step when the security has none

    @property
    def CandleType(self):
        """Candle series the strategy subscribes to."""
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value

    def OnReseted(self):
        """Clear the RSI history and the per-position state."""
        super(rsi_test_strategy, self).OnReseted()
        self._prev_rsi = None
        self._reset_state()

    def OnStarted2(self, time):
        """Create the RSI indicator, subscribe to candles and set up the chart."""
        super(rsi_test_strategy, self).OnStarted2(time)
        self._prev_rsi = None
        self._reset_state()

        # Use the security's price step when it is present and positive; otherwise keep 1.0.
        self._price_step = 1.0
        if self.Security is not None and self.Security.PriceStep is not None and self.Security.PriceStep > 0:
            self._price_step = float(self.Security.PriceStep)

        rsi = RelativeStrengthIndex()
        rsi.Length = self._rsi_period.Value

        sub = self.SubscribeCandles(self.CandleType)
        sub.Bind(rsi, self.OnProcess).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, sub)
            self.DrawIndicator(area, rsi)
            self.DrawOwnTrades(area)

    def OnProcess(self, candle, rsi_val):
        """Handle a candle: manage the open position, then evaluate entry signals.

        Unfinished candles are ignored. The first finished candle only records
        its RSI value; entries are evaluated from the second one onwards.
        """
        if candle.State != CandleStates.Finished:
            return

        close = float(candle.ClosePrice)

        # Manage existing position before looking for new entries.
        self._manage_position(candle, close)

        if self._prev_rsi is None:
            self._prev_rsi = rsi_val
            return

        # Entry signals
        if rsi_val < self._buy_level.Value and self.Position <= 0:
            if self.Position < 0:
                # Close the whole short, whatever its size.
                self.BuyMarket(Math.Abs(self.Position))
                self._reset_state()
            # Position is re-read: the entry is sent only when the strategy reports flat.
            if self.Position == 0:
                self.BuyMarket()
                self._entry_price = close
                self._stop_price = None
                self._trailing_armed = False
        elif rsi_val > self._sell_level.Value and self.Position >= 0:
            if self.Position > 0:
                # Close the whole long, whatever its size.
                self.SellMarket(Math.Abs(self.Position))
                self._reset_state()
            # Position is re-read: the entry is sent only when the strategy reports flat.
            if self.Position == 0:
                self.SellMarket()
                self._entry_price = close
                self._stop_price = None
                self._trailing_armed = False

        self._prev_rsi = rsi_val

    def _manage_position(self, candle, close):
        """Arm the stop and exit on a stop hit; clear state when flat.

        ``close`` is accepted for signature compatibility and is not used.
        """
        if self.Position == 0:
            self._reset_state()
            return

        if self.Position > 0:
            self._update_trailing_long(candle)
            if self._stop_price is not None and float(candle.LowPrice) <= self._stop_price:
                # Exit the full long position.
                self.SellMarket(Math.Abs(self.Position))
                self._reset_state()
        elif self.Position < 0:
            self._update_trailing_short(candle)
            if self._stop_price is not None and float(candle.HighPrice) >= self._stop_price:
                # Exit the full short position.
                self.BuyMarket(Math.Abs(self.Position))
                self._reset_state()

    def _update_trailing_long(self, candle):
        """Arm the long stop once the candle high reaches entry + distance."""
        # Once armed, the stop is left where it is.
        if self._trailing_distance_steps.Value <= 0 or self._entry_price is None or self._trailing_armed:
            return

        dist = self._trailing_distance_steps.Value * self._price_step
        if dist <= 0:
            return

        activation = self._entry_price + dist
        if float(candle.HighPrice) >= activation:
            # The stop is placed at the activation price itself.
            self._stop_price = activation
            self._trailing_armed = True

    def _update_trailing_short(self, candle):
        """Arm the short stop once the candle low reaches entry - distance."""
        # Once armed, the stop is left where it is.
        if self._trailing_distance_steps.Value <= 0 or self._entry_price is None or self._trailing_armed:
            return

        dist = self._trailing_distance_steps.Value * self._price_step
        if dist <= 0:
            return

        activation = self._entry_price - dist
        if float(candle.LowPrice) <= activation:
            # The stop is placed at the activation price itself.
            self._stop_price = activation
            self._trailing_armed = True

    def _reset_state(self):
        """Forget the entry price, the stop level and the armed flag."""
        self._entry_price = None
        self._stop_price = None
        self._trailing_armed = False

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return rsi_test_strategy()