Lexical Density Filings Strategy
Эта факторная стратегия анализирует сложность регуляторной отчетности компаний. Лексическая плотность рассчитывается как доля уникальных слов в последнем отчёте. Считается, что насыщенный информацией текст отражает перспективы роста, тогда как скудные заявления могут скрывать проблемы.
Каждый квартал бумаги ранжируются по лексической плотности. Стратегия покупает акции с наибольшей плотностью и продаёт с наименьшей. Позиции равновзвешены и пересматриваются в первые три торговых дня февраля, мая, августа и ноября. Между ребалансировками стоп-лоссы не используются.
Тесты на широком рынке США показали стабильную премию и умеренную оборачиваемость, что делает стратегию удобной частью мультифакторного портфеля.
Подробности
- Критерий входа: квартальная сортировка по лексической плотности; лонг верхний квинтиль, шорт нижний квинтиль
- Длинные/короткие: обе стороны
- Критерий выхода: следующая ребалансировка
- Стопы: нет
- Значения по умолчанию:
Quintile= 5MinTradeUsd= 200CandleType= TimeSpan.FromDays(1)
- Фильтры:
- Категория: Фундаментальная
- Направление: Обе
- Индикаторы: Текстовый анализ
- Стопы: Нет
- Сложность: Средняя
- Таймфрейм: Многомесячный
- Сезонность: Да
- Нейросети: Нет
- Дивергенция: Нет
- Уровень риска: Средний
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Lexical density filings strategy that trades the primary instrument when its synthetic filing-density score diverges from a benchmark.
/// </summary>
public class LexicalDensityFilingsStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _densityLength;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private ExponentialMovingAverage _primaryDensity = null!;
private ExponentialMovingAverage _benchmarkDensity = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private decimal _latestPrimaryDensity;
private decimal _latestBenchmarkDensity;
private decimal? _previousZScore;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark security identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Smoothing length for the synthetic lexical density score.
/// </summary>
public int DensityLength
{
get => _densityLength.Value;
set => _densityLength.Value = value;
}
/// <summary>
/// Lookback period used to normalize the density spread.
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for analysis.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Initializes a new instance of <see cref="LexicalDensityFilingsStrategy"/>.
/// </summary>
public LexicalDensityFilingsStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General");
_densityLength = Param(nameof(DensityLength), 10)
.SetRange(2, 80)
.SetDisplay("Density Length", "Smoothing length for the synthetic lexical density score", "Indicators");
_lookbackPeriod = Param(nameof(LookbackPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Lookback Period", "Lookback period used to normalize the density spread", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1.2m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.3m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 8)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Time frame for candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryDensity = null!;
_benchmarkDensity = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_latestPrimaryDensity = 0m;
_latestBenchmarkDensity = 0m;
_previousZScore = null;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark security identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryDensity = new ExponentialMovingAverage { Length = DensityLength };
_benchmarkDensity = new ExponentialMovingAverage { Length = DensityLength };
_spreadAverage = new SimpleMovingAverage { Length = LookbackPeriod };
_spreadDeviation = new StandardDeviation { Length = LookbackPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
_latestPrimaryDensity = UpdateDensity(_primaryDensity, candle);
_primaryUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
_latestBenchmarkDensity = UpdateDensity(_benchmarkDensity, candle);
_benchmarkUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private decimal UpdateDensity(ExponentialMovingAverage average, ICandleMessage candle)
{
var densitySignal = CalculateDensitySignal(candle);
return average.Process(densitySignal, candle.OpenTime, true).ToDecimal();
}
private decimal CalculateDensitySignal(ICandleMessage candle)
{
var priceBase = Math.Max(candle.OpenPrice, 1m);
var range = Math.Max(candle.HighPrice - candle.LowPrice, Security?.PriceStep ?? 1m);
var closeLocation = ((candle.ClosePrice - candle.LowPrice) - (candle.HighPrice - candle.ClosePrice)) / range;
var compression = 1m - Math.Min(0.2m, range / priceBase);
return (closeLocation * 2m) + compression;
}
private void TryProcessSpread(DateTime time)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var spread = _latestPrimaryDensity - _latestBenchmarkDensity;
var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();
if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishEntry = _previousZScore is decimal previousBullish && previousBullish < EntryThreshold && zScore >= EntryThreshold;
var bearishEntry = _previousZScore is decimal previousBearish && previousBearish > -EntryThreshold && zScore <= -EntryThreshold;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && zScore <= ExitThreshold)
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && zScore >= -ExitThreshold)
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_previousZScore = zScore;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import ExponentialMovingAverage, SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class lexical_density_filings_strategy(Strategy):
"""Lexical density filings strategy that trades the primary instrument when its synthetic lexical density diverges from a benchmark."""
def __init__(self):
super(lexical_density_filings_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark security", "General")
self._density_length = self.Param("DensityLength", 10) \
.SetRange(2, 80) \
.SetDisplay("Density Length", "Smoothing length for the synthetic lexical density score", "Indicators")
self._lookback_period = self.Param("LookbackPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Lookback Period", "Lookback period used to normalize the density spread", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.2) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.3) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 8) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Time frame for candles", "General")
self._benchmark = None
self._primary_density = None
self._benchmark_density = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_density = 0.0
self._latest_benchmark_density = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(lexical_density_filings_strategy, self).OnReseted()
self._benchmark = None
self._primary_density = None
self._benchmark_density = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_density = 0.0
self._latest_benchmark_density = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(lexical_density_filings_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark security identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
density_len = int(self._density_length.Value)
lookback = int(self._lookback_period.Value)
self._primary_density = ExponentialMovingAverage()
self._primary_density.Length = density_len
self._benchmark_density = ExponentialMovingAverage()
self._benchmark_density.Length = density_len
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = lookback
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = lookback
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
self._latest_primary_density = self.UpdateDensity(self._primary_density, candle)
self._primary_updated = True
self.TryProcessSpread(candle.OpenTime)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
self._latest_benchmark_density = self.UpdateDensity(self._benchmark_density, candle)
self._benchmark_updated = True
self.TryProcessSpread(candle.OpenTime)
def UpdateDensity(self, average, candle):
density_signal = self.CalculateDensitySignal(candle)
result = process_float(average, density_signal, candle.OpenTime, True)
return float(result)
def CalculateDensitySignal(self, candle):
price_base = max(float(candle.OpenPrice), 1.0)
price_step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
range_val = max(float(candle.HighPrice) - float(candle.LowPrice), price_step)
close_location = ((float(candle.ClosePrice) - float(candle.LowPrice)) - (float(candle.HighPrice) - float(candle.ClosePrice))) / range_val
compression = 1.0 - min(0.2, range_val / price_base)
return (close_location * 2.0) + compression
def TryProcessSpread(self, time):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
spread = self._latest_primary_density - self._latest_benchmark_density
mean_result = process_float(self._spread_average, spread, time, True)
mean = float(mean_result)
dev_result = process_float(self._spread_deviation, spread, time, True)
deviation = float(dev_result)
if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
bullish_entry = self._previous_z_score is not None and self._previous_z_score < entry_thresh and z_score >= entry_thresh
bearish_entry = self._previous_z_score is not None and self._previous_z_score > -entry_thresh and z_score <= -entry_thresh
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and z_score <= exit_thresh:
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and z_score >= -exit_thresh:
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._previous_z_score = z_score
def CreateClone(self):
return lexical_density_filings_strategy()