Exp TSI CCI 策略
该策略基于商品通道指数 (CCI) 计算真实强度指数 (TSI),并根据与信号线的交叉进行交易。
逻辑
- 使用指定周期计算 CCI。
- 将 CCI 数值传入 TSI 指标,设置短期和长期平滑长度。
- 对 TSI 结果应用 EMA 以生成信号线。
- 当 TSI 上穿信号线时做多。
- 当 TSI 下穿信号线时做空。
参数
- Candle Type – 使用的K线周期。
- CCI Period – CCI 计算周期。
- TSI Short Length – TSI 短期平滑长度。
- TSI Long Length – TSI 长期平滑长度。
- Signal Length – TSI 信号线的 EMA 长度。
指标
- Commodity Channel Index
- True Strength Index
- Exponential Moving Average
免责声明
本策略仅用于教育目的,不构成投资建议。
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy based on True Strength Index crossover filtered by Commodity Channel Index.
/// Opens long when TSI crosses above its signal line and CCI is at or above <see cref="MinCciMagnitude"/>,
/// opens short when TSI crosses below its signal line and CCI is at or below the negated <see cref="MinCciMagnitude"/>.
/// A signal is additionally ignored when the TSI/signal spread is below <see cref="MinTsiSpread"/>
/// or while the cooldown counter set by the previous signal has not reached zero.
/// </summary>
public class ExpTsiCciStrategy : Strategy
{
    private readonly StrategyParam<DataType> _candleType;
    private readonly StrategyParam<int> _cciPeriod;
    private readonly StrategyParam<decimal> _minTsiSpread;
    private readonly StrategyParam<decimal> _minCciMagnitude;
    private readonly StrategyParam<int> _cooldownBars;

    // TSI and signal values from the previously processed finished candle.
    private decimal _prevTsi;
    private decimal _prevSignal;

    // False until the first pair of TSI/signal values has been stored.
    private bool _initialized;

    // Counter decremented once per processed candle; signals are accepted only at zero.
    private int _cooldownRemaining;

    /// <summary>
    /// Candle type for strategy calculation.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Commodity Channel Index period.
    /// </summary>
    public int CciPeriod
    {
        get => _cciPeriod.Value;
        set => _cciPeriod.Value = value;
    }

    /// <summary>
    /// Minimum absolute spread between TSI and signal required for a valid crossover.
    /// </summary>
    public decimal MinTsiSpread
    {
        get => _minTsiSpread.Value;
        set => _minTsiSpread.Value = value;
    }

    /// <summary>
    /// Minimum absolute CCI value required for confirmation.
    /// </summary>
    public decimal MinCciMagnitude
    {
        get => _minCciMagnitude.Value;
        set => _minCciMagnitude.Value = value;
    }

    /// <summary>
    /// Number of completed candles to wait after a position change.
    /// </summary>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Initializes a new instance of <see cref="ExpTsiCciStrategy"/> and registers its parameters
    /// with their default values.
    /// </summary>
    public ExpTsiCciStrategy()
    {
        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");

        _cciPeriod = Param(nameof(CciPeriod), 14)
            .SetGreaterThanZero()
            .SetDisplay("CCI Period", "CCI calculation period", "CCI");

        _minTsiSpread = Param(nameof(MinTsiSpread), 2m)
            .SetDisplay("Min TSI Spread", "Minimum TSI-signal spread", "Signal");

        _minCciMagnitude = Param(nameof(MinCciMagnitude), 50m)
            .SetDisplay("Min CCI", "Minimum absolute CCI confirmation", "Signal");

        _cooldownBars = Param(nameof(CooldownBars), 10)
            .SetDisplay("Cooldown Bars", "Completed candles to wait after a signal", "Signal");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return [(Security, CandleType)];
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        var tsi = new TrueStrengthIndex();
        var cci = new CommodityChannelIndex { Length = CciPeriod };

        // Both indicators are bound to the same candle subscription and delivered together.
        var subscription = SubscribeCandles(CandleType);
        subscription
            .BindEx(tsi, cci, ProcessCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, tsi);
            DrawOwnTrades(area);
        }
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _prevTsi = 0m;
        _prevSignal = 0m;
        _initialized = false;
        _cooldownRemaining = 0;
    }

    /// <summary>
    /// Handles one candle together with the TSI and CCI values bound to it,
    /// detects a TSI/signal crossover and sends a market order when all filters pass.
    /// </summary>
    /// <param name="candle">Candle delivered by the subscription; only finished candles are processed.</param>
    /// <param name="tsiValue">True Strength Index value; cast to <see cref="ITrueStrengthIndexValue"/>.</param>
    /// <param name="cciValue">Commodity Channel Index value.</param>
    private void ProcessCandle(ICandleMessage candle, IIndicatorValue tsiValue, IIndicatorValue cciValue)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (!tsiValue.IsFinal || !cciValue.IsFinal)
            return;

        var tv = (ITrueStrengthIndexValue)tsiValue;

        // Skip until both the TSI line and its signal line carry a value.
        if (tv.Tsi is not decimal tsi || tv.Signal is not decimal signal)
            return;

        var cci = cciValue.ToDecimal();

        // The first usable value pair only seeds the "previous" state; no crossover can be detected yet.
        if (!_initialized)
        {
            _prevTsi = tsi;
            _prevSignal = signal;
            _initialized = true;
            return;
        }

        var crossUp = _prevTsi <= _prevSignal && tsi > signal;
        var crossDown = _prevTsi >= _prevSignal && tsi < signal;
        var spread = Math.Abs(tsi - signal);

        // The counter is decremented before the signal check below.
        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        var filtersPassed = spread >= MinTsiSpread && _cooldownRemaining == 0;

        if (crossUp && filtersPassed && cci >= MinCciMagnitude && Position <= 0)
        {
            // One order covers any open short and opens the long. Two separate default-volume
            // orders would only flip the position when |Position| equals Volume.
            BuyMarket(Volume + Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }
        else if (crossDown && filtersPassed && cci <= -MinCciMagnitude && Position >= 0)
        {
            // One order closes any open long and opens the short (see the long branch).
            SellMarket(Volume + Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }

        _prevTsi = tsi;
        _prevSignal = signal;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import TrueStrengthIndex, CommodityChannelIndex
from StockSharp.Algo.Strategies import Strategy
class exp_tsi_cci_strategy(Strategy):
    """True Strength Index crossover strategy confirmed by the Commodity Channel Index.

    Goes long when TSI crosses above its signal line and CCI is at or above the
    configured minimum magnitude; goes short when TSI crosses below its signal
    line and CCI is at or below the negated minimum magnitude. A signal is also
    ignored when the TSI/signal spread is below the configured minimum or while
    the cooldown counter set by the previous signal has not reached zero.
    """

    def __init__(self):
        super(exp_tsi_cci_strategy, self).__init__()

        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")
        self._cci_period = self.Param("CciPeriod", 14) \
            .SetDisplay("CCI Period", "CCI calculation period", "CCI")
        self._min_tsi_spread = self.Param("MinTsiSpread", 2.0) \
            .SetDisplay("Min TSI Spread", "Minimum TSI-signal spread", "Signal")
        self._min_cci_magnitude = self.Param("MinCciMagnitude", 50.0) \
            .SetDisplay("Min CCI", "Minimum absolute CCI confirmation", "Signal")
        self._cooldown_bars = self.Param("CooldownBars", 10) \
            .SetDisplay("Cooldown Bars", "Completed candles to wait after a signal", "Signal")

        # TSI and signal values from the previously processed finished candle.
        self._prev_tsi = 0.0
        self._prev_signal = 0.0
        # False until the first TSI/signal pair has been stored.
        self._initialized = False
        # Decremented once per processed candle; signals are accepted only at zero.
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """Candle type used for the subscription."""
        return self._candle_type.Value

    @property
    def cci_period(self):
        """Commodity Channel Index period."""
        return self._cci_period.Value

    @property
    def min_tsi_spread(self):
        """Minimum absolute spread between TSI and signal for a valid crossover."""
        return self._min_tsi_spread.Value

    @property
    def min_cci_magnitude(self):
        """Minimum absolute CCI value required for confirmation."""
        return self._min_cci_magnitude.Value

    @property
    def cooldown_bars(self):
        """Value loaded into the cooldown counter after each signal."""
        return self._cooldown_bars.Value

    def OnReseted(self):
        """Reset the stored crossover state and the cooldown counter."""
        super(exp_tsi_cci_strategy, self).OnReseted()
        self._prev_tsi = 0.0
        self._prev_signal = 0.0
        self._initialized = False
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, bind them to the candle subscription and set up the chart."""
        super(exp_tsi_cci_strategy, self).OnStarted2(time)

        tsi = TrueStrengthIndex()
        cci = CommodityChannelIndex()
        cci.Length = self.cci_period

        subscription = self.SubscribeCandles(self.candle_type)
        subscription.BindEx(tsi, cci, self.process_candle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, tsi)
            self.DrawOwnTrades(area)

    def process_candle(self, candle, tsi_value, cci_value):
        """Evaluate one candle with its TSI and CCI values and trade on a confirmed crossover.

        Only finished candles with final indicator values are processed.
        """
        if candle.State != CandleStates.Finished:
            return
        if not tsi_value.IsFinal or not cci_value.IsFinal:
            return

        tsi_val = tsi_value.Tsi
        signal_val = tsi_value.Signal
        # Skip until both the TSI line and its signal line carry a value.
        if tsi_val is None or signal_val is None:
            return

        tsi_val = float(tsi_val)
        signal_val = float(signal_val)
        cci = float(cci_value)

        # The first usable pair only seeds the "previous" state.
        if not self._initialized:
            self._prev_tsi = tsi_val
            self._prev_signal = signal_val
            self._initialized = True
            return

        cross_up = self._prev_tsi <= self._prev_signal and tsi_val > signal_val
        cross_down = self._prev_tsi >= self._prev_signal and tsi_val < signal_val
        spread = abs(tsi_val - signal_val)

        # The counter is decremented before the signal check below.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        min_tsi = float(self.min_tsi_spread)
        min_cci = float(self.min_cci_magnitude)
        filters_passed = spread >= min_tsi and self._cooldown_remaining == 0

        if cross_up and filters_passed and cci >= min_cci and self.Position <= 0:
            # One order covers any open short and opens the long. Two separate
            # default-volume orders would only flip the position when
            # |Position| equals Volume.
            self.BuyMarket(self.Volume + Math.Abs(self.Position))
            self._cooldown_remaining = self.cooldown_bars
        elif cross_down and filters_passed and cci <= -min_cci and self.Position >= 0:
            # One order closes any open long and opens the short.
            self.SellMarket(self.Volume + Math.Abs(self.Position))
            self._cooldown_remaining = self.cooldown_bars

        self._prev_tsi = tsi_val
        self._prev_signal = signal_val

    def CreateClone(self):
        """Return a new instance of this strategy class."""
        return exp_tsi_cci_strategy()