Burg Extrapolator Forecast Strategy
Overview
The Burg Extrapolator Strategy is a StockSharp port of the MetaTrader 4 expert advisor "Burg Extrapolator". The original system fits a Burg autoregressive (AR) model to a sliding window of open prices (or their momentum/ROC transforms) and projects a path of future prices. Trading decisions are derived from the most extreme forecast values: if the predicted excursion in one direction is large enough, the strategy stacks new positions in that direction and liquidates exposure in the opposite direction. The conversion keeps the same modelling blocks while mapping position management and money management to StockSharp primitives.
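To make the "fit, then project" idea concrete, here is a minimal standalone sketch of Burg estimation followed by recursive extrapolation. It uses a textbook formulation of the Burg recursion rather than the exact denominator update found in the strategy source, and the names `burg_fit` and `burg_forecast` are illustrative, not part of StockSharp.

```python
def burg_fit(samples, order):
    """Estimate AR coefficients [1, a1, ..., ap] with a textbook Burg recursion."""
    n = len(samples)
    forward = list(samples)   # forward prediction errors
    backward = list(samples)  # backward prediction errors
    coeffs = [1.0]
    for m in range(1, order + 1):
        num = sum(forward[i] * backward[i - 1] for i in range(m, n))
        den = sum(forward[i] ** 2 + backward[i - 1] ** 2 for i in range(m, n))
        if den < 1e-12:
            break  # residual energy exhausted; keep the lower-order model
        k = -2.0 * num / den  # reflection coefficient for stage m
        # Levinson-style update of the coefficient vector.
        coeffs = [1.0] + [coeffs[i] + k * coeffs[m - i] for i in range(1, m)] + [k]
        # Walk backwards so backward[i - 1] still holds the previous-stage value.
        for i in range(n - 1, m - 1, -1):
            f_old = forward[i]
            forward[i] = f_old + k * backward[i - 1]
            backward[i] = backward[i - 1] + k * f_old
    return coeffs


def burg_forecast(samples, coeffs, steps):
    """Extend the series by feeding each prediction back into the AR model."""
    history = list(samples)
    p = len(coeffs) - 1
    path = []
    for _ in range(steps):
        nxt = -sum(coeffs[i] * history[-i] for i in range(1, p + 1))
        history.append(nxt)
        path.append(nxt)
    return path
```

For an alternating series such as `[1.0, -1.0] * 4`, a first-order fit yields the coefficient vector `[1.0, 1.0]`, and the forecast keeps alternating. Real price data will of course produce far less tidy coefficients.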
Trading Logic
- Data preparation
  - Build a rolling history of `PastBars + 1` open prices for the selected `CandleType`.
  - Optionally transform the data into logarithmic momentum (default) or percentage rate of change before feeding it to the AR model. Raw prices are centered by their moving average to mirror the MT4 code.
- Burg linear prediction
  - Estimate reflection coefficients up to the order `PastBars * ModelOrder` (truncated to an integer) using the Burg algorithm.
  - Generate a sequence of future values (`PastBars - order - 1` steps ahead in practice) by recursively expanding the AR model. Transforms are inverted back to price space so that all forecasts operate in absolute price units.
- Signal detection
  - Walk through the forecast path and record the highest and lowest predicted price before another extreme appears. The distance between the first extreme and the other side of the forecast range is compared with the `MaxLoss` and `MinProfit` thresholds (converted to absolute price by multiplying by the instrument `PriceStep`).
  - A sufficiently large upswing triggers `OpenSignal = 1` while a large downswing yields `OpenSignal = -1`. If the opposing extreme appears first, the logic sets `CloseSignal` to exit current exposure even if no fresh entry is planned.
- Order management
  - Protective exits (stop-loss, take-profit, and optional trailing-stop) run before any new signal is executed. The trailing-stop reuses the best price since the last entry and closes the position when the price retraces by `TrailingStop` points, matching the MT4 behaviour of moving the protective order.
  - If a signal asks to close exposure in the opposite direction, the strategy sends a market order sized to flatten the current net position.
  - Entry signals stack additional market orders in the indicated direction until `MaxTrades` is reached. Order volume scales linearly with the number of active trades using the factor `1 + existingTrades * MaxRisk`, a StockSharp-friendly replacement for the original margin-based sizing routine.
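The signal-detection step can be sketched as a single pass over the forecast path. The standalone function below mirrors the scan in the strategy source; the name `evaluate_signals` and the plain-float arguments are assumptions made for illustration, and thresholds are given in points with a hypothetical `price_step`.

```python
def evaluate_signals(forecast, min_profit, max_loss, price_step=1.0):
    """Return (open_signal, close_signal), each one of -1, 0 or 1."""
    if not forecast:
        return 0, 0
    max_loss_delta = max_loss * price_step
    min_profit_delta = min_profit * price_step
    ymax = ymin = forecast[0]
    imax = imin = 0  # index of the latest high / low; 0 means "none yet"
    open_signal = close_signal = 0
    for i in range(1, len(forecast)):
        value = forecast[i]
        if value > ymax and open_signal == 0:
            ymax, imax = value, i
            # Only counts when no new low has formed before this high.
            if imin == 0 and ymax - ymin >= max_loss_delta:
                close_signal = 1
            if imin == 0 and ymax - ymin >= min_profit_delta:
                open_signal = 1
        if value < ymin and open_signal == 0:
            ymin, imin = value, i
            # Only counts when no new high has formed before this low.
            if imax == 0 and ymax - ymin >= max_loss_delta:
                close_signal = -1
            if imax == 0 and ymax - ymin >= min_profit_delta:
                open_signal = -1
    return open_signal, close_signal
```

For example, with `min_profit=2` and `max_loss=1`, the path `[100.0, 99.5, 99.0, 103.0]` yields `(0, -1)`: the initial dip is deep enough to close long exposure but not deep enough to open a short, and the later rally is ignored because a low came first.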
Indicators and Data
- Candle subscription defined by `CandleType` (default 5-minute time frame, as set in the constructor).
- Internal Burg autoregressive model (implemented without external indicators).
- Optional logarithmic momentum and percentage rate of change transforms.
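The two optional transforms and their inversion back to price space can be illustrated with a short round trip. This is a sketch with illustrative helper names (`to_log_momentum`, `to_rate_of_change`, `to_prices`) and it assumes strictly positive prices; the strategy source instead skips the update when a previous price is zero.

```python
import math


def to_log_momentum(opens):
    """Logarithmic momentum: ln(open[t] / open[t-1]), oldest first."""
    return [math.log(cur / prev) for prev, cur in zip(opens, opens[1:])]


def to_rate_of_change(opens):
    """Rate of change as a fraction: open[t] / open[t-1] - 1."""
    return [cur / prev - 1.0 for prev, cur in zip(opens, opens[1:])]


def to_prices(start_price, increments, use_momentum=True):
    """Invert either transform by compounding from a known starting price."""
    path = [start_price]
    for inc in increments:
        factor = math.exp(inc) if use_momentum else 1.0 + inc
        path.append(path[-1] * factor)
    return path
```

Compounding the transformed values from the first open reproduces the original series up to floating-point error, which is the same mechanism the strategy uses to turn forecast increments into absolute prices starting from the latest open.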
Parameters
| Name | Default | Description |
|---|---|---|
| `CandleType` | 5-minute candles | Primary timeframe processed by the strategy. |
| `MaxRisk` | 0.5 | Risk multiplier used when stacking multiple trades. |
| `MaxTrades` | 5 | Maximum number of simultaneous trades per direction. |
| `MinProfit` | 160 | Minimum predicted profit (in points) required to open new trades. |
| `MaxLoss` | 130 | Maximum tolerated forecasted loss (in points) before closing trades. |
| `TakeProfit` | 0 | Optional fixed take-profit distance in points (0 disables it). |
| `StopLoss` | 180 | Optional fixed stop-loss distance in points (0 disables it). |
| `TrailingStop` | 10 | Trailing stop distance in points, active only when `StopLoss > 0`. |
| `PastBars` | 200 | Number of historical candles used by the Burg model. |
| `ModelOrder` | 0.37 | Fraction of `PastBars` converted into the Burg order. |
| `UseMomentum` | true | Apply logarithmic momentum transform to input data. |
| `UseRateOfChange` | false | Apply percentage rate of change (ignored when momentum is enabled). |
All parameters are `StrategyParam<T>` instances and can be optimised or adjusted in the StockSharp Designer.
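How `PastBars` and `ModelOrder` turn into an AR order and a forecast horizon is easy to check by hand. The sketch below follows the clamping rules in the strategy's model setup; the function name `model_dimensions` is illustrative, and `Decimal` is used here only to mimic the C# `decimal` arithmetic.

```python
from decimal import Decimal


def model_dimensions(past_bars, model_order):
    """Return (order, horizon), or None when the window is too short."""
    if past_bars < 3:
        return None
    order = int(model_order * past_bars)  # truncate toward zero
    if order < 1:
        order = 1
    if order >= past_bars - 1:
        order = past_bars - 2             # leave room for at least one forecast step
    horizon = max(past_bars - order - 1, 1)
    return order, horizon
```

With the defaults from the table (`PastBars = 200`, `ModelOrder = 0.37`), `model_dimensions(200, Decimal("0.37"))` gives an order of 74 and a horizon of 125 future steps.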
Implementation Notes
- The Burg algorithm is implemented directly in C# and keeps the same recursion as the MT4 version. All computations are executed in double precision while the final forecasts are converted back to `decimal` before signal checks.
- The original EA could rely on MetaTrader account information to size positions. In StockSharp the money management block is replaced with a deterministic scaling rule based on `Volume` and `MaxRisk`. Set `Volume` to the desired base lot and the strategy will scale subsequent entries proportionally.
- Protective logic closes positions with explicit market orders instead of modifying broker-side stops; this matches StockSharp's high-level API design and prevents stale state when running in simulation.
- The forecast arrays are re-created whenever `PastBars` or `ModelOrder` change, so on-the-fly parameter edits immediately affect the AR model without restarting the strategy.
- To visualise the behaviour you can attach a chart in Designer: the strategy already draws candles and executed trades on the default area. Extending the sample with custom series (e.g., forecast path) is straightforward if desired.
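The deterministic scaling rule can be explored with a small simulation. This sketch assumes every market order fills in full and, like the strategy source, infers the number of active trades from the net position divided by the base volume; `simulate_stacking` is a hypothetical helper, not part of the strategy.

```python
import math


def trade_count(position, base_volume):
    """Estimate stacked trades from net position, rounding partial lots up."""
    if base_volume <= 0:
        return 0
    return int(math.ceil(abs(position) / base_volume - 1e-8))


def order_volume(base_volume, existing_trades, max_risk):
    """Scale the base lot by 1 + existing_trades * max_risk."""
    multiplier = 1.0 + existing_trades * max_risk
    return base_volume * multiplier if multiplier > 0 else 0.0


def simulate_stacking(base_volume, max_risk, max_trades):
    """List the order sizes sent while entry signals keep repeating."""
    position = 0.0
    volumes = []
    while trade_count(position, base_volume) < max_trades:
        volume = order_volume(base_volume, trade_count(position, base_volume), max_risk)
        if volume <= 0:
            break
        volumes.append(volume)
        position += volume
    return volumes
```

With a base volume of 1 and the defaults `MaxRisk = 0.5` and `MaxTrades = 5`, the simulation sends orders of 1, 1.5 and 2.5 lots and then stops: because the trade count is derived from net position, the scaled orders reach the cap after three entries rather than five.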
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Burg extrapolator strategy converted from MetaTrader 4 implementation.
/// Predicts the future price path with Burg linear prediction coefficients and trades on forecasted extremes.
/// </summary>
public class BurgExtrapolatorForecastStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<decimal> _maxRisk;
private readonly StrategyParam<int> _maxTrades;
private readonly StrategyParam<int> _minProfit;
private readonly StrategyParam<int> _maxLoss;
private readonly StrategyParam<int> _takeProfit;
private readonly StrategyParam<int> _stopLoss;
private readonly StrategyParam<int> _trailingStop;
private readonly StrategyParam<int> _pastBars;
private readonly StrategyParam<decimal> _modelOrder;
private readonly StrategyParam<bool> _useMomentum;
private readonly StrategyParam<bool> _useRateOfChange;
private readonly List<decimal> _openHistory = new();
private double[] _samples = Array.Empty<double>();
private double[] _coefficients = Array.Empty<double>();
private double[] _predictions = Array.Empty<double>();
private int _np;
private int _no;
private int _nf;
private double _averagePrice;
private bool _isFirstRun = true;
private decimal? _longEntryPrice;
private decimal? _shortEntryPrice;
private decimal? _longHigh;
private decimal? _shortLow;
/// <summary>
/// Initializes a new instance of the <see cref="BurgExtrapolatorForecastStrategy"/> class.
/// </summary>
public BurgExtrapolatorForecastStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Primary timeframe used for forecasting", "General");
_maxRisk = Param(nameof(MaxRisk), 0.5m)
.SetGreaterThanZero()
.SetDisplay("Max Risk", "Risk factor controlling position scaling", "Money Management");
_maxTrades = Param(nameof(MaxTrades), 5)
.SetGreaterThanZero()
.SetDisplay("Max Trades", "Maximum stacked trades per direction", "Money Management");
_minProfit = Param(nameof(MinProfit), 160)
.SetGreaterThanZero()
.SetDisplay("Min Profit", "Forecasted profit in points required to open trades", "Signals");
_maxLoss = Param(nameof(MaxLoss), 130)
.SetGreaterThanZero()
.SetDisplay("Max Loss", "Forecasted adverse excursion closing existing trades", "Signals");
_takeProfit = Param(nameof(TakeProfit), 0)
.SetNotNegative()
.SetDisplay("Take Profit", "Optional fixed take profit in points", "Protection");
_stopLoss = Param(nameof(StopLoss), 180)
.SetNotNegative()
.SetDisplay("Stop Loss", "Optional fixed stop loss in points", "Protection");
_trailingStop = Param(nameof(TrailingStop), 10)
.SetNotNegative()
.SetDisplay("Trailing Stop", "Trailing distance in points (requires stop loss)", "Protection");
_pastBars = Param(nameof(PastBars), 200)
.SetGreaterThanZero()
.SetDisplay("Past Bars", "History length used for Burg model", "Forecast");
_modelOrder = Param(nameof(ModelOrder), 0.37m)
.SetGreaterThanZero()
.SetDisplay("Model Order", "Fraction of past bars used as Burg order", "Forecast");
_useMomentum = Param(nameof(UseMomentum), true)
.SetDisplay("Use Momentum", "Use logarithmic momentum instead of raw prices", "Forecast");
_useRateOfChange = Param(nameof(UseRateOfChange), false)
.SetDisplay("Use ROC", "Use percentage rate of change instead of raw prices", "Forecast");
}
/// <summary>
/// Type of candles processed by the strategy.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Risk factor used when stacking positions.
/// </summary>
public decimal MaxRisk
{
get => _maxRisk.Value;
set => _maxRisk.Value = value;
}
/// <summary>
/// Maximum number of trades allowed in one direction.
/// </summary>
public int MaxTrades
{
get => _maxTrades.Value;
set => _maxTrades.Value = value;
}
/// <summary>
/// Minimum profit in points predicted by the Burg model to initiate new trades.
/// </summary>
public int MinProfit
{
get => _minProfit.Value;
set => _minProfit.Value = value;
}
/// <summary>
/// Maximum loss in points predicted by the Burg model before closing positions.
/// </summary>
public int MaxLoss
{
get => _maxLoss.Value;
set => _maxLoss.Value = value;
}
/// <summary>
/// Optional take profit expressed in points.
/// </summary>
public int TakeProfit
{
get => _takeProfit.Value;
set => _takeProfit.Value = value;
}
/// <summary>
/// Optional stop loss expressed in points.
/// </summary>
public int StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Trailing stop distance in points.
/// </summary>
public int TrailingStop
{
get => _trailingStop.Value;
set => _trailingStop.Value = value;
}
/// <summary>
/// Number of historical candles used by the Burg predictor.
/// </summary>
public int PastBars
{
get => _pastBars.Value;
set => _pastBars.Value = value;
}
/// <summary>
/// Fraction of <see cref="PastBars"/> used as Burg model order.
/// </summary>
public decimal ModelOrder
{
get => _modelOrder.Value;
set => _modelOrder.Value = value;
}
/// <summary>
/// Use logarithmic momentum transformation instead of raw prices.
/// </summary>
public bool UseMomentum
{
get => _useMomentum.Value;
set => _useMomentum.Value = value;
}
/// <summary>
/// Use percentage rate of change transformation instead of raw prices.
/// </summary>
public bool UseRateOfChange
{
get => _useRateOfChange.Value;
set => _useRateOfChange.Value = value;
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
yield return (Security, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_openHistory.Clear();
_samples = Array.Empty<double>();
_coefficients = Array.Empty<double>();
_predictions = Array.Empty<double>();
_np = 0;
_no = 0;
_nf = 0;
_averagePrice = 0.0;
_isFirstRun = true;
ResetLongState();
ResetShortState();
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var subscription = SubscribeCandles(CandleType);
subscription.Bind(ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
AddOpenPrice(candle.OpenPrice);
if (!EnsureModel())
return;
TrimHistory();
if (_openHistory.Count < _np + 1)
return;
if (!UpdateSamples())
return;
var predictionCount = ComputePredictions();
if (predictionCount <= 0)
return;
var (openSignal, closeSignal) = EvaluateSignals(predictionCount);
if (ManageProtection(candle))
{
// Position has been closed by protective logic, wait for the next candle to re-evaluate.
return;
}
HandleSignalClosures(openSignal, closeSignal);
if (openSignal == 1)
{
TryOpenLong(candle);
}
else if (openSignal == -1)
{
TryOpenShort(candle);
}
}
private void AddOpenPrice(decimal openPrice)
{
_openHistory.Add(openPrice);
}
private bool EnsureModel()
{
var np = PastBars;
if (np < 3)
return false;
var modelOrder = ModelOrder;
var no = (int)(modelOrder * np);
if (no < 1)
no = 1;
if (no >= np - 1)
no = np - 2;
var nf = np - no - 1;
if (nf < 1)
nf = 1;
var predictionLength = nf + 1;
if (_np != np || _no != no || _nf != nf || _predictions.Length != predictionLength)
{
_np = np;
_no = no;
_nf = nf;
_samples = new double[np];
_coefficients = new double[no + 1];
_predictions = new double[predictionLength];
_averagePrice = 0.0;
_isFirstRun = true;
}
return true;
}
private void TrimHistory()
{
var maxHistory = _np + 1;
while (_openHistory.Count > maxHistory)
{
_openHistory.RemoveAt(0);
}
}
private bool UpdateSamples()
{
if (_np <= 0)
return false;
var useMomentum = UseMomentum;
var useRoc = !useMomentum && UseRateOfChange;
if (useMomentum || useRoc)
{
if (!_isFirstRun)
{
for (var i = 0; i < _np - 1; i++)
_samples[i] = _samples[i + 1];
var current = GetOpen(0);
var previous = GetOpen(1);
if (previous == 0m)
return false;
var ratio = (double)(current / previous);
_samples[_np - 1] = useMomentum ? Math.Log(ratio) : ratio - 1.0;
}
else
{
for (var i = 0; i < _np; i++)
{
var current = GetOpen(i);
var previous = GetOpen(i + 1);
if (previous == 0m)
return false;
var ratio = (double)(current / previous);
_samples[_np - 1 - i] = useMomentum ? Math.Log(ratio) : ratio - 1.0;
}
_averagePrice = 0.0;
_isFirstRun = false;
}
}
else
{
if (_isFirstRun)
{
double sum = 0.0;
for (var i = 0; i < _np; i++)
sum += (double)GetOpen(i);
_averagePrice = sum / _np;
for (var i = 0; i < _np; i++)
{
var open = (double)GetOpen(i);
_samples[_np - 1 - i] = open - _averagePrice;
}
_isFirstRun = false;
}
else
{
var newest = (double)GetOpen(0);
var leaving = (double)GetOpen(_np);
_averagePrice += (newest - leaving) / _np;
for (var i = 0; i < _np; i++)
{
var open = (double)GetOpen(i);
_samples[_np - 1 - i] = open - _averagePrice;
}
}
}
return true;
}
private int ComputePredictions()
{
Array.Clear(_coefficients, 0, _coefficients.Length);
Array.Clear(_predictions, 0, _predictions.Length);
double den = 0.0;
for (var i = 0; i < _np; i++)
{
var value = _samples[i];
den += value * value;
}
den *= 2.0;
var df = new double[_np];
var db = new double[_np];
for (var i = 0; i < _np; i++)
{
var value = _samples[i];
df[i] = value;
db[i] = value;
}
double r = 0.0;
for (var k = 1; k <= _no; k++)
{
double num = 0.0;
for (var i = k; i < _np; i++)
num += df[i] * db[i - 1];
var denominator = (1.0 - r * r) * den - df[k - 1] * df[k - 1] - db[_np - 1] * db[_np - 1];
if (Math.Abs(denominator) < 1e-12)
return 0;
r = -2.0 * num / denominator;
_coefficients[k] = r;
var half = k / 2;
for (var i = 1; i <= half; i++)
{
var ki = k - i;
var tmp = _coefficients[i];
_coefficients[i] += r * _coefficients[ki];
if (i != ki)
_coefficients[ki] += r * tmp;
}
if (k < _no)
{
for (var i = _np - 1; i >= k; i--)
{
var tmp = df[i];
df[i] += r * db[i - 1];
db[i] = db[i - 1] + r * tmp;
}
}
den = denominator;
}
for (var n = _np - 1; n < _np + _nf; n++)
{
double sum = 0.0;
for (var i = 1; i <= _no; i++)
{
if (n - i < _np)
sum -= _coefficients[i] * _samples[n - i];
else
sum -= _coefficients[i] * _predictions[n - i - _np + 1];
}
var index = n - _np + 1;
if (index < _predictions.Length)
_predictions[index] = sum;
}
var useMomentum = UseMomentum;
var useRoc = !useMomentum && UseRateOfChange;
if (useMomentum || useRoc)
{
var startPrice = (double)GetOpen(0);
_predictions[0] = startPrice;
for (var i = 1; i < _predictions.Length; i++)
{
_predictions[i] = useMomentum
? _predictions[i - 1] * Math.Exp(_predictions[i])
: _predictions[i - 1] * (1.0 + _predictions[i]);
}
}
else
{
for (var i = 0; i < _predictions.Length; i++)
_predictions[i] += _averagePrice;
}
return _predictions.Length;
}
private (int openSignal, int closeSignal) EvaluateSignals(int predictionCount)
{
if (predictionCount == 0)
return (0, 0);
var step = Security?.PriceStep ?? 1m;
var maxLossDelta = MaxLoss * step;
var minProfitDelta = MinProfit * step;
var ymax = (decimal)_predictions[0];
var ymin = ymax;
var imax = 0;
var imin = 0;
var openSignal = 0;
var closeSignal = 0;
var limit = Math.Min(_np, predictionCount);
for (var i = 1; i < limit; i++)
{
var value = (decimal)_predictions[i];
if (value > ymax && openSignal == 0)
{
ymax = value;
imax = i;
if (imin == 0 && ymax - ymin >= maxLossDelta)
closeSignal = 1;
if (imin == 0 && ymax - ymin >= minProfitDelta)
openSignal = 1;
}
if (value < ymin && openSignal == 0)
{
ymin = value;
imin = i;
if (imax == 0 && ymax - ymin >= maxLossDelta)
closeSignal = -1;
if (imax == 0 && ymax - ymin >= minProfitDelta)
openSignal = -1;
}
}
return (openSignal, closeSignal);
}
private bool ManageProtection(ICandleMessage candle)
{
var step = Security?.PriceStep ?? 1m;
var stopDistance = StopLoss * step;
var takeDistance = TakeProfit * step;
var trailingDistance = TrailingStop * step;
if (Position > 0)
{
_longEntryPrice ??= candle.ClosePrice;
_longHigh = _longHigh.HasValue ? Math.Max(_longHigh.Value, candle.HighPrice) : candle.HighPrice;
if (StopLoss > 0 && _longEntryPrice.HasValue && candle.LowPrice <= _longEntryPrice.Value - stopDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
if (TakeProfit > 0 && _longEntryPrice.HasValue && candle.HighPrice >= _longEntryPrice.Value + takeDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
if (TrailingStop > 0 && StopLoss > 0 && _longHigh.HasValue && candle.LowPrice <= _longHigh.Value - trailingDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
}
else
{
ResetLongState();
}
if (Position < 0)
{
_shortEntryPrice ??= candle.ClosePrice;
_shortLow = _shortLow.HasValue ? Math.Min(_shortLow.Value, candle.LowPrice) : candle.LowPrice;
if (StopLoss > 0 && _shortEntryPrice.HasValue && candle.HighPrice >= _shortEntryPrice.Value + stopDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
if (TakeProfit > 0 && _shortEntryPrice.HasValue && candle.LowPrice <= _shortEntryPrice.Value - takeDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
if (TrailingStop > 0 && StopLoss > 0 && _shortLow.HasValue && candle.HighPrice >= _shortLow.Value + trailingDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
}
else
{
ResetShortState();
}
return false;
}
private void HandleSignalClosures(int openSignal, int closeSignal)
{
if (Position > 0 && (closeSignal == -1 || openSignal == -1))
{
SellMarket(Position);
ResetLongState();
}
else if (Position < 0 && (closeSignal == 1 || openSignal == 1))
{
BuyMarket(-Position);
ResetShortState();
}
}
private void TryOpenLong(ICandleMessage candle)
{
var baseVolume = Volume;
if (baseVolume <= 0m)
return;
var tradeCount = GetTradeCount(baseVolume);
if (tradeCount >= MaxTrades)
return;
var orderVolume = CalculateOrderVolume(baseVolume, tradeCount);
if (orderVolume <= 0m)
return;
BuyMarket(orderVolume);
_longEntryPrice = candle.ClosePrice;
_longHigh = candle.ClosePrice;
ResetShortState();
}
private void TryOpenShort(ICandleMessage candle)
{
var baseVolume = Volume;
if (baseVolume <= 0m)
return;
var tradeCount = GetTradeCount(baseVolume);
if (tradeCount >= MaxTrades)
return;
var orderVolume = CalculateOrderVolume(baseVolume, tradeCount);
if (orderVolume <= 0m)
return;
SellMarket(orderVolume);
_shortEntryPrice = candle.ClosePrice;
_shortLow = candle.ClosePrice;
ResetLongState();
}
private int GetTradeCount(decimal baseVolume)
{
if (baseVolume <= 0m)
return 0;
var trades = Math.Abs(Position) / baseVolume;
return (int)Math.Ceiling((double)(trades - 1e-8m));
}
private decimal CalculateOrderVolume(decimal baseVolume, int existingTrades)
{
var multiplier = 1m + existingTrades * MaxRisk;
if (multiplier <= 0m)
return 0m;
return baseVolume * multiplier;
}
private decimal GetOpen(int shift)
{
var index = _openHistory.Count - 1 - shift;
return index >= 0 && index < _openHistory.Count ? _openHistory[index] : 0m;
}
private void ResetLongState()
{
_longEntryPrice = null;
_longHigh = null;
}
private void ResetShortState()
{
_shortEntryPrice = null;
_shortLow = null;
}
}
import clr
import math

clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")

from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy


class burg_extrapolator_forecast_strategy(Strategy):
    """
    Burg extrapolator forecast strategy.
    Predicts future price path with Burg linear prediction coefficients and trades on forecasted extremes.
    """

    def __init__(self):
        super(burg_extrapolator_forecast_strategy, self).__init__()
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Primary timeframe used for forecasting", "General")
        self._max_risk = self.Param("MaxRisk", 0.5) \
            .SetDisplay("Max Risk", "Risk factor controlling position scaling", "Money Management")
        self._max_trades = self.Param("MaxTrades", 5) \
            .SetDisplay("Max Trades", "Maximum stacked trades per direction", "Money Management")
        self._min_profit = self.Param("MinProfit", 160) \
            .SetDisplay("Min Profit", "Forecasted profit in points required to open trades", "Signals")
        self._max_loss = self.Param("MaxLoss", 130) \
            .SetDisplay("Max Loss", "Forecasted adverse excursion closing existing trades", "Signals")
        self._take_profit = self.Param("TakeProfit", 0) \
            .SetDisplay("Take Profit", "Optional fixed take profit in points", "Protection")
        self._stop_loss = self.Param("StopLoss", 180) \
            .SetDisplay("Stop Loss", "Optional fixed stop loss in points", "Protection")
        self._trailing_stop = self.Param("TrailingStop", 10) \
            .SetDisplay("Trailing Stop", "Trailing distance in points", "Protection")
        self._past_bars = self.Param("PastBars", 200) \
            .SetDisplay("Past Bars", "History length used for Burg model", "Forecast")
        self._model_order = self.Param("ModelOrder", 0.37) \
            .SetDisplay("Model Order", "Fraction of past bars used as Burg order", "Forecast")
        self._use_momentum = self.Param("UseMomentum", True) \
            .SetDisplay("Use Momentum", "Use logarithmic momentum instead of raw prices", "Forecast")
        self._use_rate_of_change = self.Param("UseRateOfChange", False) \
            .SetDisplay("Use ROC", "Use percentage rate of change instead of raw prices", "Forecast")
        self._open_history = []
        self._np = 0
        self._no = 0
        self._nf = 0
        self._average_price = 0.0
        self._is_first_run = True
        self._samples = []
        self._long_entry_price = None
        self._short_entry_price = None
        self._long_high = None
        self._short_low = None

    @property
    def candle_type(self):
        return self._candle_type.Value

    @candle_type.setter
    def candle_type(self, value):
        self._candle_type.Value = value

    def OnReseted(self):
        super(burg_extrapolator_forecast_strategy, self).OnReseted()
        self._open_history = []
        self._np = 0
        self._no = 0
        self._nf = 0
        self._average_price = 0.0
        self._is_first_run = True
        self._samples = []
        self._long_entry_price = None
        self._short_entry_price = None
        self._long_high = None
        self._short_low = None

    def OnStarted2(self, time):
        super(burg_extrapolator_forecast_strategy, self).OnStarted2(time)
        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self.on_process).Start()
        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawOwnTrades(area)

    def _get_open(self, shift):
        idx = len(self._open_history) - 1 - shift
        if 0 <= idx < len(self._open_history):
            return self._open_history[idx]
        return 0.0

    def _ensure_model(self):
        np_val = self._past_bars.Value
        if np_val < 3:
            return False
        mo = self._model_order.Value
        no = int(mo * np_val)
        if no < 1:
            no = 1
        if no >= np_val - 1:
            no = np_val - 2
        nf = np_val - no - 1
        if nf < 1:
            nf = 1
        if self._np != np_val or self._no != no or self._nf != nf:
            self._np = np_val
            self._no = no
            self._nf = nf
            self._samples = [0.0] * np_val
            self._average_price = 0.0
            self._is_first_run = True
        return True

    def on_process(self, candle):
        if candle.State != CandleStates.Finished:
            return
        self._open_history.append(float(candle.OpenPrice))
        if not self._ensure_model():
            return
        max_hist = self._np + 1
        while len(self._open_history) > max_hist:
            self._open_history.pop(0)
        if len(self._open_history) < self._np + 1:
            return
        if not self._update_samples():
            return
        predictions = self._compute_predictions()
        if predictions is None:
            return
        open_signal, close_signal = self._evaluate_signals(predictions)
        if self._manage_protection(candle):
            return
        self._handle_signal_closures(open_signal, close_signal)
        if open_signal == 1:
            self._try_open_long(candle)
        elif open_signal == -1:
            self._try_open_short(candle)

    def _update_samples(self):
        use_mom = self._use_momentum.Value
        use_roc = not use_mom and self._use_rate_of_change.Value
        if use_mom or use_roc:
            if not self._is_first_run:
                for i in range(self._np - 1):
                    self._samples[i] = self._samples[i + 1]
                current = self._get_open(0)
                previous = self._get_open(1)
                if previous == 0:
                    return False
                ratio = current / previous
                self._samples[self._np - 1] = math.log(ratio) if use_mom else ratio - 1.0
            else:
                for i in range(self._np):
                    current = self._get_open(i)
                    previous = self._get_open(i + 1)
                    if previous == 0:
                        return False
                    ratio = current / previous
                    self._samples[self._np - 1 - i] = math.log(ratio) if use_mom else ratio - 1.0
                self._average_price = 0.0
                self._is_first_run = False
        else:
            if self._is_first_run:
                total = sum(self._get_open(i) for i in range(self._np))
                self._average_price = total / self._np
                for i in range(self._np):
                    self._samples[self._np - 1 - i] = self._get_open(i) - self._average_price
                self._is_first_run = False
            else:
                newest = self._get_open(0)
                leaving = self._get_open(self._np)
                self._average_price += (newest - leaving) / self._np
                for i in range(self._np):
                    self._samples[self._np - 1 - i] = self._get_open(i) - self._average_price
        return True

    def _compute_predictions(self):
        coefficients = [0.0] * (self._no + 1)
        pred_len = self._nf + 1
        predictions = [0.0] * pred_len
        den = sum(v * v for v in self._samples) * 2.0
        df = list(self._samples)
        db = list(self._samples)
        r = 0.0
        for k in range(1, self._no + 1):
            num = sum(df[i] * db[i - 1] for i in range(k, self._np))
            denom = (1.0 - r * r) * den - df[k - 1] * df[k - 1] - db[self._np - 1] * db[self._np - 1]
            if abs(denom) < 1e-12:
                return None
            r = -2.0 * num / denom
            coefficients[k] = r
            half = k // 2
            for i in range(1, half + 1):
                ki = k - i
                tmp = coefficients[i]
                coefficients[i] += r * coefficients[ki]
                if i != ki:
                    coefficients[ki] += r * tmp
            if k < self._no:
                for i in range(self._np - 1, k - 1, -1):
                    tmp = df[i]
                    df[i] += r * db[i - 1]
                    db[i] = db[i - 1] + r * tmp
            den = denom
        for n in range(self._np - 1, self._np + self._nf):
            s = 0.0
            for i in range(1, self._no + 1):
                if n - i < self._np:
                    s -= coefficients[i] * self._samples[n - i]
                else:
                    s -= coefficients[i] * predictions[n - i - self._np + 1]
            idx = n - self._np + 1
            if idx < len(predictions):
                predictions[idx] = s
        use_mom = self._use_momentum.Value
        use_roc = not use_mom and self._use_rate_of_change.Value
        if use_mom or use_roc:
            start_price = self._get_open(0)
            predictions[0] = start_price
            for i in range(1, len(predictions)):
                if use_mom:
                    predictions[i] = predictions[i - 1] * math.exp(predictions[i])
                else:
                    predictions[i] = predictions[i - 1] * (1.0 + predictions[i])
        else:
            for i in range(len(predictions)):
                predictions[i] += self._average_price
        return predictions

    def _evaluate_signals(self, predictions):
        step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
        max_loss_delta = float(self._max_loss.Value) * step
        min_profit_delta = float(self._min_profit.Value) * step
        ymax = predictions[0]
        ymin = ymax
        imax = 0
        imin = 0
        open_signal = 0
        close_signal = 0
        limit = min(self._np, len(predictions))
        for i in range(1, limit):
            value = predictions[i]
            if value > ymax and open_signal == 0:
                ymax = value
                imax = i
                if imin == 0 and ymax - ymin >= max_loss_delta:
                    close_signal = 1
                if imin == 0 and ymax - ymin >= min_profit_delta:
                    open_signal = 1
            if value < ymin and open_signal == 0:
                ymin = value
                imin = i
                if imax == 0 and ymax - ymin >= max_loss_delta:
                    close_signal = -1
                if imax == 0 and ymax - ymin >= min_profit_delta:
                    open_signal = -1
        return open_signal, close_signal

    def _manage_protection(self, candle):
        step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
        stop_dist = float(self._stop_loss.Value) * step
        take_dist = float(self._take_profit.Value) * step
        trail_dist = float(self._trailing_stop.Value) * step
        if self.Position > 0:
            if self._long_entry_price is None:
                self._long_entry_price = float(candle.ClosePrice)
            self._long_high = max(self._long_high, float(candle.HighPrice)) if self._long_high is not None else float(candle.HighPrice)
            if self._stop_loss.Value > 0 and float(candle.LowPrice) <= self._long_entry_price - stop_dist:
                self.SellMarket(self.Position)
                self._long_entry_price = None
                self._long_high = None
                return True
            if self._take_profit.Value > 0 and float(candle.HighPrice) >= self._long_entry_price + take_dist:
                self.SellMarket(self.Position)
                self._long_entry_price = None
                self._long_high = None
                return True
            if self._trailing_stop.Value > 0 and self._stop_loss.Value > 0 and self._long_high is not None:
                if float(candle.LowPrice) <= self._long_high - trail_dist:
                    self.SellMarket(self.Position)
                    self._long_entry_price = None
                    self._long_high = None
                    return True
        else:
            self._long_entry_price = None
            self._long_high = None
        if self.Position < 0:
            if self._short_entry_price is None:
                self._short_entry_price = float(candle.ClosePrice)
            self._short_low = min(self._short_low, float(candle.LowPrice)) if self._short_low is not None else float(candle.LowPrice)
            if self._stop_loss.Value > 0 and float(candle.HighPrice) >= self._short_entry_price + stop_dist:
                self.BuyMarket(Math.Abs(self.Position))
                self._short_entry_price = None
                self._short_low = None
                return True
            if self._take_profit.Value > 0 and float(candle.LowPrice) <= self._short_entry_price - take_dist:
                self.BuyMarket(Math.Abs(self.Position))
                self._short_entry_price = None
                self._short_low = None
                return True
            if self._trailing_stop.Value > 0 and self._stop_loss.Value > 0 and self._short_low is not None:
                if float(candle.HighPrice) >= self._short_low + trail_dist:
                    self.BuyMarket(Math.Abs(self.Position))
                    self._short_entry_price = None
                    self._short_low = None
                    return True
        else:
            self._short_entry_price = None
            self._short_low = None
        return False

    def _handle_signal_closures(self, open_signal, close_signal):
        if self.Position > 0 and (close_signal == -1 or open_signal == -1):
            self.SellMarket(self.Position)
            self._long_entry_price = None
            self._long_high = None
        elif self.Position < 0 and (close_signal == 1 or open_signal == 1):
            self.BuyMarket(Math.Abs(self.Position))
            self._short_entry_price = None
            self._short_low = None

    def _get_trade_count(self, base_vol):
        if float(base_vol) <= 0:
            return 0
        trades = float(Math.Abs(self.Position)) / float(base_vol)
        return int(math.ceil(trades - 1e-8))

    def _calculate_order_volume(self, base_vol, existing_trades):
        multiplier = 1.0 + existing_trades * float(self._max_risk.Value)
        if multiplier <= 0:
            return 0.0
        return float(base_vol) * multiplier

    def _try_open_long(self, candle):
        base_vol = self.Volume
        if float(base_vol) <= 0:
            return
        trade_count = self._get_trade_count(base_vol)
        if trade_count >= self._max_trades.Value:
            return
        order_volume = self._calculate_order_volume(base_vol, trade_count)
        if order_volume <= 0:
            return
        self.BuyMarket(order_volume)
        self._long_entry_price = float(candle.ClosePrice)
        self._long_high = float(candle.ClosePrice)
        self._short_entry_price = None
        self._short_low = None

    def _try_open_short(self, candle):
        base_vol = self.Volume
        if float(base_vol) <= 0:
            return
        trade_count = self._get_trade_count(base_vol)
        if trade_count >= self._max_trades.Value:
            return
        order_volume = self._calculate_order_volume(base_vol, trade_count)
        if order_volume <= 0:
            return
        self.SellMarket(order_volume)
        self._short_entry_price = float(candle.ClosePrice)
        self._short_low = float(candle.ClosePrice)
        self._long_entry_price = None
        self._long_high = None

    def CreateClone(self):
        return burg_extrapolator_forecast_strategy()