KPrmSt Cross 策略
概述
KPrmSt Cross 策略移植自 MetaTrader 5 专家 exp_kprmst.mq5。策略使用类似随机指标的 KPrmSt 振荡器,当主线与信号线交叉时捕捉趋势反转。
策略订阅指定时间框架的K线,并计算 Stochastic 指标(作为 KPrmSt 的近似)。当 %K 线从上向下穿越 %D 线时开多仓;当 %K 线从下向上穿越 %D 线时开空仓,并相应地关闭已有仓位。
参数
- Candle Type – 计算所用 K 线的时间框架。
- K Period – 主线计算的周期。
- D Period – 信号线的平滑周期。
- Slowing – 对 %K 线的额外平滑。
- Stop Loss – 以价格单位表示的止损,0 表示禁用。
- Take Profit – 以价格单位表示的止盈,0 表示禁用。
交易逻辑
- 仅处理已经完成的 K 线。
- 保存振荡器的历史值以检测交叉。
- 当 %K 由上向下穿越 %D 时,开多仓或平空仓。
- 当 %K 由下向上穿越 %D 时,开空仓或平多仓。
- 可选的止损和止盈在达到设定的价格时平仓。
说明
- 原始 KPrmSt 指标用 StockSharp 的 Stochastic 指标进行近似。
- 原专家中的资金管理功能未实现。
- 策略需要支持的行情数据与交易通道才能运行。
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// KPrmSt cross strategy based on the stochastic oscillator.
/// Opens long when %K crosses above %D from below.
/// Opens short when %K crosses below %D from above.
/// </summary>
public class KprmStCrossStrategy : Strategy
{
	private readonly StrategyParam<DataType> _candleType;
	private readonly StrategyParam<decimal> _stopLossPct;
	private readonly StrategyParam<decimal> _takeProfitPct;

	private StochasticOscillator _stochastic;

	// %K / %D of the previously processed finished candle; null until the first usable value arrives.
	private decimal? _prevK;
	private decimal? _prevD;

	/// <summary>
	/// Candle data type the strategy subscribes to and feeds into the oscillator.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Stop-loss value handed to position protection as a percentage.
	/// </summary>
	public decimal StopLossPct
	{
		get => _stopLossPct.Value;
		set => _stopLossPct.Value = value;
	}

	/// <summary>
	/// Take-profit value handed to position protection as a percentage.
	/// </summary>
	public decimal TakeProfitPct
	{
		get => _takeProfitPct.Value;
		set => _takeProfitPct.Value = value;
	}

	/// <summary>
	/// Creates the strategy with default parameters: 4-hour candles, 2% stop loss, 3% take profit.
	/// </summary>
	public KprmStCrossStrategy()
	{
		_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
			.SetDisplay("Candle Type", "Time frame for indicator calculation", "General");

		_stopLossPct = Param(nameof(StopLossPct), 2m)
			.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

		_takeProfitPct = Param(nameof(TakeProfitPct), 3m)
			.SetDisplay("Take Profit %", "Take profit percentage", "Risk");
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnReseted()
	{
		base.OnReseted();

		// Drop the indicator and the cross-detection history; both are rebuilt on the next start.
		_stochastic = default;
		_prevK = null;
		_prevD = null;
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		// The oscillator is fed manually in ProcessCandle, so it is registered with the strategy here.
		_stochastic = new StochasticOscillator();
		Indicators.Add(_stochastic);

		var subscription = SubscribeCandles(CandleType);
		subscription.Bind(ProcessCandle).Start();

		StartProtection(
			takeProfit: new Unit(TakeProfitPct, UnitTypes.Percent),
			stopLoss: new Unit(StopLossPct, UnitTypes.Percent),
			useMarketOrders: true);

		var area = CreateChartArea();
		if (area != null)
		{
			DrawCandles(area, subscription);
			DrawOwnTrades(area);
		}
	}

	/// <summary>
	/// Feeds a finished candle into the oscillator and trades on a %K/%D crossover
	/// relative to the previously processed candle.
	/// </summary>
	/// <param name="candle">Candle received from the subscription; unfinished candles are ignored.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
		{
			return;
		}

		var stochResult = _stochastic.Process(candle);
		if (!stochResult.IsFormed)
		{
			return;
		}

		var stochVal = (StochasticOscillatorValue)stochResult;
		if (stochVal.K is not decimal k || stochVal.D is not decimal d)
		{
			return;
		}

		var lastK = _prevK;
		var lastD = _prevD;

		// Record the history BEFORE the trading gate below. Otherwise the stored values go stale
		// while trading is disallowed and the first tradable candle is compared with an
		// arbitrarily old bar instead of its immediate predecessor.
		_prevK = k;
		_prevD = d;

		if (lastK is not decimal prevK || lastD is not decimal prevD)
		{
			return;
		}

		if (!IsFormedAndOnlineAndAllowTrading())
		{
			return;
		}

		// NOTE(review): a touch (K == D) after K was above D counts as a downward cross,
		// while an upward cross requires K strictly above D — confirm the asymmetry is intended.
		var crossedAbove = prevK < prevD && k > d;
		var crossedBelow = prevK > prevD && k <= d;

		if (crossedAbove && Position <= 0)
		{
			// K crosses above D -> buy; an extra order is sent first when a short is open.
			if (Position < 0)
			{
				BuyMarket();
			}

			BuyMarket();
		}
		else if (crossedBelow && Position >= 0)
		{
			// K crosses below D -> sell; an extra order is sent first when a long is open.
			if (Position > 0)
			{
				SellMarket();
			}

			SellMarket();
		}
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import StochasticOscillator, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
class kprm_st_cross_strategy(Strategy):
    """
    KPrmSt cross strategy using Stochastic K/D crossover.
    Buys when K crosses above D, sells when K crosses below D.
    Uses StartProtection for percentage-based SL/TP.
    """

    def __init__(self):
        super(kprm_st_cross_strategy, self).__init__()

        self._stop_loss_pct = self.Param("StopLossPct", 2.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._take_profit_pct = self.Param("TakeProfitPct", 3.0) \
            .SetDisplay("Take Profit %", "Take profit percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Time frame", "General")

        # Oscillator instance; created in OnStarted2.
        self._stochastic = None
        # K / D of the previously processed finished candle; None until the first usable value.
        self._prev_k = None
        self._prev_d = None

    @property
    def candle_type(self):
        """Candle data type the strategy subscribes to."""
        return self._candle_type.Value

    def OnReseted(self):
        """Clear the indicator and the cross-detection history."""
        super(kprm_st_cross_strategy, self).OnReseted()
        # The indicator is recreated in OnStarted2, so it is dropped here together with the history.
        self._stochastic = None
        self._prev_k = None
        self._prev_d = None

    def OnStarted2(self, time):
        """Create the oscillator, enable protection, subscribe to candles and set up the chart."""
        super(kprm_st_cross_strategy, self).OnStarted2(time)

        # The oscillator is fed manually in _process_candle, so it is registered with the strategy here.
        self._stochastic = StochasticOscillator()
        self.Indicators.Add(self._stochastic)

        self.StartProtection(
            Unit(self._take_profit_pct.Value, UnitTypes.Percent),
            Unit(self._stop_loss_pct.Value, UnitTypes.Percent),
            useMarketOrders=True)

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.Bind(self._process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawOwnTrades(area)

    def _process_candle(self, candle):
        """
        Feed a finished candle into the oscillator and trade on a K/D crossover
        relative to the previously processed candle. Unfinished candles are ignored.
        """
        if candle.State != CandleStates.Finished:
            return

        cv = CandleIndicatorValue(self._stochastic, candle)
        stoch_result = self._stochastic.Process(cv)
        if not stoch_result.IsFormed:
            return

        k = stoch_result.K
        d = stoch_result.D
        if k is None or d is None:
            return

        k = float(k)
        d = float(d)

        prev_k = self._prev_k
        prev_d = self._prev_d

        # Record the history BEFORE the trading gate below. Otherwise the stored values go stale
        # while trading is disallowed and the first tradable candle is compared with an
        # arbitrarily old bar instead of its immediate predecessor.
        self._prev_k = k
        self._prev_d = d

        if prev_k is None or prev_d is None:
            return

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        # NOTE(review): a touch (k == d) after K was above D counts as a downward cross,
        # while an upward cross requires k strictly above d — confirm the asymmetry is intended.
        crossed_above = prev_k < prev_d and k > d
        crossed_below = prev_k > prev_d and k <= d

        if crossed_above and self.Position <= 0:
            # K crosses above D -> buy; an extra order is sent first when a short is open.
            if self.Position < 0:
                self.BuyMarket()
            self.BuyMarket()
        elif crossed_below and self.Position >= 0:
            # K crosses below D -> sell; an extra order is sent first when a long is open.
            if self.Position > 0:
                self.SellMarket()
            self.SellMarket()

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return kprm_st_cross_strategy()