Volume Block Order Analyzer Strategy
Simplified strategy based on the TradingView script "Volume Block Order Analyzer". It measures how large volume spikes impact price direction and accumulates this effect over time, letting older contributions decay. When the cumulative impact crosses user-defined thresholds, the strategy enters trades and protects them with a percentage-based take-profit and stop-loss.
Details
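- Entry: Long when the cumulative impact reaches the signal threshold or higher; short when it reaches the negative signal threshold or lower. New entries require a flat position and an expired cooldown.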
- Exit: Take-profit (2%) and stop-loss (1%) based on percentage from entry.
- Long/Short: Both.
- Indicators: SMA of candle volume over the lookback period (computed inside the strategy).
- Timeframe: Any.
This port focuses on the core idea; many visual features of the original script are omitted.
namespace StockSharp.Samples.Strategies;
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
/// <summary>
/// Strategy that accumulates directional volume impact and trades on persistent imbalances.
/// </summary>
/// <remarks>
/// The volume of every finished candle is divided by the average volume of the most recent
/// <see cref="LookbackPeriod"/> candles (the current candle included). When that ratio reaches
/// <see cref="VolumeThreshold"/>, a signed contribution is added to a decaying running total.
/// A market order is sent once the total reaches <see cref="SignalThreshold"/> in either
/// direction while the strategy is flat and no cooldown is pending.
/// </remarks>
public class VolumeBlockOrderAnalyzerStrategy : Strategy
{
	private readonly StrategyParam<decimal> _volumeThreshold;
	private readonly StrategyParam<int> _lookbackPeriod;
	private readonly StrategyParam<decimal> _impactDecay;
	private readonly StrategyParam<decimal> _impactNormalization;
	private readonly StrategyParam<decimal> _signalThreshold;
	private readonly StrategyParam<int> _signalCooldownBars;
	private readonly StrategyParam<DataType> _candleType;

	// Decayed running sum of per-candle impacts.
	private decimal _cumulativeImpact;

	// Finished candles still to wait before a new entry is allowed.
	private int _cooldownRemaining;

	// Volumes of the most recent candles, oldest first.
	private readonly List<decimal> _volumeBuffer = new();

	/// <summary>
	/// Relative volume required for an impact update.
	/// </summary>
	public decimal VolumeThreshold { get => _volumeThreshold.Value; set => _volumeThreshold.Value = value; }

	/// <summary>
	/// Number of candles used for the average volume.
	/// </summary>
	public int LookbackPeriod { get => _lookbackPeriod.Value; set => _lookbackPeriod.Value = value; }

	/// <summary>
	/// Multiplier applied to the accumulated impact on every processed candle.
	/// </summary>
	public decimal ImpactDecay { get => _impactDecay.Value; set => _impactDecay.Value = value; }

	/// <summary>
	/// Divisor applied to the directional relative volume.
	/// </summary>
	public decimal ImpactNormalization { get => _impactNormalization.Value; set => _impactNormalization.Value = value; }

	/// <summary>
	/// Absolute accumulated impact required for a new trade.
	/// </summary>
	public decimal SignalThreshold { get => _signalThreshold.Value; set => _signalThreshold.Value = value; }

	/// <summary>
	/// Number of finished candles to wait after an entry before another entry is allowed.
	/// </summary>
	public int SignalCooldownBars { get => _signalCooldownBars.Value; set => _signalCooldownBars.Value = value; }

	/// <summary>
	/// Type of candles the strategy subscribes to.
	/// </summary>
	public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }

	/// <summary>
	/// Initializes a new instance of <see cref="VolumeBlockOrderAnalyzerStrategy"/> and registers its parameters.
	/// </summary>
	public VolumeBlockOrderAnalyzerStrategy()
	{
		_volumeThreshold = Param(nameof(VolumeThreshold), 1.05m)
			.SetDisplay("Volume Threshold", "Relative volume required for an impact update", "Volume")
			.SetGreaterThanZero();

		_lookbackPeriod = Param(nameof(LookbackPeriod), 20)
			.SetDisplay("Lookback Period", "Lookback used for average volume", "Volume")
			.SetGreaterThanZero();

		_impactDecay = Param(nameof(ImpactDecay), 0.9m)
			.SetDisplay("Impact Decay", "Decay applied to accumulated impact", "Impact");

		_impactNormalization = Param(nameof(ImpactNormalization), 2m)
			.SetDisplay("Impact Normalization", "Normalization applied to directional volume", "Impact")
			.SetGreaterThanZero();

		_signalThreshold = Param(nameof(SignalThreshold), 0.3m)
			.SetDisplay("Signal Threshold", "Absolute impact required for a new trade", "Strategy");

		_signalCooldownBars = Param(nameof(SignalCooldownBars), 10)
			.SetDisplay("Signal Cooldown", "Bars to wait after entries and exits", "Strategy")
			.SetGreaterThanZero();

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
			.SetDisplay("Candle Type", "Type of candles", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();
		ResetState();
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);
		ResetState();

		var subscription = SubscribeCandles(CandleType);
		subscription
			.Bind(ProcessCandle)
			.Start();

		// Fixed percentage take-profit and stop-loss for every position.
		StartProtection(
			takeProfit: new Unit(2, UnitTypes.Percent),
			stopLoss: new Unit(1, UnitTypes.Percent));

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Clears the accumulated impact, the cooldown counter and the volume window.
	/// </summary>
	private void ResetState()
	{
		_cumulativeImpact = 0m;
		_cooldownRemaining = 0;
		_volumeBuffer.Clear();
	}

	/// <summary>
	/// Updates the accumulated impact with a finished candle and sends an entry order when a threshold is reached.
	/// </summary>
	/// <param name="candle">Candle received from the subscription; unfinished candles are ignored.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
		{
			return;
		}

		// The cooldown counts every finished candle, including those seen during warm-up.
		if (_cooldownRemaining > 0)
		{
			_cooldownRemaining--;
		}

		var lookback = LookbackPeriod;

		_volumeBuffer.Add(candle.TotalVolume);

		// Trim in a loop: if LookbackPeriod was lowered after the window had filled,
		// a single removal per candle would keep the window at its old size forever.
		while (_volumeBuffer.Count > lookback)
		{
			_volumeBuffer.RemoveAt(0);
		}

		// Wait until the window is full.
		if (_volumeBuffer.Count < lookback)
		{
			return;
		}

		var sumVol = 0m;
		foreach (var volume in _volumeBuffer)
		{
			sumVol += volume;
		}

		var averageVolume = sumVol / _volumeBuffer.Count;

		// A window without any volume yields a ratio of zero instead of dividing by zero.
		var relativeVolume = averageVolume <= 0m ? 0m : candle.TotalVolume / averageVolume;

		// +1 for a candle closing above its open, -1 for below, 0 when unchanged.
		var directionalMove = candle.ClosePrice > candle.OpenPrice ? 1m : candle.ClosePrice < candle.OpenPrice ? -1m : 0m;

		// Only candles with enough relative volume contribute; the decay is applied on every candle.
		var impact = relativeVolume >= VolumeThreshold ? directionalMove * relativeVolume / ImpactNormalization : 0m;
		_cumulativeImpact = _cumulativeImpact * ImpactDecay + impact;

		// New entries only while flat and outside the cooldown.
		if (Position != 0 || _cooldownRemaining > 0)
		{
			return;
		}

		if (_cumulativeImpact >= SignalThreshold)
		{
			BuyMarket();
			_cooldownRemaining = SignalCooldownBars;
		}
		else if (_cumulativeImpact <= -SignalThreshold)
		{
			SellMarket();
			_cooldownRemaining = SignalCooldownBars;
		}
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Strategies import Strategy
class volume_block_order_analyzer_strategy(Strategy):
    """Strategy that accumulates directional volume impact and trades on persistent imbalances.

    The volume of every finished candle is divided by the average volume of the
    most recent ``lookback_period`` candles (the current candle included). When
    that ratio reaches ``volume_threshold``, a signed contribution is added to a
    decaying running total. A market order is sent once the total reaches
    ``signal_threshold`` in either direction while the strategy is flat and no
    cooldown is pending.
    """

    def __init__(self):
        """Register the strategy parameters and initialise the running state."""
        super(volume_block_order_analyzer_strategy, self).__init__()

        self._volume_threshold = self.Param("VolumeThreshold", 1.05) \
            .SetDisplay("Volume Threshold", "Relative volume required for an impact update", "Volume")
        self._lookback_period = self.Param("LookbackPeriod", 20) \
            .SetDisplay("Lookback Period", "Lookback used for average volume", "Volume")
        self._impact_decay = self.Param("ImpactDecay", 0.9) \
            .SetDisplay("Impact Decay", "Decay applied to accumulated impact", "Impact")
        self._impact_normalization = self.Param("ImpactNormalization", 2.0) \
            .SetDisplay("Impact Normalization", "Normalization applied to directional volume", "Impact")
        self._signal_threshold = self.Param("SignalThreshold", 0.3) \
            .SetDisplay("Signal Threshold", "Absolute impact required for a new trade", "Strategy")
        self._signal_cooldown_bars = self.Param("SignalCooldownBars", 10) \
            .SetDisplay("Signal Cooldown", "Bars to wait after entries and exits", "Strategy")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
            .SetDisplay("Candle Type", "Type of candles", "General")

        self._reset_state()

    @property
    def volume_threshold(self):
        """Relative volume required for an impact update."""
        return self._volume_threshold.Value

    @property
    def lookback_period(self):
        """Number of candles used for the average volume."""
        return self._lookback_period.Value

    @property
    def impact_decay(self):
        """Multiplier applied to the accumulated impact on every processed candle."""
        return self._impact_decay.Value

    @property
    def impact_normalization(self):
        """Divisor applied to the directional relative volume."""
        return self._impact_normalization.Value

    @property
    def signal_threshold(self):
        """Absolute accumulated impact required for a new trade."""
        return self._signal_threshold.Value

    @property
    def signal_cooldown_bars(self):
        """Number of finished candles to wait after an entry before another entry."""
        return self._signal_cooldown_bars.Value

    @property
    def candle_type(self):
        """Type of candles the strategy subscribes to."""
        return self._candle_type.Value

    def _reset_state(self):
        """Clear the accumulated impact, the cooldown counter and the volume window."""
        self._cumulative_impact = 0.0  # decayed running sum of per-candle impacts
        self._cooldown_remaining = 0   # finished candles left before a new entry is allowed
        self._volume_buffer = []       # volumes of the most recent candles, oldest first

    def OnReseted(self):
        """Reset the running state after the base class has reset itself."""
        super(volume_block_order_analyzer_strategy, self).OnReseted()
        self._reset_state()

    def OnStarted2(self, time):
        """Subscribe to candles, enable position protection and set up the chart."""
        super(volume_block_order_analyzer_strategy, self).OnStarted2(time)
        self._reset_state()

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self.on_process).Start()

        # Fixed percentage take-profit and stop-loss for every position.
        self.StartProtection(
            takeProfit=Unit(2, UnitTypes.Percent),
            stopLoss=Unit(1, UnitTypes.Percent))

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawOwnTrades(area)

    def on_process(self, candle):
        """Update the accumulated impact with a finished candle and enter on a threshold hit.

        Unfinished candles are ignored.
        """
        if candle.State != CandleStates.Finished:
            return

        # The cooldown counts every finished candle, including those seen during warm-up.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        lookback = self.lookback_period
        # No validation is attached to the parameter here, so a non-positive
        # lookback is possible; it leaves nothing to average, so skip the candle
        # instead of dividing by zero below.
        if lookback <= 0:
            return

        candle_volume = float(candle.TotalVolume)
        self._volume_buffer.append(candle_volume)

        # Trim in a loop: if the lookback was lowered after the window had filled,
        # a single pop per candle would keep the window at its old size forever.
        while len(self._volume_buffer) > lookback:
            self._volume_buffer.pop(0)

        # Wait until the window is full.
        if len(self._volume_buffer) < lookback:
            return

        sum_vol = 0.0
        for volume in self._volume_buffer:
            sum_vol += volume
        average_volume = sum_vol / len(self._volume_buffer)

        # A window without any volume yields a ratio of zero instead of dividing by zero.
        relative_volume = 0.0 if average_volume <= 0 else candle_volume / average_volume

        # +1 for a candle closing above its open, -1 for below, 0 when unchanged.
        if candle.ClosePrice > candle.OpenPrice:
            directional_move = 1.0
        elif candle.ClosePrice < candle.OpenPrice:
            directional_move = -1.0
        else:
            directional_move = 0.0

        # Only candles with enough relative volume contribute; the decay is applied on every candle.
        if relative_volume >= self.volume_threshold:
            impact = directional_move * relative_volume / self.impact_normalization
        else:
            impact = 0.0
        self._cumulative_impact = self._cumulative_impact * self.impact_decay + impact

        # New entries only while flat and outside the cooldown.
        if self.Position != 0 or self._cooldown_remaining > 0:
            return

        if self._cumulative_impact >= self.signal_threshold:
            self.BuyMarket()
            self._cooldown_remaining = self.signal_cooldown_bars
        elif self._cumulative_impact <= -self.signal_threshold:
            self.SellMarket()
            self._cooldown_remaining = self.signal_cooldown_bars

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return volume_block_order_analyzer_strategy()