Price Impulse 策略
Price Impulse 策略直接订阅 Level1 行情,监控最佳买卖价的瞬间跳动。它完整复刻了原始 MT5 专家顾问:比较当前报价与若干 tick 之前的价格,只要差值超过指定点数阈值就立即入场。同时,通过高阶 StartProtection 接口自动设置固定的止损和止盈,无需额外的手动挂单。
策略保持多空平衡:当卖价相对旧报价出现显著上涨且当前没有多头敞口时买入;当买价急剧下跌且没有空头敞口时卖出。CooldownSeconds 参数提供的冷却时间与 MQL 版本的 InpSleep 一致,防止策略在单次冲击后频繁翻仓。
工作流程
- 订阅 Level1 数据并维护最佳买价与最佳卖价的滚动历史。
- 计算最新报价与
HistoryGap个 tick 之前报价之间的差值,ExtraHistory提供额外缓冲以处理突发的连续报价。 - 当卖价上涨超过
ImpulsePoints * PriceStep且未持有多头仓位时开多单。 - 当买价下跌超过同样的阈值且未持有空头仓位时开空单。
- 以点数形式应用固定的止盈止损,并在两次交易之间强制等待
CooldownSeconds秒。
参数说明
- OrderVolume – 每次市价单的成交量。默认值
0.1对应原始 EA,可根据标的自行优化。 - StopLossPoints – 入场价到止损位的距离(点)。设置为
0时不启用止损。 - TakeProfitPoints – 入场价到止盈位的距离(点)。设置为
0时不启用止盈。 - ImpulsePoints – 触发入场所需的最小价格冲击(点),比较的是当前报价与
HistoryGaptick 前的报价。 - HistoryGap – 当前报价与对比基准之间的 tick 间隔。数值越大,信号越平滑但响应越慢。
- ExtraHistory – 额外保留的报价数量,用于吸收一次回调中到达的多条行情,保持与 MT5 版“超量”缓存一致。
- CooldownSeconds – 每次交易后必须等待的秒数。与 MQL 参数
InpSleep等价,可避免策略在震荡行情中不断进出。
备注
- 所有以点数表示的距离都会自动乘以
Security.PriceStep(若不存在则回退到Security.MinPriceStep),从而适配不同 tick 大小的品种。 - 只有在策略连接正常、历史缓存满足
HistoryGap要求并且冲击条件成立时才会下单。 - 该策略对 Level1 数据的质量要求较高,更适合流动性充足的市场。
- 本目录仅包含 C# 版本,暂未提供 Python 实现,符合任务要求。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Price impulse strategy that trades on rapid price moves using candle close prices.
/// </summary>
public class PriceImpulseStrategy : Strategy
{
private readonly StrategyParam<decimal> _orderVolume;
private readonly StrategyParam<int> _stopLossPoints;
private readonly StrategyParam<int> _takeProfitPoints;
private readonly StrategyParam<int> _impulsePoints;
private readonly StrategyParam<int> _historyGap;
private readonly StrategyParam<int> _extraHistory;
private readonly StrategyParam<int> _cooldownSeconds;
private readonly StrategyParam<DataType> _candleType;
private readonly List<decimal> _priceHistory = [];
private decimal _tickSize;
private DateTimeOffset? _lastTradeTime;
private decimal? _entryPrice;
private decimal? _stopLossPrice;
private decimal? _takeProfitPrice;
/// <summary>
/// Volume used for each market order.
/// </summary>
public decimal OrderVolume
{
get => _orderVolume.Value;
set => _orderVolume.Value = value;
}
/// <summary>
/// Stop loss distance expressed in price points.
/// </summary>
public int StopLossPoints
{
get => _stopLossPoints.Value;
set => _stopLossPoints.Value = value;
}
/// <summary>
/// Take profit distance expressed in price points.
/// </summary>
public int TakeProfitPoints
{
get => _takeProfitPoints.Value;
set => _takeProfitPoints.Value = value;
}
/// <summary>
/// Minimum impulse measured in price points to trigger a trade.
/// </summary>
public int ImpulsePoints
{
get => _impulsePoints.Value;
set => _impulsePoints.Value = value;
}
/// <summary>
/// Number of candles between price comparisons.
/// </summary>
public int HistoryGap
{
get => _historyGap.Value;
set => _historyGap.Value = value;
}
/// <summary>
/// Additional samples kept in the rolling buffer.
/// </summary>
public int ExtraHistory
{
get => _extraHistory.Value;
set => _extraHistory.Value = value;
}
/// <summary>
/// Minimum number of seconds between two trades.
/// </summary>
public int CooldownSeconds
{
get => _cooldownSeconds.Value;
set => _cooldownSeconds.Value = value;
}
/// <summary>
/// Candle type used for price monitoring.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
private int HistoryCapacity => Math.Max(HistoryGap + ExtraHistory + 1, HistoryGap + 1);
/// <summary>
/// Initializes strategy parameters with sensible defaults.
/// </summary>
public PriceImpulseStrategy()
{
_orderVolume = Param(nameof(OrderVolume), 0.1m)
.SetDisplay("Order Volume", "Volume used for each market order", "Trading")
.SetGreaterThanZero()
.SetOptimize(0.1m, 2m, 0.1m);
_stopLossPoints = Param(nameof(StopLossPoints), 150)
.SetDisplay("Stop Loss Points", "Stop loss distance expressed in price points", "Risk")
.SetNotNegative()
.SetOptimize(50, 300, 50);
_takeProfitPoints = Param(nameof(TakeProfitPoints), 50)
.SetDisplay("Take Profit Points", "Take profit distance expressed in price points", "Risk")
.SetNotNegative()
.SetOptimize(10, 200, 10);
_impulsePoints = Param(nameof(ImpulsePoints), 15)
.SetDisplay("Impulse Points", "Minimum price impulse required to trade", "Signals")
.SetGreaterThanZero()
.SetOptimize(5, 40, 5);
_historyGap = Param(nameof(HistoryGap), 15)
.SetDisplay("Gap Candles", "Number of candles between comparison points", "Signals")
.SetNotNegative()
.SetOptimize(5, 40, 5);
_extraHistory = Param(nameof(ExtraHistory), 15)
.SetDisplay("Extra History", "Additional samples kept to absorb bursts", "Signals")
.SetNotNegative()
.SetOptimize(0, 30, 5);
_cooldownSeconds = Param(nameof(CooldownSeconds), 100)
.SetDisplay("Cooldown Seconds", "Minimum number of seconds between trades", "Risk")
.SetNotNegative()
.SetOptimize(0, 300, 20);
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Candle type for price tracking", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_priceHistory.Clear();
_tickSize = 0m;
_lastTradeTime = null;
_entryPrice = null;
_stopLossPrice = null;
_takeProfitPrice = null;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_tickSize = Security?.PriceStep ?? 1m;
if (_tickSize <= 0)
_tickSize = 1m;
var subscription = SubscribeCandles(CandleType);
subscription.Bind(ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
var currentPrice = candle.ClosePrice;
_priceHistory.Add(currentPrice);
var capacity = HistoryCapacity;
while (_priceHistory.Count > capacity)
_priceHistory.RemoveAt(0);
var candleTime = candle.CloseTime;
// Check SL/TP for existing positions.
if (Position > 0)
{
var stopLossPrice = _stopLossPrice;
var takeProfitPrice = _takeProfitPrice;
if (stopLossPrice is decimal longStop && candle.LowPrice <= longStop)
{ SellMarket(Position); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
if (takeProfitPrice is decimal longTake && candle.HighPrice >= longTake)
{ SellMarket(Position); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
}
else if (Position < 0)
{
var stopLossPrice = _stopLossPrice;
var takeProfitPrice = _takeProfitPrice;
if (stopLossPrice is decimal shortStop && candle.HighPrice >= shortStop)
{ BuyMarket(Math.Abs(Position)); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
if (takeProfitPrice is decimal shortTake && candle.LowPrice <= shortTake)
{ BuyMarket(Math.Abs(Position)); _entryPrice = null; _stopLossPrice = null; _takeProfitPrice = null; return; }
}
if (_priceHistory.Count <= HistoryGap)
return;
var impulseThreshold = ImpulsePoints * _tickSize;
var lastIndex = _priceHistory.Count - 1;
var compareIndex = lastIndex - HistoryGap;
if (compareIndex < 0) return;
var comparisonPrice = _priceHistory[compareIndex];
var upImpulse = currentPrice - comparisonPrice;
var downImpulse = comparisonPrice - currentPrice;
if (upImpulse > impulseThreshold && Position <= 0 && IsCooldownPassed(candleTime))
{
BuyMarket(OrderVolume);
_entryPrice = currentPrice;
_stopLossPrice = StopLossPoints > 0 ? currentPrice - StopLossPoints * _tickSize : null;
_takeProfitPrice = TakeProfitPoints > 0 ? currentPrice + TakeProfitPoints * _tickSize : null;
_lastTradeTime = candleTime;
return;
}
if (downImpulse > impulseThreshold && Position >= 0 && IsCooldownPassed(candleTime))
{
SellMarket(OrderVolume);
_entryPrice = currentPrice;
_stopLossPrice = StopLossPoints > 0 ? currentPrice + StopLossPoints * _tickSize : null;
_takeProfitPrice = TakeProfitPoints > 0 ? currentPrice - TakeProfitPoints * _tickSize : null;
_lastTradeTime = candleTime;
}
}
private bool IsCooldownPassed(DateTimeOffset time)
{
if (_lastTradeTime is null)
return true;
var cooldownSeconds = CooldownSeconds;
if (cooldownSeconds <= 0)
return true;
return time - _lastTradeTime.Value >= TimeSpan.FromSeconds(cooldownSeconds);
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from datatype_extensions import *
from indicator_extensions import *
class price_impulse_strategy(Strategy):
def __init__(self):
super(price_impulse_strategy, self).__init__()
self._sl_points = self.Param("StopLossPoints", 150).SetNotNegative().SetDisplay("Stop Loss Points", "Stop loss distance in price steps", "Risk")
self._tp_points = self.Param("TakeProfitPoints", 50).SetNotNegative().SetDisplay("Take Profit Points", "Take profit distance in price steps", "Risk")
self._impulse_points = self.Param("ImpulsePoints", 15).SetGreaterThanZero().SetDisplay("Impulse Points", "Minimum price impulse to trade", "Signals")
self._history_gap = self.Param("HistoryGap", 15).SetNotNegative().SetDisplay("Gap Candles", "Candles between comparison points", "Signals")
self._extra_history = self.Param("ExtraHistory", 15).SetNotNegative().SetDisplay("Extra History", "Additional buffer samples", "Signals")
self._cooldown_seconds = self.Param("CooldownSeconds", 100).SetNotNegative().SetDisplay("Cooldown Seconds", "Min seconds between trades", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))).SetDisplay("Candle Type", "Candle type for price tracking", "General")
@property
def CandleType(self): return self._candle_type.Value
@CandleType.setter
def CandleType(self, value): self._candle_type.Value = value
def OnReseted(self):
super(price_impulse_strategy, self).OnReseted()
self._price_history = []
self._tick_size = 0
self._last_trade_time = None
self._entry_price = None
self._stop_price = None
self._tp_price = None
def OnStarted2(self, time):
super(price_impulse_strategy, self).OnStarted2(time)
self._price_history = []
self._tick_size = 1.0
if self.Security is not None and self.Security.PriceStep is not None and self.Security.PriceStep > 0:
self._tick_size = float(self.Security.PriceStep)
self._last_trade_time = None
self._entry_price = None
self._stop_price = None
self._tp_price = None
sub = self.SubscribeCandles(self.CandleType)
sub.Bind(self.OnProcess).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, sub)
self.DrawOwnTrades(area)
def _history_capacity(self):
gap = self._history_gap.Value
extra = self._extra_history.Value
return max(gap + extra + 1, gap + 1)
def _is_cooldown_passed(self, candle_time):
if self._last_trade_time is None:
return True
cd = self._cooldown_seconds.Value
if cd <= 0:
return True
return (candle_time - self._last_trade_time) >= TimeSpan.FromSeconds(cd)
def OnProcess(self, candle):
if candle.State != CandleStates.Finished:
return
close = float(candle.ClosePrice)
self._price_history.append(close)
cap = self._history_capacity()
while len(self._price_history) > cap:
self._price_history.pop(0)
candle_time = candle.CloseTime
if self.Position > 0:
if self._stop_price is not None and candle.LowPrice <= self._stop_price:
self.SellMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
if self._tp_price is not None and candle.HighPrice >= self._tp_price:
self.SellMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
elif self.Position < 0:
if self._stop_price is not None and candle.HighPrice >= self._stop_price:
self.BuyMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
if self._tp_price is not None and candle.LowPrice <= self._tp_price:
self.BuyMarket()
self._entry_price = None
self._stop_price = None
self._tp_price = None
return
gap = self._history_gap.Value
if len(self._price_history) <= gap:
return
impulse_threshold = self._impulse_points.Value * self._tick_size
last_idx = len(self._price_history) - 1
compare_idx = last_idx - gap
if compare_idx < 0:
return
comparison_price = self._price_history[compare_idx]
up_impulse = close - comparison_price
down_impulse = comparison_price - close
if up_impulse > impulse_threshold and self.Position <= 0 and self._is_cooldown_passed(candle_time):
self.BuyMarket()
self._entry_price = close
self._stop_price = close - self._sl_points.Value * self._tick_size if self._sl_points.Value > 0 else None
self._tp_price = close + self._tp_points.Value * self._tick_size if self._tp_points.Value > 0 else None
self._last_trade_time = candle_time
return
if down_impulse > impulse_threshold and self.Position >= 0 and self._is_cooldown_passed(candle_time):
self.SellMarket()
self._entry_price = close
self._stop_price = close + self._sl_points.Value * self._tick_size if self._sl_points.Value > 0 else None
self._tp_price = close - self._tp_points.Value * self._tick_size if self._tp_points.Value > 0 else None
self._last_trade_time = candle_time
def CreateClone(self):
return price_impulse_strategy()