FORTS 高频点差策略
概述
该策略模拟 FORTS 市场上的高频点差交易。它持续监控订单簿,在买卖两侧挂出限价单以获取买卖价差。
策略逻辑
- 订阅实时订单簿更新。
- 当没有持仓且当前价差达到 `SpreadMultiplier` 参数指定的最小刻度数量时:
  - 在最佳买价上方一个最小价位挂出买入限价单。
  - 在最佳卖价下方一个最小价位挂出卖出限价单。
- 如果已有持仓且没有挂单,则在相反方向挂出一个限价单,用于平仓并反向开仓。
- 当最佳买卖价发生变化时,策略会取消旧订单并在新价位重新挂单,以保证始终位于盘口第一档。
参数
- `SpreadMultiplier` – 触发双向挂单所需的价差(以最小价位计),默认 4。
- `Volume` – 下单数量,默认为 1 手。
使用说明
- 适用于最小变动价位很小的合约,如 FORTS 期货。
- 策略仅使用限价单;除非保护机制触发,否则不会发送市价单。
- 需要在流动性充足且延迟较低的环境中运行。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy for FORTS that trades when the high-low range of a finished candle
/// is at least <see cref="SpreadMultiplier"/> price steps wide.
/// </summary>
public class HftSpreaderForFortsStrategy : Strategy
{
	private readonly StrategyParam<int> _spreadMultiplier;
	private readonly StrategyParam<DataType> _candleType;

	/// <summary>
	/// Minimum candle range (high minus low), expressed in price steps,
	/// required before the strategy sends an order.
	/// </summary>
	public int SpreadMultiplier
	{
		get => _spreadMultiplier.Value;
		set => _spreadMultiplier.Value = value;
	}

	/// <summary>
	/// Candle type used as the price feed.
	/// </summary>
	public DataType CandleType
	{
		get => _candleType.Value;
		set => _candleType.Value = value;
	}

	/// <summary>
	/// Initialize <see cref="HftSpreaderForFortsStrategy"/>.
	/// Registers the strategy parameters (spread multiplier 4, one-minute candles)
	/// and sets the order volume to 1.
	/// </summary>
	public HftSpreaderForFortsStrategy()
	{
		_spreadMultiplier = Param(nameof(SpreadMultiplier), 4)
			.SetGreaterThanZero()
			.SetDisplay("Spread Multiplier", "Spread ticks required to place orders", "General")
			.SetOptimize(1, 10, 1);

		_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(1).TimeFrame())
			.SetDisplay("Candle Type", "Candle type for price feed", "General");

		Volume = 1;
	}

	/// <inheritdoc />
	public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
	{
		return [(Security, CandleType)];
	}

	/// <inheritdoc />
	protected override void OnStarted2(DateTime time)
	{
		base.OnStarted2(time);

		// Protection is started with both arguments left null.
		StartProtection(null, null);

		var subscription = SubscribeCandles(CandleType);
		subscription.Bind(ProcessCandle).Start();
	}

	/// <summary>
	/// Handles a candle from the subscription: on a finished candle whose range
	/// reaches the configured threshold, sells when long and buys otherwise.
	/// </summary>
	/// <param name="candle">Candle received from the subscription.</param>
	private void ProcessCandle(ICandleMessage candle)
	{
		if (candle.State != CandleStates.Finished)
			return;

		// Fall back to a step of 1 when the security or its price step is missing
		// or non-positive; otherwise the threshold would be <= 0 and every
		// finished candle would trigger a trade.
		var step = Security?.PriceStep ?? 1m;
		if (step <= 0m)
			step = 1m;

		var spread = candle.HighPrice - candle.LowPrice;
		if (spread < SpreadMultiplier * step)
			return;

		if (Position > 0)
		{
			// Long position: sell.
			SellMarket();
		}
		else
		{
			// Flat or short position: buy.
			BuyMarket();
		}
	}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Strategies import Strategy
class hft_spreader_for_forts_strategy(Strategy):
    """
    Candle-range strategy for FORTS.

    For every finished candle the high-low range is compared with
    SpreadMultiplier price steps. When the range is wide enough, the strategy
    calls SellMarket() if the position is positive and BuyMarket() otherwise.
    """

    def __init__(self):
        super(hft_spreader_for_forts_strategy, self).__init__()

        # Threshold, in price steps, that the candle range has to reach.
        multiplier_param = self.Param("SpreadMultiplier", 4)
        self._spread_multiplier = multiplier_param.SetDisplay(
            "Spread Multiplier", "Spread ticks required to place orders", "General")

        # Candle series used as the price feed (one-minute time frame by default).
        one_minute = DataType.TimeFrame(TimeSpan.FromMinutes(1))
        candle_param = self.Param("CandleType", one_minute)
        self._candle_type = candle_param.SetDisplay(
            "Candle Type", "Candle type for price feed", "General")

    @property
    def candle_type(self):
        """Current value of the CandleType parameter."""
        return self._candle_type.Value

    def OnStarted2(self, time):
        """Subscribe to candles and route them to _process_candle."""
        super(hft_spreader_for_forts_strategy, self).OnStarted2(time)

        candles = self.SubscribeCandles(self.candle_type)
        candles.Bind(self._process_candle).Start()

    def _process_candle(self, candle):
        """Trade on a finished candle whose range reaches the threshold."""
        if candle.State != CandleStates.Finished:
            return

        # Use the security's price step when it is available and positive;
        # otherwise fall back to 1.0.
        tick = 1.0
        security = self.Security
        if security is not None and security.PriceStep is not None:
            tick = float(security.PriceStep)
        if tick <= 0:
            tick = 1.0

        candle_range = float(candle.HighPrice) - float(candle.LowPrice)
        threshold = self._spread_multiplier.Value * tick
        if not (candle_range >= threshold):
            return

        if self.Position > 0:
            # Long position: sell.
            self.SellMarket()
        else:
            # Flat or short position: buy.
            self.BuyMarket()

    def CreateClone(self):
        """Return a new instance of this strategy class."""
        return hft_spreader_for_forts_strategy()