Super Simple RSI Engulfing 策略
该策略把 MetaTrader 上的 SSEATwRSI 专家顾问迁移到 StockSharp。系统仅处理已经收盘的K线,并在每根K线的最高价上计算 7 周期 RSI。只有当 RSI 到达极值并且前两根K线组成清晰的反转型吞没形态时,才会触发交易信号。
做多需要 RSI 高于设定的超买阈值,同时两根历史K线形成“先跌后涨”的吞没结构:两根之前的看跌K线被后一根看涨K线完全吞没,并且收盘价高于较早K线的开盘价。做空逻辑相反:RSI 跌破超卖阈值,最近两根K线形成看涨转看跌的吞没。仓位规模由 Volume 参数固定,在开立新方向之前会自动平掉相反持仓。
持仓后策略持续监控整体盈亏。当浮动盈亏达到 ProfitGoal 设定的货币利润目标,或者跌破 -MaxLoss 允许的最大亏损时,会立即清仓。策略没有额外的移动止损,完全依赖吞没形态和账户级别的盈亏阈值来管理风险。
细节
- 入场条件:
- 做多:RSI(最高价) > OverboughtLevel,最近一根K线吞没两根前的看跌K线,并且收盘价高于那根K线的开盘价。
- 做空:RSI(最高价) < OversoldLevel,最近一根K线吞没两根前的看涨K线,并且收盘价低于那根K线的开盘价。
- 多空方向:双向。
- 离场条件:
- 总盈亏 ≥ ProfitGoal → 平仓。
- 总盈亏 ≤ -MaxLoss → 平仓。
- 当出现反向信号时,新订单会自动冲销旧仓位。
- 止损/止盈:基于策略总盈亏的货币目标和最大亏损控制。
- 过滤器:
- RSI 使用最高价计算,以突出极端的延伸走势。
- 需要两根K线的吞没反转作为确认。
参数
- Volume = 0.1 – 下单手数,在新方向建仓前会先对冲已有反向仓位。
- ProfitGoal = 190 – 账户货币计价的盈利目标,达到后立即平仓。
- MaxLoss = 10 – 账户货币计价的最大亏损,触发条件为总盈亏 ≤ -MaxLoss。
- RsiPeriod = 7 – RSI 的计算周期。
- RsiPrice = High – RSI 使用的价格源。
- OverboughtLevel = 88 – 触发做多前需要突破的 RSI 超买阈值。
- OversoldLevel = 37 – 触发做空前需要跌破的 RSI 超卖阈值。
- CandleType = 默认使用 4 小时K线,可根据原始图表调整。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// RSI filter combined with engulfing candle pattern taken from the SSEATwRSI expert advisor.
/// </summary>
/// <remarks>
/// Only finished candles are processed. The RSI is fed with the price selected by
/// <see cref="RsiPrice"/>, and the engulfing pattern is evaluated on the two candles
/// that precede the one currently being processed. Open positions are closed once the
/// strategy <c>PnL</c> reaches <see cref="ProfitGoal"/> or drops to minus <see cref="MaxLoss"/>.
/// </remarks>
public class SuperSimpleRsiEngulfingStrategy : Strategy
{
	/// <summary>
	/// Candle price that can be used as the RSI input.
	/// </summary>
	public enum CandlePrices
	{
		/// <summary>Candle open price.</summary>
		Open,

		/// <summary>Candle high price.</summary>
		High,

		/// <summary>Candle low price.</summary>
		Low,

		/// <summary>Candle close price.</summary>
		Close,

		/// <summary>(High + Low) / 2.</summary>
		Median,

		/// <summary>(High + Low + Close) / 3.</summary>
		Typical,

		/// <summary>(High + Low + 2 * Close) / 4.</summary>
		Weighted
	}

	private readonly StrategyParam<decimal> _profitGoal;
	private readonly StrategyParam<decimal> _maxLoss;
	private readonly StrategyParam<int> _rsiPeriod;
	private readonly StrategyParam<CandlePrices> _rsiPrice;
	private readonly StrategyParam<decimal> _overboughtLevel;
	private readonly StrategyParam<decimal> _oversoldLevel;
	private readonly StrategyParam<DataType> _candleType;

	// Created in OnStarted2 and cleared in OnReseted.
	private RelativeStrengthIndex _rsi = null!;

	// Open/close of the last two finished candles; null until enough candles have been seen.
	private decimal? _prevOpen;
	private decimal? _prevClose;
	private decimal? _prevPrevOpen;
	private decimal? _prevPrevClose;

	/// <summary>
	/// Currency profit target that forces a flatten.
	/// </summary>
	public decimal ProfitGoal
	{
		get => _profitGoal.Value;
		set => _profitGoal.Value = value;
	}

	/// <summary>
	/// Maximum allowed currency loss before closing positions.
	/// </summary>
	public decimal MaxLoss
	{
		get => _maxLoss.Value;
		set => _maxLoss.Value = value;
	}

	/// <summary>
	/// RSI averaging period.
	/// </summary>
	public int RsiPeriod
	{
		get => _rsiPeriod.Value;
		set => _rsiPeriod.Value = value;
	}

	/// <summary>
	/// Price source used by the RSI indicator.
	/// </summary>
	public CandlePrices RsiPrice
	{
		get => _rsiPrice.Value;
		set => _rsiPrice.Value = value;
	}

	/// <summary>
	/// RSI level considered overbought.
	/// </summary>
	public decimal OverboughtLevel
	{
		get => _overboughtLevel.Value;
		set => _overboughtLevel.Value = value;
	}

	/// <summary>
	/// RSI level considered oversold.
	/// </summary>
	public decimal OversoldLevel
	{
		get => _oversoldLevel.Value;
		set => _oversoldLevel.Value = value;
	}

	/// <summary>
	/// Candle type processed by the strategy.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initializes a new instance of <see cref="SuperSimpleRsiEngulfingStrategy"/>.
	/// </summary>
	public SuperSimpleRsiEngulfingStrategy()
	{
		_profitGoal = Param(nameof(ProfitGoal), 190m)
			.SetGreaterThanZero()
			.SetDisplay("Profit Goal", "Currency profit target to flatten", "Risk");

		_maxLoss = Param(nameof(MaxLoss), 10m)
			.SetGreaterThanZero()
			.SetDisplay("Max Loss", "Maximum currency drawdown before flattening", "Risk");

		_rsiPeriod = Param(nameof(RsiPeriod), 7)
			.SetGreaterThanZero()
			.SetDisplay("RSI Period", "RSI averaging period", "Indicators");

		_rsiPrice = Param(nameof(RsiPrice), CandlePrices.High)
			.SetDisplay("RSI Price", "Price source for RSI", "Indicators");

		_overboughtLevel = Param(nameof(OverboughtLevel), 88m)
			.SetDisplay("Overbought Level", "RSI threshold for bullish reversals", "Indicators");

		_oversoldLevel = Param(nameof(OversoldLevel), 37m)
			.SetDisplay("Oversold Level", "RSI threshold for bearish reversals", "Indicators");

		_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
			.SetDisplay("Candle Type", "Candle series to process", "General");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		// Drop the indicator and the candle history so the next start begins clean.
		_rsi = null!;
		_prevOpen = null;
		_prevClose = null;
		_prevPrevOpen = null;
		_prevPrevClose = null;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		_rsi = new RelativeStrengthIndex { Length = RsiPeriod };

		// The RSI is fed manually inside ProcessCandle, so only the candle handler is bound.
		var subscription = SubscribeCandles(CandleType);
		subscription
			.Bind(ProcessCandle)
			.Start();

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawIndicator(area, _rsi);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Handles a candle from the subscription: updates the RSI, evaluates the entry
	/// signals on the two previously stored candles, applies the PnL exit thresholds
	/// and finally stores the candle in the two-bar history.
	/// </summary>
	/// <param name="candle">Candle received from the subscription; unfinished candles are ignored.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
			return;

		var price = GetPrice(candle, RsiPrice);
		var rsiResult = _rsi.Process(new DecimalIndicatorValue(_rsi, price, candle.OpenTime) { IsFinal = true });

		if (!_rsi.IsFormed)
		{
			// Keep collecting candle history while the indicator is still warming up.
			UpdateHistory(candle);
			return;
		}

		var rsiValue = rsiResult.ToDecimal();

		if (_prevOpen is decimal prevOpen &&
			_prevClose is decimal prevClose &&
			_prevPrevOpen is decimal prevPrevOpen &&
			_prevPrevClose is decimal prevPrevClose)
		{
			// Detect the two-candle engulfing pattern from the previous bars:
			// the older bar moves one way, the newer bar moves the other way and
			// closes beyond the older bar's open.
			var bullishEngulfing = prevPrevOpen > prevPrevClose &&
				prevOpen < prevClose &&
				prevPrevOpen < prevClose;

			var bearishEngulfing = prevPrevOpen < prevPrevClose &&
				prevOpen > prevClose &&
				prevPrevOpen > prevClose;

			// Only enter long if RSI is above the overbought level, the pattern is bullish
			// and there is no existing long position.
			var longSignal = rsiValue > OverboughtLevel && bullishEngulfing && Position <= 0m;

			// Only enter short if RSI is below the oversold level, the pattern is bearish
			// and there is no existing short position.
			var shortSignal = rsiValue < OversoldLevel && bearishEngulfing && Position >= 0m;

			if (longSignal)
			{
				BuyMarket();
			}
			else if (shortSignal)
			{
				SellMarket();
			}
		}

		if (Position != 0m)
		{
			// Flatten the position once the strategy PnL reaches the configured thresholds.
			// NOTE(review): PnL is the strategy-level value; confirm whether it includes
			// realized results from earlier trades or only the floating part.
			var totalPnL = PnL;
			if (totalPnL >= ProfitGoal || totalPnL <= -MaxLoss)
				ClosePosition();
		}

		UpdateHistory(candle);
	}

	/// <summary>
	/// Sends a market order in the opposite direction of the current position,
	/// sized to the absolute position so that the position is fully flattened.
	/// Does nothing when the position is zero.
	/// </summary>
	private void ClosePosition()
	{
		// Read the position once so the direction check and the order size agree.
		var position = Position;

		if (position > 0m)
			SellMarket(position);
		else if (position < 0m)
			BuyMarket(-position);
	}

	/// <summary>
	/// Shifts the stored candle history by one bar and records the given candle as the most recent one.
	/// </summary>
	/// <param name="candle">Finished candle whose open and close prices are stored.</param>
	private void UpdateHistory(ICandleMessage candle)
	{
		// Shift the last two completed candles so pattern checks use historical data only.
		_prevPrevOpen = _prevOpen;
		_prevPrevClose = _prevClose;
		_prevOpen = candle.OpenPrice;
		_prevClose = candle.ClosePrice;
	}

	/// <summary>
	/// Returns the candle price selected by <paramref name="price"/>.
	/// </summary>
	/// <param name="candle">Candle to read prices from.</param>
	/// <param name="price">Requested price type.</param>
	/// <returns>The selected price; the close price for any unrecognized value.</returns>
	private static decimal GetPrice(ICandleMessage candle, CandlePrices price)
	{
		// Support different RSI inputs without duplicating indicator logic.
		return price switch
		{
			CandlePrices.Open => candle.OpenPrice,
			CandlePrices.High => candle.HighPrice,
			CandlePrices.Low => candle.LowPrice,
			CandlePrices.Close => candle.ClosePrice,
			CandlePrices.Median => (candle.HighPrice + candle.LowPrice) / 2m,
			CandlePrices.Typical => (candle.HighPrice + candle.LowPrice + candle.ClosePrice) / 3m,
			CandlePrices.Weighted => (candle.HighPrice + candle.LowPrice + 2m * candle.ClosePrice) / 4m,
			_ => candle.ClosePrice,
		};
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Decimal
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
from StockSharp.Algo.Indicators import RelativeStrengthIndex
from indicator_extensions import *
class super_simple_rsi_engulfing_strategy(Strategy):
    """RSI filter combined with engulfing candle pattern for reversals.

    Only finished candles are processed. The RSI is fed with the price selected
    by ``RsiPrice`` and the engulfing pattern is evaluated on the two candles
    preceding the one being processed. Open positions are closed once the
    strategy ``PnL`` reaches ``ProfitGoal`` or drops to ``-MaxLoss``.
    """

    def __init__(self):
        super(super_simple_rsi_engulfing_strategy, self).__init__()

        self._profit_goal = self.Param("ProfitGoal", 190.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Profit Goal", "Currency profit target to flatten", "Risk")
        self._max_loss = self.Param("MaxLoss", 10.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Max Loss", "Maximum currency drawdown before flattening", "Risk")
        self._rsi_period = self.Param("RsiPeriod", 7) \
            .SetGreaterThanZero() \
            .SetDisplay("RSI Period", "RSI averaging period", "Indicators")
        # RsiPrice: 0=Open, 1=High, 2=Low, 3=Close, 4=Median, 5=Typical, 6=Weighted
        self._rsi_price = self.Param("RsiPrice", 1) \
            .SetDisplay("RSI Price", "Price source for RSI (0=O,1=H,2=L,3=C,4=Med,5=Typ,6=Wt)", "Indicators")
        self._overbought_level = self.Param("OverboughtLevel", 88.0) \
            .SetDisplay("Overbought Level", "RSI threshold for bullish reversals", "Indicators")
        self._oversold_level = self.Param("OversoldLevel", 37.0) \
            .SetDisplay("Oversold Level", "RSI threshold for bearish reversals", "Indicators")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Candle series to process", "General")

        # Indicator instance; created in OnStarted2 and cleared in OnReseted.
        self._rsi = None

        # Open/close of the last two finished candles; None until enough candles were seen.
        self._prev_open = None
        self._prev_close = None
        self._prev_prev_open = None
        self._prev_prev_close = None

    @property
    def ProfitGoal(self):
        """Currency profit target that forces a flatten."""
        return float(self._profit_goal.Value)

    @property
    def MaxLoss(self):
        """Maximum allowed currency loss before closing positions."""
        return float(self._max_loss.Value)

    @property
    def RsiPeriod(self):
        """RSI averaging period."""
        return int(self._rsi_period.Value)

    @property
    def RsiPrice(self):
        """Integer code of the RSI price source (0=Open ... 6=Weighted)."""
        return int(self._rsi_price.Value)

    @property
    def OverboughtLevel(self):
        """RSI level considered overbought."""
        return float(self._overbought_level.Value)

    @property
    def OversoldLevel(self):
        """RSI level considered oversold."""
        return float(self._oversold_level.Value)

    @property
    def CandleType(self):
        """Candle type processed by the strategy."""
        return self._candle_type.Value

    def OnStarted2(self, time):
        """Create the RSI, subscribe to candles and set up the chart when one is available."""
        super(super_simple_rsi_engulfing_strategy, self).OnStarted2(time)

        self._prev_open = None
        self._prev_close = None
        self._prev_prev_open = None
        self._prev_prev_close = None

        self._rsi = RelativeStrengthIndex()
        self._rsi.Length = self.RsiPeriod

        # The RSI is fed manually inside process_candle, so only the candle handler is bound.
        subscription = self.SubscribeCandles(self.CandleType)
        subscription.Bind(self.process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, self._rsi)
            self.DrawOwnTrades(area)

    def _get_price(self, candle):
        """Return the candle price selected by ``RsiPrice`` as a float.

        Unrecognized codes fall back to the close price.
        """
        price_type = self.RsiPrice
        h = float(candle.HighPrice)
        lo = float(candle.LowPrice)
        c = float(candle.ClosePrice)
        o = float(candle.OpenPrice)
        if price_type == 0:
            return o
        elif price_type == 1:
            return h
        elif price_type == 2:
            return lo
        elif price_type == 3:
            return c
        elif price_type == 4:
            return (h + lo) / 2.0
        elif price_type == 5:
            return (h + lo + c) / 3.0
        elif price_type == 6:
            return (h + lo + 2.0 * c) / 4.0
        return c

    def process_candle(self, candle):
        """Handle a candle from the subscription.

        Updates the RSI, evaluates the entry signals on the two previously stored
        candles, applies the PnL exit thresholds and stores the candle in the
        two-bar history. Unfinished candles are ignored.
        """
        if candle.State != CandleStates.Finished:
            return

        price = self._get_price(candle)
        # process_float comes from the indicator_extensions star import;
        # presumably it feeds a final decimal value into the indicator - verify there.
        rsi_result = process_float(self._rsi, Decimal(price), candle.ServerTime, True)

        if not self._rsi.IsFormed:
            # Keep collecting candle history while the indicator is still warming up.
            self._update_history(candle)
            return

        rsi_value = float(rsi_result.Value)

        if (self._prev_open is not None and self._prev_close is not None
                and self._prev_prev_open is not None and self._prev_prev_close is not None):
            prev_open = self._prev_open
            prev_close = self._prev_close
            prev_prev_open = self._prev_prev_open
            prev_prev_close = self._prev_prev_close

            # Two-candle engulfing: the older bar moves one way, the newer bar moves
            # the other way and closes beyond the older bar's open.
            bullish_engulfing = (prev_prev_open > prev_prev_close
                                 and prev_open < prev_close
                                 and prev_prev_open < prev_close)
            bearish_engulfing = (prev_prev_open < prev_prev_close
                                 and prev_open > prev_close
                                 and prev_prev_open > prev_close)

            long_signal = rsi_value > self.OverboughtLevel and bullish_engulfing and self.Position <= 0
            short_signal = rsi_value < self.OversoldLevel and bearish_engulfing and self.Position >= 0

            if long_signal:
                self.BuyMarket()
            elif short_signal:
                self.SellMarket()

        if self.Position != 0:
            # NOTE(review): PnL is the strategy-level value; confirm whether it includes
            # realized results from earlier trades or only the floating part.
            total_pnl = float(self.PnL)
            if total_pnl >= self.ProfitGoal or total_pnl <= -self.MaxLoss:
                self._close_position()

        self._update_history(candle)

    def _close_position(self):
        """Flatten the current position with a market order sized to the absolute position.

        Does nothing when the position is zero.
        """
        # Imported locally so this method does not depend on the module-level import list.
        from System import Math

        # Read the position once so the direction check and the order size agree.
        position = self.Position
        if position > 0:
            self.SellMarket(Math.Abs(position))
        elif position < 0:
            self.BuyMarket(Math.Abs(position))

    def _update_history(self, candle):
        """Shift the stored two-bar history and record the given candle as the most recent one."""
        self._prev_prev_open = self._prev_open
        self._prev_prev_close = self._prev_close
        self._prev_open = float(candle.OpenPrice)
        self._prev_close = float(candle.ClosePrice)

    def OnReseted(self):
        """Clear the indicator and the candle history so the next start begins clean."""
        super(super_simple_rsi_engulfing_strategy, self).OnReseted()
        self._rsi = None
        self._prev_open = None
        self._prev_close = None
        self._prev_prev_open = None
        self._prev_prev_close = None

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return super_simple_rsi_engulfing_strategy()