Example: tracking-candles.ts
Static snapshot for Binance/Rest/misc/tracking-candles.ts.
Example Path
Binance/Rest/misc/tracking-candles.ts
Source Link
Repository source: https://github.com/sieblyio/crypto-api-examples/blob/master/examples/Binance/Rest/misc/tracking-candles.ts
Code Snapshot
import {
DefaultLogger,
isWsFormattedKline,
KlineInterval,
USDMClient,
WebsocketClient,
} from 'binance';
import { EventEmitter } from 'events';
/**
* This elaborate example serves the following key functions:
* - Connect to various candle websockets to receive realtime candle events (update open candle & append closed candle)
* - Backfill some candles using the REST API
* - Once backfilled, start processing candle events (update & append in-memory, depending if candle closed or not)
* - Keep the candle stores trimmed, so we never store more than `maxStoredCandles` candles per symbol
* - When a connection opens or reconnects, the backfill is executed again to ensure there are no gaps
*
* The "onCandleClosed()" function is where you would run custom logic with a dataset of candles (e.g. run some indicator calculations)
*/
const restClient = new USDMClient();
const ignoredTraceLogMsgs = [
'Sending ping',
'Received pong frame, clearing pong timer',
'Received ping frame, sending pong frame',
];
const customLogger = {
...DefaultLogger,
trace: (msg: string, context?: any) => {
if (ignoredTraceLogMsgs.includes(msg)) {
return;
}
console.log(JSON.stringify({ msg, context }));
},
};
const wsClient = new WebsocketClient(
{
beautify: true,
},
customLogger,
);
/**
* Configuration logic
*/
const symbolsToMonitor: string[] = ['BTCUSDT', 'ETHUSDT'];
const timeframes: KlineInterval[] = ['1m'];
const maxStoredCandles = 3;
/**
* Data stores
*/
interface EngineCandle {
open: number;
close: number;
high: number;
low: number;
volume: number;
openTime: number;
openDt: Date;
closeTime: number;
closeDt: Date;
}
/**
* The shape of the events produced by the candle store.
* All the info needed to query the candle store for all candles, after receiving a candle closed event
*/
interface CandleStoreEvent {
symbol: string;
interval: string;
}
/** These are the events produced by the candle store, which can be used to implement this abstraction layer */
export declare interface CandleEmitter extends EventEmitter {
on(event: 'candleClose', listener: (event: CandleStoreEvent) => void): this;
on(event: 'candleUpdate', listener: (event: CandleStoreEvent) => void): this;
}
/** Some options to configure the behaviour of the candle store */
interface CandleStoreOptions {
/** Keep a ceiling on how many candles are stored, before old ones are discarded (prevent the store from growing forever into infinity) */
maxStoredCandles?: number;
eventEmitter: EventEmitter;
}
/**
/**
* A general store for symbol/interval candles, including handling the currently open candle, with some utility methods
*/
export class CandleStore {
private symbol: string;
// Closed candles are stored as an array of candles per interval in this store.
// This is essentially an object acting as a key/value store (key: interval, value: candle array)
private closedCandles: Record<string, EngineCandle[]> = {};
// Open candles are kept separate from the closed candles, also in a key/value store (key: interval, value: current open candle)
private openCandles: Record<string, EngineCandle | null> = {};
private maxStoredCandles: number;
private emitter: EventEmitter;
constructor(symbol: string, options: CandleStoreOptions) {
// super();
this.symbol = symbol;
this.maxStoredCandles = options?.maxStoredCandles || 3;
this.emitter = options.eventEmitter;
}
/**
* Overwrite the current candle store, e.g. after backfilling. Candles are sorted automatically before overwriting the store
*/
public setCandles(candles: EngineCandle[], interval: string): void {
const ascendingCandles = [...candles].sort(
(a, b) => a.closeTime - b.closeTime,
);
this.initCandleStores(interval);
this.closedCandles[interval] = ascendingCandles;
}
public setOpenCandle(candle: EngineCandle | null, interval: string): void {
this.openCandles[interval] = candle;
}
/**
* Provide a candle event to the store for processing (e.g candle closed vs open candle updated).
* - Closed candles are appended to the array.
* - Open candles are tracked separately and only (optionally) used during the getCandles(true) query.
*/
public processCandleEvent(
candle: EngineCandle,
interval: string,
isCandleClosed: boolean,
): void {
const evt = { symbol: this.symbol, interval };
this.initCandleStores(interval);
if (!isCandleClosed) {
this.setOpenCandle(candle, interval);
this.emitter.emit('candleUpdate', evt);
// console.log(this.symbol, `Open candle update`);
return;
}
this.setOpenCandle(null, interval);
this.closedCandles[interval].push(candle);
this.trimExcessCandles(interval);
this.emitter.emit('candleClose', evt);
// console.log(`Emit candle closed evt`, evt);
}
private initCandleStores(interval: string) {
if (
!this.closedCandles[interval] ||
!Array.isArray(this.closedCandles[interval])
) {
this.closedCandles[interval] = [];
}
}
/**
* Execute a store-trim. This is called automatically during candle-closed events, but
* can be called manually (e.g. after backfilling) to ensure the store only keeps the most recent `maxStoredCandles`.
*/
public trimExcessCandles(interval: string): void {
const maxStored = this.maxStoredCandles;
this.initCandleStores(interval);
const totalCandles = this.closedCandles[interval]?.length;
if (totalCandles <= maxStored) {
return;
}
const elementsToRemove = totalCandles - maxStored;
// This mutates the closed candle store to remove the first x elements
this.closedCandles[interval].splice(0, elementsToRemove);
}
/**
* Query all candles in the store for an interval.
* Optionally append the currently open candle to the end of the array.
*/
public getCandles(
interval: string,
includeOpenCandle?: boolean,
): EngineCandle[] {
const candles = this.closedCandles[interval];
const openCandle = this.openCandles[interval];
this.initCandleStores(interval);
if (!candles) {
return [];
}
// check last candle has same open time as open candle, just in case
if (
!includeOpenCandle ||
!openCandle ||
openCandle.openTime === candles[candles.length - 1].openTime
) {
return candles;
}
// Include open candle at end of array
return [...candles, openCandle];
}
}
/**
* A key value store for all symbols & intervals
*
* // All CandleStores for that symbol (one per symbol, supports many intervals in one store)
* const symbolCandles = allCandleStores[symbol];
*
*/
const allCandleStores: Record<string, CandleStore> = {};
/**
* Get a candle store for a symbol.
* Since a missing candle store is automatically initialised, you can assume this will always return a candle store.
*/
function getCandleStore(symbol: string): CandleStore {
initCandleStoreIfMissing(symbol);
return allCandleStores[symbol];
}
const eventEmitter = new EventEmitter();
// Hook up event consumers on the shared event emitter
eventEmitter.on('candleClose', (e) => onCandleClosed(e.symbol, e.interval));
// eventEmitter.on('candleUpdate', (e) => {
// console.log('candle updated', {
// dt: new Date(),
// symbol: e.symbol,
// interval: e.interval,
// });
// });
/** Ensure a candle store exists for this symbol & attach consumers to the store */
function initCandleStoreIfMissing(symbol: string): void {
if (allCandleStores[symbol]) {
return;
}
// Inject your own event emitter and initialise one candle store per symbol (it supports multiple intervals)
allCandleStores[symbol] = new CandleStore(symbol, {
maxStoredCandles: maxStoredCandles,
eventEmitter: eventEmitter,
});
}
/**
* Websocket Listeners
*/
wsClient.on('formattedMessage', (data) => {
if (isWsFormattedKline(data)) {
const candle = data.kline;
const isCandleClosed = data.kline.final;
// console.log('kline received ', { candle, isCandleClosed });
const { open, close, high, low, volume, interval, symbol } = candle;
const engineCandle: EngineCandle = {
open,
close,
high,
low,
volume,
openTime: candle.startTime,
openDt: new Date(candle.startTime),
closeTime: candle.endTime,
closeDt: new Date(candle.endTime),
};
initCandleStoreIfMissing(symbol);
// const candleStore: CandleStore = allIntervalCandleStores[symbol][interval];
const candleStore: CandleStore = allCandleStores[symbol];
candleStore.processCandleEvent(engineCandle, interval, isCandleClosed);
return;
}
console.log('log formattedMessage: ', data);
});
wsClient.on(
'open',
async (data: { wsKey: string; wsUrl: string; ws: any; event: any }) => {
console.log('connection opened open:', data.wsKey, data.wsUrl);
},
);
// response to command sent via WS stream (e.g. subscription confirmation)
// this will automatically trigger a backfill for that symbol.
wsClient.on('response', (data) => {
console.log('log response: ', JSON.stringify(data, null, 2));
// empty response result === success
if (!data.result && data.request.method === 'SUBSCRIBE') {
data.request.params.forEach(async (topic: string) => {
console.log('Successfully subscribed to topic: ', topic);
// btcusdt@kline_1m -> btcusdt, kline_1m
const [symbol, topicWithInterval] = topic.split('@');
// kline_1m -> kline, 1m
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const [topicName, interval] = topicWithInterval.split('_');
await backfillCandles(symbol.toUpperCase(), interval as KlineInterval);
});
}
});
wsClient.on('reconnecting', (data) => {
console.log('ws automatically reconnecting.... ', data?.wsKey);
});
wsClient.on('reconnected', async (data) => {
console.log('ws has reconnected ', data?.wsKey, data?.wsUrl);
});
/**
* Execute a 1-page backfill (1000 candles). Called automatically when a connection opens OR reconnects.
*/
async function backfillCandles(
symbol: string,
interval: KlineInterval,
): Promise<void> {
initCandleStoreIfMissing(symbol);
console.log(`Executing backfill for ${symbol} : ${interval}`);
const initialCandleResult = await restClient.getKlines({
symbol,
interval,
limit: 1000,
});
// Map to a standard candle structure
const mappedEngineCandles: EngineCandle[] = initialCandleResult.map(
(candle) => {
return {
open: Number(candle[1]),
high: Number(candle[2]),
low: Number(candle[3]),
close: Number(candle[4]),
volume: Number(candle[5]),
openTime: candle[0],
openDt: new Date(candle[0]),
closeTime: candle[6],
closeDt: new Date(candle[6]),
};
},
);
// Last candle might not be closed, so filter that out (ignore any candles with close time in the future)
const closedCandles = mappedEngineCandles.filter(
(c) => c.closeTime <= Date.now(),
);
// const candleStore: CandleStore = allIntervalCandleStores[symbol][interval];
const candleStore: CandleStore = allCandleStores[symbol];
// Overwrite the current candles in the store and remove excess candles
candleStore.setCandles(closedCandles, interval);
candleStore.trimExcessCandles(interval);
console.log(`Finished backfill for ${symbol} : ${interval}`);
}
/**
* Bootstrap a connection per symbol & timeframe. Backfill will automatically trigger when the connection opens successfully.
* Note: this will spawn one websocket connection per symbol per timeframe. For complex cases, this may create too many connections.
*/
symbolsToMonitor.forEach((symbol) => {
initCandleStoreIfMissing(symbol);
timeframes.forEach(async (interval) => {
// Open a websocket to start consuming candle events
await wsClient.subscribeKlines(symbol, interval, 'usdm');
console.log(`Requested subscription to ${symbol} & ${interval}`);
});
});
function onCandleClosed(symbol: string, interval: string): void {
// When a candle closes, fetch all closed candles from the store for that symbol, e.g. to calculate some indicators
const closedSymbolCandles = getCandleStore(symbol).getCandles(interval);
console.log('candle closed', {
dt: new Date(),
symbol,
interval,
mostRecentCandlesInStore: JSON.stringify(closedSymbolCandles),
});
}
This is a static, crawlable snapshot. The interactive app loads after JavaScript starts and can refresh live data.