大量订单影响分析策略
此策略基于 TradingView 脚本 "Volume Block Order Analyzer" 的简化实现。它计算成交量尖峰对价格方向的影响并随时间累加。当累计影响超过设定阈值时,策略开仓,同时使用百分比追踪止损保护持仓。
详情
- 入场:累计影响超过阈值。
- 出场:依据入场价的百分比追踪止损。
- 多空:双向。
- 指标:SMA。
- 时间框架:任意。
本策略仅保留核心逻辑,省略了原脚本的可视化部分。
namespace StockSharp.Samples.Strategies;
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
/// <summary>
/// Strategy that accumulates directional volume impact and trades on persistent imbalances.
/// </summary>
public class VolumeBlockOrderAnalyzerStrategy : Strategy
{
private readonly StrategyParam<decimal> _volumeThreshold;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<decimal> _impactDecay;
private readonly StrategyParam<decimal> _impactNormalization;
private readonly StrategyParam<decimal> _signalThreshold;
private readonly StrategyParam<int> _signalCooldownBars;
private readonly StrategyParam<DataType> _candleType;
private decimal _cumulativeImpact;
private int _cooldownRemaining;
private readonly List<decimal> _volumeBuffer = new();
public decimal VolumeThreshold { get => _volumeThreshold.Value; set => _volumeThreshold.Value = value; }
public int LookbackPeriod { get => _lookbackPeriod.Value; set => _lookbackPeriod.Value = value; }
public decimal ImpactDecay { get => _impactDecay.Value; set => _impactDecay.Value = value; }
public decimal ImpactNormalization { get => _impactNormalization.Value; set => _impactNormalization.Value = value; }
public decimal SignalThreshold { get => _signalThreshold.Value; set => _signalThreshold.Value = value; }
public int SignalCooldownBars { get => _signalCooldownBars.Value; set => _signalCooldownBars.Value = value; }
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public VolumeBlockOrderAnalyzerStrategy()
{
_volumeThreshold = Param(nameof(VolumeThreshold), 1.05m)
.SetDisplay("Volume Threshold", "Relative volume required for an impact update", "Volume")
.SetGreaterThanZero();
_lookbackPeriod = Param(nameof(LookbackPeriod), 20)
.SetDisplay("Lookback Period", "Lookback used for average volume", "Volume")
.SetGreaterThanZero();
_impactDecay = Param(nameof(ImpactDecay), 0.9m)
.SetDisplay("Impact Decay", "Decay applied to accumulated impact", "Impact");
_impactNormalization = Param(nameof(ImpactNormalization), 2m)
.SetDisplay("Impact Normalization", "Normalization applied to directional volume", "Impact")
.SetGreaterThanZero();
_signalThreshold = Param(nameof(SignalThreshold), 0.3m)
.SetDisplay("Signal Threshold", "Absolute impact required for a new trade", "Strategy");
_signalCooldownBars = Param(nameof(SignalCooldownBars), 10)
.SetDisplay("Signal Cooldown", "Bars to wait after entries and exits", "Strategy")
.SetGreaterThanZero();
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_cumulativeImpact = 0m;
_cooldownRemaining = 0;
_volumeBuffer.Clear();
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_cumulativeImpact = 0m;
_cooldownRemaining = 0;
_volumeBuffer.Clear();
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(ProcessCandle)
.Start();
StartProtection(
takeProfit: new Unit(2, UnitTypes.Percent),
stopLoss: new Unit(1, UnitTypes.Percent));
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
// Track volume for averaging
_volumeBuffer.Add(candle.TotalVolume);
if (_volumeBuffer.Count > LookbackPeriod)
_volumeBuffer.RemoveAt(0);
if (_volumeBuffer.Count < LookbackPeriod)
return;
// Calculate average volume
var sumVol = 0m;
for (var i = 0; i < _volumeBuffer.Count; i++)
sumVol += _volumeBuffer[i];
var averageVolume = sumVol / _volumeBuffer.Count;
var relativeVolume = averageVolume <= 0m ? 0m : candle.TotalVolume / averageVolume;
var directionalMove = candle.ClosePrice > candle.OpenPrice ? 1m : candle.ClosePrice < candle.OpenPrice ? -1m : 0m;
var impact = relativeVolume >= VolumeThreshold ? directionalMove * relativeVolume / ImpactNormalization : 0m;
_cumulativeImpact = _cumulativeImpact * ImpactDecay + impact;
if (Position != 0 || _cooldownRemaining > 0)
return;
if (_cumulativeImpact >= SignalThreshold)
{
BuyMarket();
_cooldownRemaining = SignalCooldownBars;
}
else if (_cumulativeImpact <= -SignalThreshold)
{
SellMarket();
_cooldownRemaining = SignalCooldownBars;
}
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Strategies import Strategy
class volume_block_order_analyzer_strategy(Strategy):
def __init__(self):
super(volume_block_order_analyzer_strategy, self).__init__()
self._volume_threshold = self.Param("VolumeThreshold", 1.05) \
.SetDisplay("Volume Threshold", "Relative volume required for an impact update", "Volume")
self._lookback_period = self.Param("LookbackPeriod", 20) \
.SetDisplay("Lookback Period", "Lookback used for average volume", "Volume")
self._impact_decay = self.Param("ImpactDecay", 0.9) \
.SetDisplay("Impact Decay", "Decay applied to accumulated impact", "Impact")
self._impact_normalization = self.Param("ImpactNormalization", 2.0) \
.SetDisplay("Impact Normalization", "Normalization applied to directional volume", "Impact")
self._signal_threshold = self.Param("SignalThreshold", 0.3) \
.SetDisplay("Signal Threshold", "Absolute impact required for a new trade", "Strategy")
self._signal_cooldown_bars = self.Param("SignalCooldownBars", 10) \
.SetDisplay("Signal Cooldown", "Bars to wait after entries and exits", "Strategy")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Type of candles", "General")
self._cumulative_impact = 0.0
self._cooldown_remaining = 0
self._volume_buffer = []
@property
def volume_threshold(self):
return self._volume_threshold.Value
@property
def lookback_period(self):
return self._lookback_period.Value
@property
def impact_decay(self):
return self._impact_decay.Value
@property
def impact_normalization(self):
return self._impact_normalization.Value
@property
def signal_threshold(self):
return self._signal_threshold.Value
@property
def signal_cooldown_bars(self):
return self._signal_cooldown_bars.Value
@property
def candle_type(self):
return self._candle_type.Value
def OnReseted(self):
super(volume_block_order_analyzer_strategy, self).OnReseted()
self._cumulative_impact = 0.0
self._cooldown_remaining = 0
self._volume_buffer = []
def OnStarted2(self, time):
super(volume_block_order_analyzer_strategy, self).OnStarted2(time)
self._cumulative_impact = 0.0
self._cooldown_remaining = 0
self._volume_buffer = []
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(self.on_process).Start()
self.StartProtection(
takeProfit=Unit(2, UnitTypes.Percent),
stopLoss=Unit(1, UnitTypes.Percent))
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawOwnTrades(area)
def on_process(self, candle):
if candle.State != CandleStates.Finished:
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
self._volume_buffer.append(float(candle.TotalVolume))
if len(self._volume_buffer) > self.lookback_period:
self._volume_buffer.pop(0)
if len(self._volume_buffer) < self.lookback_period:
return
sum_vol = 0.0
for i in range(len(self._volume_buffer)):
sum_vol += self._volume_buffer[i]
average_volume = sum_vol / len(self._volume_buffer)
relative_volume = 0.0 if average_volume <= 0 else float(candle.TotalVolume) / average_volume
if candle.ClosePrice > candle.OpenPrice:
directional_move = 1.0
elif candle.ClosePrice < candle.OpenPrice:
directional_move = -1.0
else:
directional_move = 0.0
impact = (directional_move * relative_volume / self.impact_normalization) if relative_volume >= self.volume_threshold else 0.0
self._cumulative_impact = self._cumulative_impact * self.impact_decay + impact
if self.Position != 0 or self._cooldown_remaining > 0:
return
if self._cumulative_impact >= self.signal_threshold:
self.BuyMarket()
self._cooldown_remaining = self.signal_cooldown_bars
elif self._cumulative_impact <= -self.signal_threshold:
self.SellMarket()
self._cooldown_remaining = self.signal_cooldown_bars
def CreateClone(self):
return volume_block_order_analyzer_strategy()