Fractal RSI Strategy
Adaptive strategy based on the Fractal RSI indicator. Fractal RSI adjusts the length of the RSI calculation using the fractal dimension of price movement, allowing the oscillator to react faster in trending markets and slower in ranging conditions.
The strategy opens positions when the indicator crosses predefined levels. It can trade with the detected trend or against it depending on the chosen mode.
Details
- Entry Criteria:
  - Trend Mode - Direct:
    - Buy: value crosses below `LowLevel`
    - Sell: value crosses above `HighLevel`
  - Trend Mode - Against:
    - Buy: value crosses above `HighLevel`
    - Sell: value crosses below `LowLevel`
- Long/Short: Both
- Exit Criteria: Opposite signal
- Stops: Optional fixed stop-loss and take-profit
- Default Values:
  - `CandleType` = TimeSpan.FromHours(4).TimeFrame()
  - `FractalPeriod` = 30
  - `NormalSpeed` = 30
  - `HighLevel` = 60
  - `LowLevel` = 40
  - `StopLoss` = 1000 points
  - `TakeProfit` = 2000 points
- Filters:
- Category: Trend / Oscillator
- Direction: Both
- Indicators: Fractal Dimension, RSI
- Stops: Yes
- Complexity: Advanced indicator usage
- Timeframe: 4H (configurable)
- Seasonality: No
- Neural Networks: No
- Divergence: No
- Risk Level: Medium
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that trades using adaptive Fractal RSI indicator computed inline.
/// </summary>
/// <remarks>
/// Close prices of finished candles are kept in a rolling buffer. For every finished candle an
/// RSI-style value is computed whose look-back length is scaled by a fractal-dimension estimate
/// taken over the last <see cref="FractalPeriod"/> closes. A market buy is sent when the value
/// crosses down through <see cref="LowLevel"/>, a market sell when it crosses up through
/// <see cref="HighLevel"/>.
/// </remarks>
public class FractalRsiStrategy : Strategy
{
	// Baseline size of the rolling close-price buffer. The buffer is enlarged when
	// FractalPeriod needs more history than this.
	private const int BaseHistorySize = 500;

	private readonly StrategyParam<DataType> _candleType;
	private readonly StrategyParam<int> _fractalPeriod;
	private readonly StrategyParam<int> _normalSpeed;
	private readonly StrategyParam<decimal> _highLevel;
	private readonly StrategyParam<decimal> _lowLevel;
	private readonly StrategyParam<decimal> _stopLoss;
	private readonly StrategyParam<decimal> _takeProfit;

	// Close prices of finished candles, oldest first.
	private readonly List<decimal> _prices = new();

	// Indicator value computed on the previous finished candle; null until one exists.
	private decimal? _previousValue;

	// Last signal acted on: 1 = buy, -1 = sell, 0 = cleared (value back between the levels).
	private int _lastSignal;

	/// <summary>
	/// Candle type (timeframe) the indicator is calculated on.
	/// </summary>
	public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

	/// <summary>
	/// Number of closes used for the fractal dimension estimate.
	/// </summary>
	public int FractalPeriod { get => _fractalPeriod.Value; set => _fractalPeriod.Value = value; }

	/// <summary>
	/// Base RSI period that is scaled by the fractal dimension estimate.
	/// </summary>
	public int NormalSpeed { get => _normalSpeed.Value; set => _normalSpeed.Value = value; }

	/// <summary>
	/// Upper threshold; crossing up through it triggers a sell.
	/// </summary>
	public decimal HighLevel { get => _highLevel.Value; set => _highLevel.Value = value; }

	/// <summary>
	/// Lower threshold; crossing down through it triggers a buy.
	/// </summary>
	public decimal LowLevel { get => _lowLevel.Value; set => _lowLevel.Value = value; }

	/// <summary>
	/// Stop-loss distance passed to the position protection as an absolute value.
	/// </summary>
	public decimal StopLoss { get => _stopLoss.Value; set => _stopLoss.Value = value; }

	/// <summary>
	/// Take-profit distance passed to the position protection as an absolute value.
	/// </summary>
	public decimal TakeProfit { get => _takeProfit.Value; set => _takeProfit.Value = value; }

	/// <summary>
	/// Initializes a new instance of <see cref="FractalRsiStrategy"/> and registers its parameters.
	/// </summary>
	public FractalRsiStrategy()
	{
		_candleType = Param(nameof(CandleType), TimeSpan.FromHours(1).TimeFrame())
			.SetDisplay("Candle Type", "Timeframe for indicator", "General");

		_fractalPeriod = Param(nameof(FractalPeriod), 50)
			.SetGreaterThanZero()
			.SetDisplay("Fractal Period", "Period for fractal dimension", "Indicator");

		_normalSpeed = Param(nameof(NormalSpeed), 50)
			.SetGreaterThanZero()
			.SetDisplay("Normal Speed", "Base period for RSI", "Indicator");

		_highLevel = Param(nameof(HighLevel), 70m)
			.SetDisplay("High Level", "Upper threshold", "Indicator");

		_lowLevel = Param(nameof(LowLevel), 30m)
			.SetDisplay("Low Level", "Lower threshold", "Indicator");

		_stopLoss = Param(nameof(StopLoss), 1000m)
			.SetGreaterThanZero()
			.SetDisplay("Stop Loss", "Stop loss in price units", "Risk");

		_takeProfit = Param(nameof(TakeProfit), 2000m)
			.SetGreaterThanZero()
			.SetDisplay("Take Profit", "Take profit in price units", "Risk");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
		=> [(Security, CandleType)];

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		_prices.Clear();
		_previousValue = null;
		_lastSignal = 0;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		SubscribeCandles(CandleType)
			.Bind(ProcessCandle)
			.Start();

		StartProtection(
			new Unit(TakeProfit, UnitTypes.Absolute),
			new Unit(StopLoss, UnitTypes.Absolute));
	}

	/// <summary>
	/// Computes the Fractal RSI value for the most recent close in the buffer.
	/// </summary>
	/// <returns>
	/// A value between 0 and 100, or <c>null</c> when the buffer does not yet hold enough closes
	/// for the fractal window or for the adaptive RSI look-back.
	/// </returns>
	private decimal? ComputeFractalRsi()
	{
		var period = FractalPeriod;

		// More than `period` closes are required before the indicator is considered formed.
		if (_prices.Count <= period)
			return null;

		var lastIndex = _prices.Count - 1;
		var startIndex = lastIndex - period + 1;

		// Price range over the fractal window.
		var priceMax = _prices[startIndex];
		var priceMin = _prices[startIndex];

		for (var i = startIndex; i <= lastIndex; i++)
		{
			if (_prices[i] > priceMax) priceMax = _prices[i];
			if (_prices[i] < priceMin) priceMin = _prices[i];
		}

		// Length of the price curve after normalizing prices to [0, 1] and
		// spacing consecutive bars 1/period apart. Stays 0 for a flat window.
		double length = 0.0;
		double? priorDiff = null;

		if (priceMax - priceMin > 0m)
		{
			// Computed in double so that large periods cannot overflow int multiplication.
			var stepSquared = 1.0 / ((double)period * period);

			for (var k = 0; k < period; k++)
			{
				var p = (double)((_prices[lastIndex - k] - priceMin) / (priceMax - priceMin));

				if (priorDiff != null)
					length += Math.Sqrt(Math.Pow(p - priorDiff.Value, 2.0) + stepSquared);

				priorDiff = p;
			}
		}

		var log2 = Math.Log(2.0);
		double fdi = length > 0.0 ? 1.0 + (Math.Log(length) + log2) / Math.Log(2.0 * (period - 1)) : 0.0;
		double hurst = 2.0 - fdi;
		double trailDim = hurst != 0.0 ? 1.0 / hurst : 0.0;

		// Adaptive RSI look-back. It is kept as a double until it has been range-checked:
		// when hurst is very close to zero the value can exceed int.MaxValue, and an
		// unchecked double-to-int cast of such a value is unspecified.
		var rawSpeed = Math.Round(NormalSpeed * trailDim / 2.0);

		// The look-back must be shorter than the stored history (each bar needs a predecessor).
		// Written as a negated '<' so that a NaN is rejected as well.
		if (!(rawSpeed < _prices.Count))
			return null;

		var speed = (int)Math.Max(1.0, rawSpeed);

		// Sum of up-moves and down-moves over the last `speed` bars.
		decimal sumUp = 0m;
		decimal sumDown = 0m;

		for (var i = lastIndex - speed + 1; i <= lastIndex; i++)
		{
			var diff = _prices[i] - _prices[i - 1];

			if (diff > 0) sumUp += diff;
			else sumDown -= diff;
		}

		var pos = sumUp / speed;
		var neg = sumDown / speed;

		if (neg > 0)
			return 100m - (100m / (1m + pos / neg));

		// No down-moves: 100 if there were gains, 50 if prices did not move at all.
		return pos > 0 ? 100m : 50m;
	}

	/// <summary>
	/// Handles a candle from the subscription: stores its close, recomputes the indicator
	/// and sends market orders on level crossings.
	/// </summary>
	/// <param name="candle">Incoming candle; ignored unless its state is finished.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
			return;

		_prices.Add(candle.ClosePrice);

		// Keep enough history for the configured fractal window. A fixed cap of 500
		// would prevent the indicator from ever forming when FractalPeriod >= 500.
		var capacity = Math.Max(BaseHistorySize, FractalPeriod + 1);

		while (_prices.Count > capacity)
			_prices.RemoveAt(0);

		var value = ComputeFractalRsi();

		if (value == null)
			return;

		var prev = _previousValue;
		_previousValue = value;

		// A crossing needs two consecutive indicator values.
		if (prev is null)
			return;

		// Direct mode: buy on oversold cross down, sell on overbought cross up
		if (prev > LowLevel && value <= LowLevel && _lastSignal != 1 && Position <= 0)
		{
			BuyMarket();
			_lastSignal = 1;
		}
		else if (prev < HighLevel && value >= HighLevel && _lastSignal != -1 && Position >= 0)
		{
			SellMarket();
			_lastSignal = -1;
		}
		else if (value > LowLevel && value < HighLevel)
		{
			// Value is back between the levels: allow the same direction to signal again.
			_lastSignal = 0;
		}
	}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Strategies import Strategy
class fractal_rsi_strategy(Strategy):
    """Strategy that trades using an adaptive Fractal RSI value computed inline.

    Close prices of finished candles are kept in a rolling list. For every
    finished candle an RSI-style value is computed whose look-back length is
    scaled by a fractal-dimension estimate over the last ``FractalPeriod``
    closes. A market buy is sent when the value crosses down through
    ``LowLevel``; a market sell when it crosses up through ``HighLevel``.
    """

    def __init__(self):
        super(fractal_rsi_strategy, self).__init__()
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(1)))
        self._fractal_period = self.Param("FractalPeriod", 50)
        self._normal_speed = self.Param("NormalSpeed", 50)
        self._high_level = self.Param("HighLevel", 70.0)
        self._low_level = self.Param("LowLevel", 30.0)
        self._stop_loss = self.Param("StopLoss", 1000.0)
        self._take_profit = self.Param("TakeProfit", 2000.0)
        # Close prices of finished candles, oldest first.
        self._prices = []
        # Indicator value from the previous finished candle; None until one exists.
        self._previous_value = None
        # Last signal acted on: 1 = buy, -1 = sell, 0 = cleared.
        self._last_signal = 0

    @property
    def CandleType(self):
        """Candle type (timeframe) the indicator is calculated on."""
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value

    @property
    def FractalPeriod(self):
        """Number of closes used for the fractal dimension estimate."""
        return self._fractal_period.Value

    @FractalPeriod.setter
    def FractalPeriod(self, value):
        self._fractal_period.Value = value

    @property
    def NormalSpeed(self):
        """Base RSI period that is scaled by the fractal dimension estimate."""
        return self._normal_speed.Value

    @NormalSpeed.setter
    def NormalSpeed(self, value):
        self._normal_speed.Value = value

    @property
    def HighLevel(self):
        """Upper threshold; crossing up through it triggers a sell."""
        return self._high_level.Value

    @HighLevel.setter
    def HighLevel(self, value):
        self._high_level.Value = value

    @property
    def LowLevel(self):
        """Lower threshold; crossing down through it triggers a buy."""
        return self._low_level.Value

    @LowLevel.setter
    def LowLevel(self, value):
        self._low_level.Value = value

    @property
    def StopLoss(self):
        """Stop-loss distance passed to the position protection as an absolute value."""
        return self._stop_loss.Value

    @StopLoss.setter
    def StopLoss(self, value):
        self._stop_loss.Value = value

    @property
    def TakeProfit(self):
        """Take-profit distance passed to the position protection as an absolute value."""
        return self._take_profit.Value

    @TakeProfit.setter
    def TakeProfit(self, value):
        self._take_profit.Value = value

    def OnStarted2(self, time):
        """Reset state, subscribe to candles and enable stop-loss/take-profit protection."""
        super(fractal_rsi_strategy, self).OnStarted2(time)

        self._prices = []
        self._previous_value = None
        self._last_signal = 0

        subscription = self.SubscribeCandles(self.CandleType)
        subscription.Bind(self.ProcessCandle).Start()

        self.StartProtection(
            Unit(self.TakeProfit, UnitTypes.Absolute),
            Unit(self.StopLoss, UnitTypes.Absolute))

    def _compute_fractal_rsi(self):
        """Compute the Fractal RSI value for the most recent stored close.

        Returns a float between 0 and 100, or None when the list does not yet
        hold enough closes for the fractal window or the adaptive look-back.
        """
        period = int(self.FractalPeriod)
        # More than `period` closes are required before the indicator is formed.
        if len(self._prices) < period + 1:
            return None

        last_index = len(self._prices) - 1
        start_index = last_index - period + 1

        # Price range over the fractal window.
        price_max = self._prices[start_index]
        price_min = self._prices[start_index]
        for i in range(start_index, last_index + 1):
            if self._prices[i] > price_max:
                price_max = self._prices[i]
            if self._prices[i] < price_min:
                price_min = self._prices[i]

        # Length of the price curve after normalizing prices to [0, 1] and
        # spacing consecutive bars 1/period apart. Stays 0 for a flat window.
        length = 0.0
        prior_diff = None
        if price_max - price_min > 0.0:
            for k in range(period):
                p = (self._prices[last_index - k] - price_min) / (price_max - price_min)
                if prior_diff is not None:
                    length += math.sqrt((p - prior_diff) ** 2 + 1.0 / (period * period))
                prior_diff = p

        log2 = math.log(2.0)
        if length > 0.0:
            fdi = 1.0 + (math.log(length) + log2) / math.log(2.0 * (period - 1))
        else:
            fdi = 0.0

        hurst = 2.0 - fdi
        trail_dim = 1.0 / hurst if hurst != 0.0 else 0.0

        # Adaptive RSI look-back, never shorter than one bar.
        speed = max(1, int(round(int(self.NormalSpeed) * trail_dim / 2.0)))
        # Each bar in the look-back needs a predecessor in the list.
        if len(self._prices) <= speed:
            return None

        # Sum of up-moves and down-moves over the last `speed` bars.
        sum_up = 0.0
        sum_down = 0.0
        for i in range(last_index - speed + 1, last_index + 1):
            diff = self._prices[i] - self._prices[i - 1]
            if diff > 0.0:
                sum_up += diff
            else:
                sum_down -= diff

        pos = sum_up / speed
        neg = sum_down / speed
        if neg > 0.0:
            return 100.0 - (100.0 / (1.0 + pos / neg))
        # No down-moves: 100 if there were gains, 50 if prices did not move.
        return 100.0 if pos > 0.0 else 50.0

    def ProcessCandle(self, candle):
        """Store the close of a finished candle, recompute the indicator and trade on crossings."""
        if candle.State != CandleStates.Finished:
            return

        self._prices.append(float(candle.ClosePrice))

        # Keep enough history for the configured fractal window. A fixed cap of
        # 500 would prevent the indicator from ever forming when FractalPeriod >= 500.
        capacity = max(500, int(self.FractalPeriod) + 1)
        while len(self._prices) > capacity:
            self._prices.pop(0)

        value = self._compute_fractal_rsi()
        if value is None:
            return

        prev = self._previous_value
        self._previous_value = value
        # A crossing needs two consecutive indicator values.
        if prev is None:
            return

        low_lvl = float(self.LowLevel)
        high_lvl = float(self.HighLevel)

        # Buy on a cross down through the low level, sell on a cross up through the high level.
        if prev > low_lvl and value <= low_lvl and self._last_signal != 1 and self.Position <= 0:
            self.BuyMarket()
            self._last_signal = 1
        elif prev < high_lvl and value >= high_lvl and self._last_signal != -1 and self.Position >= 0:
            self.SellMarket()
            self._last_signal = -1
        elif value > low_lvl and value < high_lvl:
            # Value is back between the levels: allow the same direction to signal again.
            self._last_signal = 0

    def OnReseted(self):
        """Clear the stored prices and signal state."""
        super(fractal_rsi_strategy, self).OnReseted()
        self._prices = []
        self._previous_value = None
        self._last_signal = 0

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return fractal_rsi_strategy()