Color Momentum AMA Strategy
This strategy converts the MetaTrader Expert Advisor Exp_ColorMomentum_AMA to StockSharp. It calculates price momentum over a configurable period and smooths it with the Kaufman Adaptive Moving Average (AMA). Trading signals are generated when the smoothed momentum shows two consecutive rises or falls.
Logic
- Long entry: Momentum AMA rises for two bars in a row. Any existing short position is closed before opening a new long.
- Short entry: Momentum AMA falls for two bars in a row. Any existing long position is closed before opening a new short.
- Opposite signals close current positions.
Parameters
- Candle type
- Momentum period
- AMA period
- Fast period
- Slow period
- Signal bar
- Signal cooldown bars
using System;
using System.Linq;
using System.Collections.Generic;
using Ecng.Common;
using Ecng.Collections;
using Ecng.Serialization;
using StockSharp.Algo.Indicators;
using StockSharp.Algo.Strategies;
using StockSharp.BusinessEntities;
using StockSharp.Messages;
namespace StockSharp.Samples.Strategies;
/// <summary>
/// Color Momentum AMA strategy.
/// Generates signals based on momentum smoothed by the Kaufman Adaptive Moving Average.
/// A long position is opened after two consecutive rises of the smoothed momentum,
/// a short position is opened after two consecutive falls.
/// An opposite signal reverses the current position: the order volume is
/// <c>Volume</c> plus the absolute size of the open position.
/// After every order, <see cref="SignalCooldownBars"/> finished candles must pass
/// before the next order may be sent.
/// </summary>
public class ColorMomentumAmaStrategy : Strategy
{
    // Three consecutive smoothed values, starting at offset SignalBar, are compared,
    // so the history must hold SignalBar + 3 entries.
    private const int HistoryPadding = 3;

    private readonly StrategyParam<int> _momentumPeriod;
    private readonly StrategyParam<int> _amaPeriod;
    private readonly StrategyParam<int> _fastPeriod;
    private readonly StrategyParam<int> _slowPeriod;
    private readonly StrategyParam<int> _signalBar;
    private readonly StrategyParam<int> _signalCooldownBars;
    private readonly StrategyParam<DataType> _candleType;

    private Momentum _momentum = null!;
    private KaufmanAdaptiveMovingAverage _ama = null!;

    // Most recent smoothed values, newest at index 0; null marks a slot not filled yet.
    private decimal?[] _buffer = null!;

    // Finished candles that still have to pass before a new order is allowed.
    private int _cooldownRemaining;

    /// <summary>
    /// Momentum lookback period.
    /// </summary>
    public int MomentumPeriod
    {
        get => _momentumPeriod.Value;
        set => _momentumPeriod.Value = value;
    }

    /// <summary>
    /// AMA smoothing length.
    /// </summary>
    public int AmaPeriod
    {
        get => _amaPeriod.Value;
        set => _amaPeriod.Value = value;
    }

    /// <summary>
    /// Fast period for AMA efficiency ratio.
    /// </summary>
    public int FastPeriod
    {
        get => _fastPeriod.Value;
        set => _fastPeriod.Value = value;
    }

    /// <summary>
    /// Slow period for AMA efficiency ratio.
    /// </summary>
    public int SlowPeriod
    {
        get => _slowPeriod.Value;
        set => _slowPeriod.Value = value;
    }

    /// <summary>
    /// Offset (in stored smoothed values, 0 = newest) of the most recent value
    /// used in the rise/fall comparison.
    /// </summary>
    public int SignalBar
    {
        get => _signalBar.Value;
        set => _signalBar.Value = value;
    }

    /// <summary>
    /// Bars to wait between trading actions.
    /// </summary>
    public int SignalCooldownBars
    {
        get => _signalCooldownBars.Value;
        set => _signalCooldownBars.Value = value;
    }

    /// <summary>
    /// Candle type used for calculations.
    /// </summary>
    public DataType CandleType
    {
        get => _candleType.Value;
        set => _candleType.Value = value;
    }

    /// <summary>
    /// Creates the strategy and registers its parameters with their default values.
    /// </summary>
    public ColorMomentumAmaStrategy()
    {
        _momentumPeriod = Param(nameof(MomentumPeriod), 8)
            .SetGreaterThanZero()
            .SetDisplay("Momentum period", "Lookback period for momentum", "Indicator")
            .SetOptimize(5, 20, 1);

        _amaPeriod = Param(nameof(AmaPeriod), 9)
            .SetGreaterThanZero()
            .SetDisplay("AMA period", "Smoothing length for AMA", "Indicator")
            .SetOptimize(5, 30, 1);

        _fastPeriod = Param(nameof(FastPeriod), 2)
            .SetGreaterThanZero()
            .SetDisplay("Fast period", "Fast period of AMA", "Indicator")
            .SetOptimize(2, 10, 1);

        _slowPeriod = Param(nameof(SlowPeriod), 30)
            .SetGreaterThanZero()
            .SetDisplay("Slow period", "Slow period of AMA", "Indicator")
            .SetOptimize(20, 60, 5);

        _signalBar = Param(nameof(SignalBar), 2)
            .SetRange(1, 5)
            .SetDisplay("Signal bar", "Bar index used for signals", "Strategy");

        _signalCooldownBars = Param(nameof(SignalCooldownBars), 6)
            .SetGreaterThanZero()
            .SetDisplay("Signal cooldown", "Bars to wait between reversals", "Strategy");

        _candleType = Param(nameof(CandleType), TimeSpan.FromHours(1).TimeFrame())
            .SetDisplay("Candle type", "Type of candles", "General");
    }

    /// <inheritdoc />
    public override IEnumerable<(Security sec, DataType dt)> GetWorkingSecurities()
    {
        return [(Security, CandleType)];
    }

    /// <inheritdoc />
    protected override void OnReseted()
    {
        base.OnReseted();
        ResetState();
    }

    /// <inheritdoc />
    protected override void OnStarted2(DateTime time)
    {
        base.OnStarted2(time);

        _momentum = new Momentum { Length = MomentumPeriod };
        _ama = new KaufmanAdaptiveMovingAverage
        {
            Length = AmaPeriod,
            FastSCPeriod = FastPeriod,
            SlowSCPeriod = SlowPeriod
        };

        ResetState();

        // Only the momentum indicator is bound to the candle stream;
        // the AMA is fed manually with the momentum output in ProcessCandle.
        var subscription = SubscribeCandles(CandleType);
        subscription
            .Bind(_momentum, ProcessCandle)
            .Start();

        var area = CreateChartArea();
        if (area != null)
        {
            DrawCandles(area, subscription);
            DrawIndicator(area, _momentum);
            DrawIndicator(area, _ama);
            DrawOwnTrades(area);
        }
    }

    /// <summary>
    /// Clears the smoothed-value history and the cooldown counter.
    /// </summary>
    private void ResetState()
    {
        _buffer = new decimal?[SignalBar + HistoryPadding];
        _cooldownRemaining = 0;
    }

    /// <summary>
    /// Handles a candle together with the momentum value computed for it:
    /// smooths the momentum with the AMA, stores the result and sends a market
    /// order when the stored values show two consecutive rises or falls.
    /// </summary>
    /// <param name="candle">Candle received from the subscription; unfinished candles are ignored.</param>
    /// <param name="momentumValue">Momentum value delivered by the binding for this candle.</param>
    private void ProcessCandle(ICandleMessage candle, decimal momentumValue)
    {
        if (candle.State != CandleStates.Finished)
            return;

        // The cooldown is counted in finished candles, including those on which
        // the AMA is not formed yet.
        if (_cooldownRemaining > 0)
            _cooldownRemaining--;

        // Update AMA with the momentum value
        var amaResult = _ama.Process(momentumValue, candle.OpenTime, true);

        if (!_ama.IsFormed || amaResult.IsEmpty)
            return;

        var amaValue = amaResult.ToDecimal();

        // Read the parameter once so every index below uses the same offset.
        var signalBar = SignalBar;
        var requiredLength = signalBar + HistoryPadding;

        // The history was sized when the strategy started. If SignalBar was changed
        // since then, resize it (keeping the newest values) instead of indexing
        // past the end of the array.
        if (_buffer.Length != requiredLength)
            Array.Resize(ref _buffer, requiredLength);

        // Shift the history by one slot (oldest value drops out) and store the newest at index 0.
        for (var i = _buffer.Length - 1; i > 0; i--)
        {
            _buffer[i] = _buffer[i - 1];
        }

        _buffer[0] = amaValue;

        // Not enough history yet for the three values being compared.
        if (_buffer[signalBar] is not decimal v0 ||
            _buffer[signalBar + 1] is not decimal v1 ||
            _buffer[signalBar + 2] is not decimal v2)
        {
            return;
        }

        // Evaluate trend direction using consecutive values (v2 is the oldest)
        var rising = v2 < v1 && v1 < v0;
        var falling = v2 > v1 && v1 > v0;

        if (_cooldownRemaining == 0 && rising && Position <= 0)
        {
            // Volume + |Position| closes an open short and opens a long in one order.
            BuyMarket(Volume + Math.Abs(Position));
            _cooldownRemaining = SignalCooldownBars;
        }
        else if (_cooldownRemaining == 0 && falling && Position >= 0)
        {
            // Volume + |Position| closes an open long and opens a short in one order.
            SellMarket(Volume + Math.Abs(Position));
            _cooldownRemaining = SignalCooldownBars;
        }
    }
}
import clr
clr.AddReference("StockSharp.Messages")
clr.AddReference("StockSharp.Algo")
clr.AddReference("StockSharp.Algo.Indicators")
clr.AddReference("StockSharp.Algo.Strategies")
from System import Math, TimeSpan
from StockSharp.Messages import DataType, CandleStates
from StockSharp.Algo.Indicators import Momentum, KaufmanAdaptiveMovingAverage
from StockSharp.Algo.Strategies import Strategy
from indicator_extensions import *
class color_momentum_ama_strategy(Strategy):
    """Color Momentum AMA strategy.

    Momentum is smoothed with a Kaufman Adaptive Moving Average. A buy order is
    sent after two consecutive rises of the smoothed value and a sell order
    after two consecutive falls. The order volume is ``Volume`` plus the
    absolute size of the open position, so an opposite signal reverses the
    position. After every order, ``SignalCooldownBars`` finished candles must
    pass before the next order may be sent.
    """

    def __init__(self):
        """Register the strategy parameters with their default values."""
        super(color_momentum_ama_strategy, self).__init__()

        self._momentum_period = self.Param("MomentumPeriod", 8) \
            .SetDisplay("Momentum period", "Lookback period for momentum", "Indicator")
        self._ama_period = self.Param("AmaPeriod", 9) \
            .SetDisplay("AMA period", "Smoothing length for AMA", "Indicator")
        self._fast_period = self.Param("FastPeriod", 2) \
            .SetDisplay("Fast period", "Fast period of AMA", "Indicator")
        self._slow_period = self.Param("SlowPeriod", 30) \
            .SetDisplay("Slow period", "Slow period of AMA", "Indicator")
        self._signal_bar = self.Param("SignalBar", 2) \
            .SetDisplay("Signal bar", "Bar index used for signals", "Strategy")
        self._signal_cooldown_bars = self.Param("SignalCooldownBars", 6) \
            .SetDisplay("Signal cooldown", "Bars to wait between reversals", "Strategy")
        self._candle_type = self.Param("CandleType", DataType.TimeFrame(TimeSpan.FromHours(1))) \
            .SetDisplay("Candle type", "Type of candles", "General")

        # Indicators and history are created in OnStarted2.
        self._momentum = None
        self._ama = None
        # Most recent smoothed values, newest at index 0; None marks an unfilled slot.
        self._buffer = None
        # Finished candles that still have to pass before a new order is allowed.
        self._cooldown_remaining = 0

    @property
    def MomentumPeriod(self):
        """Momentum lookback period."""
        return self._momentum_period.Value

    @MomentumPeriod.setter
    def MomentumPeriod(self, value):
        self._momentum_period.Value = value

    @property
    def AmaPeriod(self):
        """AMA smoothing length."""
        return self._ama_period.Value

    @AmaPeriod.setter
    def AmaPeriod(self, value):
        self._ama_period.Value = value

    @property
    def FastPeriod(self):
        """Fast period of the AMA."""
        return self._fast_period.Value

    @FastPeriod.setter
    def FastPeriod(self, value):
        self._fast_period.Value = value

    @property
    def SlowPeriod(self):
        """Slow period of the AMA."""
        return self._slow_period.Value

    @SlowPeriod.setter
    def SlowPeriod(self, value):
        self._slow_period.Value = value

    @property
    def SignalBar(self):
        """Offset (0 = newest) of the most recent stored value used in the comparison."""
        return self._signal_bar.Value

    @SignalBar.setter
    def SignalBar(self, value):
        self._signal_bar.Value = value

    @property
    def SignalCooldownBars(self):
        """Bars to wait between trading actions."""
        return self._signal_cooldown_bars.Value

    @SignalCooldownBars.setter
    def SignalCooldownBars(self, value):
        self._signal_cooldown_bars.Value = value

    @property
    def CandleType(self):
        """Candle type used for calculations."""
        return self._candle_type.Value

    @CandleType.setter
    def CandleType(self, value):
        self._candle_type.Value = value

    def OnStarted2(self, time):
        """Create the indicators, reset the state and subscribe to candles."""
        super(color_momentum_ama_strategy, self).OnStarted2(time)

        self._momentum = Momentum()
        self._momentum.Length = self.MomentumPeriod

        self._ama = KaufmanAdaptiveMovingAverage()
        self._ama.Length = self.AmaPeriod
        self._ama.FastSCPeriod = self.FastPeriod
        self._ama.SlowSCPeriod = self.SlowPeriod

        self._buffer = [None] * (self.SignalBar + 3)
        self._cooldown_remaining = 0

        # Only the momentum indicator is bound to the candle stream;
        # the AMA is fed manually with the momentum output in ProcessCandle.
        self.SubscribeCandles(self.CandleType) \
            .Bind(self._momentum, self.ProcessCandle) \
            .Start()

    def ProcessCandle(self, candle, momentum_value):
        """Smooth the momentum value, store it and trade on two consecutive rises or falls.

        candle: candle received from the subscription; unfinished candles are ignored.
        momentum_value: momentum value delivered by the binding for this candle.
        """
        if candle.State != CandleStates.Finished:
            return

        # The cooldown is counted in finished candles, including those on which
        # the AMA is not formed yet.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1

        ama_result = process_float(self._ama, momentum_value, candle.OpenTime, True)
        if not self._ama.IsFormed or ama_result.IsEmpty:
            return

        ama_value = float(ama_result)

        # Read the parameter once so every index below uses the same offset.
        sb = self.SignalBar
        required = sb + 3

        # The history was sized when the strategy started. If SignalBar was changed
        # since then, resize it (keeping the newest values) instead of indexing
        # past the end of the list.
        size = len(self._buffer)
        if size < required:
            self._buffer.extend([None] * (required - size))
        elif size > required:
            del self._buffer[required:]

        # Shift the history by one slot (oldest value drops out), newest at index 0.
        for i in range(len(self._buffer) - 1, 0, -1):
            self._buffer[i] = self._buffer[i - 1]
        self._buffer[0] = ama_value

        # Not enough history yet for the three values being compared.
        if self._buffer[sb + 2] is None or self._buffer[sb + 1] is None:
            return

        v0 = self._buffer[sb]
        v1 = self._buffer[sb + 1]
        v2 = self._buffer[sb + 2]  # oldest of the three

        rising = v2 < v1 and v1 < v0
        falling = v2 > v1 and v1 > v0

        if self._cooldown_remaining == 0 and rising and self.Position <= 0:
            # Volume + |Position| closes an open short and opens a long in one order.
            volume = self.Volume + abs(self.Position)
            self.BuyMarket(volume)
            self._cooldown_remaining = self.SignalCooldownBars
        elif self._cooldown_remaining == 0 and falling and self.Position >= 0:
            # Volume + |Position| closes an open long and opens a short in one order.
            volume = self.Volume + abs(self.Position)
            self.SellMarket(volume)
            self._cooldown_remaining = self.SignalCooldownBars

    def OnReseted(self):
        """Clear the smoothed-value history and the cooldown counter."""
        super(color_momentum_ama_strategy, self).OnReseted()
        self._buffer = [None] * (self.SignalBar + 3)
        self._cooldown_remaining = 0

    def CreateClone(self):
        """Return a new strategy instance constructed with default parameters."""
        return color_momentum_ama_strategy()