Стратегия Price Impulse
Стратегия Price Impulse отслеживает поток заявок Level1 и реагирует на резкие изменения лучшего бид/аск. Логика повторяет исходный советник MetaTrader 5: сравнивает текущую котировку с ценой, поступившей несколько тиков назад, и открывает позицию, если импульс превышает заданный порог в пунктах. Фиксированные стоп‑лосс и тейк‑профит автоматически настраиваются через StartProtection, поэтому дополнительных ручных заявок не требуется.
Подход нейтрален к направлению рынка. Длинная позиция появляется, когда аск заметно вырос относительно прошлой цены, короткая — когда бид провалился ниже выбранного уровня. После каждой сделки включается пауза (CooldownSeconds), полностью повторяющая параметр InpSleep из MQL и защищающая от постоянного перезахода.
Как работает
- Подписывается на поток Level1 и хранит скользящую историю лучших бидов и асков.
- Сравнивает текущую цену с котировкой, пришедшей
HistoryGap тиков назад (дополнительный буфер управляется параметром ExtraHistory).
- Открывает лонг, если аск вырос более чем на
ImpulsePoints * PriceStep, и стратегия не находится в длинной позиции.
- Открывает шорт, если бид упал более чем на тот же порог, и открытых коротких позиций нет.
- Применяет фиксированные стоп‑лосс/тейк‑профит в пунктах и выдерживает паузу
CooldownSeconds перед следующей сделкой.
Параметры
- OrderVolume – объём каждой рыночной заявки. По умолчанию
0.1, что соответствует оригинальному эксперту, но допускает оптимизацию под любой инструмент.
- StopLossPoints – расстояние от точки входа до защитного стопа в пунктах. Значение
0 отключает стоп‑лосс.
- TakeProfitPoints – расстояние до тейк‑профита в пунктах. Значение
0 отключает тейк‑профит.
- ImpulsePoints – минимальный импульс (в пунктах) между текущей котировкой и ценой
HistoryGap тиков назад, необходимый для входа.
- HistoryGap – количество тиков между текущей ценой и сравниваемой точкой. Большие значения фильтруют шум, но делают сигналы менее оперативными.
- ExtraHistory – дополнительное число хранимых тиков, чтобы корректно обрабатывать bursts котировок и повторять логику MQL, где массив заполнялся с запасом.
- CooldownSeconds – время ожидания после сделки перед новым входом. Полностью заменяет
InpSleep из оригинала и предотвращает постоянные перевороты.
Особенности
- Параметры, выраженные в пунктах, автоматически переводятся в абсолютные значения через
Security.PriceStep (с запасным использованием Security.MinPriceStep).
- Торговля начинается только после получения достаточной истории и проверки, что соединение активно и торговля разрешена.
- Стратегия рассчитана на ликвидные инструменты с надёжным потоком Level1, иначе импульсы могут быть ложными.
- Python‑версии нет — предоставлена только реализация на C#, как и требовалось в задаче.
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Price impulse strategy that trades on rapid price moves using candle close prices.
/// </summary>
public class PriceImpulseStrategy : Strategy
{
private readonly StrategyParam<decimal> _orderVolume;
private readonly StrategyParam<int> _stopLossPoints;
private readonly StrategyParam<int> _takeProfitPoints;
private readonly StrategyParam<int> _impulsePoints;
private readonly StrategyParam<int> _historyGap;
private readonly StrategyParam<int> _extraHistory;
private readonly StrategyParam<int> _cooldownSeconds;
private readonly StrategyParam<DataType> _candleType;
private readonly List<decimal> _priceHistory = [];
private decimal _tickSize;
private DateTimeOffset? _lastTradeTime;
private decimal? _entryPrice;
private decimal? _stopLossPrice;
private decimal? _takeProfitPrice;
/// <summary>
/// Volume used for each market order.
/// </summary>
public decimal OrderVolume
{
get => _orderVolume.Value;
set => _orderVolume.Value = value;
}
/// <summary>
/// Stop loss distance expressed in price points.
/// </summary>
public int StopLossPoints
{
get => _stopLossPoints.Value;
set => _stopLossPoints.Value = value;
}
/// <summary>
/// Take profit distance expressed in price points.
/// </summary>
public int TakeProfitPoints
{
get => _takeProfitPoints.Value;
set => _takeProfitPoints.Value = value;
}
/// <summary>
/// Minimum impulse measured in price points to trigger a trade.
/// </summary>
public int ImpulsePoints
{
get => _impulsePoints.Value;
set => _impulsePoints.Value = value;
}
/// <summary>
/// Number of candles between price comparisons.
/// </summary>
public int HistoryGap
{
get => _historyGap.Value;
set => _historyGap.Value = value;
}
/// <summary>
/// Additional samples kept in the rolling buffer.
/// </summary>
public int ExtraHistory
{
get => _extraHistory.Value;
set => _extraHistory.Value = value;
}
/// <summary>
/// Minimum number of seconds between two trades.
/// </summary>
public int CooldownSeconds
{
get => _cooldownSeconds.Value;
set => _cooldownSeconds.Value = value;
}
/// <summary>
/// Candle type used for price monitoring.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
private int HistoryCapacity => Math.Max(HistoryGap + ExtraHistory + 1, HistoryGap + 1);
/// <summary>
/// Initializes strategy parameters with sensible defaults.
/// </summary>
public PriceImpulseStrategy()
{
_orderVolume = Param(nameof(OrderVolume), 0.1m)
.SetDisplay("Order Volume", "Volume used for each market order", "Trading")
.SetGreaterThanZero()
.SetOptimize(0.1m, 2m, 0.1m);
_stopLossPoints = Param(nameof(StopLossPoints), 150)
.SetDisplay("Stop Loss Points", "Stop loss distance expressed in price points", "Risk")
.SetNotNegative()
.SetOptimize(50, 300, 50);
_takeProfitPoints = Param(nameof(TakeProfitPoints), 50)
.SetDisplay("Take Profit Points", "Take profit distance expressed in price points", "Risk")
.SetNotNegative()
.SetOptimize(10, 200, 10);
_impulsePoints = Param(nameof(ImpulsePoints), 15)
.SetDisplay("Impulse Points", "Minimum price impulse required to trade", "Signals")
.SetGreaterThanZero()
.SetOptimize(5, 40, 5);
_historyGap = Param(nameof(HistoryGap), 15)
.SetDisplay("Gap Candles", "Number of candles between comparison points", "Signals")
.SetNotNegative()
.SetOptimize(5, 40, 5);
_extraHistory = Param(nameof(ExtraHistory), 15)
.SetDisplay("Extra History", "Additional samples kept to absorb bursts", "Signals")
.SetNotNegative()
.SetOptimize(0, 30, 5);
_cooldownSeconds = Param(nameof(CooldownSeconds), 100)
.SetDisplay("Cooldown Seconds", "Minimum number of seconds between trades", "Risk")
.SetNotNegative()
.SetOptimize(0, 300, 20);
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Candle type for price tracking", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_priceHistory.Clear();
_tickSize = 0m;
_lastTradeTime = null;
_entryPrice = null;
_stopLossPrice = null;
_takeProfitPrice = null;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_tickSize = Security?.PriceStep ?? 1m;
if (_tickSize <= 0)
_tickSize = 1m;
var subscription = SubscribeCandles(CandleType);
subscription.Bind(ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var currentPrice = candle.ClosePrice;
_priceHistory.Add(currentPrice);
var capacity = HistoryCapacity;
while (_priceHistory.Count > capacity)
_priceHistory.RemoveAt(0);
var candleTime = candle.CloseTime;
// Check SL/TP for existing positions.
if (Position > 0)
{
var stopLossPrice = _stopLossPrice;
var takeProfitPrice = _takeProfitPrice;
if (stopLossPrice is decimal longStop && candle.LowPrice <= longStop)
{ SellMarket(Position); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
if (takeProfitPrice is decimal longTake && candle.HighPrice >= longTake)
{ SellMarket(Position); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
}
else if (Position < 0)
{
var stopLossPrice = _stopLossPrice;
var takeProfitPrice = _takeProfitPrice;
if (stopLossPrice is decimal shortStop && candle.HighPrice >= shortStop)
{ BuyMarket(Math.Abs(Position)); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
if (takeProfitPrice is decimal shortTake && candle.LowPrice <= shortTake)
{ BuyMarket(Math.Abs(Position)); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
}
if (_priceHistory.Count <= HistoryGap)
return;
var impulseThreshold = ImpulsePoints * _tickSize;
var lastIndex = _priceHistory.Count - 1;
var compareIndex = lastIndex - HistoryGap;
if (compareIndex < 0) return;
var comparisonPrice = _priceHistory[compareIndex];
var upImpulse = currentPrice - comparisonPrice;
var downImpulse = comparisonPrice - currentPrice;
if (upImpulse > impulseThreshold && Position <= 0 && IsCooldownPassed(candleTime))
{
BuyMarket(OrderVolume);
_entryPrice = currentPrice;
_stopLossPrice = StopLossPoints > 0 ? currentPrice - StopLossPoints * _tickSize : null;
_takeProfitPrice = TakeProfitPoints > 0 ? currentPrice + TakeProfitPoints * _tickSize : null;
_lastTradeTime = candleTime;
return;
}
if (downImpulse > impulseThreshold && Position >= 0 && IsCooldownPassed(candleTime))
{
SellMarket(OrderVolume);
_entryPrice = currentPrice;
_stopLossPrice = StopLossPoints > 0 ? currentPrice + StopLossPoints * _tickSize : null;
_takeProfitPrice = TakeProfitPoints > 0 ? currentPrice - TakeProfitPoints * _tickSize : null;
_lastTradeTime = candleTime;
}
}
private bool IsCooldownPassed(DateTimeOffset time)
{
if (_lastTradeTime is null)
return true;
var cooldownSeconds = CooldownSeconds;
if (cooldownSeconds <= 0)
return true;
return time - _lastTradeTime.Value >= TimeSpan.FromSeconds(cooldownSeconds);
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from datatype_extensions import *
from indicator_extensions import *
class price_impulse_strategy(Strategy):
def __init__(self):
super(price_impulse_strategy, self).__init__()
self._sl_points = self.Param("StopLossPoints", 150).SetNotNegative().SetDisplay("Stop Loss Points", "Stop loss distance in price steps", "Risk")
self._tp_points = self.Param("TakeProfitPoints", 50).SetNotNegative().SetDisplay("Take Profit Points", "Take profit distance in price steps", "Risk")
self._impulse_points = self.Param("ImpulsePoints", 15).SetGreaterThanZero().SetDisplay("Impulse Points", "Minimum price impulse to trade", "Signals")
self._history_gap = self.Param("HistoryGap", 15).SetNotNegative().SetDisplay("Gap Candles", "Candles between comparison points", "Signals")
self._extra_history = self.Param("ExtraHistory", 15).SetNotNegative().SetDisplay("Extra History", "Additional buffer samples", "Signals")
self._cooldown_seconds = self.Param("CooldownSeconds", 100).SetNotNegative().SetDisplay("Cooldown Seconds", "Min seconds between trades", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))).SetDisplay("Candle Type", "Candle type for price tracking", "General")
@property
def CandleType(self): return self._candle_type.Value
@CandleType.setter
def CandleType(self, value): self._candle_type.Value = value
def OnReseted(self):
super(price_impulse_strategy, self).OnReseted()
self._price_history = []
self._tick_size = 0
self._last_trade_time = None
self._entry_price = None
self._stop_price = None
self._tp_price = None
def OnStarted2(self, time):
super(price_impulse_strategy, self).OnStarted2(time)
self._price_history = []
self._tick_size = 1.0
if self.Security is not None and self.Security.PriceStep is not None and self.Security.PriceStep > 0:
self._tick_size = float(self.Security.PriceStep)
self._last_trade_time = None
self._entry_price = None
self._stop_price = None
self._tp_price = None
sub = self.SubscribeCandles(self.CandleType)
sub.Bind(self.OnProcess).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, sub)
self.DrawOwnTrades(area)
def _history_capacity(self):
gap = self._history_gap.Value
extra = self._extra_history.Value
return max(gap + extra + 1, gap + 1)
def _is_cooldown_passed(self, candle_time):
if self._last_trade_time is None:
return True
cd = self._cooldown_seconds.Value
if cd <= 0:
return True
return (candle_time - self._last_trade_time) >= TimeSpan.FromSeconds(cd)
def OnProcess(self, candle):
if candle.State != CandleStates.Finished:
return
close = float(candle.ClosePrice)
self._price_history.append(close)
cap = self._history_capacity()
while len(self._price_history) > cap:
self._price_history.pop(0)
candle_time = candle.CloseTime
if self.Position > 0:
if self._stop_price is not None and candle.LowPrice <= self._stop_price:
self.SellMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
if self._tp_price is not None and candle.HighPrice >= self._tp_price:
self.SellMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
elif self.Position < 0:
if self._stop_price is not None and candle.HighPrice >= self._stop_price:
self.BuyMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
if self._tp_price is not None and candle.LowPrice <= self._tp_price:
self.BuyMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
gap = self._history_gap.Value
if len(self._price_history) <= gap:
return
impulse_threshold = self._impulse_points.Value * self._tick_size
last_idx = len(self._price_history) - 1
compare_idx = last_idx - gap
if compare_idx < 0:
return
comparison_price = self._price_history[compare_idx]
up_impulse = close - comparison_price
down_impulse = comparison_price - close
if up_impulse > impulse_threshold and self.Position <= 0 and self._is_cooldown_passed(candle_time):
self.BuyMarket()
self._entry_price = close
self._stop_price = close - self._sl_points.Value * self._tick_size if self._sl_points.Value > 0 else None
self._tp_price = close + self._tp_points.Value * self._tick_size if self._tp_points.Value > 0 else None
self._last_trade_time = candle_time
return
if down_impulse > impulse_threshold and self.Position >= 0 and self._is_cooldown_passed(candle_time):
self.SellMarket()
self._entry_price = close
self._stop_price = close + self._sl_points.Value * self._tick_size if self._sl_points.Value > 0 else None
self._tp_price = close - self._tp_points.Value * self._tick_size if self._tp_points.Value > 0 else None
self._last_trade_time = candle_time
def CreateClone(self):
return price_impulse_strategy()