Market Data
When creating your own adapter for working with an exchange, you need to implement methods for subscribing to various types of market data. These methods are called when a MarketDataMessage message is received and provide receiving and processing data from the exchange.
Schematically, the algorithm for processing a subscription or unsubscription request looks like this:
- Sends a confirmation of receiving the subscription request using the SendSubscriptionReply method.
- Checks whether the request is a subscription or unsubscription using the MarketDataMessage.IsSubscribe property.
- In case of a subscription, sets up a subscription to receive real-time data via WebSocket or another mechanism (specific to each exchange).
- In case of an unsubscription, cancels the corresponding subscription (specific to each exchange).
- Sends a message about the subscription result using the SendSubscriptionResult or SendSubscriptionFinished methods, depending on the subscription type and the operation result.
Candle Data
When implementing a subscription to candle data in your own adapter, it is important to consider the specifics of how a particular exchange works with this type of data. In the case of Coinbase, the following methods and properties were overridden:
Supported Timeframes
The TimeFrames
property defines the list of timeframes supported by the adapter for candles. This allows StockSharp to know which timeframes can be requested through this adapter.
protected override IEnumerable<TimeSpan> TimeFrames { get; } = Extensions.TimeFrames.Keys.ToArray();
Support for Candle Updates
The IsSupportCandlesUpdates
method determines whether the adapter supports real-time candle updates for a specific subscription request. In the case of Coinbase, only updates for 5-minute candles are supported.
private static readonly DataType _tf5min = DataType.TimeFrame(TimeSpan.FromMinutes(5));
public override bool IsSupportCandlesUpdates(MarketDataMessage subscription)
{
// Coinbase only supports 5-minute candles for updating via websockets
// Therefore, other timeframes will be built from ticks (automatically by the StockSharp core)
return subscription.DataType2 == _tf5min;
}
Overriding these methods and properties allows the adapter to correctly handle requests for subscribing to candle data, taking into account the specifics of the Coinbase API. For example, if a timeframe other than 5 minutes is requested, StockSharp will know that it needs to use tick data to build candles of other timeframes.
Subscribing to Candle Data
To subscribe to candle data, the OnTFCandlesSubscriptionAsync method is implemented. This method, like the method for subscribing to tick data, can request historical data, as well as set up a subscription to receive new candles in real time.
protected override async ValueTask OnTFCandlesSubscriptionAsync(MarketDataMessage mdMsg, CancellationToken cancellationToken)
{
// Send confirmation of receiving the subscription request
SendSubscriptionReply(mdMsg.TransactionId);
var symbol = mdMsg.SecurityId.ToSymbol();
if (mdMsg.IsSubscribe)
{
var tf = mdMsg.GetTimeFrame();
// If historical data is requested
if (mdMsg.From is not null)
{
var from = (long)mdMsg.From.Value.ToUnix();
var to = (long)(mdMsg.To ?? DateTimeOffset.UtcNow).ToUnix();
var left = mdMsg.Count ?? long.MaxValue;
var step = (long)tf.Multiply(200).TotalSeconds;
var granularity = mdMsg.GetTimeFrame().ToNative();
while (from < to)
{
// Request historical candles
var candles = await _restClient.GetCandles(symbol, from, from + step, granularity, cancellationToken);
var needBreak = true;
var last = from;
foreach (var candle in candles.OrderBy(t => t.Time))
{
cancellationToken.ThrowIfCancellationRequested();
var time = (long)candle.Time.ToUnix();
if (time < from)
continue;
if (time > to)
{
needBreak = true;
break;
}
// Send information about each historical candle
SendOutMessage(new TimeFrameCandleMessage
{
OpenPrice = (decimal)candle.Open,
ClosePrice = (decimal)candle.Close,
HighPrice = (decimal)candle.High,
LowPrice = (decimal)candle.Low,
TotalVolume = (decimal)candle.Volume,
OpenTime = candle.Time,
State = CandleStates.Finished,
// In case of identifying data by subscription, filling instrument information is not required
OriginalTransactionId = mdMsg.TransactionId,
});
if (--left <= 0)
{
needBreak = true;
break;
}
last = time;
needBreak = false;
}
if (needBreak)
break;
from = last;
}
}
if (!mdMsg.IsHistoryOnly() && mdMsg.DataType2 == _tf5min)
{
// Subscribe to receive new candles in real time
_candlesTransIds[symbol] = mdMsg.TransactionId;
await _socketClient.SubscribeCandles(symbol, cancellationToken);
// Notify that the subscription has transitioned to online status
SendSubscriptionResult(mdMsg);
}
else
{
// Send a response that the subscription is finished (not online)
SendSubscriptionFinished(mdMsg.TransactionId);
}
}
else
{
// Unsubscribe from receiving candles
_candlesTransIds.Remove(symbol);
await _socketClient.UnSubscribeCandles(symbol, cancellationToken);
}
}
Processing Candle Data
To process candle data received from the exchange in real time, a method with code like in the SessionOnCandleReceived method is usually implemented. This method converts the received data into a TimeFrameCandleMessage message and sends it using the SendOutMessage method.
private void SessionOnCandleReceived(Ohlc candle)
{
// Check if there is an active subscription to candles for this instrument
if (!_candlesTransIds.TryGetValue(candle.Symbol, out var transId))
return;
// Create and send a message about a new candle
SendOutMessage(new TimeFrameCandleMessage
{
OpenPrice = (decimal)candle.Open,
ClosePrice = (decimal)candle.Close,
HighPrice = (decimal)candle.High,
LowPrice = (decimal)candle.Low,
TotalVolume = (decimal)candle.Volume,
OpenTime = candle.Time,
State = CandleStates.Active, // The candle is considered active as it may still change
// In case of identifying data by subscription, filling instrument information is not required
OriginalTransactionId = transId,
});
}
Level 1 (Best Bid and Ask Prices, Last Price)
Subscribing to Level 1 Data
To subscribe to Level 1 changes, the OnLevel1SubscriptionAsync method is implemented. This method usually performs the following actions:
protected override async ValueTask OnLevel1SubscriptionAsync(MarketDataMessage mdMsg, CancellationToken cancellationToken)
{
// Send confirmation of receiving the subscription request
// This informs the system that the request has been received and is being processed
SendSubscriptionReply(mdMsg.TransactionId);
// Convert the instrument identifier to a symbol understood by the exchange
var symbol = mdMsg.SecurityId.ToSymbol();
if (mdMsg.IsSubscribe)
{
// If this is a subscription request
// Subscribe to receive Level 1 data via WebSocket
await _socketClient.SubscribeTicker(symbol, cancellationToken);
// Send a message about successful subscription
// This informs the system that the subscription is set up and data will be received
SendSubscriptionResult(mdMsg);
}
else
{
// If this is an unsubscription request
// Cancel the subscription to receive Level 1 data
await _socketClient.UnSubscribeTicker(symbol, cancellationToken);
}
}
Processing Level 1 Data
To process Level 1 data received from the exchange in real time, a method with code like in the SessionOnTickerChanged example is usually implemented. This method converts the received data into a Level1ChangeMessage message and sends it using the SendOutMessage method.
private void SessionOnTickerChanged(Ticker ticker)
{
// Create a message with Level 1 data changes
SendOutMessage(new Level1ChangeMessage
{
// Specify the instrument identifier
SecurityId = ticker.Product.ToStockSharp(),
// Set the time of receiving data
ServerTime = CurrentTime.ConvertToUtc(),
}
// Add various Level 1 fields if they are present in the data from the exchange
.TryAdd(Level1Fields.LastTradeId, ticker.LastTradeId)
.TryAdd(Level1Fields.LastTradePrice, ticker.LastTradePrice?.ToDecimal())
.TryAdd(Level1Fields.LastTradeVolume, ticker.LastTradePrice?.ToDecimal())
.TryAdd(Level1Fields.LastTradeOrigin, ticker.LastTradeSide?.ToSide())
.TryAdd(Level1Fields.HighPrice, ticker.High?.ToDecimal())
.TryAdd(Level1Fields.LowPrice, ticker.Low?.ToDecimal())
.TryAdd(Level1Fields.Volume, ticker.Volume?.ToDecimal())
.TryAdd(Level1Fields.Change, ticker.Change?.ToDecimal())
.TryAdd(Level1Fields.BestBidPrice, ticker.Bid?.ToDecimal())
.TryAdd(Level1Fields.BestAskPrice, ticker.Ask?.ToDecimal())
.TryAdd(Level1Fields.BestBidVolume, ticker.BidSize?.ToDecimal())
.TryAdd(Level1Fields.BestAskVolume, ticker.AskSize?.ToDecimal())
);
}
Order Book
Support for Incremental Order Book Updates
When implementing order book functionality in your own adapter, it is important to consider whether the exchange supports incremental order book updates. For this, the IsSupportOrderBookIncrements
property was overridden in the Coinbase adapter:
public override bool IsSupportOrderBookIncrements => true;
The IsSupportOrderBookIncrements
property indicates whether the adapter supports incremental order book updates. Setting this property to true
means that the exchange can send partial order book updates rather than a full snapshot with each change.
Overriding this property allows StockSharp to optimize the processing of order book data. If the property is set to true
, the system will expect and correctly handle incremental updates.
Subscribing to Order Book Data
To subscribe to order book changes, the OnMarketDepthSubscriptionAsync method is implemented. This method performs actions similar to the OnLevel1SubscriptionAsync method, but for order book data.
protected override async ValueTask OnMarketDepthSubscriptionAsync(MarketDataMessage mdMsg, CancellationToken cancellationToken)
{
// Send confirmation of receiving the subscription request
SendSubscriptionReply(mdMsg.TransactionId);
// Convert the instrument identifier to a symbol understood by the exchange
var symbol = mdMsg.SecurityId.ToSymbol();
if (mdMsg.IsSubscribe)
{
// If this is a subscription request
// Subscribe to receive order book data via WebSocket
await _socketClient.SubscribeOrderBook(symbol, cancellationToken);
// Send a message about successful subscription
SendSubscriptionResult(mdMsg);
}
else
{
// If this is an unsubscription request
// Cancel the subscription to receive order book data
await _socketClient.UnSubscribeOrderBook(symbol, cancellationToken);
}
}
Processing Order Book Data
To process order book data received from the exchange in real time, a method with code like in the SessionOnOrderBookReceived method is usually implemented. This method converts the received data into a QuoteChangeMessage message and sends it using the SendOutMessage method.
private void SessionOnOrderBookReceived(string type, string symbol, IEnumerable<OrderBookChange> changes)
{
var bids = new List<QuoteChange>();
var asks = new List<QuoteChange>();
// Distribute changes by bids and asks
foreach (var change in changes)
{
var side = change.Side.ToSide();
var quotes = side == Sides.Buy ? bids : asks;
quotes.Add(new((decimal)change.Price, (decimal)change.Size));
}
// Create and send a message with changes in the order book
SendOutMessage(new QuoteChangeMessage
{
SecurityId = symbol.ToStockSharp(),
Bids = bids.ToArray(),
Asks = asks.ToArray(),
ServerTime = CurrentTime.ConvertToUtc(),
// Determine if this is a full order book snapshot or an incremental update.
// If the exchange always sends only full order books and does not support incrementality,
// then setting this property is not required at all
State = type == "snapshot" ? QuoteChangeStates.SnapshotComplete : QuoteChangeStates.Increment,
});
}
Tick Data (Trades)
Subscribing to Tick Data
To subscribe to tick data, the OnTicksSubscriptionAsync method is implemented. This method, in addition to actions similar to the previous subscription methods, can also request historical data if specified in the request.
protected override async ValueTask OnTicksSubscriptionAsync(MarketDataMessage mdMsg, CancellationToken cancellationToken)
{
// Send confirmation of receiving the subscription request
SendSubscriptionReply(mdMsg.TransactionId);
var symbol = mdMsg.SecurityId.ToSymbol();
if (mdMsg.IsSubscribe)
{
// If historical data is requested
if (mdMsg.From is not null)
{
var from = (long)mdMsg.From.Value.ToUnix(false);
var to = (long)(mdMsg.To ?? DateTimeOffset.UtcNow).ToUnix(false);
var left = mdMsg.Count ?? long.MaxValue;
while (from < to)
{
// Request historical trades
var trades = await _restClient.GetTrades(symbol, from, to, cancellationToken);
var needBreak = true;
var last = from;
foreach (var trade in trades.OrderBy(t => t.Time))
{
cancellationToken.ThrowIfCancellationRequested();
var time = (long)trade.Time.ToUnix();
if (time < from)
continue;
if (time > to)
{
needBreak = true;
break;
}
// Send information about each historical trade
SendOutMessage(new ExecutionMessage
{
// Set that the message carries information about a tick trade
// (not a transaction like an order or own trade)
DataTypeEx = DataType.Ticks,
TradeId = trade.TradeId,
TradePrice = trade.Price?.ToDecimal(),
TradeVolume = trade.Size?.ToDecimal(),
ServerTime = trade.Time,
OriginSide = trade.Side.ToSide(),
// For history, always set the subscription identifier,
// so that the external code can understand which subscription the data was received for.
// In case of identifying data by subscription, filling instrument information is not required
OriginalTransactionId = mdMsg.TransactionId,
});
if (--left <= 0)
{
needBreak = true;
break;
}
last = time;
needBreak = false;
}
if (needBreak)
break;
from = last;
}
}
if (!mdMsg.IsHistoryOnly())
{
// Subscribe to receive new trades in real time
await _socketClient.SubscribeTrades(symbol, cancellationToken);
}
// Send a message about successful subscription
SendSubscriptionResult(mdMsg);
}
else
{
// Unsubscribe from receiving trades in real time
await _socketClient.UnSubscribeTrades(symbol, cancellationToken);
}
}
Processing Tick Data
To process tick data received from the exchange in real time, a method with code like in the SessionOnTradeReceived method is usually implemented. This method converts the received data into an ExecutionMessage message with the DataType.Ticks type and sends it using the SendOutMessage method.
private void SessionOnTradeReceived(Trade trade)
{
// Create and send a message about a new trade
SendOutMessage(new ExecutionMessage
{
// Set that the message carries information about a tick trade
// (not a transaction like an order or own trade)
DataTypeEx = DataType.Ticks,
SecurityId = trade.ProductId.ToStockSharp(),
TradeId = trade.TradeId,
TradePrice = (decimal)trade.Price,
TradeVolume = (decimal)trade.Size,
ServerTime = trade.Time,
OriginSide = trade.Side.ToSide(),
});
}
Subscribing to Order Log
The order log is detailed information about all changes in the order book, including the addition, modification, and deletion of orders. This data is specific and not provided by all data sources. For example, Coinbase does not support providing an order log.
To implement a subscription to an order log in an adapter, the OnOrderLogSubscriptionAsync method is used. This method is called when a MarketDataMessage message with the DataType.OrderLog data type is received.
Below is an example implementation of this method taken from the BitStamp connector, which supports order log:
protected override async ValueTask OnOrderLogSubscriptionAsync(MarketDataMessage mdMsg, CancellationToken cancellationToken)
{
// Send confirmation of receiving the subscription request
SendSubscriptionReply(mdMsg.TransactionId);
// Convert the instrument identifier to a currency pair
var symbol = mdMsg.SecurityId.ToCurrency();
if (mdMsg.IsSubscribe)
{
if (!mdMsg.IsHistoryOnly())
{
// Subscribe to receive order log in real time
await _pusherClient.SubscribeOrderLog(symbol, cancellationToken);
}
// Send a message about successful subscription
SendSubscriptionResult(mdMsg);
}
else
// Unsubscribe from receiving order log
await _pusherClient.UnSubscribeOrderLog(symbol, cancellationToken);
}
When processing order log data received from the exchange, a separate method is usually used, which converts the received data into ExecutionMessage messages with the ExecutionTypes.OrderLog type:
private void SessionOnNewOrderLog(string symbol, OrderStates state, Order order)
{
// Create and send a message with information about a new entry in the order log
SendOutMessage(new ExecutionMessage
{
DataTypeEx = DataType.OrderLog,
SecurityId = symbol.ToStockSharp(),
ServerTime = order.Time,
OrderVolume = (decimal)order.Amount,
OrderPrice = (decimal)order.Price,
OrderId = order.Id,
Side = order.Type.ToSide(),
OrderState = state,
});
}
It is important not to forget to add support for this data type in the adapter constructor:
this.AddSupportedMarketDataType(DataType.OrderLog);
Specifics of Processing Historical and Live Data
When implementing requests for historical data and processing live data in your own adapter, it is important to consider the following points:
Historical Data
When sending historical data in response to a request:
Setting OriginalTransactionId is mandatory. This allows the system to associate the received data with the original request.
Setting SecurityId or TimeFrameCandleMessage.TimeFrame (in the case of candles) is not required, but also not prohibited. The StockSharp core will automatically fill these fields with the necessary values from the original request.
Live Data
When processing live data, for example, received via WebSocket:
Setting OriginalTransactionId is optional. If the transaction ID is not set, the system will distribute the data to all active subscriptions for the corresponding instrument and data type.
Setting SecurityId and other specific fields (for example, TimeFrameCandleMessage.TimeFrame for candles) is mandatory, as this information is necessary for the correct routing of data in the system.