货币PPP价值策略
该策略基于购买力平价(PPP)的偏离来寻找价值。低于PPP估值的货币被买入,高于PPP估值的货币被做空。组合每月再平衡,以维持多空敞口。
由于PPP数据更新较慢,只有当所需调整超过设定的美元阈值时才会下单。示例代码提供了排序框架,但TryGetPPPDeviation函数仍为占位,需要用户自行实现。
细节
- 投资范围:具有PPP估值的货币对。
- 信号:做多
K个最被低估的货币,做空K个最被高估的货币。 - 再平衡:每月一次。
- 仓位:多空等权。
- 参数:
Universe– 可交易的货币。K– 多头和空头各包含的货币数量。MinTradeUsd– 最小交易金额(美元)。CandleType– 使用的K线周期(默认1天)。
- 注意:PPP偏离计算(
TryGetPPPDeviation)尚未实现,需要用户提供。
using System;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Configuration;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Currency PPP value strategy that trades the primary instrument when its synthetic PPP deviation becomes extreme relative to a benchmark currency.
/// </summary>
public class CurrencyPppValueStrategy : Strategy
{
private readonly StrategyParam<string> _security2Id;
private readonly StrategyParam<int> _pppLength;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<decimal> _entryThreshold;
private readonly StrategyParam<decimal> _exitThreshold;
private readonly StrategyParam<int> _cooldownBars;
private readonly StrategyParam<decimal> _stopLoss;
private readonly StrategyParam<DataType> _candleType;
private Security _benchmark = null!;
private ExponentialMovingAverage _primaryPpp = null!;
private ExponentialMovingAverage _benchmarkPpp = null!;
private SimpleMovingAverage _spreadAverage = null!;
private StandardDeviation _spreadDeviation = null!;
private decimal _latestPrimaryDeviation;
private decimal _latestBenchmarkDeviation;
private decimal? _previousZScore;
private bool _primaryUpdated;
private bool _benchmarkUpdated;
private int _cooldownRemaining;
/// <summary>
/// Benchmark currency identifier.
/// </summary>
public string Security2Id
{
get => _security2Id.Value;
set => _security2Id.Value = value;
}
/// <summary>
/// Smoothing length for the synthetic PPP anchor.
/// </summary>
public int PppLength
{
get => _pppLength.Value;
set => _pppLength.Value = value;
}
/// <summary>
/// Lookback period used to normalize the deviation spread.
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Z-score threshold required to open a position.
/// </summary>
public decimal EntryThreshold
{
get => _entryThreshold.Value;
set => _entryThreshold.Value = value;
}
/// <summary>
/// Z-score threshold required to close a position.
/// </summary>
public decimal ExitThreshold
{
get => _exitThreshold.Value;
set => _exitThreshold.Value = value;
}
/// <summary>
/// Closed candles to wait before another position change.
/// </summary>
public int CooldownBars
{
get => _cooldownBars.Value;
set => _cooldownBars.Value = value;
}
/// <summary>
/// Stop loss percentage.
/// </summary>
public decimal StopLoss
{
get => _stopLoss.Value;
set => _stopLoss.Value = value;
}
/// <summary>
/// Candle type used for both instruments.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Constructor.
/// </summary>
public CurrencyPppValueStrategy()
{
_security2Id = Param(nameof(Security2Id), Paths.HistoryDefaultSecurity2)
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark currency security", "General");
_pppLength = Param(nameof(PppLength), 14)
.SetRange(2, 80)
.SetDisplay("PPP Length", "Smoothing length for the synthetic PPP anchor", "Indicators");
_lookbackPeriod = Param(nameof(LookbackPeriod), 24)
.SetRange(5, 120)
.SetDisplay("Lookback Period", "Lookback period used to normalize the deviation spread", "Indicators");
_entryThreshold = Param(nameof(EntryThreshold), 1.25m)
.SetRange(0.2m, 5m)
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals");
_exitThreshold = Param(nameof(ExitThreshold), 0.3m)
.SetRange(0m, 2m)
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals");
_cooldownBars = Param(nameof(CooldownBars), 8)
.SetRange(0, 120)
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk");
_stopLoss = Param(nameof(StopLoss), 2.5m)
.SetRange(0.5m, 10m)
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk");
_candleType = Param(nameof(CandleType), TimeSpan.FromHours(4).TimeFrame())
.SetDisplay("Candle Type", "Time frame of candles", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
if (Security != null)
yield return (Security, CandleType);
if (!Security2Id.IsEmpty())
yield return (new Security { Id = Security2Id }, CandleType);
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_benchmark = null!;
_primaryPpp = null!;
_benchmarkPpp = null!;
_spreadAverage = null!;
_spreadDeviation = null!;
_latestPrimaryDeviation = 0m;
_latestBenchmarkDeviation = 0m;
_previousZScore = null;
_primaryUpdated = false;
_benchmarkUpdated = false;
_cooldownRemaining = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
if (Security == null)
throw new InvalidOperationException("Primary currency security is not specified.");
if (Security2Id.IsEmpty())
throw new InvalidOperationException("Benchmark currency identifier is not specified.");
_benchmark = this.LookupById(Security2Id) ?? new Security { Id = Security2Id };
_primaryPpp = new ExponentialMovingAverage { Length = PppLength };
_benchmarkPpp = new ExponentialMovingAverage { Length = PppLength };
_spreadAverage = new SimpleMovingAverage { Length = LookbackPeriod };
_spreadDeviation = new StandardDeviation { Length = LookbackPeriod };
var primarySubscription = SubscribeCandles(CandleType, security: Security);
var benchmarkSubscription = SubscribeCandles(CandleType, security: _benchmark);
primarySubscription
.Bind(ProcessPrimaryCandle)
.Start();
benchmarkSubscription
.Bind(ProcessBenchmarkCandle)
.Start();
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, primarySubscription);
DrawCandles(area, benchmarkSubscription);
DrawOwnTrades(area);
}
StartProtection(
new Unit(2, UnitTypes.Percent),
new Unit(StopLoss, UnitTypes.Percent));
}
private void ProcessPrimaryCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
_latestPrimaryDeviation = UpdateDeviation(_primaryPpp, candle);
_primaryUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private void ProcessBenchmarkCandle(ICandleMessage candle)
{
if (candle.State != CandleStates.Finished)
return;
_latestBenchmarkDeviation = UpdateDeviation(_benchmarkPpp, candle);
_benchmarkUpdated = true;
TryProcessSpread(candle.OpenTime);
}
private decimal UpdateDeviation(ExponentialMovingAverage average, ICandleMessage candle)
{
var syntheticPpp = CalculateSyntheticPpp(candle);
var pppAnchor = average.Process(syntheticPpp, candle.OpenTime, true).ToDecimal();
return (candle.ClosePrice - pppAnchor) / Math.Max(pppAnchor, 1m);
}
private decimal CalculateSyntheticPpp(ICandleMessage candle)
{
var priceBase = Math.Max(candle.OpenPrice, 1m);
var range = Math.Max(candle.HighPrice - candle.LowPrice, Security?.PriceStep ?? 1m);
var purchasingPowerAnchor = (candle.OpenPrice + candle.LowPrice + candle.ClosePrice) / 3m;
var inflationPenalty = priceBase * Math.Min(0.12m, range / priceBase);
return purchasingPowerAnchor - inflationPenalty;
}
private void TryProcessSpread(DateTime time)
{
if (!_primaryUpdated || !_benchmarkUpdated)
return;
_primaryUpdated = false;
_benchmarkUpdated = false;
var spread = _latestPrimaryDeviation - _latestBenchmarkDeviation;
var mean = _spreadAverage.Process(spread, time, true).ToDecimal();
var deviation = _spreadDeviation.Process(spread, time, true).ToDecimal();
if (!_spreadAverage.IsFormed || !_spreadDeviation.IsFormed || deviation <= 0m)
return;
if (!IsFormedAndOnlineAndAllowTrading())
return;
if (_cooldownRemaining > 0)
_cooldownRemaining--;
var zScore = (spread - mean) / deviation;
var bullishEntry = _previousZScore is decimal previousBullish && previousBullish > -EntryThreshold && zScore <= -EntryThreshold;
var bearishEntry = _previousZScore is decimal previousBearish && previousBearish < EntryThreshold && zScore >= EntryThreshold;
if (_cooldownRemaining == 0 && Position == 0)
{
if (bullishEntry)
{
BuyMarket();
_cooldownRemaining = CooldownBars;
}
else if (bearishEntry)
{
SellMarket();
_cooldownRemaining = CooldownBars;
}
}
else if (Position > 0 && zScore >= -ExitThreshold)
{
SellMarket(Position);
_cooldownRemaining = CooldownBars;
}
else if (Position < 0 && zScore <= ExitThreshold)
{
BuyMarket(Math.Abs(Position));
_cooldownRemaining = CooldownBars;
}
_previousZScore = zScore;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.BusinessEntities")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import ExponentialMovingAverage, SimpleMovingAverage, StandardDeviation
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from indicator_extensions import *
class currency_ppp_value_strategy(Strategy):
"""Currency PPP value strategy that trades the primary instrument when its synthetic PPP deviation becomes extreme relative to a benchmark currency."""
def __init__(self):
super(currency_ppp_value_strategy, self).__init__()
self._security2_id = self.Param("Security2Id", "TONUSDT@BNBFT") \
.SetDisplay("Benchmark Security Id", "Identifier of the benchmark currency security", "General")
self._ppp_length = self.Param("PppLength", 14) \
.SetRange(2, 80) \
.SetDisplay("PPP Length", "Smoothing length for the synthetic PPP anchor", "Indicators")
self._lookback_period = self.Param("LookbackPeriod", 24) \
.SetRange(5, 120) \
.SetDisplay("Lookback Period", "Lookback period used to normalize the deviation spread", "Indicators")
self._entry_threshold = self.Param("EntryThreshold", 1.25) \
.SetRange(0.2, 5.0) \
.SetDisplay("Entry Threshold", "Z-score threshold required to open a position", "Signals")
self._exit_threshold = self.Param("ExitThreshold", 0.3) \
.SetRange(0.0, 2.0) \
.SetDisplay("Exit Threshold", "Z-score threshold required to close a position", "Signals")
self._cooldown_bars = self.Param("CooldownBars", 8) \
.SetRange(0, 120) \
.SetDisplay("Cooldown Bars", "Closed candles to wait before another position change", "Risk")
self._stop_loss = self.Param("StopLoss", 2.5) \
.SetRange(0.5, 10.0) \
.SetDisplay("Stop Loss %", "Stop loss percentage", "Risk")
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(4))) \
.SetDisplay("Candle Type", "Time frame of candles", "General")
self._benchmark = None
self._primary_ppp = None
self._benchmark_ppp = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_deviation = 0.0
self._latest_benchmark_deviation = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
@property
def candle_type(self):
return self._candle_type.Value
def GetWorkingSecurities(self):
result = []
if self.Security is not None:
result.append((self.Security, self.candle_type))
sec2_id = str(self._security2_id.Value)
if sec2_id:
s = Security()
s.Id = sec2_id
result.append((s, self.candle_type))
return result
def OnReseted(self):
super(currency_ppp_value_strategy, self).OnReseted()
self._benchmark = None
self._primary_ppp = None
self._benchmark_ppp = None
self._spread_average = None
self._spread_deviation = None
self._latest_primary_deviation = 0.0
self._latest_benchmark_deviation = 0.0
self._previous_z_score = None
self._primary_updated = False
self._benchmark_updated = False
self._cooldown_remaining = 0
def OnStarted2(self, time):
super(currency_ppp_value_strategy, self).OnStarted2(time)
sec2_id = str(self._security2_id.Value)
if not sec2_id:
raise Exception("Benchmark currency identifier is not specified.")
s = Security()
s.Id = sec2_id
self._benchmark = s
ppp_len = int(self._ppp_length.Value)
lookback = int(self._lookback_period.Value)
self._primary_ppp = ExponentialMovingAverage()
self._primary_ppp.Length = ppp_len
self._benchmark_ppp = ExponentialMovingAverage()
self._benchmark_ppp.Length = ppp_len
self._spread_average = SimpleMovingAverage()
self._spread_average.Length = lookback
self._spread_deviation = StandardDeviation()
self._spread_deviation.Length = lookback
primary_subscription = self.SubscribeCandles(self.candle_type, True, self.Security)
benchmark_subscription = self.SubscribeCandles(self.candle_type, True, self._benchmark)
primary_subscription.Bind(self.ProcessPrimaryCandle).Start()
benchmark_subscription.Bind(self.ProcessBenchmarkCandle).Start()
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, primary_subscription)
self.DrawCandles(area, benchmark_subscription)
self.DrawOwnTrades(area)
self.StartProtection(
Unit(2, UnitTypes.Percent),
Unit(float(self._stop_loss.Value), UnitTypes.Percent)
)
def ProcessPrimaryCandle(self, candle):
if candle.State != CandleStates.Finished:
return
self._latest_primary_deviation = self.UpdateDeviation(self._primary_ppp, candle)
self._primary_updated = True
self.TryProcessSpread(candle.OpenTime)
def ProcessBenchmarkCandle(self, candle):
if candle.State != CandleStates.Finished:
return
self._latest_benchmark_deviation = self.UpdateDeviation(self._benchmark_ppp, candle)
self._benchmark_updated = True
self.TryProcessSpread(candle.OpenTime)
def UpdateDeviation(self, average, candle):
synthetic_ppp = self.CalculateSyntheticPpp(candle)
result = process_float(average, synthetic_ppp, candle.OpenTime, True)
ppp_anchor = float(result)
return (float(candle.ClosePrice) - ppp_anchor) / max(ppp_anchor, 1.0)
def CalculateSyntheticPpp(self, candle):
price_base = max(float(candle.OpenPrice), 1.0)
price_step = float(self.Security.PriceStep) if self.Security is not None and self.Security.PriceStep is not None else 1.0
range_val = max(float(candle.HighPrice) - float(candle.LowPrice), price_step)
purchasing_power_anchor = (float(candle.OpenPrice) + float(candle.LowPrice) + float(candle.ClosePrice)) / 3.0
inflation_penalty = price_base * min(0.12, range_val / price_base)
return purchasing_power_anchor - inflation_penalty
def TryProcessSpread(self, time):
if not self._primary_updated or not self._benchmark_updated:
return
self._primary_updated = False
self._benchmark_updated = False
spread = self._latest_primary_deviation - self._latest_benchmark_deviation
mean_result = process_float(self._spread_average, spread, time, True)
mean = float(mean_result)
dev_result = process_float(self._spread_deviation, spread, time, True)
deviation = float(dev_result)
if not self._spread_average.IsFormed or not self._spread_deviation.IsFormed or deviation <= 0:
return
if not self.IsFormedAndOnlineAndAllowTrading():
return
if self._cooldown_remaining > 0:
self._cooldown_remaining -= 1
z_score = (spread - mean) / deviation
entry_thresh = float(self._entry_threshold.Value)
exit_thresh = float(self._exit_threshold.Value)
cooldown = int(self._cooldown_bars.Value)
bullish_entry = self._previous_z_score is not None and self._previous_z_score > -entry_thresh and z_score <= -entry_thresh
bearish_entry = self._previous_z_score is not None and self._previous_z_score < entry_thresh and z_score >= entry_thresh
if self._cooldown_remaining == 0 and self.Position == 0:
if bullish_entry:
self.BuyMarket()
self._cooldown_remaining = cooldown
elif bearish_entry:
self.SellMarket()
self._cooldown_remaining = cooldown
elif self.Position > 0 and z_score >= -exit_thresh:
self.SellMarket(self.Position)
self._cooldown_remaining = cooldown
elif self.Position < 0 and z_score <= exit_thresh:
self.BuyMarket(Math.Abs(self.Position))
self._cooldown_remaining = cooldown
self._previous_z_score = z_score
def CreateClone(self):
return currency_ppp_value_strategy()