Стратегия разворота по F-Score
Стратегия сочетает фундаментальный показатель Piotroski F-Score и краткосрочный ценовой разворот. Каждый месяц она покупает наиболее упавшую акцию среди компаний с высоким F-Score и при необходимости шортит лидера роста с низким F-Score. Предполагается, что финансово устойчивые фирмы быстро восстанавливаются после временного снижения, а слабые компании склонны откатываться после резкого роста.
В первый торговый день месяца алгоритм ранжирует вселенную по месячной доходности. Он открывает лонг в бумаге с минимальной доходностью при `FScore >= FHi` и, при наличии подходящей бумаги, шортит бумагу с максимальной доходностью при `FScore <= FLo`. Позиции удерживаются один месяц.
Детали
- Условия входа:
- Лонг: среди бумаг с `FScore >= FHi` купить инструмент с наименьшей доходностью за `Lookback`, если объём сделки >= `MinTradeUsd`.
- Шорт (опционально): среди бумаг с `FScore <= FLo` продать инструмент с наибольшей доходностью.
- Длинные/короткие: Лонг и шорт.
- Условия выхода: Закрыть все позиции при следующей ежемесячной ребалансировке.
- Стопы: Нет.
- Параметры по умолчанию:
- `Universe` – список оцениваемых бумаг.
- `Lookback` = 21 день.
- `FHi` = 7.
- `FLo` = 3.
- `CandleType` = 1 день.
- `MinTradeUsd` – минимальный объём сделки.
- Фильтры:
- Категория: Реверсия.
- Направление: Лонг и шорт.
- Таймфрейм: Краткосрочный.
- Ребалансировка: Ежемесячно.
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// F-Score reversal strategy that trades the primary instrument when a synthetic fundamental score aligns with relative short-term reversal versus a benchmark.
/// </summary>
/// <remarks>
/// For every finished candle of each instrument the strategy smooths a candle-based score proxy with an EMA
/// and subtracts the rate of change over <see cref="Lookback"/> bars. The difference between the primary and
/// benchmark signals is normalized into a z-score. A cross above <see cref="EntryThreshold"/> buys the primary
/// instrument, a cross below its negative sells it; a long is closed once the z-score is at or below
/// <see cref="ExitThreshold"/> and a short once it is at or above the negative of that value.
/// </remarks>
public class FScoreReversalStrategy : Strategy
{
    // Number of paired signal updates used for the spread mean and standard deviation.
    private const int SpreadWindowLength = 24;

    // First argument of StartProtection (take-profit distance), in percent.
    private const decimal TakeProfitPercent = 2m;

    private readonly StrategyParam<string> _security2Id;
    private readonly StrategyParam<int> _lookback;
    private readonly StrategyParam<int> _scoreLength;
    private readonly StrategyParam<decimal> _entryThreshold;
    private readonly StrategyParam<decimal> _exitThreshold;
    private readonly StrategyParam<int> _cooldownBars;
    private readonly StrategyParam<decimal> _stopLoss;
    private readonly StrategyParam<DataType> _candleType;

    private Security _benchmark = null!;
    private RateOfChange _primaryReversal = null!;
    private RateOfChange _benchmarkReversal = null!;
    private ExponentialMovingAverage _primaryScore = null!;
    private ExponentialMovingAverage _benchmarkScore = null!;
    private SimpleMovingAverage _spreadAverage = null!;
    private StandardDeviation _spreadDeviation = null!;

    private decimal _latestPrimarySignal;
    private decimal _latestBenchmarkSignal;
    private decimal? _previousZScore;
    private bool _primaryUpdated;
    private bool _benchmarkUpdated;
    private int _cooldownRemaining;

    /// <summary>
    /// Benchmark security identifier.
    /// </summary>
    public string Security2Id
    {
        get => _security2Id.Value;
        set => _security2Id.Value = value;
    }

    /// <summary>
    /// Lookback period for short-term reversal.
    /// </summary>
    public int Lookback
    {
        get => _lookback.Value;
        set => _lookback.Value = value;
    }

    /// <summary>
    /// Smoothing length for the synthetic F-Score proxy.
    /// </summary>
    public int ScoreLength
    {
        get => _scoreLength.Value;
        set => _scoreLength.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to open a position.
    /// </summary>
    public decimal EntryThreshold
    {
        get => _entryThreshold.Value;
        set => _entryThreshold.Value = value;
    }

    /// <summary>
    /// Z-score threshold required to close a position.
    /// </summary>
    public decimal ExitThreshold
    {
        get => _exitThreshold.Value;
        set => _exitThreshold.Value = value;
    }

    /// <summary>
    /// Closed candles to wait before another position change.
    /// </summary>
    public int CooldownBars
    {
        get => _cooldownBars.Value;
        set => _cooldownBars.Value = value;
    }

    /// <summary>
    /// Stop loss percentage.
    /// </summary>
    public decimal StopLoss
    {
        get => _stopLoss.Value;
        set => _stopLoss.Value = value;
    }

    /// <summary>
    /// The type of candles to use for strategy calculation.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Initializes a new instance of <see cref="FScoreReversalStrategy"/>.
    /// </summary>
    public FScoreReversalStrategy()
    {
        _security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");

        _lookback = Param(nameof(Lookback), 12)
            .SetRange(2, 80)
            .SetDisplay("Lookback", "Lookback period in bars", "General");

        _scoreLength = Param(nameof(ScoreLength), 8)
            .SetRange(2, 50)
            .SetDisplay("Score Length", "Smoothing length for the synthetic F-Score proxy", "General");

        _entryThreshold = Param(nameof(EntryThreshold), 1.2m)
            .SetRange(0.2m, 5m)
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");

        _exitThreshold = Param(nameof(ExitThreshold), 0.3m)
            .SetRange(0m, 2m)
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");

        _cooldownBars = Param(nameof(CooldownBars), 8)
            .SetRange(0, 120)
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");

        _stopLoss = Param(nameof(StopLoss), 2.5m)
            .SetRange(0.5m, 10m)
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        if (Security != null)
            yield return (Security, CandleType);

        if (!Security2Id.IsEmpty())
            yield return (new Security { Id = Security2Id }, CandleType);
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        _benchmark = null!;
        _primaryReversal = null!;
        _benchmarkReversal = null!;
        _primaryScore = null!;
        _benchmarkScore = null!;
        _spreadAverage = null!;
        _spreadDeviation = null!;

        _latestPrimarySignal = 0m;
        _latestBenchmarkSignal = 0m;
        _previousZScore = null;
        _primaryUpdated = false;
        _benchmarkUpdated = false;
        _cooldownRemaining = 0;
    }

    /// <inheritdoc />
    /// <exception cref="InvalidOperationException">
    /// The primary security or the benchmark identifier is not specified.
    /// </exception>
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        if (Security == null)
            throw new InvalidOperationException("Primary security is not specified.");

        if (Security2Id.IsEmpty())
            throw new InvalidOperationException("Benchmark security identifier is not specified.");

        // Fall back to a bare security carrying only the id when the lookup finds nothing.
        _benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };

        _primaryReversal = new RateOfChange { Length = Lookback };
        _benchmarkReversal = new RateOfChange { Length = Lookback };
        _primaryScore = new ExponentialMovingAverage { Length = ScoreLength };
        _benchmarkScore = new ExponentialMovingAverage { Length = ScoreLength };
        _spreadAverage = new SimpleMovingAverage { Length = SpreadWindowLength };
        _spreadDeviation = new StandardDeviation { Length = SpreadWindowLength };

        var primarySubscription = SubscribeCandles(CandleType, security: Security);
        var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);

        primarySubscription
            .Bind(ProcessPrimaryCandle)
            .Start();

        benchmarkSubscription
            .Bind(ProcessBenchmarkCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, primarySubscription);
            DrawCandles(area, benchmarkSubscription);
            DrawOwnTrades(area);
        }

        StartProtection(
            new Unit(TakeProfitPercent, UnitTypes.Percent),
            new Unit(StopLoss, UnitTypes.Percent));
    }

    /// <summary>
    /// Handles a candle of the primary instrument and refreshes its signal.
    /// </summary>
    /// <param name="candle">Candle received from the primary subscription.</param>
    private void ProcessPrimaryCandle(ICandleMessage candle)
    {
        if (!TryUpdateSignal(candle, Security, _primaryReversal, _primaryScore, out var signal))
            return;

        _latestPrimarySignal = signal;
        _primaryUpdated = true;
        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Handles a candle of the benchmark instrument and refreshes its signal.
    /// </summary>
    /// <param name="candle">Candle received from the benchmark subscription.</param>
    private void ProcessBenchmarkCandle(ICandleMessage candle)
    {
        if (!TryUpdateSignal(candle, _benchmark, _benchmarkReversal, _benchmarkScore, out var signal))
            return;

        _latestBenchmarkSignal = signal;
        _benchmarkUpdated = true;
        TryProcessSpread(candle.OpenTime);
    }

    /// <summary>
    /// Feeds a finished candle into one instrument's indicators and computes its signal
    /// (smoothed score proxy minus rate of change).
    /// </summary>
    /// <param name="candle">Candle to process; unfinished candles are ignored.</param>
    /// <param name="security">Instrument the candle belongs to.</param>
    /// <param name="reversal">Rate-of-change indicator of that instrument.</param>
    /// <param name="score">EMA smoothing the score proxy of that instrument.</param>
    /// <param name="signal">Computed signal, or zero when the method returns <c>false</c>.</param>
    /// <returns><c>true</c> when both indicators are formed and produced non-empty values.</returns>
    private bool TryUpdateSignal(
        ICandleMessage candle,
        Security security,
        RateOfChange reversal,
        ExponentialMovingAverage score,
        out decimal signal)
    {
        signal = 0m;

        if (candle.State != CandleStates.Finished)
            return false;

        var reversalValue = reversal.Process(candle);
        var scoreValue = score.Process(CalculateFScoreProxy(candle, security), candle.OpenTime, true);

        if (reversalValue.IsEmpty || scoreValue.IsEmpty || !reversal.IsFormed || !score.IsFormed)
            return false;

        signal = scoreValue.ToDecimal() - reversalValue.ToDecimal();
        return true;
    }

    /// <summary>
    /// Computes the candle-based score proxy: twice the close location within the candle range
    /// plus the open-to-close return expressed in percent.
    /// </summary>
    /// <param name="candle">Candle to score.</param>
    /// <param name="security">
    /// Instrument the candle belongs to; its price step is the lower bound for the candle range.
    /// </param>
    /// <returns>The raw (unsmoothed) score proxy.</returns>
    private decimal CalculateFScoreProxy(ICandleMessage candle, Security security)
    {
        // The divisor is floored at 1, so the return of instruments priced below 1 is understated.
        var priceBase = Math.Max(candle.OpenPrice, 1m);

        // Use the candle's own instrument first; the primary step is only a fallback.
        var priceStep = security?.PriceStep ?? Security?.PriceStep ?? 1m;

        // A non-positive step would allow a zero divisor on a flat candle.
        if (priceStep <= 0m)
            priceStep = 1m;

        var range = Math.Max(candle.HighPrice - candle.LowPrice, priceStep);
        var closeLocation = ((candle.ClosePrice - candle.LowPrice) - (candle.HighPrice - candle.ClosePrice)) / range;
        var efficiency = (candle.ClosePrice - candle.OpenPrice) / priceBase;

        return (closeLocation * 2m) + (efficiency * 100m);
    }

    /// <summary>
    /// Once both instruments have delivered a fresh signal, updates the spread statistics,
    /// computes the z-score and opens or closes a position on the primary instrument.
    /// </summary>
    /// <param name="time">Timestamp passed to the spread indicators.</param>
    private void TryProcessSpread(DateTime time)
    {
        // Evaluate only when both legs have been refreshed since the previous evaluation.
        if (!_primaryUpdated || !_benchmarkUpdated)
            return;

        _primaryUpdated = false;
        _benchmarkUpdated = false;

        var spread = _latestPrimarySignal - _latestBenchmarkSignal;
        var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
        var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();

        if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
            return;

        var zScore = (spread - mean) / deviation;

        // Record the z-score before the trading gate so that crossings are always
        // measured against the immediately preceding evaluation, not a stale one.
        var previousZScore = _previousZScore;
        _previousZScore = zScore;

        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        var bullishEntry = previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
        var bearishEntry = previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;

        if (_cooldownRemaining == 0 && Position == 0)
        {
            if (bullishEntry)
            {
                BuyMarket();
                _cooldownRemaining = CooldownBars;
            }
            else if (bearishEntry)
            {
                SellMarket();
                _cooldownRemaining = CooldownBars;
            }
        }
        else if (Position > 0 && zScore <= ExitThreshold)
        {
            // Exits are not delayed by the cooldown.
            SellMarket(Position);
            _cooldownRemaining = CooldownBars;
        }
        else if (Position < 0 && zScore >= -ExitThreshold)
        {
            BuyMarket(Math.Abs(Position));
            _cooldownRemaining = CooldownBars;
        }
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import RateOfChange, ExponentialMovingAverage, SimpleMovingAverage, StandardDeviation, CandleIndicatorValue
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class fscore_reversal_strategy(Strategy):
    """F-Score reversal strategy that trades the primary instrument when a synthetic fundamental score aligns with relative short-term reversal versus a benchmark.

    For every finished candle of each instrument the strategy smooths a
    candle-based score proxy with an EMA and subtracts the rate of change over
    ``Lookback`` bars. The difference between the primary and benchmark signals
    is normalized into a z-score. A cross above ``EntryThreshold`` buys the
    primary instrument, a cross below its negative sells it; a long is closed
    once the z-score is at or below ``ExitThreshold`` and a short once it is at
    or above the negative of that value.
    """

    # Number of paired signal updates used for the spread mean and standard deviation.
    _SPREAD_WINDOW_LENGTH = 24

    # First argument of StartProtection (take-profit distance), in percent.
    _TAKE_PROFIT_PERCENT = 2

    def __init__(self):
        """Declare the strategy parameters and initialise the runtime state."""
        super(fscore_reversal_strategy, self).__init__()

        self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
            .SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
        self._lookback = self.Param("Lookback", 12) \
            .SetRange(2, 80) \
            .SetDisplay("Lookback", "Lookback period in bars", "General")
        self._score_length = self.Param("ScoreLength", 8) \
            .SetRange(2, 50) \
            .SetDisplay("Score Length", "Smoothing length for the synthetic F-Score proxy", "General")
        self._entry_threshold = self.Param("EntryThreshold", 1.2) \
            .SetRange(0.2, 5.0) \
            .SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
        self._exit_threshold = self.Param("ExitThreshold", 0.3) \
            .SetRange(0.0, 2.0) \
            .SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
        self._cooldown_bars = self.Param("CooldownBars", 8) \
            .SetRange(0, 120) \
            .SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
        self._stop_loss = self.Param("StopLoss", 2.5) \
            .SetRange(0.5, 10.0) \
            .SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        self._benchmark = None
        self._primary_reversal = None
        self._benchmark_reversal = None
        self._primary_score = None
        self._benchmark_score = None
        self._spread_average = None
        self._spread_deviation = None
        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._previous_z_score = None
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    @property
    def candle_type(self):
        """The type of candles to use for strategy calculation."""
        return self._candle_type.Value

    def GetWorkingSecurities(self):
        """Return the (security, candle type) pairs for the primary and benchmark instruments."""
        result = []
        if self.Security is not None:
            result.append((self.Security, self.candle_type))
        sec2_id = str(self._security2_id.Value)
        if sec2_id:
            s = Security()
            s.Id = sec2_id
            result.append((s, self.candle_type))
        return result

    def OnReseted(self):
        """Drop the indicators and clear all runtime state."""
        super(fscore_reversal_strategy, self).OnReseted()
        self._benchmark = None
        self._primary_reversal = None
        self._benchmark_reversal = None
        self._primary_score = None
        self._benchmark_score = None
        self._spread_average = None
        self._spread_deviation = None
        self._latest_primary_signal = 0.0
        self._latest_benchmark_signal = 0.0
        self._previous_z_score = None
        self._primary_updated = False
        self._benchmark_updated = False
        self._cooldown_remaining = 0

    def OnStarted2(self, time):
        """Create the indicators, subscribe to both candle streams and start protection.

        Raises:
            Exception: the primary security or the benchmark identifier is not specified.
        """
        super(fscore_reversal_strategy, self).OnStarted2(time)

        if self.Security is None:
            raise Exception("Primary security is not specified.")

        sec2_id = str(self._security2_id.Value)
        if not sec2_id:
            raise Exception("Benchmark security identifier is not specified.")

        s = Security()
        s.Id = sec2_id
        self._benchmark = s

        lookback = int(self._lookback.Value)
        score_len = int(self._score_length.Value)

        self._primary_reversal = RateOfChange()
        self._primary_reversal.Length = lookback
        self._benchmark_reversal = RateOfChange()
        self._benchmark_reversal.Length = lookback
        self._primary_score = ExponentialMovingAverage()
        self._primary_score.Length = score_len
        self._benchmark_score = ExponentialMovingAverage()
        self._benchmark_score.Length = score_len
        self._spread_average = SimpleMovingAverage()
        self._spread_average.Length = self._SPREAD_WINDOW_LENGTH
        self._spread_deviation = StandardDeviation()
        self._spread_deviation.Length = self._SPREAD_WINDOW_LENGTH

        primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
        benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)

        primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
        benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()

        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, primary_subscription)
            self.DrawCandles(area, benchmark_subscription)
            self.DrawOwnTrades(area)

        self.StartProtection(
            Unit(self._TAKE_PROFIT_PERCENT, UnitTypes.Percent),
            Unit(float(self._stop_loss.Value), UnitTypes.Percent)
        )

    def ProcessPrimaryCandle(self, candle):
        """Handle a candle of the primary instrument and refresh its signal."""
        signal = self._try_update_signal(
            candle, self.Security, self._primary_reversal, self._primary_score)
        if signal is None:
            return
        self._latest_primary_signal = signal
        self._primary_updated = True
        self.TryProcessSpread(candle.OpenTime)

    def ProcessBenchmarkCandle(self, candle):
        """Handle a candle of the benchmark instrument and refresh its signal."""
        signal = self._try_update_signal(
            candle, self._benchmark, self._benchmark_reversal, self._benchmark_score)
        if signal is None:
            return
        self._latest_benchmark_signal = signal
        self._benchmark_updated = True
        self.TryProcessSpread(candle.OpenTime)

    def _try_update_signal(self, candle, security, reversal, score):
        """Feed a finished candle into one instrument's indicators.

        Returns the signal (smoothed score proxy minus rate of change) as a
        float, or None when the candle is unfinished, an indicator value is
        empty, or an indicator is not formed yet.
        """
        if candle.State != CandleStates.Finished:
            return None

        civ = CandleIndicatorValue(reversal, candle)
        civ.IsFinal = True
        reversal_value = reversal.Process(civ)

        fscore_proxy = self.CalculateFScoreProxy(candle, security)
        score_value = process_float(score, fscore_proxy, candle.OpenTime, True)

        if reversal_value.IsEmpty or score_value.IsEmpty:
            return None
        if not reversal.IsFormed or not score.IsFormed:
            return None

        return float(score_value) - float(reversal_value)

    def CalculateFScoreProxy(self, candle, security=None):
        """Compute the candle-based score proxy.

        The result is twice the close location within the candle range plus the
        open-to-close return expressed in percent.

        Args:
            candle: candle to score.
            security: instrument the candle belongs to; its price step is the
                lower bound for the candle range. When omitted or without a
                price step, the primary security's step is used, then 1.0.
        """
        # The divisor is floored at 1, so the return of instruments priced below 1 is understated.
        price_base = max(float(candle.OpenPrice), 1.0)

        # Use the candle's own instrument first; the primary step is only a fallback.
        step = None
        if security is not None:
            step = security.PriceStep
        if step is None and self.Security is not None:
            step = self.Security.PriceStep
        price_step = float(step) if step is not None else 1.0

        # A non-positive step would allow a zero divisor on a flat candle.
        if price_step <= 0:
            price_step = 1.0

        high = float(candle.HighPrice)
        low = float(candle.LowPrice)
        close = float(candle.ClosePrice)

        range_val = max(high - low, price_step)
        close_location = ((close - low) - (high - close)) / range_val
        efficiency = (close - float(candle.OpenPrice)) / price_base
        return (close_location * 2.0) + (efficiency * 100.0)

    def TryProcessSpread(self, time):
        """Update the spread statistics and trade on z-score threshold crossings.

        Runs only after both instruments have delivered a fresh signal since
        the previous evaluation.
        """
        if not self._primary_updated or not self._benchmark_updated:
            return

        self._primary_updated = False
        self._benchmark_updated = False

        spread = self._latest_primary_signal - self._latest_benchmark_signal
        mean_result = process_float(self._spread_average, spread, time, True)
        mean = float(mean_result)
        dev_result = process_float(self._spread_deviation, spread, time, True)
        deviation = float(dev_result)

        if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
            return

        z_score = (spread - mean) / deviation

        # Record the z-score before the trading gate so that crossings are always
        # measured against the immediately preceding evaluation, not a stale one.
        previous_z_score = self._previous_z_score
        self._previous_z_score = z_score

        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        entry_thresh = float(self._entry_threshold.Value)
        exit_thresh = float(self._exit_threshold.Value)
        cooldown = int(self._cooldown_bars.Value)

        bullish_entry = previous_z_score is not None and previous_z_score < entry_thresh and z_score >= entry_thresh
        bearish_entry = previous_z_score is not None and previous_z_score > -entry_thresh and z_score <= -entry_thresh

        if self._cooldown_remaining == 0 and self.Position == 0:
            if bullish_entry:
                self.BuyMarket()
                self._cooldown_remaining = cooldown
            elif bearish_entry:
                self.SellMarket()
                self._cooldown_remaining = cooldown
        elif self.Position > 0 and z_score <= exit_thresh:
            # Exits are not delayed by the cooldown.
            self.SellMarket(self.Position)
            self._cooldown_remaining = cooldown
        elif self.Position < 0 and z_score >= -exit_thresh:
            self.BuyMarket(Math.Abs(self.Position))
            self._cooldown_remaining = cooldown

    def CreateClone(self):
        """Return a new strategy instance with default parameters."""
        return fscore_reversal_strategy()