Parabolic SAR 距離ブレイクアウト戦略
Parabolic SAR 距離ブレイクアウト戦略は、Parabolic の急速な拡張を観察します。読み値が最近の範囲を超えてジャンプすると、価格はしばしば新しい動きを開始します。
テストでは年平均リターン約118%を示しています。株式市場で最もよく機能します。
インジケーターが最近のデータと偏差乗数から導かれたバンドを突き抜けると、ポジションが開きます。ストップを付けたロングとショートの両取引が可能です。
このシステムは早期ブレイクアウトを求めるモメンタムトレーダーに適しています。終値が Parabolic SAR を反対側へクロスするとトレードはクローズされます。デフォルト値は Acceleration = 0.02m から始まります。
詳細
- エントリー条件: 終値と Parabolic SAR の距離が「平均 + 偏差乗数 × 標準偏差」を上回る。
- ロング/ショート: 両方向。
- エグジット条件: 終値が Parabolic SAR を反対側へクロスする。
- ストップ: はい。
- デフォルト値:
  - `Acceleration` = 0.02m
  - `MaxAcceleration` = 0.2m
  - `LookbackPeriod` = 20
  - `DeviationMultiplier` = 2m
  - `CandleType` = TimeSpan.FromMinutes(5)
- フィルター:
- カテゴリ: ブレイクアウト
- 方向: 両方
- インジケーター: Parabolic
- ストップ: はい
- 複雑さ: 中級
- 時間軸: 短期
- 季節性: いいえ
- ニューラルネットワーク: いいえ
- ダイバージェンス: いいえ
- リスクレベル: 中
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that enters positions when the distance between price and Parabolic SAR
/// exceeds the average distance plus a multiple of standard deviation.
/// </summary>
/// <remarks>
/// On every finished candle the strategy measures how far the close is from the SAR value,
/// feeds that distance into running mean / variance estimates (one pair for the "SAR below
/// close" side, one for the "SAR above close" side) and opens a position when the current
/// distance is larger than <c>mean + DeviationMultiplier * stdDev</c>. Positions are closed
/// when the close crosses to the other side of the SAR value.
/// </remarks>
public class ParabolicSarDistanceBreakoutStrategy : Strategy
{
    private readonly StrategyParam<decimal> _acceleration;
    private readonly StrategyParam<decimal> _maxAcceleration;
    private readonly StrategyParam<int> _lookbackPeriod;
    private readonly StrategyParam<decimal> _deviationMultiplier;
    private readonly StrategyParam<DataType> _candleType;

    private ParabolicSar _parabolicSar;

    // Running statistics of the close-to-SAR distance.
    // The variance is kept in its own field: the recurrence in UpdateRunningStats
    // needs the previous *variance*, so it must never be overwritten by the
    // square-rooted value that the trading logic consumes.
    private decimal _avgDistanceLong;
    private decimal _varianceDistanceLong;
    private decimal _stdDevDistanceLong;
    private decimal _avgDistanceShort;
    private decimal _varianceDistanceShort;
    private decimal _stdDevDistanceShort;

    // Number of samples the running statistics are weighted by (capped at LookbackPeriod).
    private int _samplesCount;

    /// <summary>
    /// Initial acceleration factor for Parabolic SAR
    /// </summary>
    public decimal Acceleration
    {
        get => _acceleration.Value;
        set => _acceleration.Value = value;
    }

    /// <summary>
    /// Maximum acceleration factor for Parabolic SAR
    /// </summary>
    public decimal MaxAcceleration
    {
        get => _maxAcceleration.Value;
        set => _maxAcceleration.Value = value;
    }

    /// <summary>
    /// Lookback period for distance statistics calculation
    /// </summary>
    public int LookbackPeriod
    {
        get => _lookbackPeriod.Value;
        set => _lookbackPeriod.Value = value;
    }

    /// <summary>
    /// Standard deviation multiplier for breakout detection
    /// </summary>
    public decimal DeviationMultiplier
    {
        get => _deviationMultiplier.Value;
        set => _deviationMultiplier.Value = value;
    }

    /// <summary>
    /// Candle type
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Constructor
    /// </summary>
    public ParabolicSarDistanceBreakoutStrategy()
    {
        _acceleration = Param(nameof(Acceleration), 0.02m)
            .SetGreaterThanZero()
            .SetDisplay("Acceleration", "Initial acceleration factor for Parabolic SAR", "Indicator Parameters")
            .SetOptimize(0.01m, 0.05m, 0.01m);

        _maxAcceleration = Param(nameof(MaxAcceleration), 0.2m)
            .SetGreaterThanZero()
            .SetDisplay("Max Acceleration", "Maximum acceleration factor for Parabolic SAR", "Indicator Parameters")
            .SetOptimize(0.1m, 0.5m, 0.1m);

        _lookbackPeriod = Param(nameof(LookbackPeriod), 20)
            .SetGreaterThanZero()
            .SetDisplay("Lookback Period", "Period for statistical calculations", "Strategy Parameters")
            .SetOptimize(10, 50, 5);

        _deviationMultiplier = Param(nameof(DeviationMultiplier), 2m)
            .SetGreaterThanZero()
            .SetDisplay("Deviation Multiplier", "Standard deviation multiplier for breakout detection", "Strategy Parameters")
            .SetOptimize(1m, 3m, 0.5m);

        _candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
            .SetDisplay("Candle Type", "Type of candles to use", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return [(Security, CandleType)];
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();

        // Drop all accumulated statistics so a restarted strategy warms up from scratch.
        _avgDistanceLong = 0;
        _varianceDistanceLong = 0;
        _stdDevDistanceLong = 0;
        _avgDistanceShort = 0;
        _varianceDistanceShort = 0;
        _stdDevDistanceShort = 0;
        _samplesCount = 0;
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        _parabolicSar = new ParabolicSar
        {
            Acceleration = Acceleration,
            AccelerationMax = MaxAcceleration
        };

        var subscription = SubscribeCandles(CandleType);

        subscription
            .Bind(_parabolicSar, ProcessCandle)
            .Start();

        // Setup chart visualization if available
        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, _parabolicSar);
            DrawOwnTrades(area);
        }

        // No fixed take-profit / stop-loss levels are passed here:
        // exits are issued by ProcessCandle when the close crosses the SAR.
        StartProtection(
            takeProfit: null,
            stopLoss: null,
            isStopTrailing: true
        );

        base.OnStarted2(time);
    }

    /// <summary>
    /// Handles one candle together with the Parabolic SAR value bound to it:
    /// updates the distance statistics and issues entry / exit market orders.
    /// </summary>
    /// <param name="candle">Candle received from the subscription.</param>
    /// <param name="sarValue">Parabolic SAR value for <paramref name="candle"/>.</param>
    private void ProcessCandle(ICandleMessage candle, decimal sarValue)
    {
        // Skip unfinished candles
        if (candle.State != CandleStates.Finished)
            return;

        // Check if strategy is ready for trading
        if (!IsFormedAndOnlineAndAllowTrading())
            return;

        // Exactly one of the two distances is non-zero (both are zero when close == SAR).
        decimal longDistance = 0;
        decimal shortDistance = 0;

        // If SAR is below price, it's in uptrend
        if (sarValue < candle.ClosePrice)
            longDistance = candle.ClosePrice - sarValue;
        // If SAR is above price, it's in downtrend
        else if (sarValue > candle.ClosePrice)
            shortDistance = sarValue - candle.ClosePrice;

        UpdateDistanceStatistics(longDistance, shortDistance);

        // Do not trade until the statistics have seen LookbackPeriod samples.
        if (_samplesCount < LookbackPeriod)
            return;

        // Long signal: distance exceeds average + k*stddev and we don't have a long position
        if (longDistance > 0 &&
            longDistance > _avgDistanceLong + DeviationMultiplier * _stdDevDistanceLong &&
            Position <= 0)
        {
            CancelActiveOrders();

            // Volume covers the new position plus any opposite position being reversed.
            var volume = Volume + Math.Abs(Position);
            BuyMarket(volume);

            LogInfo($"Long signal: Distance {longDistance} > Avg {_avgDistanceLong} + {DeviationMultiplier}*StdDev {_stdDevDistanceLong}");
        }
        // Short signal: distance exceeds average + k*stddev and we don't have a short position
        else if (shortDistance > 0 &&
            shortDistance > _avgDistanceShort + DeviationMultiplier * _stdDevDistanceShort &&
            Position >= 0)
        {
            CancelActiveOrders();

            // Volume covers the new position plus any opposite position being reversed.
            var volume = Volume + Math.Abs(Position);
            SellMarket(volume);

            LogInfo($"Short signal: Distance {shortDistance} > Avg {_avgDistanceShort} + {DeviationMultiplier}*StdDev {_stdDevDistanceShort}");
        }

        // Exit conditions - when price crosses SAR
        if (Position > 0 && candle.ClosePrice < sarValue)
        {
            SellMarket(Math.Abs(Position));
            LogInfo($"Exit long: Price {candle.ClosePrice} crossed below SAR {sarValue}");
        }
        else if (Position < 0 && candle.ClosePrice > sarValue)
        {
            BuyMarket(Math.Abs(Position));
            LogInfo($"Exit short: Price {candle.ClosePrice} crossed above SAR {sarValue}");
        }
    }

    /// <summary>
    /// Feeds the latest distances into the running mean / variance estimates and
    /// refreshes the standard deviations used by the entry conditions.
    /// </summary>
    /// <param name="longDistance">Close minus SAR when SAR is below the close, otherwise 0.</param>
    /// <param name="shortDistance">SAR minus close when SAR is above the close, otherwise 0.</param>
    private void UpdateDistanceStatistics(decimal longDistance, decimal shortDistance)
    {
        _samplesCount++;

        if (_samplesCount == 1)
        {
            // First sample: the mean is the sample itself and there is no spread yet.
            _avgDistanceLong = longDistance;
            _avgDistanceShort = shortDistance;
            _varianceDistanceLong = 0;
            _varianceDistanceShort = 0;
        }
        else
        {
            UpdateRunningStats(longDistance, _samplesCount, ref _avgDistanceLong, ref _varianceDistanceLong);
            UpdateRunningStats(shortDistance, _samplesCount, ref _avgDistanceShort, ref _varianceDistanceShort);

            // Cap the sample count so that every new sample keeps a weight of
            // roughly 1 / LookbackPeriod; older samples are progressively
            // down-weighted instead of being tracked in a window.
            if (_samplesCount > LookbackPeriod)
                _samplesCount = LookbackPeriod;
        }

        // Standard deviation is derived from the variance but stored separately,
        // leaving the variance accumulator intact for the next update.
        _stdDevDistanceLong = (decimal)Math.Sqrt((double)_varianceDistanceLong);
        _stdDevDistanceShort = (decimal)Math.Sqrt((double)_varianceDistanceShort);
    }

    /// <summary>
    /// Applies one step of the running mean / sample-variance recurrence.
    /// </summary>
    /// <param name="sample">New observation.</param>
    /// <param name="count">Sample count including <paramref name="sample"/>; must be at least 2.</param>
    /// <param name="mean">Running mean, updated in place.</param>
    /// <param name="variance">Running sample variance, updated in place.</param>
    private static void UpdateRunningStats(decimal sample, int count, ref decimal mean, ref decimal variance)
    {
        var previousMean = mean;
        mean = previousMean + (sample - previousMean) / count;

        // s_n^2 = (n-2)/(n-1) * s_{n-1}^2 + n * (mean_n - mean_{n-1})^2
        var meanShift = mean - previousMean;
        variance = (1 - 1m / (count - 1)) * variance + count * meanShift * meanShift;
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import ParabolicSar
from StockSharp.Algo.Strategies import Strategy
from datatype_extensions import *
class parabolic_sar_distance_breakout_strategy(Strategy):
    """Strategy that enters positions when the distance between price and Parabolic SAR
    exceeds the average distance plus a multiple of standard deviation.

    On every finished candle the close-to-SAR distance is fed into running
    mean / variance estimates (one pair per side of the SAR). A position is
    opened when the current distance is larger than
    ``mean + DeviationMultiplier * std_dev`` and closed when the close crosses
    to the other side of the SAR value.
    """

    def __init__(self):
        super(parabolic_sar_distance_breakout_strategy, self).__init__()

        # Initial acceleration factor for Parabolic SAR
        self._acceleration = self.Param("Acceleration", 0.02) \
            .SetGreaterThanZero() \
            .SetDisplay("Acceleration", "Initial acceleration factor for Parabolic SAR", "Indicator Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(0.01, 0.05, 0.01)

        # Maximum acceleration factor for Parabolic SAR
        self._max_acceleration = self.Param("MaxAcceleration", 0.2) \
            .SetGreaterThanZero() \
            .SetDisplay("Max Acceleration", "Maximum acceleration factor for Parabolic SAR", "Indicator Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(0.1, 0.5, 0.1)

        # Lookback period for distance statistics calculation
        self._lookback_period = self.Param("LookbackPeriod", 20) \
            .SetGreaterThanZero() \
            .SetDisplay("Lookback Period", "Period for statistical calculations", "Strategy Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(10, 50, 5)

        # Standard deviation multiplier for breakout detection
        self._deviation_multiplier = self.Param("DeviationMultiplier", 2.0) \
            .SetGreaterThanZero() \
            .SetDisplay("Deviation Multiplier", "Standard deviation multiplier for breakout detection", "Strategy Parameters") \
            .SetCanOptimize(True) \
            .SetOptimize(1.0, 3.0, 0.5)

        # Candle type
        self._candle_type = self.Param("CandleType", tf(5)) \
            .SetDisplay("Candle Type", "Type of candles to use", "General")

        self._parabolic_sar = None

        # Running statistics of the close-to-SAR distance. The variance lives in
        # its own attribute because the update recurrence needs the previous
        # variance, not the square-rooted value used by the trading logic.
        self._avg_distance_long = 0
        self._variance_distance_long = 0
        self._std_dev_distance_long = 0
        self._avg_distance_short = 0
        self._variance_distance_short = 0
        self._std_dev_distance_short = 0

        # Number of samples the statistics are weighted by (capped at LookbackPeriod).
        self._samples_count = 0

    # Initial acceleration factor for Parabolic SAR
    @property
    def Acceleration(self):
        return self._acceleration.Value

    @Acceleration.setter
    def Acceleration(self, value):
        self._acceleration.Value = value

    # Maximum acceleration factor for Parabolic SAR
    @property
    def MaxAcceleration(self):
        return self._max_acceleration.Value

    @MaxAcceleration.setter
    def MaxAcceleration(self, value):
        self._max_acceleration.Value = value

    # Lookback period for distance statistics calculation
    @property
    def LookbackPeriod(self):
        return self._lookback_period.Value

    @LookbackPeriod.setter
    def LookbackPeriod(self, value):
        self._lookback_period.Value = value

    # Standard deviation multiplier for breakout detection
    @property
    def DeviationMultiplier(self):
        return self._deviation_multiplier.Value

    @DeviationMultiplier.setter
    def DeviationMultiplier(self, value):
        self._deviation_multiplier.Value = value

    # Candle type
    @property
    def CandleType(self):
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value

    def GetWorkingSecurities(self):
        return [(self.Security, self.CandleType)]

    def OnReseted(self):
        """
        Resets internal state when strategy is reset.
        """
        super(parabolic_sar_distance_breakout_strategy, self).OnReseted()
        self._avg_distance_long = 0
        self._variance_distance_long = 0
        self._std_dev_distance_long = 0
        self._avg_distance_short = 0
        self._variance_distance_short = 0
        self._std_dev_distance_short = 0
        self._samples_count = 0

    def OnStarted2(self, time):
        self._parabolic_sar = ParabolicSar()
        self._parabolic_sar.Acceleration = self.Acceleration
        self._parabolic_sar.AccelerationMax = self.MaxAcceleration

        subscription = self.SubscribeCandles(self.CandleType)
        subscription.Bind(self._parabolic_sar, self.ProcessCandle).Start()

        # Setup chart visualization if available
        area = self.CreateChartArea()
        if area is not None:
            self.DrawCandles(area, subscription)
            self.DrawIndicator(area, self._parabolic_sar)
            self.DrawOwnTrades(area)

        # No fixed take-profit / stop-loss levels are passed here:
        # exits are issued by ProcessCandle when the close crosses the SAR.
        self.StartProtection(
            takeProfit=None,
            stopLoss=None,
            isStopTrailing=True
        )

        super(parabolic_sar_distance_breakout_strategy, self).OnStarted2(time)

    def ProcessCandle(self, candle, sar_value):
        """Handle one candle and its bound Parabolic SAR value.

        Updates the distance statistics and issues entry / exit market orders.

        :param candle: Candle received from the subscription.
        :param sar_value: Parabolic SAR value for that candle.
        """
        # Skip unfinished candles
        if candle.State != CandleStates.Finished:
            return

        # Check if strategy is ready for trading
        if not self.IsFormedAndOnlineAndAllowTrading():
            return

        # Exactly one of the two distances is non-zero (both are zero when close == SAR).
        long_distance = 0
        short_distance = 0

        # If SAR is below price, it's in uptrend
        if sar_value < candle.ClosePrice:
            long_distance = float(candle.ClosePrice - sar_value)
        # If SAR is above price, it's in downtrend
        elif sar_value > candle.ClosePrice:
            short_distance = float(sar_value - candle.ClosePrice)

        self.UpdateDistanceStatistics(long_distance, short_distance)

        # Do not trade until the statistics have seen LookbackPeriod samples.
        if self._samples_count < self.LookbackPeriod:
            return

        # Long signal: distance exceeds average + k*stddev and we don't have a long position
        if long_distance > 0 and \
                long_distance > self._avg_distance_long + self.DeviationMultiplier * self._std_dev_distance_long and \
                self.Position <= 0:
            self.CancelActiveOrders()
            # Volume covers the new position plus any opposite position being reversed.
            volume = self.Volume + Math.Abs(self.Position)
            self.BuyMarket(volume)
            self.LogInfo("Long signal: Distance {0} > Avg {1} + {2}*StdDev {3}".format(
                long_distance, self._avg_distance_long, self.DeviationMultiplier, self._std_dev_distance_long))
        # Short signal: distance exceeds average + k*stddev and we don't have a short position
        elif short_distance > 0 and \
                short_distance > self._avg_distance_short + self.DeviationMultiplier * self._std_dev_distance_short and \
                self.Position >= 0:
            self.CancelActiveOrders()
            # Volume covers the new position plus any opposite position being reversed.
            volume = self.Volume + Math.Abs(self.Position)
            self.SellMarket(volume)
            self.LogInfo("Short signal: Distance {0} > Avg {1} + {2}*StdDev {3}".format(
                short_distance, self._avg_distance_short, self.DeviationMultiplier, self._std_dev_distance_short))

        # Exit conditions - when price crosses SAR
        if self.Position > 0 and candle.ClosePrice < sar_value:
            self.SellMarket(Math.Abs(self.Position))
            self.LogInfo("Exit long: Price {0} crossed below SAR {1}".format(candle.ClosePrice, sar_value))
        elif self.Position < 0 and candle.ClosePrice > sar_value:
            self.BuyMarket(Math.Abs(self.Position))
            self.LogInfo("Exit short: Price {0} crossed above SAR {1}".format(candle.ClosePrice, sar_value))

    def UpdateDistanceStatistics(self, long_distance, short_distance):
        """Feed the latest distances into the running mean / variance estimates.

        :param long_distance: Close minus SAR when SAR is below the close, otherwise 0.
        :param short_distance: SAR minus close when SAR is above the close, otherwise 0.
        """
        self._samples_count += 1

        if self._samples_count == 1:
            # First sample: the mean is the sample itself and there is no spread yet.
            self._avg_distance_long = long_distance
            self._avg_distance_short = short_distance
            self._variance_distance_long = 0
            self._variance_distance_short = 0
        else:
            self._avg_distance_long, self._variance_distance_long = self._update_running_stats(
                long_distance, self._samples_count, self._avg_distance_long, self._variance_distance_long)
            self._avg_distance_short, self._variance_distance_short = self._update_running_stats(
                short_distance, self._samples_count, self._avg_distance_short, self._variance_distance_short)

            # Cap the sample count so that every new sample keeps a weight of
            # roughly 1 / LookbackPeriod; older samples are progressively
            # down-weighted instead of being tracked in a window.
            if self._samples_count > self.LookbackPeriod:
                self._samples_count = self.LookbackPeriod

        # Standard deviation is derived from the variance but stored separately,
        # leaving the variance accumulator intact for the next update.
        self._std_dev_distance_long = Math.Sqrt(float(self._variance_distance_long))
        self._std_dev_distance_short = Math.Sqrt(float(self._variance_distance_short))

    @staticmethod
    def _update_running_stats(sample, count, mean, variance):
        """Apply one step of the running mean / sample-variance recurrence.

        :param sample: New observation.
        :param count: Sample count including ``sample``; must be at least 2.
        :param mean: Running mean before this sample.
        :param variance: Running sample variance before this sample.
        :return: Tuple ``(new_mean, new_variance)``.
        """
        new_mean = mean + (sample - mean) / count
        # s_n^2 = (n-2)/(n-1) * s_{n-1}^2 + n * (mean_n - mean_{n-1})^2
        mean_shift = new_mean - mean
        new_variance = (1 - 1.0 / (count - 1)) * variance + count * mean_shift * mean_shift
        return new_mean, new_variance

    def CreateClone(self):
        """!! REQUIRED!! Creates a new instance of the strategy."""
        return parabolic_sar_distance_breakout_strategy()