PriceChannel Signal v2 策略
概述
PriceChannel Signal v2 是基于改进型唐奇安通道的趋势突破系统。原始的 MQL5 专家顾问监控通道趋势的切换、在价格重新突破区间时的二次入场信号,以及来源于同一价格区间的保护性退出水平。本次移植保持了全部行为:始终只持有一个方向的仓位,只处理已完成的K线,并可选用日内时间过滤。
交易逻辑
- 使用设定的
ChannelPeriod计算唐奇安通道的最高价与最低价。 - 对原始通道宽度施加两个调节系数:
- Risk Factor:向中线收缩入场通道的上下边界。
- Exit Level:生成一对位于内部的保护带,用于触发离场。
- 维护当前趋势状态:
- 收盘价上穿上方入场带时,趋势切换为多头。
- 收盘价下破下方入场带时,趋势切换为空头。
- 未发生突破时,保持上一根K线的趋势判断。
- 依据趋势状态生成信号:
- 多头入场:趋势由空头转为多头。
- 空头入场:趋势由多头转为空头。
- 多头二次入场:可选,趋势保持多头且收盘价重新站上上方入场带。
- 空头二次入场:可选,趋势保持空头且收盘价重新跌破下方入场带。
- 多头离场:可选,上一根K线收盘在保护带之上,本根K线收盘跌破保护带。
- 空头离场:可选,上一根K线收盘在保护带之下,本根K线收盘上破保护带。
- 每根K线最多触发一次下单,每次只持有单向仓位。
- 启用时间过滤时,所有信号仅在指定的日内时间窗口内生效。
参数
| 参数 | 说明 |
|---|---|
ChannelPeriod |
计算唐奇安通道与保护带时使用的周期长度。 |
RiskFactor |
入场带的缩放系数(0–10),值越大带宽越窄。 |
ExitLevel |
保护带的缩放系数,需大于 RiskFactor 才能位于入场带内部。 |
UseReEntry |
是否启用二次入场信号。 |
UseExitSignals |
是否启用保护带离场信号。 |
CandleType |
计算所使用的K线类型/周期。 |
UseTimeControl |
是否启用日内时间过滤。 |
StartHour / StartMinute |
时间过滤启用时的起始时间(包含)。 |
EndHour / EndMinute |
时间过滤启用时的结束时间(不包含)。 |
入场与离场规则
- 开多仓: 趋势转为多头或触发多头二次入场条件,且当前无持仓,同时当前K线位于允许的交易时段内。
- 开空仓: 趋势转为空头或触发空头二次入场条件,且当前无持仓,同时当前K线位于允许的交易时段内。
- 平多仓:
UseExitSignals启用且收盘价从上一根之上的位置跌破多头保护带。 - 平空仓:
UseExitSignals启用且收盘价从上一根之下的位置突破空头保护带。
其他说明
- 策略使用市价单,不会加仓或对冲。
- 仅在K线收盘后处理指标值,避免盘中重绘问题。
- 若未设置交易量,默认下单数量为 1。
- 时间过滤遵循原始EA的逻辑:结束时间为开区间,支持跨越午夜的时间段。
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Price Channel Signal v2 strategy that reacts to Donchian channel breakouts.
/// </summary>
public class PriceChannelSignalV2Strategy : Strategy
{
private readonly StrategyParam<int> _channelPeriod;
private readonly StrategyParam<DataType> _candleType;
private readonly Queue<decimal> _highHistory = new();
private readonly Queue<decimal> _lowHistory = new();
private int _previousTrend;
private decimal? _previousClose;
/// <summary>
/// Channel lookback length.
/// </summary>
public int ChannelPeriod
{
get => _channelPeriod.Value;
set => _channelPeriod.Value = value;
}
/// <summary>
/// Candle type used for indicator calculations.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Initialize a new instance of <see cref="PriceChannelSignalV2Strategy"/>.
/// </summary>
public PriceChannelSignalV2Strategy()
{
_channelPeriod = Param(nameof(ChannelPeriod), 20)
.SetGreaterThanZero()
.SetDisplay("Channel Period", "Donchian lookback used for Price Channel", "Price Channel");
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(60).TimeFrame())
.SetDisplay("Candle Type", "Primary timeframe for Price Channel", "General");
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_previousTrend = 0;
_previousClose = null;
_highHistory.Clear();
_lowHistory.Clear();
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(ProcessCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
if (_highHistory.Count < ChannelPeriod)
{
EnqueueCandle(candle);
return;
}
var highs = _highHistory.ToArray();
var lows = _lowHistory.ToArray();
var channelHigh = GetMax(highs);
var channelLow = GetMin(lows);
var range = channelHigh - channelLow;
if (range <= 0m)
{
_previousClose = candle.ClosePrice;
EnqueueCandle(candle);
return;
}
var mid = (channelHigh + channelLow) / 2m;
// Update trend state based on channel breakout
var trend = _previousTrend;
if (candle.ClosePrice > channelHigh + range * 0.05m)
trend = 1;
else if (candle.ClosePrice < channelLow - range * 0.05m)
trend = -1;
var volume = Volume;
if (volume <= 0)
volume = 1;
// Trend reversal signals
var changedPosition = false;
if (trend > 0 && _previousTrend <= 0)
{
if (Position <= 0)
{
BuyMarket(Position < 0 ? Math.Abs(Position) + volume : volume);
changedPosition = true;
}
}
else if (trend < 0 && _previousTrend >= 0)
{
if (Position >= 0)
{
SellMarket(Position > 0 ? Math.Abs(Position) + volume : volume);
changedPosition = true;
}
}
// Exit on mid-line cross
if (!changedPosition && Position > 0 && _previousClose is decimal pc1 && pc1 >= mid && candle.ClosePrice < mid)
{
SellMarket(Math.Abs(Position));
}
else if (!changedPosition && Position < 0 && _previousClose is decimal pc2 && pc2 <= mid && candle.ClosePrice > mid)
{
BuyMarket(Math.Abs(Position));
}
_previousTrend = trend;
_previousClose = candle.ClosePrice;
EnqueueCandle(candle);
}
private void EnqueueCandle(ICandleMessage candle)
{
_highHistory.Enqueue(candle.HighPrice);
_lowHistory.Enqueue(candle.LowPrice);
while (_highHistory.Count > ChannelPeriod)
_highHistory.Dequeue();
while (_lowHistory.Count > ChannelPeriod)
_lowHistory.Dequeue();
}
private static decimal GetMax(IEnumerable<decimal> values)
{
var max = decimal.MinValue;
foreach (var value in values)
{
if (value > max)
max = value;
}
return max;
}
private static decimal GetMin(IEnumerable<decimal> values)
{
var min = decimal.MaxValue;
foreach (var value in values)
{
if (value < min)
min = value;
}
return min;
}
/// <inheritdoc />
protected override void OnReseted()
{
_previousTrend = 0;
_previousClose = null;
_highHistory.Clear();
_lowHistory.Clear();
base.OnReseted();
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
class price_channel_signal_v2_strategy(Strategy):
def __init__(self):
super(price_channel_signal_v2_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(60)))
self._channel_period = self.Param("ChannelPeriod", 20)
self._high_history = []
self._low_history = []
self._prev_trend = 0
self._prev_close = 0.0
self._has_prev_close = False
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
@property
def ChannelPeriod(self):
return self._channel_period.Value
@ChannelPeriod.setter
def ChannelPeriod(self, value):
self._channel_period.Value = value
def OnReseted(self):
super(price_channel_signal_v2_strategy, self).OnReseted()
self._high_history = []
self._low_history = []
self._prev_trend = 0
self._prev_close = 0.0
self._has_prev_close = False
def OnStarted2(self, time):
super(price_channel_signal_v2_strategy, self).OnStarted2(time)
self._high_history = []
self._low_history = []
self._prev_trend = 0
self._prev_close = 0.0
self._has_prev_close = False
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(self._process_candle).Start()
def _process_candle(self, candle):
if candle.State != CandleStates.Finished:
return
close = float(candle.ClosePrice)
high = float(candle.HighPrice)
low = float(candle.LowPrice)
period = self.ChannelPeriod
if len(self._high_history) < period:
self._high_history.append(high)
self._low_history.append(low)
self._prev_close = close
self._has_prev_close = True
return
channel_high = max(self._high_history)
channel_low = min(self._low_history)
ch_range = channel_high - channel_low
if ch_range <= 0:
self._prev_close = close
self._high_history.append(high)
self._low_history.append(low)
while len(self._high_history) > period:
self._high_history.pop(0)
while len(self._low_history) > period:
self._low_history.pop(0)
return
mid = (channel_high + channel_low) / 2.0
trend = self._prev_trend
if close > channel_high + ch_range * 0.05:
trend = 1
elif close < channel_low - ch_range * 0.05:
trend = -1
changed_position = False
if trend > 0 and self._prev_trend <= 0:
if self.Position <= 0:
self.BuyMarket()
changed_position = True
elif trend < 0 and self._prev_trend >= 0:
if self.Position >= 0:
self.SellMarket()
changed_position = True
# Exit on mid-line cross
if not changed_position and self._has_prev_close:
if self.Position > 0 and self._prev_close >= mid and close < mid:
self.SellMarket()
elif self.Position < 0 and self._prev_close <= mid and close > mid:
self.BuyMarket()
self._prev_trend = trend
self._prev_close = close
self._has_prev_close = True
self._high_history.append(high)
self._low_history.append(low)
while len(self._high_history) > period:
self._high_history.pop(0)
while len(self._low_history) > period:
self._low_history.pop(0)
def CreateClone(self):
return price_channel_signal_v2_strategy()