VWAP Hidden Markov Model
The VWAP Hidden Markov Model strategy trades based on VWAP, using a Hidden Markov Model for market state detection.
Testing indicates an average annual return of about 100%. It performs best in the forex market.
Signals trigger when Markov confirms trend changes on intraday (5m) data. This makes the method suitable for active traders.
Stops rely on a percentage-based stop-loss (StopLossPercent), while HmmDataLength controls how many candles of history the Markov model evaluates. Adjust these defaults to balance risk and reward.
Details
- Entry Criteria: go long when the Markov state is Bullish and the candle closes above VWAP; go short when the state is Bearish and the candle closes below VWAP.
- Long/Short: Both directions.
- Exit Criteria: opposite signal or stop logic.
- Stops: Yes, a percentage-based stop-loss measured from the entry price.
- Default Values:
  - HmmDataLength = 100
  - StopLossPercent = 2m
  - CandleType = TimeSpan.FromMinutes(5).TimeFrame()
- Filters:
- Category: Trend following
- Direction: Both
- Indicators: Markov
- Stops: Yes
- Complexity: Intermediate
- Timeframe: Intraday (5m)
- Seasonality: No
- Neural Networks: Yes
- Divergence: No
- Risk Level: Medium
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that trades based on VWAP with Hidden Markov Model for market state detection.
/// </summary>
/// <remarks>
/// The model is deliberately simplified: transition and emission probabilities are fixed constants
/// (no training), and the state is decoded with a normalised forward pass over a rolling window of
/// close prices and volumes. A long is opened when the decoded state is bullish and the close is
/// above VWAP; a short is opened when the state is bearish and the close is below VWAP.
/// </remarks>
public class VwapHiddenMarkovModelStrategy : Strategy
{
    // Number of hidden states; must match the MarketStates enum and both probability matrices.
    private const int StateCount = 3;

    private readonly StrategyParam<int> _hmmDataLength;
    private readonly StrategyParam<decimal> _stopLossPercent;
    private readonly StrategyParam<DataType> _candleType;

    // Hidden states. The numeric values (0, 1, 2) are used as row/column indices
    // into the transition and emission matrices, so the order must not change.
    private enum MarketStates
    {
        Neutral,
        Bullish,
        Bearish
    }

    private MarketStates _currentMarketState = MarketStates.Neutral;

    // Rolling windows of close prices and total volumes (oldest first) used as HMM features.
    private readonly Queue<decimal> _priceData = [];
    private readonly Queue<decimal> _volumeData = [];

    // Transition probabilities: P(new state | old state). Rows are the old state.
    private static readonly decimal[,] _transitionMatrix = new decimal[StateCount, StateCount]
    {
        { 0.8m, 0.1m, 0.1m }, // Neutral -> Neutral, Bullish, Bearish
        { 0.2m, 0.7m, 0.1m }, // Bullish -> Neutral, Bullish, Bearish
        { 0.2m, 0.1m, 0.7m }  // Bearish -> Neutral, Bullish, Bearish
    };

    // Emission probabilities: P(observation | state). Built once instead of on every candle.
    private static readonly decimal[,] _emissionMatrix = new decimal[StateCount, 4]
    {
        { 0.3m, 0.2m, 0.3m, 0.2m }, // Neutral -> obs0, obs1, obs2, obs3
        { 0.1m, 0.1m, 0.3m, 0.5m }, // Bullish -> obs0, obs1, obs2, obs3
        { 0.5m, 0.3m, 0.1m, 0.1m }  // Bearish -> obs0, obs1, obs2, obs3
    };

    /// <summary>
    /// Strategy parameter: Length of data to use for HMM.
    /// </summary>
    public int HmmDataLength
    {
        get => _hmmDataLength.Value;
        set => _hmmDataLength.Value = value;
    }

    /// <summary>
    /// Strategy parameter: Stop-loss percentage.
    /// </summary>
    public decimal StopLossPercent
    {
        get => _stopLossPercent.Value;
        set => _stopLossPercent.Value = value;
    }

    /// <summary>
    /// Strategy parameter: Candle type.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Constructor.
    /// </summary>
    public VwapHiddenMarkovModelStrategy()
    {
        _hmmDataLength = Param(nameof(HmmDataLength), 100)
            .SetGreaterThanZero()
            .SetDisplay("HMM Data Length", "Number of periods to use for HMM", "HMM Settings");

        _stopLossPercent = Param(nameof(StopLossPercent), 2m)
            .SetGreaterThanZero()
            .SetDisplay("Stop Loss %", "Stop Loss percentage from entry price", "Risk Management");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(1).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return [(Security, CandleType)];
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _currentMarketState = MarketStates.Neutral;
        _priceData.Clear();
        _volumeData.Clear();
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        // Create Vwap indicator
        var vwap = new VolumeWeightedMovingAverage();

        // Create subscription for candles
        var subscription = SubscribeCandles(CandleType);

        // Bind VWAP indicator to subscription and start
        subscription
            .Bind(vwap, ProcessVwap)
            .Start();

        // Add chart visualization
        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, vwap);
            DrawOwnTrades(area);
        }

        // Start position protection with percentage-based stop-loss
        StartProtection(
            takeProfit: new Unit(0), // No fixed take profit
            stopLoss: new Unit(StopLossPercent, UnitTypes.Percent)
        );
    }

    /// <summary>
    /// Handles a candle together with the VWAP value computed for it: updates the HMM window,
    /// re-decodes the market state and, if trading is allowed, opens or reverses a position.
    /// </summary>
    /// <param name="candle">Candle delivered by the subscription.</param>
    /// <param name="vwapValue">VWAP indicator value for <paramref name="candle"/>.</param>
    private void ProcessVwap(ICandleMessage candle, decimal vwapValue)
    {
        // Skip unfinished candles
        if (candle.State != CandleStates.Finished)
            return;

        // Feed the model BEFORE the trading gate below: candles that arrive while trading
        // is not yet allowed must still fill the window, otherwise state detection only
        // starts HmmDataLength candles after trading becomes possible.
        UpdateHmmData(candle);

        // Run HMM algorithm when enough data is collected
        if (_priceData.Count >= HmmDataLength && _volumeData.Count >= HmmDataLength)
        {
            // Update current market state using HMM
            _currentMarketState = RunHmm();

            // Log market state updates periodically (candles opening on a quarter hour)
            if (candle.OpenTime.Second == 0 && candle.OpenTime.Minute % 15 == 0)
            {
                LogInfo($"Current market state: {_currentMarketState}");
            }
        }

        // Check if strategy is ready to trade
        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        // Trading logic based on VWAP and HMM state
        if (_currentMarketState == MarketStates.Bullish && candle.ClosePrice > vwapValue && Position <= 0)
        {
            // Price above VWAP in bullish state - Buy signal.
            // Volume + |Position| closes any short and opens a long in a single order.
            LogInfo($"Buy signal: Price ({candle.ClosePrice}) above VWAP ({vwapValue}) in bullish state");
            BuyMarket(Volume + Math.Abs(Position));
        }
        else if (_currentMarketState == MarketStates.Bearish && candle.ClosePrice < vwapValue && Position >= 0)
        {
            // Price below VWAP in bearish state - Sell signal.
            // Volume + |Position| closes any long and opens a short in a single order.
            LogInfo($"Sell signal: Price ({candle.ClosePrice}) below VWAP ({vwapValue}) in bearish state");
            SellMarket(Volume + Math.Abs(Position));
        }
    }

    /// <summary>
    /// Appends the candle's close price and total volume to the rolling windows and
    /// trims both windows to at most <see cref="HmmDataLength"/> items.
    /// </summary>
    /// <param name="candle">Finished candle to record.</param>
    private void UpdateHmmData(ICandleMessage candle)
    {
        var maxLength = HmmDataLength;

        _priceData.Enqueue(candle.ClosePrice);
        TrimToLength(_priceData, maxLength);

        _volumeData.Enqueue(candle.TotalVolume);
        TrimToLength(_volumeData, maxLength);
    }

    /// <summary>
    /// Removes the oldest items until the queue holds at most <paramref name="maxLength"/> items.
    /// </summary>
    /// <remarks>
    /// A loop (rather than a single dequeue) is required so the window actually shrinks
    /// when <see cref="HmmDataLength"/> is reduced while data is already collected.
    /// </remarks>
    private static void TrimToLength(Queue<decimal> queue, int maxLength)
    {
        while (queue.Count > 0 && queue.Count > maxLength)
            queue.Dequeue();
    }

    /// <summary>
    /// Decodes the current market state from the collected price/volume window.
    /// </summary>
    /// <returns>
    /// The state decoded for the most recent observation, or
    /// <see cref="MarketStates.Neutral"/> when no observation could be produced.
    /// </returns>
    private MarketStates RunHmm()
    {
        // This is a simplified implementation of Hidden Markov Model
        // A full implementation would use Baum-Welch algorithm for training and Viterbi algorithm for decoding

        // Convert data to observations
        var observations = GenerateObservations();
        if (observations.Count == 0)
            return MarketStates.Neutral;

        // Decode the state sequence and keep only the most recent state
        var states = SimplifiedViterbi(observations);
        return states.Count == 0 ? MarketStates.Neutral : states[^1];
    }

    /// <summary>
    /// Converts consecutive candle pairs from the rolling windows into discrete observations.
    /// </summary>
    /// <returns>
    /// One observation code per usable pair:
    /// 0 = price down, low volume; 1 = price down, high volume;
    /// 2 = price up or unchanged, low volume; 3 = price up or unchanged, high volume.
    /// "High volume" means the volume exceeds the previous candle's volume by more than 10%.
    /// </returns>
    private List<int> GenerateObservations()
    {
        // This is a simplified approach - in a real implementation, we would
        // use more sophisticated techniques to generate observations
        var prices = _priceData.ToArray();
        var volumes = _volumeData.ToArray();
        var result = new List<int>(prices.Length);

        for (int i = 1; i < prices.Length; i++)
        {
            var previousPrice = prices[i - 1];

            // A non-positive previous price cannot yield a relative change; skip the pair
            if (previousPrice <= 0)
                continue;

            var previousVolume = volumes[i - 1];
            var priceChange = (prices[i] - previousPrice) / previousPrice;

            // Without a positive previous volume the ratio is undefined; treat as unchanged volume
            var volumeRatio = previousVolume > 0 ? volumes[i] / previousVolume : 1m;

            int observation;
            if (priceChange < 0)
                observation = volumeRatio > 1.1m ? 1 : 0;
            else
                observation = volumeRatio > 1.1m ? 3 : 2;

            result.Add(observation);
        }

        return result;
    }

    /// <summary>
    /// Decodes a state for every observation using the fixed transition and emission matrices.
    /// </summary>
    /// <remarks>
    /// Despite the name, this is a normalised forward (filtering) pass that picks the most
    /// probable state at each step; it does not perform the back-tracking of a true Viterbi
    /// decoder. For a real implementation, proper HMM libraries should be used.
    /// </remarks>
    /// <param name="observations">Observation codes in the range 0..3, oldest first.</param>
    /// <returns>The most probable state after each observation, in the same order.</returns>
    private List<MarketStates> SimplifiedViterbi(List<int> observations)
    {
        // Initialize with equal probabilities for each state
        var currentStateProbabilities = new decimal[StateCount] { 1m / 3, 1m / 3, 1m / 3 };
        var stateSequence = new List<MarketStates>(observations.Count);

        // Process each observation
        foreach (var obs in observations)
        {
            var newProbabilities = new decimal[StateCount];

            // Calculate new state probabilities based on observation and transition matrix
            for (int newState = 0; newState < StateCount; newState++)
            {
                decimal totalProb = 0;

                for (int oldState = 0; oldState < StateCount; oldState++)
                {
                    totalProb += currentStateProbabilities[oldState] *
                        _transitionMatrix[oldState, newState] *
                        _emissionMatrix[newState, obs];
                }

                newProbabilities[newState] = totalProb;
            }

            // Normalize probabilities so they do not shrink towards zero over a long window
            decimal sum = newProbabilities.Sum();
            if (sum > 0)
            {
                for (int i = 0; i < StateCount; i++)
                    newProbabilities[i] /= sum;
            }

            // Find most likely state (ties resolve to the lowest index)
            int maxIndex = 0;
            for (int i = 1; i < StateCount; i++)
            {
                if (newProbabilities[i] > newProbabilities[maxIndex])
                    maxIndex = i;
            }

            // Add state to sequence
            stateSequence.Add((MarketStates)maxIndex);

            // Update current probabilities
            currentStateProbabilities = newProbabilities;
        }

        return stateSequence;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import VolumeWeightedMovingAverage
from StockSharp.Algo.Strategies import Strategy
from collections import deque
class MarketState:
    """Integer codes for the market regimes tracked by the Hidden Markov Model."""

    # Neutral = 0, Bullish = 1, Bearish = 2
    Neutral, Bullish, Bearish = range(3)
class vwap_hidden_markov_model_strategy(Strategy):
    """Strategy that trades based on VWAP with Hidden Markov Model for market state detection.

    The model is deliberately simplified: transition and emission probabilities are
    fixed constants (no training), and the state is decoded with a normalised forward
    pass over a rolling window of close prices and volumes. A long is opened when the
    decoded state is bullish and the close is above VWAP; a short is opened when the
    state is bearish and the close is below VWAP.
    """

    def __init__(self):
        super(vwap_hidden_markov_model_strategy, self).__init__()

        # Strategy parameter: Length of data to use for HMM.
        self._hmm_data_length = self.Param("HmmDataLength", 100) \
            .SetGreaterThanZero() \
            .SetDisplay("HMM Data Length", "Number of periods to use for HMM", "HMM Settings")

        # Strategy parameter: Stop-loss percentage.
        self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Stop Loss %", "Stop Loss percentage from entry price", "Risk Management")

        # Strategy parameter: Candle type.
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(1))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        # HMM state and rolling windows of close prices / total volumes (oldest first)
        self._current_market_state = MarketState.Neutral
        self._price_data = deque()
        self._volume_data = deque()

        # Transition probabilities: P(new state | old state). Rows are the old state.
        self._transition_matrix = [
            [0.8, 0.1, 0.1],  # Neutral -> Neutral, Bullish, Bearish
            [0.2, 0.7, 0.1],  # Bullish -> Neutral, Bullish, Bearish
            [0.2, 0.1, 0.7],  # Bearish -> Neutral, Bullish, Bearish
        ]

        # Emission probabilities: P(observation | state). Built once instead of on every candle.
        self._emission_matrix = [
            [0.3, 0.2, 0.3, 0.2],  # Neutral -> obs0, obs1, obs2, obs3
            [0.1, 0.1, 0.3, 0.5],  # Bullish -> obs0, obs1, obs2, obs3
            [0.5, 0.3, 0.1, 0.1],  # Bearish -> obs0, obs1, obs2, obs3
        ]

    @property
    def HmmDataLength(self):
        """Length of data to use for HMM."""
        return self._hmm_data_length.Value

    @HmmDataLength.setter
    def HmmDataLength(self, value):
        self._hmm_data_length.Value = value

    @property
    def StopLossPercent(self):
        """Stop-loss percentage."""
        return self._stop_loss_percent.Value

    @StopLossPercent.setter
    def StopLossPercent(self, value):
        self._stop_loss_percent.Value = value

    @property
    def CandleType(self):
        """Candle type."""
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value

    def GetWorkingSecurities(self):
        return [(self.Security, self.CandleType)]

    def OnReseted(self):
        super(vwap_hidden_markov_model_strategy, self).OnReseted()
        self._current_market_state = MarketState.Neutral
        self._price_data.clear()
        self._volume_data.clear()

    def OnStarted2(self, time):
        super(vwap_hidden_markov_model_strategy, self).OnStarted2(time)

        # Create Vwap indicator
        vwap = VolumeWeightedMovingAverage()

        # Create subscription for candles
        subscription = self.SubscribeCandles(self.CandleType)

        # Bind VWAP indicator to subscription and start
        subscription.Bind(vwap, self.ProcessVwap).Start()

        # Add chart visualization
        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, vwap)
            self.DrawOwnTrades(area)

        # Start position protection with percentage-based stop-loss (no fixed take profit)
        self.StartProtection(
            takeProfit=Unit(0),
            stopLoss=Unit(self.StopLossPercent, UnitTypes.Percent)
        )

    def ProcessVwap(self, candle, vwap_value):
        """Handle a candle and its VWAP value: update the HMM window, re-decode the
        market state and, if trading is allowed, open or reverse a position."""
        # Skip unfinished candles
        if candle.State != CandleStates.Finished:
            return

        # Feed the model BEFORE the trading gate below: candles that arrive while trading
        # is not yet allowed must still fill the window, otherwise state detection only
        # starts HmmDataLength candles after trading becomes possible.
        self.UpdateHmmData(candle)

        # Run HMM algorithm when enough data is collected
        if len(self._price_data) >= self.HmmDataLength and len(self._volume_data) >= self.HmmDataLength:
            # Update current market state using HMM
            self._current_market_state = self.RunHmm()

            # Log market state updates periodically (candles opening on a quarter hour)
            if candle.OpenTime.Second == 0 and candle.OpenTime.Minute % 15 == 0:
                self.LogInfo(f"Current market state: {self._current_market_state}")

        # Check if strategy is ready to trade
        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        # Trading logic based on VWAP and HMM state
        if self._current_market_state == MarketState.Bullish and candle.ClosePrice > vwap_value and self.Position <= 0:
            # Price above VWAP in bullish state - Buy signal.
            # Volume + |Position| closes any short and opens a long in a single order.
            self.LogInfo(f"Buy signal: Price ({candle.ClosePrice}) above VWAP ({vwap_value}) in bullish state")
            self.BuyMarket(self.Volume + Math.Abs(self.Position))
        elif self._current_market_state == MarketState.Bearish and candle.ClosePrice < vwap_value and self.Position >= 0:
            # Price below VWAP in bearish state - Sell signal.
            # Volume + |Position| closes any long and opens a short in a single order.
            self.LogInfo(f"Sell signal: Price ({candle.ClosePrice}) below VWAP ({vwap_value}) in bearish state")
            self.SellMarket(self.Volume + Math.Abs(self.Position))

    def UpdateHmmData(self, candle):
        """Append the candle's close price and total volume to the rolling windows and
        trim both windows to at most HmmDataLength items."""
        max_length = self.HmmDataLength

        # A loop (rather than a single popleft) is required so the windows actually
        # shrink when HmmDataLength is reduced while data is already collected.
        self._price_data.append(candle.ClosePrice)
        while self._price_data and len(self._price_data) > max_length:
            self._price_data.popleft()

        self._volume_data.append(candle.TotalVolume)
        while self._volume_data and len(self._volume_data) > max_length:
            self._volume_data.popleft()

    def RunHmm(self):
        """Decode the current market state from the collected price/volume window.

        Returns the state decoded for the most recent observation, or
        MarketState.Neutral when no observation could be produced.
        """
        # This is a simplified implementation of Hidden Markov Model
        # A full implementation would use Baum-Welch algorithm for training and Viterbi algorithm for decoding

        # Convert data to observations
        observations = self.GenerateObservations()

        # Decode the state sequence and keep only the most recent state
        states = self.SimplifiedViterbi(observations)
        return states[-1] if states else MarketState.Neutral

    def GenerateObservations(self):
        """Convert consecutive candle pairs from the rolling windows into observation codes.

        Codes:
            0: price down, low volume
            1: price down, high volume
            2: price up or unchanged, low volume
            3: price up or unchanged, high volume

        "High volume" means the volume exceeds the previous candle's volume by more than 10%.
        """
        # This is a simplified approach - in a real implementation, we would
        # use more sophisticated techniques to generate observations
        result = []
        prices = list(self._price_data)
        volumes = list(self._volume_data)

        for i in range(1, len(prices)):
            prev_price = float(prices[i - 1])

            # A non-positive previous price cannot yield a relative change; skip the pair
            if prev_price <= 0:
                continue

            prev_volume = float(volumes[i - 1])
            price_change = (float(prices[i]) - prev_price) / prev_price

            # Without a positive previous volume the ratio is undefined; treat as unchanged volume
            volume_ratio = float(volumes[i]) / prev_volume if prev_volume > 0 else 1.0

            if price_change < 0:
                observation = 1 if volume_ratio > 1.1 else 0
            else:
                observation = 3 if volume_ratio > 1.1 else 2

            result.append(observation)

        return result

    def SimplifiedViterbi(self, observations):
        """Decode a state for every observation using the fixed probability matrices.

        Despite the name, this is a normalised forward (filtering) pass that picks the
        most probable state at each step; it does not perform the back-tracking of a
        true Viterbi decoder. For a real implementation, proper HMM libraries should be used.

        observations: observation codes in the range 0..3, oldest first.
        Returns the most probable state index after each observation, in the same order.
        """
        emission_matrix = self._emission_matrix

        # Initialize with equal probabilities for each state
        current_state_probabilities = [1.0 / 3] * 3
        state_sequence = []

        # Process each observation
        for obs in observations:
            new_probabilities = [0.0, 0.0, 0.0]

            # Calculate new state probabilities based on observation and transition matrix
            for new_state in range(3):
                total_prob = 0.0
                for old_state in range(3):
                    total_prob += (
                        current_state_probabilities[old_state]
                        * self._transition_matrix[old_state][new_state]
                        * emission_matrix[new_state][obs]
                    )
                new_probabilities[new_state] = total_prob

            # Normalize probabilities so they do not shrink towards zero over a long window
            _sum = sum(new_probabilities)
            if _sum > 0:
                new_probabilities = [p / _sum for p in new_probabilities]

            # Find most likely state (ties resolve to the lowest index)
            max_index = 0
            for i in range(1, 3):
                if new_probabilities[i] > new_probabilities[max_index]:
                    max_index = i

            # Add state to sequence
            state_sequence.append(max_index)

            # Update current probabilities
            current_state_probabilities = new_probabilities

        return state_sequence

    def CreateClone(self):
        """!! REQUIRED!! Creates a new instance of the strategy."""
        return vwap_hidden_markov_model_strategy()