通道交易策略
这是一套逆势通道策略,当唐奇安通道的上下边界保持不变时在极值附近反向建仓。策略把最新的高点/低点与上一根K线计算出的通道边界和枢轴价(pivot)进行比较,从而判断是否在通道内反手。仓位保护基于ATR设定的初始止损,并配合可调的移动止损,在价格向有利方向发展时锁定利润。
详情
- 入场条件:
- 做空:上轨未变化,并且上一根K线的最高价触及上轨,或前一收盘价位于枢轴与上轨之间。
- 做多:下轨未变化,并且上一根K线的最低价触及下轨,或前一收盘价位于枢轴与下轨之间。
- 方向:多、空双向。
- 出场条件:
- 做多:若上轨保持不变且价格触及上轨,或触发ATR止损/移动止损则平仓。
- 做空:若下轨保持不变且价格触及下轨,或触发ATR止损/移动止损则平仓。
- 止损设置:
  - 多单初始止损放在 `support - ATR`,空单初始止损放在 `resistance + ATR`。
  - 当盈利超过 `TrailingStopPips`(按照最小价格步长转换)时,移动止损开始跟随最佳价格。
- 默认参数:
  - `ChannelPeriod` = 20(唐奇安通道长度)
  - `AtrPeriod` = 4(ATR 平滑周期)
  - `Volume` = 1 手/合约
  - `TrailingStopPips` = 30 个最小价格步长
  - `CandleType` = 4 小时时间框架
- 筛选标签:
- 类型:通道 / 均值回归
- 方向:双向
- 指标:唐奇安通道、ATR
- 止损:ATR 固定止损 + 移动止损
- 复杂度:中等
- 时间框架:日内
- 季节性:无
- 神经网络:无
- 背离:无
- 风险等级:中等
备注
- 枢轴价计算公式为 `(上轨 + 下轨 + 前一收盘价) / 3`,与原始 MQL 程序保持一致。
- 策略同一时间只持有一个净头寸,只有完全平仓之后才会反向。
- `TrailingStopPips` 以“点”为单位,会乘以交易品种的 `PriceStep` 得到真实的价格偏移量。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Counter-trend Donchian channel strategy. It opens positions against a channel
/// boundary that did not move since the previous candle and protects them with an
/// ATR-based initial stop plus an optional trailing stop.
/// </summary>
public class TradeInChannelStrategy : Strategy
{
    // Strategy parameters.
    private readonly StrategyParam<int> _channelPeriod;
    private readonly StrategyParam<int> _atrPeriod;
    private readonly StrategyParam<decimal> _trailingStopPips;
    private readonly StrategyParam<DataType> _candleType;

    // Indicators; created in OnStarted2 and dropped in OnReseted.
    private DonchianChannels _donchian = null!;
    private AverageTrueRange _atr = null!;

    // Channel bounds and close remembered from the previously processed candle.
    private decimal? _previousUpper;
    private decimal? _previousLower;
    private decimal? _previousClose;

    // Long-side bookkeeping.
    private decimal? _longEntryPrice;
    private decimal? _longStop;
    private decimal? _longBestPrice;
    private decimal? _longTrailingLevel;

    // Short-side bookkeeping.
    private decimal? _shortEntryPrice;
    private decimal? _shortStop;
    private decimal? _shortBestPrice;
    private decimal? _shortTrailingLevel;

    // Multiplier that converts TrailingStopPips into a price distance.
    private decimal _priceStep = 1m;

    /// <summary>
    /// Number of candles used by the Donchian channel.
    /// </summary>
    public int ChannelPeriod
    {
        get { return _channelPeriod.Value; }
        set { _channelPeriod.Value = value; }
    }

    /// <summary>
    /// Length of the Average True Range indicator.
    /// </summary>
    public int AtrPeriod
    {
        get { return _atrPeriod.Value; }
        set { _atrPeriod.Value = value; }
    }

    /// <summary>
    /// Trailing stop distance in price steps; a value of zero disables trailing.
    /// </summary>
    public decimal TrailingStopPips
    {
        get { return _trailingStopPips.Value; }
        set { _trailingStopPips.Value = value; }
    }

    /// <summary>
    /// Candle series the strategy subscribes to.
    /// </summary>
    public DataType CandleType
    {
        get { return _candleType.Value; }
        set { _candleType.Value = value; }
    }

    /// <summary>
    /// Creates the strategy and registers its parameters with their defaults.
    /// </summary>
    public TradeInChannelStrategy()
    {
        _channelPeriod = Param(nameof(ChannelPeriod), 20)
            .SetDisplay("Channel Period", "Donchian channel lookback", "Channel")
            .SetGreaterThanZero();

        _atrPeriod = Param(nameof(AtrPeriod), 4)
            .SetDisplay("ATR Period", "Average True Range length", "Volatility")
            .SetGreaterThanZero();

        _trailingStopPips = Param(nameof(TrailingStopPips), 30m)
            .SetDisplay("Trailing Stop (pips)", "Trailing stop distance in price steps", "Risk")
            .SetNotNegative();

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Timeframe used for signals", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
        => [(Security, CandleType)];

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _donchian = null!;
        _atr = null!;
        _priceStep = 1m;

        _previousUpper = _previousLower = _previousClose = null;

        ResetLongState();
        ResetShortState();
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        // Fall back to 1 when the security is missing or has no positive price step.
        _priceStep = Security?.PriceStep is decimal step && step > 0m ? step : 1m;

        _donchian = new DonchianChannels { Length = ChannelPeriod };
        _atr = new AverageTrueRange { Length = AtrPeriod };

        var candles = SubscribeCandles(CandleType);
        candles.BindEx(_donchian, _atr, ProcessCandle).Start();

        var chart = CreateChartArea();
        if (chart == null)
            return;

        DrawCandles(chart, candles);
        DrawIndicator(chart, _donchian);
        DrawIndicator(chart, _atr);
        DrawOwnTrades(chart);
    }

    /// <summary>
    /// Handles one candle together with its Donchian and ATR values: manages any open
    /// position first, then looks for a new entry when the strategy is flat.
    /// </summary>
    private void ProcessCandle(ICandleMessage candle, IIndicatorValue donchianValue, IIndicatorValue atrValue)
    {
        // Only finished candles with fully formed indicators are considered.
        if (candle.State != CandleStates.Finished || !_donchian.IsFormed || !_atr.IsFormed)
            return;

        var bands = (DonchianChannelsValue)donchianValue;

        if (bands.UpperBand is not decimal upper || bands.LowerBand is not decimal lower || !atrValue.IsFinal)
            return;

        var atr = atrValue.ToDecimal();

        // The very first usable candle only seeds the "previous" values below.
        if (_previousUpper is decimal lastUpper && _previousLower is decimal lastLower && _previousClose is decimal lastClose)
        {
            var pivot = (upper + lower + lastClose) / 3m;

            var longClosed = ManageLongPosition(candle, upper, lastUpper);
            var shortClosed = ManageShortPosition(candle, lower, lastLower);

            // No new entry on a candle that has just triggered an exit.
            if (Position == 0 && !longClosed && !shortClosed)
                EvaluateEntries(candle, upper, lower, lastUpper, lastLower, lastClose, pivot, atr);
        }

        _previousUpper = upper;
        _previousLower = lower;
        _previousClose = candle.ClosePrice;
    }

    /// <summary>
    /// Checks the exit rules for a long position.
    /// </summary>
    /// <returns><c>true</c> when a closing sell order was sent.</returns>
    private bool ManageLongPosition(ICandleMessage candle, decimal upper, decimal previousUpper)
    {
        if (Position <= 0)
            return false;

        // ATR-based hard stop.
        var stopHit = _longStop is decimal stop && candle.LowPrice <= stop;

        // Take profit at an unchanged upper band.
        var resistanceTouched = upper == previousUpper && candle.HighPrice >= upper;

        if (!stopHit && !resistanceTouched)
            return ApplyLongTrailing(candle);

        SellMarket();
        ResetLongState();
        return true;
    }

    /// <summary>
    /// Checks the exit rules for a short position.
    /// </summary>
    /// <returns><c>true</c> when a closing buy order was sent.</returns>
    private bool ManageShortPosition(ICandleMessage candle, decimal lower, decimal previousLower)
    {
        if (Position >= 0)
            return false;

        // ATR-based hard stop.
        var stopHit = _shortStop is decimal stop && candle.HighPrice >= stop;

        // Take profit at an unchanged lower band.
        var supportTouched = lower == previousLower && candle.LowPrice <= lower;

        if (!stopHit && !supportTouched)
            return ApplyShortTrailing(candle);

        BuyMarket();
        ResetShortState();
        return true;
    }

    /// <summary>
    /// Updates the best price and trailing level of a long position and closes it when
    /// the candle low reaches the trailing level.
    /// </summary>
    /// <returns><c>true</c> when a closing sell order was sent.</returns>
    private bool ApplyLongTrailing(ICandleMessage candle)
    {
        if (Position <= 0)
            return false;

        var distance = GetTrailingOffset();
        var high = candle.HighPrice;

        // Trailing disabled or entry unknown: just record the latest high.
        if (distance <= 0m || _longEntryPrice is not decimal entry)
        {
            _longBestPrice = high;
            return false;
        }

        var best = Math.Max(_longBestPrice ?? high, high);
        _longBestPrice = best;

        // Trailing only starts once the profit exceeds the trailing distance.
        if (best - entry <= distance)
            return false;

        // The trailing level may only move up.
        var candidate = best - distance;
        var level = Math.Max(_longTrailingLevel ?? candidate, candidate);
        _longTrailingLevel = level;

        if (candle.LowPrice > level)
            return false;

        SellMarket();
        ResetLongState();
        return true;
    }

    /// <summary>
    /// Updates the best price and trailing level of a short position and closes it when
    /// the candle high reaches the trailing level.
    /// </summary>
    /// <returns><c>true</c> when a closing buy order was sent.</returns>
    private bool ApplyShortTrailing(ICandleMessage candle)
    {
        if (Position >= 0)
            return false;

        var distance = GetTrailingOffset();
        var low = candle.LowPrice;

        // Trailing disabled or entry unknown: just record the latest low.
        if (distance <= 0m || _shortEntryPrice is not decimal entry)
        {
            _shortBestPrice = low;
            return false;
        }

        var best = Math.Min(_shortBestPrice ?? low, low);
        _shortBestPrice = best;

        // Trailing only starts once the profit exceeds the trailing distance.
        if (entry - best <= distance)
            return false;

        // The trailing level may only move down.
        var candidate = best + distance;
        var level = Math.Min(_shortTrailingLevel ?? candidate, candidate);
        _shortTrailingLevel = level;

        if (candle.HighPrice < level)
            return false;

        BuyMarket();
        ResetShortState();
        return true;
    }

    /// <summary>
    /// Opens a long or a short position when the corresponding entry rule holds.
    /// The long rule is checked first and wins when both rules hold.
    /// </summary>
    private void EvaluateEntries(
        ICandleMessage candle,
        decimal upper,
        decimal lower,
        decimal previousUpper,
        decimal previousLower,
        decimal previousClose,
        decimal pivot,
        decimal atr)
    {
        var nearSupport = candle.LowPrice <= lower
            || (previousClose > lower && previousClose < pivot);

        var nearResistance = candle.HighPrice >= upper
            || (previousClose < upper && previousClose > pivot);

        // A band counts as "flat" when it equals the previous candle's value.
        if (lower == previousLower && nearSupport)
            OpenLong(candle, lower, atr);
        else if (upper == previousUpper && nearResistance)
            OpenShort(candle, upper, atr);
    }

    /// <summary>
    /// Sends a market buy and initialises long-side tracking; the stop is one ATR below support.
    /// </summary>
    private void OpenLong(ICandleMessage candle, decimal support, decimal atr)
    {
        if (Volume <= 0m)
            return;

        BuyMarket();

        // The candle close is used as the reference entry price.
        var reference = candle.ClosePrice;

        ResetShortState();
        _longStop = support - atr;
        _longTrailingLevel = null;
        _longEntryPrice = reference;
        _longBestPrice = reference;
    }

    /// <summary>
    /// Sends a market sell and initialises short-side tracking; the stop is one ATR above resistance.
    /// </summary>
    private void OpenShort(ICandleMessage candle, decimal resistance, decimal atr)
    {
        if (Volume <= 0m)
            return;

        SellMarket();

        // The candle close is used as the reference entry price.
        var reference = candle.ClosePrice;

        ResetLongState();
        _shortStop = resistance + atr;
        _shortTrailingLevel = null;
        _shortEntryPrice = reference;
        _shortBestPrice = reference;
    }

    /// <summary>
    /// Trailing distance in price units.
    /// </summary>
    private decimal GetTrailingOffset() => TrailingStopPips * _priceStep;

    /// <summary>
    /// Clears all long-side tracking values.
    /// </summary>
    private void ResetLongState()
    {
        _longEntryPrice = _longStop = _longBestPrice = _longTrailingLevel = null;
    }

    /// <summary>
    /// Clears all short-side tracking values.
    /// </summary>
    private void ResetShortState()
    {
        _shortEntryPrice = _shortStop = _shortBestPrice = _shortTrailingLevel = null;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from StockSharp.Algo.Indicators import DonchianChannels, AverageTrueRange
class trade_in_channel_strategy(Strategy):
    """Counter-trend Donchian channel strategy with ATR stops and an optional trailing stop.

    On every finished candle the strategy first manages an open position (ATR hard
    stop, exit at an unchanged opposite band, trailing stop) and, when flat, opens a
    position against a channel boundary that did not move since the previous candle.
    """

    def __init__(self):
        super(trade_in_channel_strategy, self).__init__()

        self._channel_period = self.Param("ChannelPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Channel Period", "Donchian channel lookback", "Channel")
        self._atr_period = self.Param("AtrPeriod", 4) \
            .SetGreaterThanZero() \
            .SetDisplay("ATR Period", "Average True Range length", "Volatility")
        # A negative trailing distance is meaningless; zero disables trailing.
        self._trailing_stop_pips = self.Param("TrailingStopPips", 30.0) \
            .SetNotNegative() \
            .SetDisplay("Trailing Stop (pips)", "Trailing stop distance in price steps", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Timeframe used for signals", "General")

        # Indicators are created in OnStarted2.
        self._donchian = None
        self._atr = None

        # Channel bounds and close remembered from the previously processed candle.
        self._prev_upper = None
        self._prev_lower = None
        self._prev_close = None

        # Per-side bookkeeping: entry price, hard stop, best price, trailing level.
        self._long_entry = None
        self._short_entry = None
        self._long_stop = None
        self._short_stop = None
        self._long_best = None
        self._short_best = None
        self._long_trail = None
        self._short_trail = None

        # Multiplier that converts TrailingStopPips into a price distance.
        self._price_step = 1.0

    @property
    def ChannelPeriod(self):
        """Donchian channel lookback."""
        return int(self._channel_period.Value)

    @property
    def AtrPeriod(self):
        """ATR calculation period."""
        return int(self._atr_period.Value)

    @property
    def TrailingStopPips(self):
        """Trailing stop distance expressed in price steps."""
        return float(self._trailing_stop_pips.Value)

    @property
    def CandleType(self):
        """Candle type used for analysis."""
        return self._candle_type.Value

    def OnStarted2(self, time):
        """Reset state, create the indicators and subscribe to candles."""
        super(trade_in_channel_strategy, self).OnStarted2(time)

        # Fall back to 1.0 when the security is missing or has no positive price step.
        sec = self.Security
        ps = float(sec.PriceStep) if sec is not None and sec.PriceStep is not None and float(sec.PriceStep) > 0 else 1.0
        self._price_step = ps

        self._prev_upper = None
        self._prev_lower = None
        self._prev_close = None
        self._reset_long()
        self._reset_short()

        self._donchian = DonchianChannels()
        self._donchian.Length = self.ChannelPeriod
        self._atr = AverageTrueRange()
        self._atr.Length = self.AtrPeriod

        subscription = self.SubscribeCandles(self.CandleType)
        subscription.BindEx(self._donchian, self._atr, self.process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, self._donchian)
            self.DrawOwnTrades(area)

    def process_candle(self, candle, donchian_val, atr_val):
        """Handle one candle together with its Donchian and ATR values."""
        # Only finished candles with fully formed indicators are considered.
        if candle.State != CandleStates.Finished:
            return
        if not self._donchian.IsFormed or not self._atr.IsFormed:
            return

        upper = float(donchian_val.UpperBand) if donchian_val.UpperBand is not None else None
        lower = float(donchian_val.LowerBand) if donchian_val.LowerBand is not None else None
        if upper is None or lower is None:
            return
        if not atr_val.IsFinal:
            return
        atr = float(atr_val)

        # The very first usable candle only seeds the "previous" values.
        if self._prev_upper is None or self._prev_lower is None or self._prev_close is None:
            self._prev_upper = upper
            self._prev_lower = lower
            self._prev_close = float(candle.ClosePrice)
            return

        pivot = (upper + lower + self._prev_close) / 3.0
        c = float(candle.ClosePrice)

        closed_long = self._manage_long(candle, upper, self._prev_upper)
        closed_short = self._manage_short(candle, lower, self._prev_lower)

        # No new entry on a candle that has just triggered an exit.
        if self.Position == 0 and not closed_long and not closed_short:
            self._evaluate_entries(candle, upper, lower, self._prev_upper, self._prev_lower, self._prev_close, pivot, atr)

        self._prev_upper = upper
        self._prev_lower = lower
        self._prev_close = c

    def _manage_long(self, candle, upper, prev_upper):
        """Apply long exit rules; return True when a closing sell order was sent."""
        if self.Position <= 0:
            return False
        lo = float(candle.LowPrice)
        h = float(candle.HighPrice)
        # ATR-based hard stop.
        if self._long_stop is not None and lo <= self._long_stop:
            self.SellMarket()
            self._reset_long()
            return True
        # Take profit at an unchanged upper band.
        if upper == prev_upper and h >= upper:
            self.SellMarket()
            self._reset_long()
            return True
        return self._apply_long_trailing(candle)

    def _manage_short(self, candle, lower, prev_lower):
        """Apply short exit rules; return True when a closing buy order was sent."""
        if self.Position >= 0:
            return False
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        # ATR-based hard stop.
        if self._short_stop is not None and h >= self._short_stop:
            self.BuyMarket()
            self._reset_short()
            return True
        # Take profit at an unchanged lower band.
        if lower == prev_lower and lo <= lower:
            self.BuyMarket()
            self._reset_short()
            return True
        return self._apply_short_trailing(candle)

    def _apply_long_trailing(self, candle):
        """Update the long trailing level; return True when the position was closed by it."""
        if self.Position <= 0:
            return False
        offset = self.TrailingStopPips * self._price_step
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        # Trailing disabled or entry unknown: just record the latest high.
        if offset <= 0 or self._long_entry is None:
            self._long_best = h
            return False
        self._long_best = max(self._long_best, h) if self._long_best is not None else h
        # Trailing only starts once the profit exceeds the trailing distance.
        if self._long_best is not None and self._long_best - self._long_entry > offset:
            new_level = self._long_best - offset
            # The trailing level may only move up.
            if self._long_trail is None or new_level > self._long_trail:
                self._long_trail = new_level
            if self._long_trail is not None and lo <= self._long_trail:
                self.SellMarket()
                self._reset_long()
                return True
        return False

    def _apply_short_trailing(self, candle):
        """Update the short trailing level; return True when the position was closed by it."""
        if self.Position >= 0:
            return False
        offset = self.TrailingStopPips * self._price_step
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        # Trailing disabled or entry unknown: just record the latest low.
        if offset <= 0 or self._short_entry is None:
            self._short_best = lo
            return False
        self._short_best = min(self._short_best, lo) if self._short_best is not None else lo
        # Trailing only starts once the profit exceeds the trailing distance.
        if self._short_best is not None and self._short_entry - self._short_best > offset:
            new_level = self._short_best + offset
            # The trailing level may only move down.
            if self._short_trail is None or new_level < self._short_trail:
                self._short_trail = new_level
            if self._short_trail is not None and h >= self._short_trail:
                self.BuyMarket()
                self._reset_short()
                return True
        return False

    def _evaluate_entries(self, candle, upper, lower, prev_upper, prev_lower, prev_close, pivot, atr):
        """Open a long or short position when its entry rule holds; the long rule is checked first."""
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        c = float(candle.ClosePrice)

        # A band counts as "flat" when it equals the previous candle's value.
        resistance_flat = upper == prev_upper
        support_flat = lower == prev_lower

        should_short = resistance_flat and (h >= upper or (prev_close < upper and prev_close > pivot))
        should_long = support_flat and (lo <= lower or (prev_close > lower and prev_close < pivot))

        if should_long:
            self.BuyMarket()
            # The candle close is used as the reference entry price.
            self._long_entry = c
            self._long_best = c
            self._long_trail = None
            self._long_stop = lower - atr
            self._reset_short()
        elif should_short:
            self.SellMarket()
            # The candle close is used as the reference entry price.
            self._short_entry = c
            self._short_best = c
            self._short_trail = None
            self._short_stop = upper + atr
            self._reset_long()

    def _reset_long(self):
        """Clear all long-side tracking values."""
        self._long_entry = None
        self._long_stop = None
        self._long_best = None
        self._long_trail = None

    def _reset_short(self):
        """Clear all short-side tracking values."""
        self._short_entry = None
        self._short_stop = None
        self._short_best = None
        self._short_trail = None

    def OnReseted(self):
        """Clear remembered candle values and position bookkeeping."""
        super(trade_in_channel_strategy, self).OnReseted()
        self._prev_upper = None
        self._prev_lower = None
        self._prev_close = None
        self._reset_long()
        self._reset_short()
        self._price_step = 1.0

    def CreateClone(self):
        """Return a new instance of this strategy class."""
        return trade_in_channel_strategy()