Double ZigZag Alignment 策略
该策略是 MQL5 专家顾问「Double ZigZag」的 StockSharp 版本。通过两个不同窗口的摆动检测器来复现双 ZigZag 的确认逻辑。只有当两个检测器在最近三个关键点上完全一致且最新波动相对更强时才会触发交易。
核心思想
- 快速检测器使用滑动最高值/最低值窗口来逼近原始 ZigZag(13, 5, 3) 设置。
- 慢速检测器使用更长的窗口(默认放大 8 倍)过滤噪声并确认主要拐点。
- 当两个检测器在同一根 K 线同时翻转方向时,会记录一个“对齐”枢轴以及自上次对齐以来出现的快速摆动次数,这些计数对应原程序中的 `up` 和 `dw`。
多头条件
- 最近一次对齐是高点,再上一对齐是低点,再往前也是高点。
- 自最近对齐以来的快速摆动数量大于前一个区段的数量乘以 `StrengthMultiplier`(默认 2.1),即 `up > dw * k`。
- 最新高点突破中间低点的力度要强于更早的高点:`(previousHigh - swingLow) * BreakoutMultiplier < (newestHigh - swingLow)`。
- 满足条件后买入 `Volume` 加上现有空头仓位的数量,使净头寸变为多头。
空头条件
- 最近一次对齐是低点,再上一对齐是高点,再往前也是低点。
- 最近区段的数量小于前一段乘以 `StrengthMultiplier`,即 `up * k < dw`。
- 当前低点向下突破中间高点的幅度大于更早的低点,使用同样的 `BreakoutMultiplier`。
- 卖出足够的数量以平掉多头并建立净空头仓位。
仓位管理
- 信号互斥,新信号会自动反向持仓。
- 不设置止损或止盈,平仓完全依赖反向信号。
- 分析使用 `CandleType` 指定的 K 线类型(默认 1 分钟)。
默认参数
- `FastLength` = 13
- `SlowLength` = 104
- `StrengthMultiplier` = 2.1
- `BreakoutMultiplier` = 2.1
- `CandleType` = `TimeSpan.FromMinutes(1)` 时间框架
标签
- 类别:趋势/形态识别
- 方向:双向
- 指标:ZigZag(近似实现)、Highest/Lowest
- 止损:无
- 时间框架:默认日内
- 复杂度:中等(需要同步跟踪摆动)
- 季节性:无
- 神经网络:无
- 背离:无
- 风险等级:中等(无保护性止损)
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Double ZigZag alignment strategy. Uses fast and slow swing detectors
/// based on highest/lowest price lookbacks to find aligned pivot points and trade breakouts.
/// </summary>
/// <remarks>
/// Each detector keeps a direction flag (+1 up, -1 down, 0 not yet determined).
/// A detector turns up when a finished candle's high equals the highest high of
/// its lookback window (the current candle included) and turns down when the
/// candle's low equals the lowest low of that window. A market order is sent
/// when the fast detector flips while the slow detector points the same way.
/// </remarks>
public class DoubleZigZagStrategy : Strategy
{
    private readonly StrategyParam<int> _fastLength;
    private readonly StrategyParam<int> _slowLength;
    private readonly StrategyParam<DataType> _candleType;

    // Rolling buffers of finished-candle highs and lows, oldest first.
    private readonly List<decimal> _highs = new();
    private readonly List<decimal> _lows = new();

    // Current swing direction of each detector: +1 up, -1 down, 0 unknown.
    private int _fastDirection;
    private int _slowDirection;

    /// <summary>
    /// Lookback (in candles) for the fast swing detector.
    /// </summary>
    public int FastLength
    {
        get => _fastLength.Value;
        set => _fastLength.Value = value;
    }

    /// <summary>
    /// Lookback (in candles) for the slow confirmation swing detector.
    /// </summary>
    public int SlowLength
    {
        get => _slowLength.Value;
        set => _slowLength.Value = value;
    }

    /// <summary>
    /// Type of candles to analyze.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Initializes a new instance of <see cref="DoubleZigZagStrategy"/>.
    /// </summary>
    public DoubleZigZagStrategy()
    {
        // A non-positive lookback would make GetMax/GetMin scan an empty window and
        // return their sentinel values, turning every candle into a pivot.
        _fastLength = Param(nameof(FastLength), 8)
            .SetGreaterThanZero()
            .SetDisplay("Fast Length", "Lookback for the fast swing detector", "Indicators");

        _slowLength = Param(nameof(SlowLength), 30)
            .SetGreaterThanZero()
            .SetDisplay("Slow Length", "Lookback for the slow confirmation swing", "Indicators");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to analyze", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return [(Security, CandleType)];
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();
        ResetState();
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);
        ResetState();

        var subscription = SubscribeCandles(CandleType);

        subscription
            .Bind(ProcessCandle)
            .Start();

        var area = CreateChartArea();

        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawOwnTrades(area);
        }
    }

    // Clears the price buffers and puts both detectors back into the "unknown" state.
    private void ResetState()
    {
        _highs.Clear();
        _lows.Clear();
        _fastDirection = 0;
        _slowDirection = 0;
    }

    // Updates both swing detectors with a finished candle and trades on an aligned fast flip.
    private void ProcessCandle(ICandleMessage candle)
    {
        if (candle.State != CandleStates.Finished)
            return;

        _highs.Add(candle.HighPrice);
        _lows.Add(candle.LowPrice);

        // Keep one candle more than the longest lookback; older values are never read.
        var capacity = Math.Max(FastLength, SlowLength) + 1;
        var excess = _highs.Count - capacity;

        if (excess > 0)
        {
            _highs.RemoveRange(0, excess);
            _lows.RemoveRange(0, excess);
        }

        // Wait until the slow window is completely filled.
        if (_highs.Count < SlowLength)
            return;

        var fastHighest = GetMax(_highs, FastLength);
        var fastLowest = GetMin(_lows, FastLength);
        var slowHighest = GetMax(_highs, SlowLength);
        var slowLowest = GetMin(_lows, SlowLength);

        var prevFastDir = _fastDirection;

        // The current candle is already inside each window, so ">= highest" means
        // "this candle made the window's high" (and likewise for the low).
        if (_fastDirection <= 0 && candle.HighPrice >= fastHighest)
            _fastDirection = 1;
        else if (_fastDirection >= 0 && candle.LowPrice <= fastLowest)
            _fastDirection = -1;

        if (_slowDirection <= 0 && candle.HighPrice >= slowHighest)
            _slowDirection = 1;
        else if (_slowDirection >= 0 && candle.LowPrice <= slowLowest)
            _slowDirection = -1;

        var fastFlippedUp = prevFastDir <= 0 && _fastDirection > 0;
        var fastFlippedDown = prevFastDir >= 0 && _fastDirection < 0;

        // A slow flip on this candle also leaves the slow direction pointing the same way,
        // so checking the updated slow direction covers both "just flipped" and "already aligned".
        if (fastFlippedUp && _slowDirection > 0)
        {
            if (Position <= 0)
                BuyMarket();
        }
        else if (fastFlippedDown && _slowDirection < 0)
        {
            if (Position >= 0)
                SellMarket();
        }
    }

    // Returns the largest of the last <paramref name="length"/> values in data,
    // or decimal.MinValue when that window is empty.
    private static decimal GetMax(List<decimal> data, int length)
    {
        var max = decimal.MinValue;
        var start = Math.Max(0, data.Count - length);

        for (var i = start; i < data.Count; i++)
        {
            if (data[i] > max)
                max = data[i];
        }

        return max;
    }

    // Returns the smallest of the last <paramref name="length"/> values in data,
    // or decimal.MaxValue when that window is empty.
    private static decimal GetMin(List<decimal> data, int length)
    {
        var min = decimal.MaxValue;
        var start = Math.Max(0, data.Count - length);

        for (var i = start; i < data.Count; i++)
        {
            if (data[i] < min)
                min = data[i];
        }

        return min;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
class double_zig_zag_strategy(Strategy):
    """Double ZigZag alignment strategy: fast and slow swing detectors for aligned breakout signals.

    Each detector keeps a direction flag (+1 up, -1 down, 0 not yet determined).
    A detector turns up when a finished candle's high equals the highest high of
    its lookback window (current candle included) and turns down when the candle's
    low equals the lowest low of that window. A market order is sent when the fast
    detector flips while the slow detector points the same way.
    """

    def __init__(self):
        super(double_zig_zag_strategy, self).__init__()

        self._fast_length = self.Param("FastLength", 8) \
            .SetGreaterThanZero() \
            .SetDisplay("Fast Length", "Lookback for the fast swing detector", "Indicators")
        self._slow_length = self.Param("SlowLength", 30) \
            .SetGreaterThanZero() \
            .SetDisplay("Slow Length", "Lookback for the slow confirmation swing", "Indicators")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Type of candles to analyze", "General")

        self._reset_state()

    @property
    def FastLength(self):
        """Lookback (in candles) for the fast swing detector."""
        return self._fast_length.Value

    @property
    def SlowLength(self):
        """Lookback (in candles) for the slow confirmation swing detector."""
        return self._slow_length.Value

    @property
    def CandleType(self):
        """Type of candles to analyze."""
        return self._candle_type.Value

    def _reset_state(self):
        """Clear the price buffers and put both detectors back into the 'unknown' state."""
        # Rolling buffers of finished-candle highs and lows, oldest first.
        self._highs = []
        self._lows = []
        # Swing direction of each detector: +1 up, -1 down, 0 unknown.
        self._fast_dir = 0
        self._slow_dir = 0

    def OnStarted2(self, time):
        """Reset state, subscribe to candles and set up the chart if one is available."""
        super(double_zig_zag_strategy, self).OnStarted2(time)
        self._reset_state()

        subscription = self.SubscribeCandles(self.CandleType)
        subscription.Bind(self.process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawOwnTrades(area)

    def process_candle(self, candle):
        """Update both swing detectors with a finished candle and trade on an aligned fast flip."""
        if candle.State != CandleStates.Finished:
            return

        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        self._highs.append(h)
        self._lows.append(lo)

        # Keep one candle more than the longest lookback; older values are never read.
        max_buf = max(self.FastLength, self.SlowLength) + 1
        if len(self._highs) > max_buf:
            self._highs.pop(0)
            self._lows.pop(0)

        # Wait until the slow window is completely filled.
        if len(self._highs) < self.SlowLength:
            return

        fast_highest = self._get_max(self._highs, self.FastLength)
        fast_lowest = self._get_min(self._lows, self.FastLength)
        slow_highest = self._get_max(self._highs, self.SlowLength)
        slow_lowest = self._get_min(self._lows, self.SlowLength)

        prev_fast = self._fast_dir

        # The current candle is already inside each window, so ">= highest" means
        # "this candle made the window's high" (and likewise for the low).
        if self._fast_dir <= 0 and h >= fast_highest:
            self._fast_dir = 1
        elif self._fast_dir >= 0 and lo <= fast_lowest:
            self._fast_dir = -1

        if self._slow_dir <= 0 and h >= slow_highest:
            self._slow_dir = 1
        elif self._slow_dir >= 0 and lo <= slow_lowest:
            self._slow_dir = -1

        fast_up = prev_fast <= 0 and self._fast_dir > 0
        fast_down = prev_fast >= 0 and self._fast_dir < 0

        # A slow flip on this candle also leaves the slow direction pointing the same
        # way, so checking the updated slow direction covers both "just flipped" and
        # "already aligned".
        if fast_up and self._slow_dir > 0:
            if self.Position <= 0:
                self.BuyMarket()
        elif fast_down and self._slow_dir < 0:
            if self.Position >= 0:
                self.SellMarket()

    def _get_max(self, data, length):
        """Return the largest of the last `length` values in `data`."""
        return max(data[max(0, len(data) - length):])

    def _get_min(self, data, length):
        """Return the smallest of the last `length` values in `data`."""
        return min(data[max(0, len(data) - length):])

    def OnReseted(self):
        """Reset the strategy's internal state."""
        super(double_zig_zag_strategy, self).OnReseted()
        self._reset_state()

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return double_zig_zag_strategy()