improver serum event queue tracking

This commit is contained in:
Maximilian Schneider 2021-05-31 13:33:39 +03:00
parent 0a3b9b2523
commit 342cadfe87
9 changed files with 277 additions and 352 deletions

View File

@ -4,6 +4,9 @@ Collects and aggregates trades from serum dex for display in a tradingview chart
This is powering the charts on [mango.markets](https://mango.markets).
Feel free to improve and extend for the benefit for the larger solana ecosystem.
** Note: This does not include a functioning trading view to inspect the
data.** Check the [mango markets gui code](https://github.com/blockworks-foundation/mango-ui-v2/blob/main/components/TradingView/index.tsx) for a reference how to use this API to display a TradingView.
## Configuration
* Markets: should be added to the dictionaries in src/index.ts

View File

@ -29,7 +29,7 @@
},
"scripts": {
"start": "node dist/index.js",
"dev": "ts-node-dev src/index.ts",
"dev": "ts-node-dev --files src/index.ts",
"clean": "rm -rf dist",
"prepare": "run-s clean postinstall",
"postinstall": "tsc",

71
src/events.ts Normal file
View File

@ -0,0 +1,71 @@
import BN from 'bn.js';
import { bits, blob, struct, u8, u32, nu64 } from 'buffer-layout';
import {
accountFlagsLayout,
publicKeyLayout,
u128,
u64,
zeros,
} from '@project-serum/serum/lib/layout';
interface EventQueueHeader {
head: number;
count: number;
seqNum: number;
}
const EVENT_QUEUE_HEADER = struct<EventQueueHeader>([
blob(5),
accountFlagsLayout('accountFlags'),
u32('head'),
zeros(4),
u32('count'),
zeros(4),
u32('seqNum'),
zeros(4),
]);
const EVENT_FLAGS = bits(u8(), false, 'eventFlags');
EVENT_FLAGS.addBoolean('fill');
EVENT_FLAGS.addBoolean('out');
EVENT_FLAGS.addBoolean('bid');
EVENT_FLAGS.addBoolean('maker');
const EVENT = struct([
EVENT_FLAGS,
u8('openOrdersSlot'),
u8('feeTier'),
blob(5),
u64('nativeQuantityReleased'), // Amount the user received
u64('nativeQuantityPaid'), // Amount the user paid
u64('nativeFeeOrRebate'),
u128('orderId'),
publicKeyLayout('openOrders'),
u64('clientOrderId'),
]);
export function decodeRecentEvents(
buffer: Buffer,
lastSeenSeqNum?: number,
) {
const header = EVENT_QUEUE_HEADER.decode(buffer);
const events: any[] = [];
if (lastSeenSeqNum !== undefined) {
const allocLen = Math.floor(
(buffer.length - EVENT_QUEUE_HEADER.span) / EVENT.span,
);
const newEventsCount = header.seqNum - lastSeenSeqNum
for (let i = newEventsCount; i > 0; --i) {
const nodeIndex = (header.head + header.count + allocLen - i) % allocLen
const decodedItem = EVENT.decode(buffer, EVENT_QUEUE_HEADER.span + nodeIndex * EVENT.span)
events.push(decodedItem)
}
}
return { header, events };
}

View File

@ -1,85 +1,45 @@
import { Account, Connection, PublicKey } from "@solana/web3.js"
import { Market, decodeEventQueue } from "@project-serum/serum"
import { Market } from "@project-serum/serum"
import cors from "cors"
import express from "express"
import { Tedis, TedisPool } from "tedis"
import { URL } from "url"
import { Order, Trade, TradeSide } from "./interfaces"
import { decodeRecentEvents } from "./events"
import { MarketConfig, Trade, TradeSide } from "./interfaces"
import { RedisConfig, RedisStore, createRedisStore } from "./redis"
import { encodeEvents } from "./serum"
import { resolutions, sleep } from "./time"
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
const MINUTES = 60 * 1000
class OrderBuffer {
cache: Map<string, number>
cleanupInterval: number
lastCleanup: number
timeToLive: number
constructor(timeToLive = 10 * MINUTES, cleanupInterval = 30 * MINUTES) {
this.cache = new Map()
this.cleanupInterval = cleanupInterval
this.lastCleanup = Date.now()
this.timeToLive = timeToLive
}
// returns a list of unique trades that have not been observed by the order buffer
// guarantees to not emit a new trade even if the same fills have been supplied twice
filterNewTrades(fills: Order[]): Trade[] {
const now = Date.now()
const takerOrders = fills.filter((o) => !o.eventFlags.maker)
const allTrades = takerOrders.map((o) => {
return {
id: o.orderId.toString(16),
price: o.price,
side: o.side === "buy" ? TradeSide.Buy : TradeSide.Sell,
size: o.size,
ts: now,
}
})
const newTrades = allTrades.filter((t) => !this.cache.has(t.id))
// store newTrades in cache
newTrades.forEach((t) => this.cache.set(t.id, now))
// cleanup cache
if (now > this.lastCleanup + this.cleanupInterval) {
let staleCacheEntries: string[] = []
this.cache.forEach((ts: number, key: string, _) => {
if (ts > now + this.timeToLive) {
staleCacheEntries.push(key)
}
})
staleCacheEntries.forEach((key) => {
this.cache.delete(key)
})
this.lastCleanup = now
}
return newTrades
}
}
interface MarketConfig {
clusterUrl: string
programId: string
marketName: string
marketPk: string
}
async function collectTrades(m: MarketConfig, r: RedisConfig) {
async function collectEventQueue(m: MarketConfig, r: RedisConfig) {
const store = await createRedisStore(r, m.marketName)
const marketAddress = new PublicKey(m.marketPk)
const programKey = new PublicKey(m.programId)
const connection = new Connection(m.clusterUrl)
const market = await Market.load(connection, marketAddress, undefined, programKey)
async function fetchTrades(lastSeqNum?: number): Promise<[Trade[], number]> {
const now = Date.now()
const accountInfo = await connection.getAccountInfo(market["_decoded"].eventQueue)
if (accountInfo === null) {
throw new Error(`Event queue account for market ${m.marketName} not found`)
}
const { header, events } = decodeRecentEvents(accountInfo.data, lastSeqNum);
const takerFills = events.filter((e) => e.eventFlags.fill && !e.eventFlags.maker)
const trades = takerFills.map(e => market.parseFillEvent(e)).map((e) => {
return {
price: e.price,
side: e.side === "buy" ? TradeSide.Buy : TradeSide.Sell,
size: e.size,
ts: now,
}
})
/*
if (trades.length > 0)
console.log({e: events.map(e => e.eventFlags), takerFills, trades})
*/
return [trades, header.seqNum]
}
async function storeTrades(ts: Trade[]) {
if (ts.length > 0) {
console.log(m.marketName, ts.length)
@ -89,44 +49,17 @@ async function collectTrades(m: MarketConfig, r: RedisConfig) {
}
}
const orderBuffer = new OrderBuffer()
while (true) {
try {
let fills = await market.loadFills(connection)
let trades = orderBuffer.filterNewTrades(fills)
const lastSeqNum = (await store.loadNumber('LASTSEQ'));
const [trades, currentSeqNum] = await fetchTrades(lastSeqNum);
storeTrades(trades)
store.storeNumber('LASTSEQ', currentSeqNum)
} catch (err) {
const error = err.toString().split("\n", 1)[0]
console.error(m.marketName, { error })
}
await sleep(10000)
}
}
async function collectEventQueue(m: MarketConfig, r: RedisConfig) {
const store = await createRedisStore(r, m.marketName)
const marketAddress = new PublicKey(m.marketPk)
const programKey = new PublicKey(m.programId)
const connection = new Connection(m.clusterUrl)
const market = await Market.load(connection, marketAddress, undefined, programKey)
while (true) {
try {
const accountInfo = await connection.getAccountInfo(market["_decoded"].eventQueue)
if (accountInfo === null) {
throw new Error(`Event queue account for market ${m.marketName} not found`)
}
const events = decodeEventQueue(accountInfo.data, 1000).filter((e) => e.eventFlags.fill)
if (events.length > 0) {
const encoded = encodeEvents(events)
store.storeBuffer(Date.now(), encoded)
}
} catch (err) {
const error = err.toString().split("\n", 1)[0]
console.error(m.marketName, { error })
}
await sleep(10000)
await sleep({Seconds: 10})
}
}
@ -165,43 +98,18 @@ function collectMarketData(programId: string, markets: Record<string, string>) {
Object.entries(markets).forEach((e) => {
const [marketName, marketPk] = e
const marketConfig = { clusterUrl, programId, marketName, marketPk } as MarketConfig
collectTrades(marketConfig, { host, port, password, db: 0 })
//collectEventQueue(marketConfig, { host, port, password, db: 1});
collectEventQueue(marketConfig, { host, port, password, db: 0});
})
}
collectMarketData(programIdV3, nativeMarketsV3)
interface TradingViewHistory {
s: string
t: number[]
c: number[]
o: number[]
h: number[]
l: number[]
v: number[]
}
const app = express()
app.use(cors())
const max_conn = parseInt(process.env.REDIS_MAX_CONN || "") || 200;
const redisConfig = { host, port, password, db: 0, max_conn }
const pool = new TedisPool(redisConfig)
const HOURS = 60 * MINUTES
const resolutions: { [id: string]: number | undefined } = {
"1": 1 * MINUTES,
"3": 3 * MINUTES,
"5": 5 * MINUTES,
"15": 15 * MINUTES,
"30": 30 * MINUTES,
"60": 1 * HOURS,
"120": 2 * HOURS,
"180": 3 * HOURS,
"240": 4 * HOURS,
"1D": 24 * HOURS,
}
const app = express()
app.use(cors())
app.get("/tv/config", async (req, res) => {
const response = {
@ -245,15 +153,14 @@ app.get("/tv/history", async (req, res) => {
const validSymbol = marketPk != undefined
const validResolution = resolution != undefined
const validFrom = true || new Date(from).getFullYear() >= 2021
// respond
if (!(validSymbol && validResolution && validFrom)) {
const error = { s: "error", validSymbol, validResolution, validFrom }
console.error({ req, error })
res.status(500).send(error)
console.error({ marketName, error })
res.status(404).send(error)
return
}
// respond
try {
const conn = await pool.getTedis()
try {
@ -290,11 +197,23 @@ app.get("/tv/history", async (req, res) => {
})
app.get("/trades/address/:marketPk", async (req, res) => {
// parse
const marketPk = req.params.marketPk as string
const marketName = symbolsByPk[marketPk]
// validate
const validPk = marketName != undefined
if (!validPk) {
const error = { s: "error", validPk }
console.error({ marketPk, error })
res.status(404).send(error)
return
}
// respond
try {
const conn = await pool.getTedis()
try {
const marketPk = req.params.marketPk as string
const marketName = symbolsByPk[marketPk]
const store = new RedisStore(conn, marketName)
const trades = await store.loadRecentTrades()
const response = {

View File

@ -1,12 +1,11 @@
import BN from 'bn.js';
export interface Order {
orderId: BN;
price: number;
side: 'buy' | 'sell';
size: number;
eventFlags: { maker: boolean };
};
export interface MarketConfig {
clusterUrl: string
programId: string
marketName: string
marketPk: string
}
export enum TradeSide {
None = 0,
@ -15,7 +14,6 @@ export enum TradeSide {
}
export interface Trade {
id?: string;
price: number;
side: TradeSide;
size: number;
@ -48,3 +46,8 @@ export interface BufferStore {
storeBuffer: (ts: number, b: Buffer) => Promise<void>;
};
export interface KeyValStore {
storeNumber: (key: string, val: number) => Promise<void>;
loadNumber: (key: string) => Promise<number | undefined>;
};

View File

@ -2,7 +2,7 @@ import { Base64TradeCoder } from './base64';
const coder = new Base64TradeCoder();
import { batch } from './candle';
import { BufferStore, Candle, CandleStore, Trade } from './interfaces';
import { BufferStore, Candle, CandleStore, KeyValStore, Trade } from './interfaces';
import { Tedis } from 'tedis';
export interface RedisConfig {
@ -12,7 +12,7 @@ export interface RedisConfig {
password?: string;
};
export class RedisStore implements CandleStore, BufferStore {
export class RedisStore implements CandleStore, BufferStore, KeyValStore {
connection: Tedis;
symbol: string;
@ -92,6 +92,20 @@ export class RedisStore implements CandleStore, BufferStore {
const key = this.keyForBuffer(ts);
await this.connection.set(key, b.toString('base64'));
};
// interface KeyValStore
async storeNumber(key: string, val: number): Promise<void> {
await this.connection.set(`${this.symbol}-NUM-${key}`, val.toString());
};
async loadNumber(key: string): Promise<number | undefined> {
const result = await this.connection.get(`${this.symbol}-NUM-${key}`);
if (result)
return result as number;
else
return undefined;
};
};

View File

@ -1,208 +0,0 @@
import { bits, blob, struct, u32, u8, Blob, Layout, UInt } from 'buffer-layout';
import BN from 'bn.js';
import { PublicKey } from '@solana/web3.js';
// copied from serum-ts/layouts.ts
class Zeros extends Blob {
decode(b, offset) {
const slice = super.decode(b, offset);
if (!slice.every((v) => v === 0)) {
throw new Error('nonzero padding bytes');
}
return slice;
}
}
export function zeros(length) {
return new Zeros(length);
}
class PublicKeyLayout extends Blob {
constructor(property) {
super(32, property);
}
decode(b, offset) {
return new PublicKey(super.decode(b, offset));
}
encode(src, b, offset) {
return super.encode(src.toBuffer(), b, offset);
}
}
export function publicKeyLayout(property) {
return new PublicKeyLayout(property);
}
class BNLayout extends Blob {
decode(b, offset) {
return new BN(super.decode(b, offset), 10, 'le');
}
encode(src, b, offset) {
return super.encode(src.toArrayLike(Buffer, 'le', this.span), b, offset);
}
}
export function u64(property) {
return new BNLayout(8, property);
}
export function u128(property) {
return new BNLayout(16, property);
}
export class WideBits extends Layout {
constructor(property) {
super(8, property);
this._lower = bits(u32(), false);
this._upper = bits(u32(), false);
}
addBoolean(property) {
if (this._lower.fields.length < 32) {
this._lower.addBoolean(property);
} else {
this._upper.addBoolean(property);
}
}
decode(b, offset = 0) {
const lowerDecoded = this._lower.decode(b, offset);
const upperDecoded = this._upper.decode(b, offset + this._lower.span);
return { ...lowerDecoded, ...upperDecoded };
}
encode(src, b, offset = 0) {
return (
this._lower.encode(src, b, offset) +
this._upper.encode(src, b, offset + this._lower.span)
);
}
}
export class VersionedLayout extends Layout {
constructor(version, inner, property) {
super(inner.span > 0 ? inner.span + 1 : inner.span, property);
this.version = version;
this.inner = inner;
}
decode(b, offset = 0) {
// if (b.readUInt8(offset) !== this._version) {
// throw new Error('invalid version');
// }
return this.inner.decode(b, offset + 1);
}
encode(src, b, offset = 0) {
b.writeUInt8(this.version, offset);
return 1 + this.inner.encode(src, b, offset + 1);
}
getSpan(b, offset = 0) {
return 1 + this.inner.getSpan(b, offset + 1);
}
}
class EnumLayout extends UInt {
constructor(values, span, property) {
super(span, property);
this.values = values;
}
encode(src, b, offset) {
if (this.values[src] !== undefined) {
return super.encode(this.values[src], b, offset);
}
throw new Error('Invalid ' + this.property);
}
decode(b, offset) {
const decodedValue = super.decode(b, offset);
const entry = Object.entries(this.values).find(
([, value]) => value === decodedValue,
);
if (entry) {
return entry[0];
}
throw new Error('Invalid ' + this.property);
}
}
export function sideLayout(property) {
return new EnumLayout({ buy: 0, sell: 1 }, 4, property);
}
export function orderTypeLayout(property) {
return new EnumLayout({ limit: 0, ioc: 1, postOnly: 2 }, 4, property);
}
export function selfTradeBehaviorLayout(property) {
return new EnumLayout(
{ decrementTake: 0, cancelProvide: 1, abortTransaction: 2 },
4,
property,
);
}
const ACCOUNT_FLAGS_LAYOUT = new WideBits();
ACCOUNT_FLAGS_LAYOUT.addBoolean('initialized');
ACCOUNT_FLAGS_LAYOUT.addBoolean('market');
ACCOUNT_FLAGS_LAYOUT.addBoolean('openOrders');
ACCOUNT_FLAGS_LAYOUT.addBoolean('requestQueue');
ACCOUNT_FLAGS_LAYOUT.addBoolean('eventQueue');
ACCOUNT_FLAGS_LAYOUT.addBoolean('bids');
ACCOUNT_FLAGS_LAYOUT.addBoolean('asks');
export function accountFlagsLayout(property = 'accountFlags') {
return ACCOUNT_FLAGS_LAYOUT.replicate(property);
}
export function setLayoutDecoder(layout, decoder) {
const originalDecode = layout.decode;
layout.decode = function decode(b, offset = 0) {
return decoder(originalDecode.call(this, b, offset));
};
}
export function setLayoutEncoder(layout, encoder) {
const originalEncode = layout.encode;
layout.encode = function encode(src, b, offset) {
return originalEncode.call(this, encoder(src), b, offset);
};
return layout;
}
// copied from serum-ts/queue.ts
const EVENT_FLAGS = bits(u8(), false, 'eventFlags');
EVENT_FLAGS.addBoolean('fill');
EVENT_FLAGS.addBoolean('out');
EVENT_FLAGS.addBoolean('bid');
EVENT_FLAGS.addBoolean('maker');
const EVENT = struct([
EVENT_FLAGS,
u8('openOrdersSlot'),
u8('feeTier'),
blob(5),
u64('nativeQuantityReleased'), // Amount the user received
u64('nativeQuantityPaid'), // Amount the user paid
u64('nativeFeeOrRebate'),
u128('orderId'),
publicKeyLayout('openOrders'),
u64('clientOrderId'),
]);
// should be a PR
export function encodeEvents(events) {
let buffer = new Buffer.alloc(events.length * EVENT.span);
for (let i = 0; i < events.length; i += 1) {
EVENT.encode(events[i], buffer, i*EVENT.span);
}
return buffer;
}

36
src/time.ts Normal file
View File

@ -0,0 +1,36 @@
const SECONDS = 1000;
const MINUTES = 60 * SECONDS;
const HOURS = 60 * MINUTES;
export const resolutions: { [id: string]: number | undefined } = {
"1": 1 * MINUTES,
"3": 3 * MINUTES,
"5": 5 * MINUTES,
"15": 15 * MINUTES,
"30": 30 * MINUTES,
"60": 1 * HOURS,
"120": 2 * HOURS,
"180": 3 * HOURS,
"240": 4 * HOURS,
"1D": 24 * HOURS,
}
interface Timespan {
Millis?: number;
Seconds?: number;
Minutes?: number;
Hours?: number;
}
export function sleep(time: Timespan) {
const millis = time.Millis || 0;
const seconds = time.Seconds || 0;
const minutes = time.Minutes || 0;
const hours = time.Hours || 0;
const total = millis + SECONDS * seconds + MINUTES * minutes + HOURS * hours;
return new Promise((resolve) => setTimeout(resolve, total))
}

87
types/buffer-layout.d.ts vendored Normal file
View File

@ -0,0 +1,87 @@
declare module 'buffer-layout' {
// TODO: remove `any`.
export class Layout<T = any> {
span: number;
property?: string;
constructor(span: number, property?: string);
decode(b: Buffer, offset?: number): T;
encode(src: T, b: Buffer, offset?: number): number;
getSpan(b: Buffer, offset?: number): number;
replicate(name: string): this;
}
// TODO: remove any.
export class Structure<T = any> extends Layout<T> {
span: any;
}
export function greedy(
elementSpan?: number,
property?: string,
): Layout<number>;
export function offset<T>(
layout: Layout<T>,
offset?: number,
property?: string,
): Layout<T>;
export function u8(property?: string): Layout<number>;
export function u16(property?: string): Layout<number>;
export function u24(property?: string): Layout<number>;
export function u32(property?: string): Layout<number>;
export function u40(property?: string): Layout<number>;
export function u48(property?: string): Layout<number>;
export function nu64(property?: string): Layout<number>;
export function u16be(property?: string): Layout<number>;
export function u24be(property?: string): Layout<number>;
export function u32be(property?: string): Layout<number>;
export function u40be(property?: string): Layout<number>;
export function u48be(property?: string): Layout<number>;
export function nu64be(property?: string): Layout<number>;
export function s8(property?: string): Layout<number>;
export function s16(property?: string): Layout<number>;
export function s24(property?: string): Layout<number>;
export function s32(property?: string): Layout<number>;
export function s40(property?: string): Layout<number>;
export function s48(property?: string): Layout<number>;
export function ns64(property?: string): Layout<number>;
export function s16be(property?: string): Layout<number>;
export function s24be(property?: string): Layout<number>;
export function s32be(property?: string): Layout<number>;
export function s40be(property?: string): Layout<number>;
export function s48be(property?: string): Layout<number>;
export function ns64be(property?: string): Layout<number>;
export function f32(property?: string): Layout<number>;
export function f32be(property?: string): Layout<number>;
export function f64(property?: string): Layout<number>;
export function f64be(property?: string): Layout<number>;
export function struct<T>(
fields: Layout<any>[],
property?: string,
decodePrefixes?: boolean,
): Layout<T>;
export function bits(
word: Layout<number>,
msb?: boolean,
property?: string,
): any;
export function seq<T>(
elementLayout: Layout<T>,
count: number | Layout<number>,
property?: string,
): Layout<T[]>;
export function union(
discr: Layout<any>,
defaultLayout?: any,
property?: string,
): any;
export function unionLayoutDiscriminator(
layout: Layout<any>,
property?: string,
): any;
export function blob(
length: number | Layout<number>,
property?: string,
): Layout<Buffer>;
export function cstr(property?: string): Layout<string>;
export function utf8(maxSpan: number, property?: string): Layout<string>;
}