Z-Score RSI 策略
该策略在价格的 z-score 上计算 RSI,并使用 RSI 的 EMA 作为信号。当 RSI 上穿其 EMA 时做多,下穿时做空。
细节
- 入场条件: z-score 的 RSI 上穿其 EMA
- 多空: 双向
- 出场条件: 反向穿越
- 止损: 无
- 默认值:
ZScoreLength= 20RsiLength= 9SmoothingLength= 15
- 筛选:
- 类别: 振荡器
- 方向: 双向
- 指标: SMA, StandardDeviation, RSI, EMA
- 止损: 无
- 复杂度: 基础
- 时间框架: 日内
- 季节性: 无
- 神经网络: 无
- 背离: 无
- 风险等级: 中等
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// RSI of price Z-Score with EMA smoothing.
/// Computes Z-score of price, feeds it to RSI, smooths with EMA.
/// Buys when RSI crosses above its EMA and sells on opposite cross.
/// </summary>
public class ZScoreRsiStrategy : Strategy
{
private readonly StrategyParam<int> _zScoreLength;
private readonly StrategyParam<int> _rsiLength;
private readonly StrategyParam<int> _smoothingLength;
private readonly StrategyParam<DataType> _candleType;
private readonly List<decimal> _closes = new();
private decimal _prevRsiZ;
private decimal _prevRsiMa;
private bool _hasPrev;
public int ZScoreLength { get => _zScoreLength.Value; set => _zScoreLength.Value = value; }
public int RsiLength { get => _rsiLength.Value; set => _rsiLength.Value = value; }
public int SmoothingLength { get => _smoothingLength.Value; set => _smoothingLength.Value = value; }
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public ZScoreRsiStrategy()
{
_zScoreLength = Param(nameof(ZScoreLength), 20)
.SetGreaterThanZero()
.SetDisplay("Z-Score Length", "Length for mean and deviation", "Indicators");
_rsiLength = Param(nameof(RsiLength), 9)
.SetGreaterThanZero()
.SetDisplay("RSI Length", "Length for RSI", "Indicators");
_smoothingLength = Param(nameof(SmoothingLength), 15)
.SetGreaterThanZero()
.SetDisplay("RSI EMA Length", "EMA length over RSI", "Indicators");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Type of candles", "General");
}
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
protected override void OnReseted()
{
base.OnReseted();
_closes.Clear();
_prevRsiZ = 0;
_prevRsiMa = 0;
_hasPrev = false;
}
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
// Use SMA + StdDev via Bind to compute Z-score components
var sma = new SimpleMovingAverage { Length = ZScoreLength };
var stdDev = new StandardDeviation { Length = ZScoreLength };
_closes.Clear();
_prevRsiZ = 0;
_prevRsiMa = 0;
_hasPrev = false;
var subscription = SubscribeCandles(CandleType);
subscription.Bind(sma, stdDev, ProcessCandle).Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, decimal meanVal, decimal stdVal)
{
if (candle.State != CandleStates.Finished)
return;
if (stdVal <= 0)
return;
// Compute Z-score
var z = (candle.ClosePrice - meanVal) / stdVal;
// Manual RSI on Z-score values
_closes.Add(z);
if (_closes.Count > RsiLength + SmoothingLength + 10)
_closes.RemoveAt(0);
if (_closes.Count < RsiLength + 1)
return;
// Calculate RSI manually on Z-score series
decimal avgGain = 0, avgLoss = 0;
for (int i = _closes.Count - RsiLength; i < _closes.Count; i++)
{
var change = _closes[i] - _closes[i - 1];
if (change > 0) avgGain += change;
else avgLoss += Math.Abs(change);
}
avgGain /= RsiLength;
avgLoss /= RsiLength;
decimal rsiZ;
if (avgLoss == 0)
rsiZ = 100;
else
{
var rs = avgGain / avgLoss;
rsiZ = 100 - (100 / (1 + rs));
}
// EMA smoothing of RSI
decimal rsiMa;
if (!_hasPrev)
{
rsiMa = rsiZ;
_prevRsiZ = rsiZ;
_prevRsiMa = rsiMa;
_hasPrev = true;
return;
}
var k = 2m / (SmoothingLength + 1);
rsiMa = rsiZ * k + _prevRsiMa * (1 - k);
// Crossover signals
var crossUp = _prevRsiZ <= _prevRsiMa && rsiZ > rsiMa;
var crossDown = _prevRsiZ >= _prevRsiMa && rsiZ < rsiMa;
if (crossUp && Position <= 0)
{
BuyMarket();
}
else if (crossDown && Position >= 0)
{
SellMarket();
}
_prevRsiZ = rsiZ;
_prevRsiMa = rsiMa;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
class z_score_rsi_strategy(Strategy):
def __init__(self):
super(z_score_rsi_strategy, self).__init__()
self._z_score_length = self.Param("ZScoreLength", 20) \
.SetDisplay("Z-Score Length", "Length for mean and deviation", "Indicators")
self._rsi_length = self.Param("RsiLength", 9) \
.SetDisplay("RSI Length", "Length for RSI", "Indicators")
self._smoothing_length = self.Param("SmoothingLength", 15) \
.SetDisplay("RSI EMA Length", "EMA length over RSI", "Indicators")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Type of candles", "General")
self._closes = []
self._prev_rsi_z = 0.0
self._prev_rsi_ma = 0.0
self._has_prev = False
@property
def z_score_length(self):
return self._z_score_length.Value
@property
def rsi_length(self):
return self._rsi_length.Value
@property
def smoothing_length(self):
return self._smoothing_length.Value
@property
def candle_type(self):
return self._candle_type.Value
def OnReseted(self):
super(z_score_rsi_strategy, self).OnReseted()
self._closes = []
self._prev_rsi_z = 0.0
self._prev_rsi_ma = 0.0
self._has_prev = False
def OnStarted2(self, time):
super(z_score_rsi_strategy, self).OnStarted2(time)
sma = SimpleMovingAverage()
sma.Length = self.z_score_length
std_dev = StandardDeviation()
std_dev.Length = self.z_score_length
self._closes = []
self._prev_rsi_z = 0.0
self._prev_rsi_ma = 0.0
self._has_prev = False
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(sma, std_dev, self.on_process).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawOwnTrades(area)
def on_process(self, candle, mean_val, std_val):
if candle.State != CandleStates.Finished:
return
if std_val <= 0:
return
z = (float(candle.ClosePrice) - float(mean_val)) / float(std_val)
self._closes.append(z)
if len(self._closes) > self.rsi_length + self.smoothing_length + 10:
self._closes.pop(0)
if len(self._closes) < self.rsi_length + 1:
return
avg_gain = 0.0
avg_loss = 0.0
start = len(self._closes) - self.rsi_length
for i in range(start, len(self._closes)):
change = self._closes[i] - self._closes[i - 1]
if change > 0:
avg_gain += change
else:
avg_loss += abs(change)
avg_gain /= self.rsi_length
avg_loss /= self.rsi_length
if avg_loss == 0:
rsi_z = 100.0
else:
rs = avg_gain / avg_loss
rsi_z = 100.0 - (100.0 / (1.0 + rs))
if not self._has_prev:
rsi_ma = rsi_z
self._prev_rsi_z = rsi_z
self._prev_rsi_ma = rsi_ma
self._has_prev = True
return
k = 2.0 / (self.smoothing_length + 1)
rsi_ma = rsi_z * k + self._prev_rsi_ma * (1.0 - k)
cross_up = self._prev_rsi_z <= self._prev_rsi_ma and rsi_z > rsi_ma
cross_down = self._prev_rsi_z >= self._prev_rsi_ma and rsi_z < rsi_ma
if cross_up and self.Position <= 0:
self.BuyMarket()
elif cross_down and self.Position >= 0:
self.SellMarket()
self._prev_rsi_z = rsi_z
self._prev_rsi_ma = rsi_ma
def CreateClone(self):
return z_score_rsi_strategy()