Force DiverSign 策略
Force DiverSign 策略基于两个使用不同平滑周期的 Force Index 指标之间的背离信号进行交易。 该策略寻找由三根K线组成的反转形态,并结合快慢 Force 值的相反摆动。 出现看涨背离时买入,出现看跌背离时卖出。
参数
Period1– 快速 Force Index 的周期。Period2– 慢速 Force Index 的周期。MaType1– 用于平滑快速 Force Index 的均线类型。MaType2– 用于平滑慢速 Force Index 的均线类型。CandleType– 计算所用的K线时间框架。
交易逻辑
- 计算原始 Force Index:成交量乘以收盘价变化。
- 使用两条均线对原始值进行平滑,得到快慢两个 Force 序列。
- 保存最近五个 Force 值和最近四根K线。
- 买入 条件:
- 前三根K线形成下–上–下的形态;
- 两个 Force 序列形成局部底并随后上升;
- 第一次和第三次K线之间快慢 Force 方向相反。
- 卖出 条件:
- 前三根K线形成上–下–上的形态;
- 两个 Force 序列形成局部顶并随后下降;
- 第一次和第三次K线之间快慢 Force 方向相反。
每个信号都会反转持仓:买入平掉空头,卖出平掉多头。
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Force DiverSign strategy.
/// Detects divergence between fast and slow Force Index values
/// combined with a specific candle pattern.
/// </summary>
public class ForceDiverSignStrategy : Strategy
{
private readonly StrategyParam<int> _period1;
private readonly StrategyParam<int> _period2;
private readonly StrategyParam<DataType> _candleType;
private readonly decimal[] _opens = new decimal[5];
private readonly decimal[] _closes = new decimal[5];
private readonly decimal[] _f1 = new decimal[5];
private readonly decimal[] _f2 = new decimal[5];
private decimal _prevClose;
private int _count;
public int Period1 { get => _period1.Value; set => _period1.Value = value; }
public int Period2 { get => _period2.Value; set => _period2.Value = value; }
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public ForceDiverSignStrategy()
{
_period1 = Param(nameof(Period1), 3)
.SetGreaterThanZero()
.SetDisplay("Fast Period", "Period for fast Force index", "Indicators");
_period2 = Param(nameof(Period2), 7)
.SetGreaterThanZero()
.SetDisplay("Slow Period", "Period for slow Force index", "Indicators");
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_prevClose = default;
_count = default;
Array.Clear(_opens);
Array.Clear(_closes);
Array.Clear(_f1);
Array.Clear(_f2);
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
var ma1 = new ExponentialMovingAverage { Length = Period1 };
var ma2 = new ExponentialMovingAverage { Length = Period2 };
Indicators.Add(ma1);
Indicators.Add(ma2);
var subscription = SubscribeCandles(CandleType);
subscription.Bind(candle =>
{
if (candle.State != CandleStates.Finished)
return;
if (_count == 0)
{
_prevClose = candle.ClosePrice;
Shift(_opens, candle.OpenPrice);
Shift(_closes, candle.ClosePrice);
_count++;
return;
}
var force = (candle.ClosePrice - _prevClose) * candle.TotalVolume;
_prevClose = candle.ClosePrice;
var f1v = ma1.Process(force, candle.OpenTime, true);
var f2v = ma2.Process(force, candle.OpenTime, true);
Shift(_opens, candle.OpenPrice);
Shift(_closes, candle.ClosePrice);
if (f1v.IsEmpty || f2v.IsEmpty)
{
_count++;
return;
}
var f1 = f1v.ToDecimal();
var f2 = f2v.ToDecimal();
Shift(_f1, f1);
Shift(_f2, f2);
if (_count < 5)
{
_count++;
return;
}
if (!IsFormedAndOnlineAndAllowTrading())
return;
var sellSignal = _opens[3] < _closes[3] && _opens[2] > _closes[2] && _opens[1] < _closes[1]
&& _f1[4] < _f1[3] && _f1[3] > _f1[2] && _f1[2] < _f1[1]
&& _f2[4] < _f2[3] && _f2[3] > _f2[2] && _f2[2] < _f2[1]
&& ((_f1[3] > _f1[1] && _f2[3] < _f2[1]) || (_f1[3] < _f1[1] && _f2[3] > _f2[1]));
var buySignal = _opens[3] > _closes[3] && _opens[2] < _closes[2] && _opens[1] > _closes[1]
&& _f1[4] > _f1[3] && _f1[3] < _f1[2] && _f1[2] > _f1[1]
&& _f2[4] > _f2[3] && _f2[3] < _f2[2] && _f2[2] > _f2[1]
&& ((_f1[3] > _f1[1] && _f2[3] < _f2[1]) || (_f1[3] < _f1[1] && _f2[3] > _f2[1]));
if (buySignal && Position <= 0)
{
if (Position < 0)
BuyMarket();
BuyMarket();
}
else if (sellSignal && Position >= 0)
{
if (Position > 0)
SellMarket();
SellMarket();
}
}).Start();
}
private static void Shift(decimal[] array, decimal value)
{
for (var i = array.Length - 1; i > 0; i--)
array[i] = array[i - 1];
array[0] = value;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import ExponentialMovingAverage
from StockSharp.Algo.Strategies import Strategy
from indicator_extensions import *
class force_diver_sign_strategy(Strategy):
"""
Force DiverSign: detects divergence between fast and slow Force Index.
Uses candle pattern (alternating bull/bear) combined with force divergence.
"""
def __init__(self):
super(force_diver_sign_strategy, self).__init__()
self._period1 = self.Param("Period1", 3) \
.SetDisplay("Fast Period", "Period for fast Force index", "Indicators")
self._period2 = self.Param("Period2", 7) \
.SetDisplay("Slow Period", "Period for slow Force index", "Indicators")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5))) \
.SetDisplay("Candle Type", "Type of candles", "General")
self._opens = [0.0] * 5
self._closes = [0.0] * 5
self._f1 = [0.0] * 5
self._f2 = [0.0] * 5
self._prev_close = 0.0
self._count = 0
@property
def candle_type(self):
return self._candle_type.Value
def OnReseted(self):
super(force_diver_sign_strategy, self).OnReseted()
self._opens = [0.0] * 5
self._closes = [0.0] * 5
self._f1 = [0.0] * 5
self._f2 = [0.0] * 5
self._prev_close = 0.0
self._count = 0
def OnStarted2(self, time):
super(force_diver_sign_strategy, self).OnStarted2(time)
self._ma1 = ExponentialMovingAverage()
self._ma1.Length = self._period1.Value
self._ma2 = ExponentialMovingAverage()
self._ma2.Length = self._period2.Value
self.Indicators.Add(self._ma1)
self.Indicators.Add(self._ma2)
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(self._process_candle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawOwnTrades(area)
def _shift(self, arr, value):
for i in range(len(arr) - 1, 0, -1):
arr[i] = arr[i - 1]
arr[0] = value
def _process_candle(self, candle):
if candle.State != CandleStates.Finished:
return
if self._count == 0:
self._prev_close = float(candle.ClosePrice)
self._shift(self._opens, float(candle.OpenPrice))
self._shift(self._closes, float(candle.ClosePrice))
self._count += 1
return
close = float(candle.ClosePrice)
volume = float(candle.TotalVolume)
force = (close - self._prev_close) * volume
self._prev_close = close
f1v = process_float(self._ma1, force, candle.OpenTime, True)
f2v = process_float(self._ma2, force, candle.OpenTime, True)
self._shift(self._opens, float(candle.OpenPrice))
self._shift(self._closes, close)
if f1v.IsEmpty or f2v.IsEmpty:
self._count += 1
return
f1 = float(f1v)
f2 = float(f2v)
self._shift(self._f1, f1)
self._shift(self._f2, f2)
if self._count < 5:
self._count += 1
return
sell_signal = (self._opens[3] < self._closes[3] and
self._opens[2] > self._closes[2] and
self._opens[1] < self._closes[1] and
self._f1[4] < self._f1[3] and self._f1[3] > self._f1[2] and self._f1[2] < self._f1[1] and
self._f2[4] < self._f2[3] and self._f2[3] > self._f2[2] and self._f2[2] < self._f2[1] and
((self._f1[3] > self._f1[1] and self._f2[3] < self._f2[1]) or
(self._f1[3] < self._f1[1] and self._f2[3] > self._f2[1])))
buy_signal = (self._opens[3] > self._closes[3] and
self._opens[2] < self._closes[2] and
self._opens[1] > self._closes[1] and
self._f1[4] > self._f1[3] and self._f1[3] < self._f1[2] and self._f1[2] > self._f1[1] and
self._f2[4] > self._f2[3] and self._f2[3] < self._f2[2] and self._f2[2] > self._f2[1] and
((self._f1[3] > self._f1[1] and self._f2[3] < self._f2[1]) or
(self._f1[3] < self._f1[1] and self._f2[3] > self._f2[1])))
if buy_signal and self.Position <= 0:
if self.Position < 0:
self.BuyMarket()
self.BuyMarket()
elif sell_signal and self.Position >= 0:
if self.Position > 0:
self.SellMarket()
self.SellMarket()
def CreateClone(self):
return force_diver_sign_strategy()