using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Flat channel breakout strategy converted from the MetaTrader 5 version.
/// Detects consolidation via falling standard deviation, then trades breakouts of the channel.
/// </summary>
/// <remarks>
/// Breakout entries are simulated: the strategy remembers a buy level and a sell level and sends a
/// market order once a finished candle touches one of them. Stop-loss and take-profit are likewise
/// checked against finished candles and executed with market orders.
/// </remarks>
public class FlatChannelStrategy : Strategy
{
	private readonly StrategyParam<int> _stdDevPeriod;
	private readonly StrategyParam<int> _flatBars;
	private readonly StrategyParam<decimal> _channelMinPips;
	private readonly StrategyParam<decimal> _channelMaxPips;
	private readonly StrategyParam<DataType> _candleType;

	// Indicators are created in OnStarted2; _stdDev is fed manually with the candle median price.
	private StandardDeviation _stdDev = null!;
	private DonchianChannels _donchian = null!;

	// Standard deviation recorded on the previous finished candle (0 means "no value yet").
	private decimal _previousStdDev;

	// Number of candles on which the standard deviation was lower than on the candle before;
	// cleared to zero as soon as it rises.
	private int _flatBarCount;

	// Boundaries of the currently tracked flat channel (both 0 when no channel is tracked).
	private decimal _channelHigh;
	private decimal _channelLow;

	// Simulated pending breakout levels; null when not armed.
	private decimal? _pendingBuyPrice;
	private decimal? _pendingSellPrice;

	// Protective levels computed when the breakout levels are armed; 0 disables the check.
	private decimal _longStop;
	private decimal _longTake;
	private decimal _shortStop;
	private decimal _shortTake;

	/// <summary>
	/// Standard deviation indicator period.
	/// </summary>
	public int StdDevPeriod
	{
		get => _stdDevPeriod.Value;
		set => _stdDevPeriod.Value = value;
	}

	/// <summary>
	/// Minimum number of bars with falling volatility required to form a flat channel.
	/// </summary>
	public int FlatBars
	{
		get => _flatBars.Value;
		set => _flatBars.Value = value;
	}

	/// <summary>
	/// Minimum channel width expressed in pips.
	/// </summary>
	public decimal ChannelMinPips
	{
		get => _channelMinPips.Value;
		set => _channelMinPips.Value = value;
	}

	/// <summary>
	/// Maximum channel width expressed in pips.
	/// </summary>
	public decimal ChannelMaxPips
	{
		get => _channelMaxPips.Value;
		set => _channelMaxPips.Value = value;
	}

	/// <summary>
	/// Candle type to analyse.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initializes a new instance of <see cref="FlatChannelStrategy"/> and registers its parameters.
	/// </summary>
	public FlatChannelStrategy()
	{
		_stdDevPeriod = Param(nameof(StdDevPeriod), 37)
			.SetDisplay("StdDev Period", "Standard deviation indicator period", "Indicators")
			.SetGreaterThanZero();

		_flatBars = Param(nameof(FlatBars), 2)
			.SetDisplay("Flat Bars", "Minimum bars in flat state", "Indicators")
			.SetGreaterThanZero();

		_channelMinPips = Param(nameof(ChannelMinPips), 10m)
			.SetDisplay("Channel Min Pips", "Minimum channel width in pips", "Indicators")
			.SetGreaterThanZero();

		_channelMaxPips = Param(nameof(ChannelMaxPips), 100000m)
			.SetDisplay("Channel Max Pips", "Maximum channel width in pips", "Indicators")
			.SetGreaterThanZero();

		_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
			.SetDisplay("Candle Type", "Primary candle type", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
		=> [(Security, CandleType)];

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		ResetChannelState();
		ResetPositionState();
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		_stdDev = new StandardDeviation { Length = StdDevPeriod };
		_donchian = new DonchianChannels { Length = FlatBars };

		// Only the Donchian channel is bound to the subscription; the standard deviation
		// is processed by hand inside ProcessCandle because it uses the median price.
		var subscription = SubscribeCandles(CandleType);

		subscription
			.BindEx(_donchian, ProcessCandle)
			.Start();

		var area = CreateChartArea();

		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, _donchian);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Handles one candle together with the Donchian channel value calculated for it.
	/// Only finished candles are processed.
	/// </summary>
	/// <param name="candle">Candle received from the subscription.</param>
	/// <param name="channelValue">Indicator value produced by the bound Donchian channel.</param>
	private void ProcessCandle(ICandleMessage candle, IIndicatorValue channelValue)
	{
		if (candle.State != CandleStates.Finished)
			return;

		var medianPrice = (candle.HighPrice + candle.LowPrice) / 2m;
		var stdDevValue = _stdDev.Process(new DecimalIndicatorValue(_stdDev, medianPrice, candle.CloseTime) { IsFinal = true }).ToDecimal();

		// Until both indicators deliver usable values only the StdDev history is tracked.
		if (!_stdDev.IsFormed ||
			channelValue is not DonchianChannelsValue donchianValue ||
			donchianValue.UpperBand is not decimal upper ||
			donchianValue.LowerBand is not decimal lower)
		{
			_previousStdDev = stdDevValue;
			return;
		}

		// Update flat state based on StdDev direction.
		UpdateStdDevState(stdDevValue, upper, lower, candle);

		// Check simulated pending entries.
		var entrySubmitted = CheckPendingEntries(candle);

		// Manage existing positions with SL/TP.
		ManagePosition(candle);

		// If flat and no position, set up pending breakout entries.
		// Skipped on the candle that just submitted an entry: Position may still read 0 here
		// if the fill has not been reported yet, and re-arming would overwrite the stop/take
		// levels that belong to the order just sent.
		if (!entrySubmitted && Position == 0 && _flatBarCount >= FlatBars && _channelHigh > _channelLow)
			ArmBreakoutLevels();

		_previousStdDev = stdDevValue;
	}

	/// <summary>
	/// Arms the simulated pending entries at the channel boundaries and derives the
	/// stop-loss / take-profit levels, provided the channel width lies within
	/// [<see cref="ChannelMinPips"/>, <see cref="ChannelMaxPips"/>] price steps.
	/// </summary>
	private void ArmBreakoutLevels()
	{
		var channelWidth = _channelHigh - _channelLow;

		// Fall back to 0.01 when the security has no (or a non-positive) price step.
		var priceStep = Security?.PriceStep ?? 0.01m;

		if (priceStep <= 0m)
		{
			priceStep = 0.01m;
		}

		var minWidth = ChannelMinPips * priceStep;
		var maxWidth = ChannelMaxPips * priceStep;

		if (channelWidth < minWidth || channelWidth > maxWidth)
			return;

		// Set pending breakout entries at channel boundaries.
		_pendingBuyPrice = _channelHigh;
		_pendingSellPrice = _channelLow;

		// Stop is two channel widths behind the entry, target one channel width ahead.
		_longStop = _channelHigh - channelWidth * 2m;
		_longTake = _channelHigh + channelWidth;
		_shortStop = _channelLow + channelWidth * 2m;
		_shortTake = _channelLow - channelWidth;
	}

	/// <summary>
	/// Updates the flat-bar counter and the tracked channel from the direction of the
	/// standard deviation relative to the previous candle.
	/// </summary>
	/// <param name="stdDevValue">Standard deviation for the current candle.</param>
	/// <param name="upper">Upper Donchian band for the current candle.</param>
	/// <param name="lower">Lower Donchian band for the current candle.</param>
	/// <param name="candle">Current finished candle.</param>
	private void UpdateStdDevState(decimal stdDevValue, decimal upper, decimal lower, ICandleMessage candle)
	{
		// No previous value to compare against yet.
		if (_previousStdDev == 0m)
		{
			_previousStdDev = stdDevValue;
			return;
		}

		if (stdDevValue < _previousStdDev)
		{
			// Volatility keeps falling: one more flat bar.
			_flatBarCount++;

			if (_flatBarCount == FlatBars)
			{
				// The channel is seeded from the Donchian bands once enough flat bars are seen.
				_channelHigh = upper;
				_channelLow = lower;
			}
			else if (_flatBarCount > FlatBars)
			{
				// Afterwards the channel only widens to include new extremes.
				if (candle.HighPrice > _channelHigh)
					_channelHigh = candle.HighPrice;

				if (candle.LowPrice < _channelLow)
					_channelLow = candle.LowPrice;
			}
		}
		else if (stdDevValue > _previousStdDev)
		{
			// Volatility rises: the flat phase is over, drop the channel and the armed entries.
			_flatBarCount = 0;
			_channelHigh = 0m;
			_channelLow = 0m;
			_pendingBuyPrice = null;
			_pendingSellPrice = null;
		}
		else if (_flatBarCount >= FlatBars && _channelHigh <= _channelLow)
		{
			// Unchanged volatility with a degenerate channel: re-seed from the Donchian bands.
			_channelHigh = upper;
			_channelLow = lower;
		}
	}

	/// <summary>
	/// Sends a market entry when the candle touched one of the armed breakout levels.
	/// The buy level is checked before the sell level.
	/// </summary>
	/// <param name="candle">Current finished candle.</param>
	/// <returns><c>true</c> when an entry order was submitted on this candle.</returns>
	private bool CheckPendingEntries(ICandleMessage candle)
	{
		if (Position != 0)
			return false;

		if (_pendingBuyPrice is decimal buyPrice && candle.HighPrice >= buyPrice)
		{
			BuyMarket();
		}
		else if (_pendingSellPrice is decimal sellPrice && candle.LowPrice <= sellPrice)
		{
			SellMarket();
		}
		else
		{
			return false;
		}

		// Both simulated pending orders are cancelled once either side triggers.
		_pendingBuyPrice = null;
		_pendingSellPrice = null;
		return true;
	}

	/// <summary>
	/// Closes the open position with a market order when the candle reached the
	/// stop-loss or the take-profit level. The stop is checked before the target.
	/// </summary>
	/// <param name="candle">Current finished candle.</param>
	private void ManagePosition(ICandleMessage candle)
	{
		if (Position > 0)
		{
			var stopHit = _longStop > 0m && candle.LowPrice <= _longStop;
			var takeHit = _longTake > 0m && candle.HighPrice >= _longTake;

			if (stopHit || takeHit)
			{
				SellMarket();
				ResetPositionState();
			}
		}
		else if (Position < 0)
		{
			var stopHit = _shortStop > 0m && candle.HighPrice >= _shortStop;
			var takeHit = _shortTake > 0m && candle.LowPrice <= _shortTake;

			if (stopHit || takeHit)
			{
				BuyMarket();
				ResetPositionState();
			}
		}
	}

	/// <summary>
	/// Clears the volatility history and the tracked channel.
	/// </summary>
	private void ResetChannelState()
	{
		_previousStdDev = 0m;
		_flatBarCount = 0;
		_channelHigh = 0m;
		_channelLow = 0m;
	}

	/// <summary>
	/// Clears the protective levels and the armed breakout entries.
	/// </summary>
	private void ResetPositionState()
	{
		_longStop = 0m;
		_longTake = 0m;
		_shortStop = 0m;
		_shortTake = 0m;
		_pendingBuyPrice = null;
		_pendingSellPrice = null;
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Decimal
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from StockSharp.Algo.Indicators import StandardDeviation, DonchianChannels
from indicator_extensions import *
class flat_channel_strategy(Strategy):
    """Flat channel breakout: detects consolidation via falling StdDev, then trades channel breakouts.

    Breakout entries are simulated: a buy level and a sell level are remembered and a
    market order is sent once a finished candle touches one of them. Stop-loss and
    take-profit are likewise checked against finished candles and executed with
    market orders.
    """

    def __init__(self):
        super(flat_channel_strategy, self).__init__()

        self._std_dev_period = self.Param("StdDevPeriod", 37) \
            .SetGreaterThanZero() \
            .SetDisplay("StdDev Period", "Standard deviation indicator period", "Indicators")
        self._flat_bars = self.Param("FlatBars", 2) \
            .SetGreaterThanZero() \
            .SetDisplay("Flat Bars", "Minimum bars in flat state", "Indicators")
        self._channel_min_pips = self.Param("ChannelMinPips", 10.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Channel Min Pips", "Minimum channel width in pips", "Indicators")
        self._channel_max_pips = self.Param("ChannelMaxPips", 100000.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Channel Max Pips", "Maximum channel width in pips", "Indicators")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Primary candle type", "General")

        # Indicators are created in OnStarted2.
        self._std_dev = None
        self._donchian = None

        self._reset_state()

    @property
    def StdDevPeriod(self):
        """Standard deviation indicator period."""
        return int(self._std_dev_period.Value)

    @property
    def FlatBars(self):
        """Minimum number of bars with falling volatility required to form a flat channel."""
        return int(self._flat_bars.Value)

    @property
    def ChannelMinPips(self):
        """Minimum channel width expressed in pips."""
        return float(self._channel_min_pips.Value)

    @property
    def ChannelMaxPips(self):
        """Maximum channel width expressed in pips."""
        return float(self._channel_max_pips.Value)

    @property
    def CandleType(self):
        """Candle type to analyse."""
        return self._candle_type.Value

    def OnStarted2(self, time):
        """Reset the working state, create the indicators and subscribe to candles."""
        super(flat_channel_strategy, self).OnStarted2(time)

        self._reset_state()

        self._std_dev = StandardDeviation()
        self._std_dev.Length = self.StdDevPeriod
        self._donchian = DonchianChannels()
        self._donchian.Length = self.FlatBars

        # Only the Donchian channel is bound to the subscription; the standard deviation
        # is fed by hand in process_candle because it uses the median price.
        subscription = self.SubscribeCandles(self.CandleType)
        subscription.BindEx(self._donchian, self.process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, self._donchian)
            self.DrawOwnTrades(area)

    def process_candle(self, candle, channel_value):
        """Handle one candle together with the Donchian value calculated for it.

        Only finished candles are processed.
        """
        if candle.State != CandleStates.Finished:
            return

        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        median_price = (h + lo) / 2.0
        std_dev_value = float(process_float(self._std_dev, Decimal(median_price), candle.ServerTime, True).Value)

        # Until both indicators deliver usable values only the StdDev history is tracked.
        if not self._std_dev.IsFormed:
            self._previous_std_dev = std_dev_value
            return

        upper_val = channel_value.UpperBand
        lower_val = channel_value.LowerBand
        if upper_val is None or lower_val is None:
            self._previous_std_dev = std_dev_value
            return

        upper = float(upper_val)
        lower = float(lower_val)

        # Update flat state
        self._update_std_dev_state(std_dev_value, upper, lower, candle)

        # Check pending entries
        entry_submitted = self._check_pending_entries(candle)

        # Manage position
        self._manage_position(candle)

        # If flat and no position, setup pending breakout entries.
        # Skipped on the candle that just submitted an entry: Position may still read 0 here
        # if the fill has not been reported yet, and re-arming would overwrite the stop/take
        # levels that belong to the order just sent.
        if (not entry_submitted
                and self.Position == 0
                and self._flat_bar_count >= self.FlatBars
                and self._channel_high > self._channel_low):
            self._arm_breakout_levels()

        self._previous_std_dev = std_dev_value

    def _arm_breakout_levels(self):
        """Arm the simulated pending entries and derive stop/take levels.

        Nothing is armed unless the channel width lies within
        [ChannelMinPips, ChannelMaxPips] price steps.
        """
        channel_width = self._channel_high - self._channel_low

        # Fall back to 0.01 when the security has no (or a non-positive) price step.
        sec = self.Security
        price_step = float(sec.PriceStep) if sec is not None and sec.PriceStep is not None and float(sec.PriceStep) > 0 else 0.01

        min_width = self.ChannelMinPips * price_step
        max_width = self.ChannelMaxPips * price_step
        if channel_width < min_width or channel_width > max_width:
            return

        self._pending_buy_price = self._channel_high
        self._pending_sell_price = self._channel_low

        # Stop is two channel widths behind the entry, target one channel width ahead.
        self._long_stop = self._channel_high - channel_width * 2.0
        self._long_take = self._channel_high + channel_width
        self._short_stop = self._channel_low + channel_width * 2.0
        self._short_take = self._channel_low - channel_width

    def _update_std_dev_state(self, std_dev_value, upper, lower, candle):
        """Update the flat-bar counter and the tracked channel from the StdDev direction."""
        # No previous value to compare against yet.
        if self._previous_std_dev == 0:
            self._previous_std_dev = std_dev_value
            return

        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)

        if std_dev_value < self._previous_std_dev:
            # Volatility keeps falling: one more flat bar.
            self._flat_bar_count += 1
            if self._flat_bar_count == self.FlatBars:
                # The channel is seeded from the Donchian bands once enough flat bars are seen.
                self._channel_high = upper
                self._channel_low = lower
            elif self._flat_bar_count > self.FlatBars:
                # Afterwards the channel only widens to include new extremes.
                if h > self._channel_high:
                    self._channel_high = h
                if lo < self._channel_low:
                    self._channel_low = lo
        elif std_dev_value > self._previous_std_dev:
            # Volatility rises: the flat phase is over, drop the channel and the armed entries.
            self._flat_bar_count = 0
            self._channel_high = 0.0
            self._channel_low = 0.0
            self._pending_buy_price = None
            self._pending_sell_price = None
        elif self._flat_bar_count >= self.FlatBars and self._channel_high <= self._channel_low:
            # Unchanged volatility with a degenerate channel: re-seed from the Donchian bands.
            self._channel_high = upper
            self._channel_low = lower

    def _check_pending_entries(self, candle):
        """Send a market entry when the candle touched an armed breakout level.

        The buy level is checked before the sell level.
        Returns True when an entry order was submitted on this candle, otherwise False.
        """
        if self.Position != 0:
            return False

        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)

        if self._pending_buy_price is not None and h >= self._pending_buy_price:
            self.BuyMarket()
        elif self._pending_sell_price is not None and lo <= self._pending_sell_price:
            self.SellMarket()
        else:
            return False

        # Both simulated pending orders are cancelled once either side triggers.
        self._pending_buy_price = None
        self._pending_sell_price = None
        return True

    def _manage_position(self, candle):
        """Close the open position when the candle reached the stop-loss or take-profit.

        The stop is checked before the target.
        """
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)

        if self.Position > 0:
            stop_hit = self._long_stop > 0 and lo <= self._long_stop
            take_hit = self._long_take > 0 and h >= self._long_take
            if stop_hit or take_hit:
                self.SellMarket()
                self._reset_position_state()
        elif self.Position < 0:
            stop_hit = self._short_stop > 0 and h >= self._short_stop
            take_hit = self._short_take > 0 and lo <= self._short_take
            if stop_hit or take_hit:
                self.BuyMarket()
                self._reset_position_state()

    def _reset_position_state(self):
        """Clear the protective levels and the armed breakout entries."""
        self._long_stop = 0.0
        self._long_take = 0.0
        self._short_stop = 0.0
        self._short_take = 0.0
        self._pending_buy_price = None
        self._pending_sell_price = None

    def _reset_state(self):
        """Clear the volatility history, the tracked channel and all trade levels."""
        self._previous_std_dev = 0.0
        self._flat_bar_count = 0
        self._channel_high = 0.0
        self._channel_low = 0.0
        self._reset_position_state()

    def OnReseted(self):
        """Restore the working state to its initial values."""
        super(flat_channel_strategy, self).OnReseted()
        self._reset_state()

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return flat_channel_strategy()