MK Custome Adaptive SuperTrend Strategy
Adaptive SuperTrend that clusters ATR volatility into three levels. Long trades occur when trend flips up, shorts when it turns down. Stops use the SuperTrend line with optional percent take profit and stop loss.
Long: Direction changes to uptrend.
Short: Direction changes to downtrend.
Exit: Opposite signal, SuperTrend break, or percent stop.
Indicators: SuperTrend, ATR.
Stops: SuperTrend, percent stop-loss.
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
using StockSharp.Algo;
using StockSharp.Algo.Candles;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Adaptive SuperTrend strategy using volatility clustering.
/// </summary>
public class MKCustomeAdaptiveSuperTrendStrategy : Strategy
{
private readonly StrategyParam<int> _atrLength;
private readonly StrategyParam<decimal> _factor;
private readonly StrategyParam<int> _trainingPeriod;
private readonly StrategyParam<DataType> _candleType;
private AverageTrueRange _atr;
private readonly List<decimal> _atrHistory = new();
private decimal _prevLowerBand;
private decimal _prevUpperBand;
private decimal _prevSuperTrend;
private int _prevDirection;
public int AtrLength { get => _atrLength.Value; set => _atrLength.Value = value; }
public decimal Factor { get => _factor.Value; set => _factor.Value = value; }
public int TrainingPeriod { get => _trainingPeriod.Value; set => _trainingPeriod.Value = value; }
public DataType CandleType { get => _candleType.Value; set => _candleType.Value = value; }
public MKCustomeAdaptiveSuperTrendStrategy()
{
_atrLength = Param(nameof(AtrLength), 10);
_factor = Param(nameof(Factor), 3m);
_trainingPeriod = Param(nameof(TrainingPeriod), 20);
_candleType = Param(nameof(CandleType), TimeSpan.FromMinutes(5).TimeFrame());
}
/// <inheritdoc />
protected override void OnReseted()
{
base.OnReseted();
_atr = null;
_atrHistory.Clear();
_prevLowerBand = 0m;
_prevUpperBand = 0m;
_prevSuperTrend = 0m;
_prevDirection = 0;
}
protected override void OnStarted2(DateTime time)
{
base.OnStarted2(time);
StartProtection(null, null);
_prevLowerBand = 0;
_prevUpperBand = 0;
_prevSuperTrend = 0;
_prevDirection = 0;
_atrHistory.Clear();
_atr = new AverageTrueRange { Length = AtrLength };
var subscription = SubscribeCandles(CandleType);
subscription
.Bind(_atr, ProcessCandle)
.Start();
}
private void ProcessCandle(ICandleMessage candle, decimal atr)
{
if (candle.State != CandleStates.Finished)
return;
if (!_atr.IsFormed)
return;
_atrHistory.Add(atr);
if (_atrHistory.Count > TrainingPeriod)
_atrHistory.RemoveAt(0);
if (_atrHistory.Count < TrainingPeriod)
return;
if (ProcessState != ProcessStates.Started)
return;
var atrHigh = _atrHistory.Max();
var atrLow = _atrHistory.Min();
var range = atrHigh - atrLow;
if (range <= 0) range = atr * 0.01m;
var highVol = atrLow + range * 0.75m;
var midVol = atrLow + range * 0.5m;
var lowVol = atrLow + range * 0.25m;
var distHigh = Math.Abs(atr - highVol);
var distMid = Math.Abs(atr - midVol);
var distLow = Math.Abs(atr - lowVol);
var assigned = distHigh < distMid
? (distHigh < distLow ? highVol : lowVol)
: (distMid < distLow ? midVol : lowVol);
var (st, dir) = CalcSuperTrend(candle, assigned);
if (_prevDirection <= 0 && dir > 0 && Position <= 0)
{
if (Position < 0)
BuyMarket(Math.Abs(Position));
BuyMarket();
}
else if (_prevDirection >= 0 && dir < 0 && Position >= 0)
{
if (Position > 0)
SellMarket(Math.Abs(Position));
SellMarket();
}
_prevDirection = dir;
}
private (decimal st, int dir) CalcSuperTrend(ICandleMessage candle, decimal atrVal)
{
var src = (candle.HighPrice + candle.LowPrice) / 2m;
var upperBand = src + Factor * atrVal;
var lowerBand = src - Factor * atrVal;
if (_prevLowerBand != default && lowerBand < _prevLowerBand && candle.ClosePrice > _prevLowerBand)
lowerBand = _prevLowerBand;
if (_prevUpperBand != default && upperBand > _prevUpperBand && candle.ClosePrice < _prevUpperBand)
upperBand = _prevUpperBand;
int dir;
if (_prevSuperTrend == 0)
dir = candle.ClosePrice > src ? 1 : -1;
else if (_prevSuperTrend == _prevUpperBand)
dir = candle.ClosePrice > upperBand ? 1 : -1;
else
dir = candle.ClosePrice < lowerBand ? -1 : 1;
var st = dir == 1 ? lowerBand : upperBand;
_prevLowerBand = lowerBand;
_prevUpperBand = upperBand;
_prevSuperTrend = st;
return (st, dir);
}
}
import clr
import math
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import AverageTrueRange
from StockSharp.Algo.Strategies import Strategy
class mk_custome_adaptive_super_trend_strategy(Strategy):
def __init__(self):
super(mk_custome_adaptive_super_trend_strategy, self).__init__()
self._atr_length = self.Param("AtrLength", 10) \
.SetGreaterThanZero()
self._factor = self.Param("Factor", 3.0) \
.SetGreaterThanZero()
self._training_period = self.Param("TrainingPeriod", 20) \
.SetGreaterThanZero()
self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromMinutes(5)))
self._atr_history = []
self._prev_lower_band = 0.0
self._prev_upper_band = 0.0
self._prev_super_trend = 0.0
self._prev_direction = 0
@property
def candle_type(self):
return self._candle_type.Value
@candle_type.setter
def candle_type(self, value):
self._candle_type.Value = value
def OnReseted(self):
super(mk_custome_adaptive_super_trend_strategy, self).OnReseted()
self._atr_history = []
self._prev_lower_band = 0.0
self._prev_upper_band = 0.0
self._prev_super_trend = 0.0
self._prev_direction = 0
def OnStarted2(self, time):
super(mk_custome_adaptive_super_trend_strategy, self).OnStarted2(time)
self._atr_history = []
self._prev_lower_band = 0.0
self._prev_upper_band = 0.0
self._prev_super_trend = 0.0
self._prev_direction = 0
self._atr = AverageTrueRange()
self._atr.Length = self._atr_length.Value
subscription = self.SubscribeCandles(self.candle_type)
subscription.Bind(self._atr, self.OnProcess).Start()
def OnProcess(self, candle, atr_value):
if candle.State != CandleStates.Finished:
return
if not self._atr.IsFormed:
return
av = float(atr_value)
tp = self._training_period.Value
self._atr_history.append(av)
if len(self._atr_history) > tp:
self._atr_history.pop(0)
if len(self._atr_history) < tp:
return
atr_high = max(self._atr_history)
atr_low = min(self._atr_history)
rng = atr_high - atr_low
if rng <= 0.0:
rng = av * 0.01
high_vol = atr_low + rng * 0.75
mid_vol = atr_low + rng * 0.5
low_vol = atr_low + rng * 0.25
dist_high = abs(av - high_vol)
dist_mid = abs(av - mid_vol)
dist_low = abs(av - low_vol)
if dist_high < dist_mid:
assigned = high_vol if dist_high < dist_low else low_vol
else:
assigned = mid_vol if dist_mid < dist_low else low_vol
high = float(candle.HighPrice)
low = float(candle.LowPrice)
close = float(candle.ClosePrice)
factor = float(self._factor.Value)
src = (high + low) / 2.0
upper_band = src + factor * assigned
lower_band = src - factor * assigned
if self._prev_lower_band != 0.0 and lower_band < self._prev_lower_band and close > self._prev_lower_band:
lower_band = self._prev_lower_band
if self._prev_upper_band != 0.0 and upper_band > self._prev_upper_band and close < self._prev_upper_band:
upper_band = self._prev_upper_band
if self._prev_super_trend == 0.0:
direction = 1 if close > src else -1
elif self._prev_super_trend == self._prev_upper_band:
direction = 1 if close > upper_band else -1
else:
direction = -1 if close < lower_band else 1
st = lower_band if direction == 1 else upper_band
if self._prev_direction <= 0 and direction > 0 and self.Position <= 0:
self.BuyMarket()
elif self._prev_direction >= 0 and direction < 0 and self.Position >= 0:
self.SellMarket()
self._prev_lower_band = lower_band
self._prev_upper_band = upper_band
self._prev_super_trend = st
self._prev_direction = direction
def CreateClone(self):
return mk_custome_adaptive_super_trend_strategy()