Прорыв расстояния Parabolic SAR
Стратегия Parabolic SAR Distance Breakout отслеживает быстрые расширения индикатора Parabolic. Когда его значения выходят за недавний диапазон, цена часто начинает новое движение.
Тестирование показывает среднегодичную доходность около 118%. Стратегию лучше запускать на фондовом рынке.
Позиция открывается, когда индикатор пробивает диапазон, сформированный по последним данным и умноженный на коэффициент отклонения. Возможны сделки в обе стороны со стопом.
Такая система подходит трейдерам-моментщикам, ищущим ранние прорывы. Сделки закрываются, когда Parabolic возвращается к среднему. Значение по умолчанию Acceleration = 0.02m.
Подробности
- Условие входа: Индикатор превышает среднее значение на величину коэффициента отклонения.
- Лонг/Шорт: Оба направления.
- Условие выхода: Индикатор возвращается к среднему.
- Стопы: Да.
- Значения по умолчанию:
Acceleration= 0.02mMaxAcceleration= 0.2mLookbackPeriod= 20DeviationMultiplier= 2mCandleType= TimeSpan.FromMinutes(5)
- Фильтры:
- Категория: Прорыв
- Направление: Оба
- Индикаторы: Parabolic
- Стопы: Да
- Сложность: Средняя
- Таймфрейм: Краткосрочный
- Сезонность: Нет
- Нейронные сети: Нет
- Дивергенция: Нет
- Уровень риска: Средний
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Strategy that enters positions when the distance between price and Parabolic SAR
/// exceeds the average distance plus a multiple of standard deviation
/// </summary>
public class ParabolicSarDistanceBreakoutStrategy : Strategy
{
private readonly StrategyParam<decimal> _acceleration;
private readonly StrategyParam<decimal> _maxAcceleration;
private readonly StrategyParam<int> _lookbackPeriod;
private readonly StrategyParam<decimal> _deviationMultiplier;
private readonly StrategyParam<DataType> _candleType;
private ParabolicSar _parabolicSar;
private decimal _avgDistanceLong;
private decimal _stdDevDistanceLong;
private decimal _avgDistanceShort;
private decimal _stdDevDistanceShort;
private decimal _lastLongDistance;
private decimal _lastShortDistance;
private int _samplesCount;
/// <summary>
/// Initial acceleration factor for Parabolic SAR
/// </summary>
public decimal Acceleration
{
get => _acceleration.Value;
set => _acceleration.Value = value;
}
/// <summary>
/// Maximum acceleration factor for Parabolic SAR
/// </summary>
public decimal MaxAcceleration
{
get => _maxAcceleration.Value;
set => _maxAcceleration.Value = value;
}
/// <summary>
/// Lookback period for distance statistics calculation
/// </summary>
public int LookbackPeriod
{
get => _lookbackPeriod.Value;
set => _lookbackPeriod.Value = value;
}
/// <summary>
/// Standard deviation multiplier for breakout detection
/// </summary>
public decimal DeviationMultiplier
{
get => _deviationMultiplier.Value;
set => _deviationMultiplier.Value = value;
}
/// <summary>
/// Candle type
/// </summary>
public DataType CandleType
{
get => _candleType.Value;
set => _candleType.Value = value;
}
/// <summary>
/// Constructor
/// </summary>
public ParabolicSarDistanceBreakoutStrategy()
{
_acceleration = Param(nameof(Acceleration), 0.02m)
.SetGreaterThanZero()
.SetDisplay("Acceleration", "Initial acceleration factor for Parabolic SAR", "Indicator Parameters")
.SetOptimize(0.01m, 0.05m, 0.01m);
_maxAcceleration = Param(nameof(MaxAcceleration), 0.2m)
.SetGreaterThanZero()
.SetDisplay("Max Acceleration", "Maximum acceleration factor for Parabolic SAR", "Indicator Parameters")
.SetOptimize(0.1m, 0.5m, 0.1m);
_lookbackPeriod = Param(nameof(LookbackPeriod), 20)
.SetGreaterThanZero()
.SetDisplay("Lookback Period", "Period for statistical calculations", "Strategy Parameters")
.SetOptimize(10, 50, 5);
_deviationMultiplier = Param(nameof(DeviationMultiplier), 2m)
.SetGreaterThanZero()
.SetDisplay("Deviation Multiplier", "Standard deviation multiplier for breakout detection", "Strategy Parameters")
.SetOptimize(1m, 3m, 0.5m);
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame())
.SetDisplay("Candle Type", "Type of candles to use", "General");
}
/// <inheritdoc />
public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
{
return [(Security, CandleType)];
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_avgDistanceLong = 0;
_stdDevDistanceLong = 0;
_avgDistanceShort = 0;
_stdDevDistanceShort = 0;
_lastLongDistance = 0;
_lastShortDistance = 0;
_samplesCount = 0;
}
/// <inheritdoc />
protected override void OnStarted2(DateTime time)
{
_parabolicSar = new ParabolicSar
{
Acceleration = Acceleration,
AccelerationMax = MaxAcceleration
};
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(_parabolicSar, ProcessCandle)
.Start();
// Setup chart visualization if available
var area = CreateChartArea();
if (area != null)
{
DrawCandles(area, subscription);
DrawIndicator(area, _parabolicSar);
DrawOwnTrades(area);
}
// Set up position protection using the dynamic Parabolic SAR
StartProtection(
takeProfit: null, // We'll handle exits via strategy logic
stopLoss: null, // The dynamic SAR will act as our stop
isStopTrailing: true
);
base.OnStarted2(time);
}
private void ProcessCandle(ICandleMessage candle, decimal sarValue)
{
// Skip unfinished candles
if (candle.State != CandleStates.Finished)
return;
// Check if strategy is ready for trading
if (!IsFormedAndOnlineAndAllowTrading())
return;
// Calculate distances
decimal longDistance = 0;
decimal shortDistance = 0;
// If SAR is below price, it's in uptrend
if (sarValue < candle.ClosePrice)
longDistance = candle.ClosePrice - sarValue;
// If SAR is above price, it's in downtrend
else if (sarValue > candle.ClosePrice)
shortDistance = sarValue - candle.ClosePrice;
// Update statistics
UpdateDistanceStatistics(longDistance, shortDistance);
// Trading logic
if (_samplesCount >= LookbackPeriod)
{
// Long signal: distance exceeds average + k*stddev and we don't have a long position
if (longDistance > 0 &&
longDistance > _avgDistanceLong + DeviationMultiplier * _stdDevDistanceLong &&
Position <= 0)
{
// Cancel existing orders
CancelActiveOrders();
// Enter long position
var volume = Volume + Math.Abs(Position);
BuyMarket(volume);
LogInfo($"Long signal: Distance {longDistance} > Avg {_avgDistanceLong} + {DeviationMultiplier}*StdDev {_stdDevDistanceLong}");
}
// Short signal: distance exceeds average + k*stddev and we don't have a short position
else if (shortDistance > 0 &&
shortDistance > _avgDistanceShort + DeviationMultiplier * _stdDevDistanceShort &&
Position >= 0)
{
// Cancel existing orders
CancelActiveOrders();
// Enter short position
var volume = Volume + Math.Abs(Position);
SellMarket(volume);
LogInfo($"Short signal: Distance {shortDistance} > Avg {_avgDistanceShort} + {DeviationMultiplier}*StdDev {_stdDevDistanceShort}");
}
// Exit conditions - when price crosses SAR
if (Position > 0 && candle.ClosePrice < sarValue)
{
// Exit long position
SellMarket(Math.Abs(Position));
LogInfo($"Exit long: Price {candle.ClosePrice} crossed below SAR {sarValue}");
}
else if (Position < 0 && candle.ClosePrice > sarValue)
{
// Exit short position
BuyMarket(Math.Abs(Position));
LogInfo($"Exit short: Price {candle.ClosePrice} crossed above SAR {sarValue}");
}
}
// Store current distances for next update
_lastLongDistance = longDistance;
_lastShortDistance = shortDistance;
}
private void UpdateDistanceStatistics(decimal longDistance, decimal shortDistance)
{
_samplesCount++;
// Simple calculation of running average and standard deviation
if (_samplesCount == 1)
{
// Initialize with first values
_avgDistanceLong = longDistance;
_avgDistanceShort = shortDistance;
_stdDevDistanceLong = 0;
_stdDevDistanceShort = 0;
}
else
{
// Update running average
decimal oldAvgLong = _avgDistanceLong;
decimal oldAvgShort = _avgDistanceShort;
_avgDistanceLong = oldAvgLong + (longDistance - oldAvgLong) / _samplesCount;
_avgDistanceShort = oldAvgShort + (shortDistance - oldAvgShort) / _samplesCount;
// Update running standard deviation using Welford's algorithm
if (_samplesCount > 1)
{
_stdDevDistanceLong = (1 - 1.0m / (_samplesCount - 1)) * _stdDevDistanceLong +
_samplesCount * ((_avgDistanceLong - oldAvgLong) * (_avgDistanceLong - oldAvgLong));
_stdDevDistanceShort = (1 - 1.0m / (_samplesCount - 1)) * _stdDevDistanceShort +
_samplesCount * ((_avgDistanceShort - oldAvgShort) * (_avgDistanceShort - oldAvgShort));
}
// We only need last LookbackPeriod samples
if (_samplesCount > LookbackPeriod)
{
_samplesCount = LookbackPeriod;
}
}
// Calculate square root for final standard deviation
_stdDevDistanceLong = (decimal)Math.Sqrt((double)_stdDevDistanceLong / _samplesCount);
_stdDevDistanceShort = (decimal)Math.Sqrt((double)_stdDevDistanceShort / _samplesCount);
}
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan, Math
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import ParabolicSar
from StockSharp.Algo.Strategies import Strategy
from datatype_extensions import *
class parabolic_sar_distance_breakout_strategy(Strategy):
"""Strategy that enters positions when the distance between price and Parabolic SAR
exceeds the average distance plus a multiple of standard deviation"""
def __init__(self):
super(parabolic_sar_distance_breakout_strategy, self).__init__()
# Initial acceleration factor for Parabolic SAR
self._acceleration = self.Param("Acceleration", 0.02) \
.SetGreaterThanZero() \
.SetDisplay("Acceleration", "Initial acceleration factor for Parabolic SAR", "Indicator Parameters") \
.SetCanOptimize(True) \
.SetOptimize(0.01, 0.05, 0.01)
# Maximum acceleration factor for Parabolic SAR
self._max_acceleration = self.Param("MaxAcceleration", 0.2) \
.SetGreaterThanZero() \
.SetDisplay("Max Acceleration", "Maximum acceleration factor for Parabolic SAR", "Indicator Parameters") \
.SetCanOptimize(True) \
.SetOptimize(0.1, 0.5, 0.1)
# Lookback period for distance statistics calculation
self._lookback_period = self.Param("LookbackPeriod", 20) \
.SetGreaterThanZero() \
.SetDisplay("Lookback Period", "Period for statistical calculations", "Strategy Parameters") \
.SetCanOptimize(True) \
.SetOptimize(10, 50, 5)
# Standard deviation multiplier for breakout detection
self._deviation_multiplier = self.Param("DeviationMultiplier", 2.0) \
.SetGreaterThanZero() \
.SetDisplay("Deviation Multiplier", "Standard deviation multiplier for breakout detection", "Strategy Parameters") \
.SetCanOptimize(True) \
.SetOptimize(1.0, 3.0, 0.5)
# Candle type
self._candle_type = self.Param("CandleType", tf(5)) \
.SetDisplay("Candle Type", "Type of candles to use", "General")
self._parabolic_sar = None
self._avg_distance_long = 0
self._std_dev_distance_long = 0
self._avg_distance_short = 0
self._std_dev_distance_short = 0
self._last_long_distance = 0
self._last_short_distance = 0
self._samples_count = 0
# Initial acceleration factor for Parabolic SAR
@property
def Acceleration(self):
return self._acceleration.Value
@Acceleration.setter
def Acceleration(self, value):
self._acceleration.Value = value
# Maximum acceleration factor for Parabolic SAR
@property
def MaxAcceleration(self):
return self._max_acceleration.Value
@MaxAcceleration.setter
def MaxAcceleration(self, value):
self._max_acceleration.Value = value
# Lookback period for distance statistics calculation
@property
def LookbackPeriod(self):
return self._lookback_period.Value
@LookbackPeriod.setter
def LookbackPeriod(self, value):
self._lookback_period.Value = value
# Standard deviation multiplier for breakout detection
@property
def DeviationMultiplier(self):
return self._deviation_multiplier.Value
@DeviationMultiplier.setter
def DeviationMultiplier(self, value):
self._deviation_multiplier.Value = value
# Candle type
@property
def CandleType(self):
return self._candle_type.Value
@CandleType.setter
def CandleType(self, value):
self._candle_type.Value = value
def GetWorkingSecurities(self):
return [(self.Security, self.CandleType)]
def OnReseted(self):
"""
Resets internal state when strategy is reset.
"""
super(parabolic_sar_distance_breakout_strategy, self).OnReseted()
self._avg_distance_long = 0
self._std_dev_distance_long = 0
self._avg_distance_short = 0
self._std_dev_distance_short = 0
self._last_long_distance = 0
self._last_short_distance = 0
self._samples_count = 0
def OnStarted2(self, time):
self._parabolic_sar = ParabolicSar()
self._parabolic_sar.Acceleration = self.Acceleration
self._parabolic_sar.AccelerationMax = self.MaxAcceleration
subscription = self.SubscribeCandles(self.CandleType)
subscription.Bind(self._parabolic_sar, self.ProcessCandle).Start()
# Setup chart visualization if available
area = self.CreateChartArea()
if area is not None:
self.DrawCandles(area, subscription)
self.DrawIndicator(area, self._parabolic_sar)
self.DrawOwnTrades(area)
# Set up position protection using the dynamic Parabolic SAR
self.StartProtection(
takeProfit=None,
stopLoss=None,
isStopTrailing=True
)
super(parabolic_sar_distance_breakout_strategy, self).OnStarted2(time)
def ProcessCandle(self, candle, sar_value):
# Skip unfinished candles
if candle.State != CandleStates.Finished:
return
# Check if strategy is ready for trading
# Calculate distances
long_distance = 0
short_distance = 0
# If SAR is below price, it's in uptrend
if sar_value < candle.ClosePrice:
long_distance = float(candle.ClosePrice - sar_value)
# If SAR is above price, it's in downtrend
elif sar_value > candle.ClosePrice:
short_distance = float(sar_value - candle.ClosePrice)
# Update statistics
self.UpdateDistanceStatistics(long_distance, short_distance)
# Trading logic
if self._samples_count >= self.LookbackPeriod:
# Long signal: distance exceeds average + k*stddev and we don't have a long position
if long_distance > 0 and \
long_distance > self._avg_distance_long + self.DeviationMultiplier * self._std_dev_distance_long and \
self.Position <= 0:
# Cancel existing orders
self.CancelActiveOrders()
# Enter long position
volume = self.Volume + Math.Abs(self.Position)
self.BuyMarket(volume)
self.LogInfo("Long signal: Distance {0} > Avg {1} + {2}*StdDev {3}".format(
long_distance, self._avg_distance_long, self.DeviationMultiplier, self._std_dev_distance_long))
# Short signal: distance exceeds average + k*stddev and we don't have a short position
elif short_distance > 0 and \
short_distance > self._avg_distance_short + self.DeviationMultiplier * self._std_dev_distance_short and \
self.Position >= 0:
# Cancel existing orders
self.CancelActiveOrders()
# Enter short position
volume = self.Volume + Math.Abs(self.Position)
self.SellMarket(volume)
self.LogInfo("Short signal: Distance {0} > Avg {1} + {2}*StdDev {3}".format(
short_distance, self._avg_distance_short, self.DeviationMultiplier, self._std_dev_distance_short))
# Exit conditions - when price crosses SAR
if self.Position > 0 and candle.ClosePrice < sar_value:
# Exit long position
self.SellMarket(Math.Abs(self.Position))
self.LogInfo("Exit long: Price {0} crossed below SAR {1}".format(candle.ClosePrice, sar_value))
elif self.Position < 0 and candle.ClosePrice > sar_value:
# Exit short position
self.BuyMarket(Math.Abs(self.Position))
self.LogInfo("Exit short: Price {0} crossed above SAR {1}".format(candle.ClosePrice, sar_value))
# Store current distances for next update
self._last_long_distance = long_distance
self._last_short_distance = short_distance
def UpdateDistanceStatistics(self, long_distance, short_distance):
self._samples_count += 1
# Simple calculation of running average and standard deviation
if self._samples_count == 1:
# Initialize with first values
self._avg_distance_long = long_distance
self._avg_distance_short = short_distance
self._std_dev_distance_long = 0
self._std_dev_distance_short = 0
else:
# Update running average
old_avg_long = self._avg_distance_long
old_avg_short = self._avg_distance_short
self._avg_distance_long = old_avg_long + (long_distance - old_avg_long) / self._samples_count
self._avg_distance_short = old_avg_short + (short_distance - old_avg_short) / self._samples_count
# Update running standard deviation using Welford's algorithm
if self._samples_count > 1:
self._std_dev_distance_long = (1 - 1.0 / (self._samples_count - 1)) * self._std_dev_distance_long + \
self._samples_count * ((self._avg_distance_long - old_avg_long) * (self._avg_distance_long - old_avg_long))
self._std_dev_distance_short = (1 - 1.0 / (self._samples_count - 1)) * self._std_dev_distance_short + \
self._samples_count * ((self._avg_distance_short - old_avg_short) * (self._avg_distance_short - old_avg_short))
# We only need last LookbackPeriod samples
if self._samples_count > self.LookbackPeriod:
self._samples_count = self.LookbackPeriod
# Calculate square root for final standard deviation
self._std_dev_distance_long = Math.Sqrt(float(self._std_dev_distance_long) / self._samples_count)
self._std_dev_distance_short = Math.Sqrt(float(self._std_dev_distance_short) / self._samples_count)
def CreateClone(self):
"""!! REQUIRED!! Creates a new instance of the strategy."""
return parabolic_sar_distance_breakout_strategy()