成交量均值回归策略
该策略关注成交量相对其历史均值的异常高低。巨大成交量峰值往往在活动恢复正常后回落,提供逆势交易机会。
测试表明年均收益约为 76%,该策略在外汇市场表现最佳。
当成交量低于均值减 DeviationMultiplier 倍标准差且价格在均线下方时做多;当成交量高于上轨并且价格位于均线之上时做空。成交量回到均值附近即平仓。
此方法适合关注放量后的衰减行情,百分比止损可避免成交量持续扩大时产生过大损失。
详细信息
- 入场条件:
- 做多: Volume < Avg - DeviationMultiplier * StdDev && Close < MA
- 做空: Volume > Avg + DeviationMultiplier * StdDev && Close > MA
- 多空方向: 双向
- 退出条件:
- 做多: Exit when volume > Avg
- 做空: Exit when volume < Avg
- 止损: 是
- 默认值:
AveragePeriod= 20DeviationMultiplier= 2mCandleType= TimeSpan.FromMinutes(5)StopLossPercent= 2m
- 筛选条件:
- 类别: 均值回归
- 方向: 双向
- 指标: Volume
- 止损: 是
- 复杂度: 中等
- 时间框架: 日内
- 季节性: 否
- 神经网络: 否
- 背离: 否
- 风险等级: 中等
namespace StockSharp.Samples.Strategies;
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo;
using StockSharp.Algo.Candles;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
/// <summary>
/// Volume Mean Reversion strategy.
/// This strategy enters positions when trading volume is significantly below or above its average value.
/// </summary>
public class VolumeMeanReversionStrategy : Strategy
{
private readonly StrategyParam<int> _averagePeriod;
private readonly StrategyParam<decimal> _deviationMultiplier;
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<decimal> _stopLossPercent;
private decimal _avgVolume;
private decimal _stdDevVolume;
private decimal _sumVolume;
private decimal _sumSquaresVolume;
private int _count;
private readonly Queue<decimal> _volumeValues = [];
/// <summary>
/// Period for calculating mean and standard deviation of Volume.
/// </summary>
public int AveragePeriod
{
get => _averagePeriod.Value;
set => _averagePeriod.Value = value;
}
/// <summary>
/// Deviation multiplier for entry signals.
/// </summary>
public decimal DeviationMultiplier
{
get => _deviationMultiplier.Value;
set => _deviationMultiplier.Value = value;
}
/// <summary>
/// Candle type.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Stop-loss percentage.
/// </summary>
public decimal StopLossPercent
{
get => _stopLossPercent.Value;
set => _stopLossPercent.Value = value;
}
/// <summary>
/// Constructor.
/// </summary>
public VolumeMeanReversionStrategy()
{
_averagePeriod = Param(nameof(AveragePeriod), 20)
.SetGreaterThanZero()
.SetOptimize(10, 50, 10)
.SetDisplay("Average Period", "Period for calculating Volume average and standard deviation", "Settings");
_deviationMultiplier = Param(nameof(DeviationMultiplier), 2m)
.SetGreaterThanZero()
.SetOptimize(1.5m, 3m, 0.5m)
.SetDisplay("Deviation Multiplier", "Multiplier for standard deviation", "Settings");
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles to use", "General");
_stopLossPercent = Param(nameof(StopLossPercent), 2m)
.SetGreaterThanZero()
.SetOptimize(1m, 3m, 0.5m)
.SetDisplay("Stop Loss %", "Stop loss as percentage of entry price", "Risk Management");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_avgVolume = 0;
_stdDevVolume = 0;
_sumVolume = 0;
_sumSquaresVolume = 0;
_count = 0;
_volumeValues.Clear();
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
// Create Volume indicator (for visualization)
var volume = new VolumeIndicator();
// Create subscription
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(ProcessCandle)
.Start();
// Setup chart visualization
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, volume);
DrawOwnTrades(area);
// Create additional area for volume
var volumeArea = CreateChartArea();
if (volumeArea != null)
DrawIndicator(volumeArea, volume);
}
// Enable position protection
StartProtection(
takeProfit: new Unit(0m), // We'll manage exits ourselves based on Volume
stopLoss: new Unit(StopLossPercent, UnitTypes.Percent)
);
base.OnStarted2(time);
}
private void ProcessCandle(ICandleMessage candle)
{
// Skip unfinished candles
if (candle.State != CandleStates.Finished)
return;
// Check if strategy is ready to trade
if (!IsFormedAndOnlineAndAllowTrading())
return;
// Extract Volume value (for candles, this is TotalVolume)
var currentVolume = candle.TotalVolume;
// Update Volume statistics
UpdateVolumeStatistics(currentVolume);
// If we don't have enough data yet for statistics
if (_count < AveragePeriod)
return;
// For volume-based strategies, price direction is important
var priceDirection = candle.ClosePrice > candle.OpenPrice ? Sides.Buy : Sides.Sell;
// Check for entry conditions
if (Position == 0)
{
// Volume is significantly below average - expecting a return to average trading activity
if (currentVolume < _avgVolume - DeviationMultiplier * _stdDevVolume)
{
// In low volume environments, we might look for potential market accumulation
// and follow the small price movement which could be institutional accumulation
if (priceDirection == Sides.Buy)
{
BuyMarket(Volume);
LogInfo($"Long entry: Volume = {currentVolume}, Avg = {_avgVolume}, StdDev = {_stdDevVolume}, Low volume with price up");
}
else
{
SellMarket(Volume);
LogInfo($"Short entry: Volume = {currentVolume}, Avg = {_avgVolume}, StdDev = {_stdDevVolume}, Low volume with price down");
}
}
// Volume is significantly above average - potential high volume climax
else if (currentVolume > _avgVolume + DeviationMultiplier * _stdDevVolume)
{
// High volume often indicates climactic moves that might reverse
// So we consider going against the price direction on high volume bars
if (priceDirection == Sides.Sell)
{
BuyMarket(Volume);
LogInfo($"Contrarian long entry: Volume = {currentVolume}, Avg = {_avgVolume}, StdDev = {_stdDevVolume}, High volume with price down");
}
else
{
SellMarket(Volume);
LogInfo($"Contrarian short entry: Volume = {currentVolume}, Avg = {_avgVolume}, StdDev = {_stdDevVolume}, High volume with price up");
}
}
}
// Check for exit conditions
else if (Position > 0) // Long position
{
// Exit long position when volume returns to average
if (currentVolume > _avgVolume || (currentVolume > _avgVolume * 0.8m && priceDirection == Sides.Sell))
{
ClosePosition();
LogInfo($"Long exit: Volume = {currentVolume}, Avg = {_avgVolume}");
}
}
else if (Position < 0) // Short position
{
// Exit short position when volume returns to average
if (currentVolume > _avgVolume || (currentVolume > _avgVolume * 0.8m && priceDirection == Sides.Buy))
{
ClosePosition();
LogInfo($"Short exit: Volume = {currentVolume}, Avg = {_avgVolume}");
}
}
}
private void UpdateVolumeStatistics(decimal currentVolume)
{
// Add current value to the queue
_volumeValues.Enqueue(currentVolume);
_sumVolume += currentVolume;
_sumSquaresVolume += currentVolume * currentVolume;
_count++;
// If queue is larger than period, remove oldest value
if (_volumeValues.Count > AveragePeriod)
{
var oldestVolume = _volumeValues.Dequeue();
_sumVolume -= oldestVolume;
_sumSquaresVolume -= oldestVolume * oldestVolume;
_count--;
}
// Calculate average and standard deviation
if (_count > 0)
{
_avgVolume = _sumVolume / _count;
if (_count > 1)
{
var variance = (_sumSquaresVolume - (_sumVolume * _sumVolume) / _count) / (_count - 1);
_stdDevVolume = variance <= 0 ? 0 : (decimal)Math.Sqrt((double)variance);
}
else
{
_stdDevVolume = 0;
}
}
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, Unit, UnitTypes, ICandleMessage, CandleStates, Sides
from StockSharp.Algo.Indicators import VolumeIndicator
from StockSharp.Algo.Strategies import Strategy
from datatype_extensions import *
class volume_mean_reversion_strategy(Strategy):
"""
Volume Mean Reversion strategy.
This strategy enters positions when trading volume is significantly below or above its average value.
"""
def __init__(self):
super(volume_mean_reversion_strategy, self).__init__()
# Period for calculating mean and standard deviation of Volume.
self._average_period = self.Param("AveragePeriod", 20) \
.SetGreaterThanZero() \
.SetCanOptimize(True) \
.SetOptimize(10, 50, 10) \
.SetDisplay("Average Period", "Period for calculating Volume average and standard deviation", "Settings")
# Deviation multiplier for entry signals.
self._deviation_multiplier = self.Param("DeviationMultiplier", 2.0) \
.SetGreaterThanZero() \
.SetCanOptimize(True) \
.SetOptimize(1.5, 3.0, 0.5) \
.SetDisplay("Deviation Multiplier", "Multiplier for standard deviation", "Settings")
# Candle type.
self._candle_type = self.Param("CandleType", tf(5)) \
.SetDisplay("Candle Type", "Type of candles to use", "General")
# Stop-loss percentage.
self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
.SetGreaterThanZero() \
.SetCanOptimize(True) \
.SetOptimize(1.0, 3.0, 0.5) \
.SetDisplay("Stop Loss %", "Stop loss as percentage of entry price", "Risk Management")
# Internal state variables
self._avg_volume = 0.0
self._std_dev_volume = 0.0
self._sum_volume = 0.0
self._sum_squares_volume = 0.0
self._count = 0
self._volume_values = []
@property
def AveragePeriod(self):
return self._average_period.Value
@AveragePeriod.setter
def AveragePeriod(self, value):
self._average_period.Value = value
@property
def DeviationMultiplier(self):
return self._deviation_multiplier.Value
@DeviationMultiplier.setter
def DeviationMultiplier(self, value):
self._deviation_multiplier.Value = value
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
@property
def StopLossPercent(self):
return self._stop_loss_percent.Value
@StopLossPercent.setter
def StopLossPercent(self, value):
self._stop_loss_percent.Value = value
def GetWorkingSecurities(self):
"""Return security and timeframe used by the strategy."""
return [(self.Security, self.CandleType)]
def OnReseted(self):
super(volume_mean_reversion_strategy, self).OnReseted()
self._avg_volume = 0.0
self._std_dev_volume = 0.0
self._sum_volume = 0.0
self._sum_squares_volume = 0.0
self._count = 0
self._volume_values = []
def OnStarted2(self, time):
"""
Initialize indicators, subscriptions and charting.
"""
super(volume_mean_reversion_strategy, self).OnStarted2(time)
# Create Volume indicator (for visualization)
volume = VolumeIndicator()
# Create subscription
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(self.ProcessCandle).Start()
# Setup chart visualization
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, volume)
self.DrawOwnTrades(area)
# Create additional area for volume
volume_area = self.CreateChartArea()
if volume_area is not None:
self.DrawIndicator(volume_area, volume)
# Enable position protection
self.StartProtection(
takeProfit=Unit(0),
stopLoss=Unit(self.StopLossPercent, UnitTypes.Percent)
)
def ProcessCandle(self, candle):
"""
Process candle and execute trading logic based on volume statistics.
"""
# Skip unfinished candles
if candle.State != CandleStates.Finished:
return
# Check if strategy is ready to trade
# Extract Volume value (for candles, this is TotalVolume)
current_volume = float(candle.TotalVolume)
# Update Volume statistics
self.UpdateVolumeStatistics(current_volume)
# If we don't have enough data yet for statistics
if self._count < self.AveragePeriod:
return
# For volume-based strategies, price direction is important
price_direction = Sides.Buy if candle.ClosePrice > candle.OpenPrice else Sides.Sell
# Check for entry conditions
if self.Position == 0:
# Volume is significantly below average - expecting a return to average trading activity
if current_volume < self._avg_volume - self.DeviationMultiplier * self._std_dev_volume:
# In low volume environments, we might look for potential market accumulation
# and follow the small price movement which could be institutional accumulation
if price_direction == Sides.Buy:
self.BuyMarket(self.Volume)
self.LogInfo(
"Long entry: Volume = {0}, Avg = {1}, StdDev = {2}, Low volume with price up".format(
current_volume, self._avg_volume, self._std_dev_volume))
else:
self.SellMarket(self.Volume)
self.LogInfo(
"Short entry: Volume = {0}, Avg = {1}, StdDev = {2}, Low volume with price down".format(
current_volume, self._avg_volume, self._std_dev_volume))
# Volume is significantly above average - potential high volume climax
elif current_volume > self._avg_volume + self.DeviationMultiplier * self._std_dev_volume:
# High volume often indicates climactic moves that might reverse
# So we consider going against the price direction on high volume bars
if price_direction == Sides.Sell:
self.BuyMarket(self.Volume)
self.LogInfo(
"Contrarian long entry: Volume = {0}, Avg = {1}, StdDev = {2}, High volume with price down".format(
current_volume, self._avg_volume, self._std_dev_volume))
else:
self.SellMarket(self.Volume)
self.LogInfo(
"Contrarian short entry: Volume = {0}, Avg = {1}, StdDev = {2}, High volume with price up".format(
current_volume, self._avg_volume, self._std_dev_volume))
# Check for exit conditions
elif self.Position > 0: # Long position
# Exit long position when volume returns to average
if current_volume > self._avg_volume or (
current_volume > self._avg_volume * 0.8 and price_direction == Sides.Sell):
self.ClosePosition()
self.LogInfo("Long exit: Volume = {0}, Avg = {1}".format(current_volume, self._avg_volume))
elif self.Position < 0: # Short position
# Exit short position when volume returns to average
if current_volume > self._avg_volume or (
current_volume > self._avg_volume * 0.8 and price_direction == Sides.Buy):
self.ClosePosition()
self.LogInfo("Short exit: Volume = {0}, Avg = {1}".format(current_volume, self._avg_volume))
def UpdateVolumeStatistics(self, current_volume):
"""Update internal statistics for volume calculations."""
# Add current value to the queue
self._volume_values.append(current_volume)
self._sum_volume += current_volume
self._sum_squares_volume += current_volume * current_volume
self._count += 1
# If queue is larger than period, remove oldest value
if len(self._volume_values) > self.AveragePeriod:
oldest_volume = self._volume_values.pop(0)
self._sum_volume -= oldest_volume
self._sum_squares_volume -= oldest_volume * oldest_volume
self._count -= 1
# Calculate average and standard deviation
if self._count > 0:
self._avg_volume = self._sum_volume / self._count
if self._count > 1:
variance = (self._sum_squares_volume - (self._sum_volume * self._sum_volume) / self._count) / (self._count - 1)
self._std_dev_volume = 0 if variance <= 0 else Math.Sqrt(float(variance))
else:
self._std_dev_volume = 0
def CreateClone(self):
"""!! REQUIRED!! Creates a new instance of the strategy."""
return volume_mean_reversion_strategy()