using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
using StockSharp.Algo;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Force Trend strategy that mirrors the original MT5 expert advisor logic.
/// It reacts to ForceTrend indicator color changes to switch between long and short positions.
/// </summary>
public class ForceTrendStrategy : Strategy
{
private readonly StrategyParam<int> _length;
private readonly StrategyParam<int> _signalBar;
private readonly StrategyParam<bool> _enableLongEntry;
private readonly StrategyParam<bool> _enableShortEntry;
private readonly StrategyParam<bool> _enableLongExit;
private readonly StrategyParam<bool> _enableShortExit;
private readonly StrategyParam<DataType> _candleType;
private Highest _highest = null!;
private Lowest _lowest = null!;
private decimal _previousForceValue;
private decimal _previousIndicatorValue;
private int?[] _directionHistory = Array.Empty<int?>();
private int _historyCount;
private int? _lastKnownDirection;
/// <summary>
/// Initializes strategy parameters.
/// </summary>
public ForceTrendStrategy()
{
_length = Param(nameof(Length), 13)
.SetDisplay("Length", "ForceTrend lookback length", "Indicator")
.SetGreaterThanZero()
;
_signalBar = Param(nameof(SignalBar), 1)
.SetDisplay("Signal Bar", "Number of finished bars to shift the signal", "Trading")
;
_enableLongEntry = Param(nameof(EnableLongEntry), true)
.SetDisplay("Enable Long Entry", "Allow opening long positions", "Trading")
;
_enableShortEntry = Param(nameof(EnableShortEntry), true)
.SetDisplay("Enable Short Entry", "Allow opening short positions", "Trading")
;
_enableLongExit = Param(nameof(EnableLongExit), true)
.SetDisplay("Enable Long Exit", "Allow closing long positions", "Trading")
;
_enableShortExit = Param(nameof(EnableShortExit), true)
.SetDisplay("Enable Short Exit", "Allow closing short positions", "Trading")
;
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Timeframe used for ForceTrend calculations", "General");
}
/// <summary>
/// ForceTrend lookback length.
/// </summary>
public int Length
{
get => _length.Value;
set => _length.Value = value;
}
/// <summary>
/// Number of finished candles used to shift the trade signal.
/// </summary>
public int SignalBar
{
get => _signalBar.Value;
set => _signalBar.Value = value;
}
/// <summary>
/// Enable opening long positions when the ForceTrend becomes bullish.
/// </summary>
public bool EnableLongEntry
{
get => _enableLongEntry.Value;
set => _enableLongEntry.Value = value;
}
/// <summary>
/// Enable opening short positions when the ForceTrend becomes bearish.
/// </summary>
public bool EnableShortEntry
{
get => _enableShortEntry.Value;
set => _enableShortEntry.Value = value;
}
/// <summary>
/// Enable closing long positions on bearish ForceTrend signals.
/// </summary>
public bool EnableLongExit
{
get => _enableLongExit.Value;
set => _enableLongExit.Value = value;
}
/// <summary>
/// Enable closing short positions on bullish ForceTrend signals.
/// </summary>
public bool EnableShortExit
{
get => _enableShortExit.Value;
set => _enableShortExit.Value = value;
}
/// <summary>
/// Candle type used for indicator calculations.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_previousForceValue = 0m;
_previousIndicatorValue = 0m;
_directionHistory = Array.Empty<int?>();
_historyCount = 0;
_lastKnownDirection = null;
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
yield return (Security, CandleType);
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_previousForceValue = 0m;
_previousIndicatorValue = 0m;
_historyCount = 0;
_lastKnownDirection = null;
_directionHistory = new int?[Math.Max(SignalBar + 2, 2)];
_highest = new Highest { Length = Length };
_lowest = new Lowest { Length = Length };
var subscription = SubscribeCandles(CandleType);
subscription.Bind(ProcessCandle).Start();
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var highestValue = _highest.Process(new CandleIndicatorValue(_highest, candle)).ToDecimal();
var lowestValue = _lowest.Process(new CandleIndicatorValue(_lowest, candle)).ToDecimal();
if (!_highest.IsFormed || !_lowest.IsFormed)
return;
var range = highestValue - lowestValue;
decimal forceValue;
if (range != 0m)
{
var average = (candle.HighPrice + candle.LowPrice) / 2m;
var normalized = (average - lowestValue) / range - 0.5m;
forceValue = 0.66m * normalized + 0.67m * _previousForceValue;
}
else
{
forceValue = 0.67m * _previousForceValue - 0.33m;
}
forceValue = Math.Clamp(forceValue, -0.999m, 0.999m);
decimal indicatorValue;
var denominator = 1m - forceValue;
if (denominator != 0m)
{
var ratio = (forceValue + 1m) / denominator;
indicatorValue = (decimal)(Math.Log((double)ratio) / 2.0) + _previousIndicatorValue / 2m;
}
else
{
indicatorValue = _previousIndicatorValue / 2m + 0.5m;
}
_previousForceValue = forceValue;
_previousIndicatorValue = indicatorValue;
var direction = indicatorValue > 0m ? 1 : indicatorValue < 0m ? -1 : _lastKnownDirection ?? 0;
if (direction != 0)
_lastKnownDirection = direction;
AddDirection(direction);
var currentDirection = GetDirection(SignalBar);
if (currentDirection is null)
return;
var previousDirection = GetDirection(SignalBar + 1);
var bullish = currentDirection.Value > 0;
var bearish = currentDirection.Value < 0;
var bullishFlip = bullish && previousDirection.HasValue && previousDirection.Value <= 0;
var bearishFlip = bearish && previousDirection.HasValue && previousDirection.Value >= 0;
// indicators processed manually, no BindEx
if (bullish)
{
var volumeToBuy = 0m;
if (EnableShortExit && Position < 0m)
volumeToBuy += Math.Abs(Position);
if (EnableLongEntry && bullishFlip && Position <= 0m)
volumeToBuy += Volume;
if (volumeToBuy > 0m)
BuyMarket();
}
else if (bearish)
{
var volumeToSell = 0m;
if (EnableLongExit && Position > 0m)
volumeToSell += Math.Abs(Position);
if (EnableShortEntry && bearishFlip && Position >= 0m)
volumeToSell += 1m;
if (volumeToSell > 0m)
SellMarket();
}
}
private void AddDirection(int direction)
{
if (_historyCount < _directionHistory.Length)
{
_directionHistory[_historyCount] = direction;
_historyCount++;
}
else
{
for (var i = 1; i < _directionHistory.Length; i++)
_directionHistory[i - 1] = _directionHistory[i];
_directionHistory[^1] = direction;
}
}
private int? GetDirection(int offset)
{
if (offset < 0)
return null;
var index = _historyCount - 1 - offset;
if (index < 0)
return null;
return _directionHistory[index];
}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from StockSharp.Algo.Indicators import Highest, Lowest, CandleIndicatorValue
class force_trend_strategy(Strategy):
"""Force Trend strategy: reacts to Fisher-like transform color changes to switch positions."""
def __init__(self):
super(force_trend_strategy, self).__init__()
self._length = self.Param("Length", 13) \
.SetGreaterThanZero() \
.SetDisplay("Length", "ForceTrend lookback length", "Indicator")
self._signal_bar = self.Param("SignalBar", 1) \
.SetDisplay("Signal Bar", "Number of finished bars to shift the signal", "Trading")
self._enable_long_entry = self.Param("EnableLongEntry", True) \
.SetDisplay("Enable Long Entry", "Allow opening long positions", "Trading")
self._enable_short_entry = self.Param("EnableShortEntry", True) \
.SetDisplay("Enable Short Entry", "Allow opening short positions", "Trading")
self._enable_long_exit = self.Param("EnableLongExit", True) \
.SetDisplay("Enable Long Exit", "Allow closing long positions", "Trading")
self._enable_short_exit = self.Param("EnableShortExit", True) \
.SetDisplay("Enable Short Exit", "Allow closing short positions", "Trading")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Timeframe used for ForceTrend calculations", "General")
self._prev_force = 0.0
self._prev_indicator = 0.0
self._dir_history = []
self._last_known_dir = 0
@property
def Length(self):
return int(self._length.Value)
@property
def SignalBar(self):
return int(self._signal_bar.Value)
@property
def EnableLongEntry(self):
return self._enable_long_entry.Value
@property
def EnableShortEntry(self):
return self._enable_short_entry.Value
@property
def EnableLongExit(self):
return self._enable_long_exit.Value
@property
def EnableShortExit(self):
return self._enable_short_exit.Value
@property
def CandleType(self):
return self._candle_type.Value
def OnStarted2(self, time):
super(force_trend_strategy, self).OnStarted2(time)
self._prev_force = 0.0
self._prev_indicator = 0.0
self._dir_history = []
self._last_known_dir = 0
self._highest = Highest()
self._highest.Length = self.Length
self._lowest = Lowest()
self._lowest.Length = self.Length
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(self.process_candle).Start()
def process_candle(self, candle):
if candle.State != CandleStates.Finished:
return
h_val = float(self._highest.Process(CandleIndicatorValue(self._highest, candle)))
l_val = float(self._lowest.Process(CandleIndicatorValue(self._lowest, candle)))
if not self._highest.IsFormed or not self._lowest.IsFormed:
return
rng = h_val - l_val
h = float(candle.HighPrice)
lo = float(candle.LowPrice)
if rng != 0:
avg = (h + lo) / 2.0
normalized = (avg - l_val) / rng - 0.5
force_val = 0.66 * normalized + 0.67 * self._prev_force
else:
force_val = 0.67 * self._prev_force - 0.33
force_val = max(-0.999, min(0.999, force_val))
denom = 1.0 - force_val
if denom != 0:
ratio = (force_val + 1.0) / denom
indicator_val = math.log(ratio) / 2.0 + self._prev_indicator / 2.0
else:
indicator_val = self._prev_indicator / 2.0 + 0.5
self._prev_force = force_val
self._prev_indicator = indicator_val
if indicator_val > 0:
direction = 1
elif indicator_val < 0:
direction = -1
else:
direction = self._last_known_dir
if direction != 0:
self._last_known_dir = direction
self._dir_history.append(direction)
max_len = max(self.SignalBar + 2, 2)
while len(self._dir_history) > max_len:
self._dir_history.pop(0)
cur_dir = self._get_dir(self.SignalBar)
if cur_dir is None:
return
prev_dir = self._get_dir(self.SignalBar + 1)
bullish = cur_dir > 0
bearish = cur_dir < 0
bullish_flip = bullish and prev_dir is not None and prev_dir <= 0
bearish_flip = bearish and prev_dir is not None and prev_dir >= 0
if bullish:
should_buy = False
if self.EnableShortExit and self.Position < 0:
should_buy = True
if self.EnableLongEntry and bullish_flip and self.Position <= 0:
should_buy = True
if should_buy:
self.BuyMarket()
elif bearish:
should_sell = False
if self.EnableLongExit and self.Position > 0:
should_sell = True
if self.EnableShortEntry and bearish_flip and self.Position >= 0:
should_sell = True
if should_sell:
self.SellMarket()
def _get_dir(self, offset):
idx = len(self._dir_history) - 1 - offset
if idx < 0:
return None
return self._dir_history[idx]
def OnReseted(self):
super(force_trend_strategy, self).OnReseted()
self._prev_force = 0.0
self._prev_indicator = 0.0
self._dir_history = []
self._last_known_dir = 0
def CreateClone(self):
return force_trend_strategy()