Skewness Commodity
The Skewness Commodity strategy ranks commodity futures by the skewness coefficient of their returns. Contracts with positive skew are bought and contracts with strongly negative skew are sold, on the expectation that extreme moves revert.
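As a rough illustration of the ranking idea, the sketch below computes a moment-based (population) skewness per contract and splits the universe into long and short candidates. The contract names, the sample returns, and the `short_threshold` cutoff are hypothetical: the description does not say how "strongly negative" is defined, so the value here is only a placeholder.

```python
import math


def skewness(returns):
    """Population skewness: third central moment over the cubed standard deviation."""
    n = len(returns)
    if n < 3:
        return 0.0
    mean = sum(returns) / n
    variance = sum((r - mean) ** 2 for r in returns) / n
    if variance <= 0:
        return 0.0
    third_moment = sum((r - mean) ** 3 for r in returns) / n
    return third_moment / math.sqrt(variance) ** 3


def rank_by_skewness(returns_by_contract, short_threshold=-0.5):
    """Split contracts into long and short lists by return skewness.

    short_threshold is a hypothetical cutoff for "strongly negative" skew.
    """
    skews = {name: skewness(rets) for name, rets in returns_by_contract.items()}
    ranked = sorted(skews, key=skews.get, reverse=True)
    longs = [name for name in ranked if skews[name] > 0]
    shorts = [name for name in ranked if skews[name] < short_threshold]
    return longs, shorts, skews


# Hypothetical sample returns for three contracts.
sample = {
    "GOLD": [-0.01, -0.01, -0.01, -0.01, 0.04],   # one large up move
    "WHEAT": [0.01, 0.01, 0.01, 0.01, -0.04],     # one large down move
    "COPPER": [-0.01, 0.0, 0.01, 0.0, 0.0],       # symmetric
}
longs, shorts, skews = rank_by_skewness(sample)
print("long:", longs, "short:", shorts)
```

Contracts whose skew falls between the cutoff and zero are left out of both lists, which is one possible reading of the asymmetric wording ("positive" versus "strongly negative").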
Details
- Entry: ranking by historical return skewness.
- Long/short: both sides.
- Exit: periodic rebalancing.
- Stops: none.
- Default values:
  - CandleType = TimeSpan.FromMinutes(5).TimeFrame()
- Filters:
  - Category: Statistical
  - Direction: Both
  - Indicators: Price-based
  - Stops: No
  - Complexity: Medium
  - Timeframe: Medium-term
  - Seasonality: No
  - Neural networks: No
  - Divergence: No
  - Risk level: Medium
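The accompanying source code does not rank a basket of contracts. It trades one primary contract against a benchmark, using the z-score of the skewness spread (benchmark skewness minus primary skewness). The sketch below restates that signal logic without any StockSharp dependency. The class and method names are hypothetical, the cooldown and stop handling are left out for brevity, and the use of a population standard deviation is an assumption about how the spread is normalized.

```python
import statistics
from collections import deque


class SpreadZScoreSignal:
    """Turns a stream of skewness spreads into entry and exit signals."""

    def __init__(self, period=16, entry=1.1, exit_level=0.25):
        self.window = deque(maxlen=period)  # rolling spread history
        self.entry = entry
        self.exit_level = exit_level
        self.previous_z = None

    def update(self, spread, position):
        """Return "buy", "sell", "close" or None for the latest spread value."""
        self.window.append(spread)
        if len(self.window) < self.window.maxlen:
            return None  # normalization window not filled yet
        mean = statistics.fmean(self.window)
        deviation = statistics.pstdev(self.window)  # assumption: population form
        if deviation <= 0:
            return None
        z = (spread - mean) / deviation
        previous, self.previous_z = self.previous_z, z
        if position == 0:
            # Enter only when the z-score crosses the threshold on this bar.
            if previous is not None and previous < self.entry <= z:
                return "buy"
            if previous is not None and previous > -self.entry >= z:
                return "sell"
        elif position > 0 and z <= self.exit_level:
            return "close"
        elif position < 0 and z >= -self.exit_level:
            return "close"
        return None


signal = SpreadZScoreSignal(period=4)
for value in [0.0, 0.1, 0.0, 0.1, 0.3]:
    action = signal.update(value, position=0)
print(action)
```

Requiring a threshold crossing, rather than just a z-score beyond the threshold, keeps the sketch from re-entering on every bar while the spread stays extreme.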
using System;
using System.Collections.Generic;
using System.Linq;

using Ecng.Common;

using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;

namespace StockSharp.Samples.Strategies;

/// <summary>
/// Skewness-based commodity strategy that trades the primary commodity when its return skewness diverges from a benchmark commodity.
/// </summary>
public class SkewnessCommodityStrategy : Strategy
{
    private readonly StrategyParam<string> _security2Id;
    private readonly StrategyParam<int> _windowLength;
    private readonly StrategyParam<int> _normalizationPeriod;
    private readonly StrategyParam<decimal> _entryThreshold;
    private readonly StrategyParam<decimal> _exitThreshold;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<decimal> _stopLoss;
    private readonly StrategyParam<DataType> _candleType;

    private Security _benchmark = null!;
    private SimpleMovingAverage _spreadAverage = null!;
    private StandardDeviation _spreadDeviation = null!;

    // Rolling windows of simple returns, one per instrument.
    private readonly Queue<decimal> _primaryReturns = [];
    private readonly Queue<decimal> _benchmarkReturns = [];

    private decimal? _previousPrimaryClose;
    private decimal? _previousBenchmarkClose;
    private decimal? _previousZScore;
    private decimal _latestPrimarySkewness;
    private decimal _latestBenchmarkSkewness;
    private bool _primaryUpdated;
    private bool _benchmarkUpdated;
    private int _cooldownRemaining;

    public string Security2Id
    {
        get => _security2Id.Value;
        set => _security2Id.Value = value;
    }

    public int WindowLength
    {
        get => _windowLength.Value;
        set => _windowLength.Value = value;
    }

    public int NormalizationPeriod
    {
        get => _normalizationPeriod.Value;
        set => _normalizationPeriod.Value = value;
    }

    public decimal EntryThreshold
    {
        get => _entryThreshold.Value;
        set => _entryThreshold.Value = value;
    }

    public decimal ExitThreshold
    {
        get => _exitThreshold.Value;
        set => _exitThreshold.Value = value;
    }

    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    public decimal StopLoss
    {
        get => _stopLoss.Value;
        set => _stopLoss.Value = value;
    }

    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    public SkewnessCommodityStrategy()
    {
        _security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark commodity", "General");

        _windowLength = Param(nameof(WindowLength), 20)
            .SetRange(5, 120)
            .SetDisplay("Window Length", "Lookback period used to estimate return skewness", "Indicators");

        _normalizationPeriod = Param(nameof(NormalizationPeriod), 16)
            .SetRange(5, 120)
            .SetDisplay("Normalization Period", "Lookback period used to normalize the skewness spread", "Indicators");

        _entryThreshold = Param(nameof(EntryThreshold), 1.1m)
            .SetRange(0.2m, 5m)
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");

        _exitThreshold = Param(nameof(ExitThreshold), 0.25m)
            .SetRange(0m, 2m)
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");

        _cooldownBars = Param(nameof(CooldownBars), 8)
            .SetRange(0, 120)
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");

        _stopLoss = Param(nameof(StopLoss), 3m)
            .SetRange(0.5m, 10m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Time frame for candles", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        if (Security != null)
            yield return (Security, CandleType);

        if (!Security2Id.IsEmpty())
            yield return (new Security { Id = Security2Id }, CandleType);
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _benchmark = null!;
        _spreadAverage = null!;
        _spreadDeviation = null!;
        _primaryReturns.Clear();
        _benchmarkReturns.Clear();
        _previousPrimaryClose = null;
        _previousBenchmarkClose = null;
        _previousZScore = null;
        _latestPrimarySkewness = 0m;
        _latestBenchmarkSkewness = 0m;
        _primaryUpdated = false;
        _benchmarkUpdated = false;
        _cooldownRemaining = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
            throw new InvalidOperationException("Primary security is not specified.");

        if (Security2Id.IsEmpty())
            throw new InvalidOperationException("Benchmark security identifier is not specified.");

        _benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };

        _spreadAverage = new SimpleMovingAverage { Length = NormalizationPeriod };
        _spreadDeviation = new StandardDeviation { Length = NormalizationPeriod };

        var primarySubscription = SubscribeCandles(CandleType, security: Security);
        var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);

        primarySubscription.Bind(ProcessPrimaryCandle).Start();
        benchmarkSubscription.Bind(ProcessBenchmarkCandle).Start();

        // Fixed 2% take profit and a configurable stop loss, both in percent.
        StartProtection(new Unit(2, UnitTypes.Percent), new Unit(StopLoss, UnitTypes.Percent));
    }

    private void ProcessPrimaryCandle(ICandleMessage candle)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (UpdateReturns(_primaryReturns, candle.ClosePrice, ref _previousPrimaryClose) is null)
            return;

        _latestPrimarySkewness = CalculateSkewness(_primaryReturns);
        _primaryUpdated = true;

        TryProcessSpread(candle.OpenTime);
    }

    private void ProcessBenchmarkCandle(ICandleMessage candle)
    {
        if (candle.State != CandleStates.Finished)
            return;

        if (UpdateReturns(_benchmarkReturns, candle.ClosePrice, ref _previousBenchmarkClose) is null)
            return;

        _latestBenchmarkSkewness = CalculateSkewness(_benchmarkReturns);
        _benchmarkUpdated = true;

        TryProcessSpread(candle.OpenTime);
    }

    // Appends the latest simple return to the rolling window.
    // Returns null on the first candle, when no previous close is available.
    private decimal? UpdateReturns(Queue<decimal> queue, decimal closePrice, ref decimal? previousClose)
    {
        if (previousClose is not decimal previous || previous <= 0m)
        {
            previousClose = closePrice;
            return null;
        }

        var ret = (closePrice - previous) / previous;
        previousClose = closePrice;

        if (queue.Count == WindowLength)
            queue.Dequeue();

        queue.Enqueue(ret);
        return ret;
    }

    // Population skewness: third central moment divided by the cubed standard deviation.
    private static decimal CalculateSkewness(IEnumerable<decimal> returns)
    {
        var values = returns.ToArray();

        if (values.Length < 3)
            return 0m;

        var mean = values.Average();
        var variance = values.Select(value => (value - mean) * (value - mean)).Average();

        if (variance <= 0m)
            return 0m;

        var deviation = (decimal)Math.Sqrt((double)variance);
        var thirdMoment = values.Select(value => (value - mean) * (value - mean) * (value - mean)).Average();

        return thirdMoment / (deviation * deviation * deviation);
    }

    private void TryProcessSpread(DateTime time)
    {
        // Wait until both instruments have delivered a new candle and both windows are full.
        if (!_primaryUpdated || !_benchmarkUpdated || _primaryReturns.Count < WindowLength || _benchmarkReturns.Count < WindowLength)
            return;

        _primaryUpdated = false;
        _benchmarkUpdated = false;

        // A positive spread means the benchmark is more positively skewed than the primary.
        var spread = _latestBenchmarkSkewness - _latestPrimarySkewness;

        var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
        var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();

        if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
            return;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        var zScore = (spread - mean) / deviation;

        // Entries fire only on the bar where the z-score crosses the threshold.
        var bullishEntry = _previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
        var bearishEntry = _previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;

        if (_cooldownRemaining == 0 && Position == 0)
        {
            if (bullishEntry)
            {
                BuyMarket();
                _cooldownRemaining = CooldownBars;
            }
            else if (bearishEntry)
            {
                SellMarket();
                _cooldownRemaining = CooldownBars;
            }
        }
        else if (Position > 0 && zScore <= ExitThreshold)
        {
            SellMarket(Position);
            _cooldownRemaining = CooldownBars;
        }
        else if (Position < 0 && zScore >= -ExitThreshold)
        {
            BuyMarket(Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }

        _previousZScore = zScore;
    }
}
import clr
import math
from collections import deque

clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")

from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *


class skewness_commodity_strategy(Strategy):
    """Skewness-based commodity strategy that trades the primary commodity when its return skewness diverges from a benchmark commodity."""

    def __init__(self):
        super(skewness_commodity_strategy, self).__init__()

        self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark commodity", "General")
        self._window_length = self.Param("WindowLength", 20) \
            .SetRange(5, 120) \
            .SetDisplay("Window Length", "Lookback period used to estimate return skewness", "Indicators")
        self._normalization_period = self.Param("NormalizationPeriod", 16) \
            .SetRange(5, 120) \
            .SetDisplay("Normalization Period", "Lookback period used to normalize the skewness spread", "Indicators")
        self._entry_threshold = self.Param("EntryThreshold", 1.1) \
            .SetRange(0.2, 5.0) \
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
        self._exit_threshold = self.Param("ExitThreshold", 0.25) \
            .SetRange(0.0, 2.0) \
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
        self._cooldown_bars = self.Param("CooldownBars", 8) \
            .SetRange(0, 120) \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
        self._stop_loss = self.Param("StopLoss", 3.0) \
            .SetRange(0.5, 10.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Time frame for candles", "General")

        self._benchmark = None
        self._spread_average = None
        self._spread_deviation = None
        self._primary_returns = deque()
        self._benchmark_returns = deque()
        self._previous_primary_close = None
        self._previous_benchmark_close = None
        self._previous_z_score = None
        self._latest_primary_skewness = 0.0
        self._latest_benchmark_skewness = 0.0
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        return self._candle_type.Value

    def OnReseted(self):
        super(skewness_commodity_strategy, self).OnReseted()
        self._benchmark = None
        self._spread_average = None
        self._spread_deviation = None
        self._primary_returns = deque()
        self._benchmark_returns = deque()
        self._previous_primary_close = None
        self._previous_benchmark_close = None
        self._previous_z_score = None
        self._latest_primary_skewness = 0.0
        self._latest_benchmark_skewness = 0.0
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        super(skewness_commodity_strategy, self).OnStarted2(time)

        sec2_id = str(self._security2_id.Value)
        if not sec2_id:
            raise Exception("Benchmark security identifier is not specified.")

        s = Security()
        s.Id = sec2_id
        self._benchmark = s

        norm_period = int(self._normalization_period.Value)
        self._spread_average = SimpleMovingAverage()
        self._spread_average.Length = norm_period
        self._spread_deviation = StandardDeviation()
        self._spread_deviation.Length = norm_period

        primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
        benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)

        primary_subscription.Bind(self._process_primary_candle).Start()
        benchmark_subscription.Bind(self._process_benchmark_candle).Start()

        self.StartProtection(
            Unit(2, UnitTypes.Percent),
            Unit(float(self._stop_loss.Value), UnitTypes.Percent)
        )

    def _process_primary_candle(self, candle):
        if candle.State != CandleStates.Finished:
            return

        ret = self._update_returns(self._primary_returns, float(candle.ClosePrice), True)
        if ret is None:
            return

        self._latest_primary_skewness = self._calculate_skewness(self._primary_returns)
        self._primary_updated = True
        self._try_process_spread(candle.OpenTime)

    def _process_benchmark_candle(self, candle):
        if candle.State != CandleStates.Finished:
            return

        ret = self._update_returns(self._benchmark_returns, float(candle.ClosePrice), False)
        if ret is None:
            return

        self._latest_benchmark_skewness = self._calculate_skewness(self._benchmark_returns)
        self._benchmark_updated = True
        self._try_process_spread(candle.OpenTime)

    def _update_returns(self, queue, close_price, is_primary):
        if is_primary:
            previous = self._previous_primary_close
        else:
            previous = self._previous_benchmark_close

        if previous is None or previous <= 0:
            if is_primary:
                self._previous_primary_close = close_price
            else:
                self._previous_benchmark_close = close_price
            return None

        ret = (close_price - previous) / previous

        if is_primary:
            self._previous_primary_close = close_price
        else:
            self._previous_benchmark_close = close_price

        window_len = int(self._window_length.Value)
        if len(queue) == window_len:
            queue.popleft()
        queue.append(ret)
        return ret

    def _calculate_skewness(self, returns):
        values = list(returns)
        if len(values) < 3:
            return 0.0

        mean = sum(values) / len(values)
        variance = sum((v - mean) ** 2 for v in values) / len(values)
        if variance <= 0:
            return 0.0

        deviation = math.sqrt(variance)
        third_moment = sum((v - mean) ** 3 for v in values) / len(values)
        return third_moment / (deviation ** 3)

    def _try_process_spread(self, time):
        window_len = int(self._window_length.Value)
        if not self._primary_updated or not self._benchmark_updated or len(self._primary_returns) < window_len or len(self._benchmark_returns) < window_len:
            return

        self._primary_updated = False
        self._benchmark_updated = False

        spread = self._latest_benchmark_skewness - self._latest_primary_skewness
        mean = float(process_float(self._spread_average, spread, time, True))
        deviation = float(process_float(self._spread_deviation, spread, time, True))

        if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
            return

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        z_score = (spread - mean) / deviation
        entry_thresh = float(self._entry_threshold.Value)
        exit_thresh = float(self._exit_threshold.Value)
        cooldown = int(self._cooldown_bars.Value)

        bullish_entry = self._previous_z_score is not None and self._previous_z_score < entry_thresh and z_score >= entry_thresh
        bearish_entry = self._previous_z_score is not None and self._previous_z_score > -entry_thresh and z_score <= -entry_thresh

        if self._cooldown_remaining == 0 and self.Position == 0:
            if bullish_entry:
                self.BuyMarket()
                self._cooldown_remaining = cooldown
            elif bearish_entry:
                self.SellMarket()
                self._cooldown_remaining = cooldown
        elif self.Position > 0 and z_score <= exit_thresh:
            self.SellMarket(self.Position)
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and z_score >= -exit_thresh:
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown

        self._previous_z_score = z_score

    def CreateClone(self):
        return skewness_commodity_strategy()