ColorMetroDuplexStrategy
概述
ColorMetroDuplexStrategy 是 MT5 专家顾问 Exp_ColorMETRO_Duplex 的 C# 版本。原始 EA 通过两个独立的 ColorMETRO 指标模块进行多空操作:每个模块订阅自己的 K 线周期,读取指标输出的快慢 RSI 台阶包络线,并在交叉时决定是否开仓或平仓。
移植后的 StockSharp 策略保留了这两个模块,并使用高级 API 处理行情订阅、订单执行和指标绑定。同时实现了自定义的 ColorMetroIndicator,完全复刻 MT5 中的 iCustom 指标逻辑,输出快线、慢线以及内部使用的 RSI 值。
工作流程
- 启动时会创建两个
SignalModule:Long(多头模块)和 Short(空头模块),它们具有独立的周期与参数。 - 每个模块调用
SubscribeCandles订阅相应的周期,并使用BindEx与ColorMetroIndicator绑定。 - 每根收盘的 K 线都会触发一次计算,指标返回:
- 快速 ColorMETRO 包络线(依据快步长的 RSI 台阶)。
- 慢速 ColorMETRO 包络线。
- 指标内部的 RSI 数值(用于参考)。
- 模块将最新结果加入历史缓存,并按照
SignalBar指定的偏移量对比最近两根信号柱,复制 MT5 中CopyBuffer的用法。 - 交易规则:
- 多头模块:
- 开仓:前一根信号柱快线在慢线上方,本柱快线下穿或等于慢线。
- 平仓:前一根信号柱慢线在快线上方。
- 空头模块:
- 开仓:前一根信号柱快线在慢线下方,本柱快线上穿或等于慢线。
- 平仓:前一根信号柱慢线在快线下方。
- 多头模块:
- 下单使用
BuyMarket/SellMarket。策略始终检查净头寸,如果需要反向操作,会先平掉已有仓位再建立新仓。
参数说明
每个模块都拥有独立参数组,默认值与 MT5 EA 保持一致。
市场参数
- Long_Volume、Short_Volume:新开仓的手数。
- Long_OpenAllowed、Short_OpenAllowed:是否允许该模块开仓。
- Long_CloseAllowed、Short_CloseAllowed:是否允许信号自动平仓。
- Long_MarginMode、Short_MarginMode:保留的资金管理模式(本移植中不参与计算)。
- Long_StopLoss、Long_TakeProfit、Long_Deviation 以及空头对应参数:仅作为文档保留,本版本不会自动设置止损止盈。
- Long_Magic、Short_Magic:原 EA 的魔术号,便于对照。
指标参数
- Long_CandleType、Short_CandleType:各模块的 K 线周期。
- Long_PeriodRSI、Short_PeriodRSI:ColorMETRO 内部使用的 RSI 周期。
- Long_StepSizeFast、Short_StepSizeFast:快包络的步长(RSI 点数)。
- Long_StepSizeSlow、Short_StepSizeSlow:慢包络的步长。
- Long_SignalBar、Short_SignalBar:读取指标缓存时的偏移,与 MT5 的
SignalBar相同。 - Long_AppliedPrice、Short_AppliedPrice:计算 RSI 时使用的价格,默认收盘价。
与 MT5 版本的差异
- 头寸模型:MT5 可以通过不同魔术号同时持有多空仓位,StockSharp 采用净头寸模式,反手时会先平仓再开仓。
- 资金管理:保留了 MarginMode、Deviation 等输入,但默认不参与下单计算,仓位大小由 Volume 参数控制。
- 止损/止盈:原 EA 每次下单都会附带止损止盈。本策略仅记录相应距离,如需自动风控需自行扩展。
- 时间锁:MT5 使用全局变量避免同一时间重复开仓;在 StockSharp 中我们按收盘柱执行一次逻辑,并通过净头寸判断避免重复信号。
备注
ColorMetroIndicator完全仿真原始算法,包含趋势记忆逻辑,可用于绘图或调试。- 代码中提供了详细英文注释,便于进一步修改和优化。
- 若要加入自动止损、止盈或其他风险控制,可在
ProcessModule中增加对应的订单逻辑。
using System;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Conversion of the MT5 expert "Exp_ColorMETRO_Duplex".
/// Uses RSI with step-based envelopes (fast/slow) to generate long/short signals.
/// </summary>
public class ColorMetroDuplexStrategy : Strategy
{
private readonly StrategyParam<DataType> _candleType;
private readonly StrategyParam<int> _rsiPeriod;
private readonly StrategyParam<int> _fastStep;
private readonly StrategyParam<int> _slowStep;
private readonly StrategyParam<int> _signalCooldownBars;
// fast envelope state
private decimal? _fastMin, _fastMax;
private int _fastTrend;
private decimal? _prevFastBand;
// slow envelope state
private decimal? _slowMin, _slowMax;
private int _slowTrend;
private decimal? _prevSlowBand;
private int _cooldownRemaining;
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public int RsiPeriod { get => _rsiPeriod.Value; set => _rsiPeriod.Value = value; }
public int FastStep { get => _fastStep.Value; set => _fastStep.Value = value; }
public int SlowStep { get => _slowStep.Value; set => _slowStep.Value = value; }
public int SignalCooldownBars { get => _signalCooldownBars.Value; set => _signalCooldownBars.Value = value; }
public ColorMetroDuplexStrategy()
{
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Timeframe", "General");
_rsiPeriod = Param(nameof(RsiPeriod), 7)
.SetGreaterThanZero()
.SetDisplay("RSI Period", "RSI lookback", "Indicator");
_fastStep = Param(nameof(FastStep), 8)
.SetGreaterThanZero()
.SetDisplay("Fast Step", "Step size for fast envelope", "Indicator");
_slowStep = Param(nameof(SlowStep), 24)
.SetGreaterThanZero()
.SetDisplay("Slow Step", "Step size for slow envelope", "Indicator");
_signalCooldownBars = Param(nameof(SignalCooldownBars), 4)
.SetGreaterThanZero()
.SetDisplay("Signal Cooldown", "Bars to wait between reversals", "Trading");
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_fastMin = _fastMax = null;
_slowMin = _slowMax = null;
_fastTrend = _slowTrend = 0;
_prevFastBand = _prevSlowBand = null;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
_fastMin = _fastMax = null;
_slowMin = _slowMax = null;
_fastTrend = _slowTrend = 0;
_prevFastBand = _prevSlowBand = null;
_cooldownRemaining = 0;
var rsi = new RelativeStrengthIndex { Length = RsiPeriod };
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(rsi, ProcessCandle)
.Start();
StartProtection(
takeProfit: new Unit(2, UnitTypes.Percent),
stopLoss: new Unit(1, UnitTypes.Percent)
);
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, rsi);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, decimal rsiVal)
{
if (candle.State != CandleStates.Finished)
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var fStep = (decimal)FastStep;
var sStep = (decimal)SlowStep;
// fast envelope
var fastMinCand = rsiVal - 2m * fStep;
var fastMaxCand = rsiVal + 2m * fStep;
if (_fastMin == null || _fastMax == null)
{
_fastMin = fastMinCand;
_fastMax = fastMaxCand;
_fastTrend = 0;
_slowMin = rsiVal - 2m * sStep;
_slowMax = rsiVal + 2m * sStep;
_slowTrend = 0;
return;
}
// fast trend
if (rsiVal > _fastMax) _fastTrend = 1;
else if (rsiVal < _fastMin) _fastTrend = -1;
if (_fastTrend > 0 && fastMinCand < _fastMin) fastMinCand = _fastMin.Value;
else if (_fastTrend < 0 && fastMaxCand > _fastMax) fastMaxCand = _fastMax.Value;
// slow envelope
var slowMinCand = rsiVal - 2m * sStep;
var slowMaxCand = rsiVal + 2m * sStep;
if (rsiVal > _slowMax) _slowTrend = 1;
else if (rsiVal < _slowMin) _slowTrend = -1;
if (_slowTrend > 0 && slowMinCand < _slowMin) slowMinCand = _slowMin.Value;
else if (_slowTrend < 0 && slowMaxCand > _slowMax) slowMaxCand = _slowMax.Value;
// compute band values
decimal? fastBand = null;
if (_fastTrend > 0) fastBand = fastMinCand + fStep;
else if (_fastTrend < 0) fastBand = fastMaxCand - fStep;
decimal? slowBand = null;
if (_slowTrend > 0) slowBand = slowMinCand + sStep;
else if (_slowTrend < 0) slowBand = slowMaxCand - sStep;
_fastMin = fastMinCand;
_fastMax = fastMaxCand;
_slowMin = slowMinCand;
_slowMax = slowMaxCand;
if (fastBand == null || slowBand == null)
{
_prevFastBand = fastBand;
_prevSlowBand = slowBand;
return;
}
if (_prevFastBand == null || _prevSlowBand == null)
{
_prevFastBand = fastBand;
_prevSlowBand = slowBand;
return;
}
var up = fastBand.Value;
var down = slowBand.Value;
var prevUp = _prevFastBand.Value;
var prevDown = _prevSlowBand.Value;
_prevFastBand = fastBand;
_prevSlowBand = slowBand;
// Long signal: fast crosses below slow (up crosses down downward)
var longOpen = prevUp > prevDown && up <= down;
// Short signal: fast crosses above slow (up crosses down upward)
var shortOpen = prevUp < prevDown && up >= down;
if (_cooldownRemaining == 0 && longOpen && Position == 0)
{
BuyMarket();
_cooldownRemaining = SignalCooldownBars;
}
else if (_cooldownRemaining == 0 && shortOpen && Position == 0)
{
SellMarket();
_cooldownRemaining = SignalCooldownBars;
}
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RelativeStrengthIndex
from StockSharp.Algo.Strategies import Strategy
class color_metro_duplex_strategy(Strategy):
def __init__(self):
super(color_metro_duplex_strategy, self).__init__()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Timeframe", "General")
self._rsi_period = self.Param("RsiPeriod", 7) \
.SetDisplay("RSI Period", "RSI lookback", "Indicator")
self._fast_step = self.Param("FastStep", 8) \
.SetDisplay("Fast Step", "Step size for fast envelope", "Indicator")
self._slow_step = self.Param("SlowStep", 24) \
.SetDisplay("Slow Step", "Step size for slow envelope", "Indicator")
self._signal_cooldown_bars = self.Param("SignalCooldownBars", 4) \
.SetDisplay("Signal Cooldown", "Bars to wait between reversals", "Trading")
self._fast_min = None
self._fast_max = None
self._fast_trend = 0
self._prev_fast_band = None
self._slow_min = None
self._slow_max = None
self._slow_trend = 0
self._prev_slow_band = None
self._cooldown_remaining = 0
@property
def CandleType(self):
return self._candle_type.Value
@property
def RsiPeriod(self):
return self._rsi_period.Value
@property
def FastStep(self):
return self._fast_step.Value
@property
def SlowStep(self):
return self._slow_step.Value
@property
def SignalCooldownBars(self):
return self._signal_cooldown_bars.Value
def OnReseted(self):
super(color_metro_duplex_strategy, self).OnReseted()
self._fast_min = None
self._fast_max = None
self._fast_trend = 0
self._prev_fast_band = None
self._slow_min = None
self._slow_max = None
self._slow_trend = 0
self._prev_slow_band = None
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(color_metro_duplex_strategy, self).OnStarted2(time)
self._fast_min = None
self._fast_max = None
self._fast_trend = 0
self._prev_fast_band = None
self._slow_min = None
self._slow_max = None
self._slow_trend = 0
self._prev_slow_band = None
self._cooldown_remaining = 0
rsi = RelativeStrengthIndex()
rsi.Length = self.RsiPeriod
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(rsi, self._on_process).Start()
self.StartProtection(Unit(2, UnitTypes.Percent), Unit(1, UnitTypes.Percent))
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, rsi)
self.DrawOwnTrades(area)
def _on_process(self, candle, rsi_value):
if candle.State != CandleStates.Finished:
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
rv = float(rsi_value)
f_step = float(self.FastStep)
s_step = float(self.SlowStep)
fast_min_cand = rv - 2.0 * f_step
fast_max_cand = rv + 2.0 * f_step
if self._fast_min is None or self._fast_max is None:
self._fast_min = fast_min_cand
self._fast_max = fast_max_cand
self._fast_trend = 0
self._slow_min = rv - 2.0 * s_step
self._slow_max = rv + 2.0 * s_step
self._slow_trend = 0
return
if rv > self._fast_max:
self._fast_trend = 1
elif rv < self._fast_min:
self._fast_trend = -1
if self._fast_trend > 0 and fast_min_cand < self._fast_min:
fast_min_cand = self._fast_min
elif self._fast_trend < 0 and fast_max_cand > self._fast_max:
fast_max_cand = self._fast_max
slow_min_cand = rv - 2.0 * s_step
slow_max_cand = rv + 2.0 * s_step
if rv > self._slow_max:
self._slow_trend = 1
elif rv < self._slow_min:
self._slow_trend = -1
if self._slow_trend > 0 and slow_min_cand < self._slow_min:
slow_min_cand = self._slow_min
elif self._slow_trend < 0 and slow_max_cand > self._slow_max:
slow_max_cand = self._slow_max
fast_band = None
if self._fast_trend > 0:
fast_band = fast_min_cand + f_step
elif self._fast_trend < 0:
fast_band = fast_max_cand - f_step
slow_band = None
if self._slow_trend > 0:
slow_band = slow_min_cand + s_step
elif self._slow_trend < 0:
slow_band = slow_max_cand - s_step
self._fast_min = fast_min_cand
self._fast_max = fast_max_cand
self._slow_min = slow_min_cand
self._slow_max = slow_max_cand
if fast_band is None or slow_band is None:
self._prev_fast_band = fast_band
self._prev_slow_band = slow_band
return
if self._prev_fast_band is None or self._prev_slow_band is None:
self._prev_fast_band = fast_band
self._prev_slow_band = slow_band
return
up = fast_band
down = slow_band
prev_up = self._prev_fast_band
prev_down = self._prev_slow_band
self._prev_fast_band = fast_band
self._prev_slow_band = slow_band
long_open = prev_up > prev_down and up <= down
short_open = prev_up < prev_down and up >= down
if self._cooldown_remaining == 0 and long_open and self.Position == 0:
self.BuyMarket()
self._cooldown_remaining = self.SignalCooldownBars
elif self._cooldown_remaining == 0 and short_open and self.Position == 0:
self.SellMarket()
self._cooldown_remaining = self.SignalCooldownBars
def CreateClone(self):
return color_metro_duplex_strategy()