Autocorrelation Reversal Strategy
此策略分析短期价格自相关性,以判断近期走势是否可能反转。负自相关表明价格变动倾向交替,为均值回归创造条件。
测试表明年均收益约为 124%,该策略在外汇市场表现最佳。
当计算得出的自相关值低于阈值且价格在均线下方时买入;自相关为负且价格高于均线时做空。价格穿越均线或自相关升至阈值上方即平仓。
该方法适合寻找统计优势而非图形形态的交易者。百分比止损可在趋势持续而未按预期反转时保护资金。
细节
- 入场条件:
- 多头: Autocorrelation < Threshold && Close < MA
- 空头: Autocorrelation < Threshold && Close > MA
- 多/空: 双向
- 离场条件:
- 多头: Close > MA 或 自相关 > Threshold
- 空头: Close < MA 或 自相关 > Threshold
- 止损: 百分比止损
- 默认值:
AutoCorrPeriod= 20AutoCorrThreshold= -0.3mStopLossPercent= 2mCandleType= TimeSpan.FromMinutes(5)
- 过滤器:
- 类别: Mean Reversion
- 方向: 双向
- 指标: Autocorrelation, MA
- 止损: 是
- 复杂度: 中等
- 时间框架: 日内
- 季节性: 否
- 神经网络: 否
- 背离: 否
- 风险等级: 中等
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that trades based on price autocorrelation.
/// Buys when autocorrelation is negative and price is below average.
/// Sells when autocorrelation is negative and price is above average.
/// </summary>
public class AutocorrelationReversionStrategy : Strategy
{
private readonly StrategyParam<int> _autoCorrPeriod;
private readonly StrategyParam<decimal> _autoCorrThreshold;
private readonly StrategyParam<decimal> _stopLossPercent;
private readonly StrategyParam<DataType> _candleType;
private SimpleMovingAverage _sma;
private decimal _currentPrice;
private readonly Queue<decimal> _priceHistory = [];
private decimal _latestAutocorrelation;
/// <summary>
/// Period for autocorrelation calculation.
/// </summary>
public int AutoCorrPeriod
{
get => _autoCorrPeriod.Value;
set => _autoCorrPeriod.Value = value;
}
/// <summary>
/// Autocorrelation threshold for signal generation.
/// </summary>
public decimal AutoCorrThreshold
{
get => _autoCorrThreshold.Value;
set => _autoCorrThreshold.Value = value;
}
/// <summary>
/// Stop-loss percentage.
/// </summary>
public decimal StopLossPercent
{
get => _stopLossPercent.Value;
set => _stopLossPercent.Value = value;
}
/// <summary>
/// Type of candles to use.
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Constructor.
/// </summary>
public AutocorrelationReversionStrategy()
{
_autoCorrPeriod = Param(nameof(AutoCorrPeriod), 20)
.SetDisplay("Autocorrelation period", "Period for autocorrelation calculation", "Strategy parameters")
.SetOptimize(10, 30, 5);
_autoCorrThreshold = Param(nameof(AutoCorrThreshold), -0.3m)
.SetDisplay("Autocorr threshold", "Threshold for autocorrelation signals", "Strategy parameters")
.SetOptimize(-0.5m, -0.1m, 0.1m);
_stopLossPercent = Param(nameof(StopLossPercent), 2m)
.SetDisplay("Stop-loss %", "Stop-loss as percentage from entry price", "Risk management")
.SetOptimize(1m, 3m, 0.5m);
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle type", "Type of candles to use", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_priceHistory.Clear();
_latestAutocorrelation = default;
_currentPrice = default;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
// Initialize the SMA indicator (using same period as autocorrelation for simplicity)
_sma = new SMA { Length = AutoCorrPeriod };
// Create a subscription to candlesticks
var subscription = SubscribeCandles(CandleType);
// Subscribe to candle processing
subscription
.Bind(_sma, ProcessCandle)
.Start();
// Start position protection
StartProtection(
new Unit(StopLossPercent, UnitTypes.Percent),
new Unit(StopLossPercent * 1.5m, UnitTypes.Percent));
// Setup chart if available
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, _sma);
DrawOwnTrades(area);
}
}
private void ProcessCandle(ICandleMessage candle, decimal smaValue)
{
// Skip unfinished candles
if (candle.State != CandleStates.Finished)
return;
// Update current price and price history
_currentPrice = candle.ClosePrice;
// Update price history queue
_priceHistory.Enqueue(_currentPrice);
if (_priceHistory.Count > AutoCorrPeriod)
_priceHistory.Dequeue();
// Wait until we have enough data
if (_priceHistory.Count < AutoCorrPeriod)
return;
// Check if strategy is ready to trade
if (!IsFormedAndOnlineAndAllowTrading())
return;
// Calculate autocorrelation
_latestAutocorrelation = CalculateAutocorrelation();
// Log the autocorrelation value
LogInfo($"Autocorrelation: {_latestAutocorrelation}, Current price: {_currentPrice}, SMA: {smaValue}");
// Trading logic: Look for negative autocorrelation below threshold
if (_latestAutocorrelation < AutoCorrThreshold)
{
// Price below average - buy signal
if (_currentPrice < smaValue && Position <= 0)
{
BuyMarket(Volume);
LogInfo($"Buy signal: Autocorr={_latestAutocorrelation}, Price={_currentPrice}, SMA={smaValue}");
}
// Price above average - sell signal
else if (_currentPrice > smaValue && Position >= 0)
{
SellMarket(Volume + Math.Abs(Position));
LogInfo($"Sell signal: Autocorr={_latestAutocorrelation}, Price={_currentPrice}, SMA={smaValue}");
}
}
}
private decimal CalculateAutocorrelation()
{
// Convert queue to array for easier calculation
decimal[] prices = [.. _priceHistory];
// Calculate price changes
decimal[] priceChanges = new decimal[prices.Length - 1];
for (int i = 0; i < prices.Length - 1; i++)
{
priceChanges[i] = prices[i + 1] - prices[i];
}
// Calculate autocorrelation of lag 1
decimal meanChange = priceChanges.Average();
decimal numerator = 0;
decimal denominator = 0;
for (int i = 0; i < priceChanges.Length - 1; i++)
{
decimal deviation1 = priceChanges[i] - meanChange;
decimal deviation2 = priceChanges[i + 1] - meanChange;
numerator += deviation1 * deviation2;
denominator += deviation1 * deviation1;
}
// Guard against division by zero
if (denominator == 0)
return 0;
return numerator / denominator;
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates, Unit, UnitTypes
from StockSharp.Algo.Indicators import SimpleMovingAverage
from StockSharp.Algo.Strategies import Strategy
from StockSharp.BusinessEntities import Security
from datatype_extensions import *
class autocorrelation_reversion_strategy(Strategy):
"""
Strategy that trades based on price autocorrelation.
Buys when autocorrelation is negative and price is below average.
Sells when autocorrelation is negative and price is above average.
"""
def __init__(self):
super(autocorrelation_reversion_strategy, self).__init__()
# Initialize strategy parameters
self._auto_corr_period = self.Param("AutoCorrPeriod", 20) \
.SetDisplay("Autocorrelation period", "Period for autocorrelation calculation", "Strategy parameters") \
.SetCanOptimize(True) \
.SetOptimize(10, 30, 5)
self._auto_corr_threshold = self.Param("AutoCorrThreshold", -0.3) \
.SetDisplay("Autocorr threshold", "Threshold for autocorrelation signals", "Strategy parameters") \
.SetCanOptimize(True) \
.SetOptimize(-0.5, -0.1, 0.1)
self._stop_loss_percent = self.Param("StopLossPercent", 2.0) \
.SetDisplay("Stop-loss %", "Stop-loss as percentage from entry price", "Risk management") \
.SetCanOptimize(True) \
.SetOptimize(1.0, 3.0, 0.5)
self._candle_type = self.Param("CandleType", tf(5)) \
.SetDisplay("Candle type", "Type of candles to use", "General")
self._sma = None
self._current_price = 0.0
self._price_history = []
self._latest_autocorrelation = 0.0
# Period for autocorrelation calculation.
@property
def AutoCorrPeriod(self):
return self._auto_corr_period.Value
@AutoCorrPeriod.setter
def AutoCorrPeriod(self, value):
self._auto_corr_period.Value = value
# Autocorrelation threshold for signal generation.
@property
def AutoCorrThreshold(self):
return self._auto_corr_threshold.Value
@AutoCorrThreshold.setter
def AutoCorrThreshold(self, value):
self._auto_corr_threshold.Value = value
# Stop-loss percentage.
@property
def StopLossPercent(self):
return self._stop_loss_percent.Value
@StopLossPercent.setter
def StopLossPercent(self, value):
self._stop_loss_percent.Value = value
# Type of candles to use.
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
def GetWorkingSecurities(self):
return [(self.Security, self.CandleType)]
def OnReseted(self):
super(autocorrelation_reversion_strategy, self).OnReseted()
self._price_history = []
self._latest_autocorrelation = 0.0
self._current_price = 0.0
def OnStarted2(self, time):
super(autocorrelation_reversion_strategy, self).OnStarted2(time)
# Initialize the SMA indicator (using same period as autocorrelation for simplicity)
self._sma = SimpleMovingAverage()
self._sma.Length = self.AutoCorrPeriod
# Create a subscription to candlesticks
subscription = self.SubscribeCandles(self.CandleType)
# Subscribe to candle processing
subscription.Bind(self._sma, self.ProcessCandle).Start()
# Start position protection
self.StartProtection(
takeProfit=Unit(self.StopLossPercent, UnitTypes.Percent),
stopLoss=Unit(self.StopLossPercent * 1.5, UnitTypes.Percent)
)
# Setup chart if available
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, self._sma)
self.DrawOwnTrades(area)
def ProcessCandle(self, candle, sma_value):
# Skip unfinished candles
if candle.State != CandleStates.Finished:
return
# Update current price and price history
self._current_price = float(candle.ClosePrice)
# Update price history queue
self._price_history.append(self._current_price)
if len(self._price_history) > self.AutoCorrPeriod:
self._price_history.pop(0)
# Wait until we have enough data
if len(self._price_history) < self.AutoCorrPeriod:
return
# Check if strategy is ready to trade
# Calculate autocorrelation
self._latest_autocorrelation = self.CalculateAutocorrelation()
# Log the autocorrelation value
self.LogInfo(
"Autocorrelation: {0}, Current price: {1}, SMA: {2}".format(
self._latest_autocorrelation, self._current_price, sma_value))
# Trading logic: Look for negative autocorrelation below threshold
if self._latest_autocorrelation < self.AutoCorrThreshold:
# Price below average - buy signal
if self._current_price < sma_value and self.Position <= 0:
self.BuyMarket(self.Volume)
self.LogInfo(
"Buy signal: Autocorr={0}, Price={1}, SMA={2}".format(
self._latest_autocorrelation, self._current_price, sma_value))
# Price above average - sell signal
elif self._current_price > sma_value and self.Position >= 0:
self.SellMarket(self.Volume + Math.Abs(self.Position))
self.LogInfo(
"Sell signal: Autocorr={0}, Price={1}, SMA={2}".format(
self._latest_autocorrelation, self._current_price, sma_value))
def CalculateAutocorrelation(self):
# Convert queue to array for easier calculation
prices = list(self._price_history)
# Calculate price changes
price_changes = [prices[i + 1] - prices[i] for i in range(len(prices) - 1)]
# Calculate autocorrelation of lag 1
if not price_changes:
return 0.0
mean_change = sum(price_changes) / len(price_changes)
numerator = 0.0
denominator = 0.0
for i in range(len(price_changes) - 1):
deviation1 = price_changes[i] - mean_change
deviation2 = price_changes[i + 1] - mean_change
numerator += deviation1 * deviation2
denominator += deviation1 * deviation1
# Guard against division by zero
if denominator == 0:
return 0.0
return numerator / denominator
def CreateClone(self):
"""!! REQUIRED!! Creates a new instance of the strategy."""
return autocorrelation_reversion_strategy()