Стратегия логистической регрессии
Стратегия использует простую модель логистической регрессии, которая заново обучается на каждом завершённом баре. Для обучения применяются последние цены закрытия (входной признак) и синтетический ряд ln(|close² − 1| + 0.5), рассчитанный по тем же ценам закрытия (целевые значения). Если предсказанная вероятность роста выше 0.5, открывается длинная позиция, иначе — короткая. Позиции удерживаются фиксированное количество баров.
Детали
- Вход: вероятность > 0.5 → покупка, иначе продажа.
- Выход: противоположный сигнал или истечение периода удержания.
- Длинная/Короткая: обе стороны.
- Таймфрейм: настраиваемый, по умолчанию 1 минута.
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Logistic regression based strategy.
/// Retrains a single-weight logistic model on each finished candle and trades by its prediction.
/// </summary>
/// <remarks>
/// Closing prices are the model input and <c>ln(|close^2 - 1| + 0.5)</c> is the training target.
/// A prediction above 0.5 is treated as a long signal, anything else as a short signal.
/// </remarks>
public class MachineLearningLogisticRegressionStrategy : Strategy
{
	// exp(x) gets close to decimal.MaxValue (~7.9e28) shortly above this exponent and the
	// double -> decimal cast would throw OverflowException; the sigmoid value is already
	// negligible (below ~1e-28) at that point, so it is reported as 0 instead.
	private const double MaxSigmoidExponent = 66.0;

	private readonly StrategyParam<int> _lookback;
	private readonly StrategyParam<decimal> _learningRate;
	private readonly StrategyParam<int> _iterations;
	private readonly StrategyParam<int> _holdingPeriod;
	private readonly StrategyParam<DataType> _candleType;

	// Rolling windows: closing prices (model input) and the synthetic series (model target).
	private decimal[] _baseSeries = Array.Empty<decimal>();
	private decimal[] _synthSeries = Array.Empty<decimal>();

	// Number of finished candles counted during warm-up (stops at the window size).
	private int _filled;

	// Last signal: 1 = long, -1 = short, 0 = no signal produced yet.
	private int _signal;

	// Consecutive candles on which the signal stayed unchanged.
	private int _hpCounter;

	// Set on the first candle after warm-up; that candle itself is skipped.
	private bool _isInitialized;

	/// <summary>
	/// Training window size.
	/// </summary>
	public int Lookback
	{
		get => _lookback.Value;
		set => _lookback.Value = value;
	}

	/// <summary>
	/// Learning rate for gradient descent.
	/// </summary>
	public decimal LearningRate
	{
		get => _learningRate.Value;
		set => _learningRate.Value = value;
	}

	/// <summary>
	/// Number of training iterations.
	/// </summary>
	public int Iterations
	{
		get => _iterations.Value;
		set => _iterations.Value = value;
	}

	/// <summary>
	/// Number of consecutive bars with an unchanged signal after which the open position is closed.
	/// </summary>
	public int HoldingPeriod
	{
		get => _holdingPeriod.Value;
		set => _holdingPeriod.Value = value;
	}

	/// <summary>
	/// Type of candles used by the strategy.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initializes a new instance of the strategy.
	/// </summary>
	public MachineLearningLogisticRegressionStrategy()
	{
		_lookback = Param(nameof(Lookback), 3)
			.SetGreaterThanZero()
			.SetDisplay("Lookback", "Number of bars for training", "General")
			.SetOptimize(2, 10, 1);

		_learningRate = Param(nameof(LearningRate), 0.0009m)
			.SetGreaterThanZero()
			.SetDisplay("Learning Rate", "Gradient descent step", "General")
			.SetOptimize(0.0001m, 0.01m, 0.0001m);

		_iterations = Param(nameof(Iterations), 1000)
			.SetGreaterThanZero()
			.SetDisplay("Iterations", "Training iterations", "General")
			.SetOptimize(50, 5000, 50);

		_holdingPeriod = Param(nameof(HoldingPeriod), 5)
			.SetGreaterThanZero()
			.SetDisplay("Holding Period", "Bars to hold position", "General")
			.SetOptimize(1, 20, 1);

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(1).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles to use", "General");

		ResetState();
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();
		ResetState();
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		ResetState();

		var subscription = SubscribeCandles(CandleType);
		subscription.Bind(ProcessCandle).Start();

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Reallocates both rolling windows to the current <see cref="Lookback"/> and clears all counters.
	/// </summary>
	private void ResetState()
	{
		_baseSeries = new decimal[Lookback];
		_synthSeries = new decimal[Lookback];
		_filled = 0;
		_signal = 0;
		_hpCounter = 0;
		_isInitialized = false;
	}

	/// <summary>
	/// Handles a candle: updates the rolling windows, retrains the model and manages the position.
	/// </summary>
	/// <param name="candle">Incoming candle; anything that is not finished is ignored.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
			return;

		var close = candle.ClosePrice;

		Shift(_baseSeries, close);
		var synthetic = (decimal)Math.Log(Math.Abs(Math.Pow((double)close, 2) - 1) + 0.5);
		Shift(_synthSeries, synthetic);

		// Use the size the buffers were allocated with rather than the live parameter:
		// Lookback may be edited after start, and indexing by the new value would run
		// past the end of the arrays.
		var window = _baseSeries.Length;

		if (_filled < window)
		{
			_filled++;
			return;
		}

		if (!_isInitialized)
		{
			_isInitialized = true;
			return;
		}

		// Bootstrap first direction once model buffers are initialized.
		if (_signal == 0)
		{
			// With a one-element window there is no previous close; fall back to 0
			// instead of indexing out of range.
			var previousClose = window >= 2 ? _baseSeries[^2] : 0m;

			_signal = close >= previousClose ? 1 : -1;
			_hpCounter = 0;
			EnterPosition(_signal);
			return;
		}

		var prediction = RunLogisticRegression(_baseSeries, _synthSeries, window, LearningRate, Iterations);
		var newSignal = prediction > 0.5m ? 1 : -1;

		if (newSignal != _signal)
		{
			// Direction flipped: restart the holding counter and reverse the position.
			_hpCounter = 0;
			EnterPosition(newSignal);
		}
		else
		{
			// Same direction: close the position once it has been held long enough.
			_hpCounter++;

			if (_hpCounter >= HoldingPeriod)
			{
				if (_signal == 1 && Position > 0)
					SellMarket(Position);
				else if (_signal == -1 && Position < 0)
					BuyMarket(-Position);
			}
		}

		_signal = newSignal;
	}

	/// <summary>
	/// Sends a market order that moves the position to the given side, covering any opposite position.
	/// Does nothing when the position is already on that side.
	/// </summary>
	/// <param name="direction">1 for long, -1 for short.</param>
	private void EnterPosition(int direction)
	{
		if (direction == 1 && Position <= 0)
			BuyMarket(Volume + Math.Abs(Position));
		else if (direction == -1 && Position >= 0)
			SellMarket(Volume + Math.Abs(Position));
	}

	/// <summary>
	/// Moves every element one slot towards the start and writes <paramref name="value"/> into the last slot.
	/// </summary>
	/// <param name="buffer">Rolling window to update in place.</param>
	/// <param name="value">Newest value.</param>
	private static void Shift(decimal[] buffer, decimal value)
	{
		for (var i = 0; i < buffer.Length - 1; i++)
			buffer[i] = buffer[i + 1];

		buffer[^1] = value;
	}

	/// <summary>
	/// Fits a single weight by gradient descent and returns the sigmoid of that weight times the newest input.
	/// </summary>
	/// <param name="x">Input series.</param>
	/// <param name="y">Target series, used as-is in the gradient.</param>
	/// <param name="p">Number of leading samples of <paramref name="x"/> and <paramref name="y"/> used for training.</param>
	/// <param name="lr">Gradient descent step.</param>
	/// <param name="iterations">Number of gradient descent steps.</param>
	/// <returns>Model output for the last element of <paramref name="x"/>.</returns>
	private static decimal RunLogisticRegression(decimal[] x, decimal[] y, int p, decimal lr, int iterations)
	{
		var w = 0m;

		for (var i = 0; i < iterations; i++)
		{
			var gradient = 0m;

			for (var j = 0; j < p; j++)
			{
				var h = Sigmoid(w * x[j]);
				gradient += (h - y[j]) * x[j];
			}

			// Average gradient over the window.
			gradient /= p;
			w -= lr * gradient;
		}

		return Sigmoid(w * x[^1]);
	}

	/// <summary>
	/// Logistic function 1 / (1 + e^-z).
	/// </summary>
	/// <param name="z">Function argument.</param>
	/// <returns>Value in the [0, 1] range; 0 for strongly negative <paramref name="z"/>.</returns>
	private static decimal Sigmoid(decimal z)
	{
		var exponent = (double)(-z);

		// Avoid OverflowException from casting a huge (or infinite) double to decimal.
		if (exponent > MaxSigmoidExponent)
			return 0m;

		var exp = (decimal)Math.Exp(exponent);
		return 1m / (1m + exp);
	}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
class machine_learning_logistic_regression_strategy(Strategy):
    """Logistic regression based strategy.

    Retrains a single-weight logistic model on each finished candle. Closing
    prices are the model input and ``log(|close^2 - 1| + 0.5)`` is the training
    target. A prediction above 0.5 is treated as a long signal, anything else
    as a short signal.
    """

    def __init__(self):
        super(machine_learning_logistic_regression_strategy, self).__init__()
        self._lookback = self.Param("Lookback", 3) \
            .SetGreaterThanZero() \
            .SetDisplay("Lookback", "Number of bars for training", "General")
        self._learning_rate = self.Param("LearningRate", 0.0009) \
            .SetGreaterThanZero() \
            .SetDisplay("Learning Rate", "Gradient descent step", "General")
        self._iterations = self.Param("Iterations", 1000) \
            .SetGreaterThanZero() \
            .SetDisplay("Iterations", "Training iterations", "General")
        self._holding_period = self.Param("HoldingPeriod", 5) \
            .SetGreaterThanZero() \
            .SetDisplay("Holding Period", "Bars to hold position", "General")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(1))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")
        self._reset_state()

    @property
    def candle_type(self):
        """Type of candles used by the strategy."""
        return self._candle_type.Value

    @candle_type.setter
    def candle_type(self, value):
        self._candle_type.Value = value

    def _reset_state(self):
        """Reallocate both rolling windows to the current lookback and clear all counters."""
        lb = self._lookback.Value
        # Rolling windows: closing prices (model input) and synthetic series (model target).
        self._base_series = [0.0] * lb
        self._synth_series = [0.0] * lb
        # Number of finished candles counted during warm-up.
        self._filled = 0
        # Last signal: 1 = long, -1 = short, 0 = no signal produced yet.
        self._signal = 0
        # Consecutive candles on which the signal stayed unchanged.
        self._hp_counter = 0
        # Set on the first candle after warm-up; that candle itself is skipped.
        self._is_initialized = False

    def OnReseted(self):
        super(machine_learning_logistic_regression_strategy, self).OnReseted()
        self._reset_state()

    def OnStarted2(self, time):
        super(machine_learning_logistic_regression_strategy, self).OnStarted2(time)
        self._reset_state()

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self.OnProcess).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawOwnTrades(area)

    def _shift(self, buffer, value):
        """Move every element one slot towards the start and store ``value`` in the last slot."""
        for i in range(len(buffer) - 1):
            buffer[i] = buffer[i + 1]
        buffer[-1] = value

    def _sigmoid(self, z):
        """Logistic function 1 / (1 + e^-z); returns 0.0 when e^-z overflows a float."""
        try:
            exp_val = math.exp(-z)
        except OverflowError:
            exp_val = float('inf')
        return 1.0 / (1.0 + exp_val)

    def _run_logistic_regression(self, x, y, p, lr, iterations):
        """Fit a single weight by gradient descent and score the newest input.

        x: input series; y: target series (used as-is in the gradient);
        p: number of leading samples used for training; lr: gradient step;
        iterations: number of gradient steps.
        Returns the sigmoid of the fitted weight times the last element of x.
        """
        w = 0.0
        for _ in range(iterations):
            gradient = 0.0
            for j in range(p):
                h = self._sigmoid(w * x[j])
                gradient += (h - y[j]) * x[j]
            # Average gradient over the window.
            gradient /= p
            w -= lr * gradient
        return self._sigmoid(w * x[-1])

    def _enter(self, direction):
        """Send a market order moving the position to the given side (1 long, -1 short).

        Covers any opposite position; does nothing when already on that side.
        """
        if direction == 1 and self.Position <= 0:
            self.BuyMarket(self.Volume + abs(self.Position))
        elif direction == -1 and self.Position >= 0:
            self.SellMarket(self.Volume + abs(self.Position))

    def OnProcess(self, candle):
        """Handle a candle: update the windows, retrain the model, manage the position."""
        if candle.State != CandleStates.Finished:
            return

        close = float(candle.ClosePrice)

        self._shift(self._base_series, close)
        synthetic = math.log(abs(close * close - 1.0) + 0.5)
        self._shift(self._synth_series, synthetic)

        # Use the size the buffers were allocated with rather than the live
        # parameter: Lookback may be edited after start, and indexing by the
        # new value would run past the end of the lists.
        window = len(self._base_series)

        if self._filled < window:
            self._filled += 1
            return

        if not self._is_initialized:
            self._is_initialized = True
            return

        # Bootstrap the first direction from the last two closes.
        if self._signal == 0:
            prev_close = self._base_series[-2] if window >= 2 else 0.0
            self._signal = 1 if close >= prev_close else -1
            self._hp_counter = 0
            self._enter(self._signal)
            return

        lr = float(self._learning_rate.Value)
        iters = self._iterations.Value
        prediction = self._run_logistic_regression(
            self._base_series, self._synth_series, window, lr, iters
        )
        new_signal = 1 if prediction > 0.5 else -1

        if new_signal != self._signal:
            # Direction flipped: restart the holding counter and reverse.
            self._hp_counter = 0
            self._enter(new_signal)
        else:
            # Same direction: close the position once it has been held long enough.
            self._hp_counter += 1
            hp = self._holding_period.Value
            if self._signal == 1 and self._hp_counter >= hp and self.Position > 0:
                self.SellMarket(self.Position)
            elif self._signal == -1 and self._hp_counter >= hp and self.Position < 0:
                self.BuyMarket(abs(self.Position))

        self._signal = new_signal

    def CreateClone(self):
        return machine_learning_logistic_regression_strategy()