Стратегия Burg Extrapolator
Обзор
Стратегия Burg Extrapolator — это порт на StockSharp оригинального эксперта MetaTrader 4 «Burg Extrapolator». Исходный советник строит модель авторегрессии (AR) с помощью алгоритма Бёрга по скользящему окну цен открытия (или их преобразований MOM/ROC) и формирует прогноз траектории цены. Решения о входе/выходе принимаются по экстремальным точкам прогноза: если ожидаемый ход в одном направлении достаточно велик, алгоритм наращивает позицию, а при противоположном сигнале закрывает существующие сделки. Конвертация сохраняет модельные блоки, адаптируя управление позицией и капиталом под высокоуровневый API StockSharp.
Логика торговли
- Подготовка данных
- Формируется история из
PastBars + 1цен открытия для заданногоCandleType. - По выбору трейдера данные преобразуются в логарифмический моментум (по умолчанию) или процентное изменение. Для работы с «сырыми» ценами они центрируются на скользящем среднем, как в коде MT4.
- Формируется история из
- Линейное прогнозирование Бёрга
- Рассчитываются коэффициенты отражения до порядка
PastBars * ModelOrderалгоритмом Бёрга. - Модель рекурсивно разворачивается вперёд и строит последовательность будущих значений (на практике — до
PastBarsбаров). При необходимости прогноз переводится обратно в ценовое пространство.
- Рассчитываются коэффициенты отражения до порядка
- Поиск сигналов
- Стратегия просматривает прогноз и запоминает первую максимум/минимум до появления противоположного экстремума. Разница между найденными значениями сравнивается с порогами
MaxLossиMinProfit(в абсолютную цену переводятся умножением наPriceStep). - Достаточно сильный подъём даёт
OpenSignal = 1, падение —OpenSignal = -1. Если первым формируется противоположный экстремум, выставляетсяCloseSignalдля закрытия текущей позиции без открытия новой.
- Стратегия просматривает прогноз и запоминает первую максимум/минимум до появления противоположного экстремума. Разница между найденными значениями сравнивается с порогами
- Управление ордерами
- Сначала выполняются защитные блоки (стоп-лосс, тейк-профит, трейлинг). Трейлинг отслеживает лучшую цену с момента входа и закрывает позицию при откате на
TrailingStopпунктов, что соответствует поведению MT4 с переносом стопа. - При появлении сигнала на закрытие противоположной позиции отправляется рыночный ордер на полный объём текущего нетто-позиционирования.
- Сигналы входа последовательно наращивают позицию до предела
MaxTrades. Объём следующей заявки масштабируется по формуле1 + existingTrades * MaxRisk, что заменяет метатрейдеровскую привязку к марже и удобно для StockSharp.
- Сначала выполняются защитные блоки (стоп-лосс, тейк-профит, трейлинг). Трейлинг отслеживает лучшую цену с момента входа и закрывает позицию при откате на
Данные и индикаторы
- Свечи выбранного типа
CandleType(по умолчанию 30-минутные). - Встроенная модель авторегрессии Бёрга (без внешних индикаторов).
- Необязательные преобразования: логарифмический моментум и процентное изменение.
Параметры
| Имя | Значение по умолчанию | Описание |
|---|---|---|
CandleType |
30-минутные свечи | Основной таймфрейм стратегии. |
MaxRisk |
0.5 | Множитель риска для масштабирования дополнительных заявок. |
MaxTrades |
5 | Максимальное число сделок в одном направлении. |
MinProfit |
160 | Минимальный прогнозируемый профит (в пунктах) для открытия новых ордеров. |
MaxLoss |
130 | Предельный прогнозируемый убыток (в пунктах), при достижении которого позиция закрывается. |
TakeProfit |
0 | Фиксированный тейк-профит в пунктах (0 — отключено). |
StopLoss |
180 | Фиксированный стоп-лосс в пунктах (0 — отключено). |
TrailingStop |
10 | Размер трейлинг-стопа в пунктах, работает только при StopLoss > 0. |
PastBars |
200 | Длина истории для модели Бёрга. |
ModelOrder |
0.37 | Доля PastBars, задающая порядок модели Бёрга. |
UseMomentum |
true | Использовать логарифмический моментум. |
UseRateOfChange |
false | Использовать процентное изменение (игнорируется, если активирован моментум). |
Все параметры оформлены как StrategyParam<T>, доступны для оптимизации и изменения в Designer.
Особенности реализации
- Алгоритм Бёрга реализован напрямую на C# и повторяет логику MT4. Расчёты выполняются в
double, а перед проверкой сигналов прогнозы переводятся вdecimal. - В MT4 объём рассчитывался из свободной маржи. В версии для StockSharp объём контролируется за счёт свойства
Volumeи параметраMaxRisk: базовый лот задаётсяVolume, дальнейшие доливки масштабируются линейно. - Защитные действия (стопы, трейлинг) реализованы через рыночные ордера, что соответствует высокоуровневому API и предотвращает зависание виртуальных заявок в тестах.
- При изменении
PastBarsилиModelOrderмассивы и состояние модели пересоздаются, поэтому изменение параметров на лету немедленно влияет на расчёт прогнозов. - Для визуального контроля стратегия автоматически рисует свечи и сделки; при необходимости легко добавить вывод прогноза на отдельную серию.
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Burg extrapolator strategy converted from MetaTrader 4 implementation.
/// Predicts the future price path with Burg linear prediction coefficients and trades on forecasted extremes.
/// </summary>
public class BurgExtrapolatorForecastStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<decimal> _maxRisk;
private readonly StrategyParam<int> _maxTrades;
private readonly StrategyParam<int> _minProfit;
private readonly StrategyParam<int> _maxLoss;
private readonly StrategyParam<int> _takeProfit;
private readonly StrategyParam<int> _stopLoss;
private readonly StrategyParam<int> _trailingStop;
private readonly StrategyParam<int> _pastBars;
private readonly StrategyParam<decimal> _modelOrder;
private readonly StrategyParam<bool> _useMomentum;
private readonly StrategyParam<bool> _useRateOfChange;
private readonly List<decimal> _openHistory = new();
private double[] _samples = Array.Empty<double>();
private double[] _coefficients = Array.Empty<double>();
private double[] _predictions = Array.Empty<double>();
private int _np;
private int _no;
private int _nf;
private double _averagePrice;
private bool _isFirstRun = true;
private decimal? _longEntryPrice;
private decimal? _shortEntryPrice;
private decimal? _longHigh;
private decimal? _shortLow;
/// <summary>
/// Initializes a new instance of the <see cref="BurgExtrapolatorForecastStrategy"/> class.
/// </summary>
public BurgExtrapolatorForecastStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Primary timeframe used for forecasting", "General");
_maxRisk = Param(nameof(MaxRisk), 0.5m)
.SetGreaterThanZero()
.SetDisplay("Max Risk", "Risk factor controlling position scaling", "Money Management");
_maxTrades = Param(nameof(MaxTrades), 5)
.SetGreaterThanZero()
.SetDisplay("Max Trades", "Maximum stacked trades per direction", "Money Management");
_minProfit = Param(nameof(MinProfit), 160)
.SetGreaterThanZero()
.SetDisplay("Min Profit", "Forecasted profit in points required to open trades", "Signals");
_maxLoss = Param(nameof(MaxLoss), 130)
.SetGreaterThanZero()
.SetDisplay("Max Loss", "Forecasted adverse excursion closing existing trades", "Signals");
_takeProfit = Param(nameof(TakeProfit), 0)
.SetNotNegative()
.SetDisplay("Take Profit", "Optional fixed take profit in points", "Protection")
;
_stopLoss = Param(nameof(StopLoss), 180)
.SetNotNegative()
.SetDisplay("Stop Loss", "Optional fixed stop loss in points", "Protection")
;
_trailingStop = Param(nameof(TrailingStop), 10)
.SetNotNegative()
.SetDisplay("Trailing Stop", "Trailing distance in points (requires stop loss)", "Protection")
;
_pastBars = Param(nameof(PastBars), 200)
.SetGreaterThanZero()
.SetDisplay("Past Bars", "History length used for Burg model", "Forecast");
_modelOrder = Param(nameof(ModelOrder), 0.37m)
.SetGreaterThanZero()
.SetDisplay("Model Order", "Fraction of past bars used as Burg order", "Forecast");
_useMomentum = Param(nameof(UseMomentum), true)
.SetDisplay("Use Momentum", "Use logarithmic momentum instead of raw prices", "Forecast");
_useRateOfChange = Param(nameof(UseRateOfChange), false)
.SetDisplay("Use ROC", "Use percentage rate of change instead of raw prices", "Forecast");
}
/// <summary>
/// Type of candles processed by the strategy.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Risk factor used when stacking positions.
/// </summary>
public decimal MaxRisk
{
get => _maxRisk.Value;
set => _maxRisk.Value = value;
}
/// <summary>
/// Maximum number of trades allowed in one direction.
/// </summary>
public int MaxTrades
{
get => _maxTrades.Value;
set => _maxTrades.Value = value;
}
/// <summary>
/// Minimum profit in points predicted by the Burg model to initiate new trades.
/// </summary>
public int MinProfit
{
get => _minProfit.Value;
set => _minProfit.Value = value;
}
/// <summary>
/// Maximum loss in points predicted by the Burg model before closing positions.
/// </summary>
public int MaxLoss
{
get => _maxLoss.Value;
set => _maxLoss.Value = value;
}
/// <summary>
/// Optional take profit expressed in points.
/// </summary>
public int TakeProfit
{
get => _takeProfit.Value;
set => _takeProfit.Value = value;
}
/// <summary>
/// Optional stop loss expressed in points.
/// </summary>
public int StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Trailing stop distance in points.
/// </summary>
public int TrailingStop
{
get => _trailingStop.Value;
set => _trailingStop.Value = value;
}
/// <summary>
/// Number of historical candles used by the Burg predictor.
/// </summary>
public int PastBars
{
get => _pastBars.Value;
set => _pastBars.Value = value;
}
/// <summary>
/// Fraction of <see cref="PastBars"/> used as Burg model order.
/// </summary>
public decimal ModelOrder
{
get => _modelOrder.Value;
set => _modelOrder.Value = value;
}
/// <summary>
/// Use logarithmic momentum transformation instead of raw prices.
/// </summary>
public bool UseMomentum
{
get => _useMomentum.Value;
set => _useMomentum.Value = value;
}
/// <summary>
/// Use percentage rate of change transformation instead of raw prices.
/// </summary>
public bool UseRateOfChange
{
get => _useRateOfChange.Value;
set => _useRateOfChange.Value = value;
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
yield return (Security, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_openHistory.Clear();
_samples = Array.Empty<double>();
_coefficients = Array.Empty<double>();
_predictions = Array.Empty<double>();
_np = 0;
_no = 0;
_nf = 0;
_averagePrice = 0.0;
_isFirstRun = true;
ResetLongState();
ResetShortState();
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var subscription = SubscribeCandles(CandleType);
subscription.Bind(ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
AddOpenPrice(candle.OpenPrice);
if (!EnsureModel())
return;
TrimHistory();
if (_openHistory.Count < _np + 1)
return;
if (!UpdateSamples())
return;
var predictionCount = ComputePredictions();
if (predictionCount <= 0)
return;
var (openSignal, closeSignal) = EvaluateSignals(predictionCount);
if (ManageProtection(candle))
{
// Position has been closed by protective logic, wait for the next candle to re-evaluate.
return;
}
HandleSignalClosures(openSignal, closeSignal);
if (openSignal == 1)
{
TryOpenLong(candle);
}
else if (openSignal == -1)
{
TryOpenShort(candle);
}
}
private void AddOpenPrice(decimal openPrice)
{
_openHistory.Add(openPrice);
}
private bool EnsureModel()
{
var np = PastBars;
if (np < 3)
return false;
var modelOrder = ModelOrder;
var no = (int)(modelOrder * np);
if (no < 1)
no = 1;
if (no >= np - 1)
no = np - 2;
var nf = np - no - 1;
if (nf < 1)
nf = 1;
var predictionLength = nf + 1;
if (_np != np || _no != no || _nf != nf || _predictions.Length != predictionLength)
{
_np = np;
_no = no;
_nf = nf;
_samples = new double[np];
_coefficients = new double[no + 1];
_predictions = new double[predictionLength];
_averagePrice = 0.0;
_isFirstRun = true;
}
return true;
}
private void TrimHistory()
{
var maxHistory = _np + 1;
while (_openHistory.Count > maxHistory)
{
_openHistory.RemoveAt(0);
}
}
private bool UpdateSamples()
{
if (_np <= 0)
return false;
var useMomentum = UseMomentum;
var useRoc = !useMomentum && UseRateOfChange;
if (useMomentum || useRoc)
{
if (!_isFirstRun)
{
for (var i = 0; i < _np - 1; i++)
_samples[i] = _samples[i + 1];
var current = GetOpen(0);
var previous = GetOpen(1);
if (previous == 0m)
return false;
var ratio = (double)(current / previous);
_samples[_np - 1] = useMomentum ? Math.Log(ratio) : ratio - 1.0;
}
else
{
for (var i = 0; i < _np; i++)
{
var current = GetOpen(i);
var previous = GetOpen(i + 1);
if (previous == 0m)
return false;
var ratio = (double)(current / previous);
_samples[_np - 1 - i] = useMomentum ? Math.Log(ratio) : ratio - 1.0;
}
_averagePrice = 0.0;
_isFirstRun = false;
}
}
else
{
if (_isFirstRun)
{
double sum = 0.0;
for (var i = 0; i < _np; i++)
sum += (double)GetOpen(i);
_averagePrice = sum / _np;
for (var i = 0; i < _np; i++)
{
var open = (double)GetOpen(i);
_samples[_np - 1 - i] = open - _averagePrice;
}
_isFirstRun = false;
}
else
{
var newest = (double)GetOpen(0);
var leaving = (double)GetOpen(_np);
_averagePrice += (newest - leaving) / _np;
for (var i = 0; i < _np; i++)
{
var open = (double)GetOpen(i);
_samples[_np - 1 - i] = open - _averagePrice;
}
}
}
return true;
}
private int ComputePredictions()
{
Array.Clear(_coefficients, 0, _coefficients.Length);
Array.Clear(_predictions, 0, _predictions.Length);
double den = 0.0;
for (var i = 0; i < _np; i++)
{
var value = _samples[i];
den += value * value;
}
den *= 2.0;
var df = new double[_np];
var db = new double[_np];
for (var i = 0; i < _np; i++)
{
var value = _samples[i];
df[i] = value;
db[i] = value;
}
double r = 0.0;
for (var k = 1; k <= _no; k++)
{
double num = 0.0;
for (var i = k; i < _np; i++)
num += df[i] * db[i - 1];
var denominator = (1.0 - r * r) * den - df[k - 1] * df[k - 1] - db[_np - 1] * db[_np - 1];
if (Math.Abs(denominator) < 1e-12)
return 0;
r = -2.0 * num / denominator;
_coefficients[k] = r;
var half = k / 2;
for (var i = 1; i <= half; i++)
{
var ki = k - i;
var tmp = _coefficients[i];
_coefficients[i] += r * _coefficients[ki];
if (i != ki)
_coefficients[ki] += r * tmp;
}
if (k < _no)
{
for (var i = _np - 1; i >= k; i--)
{
var tmp = df[i];
df[i] += r * db[i - 1];
db[i] = db[i - 1] + r * tmp;
}
}
den = denominator;
}
for (var n = _np - 1; n < _np + _nf; n++)
{
double sum = 0.0;
for (var i = 1; i <= _no; i++)
{
if (n - i < _np)
sum -= _coefficients[i] * _samples[n - i];
else
sum -= _coefficients[i] * _predictions[n - i - _np + 1];
}
var index = n - _np + 1;
if (index < _predictions.Length)
_predictions[index] = sum;
}
var useMomentum = UseMomentum;
var useRoc = !useMomentum && UseRateOfChange;
if (useMomentum || useRoc)
{
var startPrice = (double)GetOpen(0);
_predictions[0] = startPrice;
for (var i = 1; i < _predictions.Length; i++)
{
_predictions[i] = useMomentum
? _predictions[i - 1] * Math.Exp(_predictions[i])
: _predictions[i - 1] * (1.0 + _predictions[i]);
}
}
else
{
for (var i = 0; i < _predictions.Length; i++)
_predictions[i] += _averagePrice;
}
return _predictions.Length;
}
private (int openSignal, int closeSignal) EvaluateSignals(int predictionCount)
{
if (predictionCount == 0)
return (0, 0);
var step = Security?.PriceStep ?? 1m;
var maxLossDelta = MaxLoss * step;
var minProfitDelta = MinProfit * step;
var ymax = (decimal)_predictions[0];
var ymin = ymax;
var imax = 0;
var imin = 0;
var openSignal = 0;
var closeSignal = 0;
var limit = Math.Min(_np, predictionCount);
for (var i = 1; i < limit; i++)
{
var value = (decimal)_predictions[i];
if (value > ymax && openSignal == 0)
{
ymax = value;
imax = i;
if (imin == 0 && ymax - ymin >= maxLossDelta)
closeSignal = 1;
if (imin == 0 && ymax - ymin >= minProfitDelta)
openSignal = 1;
}
if (value < ymin && openSignal == 0)
{
ymin = value;
imin = i;
if (imax == 0 && ymax - ymin >= maxLossDelta)
closeSignal = -1;
if (imax == 0 && ymax - ymin >= minProfitDelta)
openSignal = -1;
}
}
return (openSignal, closeSignal);
}
private bool ManageProtection(ICandleMessage candle)
{
var step = Security?.PriceStep ?? 1m;
var stopDistance = StopLoss * step;
var takeDistance = TakeProfit * step;
var trailingDistance = TrailingStop * step;
if (Position > 0)
{
_longEntryPrice ??= candle.ClosePrice;
_longHigh = _longHigh.HasValue ? Math.Max(_longHigh.Value, candle.HighPrice) : candle.HighPrice;
if (StopLoss > 0 && _longEntryPrice.HasValue && candle.LowPrice <= _longEntryPrice.Value - stopDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
if (TakeProfit > 0 && _longEntryPrice.HasValue && candle.HighPrice >= _longEntryPrice.Value + takeDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
if (TrailingStop > 0 && StopLoss > 0 && _longHigh.HasValue && candle.LowPrice <= _longHigh.Value - trailingDistance)
{
SellMarket(Position);
ResetLongState();
return true;
}
}
else
{
ResetLongState();
}
if (Position < 0)
{
_shortEntryPrice ??= candle.ClosePrice;
_shortLow = _shortLow.HasValue ? Math.Min(_shortLow.Value, candle.LowPrice) : candle.LowPrice;
if (StopLoss > 0 && _shortEntryPrice.HasValue && candle.HighPrice >= _shortEntryPrice.Value + stopDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
if (TakeProfit > 0 && _shortEntryPrice.HasValue && candle.LowPrice <= _shortEntryPrice.Value - takeDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
if (TrailingStop > 0 && StopLoss > 0 && _shortLow.HasValue && candle.HighPrice >= _shortLow.Value + trailingDistance)
{
BuyMarket(-Position);
ResetShortState();
return true;
}
}
else
{
ResetShortState();
}
return false;
}
private void HandleSignalClosures(int openSignal, int closeSignal)
{
if (Position > 0 && (closeSignal == -1 || openSignal == -1))
{
SellMarket(Position);
ResetLongState();
}
else if (Position < 0 && (closeSignal == 1 || openSignal == 1))
{
BuyMarket(-Position);
ResetShortState();
}
}
private void TryOpenLong(ICandleMessage candle)
{
var baseVolume = Volume;
if (baseVolume <= 0m)
return;
var tradeCount = GetTradeCount(baseVolume);
if (tradeCount >= MaxTrades)
return;
var orderVolume = CalculateOrderVolume(baseVolume, tradeCount);
if (orderVolume <= 0m)
return;
BuyMarket(orderVolume);
_longEntryPrice = candle.ClosePrice;
_longHigh = candle.ClosePrice;
ResetShortState();
}
private void TryOpenShort(ICandleMessage candle)
{
var baseVolume = Volume;
if (baseVolume <= 0m)
return;
var tradeCount = GetTradeCount(baseVolume);
if (tradeCount >= MaxTrades)
return;
var orderVolume = CalculateOrderVolume(baseVolume, tradeCount);
if (orderVolume <= 0m)
return;
SellMarket(orderVolume);
_shortEntryPrice = candle.ClosePrice;
_shortLow = candle.ClosePrice;
ResetLongState();
}
private int GetTradeCount(decimal baseVolume)
{
if (baseVolume <= 0m)
return 0;
var trades = Math.Abs(Position) / baseVolume;
return (int)Math.Ceiling((double)(trades - 1e-8m));
}
private decimal CalculateOrderVolume(decimal baseVolume, int existingTrades)
{
var multiplier = 1m + existingTrades * MaxRisk;
if (multiplier <= 0m)
return 0m;
return baseVolume * multiplier;
}
private decimal GetOpen(int shift)
{
var index = _openHistory.Count - 1 - shift;
return index >= 0 && index < _openHistory.Count ? _openHistory[index] : 0m;
}
private void ResetLongState()
{
_longEntryPrice = null;
_longHigh = null;
}
private void ResetShortState()
{
_shortEntryPrice = null;
_shortLow = null;
}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
class burg_extrapolator_forecast_strategy(Strategy):
"""
Burg extrapolator forecast strategy.
Predicts future price path with Burg linear prediction coefficients and trades on forecasted extremes.
"""
def __init__(self):
super(burg_extrapolator_forecast_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Primary timeframe used for forecasting", "General")
self._max_risk = self.Param("MaxRisk", 0.5) \
.SetDisplay("Max Risk", "Risk factor controlling position scaling", "Money Management")
self._max_trades = self.Param("MaxTrades", 5) \
.SetDisplay("Max Trades", "Maximum stacked trades per direction", "Money Management")
self._min_profit = self.Param("MinProfit", 160) \
.SetDisplay("Min Profit", "Forecasted profit in points required to open trades", "Signals")
self._max_loss = self.Param("MaxLoss", 130) \
.SetDisplay("Max Loss", "Forecasted adverse excursion closing existing trades", "Signals")
self._take_profit = self.Param("TakeProfit", 0) \
.SetDisplay("Take Profit", "Optional fixed take profit in points", "Protection")
self._stop_loss = self.Param("StopLoss", 180) \
.SetDisplay("Stop Loss", "Optional fixed stop loss in points", "Protection")
self._trailing_stop = self.Param("TrailingStop", 10) \
.SetDisplay("Trailing Stop", "Trailing distance in points", "Protection")
self._past_bars = self.Param("PastBars", 200) \
.SetDisplay("Past Bars", "History length used for Burg model", "Forecast")
self._model_order = self.Param("ModelOrder", 0.37) \
.SetDisplay("Model Order", "Fraction of past bars used as Burg order", "Forecast")
self._use_momentum = self.Param("UseMomentum", True) \
.SetDisplay("Use Momentum", "Use logarithmic momentum instead of raw prices", "Forecast")
self._use_rate_of_change = self.Param("UseRateOfChange", False) \
.SetDisplay("Use ROC", "Use percentage rate of change instead of raw prices", "Forecast")
self._open_history = []
self._np = 0
self._no = 0
self._nf = 0
self._average_price = 0.0
self._is_first_run = True
self._samples = []
self._long_entry_price = None
self._short_entry_price = None
self._long_high = None
self._short_low = None
@property
def candle_type(self):
return self._candle_type.Value
@candle_type.setter
def candle_type(self, value):
self._candle_type.Value = value
def OnReseted(self):
super(burg_extrapolator_forecast_strategy, self).OnReseted()
self._open_history = []
self._np = 0
self._no = 0
self._nf = 0
self._average_price = 0.0
self._is_first_run = True
self._samples = []
self._long_entry_price = None
self._short_entry_price = None
self._long_high = None
self._short_low = None
def OnStarted2(self, time):
super(burg_extrapolator_forecast_strategy, self).OnStarted2(time)
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(self.on_process).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawOwnTrades(area)
def _get_open(self, shift):
idx = len(self._open_history) - 1 - shift
if 0 <= idx < len(self._open_history):
return self._open_history[idx]
return 0.0
def _ensure_model(self):
np_val = self._past_bars.Value
if np_val < 3:
return False
mo = self._model_order.Value
no = int(mo * np_val)
if no < 1:
no = 1
if no >= np_val - 1:
no = np_val - 2
nf = np_val - no - 1
if nf < 1:
nf = 1
if self._np != np_val or self._no != no or self._nf != nf:
self._np = np_val
self._no = no
self._nf = nf
self._samples = [0.0] * np_val
self._average_price = 0.0
self._is_first_run = True
return True
def on_process(self, candle):
if candle.State != CandleStates.Finished:
return
self._open_history.append(float(candle.OpenPrice))
if not self._ensure_model():
return
max_hist = self._np + 1
while len(self._open_history) > max_hist:
self._open_history.pop(0)
if len(self._open_history) < self._np + 1:
return
if not self._update_samples():
return
predictions = self._compute_predictions()
if predictions is None:
return
open_signal, close_signal = self._evaluate_signals(predictions)
if self._manage_protection(candle):
return
self._handle_signal_closures(open_signal, close_signal)
if open_signal == 1:
self._try_open_long(candle)
elif open_signal == -1:
self._try_open_short(candle)
def _update_samples(self):
use_mom = self._use_momentum.Value
use_roc = not use_mom and self._use_rate_of_change.Value
if use_mom or use_roc:
if not self._is_first_run:
for i in range(self._np - 1):
self._samples[i] = self._samples[i + 1]
current = self._get_open(0)
previous = self._get_open(1)
if previous == 0:
return False
ratio = current / previous
self._samples[self._np - 1] = math.log(ratio) if use_mom else ratio - 1.0
else:
for i in range(self._np):
current = self._get_open(i)
previous = self._get_open(i + 1)
if previous == 0:
return False
ratio = current / previous
self._samples[self._np - 1 - i] = math.log(ratio) if use_mom else ratio - 1.0
self._average_price = 0.0
self._is_first_run = False
else:
if self._is_first_run:
total = sum(self._get_open(i) for i in range(self._np))
self._average_price = total / self._np
for i in range(self._np):
self._samples[self._np - 1 - i] = self._get_open(i) - self._average_price
self._is_first_run = False
else:
newest = self._get_open(0)
leaving = self._get_open(self._np)
self._average_price += (newest - leaving) / self._np
for i in range(self._np):
self._samples[self._np - 1 - i] = self._get_open(i) - self._average_price
return True
def _compute_predictions(self):
coefficients = [0.0] * (self._no + 1)
pred_len = self._nf + 1
predictions = [0.0] * pred_len
den = sum(v * v for v in self._samples) * 2.0
df = list(self._samples)
db = list(self._samples)
r = 0.0
for k in range(1, self._no + 1):
num = sum(df[i] * db[i - 1] for i in range(k, self._np))
denom = (1.0 - r * r) * den - df[k - 1] * df[k - 1] - db[self._np - 1] * db[self._np - 1]
if abs(denom) < 1e-12:
return None
r = -2.0 * num / denom
coefficients[k] = r
half = k // 2
for i in range(1, half + 1):
ki = k - i
tmp = coefficients[i]
coefficients[i] += r * coefficients[ki]
if i != ki:
coefficients[ki] += r * tmp
if k < self._no:
for i in range(self._np - 1, k - 1, -1):
tmp = df[i]
df[i] += r * db[i - 1]
db[i] = db[i - 1] + r * tmp
den = denom
for n in range(self._np - 1, self._np + self._nf):
s = 0.0
for i in range(1, self._no + 1):
if n - i < self._np:
s -= coefficients[i] * self._samples[n - i]
else:
s -= coefficients[i] * predictions[n - i - self._np + 1]
idx = n - self._np + 1
if idx < len(predictions):
predictions[idx] = s
use_mom = self._use_momentum.Value
use_roc = not use_mom and self._use_rate_of_change.Value
if use_mom or use_roc:
start_price = self._get_open(0)
predictions[0] = start_price
for i in range(1, len(predictions)):
if use_mom:
predictions[i] = predictions[i - 1] * math.exp(predictions[i])
else:
predictions[i] = predictions[i - 1] * (1.0 + predictions[i])
else:
for i in range(len(predictions)):
predictions[i] += self._average_price
return predictions
def _evaluate_signals(self, predictions):
step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
max_loss_delta = float(self._max_loss.Value) * step
min_profit_delta = float(self._min_profit.Value) * step
ymax = predictions[0]
ymin = ymax
imax = 0
imin = 0
open_signal = 0
close_signal = 0
limit = min(self._np, len(predictions))
for i in range(1, limit):
value = predictions[i]
if value > ymax and open_signal == 0:
ymax = value
imax = i
if imin == 0 and ymax - ymin >= max_loss_delta:
close_signal = 1
if imin == 0 and ymax - ymin >= min_profit_delta:
open_signal = 1
if value < ymin and open_signal == 0:
ymin = value
imin = i
if imax == 0 and ymax - ymin >= max_loss_delta:
close_signal = -1
if imax == 0 and ymax - ymin >= min_profit_delta:
open_signal = -1
return open_signal, close_signal
def _manage_protection(self, candle):
step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
stop_dist = float(self._stop_loss.Value) * step
take_dist = float(self._take_profit.Value) * step
trail_dist = float(self._trailing_stop.Value) * step
if self.Position > 0:
if self._long_entry_price is None:
self._long_entry_price = float(candle.ClosePrice)
self._long_high = max(self._long_high, float(candle.HighPrice)) if self._long_high is not None else float(candle.HighPrice)
if self._stop_loss.Value > 0 and float(candle.LowPrice) <= self._long_entry_price - stop_dist:
self.SellMarket(self.Position)
self._long_entry_price = None
self._long_high = None
return True
if self._take_profit.Value > 0 and float(candle.HighPrice) >= self._long_entry_price + take_dist:
self.SellMarket(self.Position)
self._long_entry_price = None
self._long_high = None
return True
if self._trailing_stop.Value > 0 and self._stop_loss.Value > 0 and self._long_high is not None:
if float(candle.LowPrice) <= self._long_high - trail_dist:
self.SellMarket(self.Position)
self._long_entry_price = None
self._long_high = None
return True
else:
self._long_entry_price = None
self._long_high = None
if self.Position < 0:
if self._short_entry_price is None:
self._short_entry_price = float(candle.ClosePrice)
self._short_low = min(self._short_low, float(candle.LowPrice)) if self._short_low is not None else float(candle.LowPrice)
if self._stop_loss.Value > 0 and float(candle.HighPrice) >= self._short_entry_price + stop_dist:
self.BuyMarket(Math.Abs(self.Position))
self._short_entry_price = None
self._short_low = None
return True
if self._take_profit.Value > 0 and float(candle.LowPrice) <= self._short_entry_price - take_dist:
self.BuyMarket(Math.Abs(self.Position))
self._short_entry_price = None
self._short_low = None
return True
if self._trailing_stop.Value > 0 and self._stop_loss.Value > 0 and self._short_low is not None:
if float(candle.HighPrice) >= self._short_low + trail_dist:
self.BuyMarket(Math.Abs(self.Position))
self._short_entry_price = None
self._short_low = None
return True
else:
self._short_entry_price = None
self._short_low = None
return False
def _handle_signal_closures(self, open_signal, close_signal):
if self.Position > 0 and (close_signal == -1 or open_signal == -1):
self.SellMarket(self.Position)
self._long_entry_price = None
self._long_high = None
elif self.Position < 0 and (close_signal == 1 or open_signal == 1):
self.BuyMarket(Math.Abs(self.Position))
self._short_entry_price = None
self._short_low = None
def _get_trade_count(self, base_vol):
if float(base_vol) <= 0:
return 0
trades = float(Math.Abs(self.Position)) / float(base_vol)
return int(math.ceil(trades - 1e-8))
def _calculate_order_volume(self, base_vol, existing_trades):
multiplier = 1.0 + existing_trades * float(self._max_risk.Value)
if multiplier <= 0:
return 0.0
return float(base_vol) * multiplier
def _try_open_long(self, candle):
base_vol = self.Volume
if float(base_vol) <= 0:
return
trade_count = self._get_trade_count(base_vol)
if trade_count >= self._max_trades.Value:
return
order_volume = self._calculate_order_volume(base_vol, trade_count)
if order_volume <= 0:
return
self.BuyMarket(order_volume)
self._long_entry_price = float(candle.ClosePrice)
self._long_high = float(candle.ClosePrice)
self._short_entry_price = None
self._short_low = None
def _try_open_short(self, candle):
base_vol = self.Volume
if float(base_vol) <= 0:
return
trade_count = self._get_trade_count(base_vol)
if trade_count >= self._max_trades.Value:
return
order_volume = self._calculate_order_volume(base_vol, trade_count)
if order_volume <= 0:
return
self.SellMarket(order_volume)
self._short_entry_price = float(candle.ClosePrice)
self._short_low = float(candle.ClosePrice)
self._long_entry_price = None
self._long_high = None
def CreateClone(self):
return burg_extrapolator_forecast_strategy()