Initial commit.

This commit is contained in:
Nicholas Clarke 2022-01-13 16:22:31 -08:00
commit 0565df369b
29 changed files with 4476 additions and 0 deletions

2
.env.sample Normal file
View File

@ -0,0 +1,2 @@
TRADE_HISTORY_DB_URL=
USE_WEBSOCKET=FALSE

8
.eslintrc Normal file
View File

@ -0,0 +1,8 @@
{
"parser": "@typescript-eslint/parser",
"extends": ["plugin:@typescript-eslint/recommended"],
"parserOptions": { "ecmaVersion": 2018, "sourceType": "module" },
"rules": {
"argsIgnorePattern": "^_"
}
}

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
.idea/
.vscode/
node_modules/
build/
tmp/
temp/
.env
/dist
.DS_Store

2
Procfile Normal file
View File

@ -0,0 +1,2 @@
worker: npm run start
web: npm run noop

9
README.md Normal file
View File

@ -0,0 +1,9 @@
# Collect filled trades via Websocket or Scraping the event queue
Steps to run this project:
1. Run `yarn` command
2. Setup database settings inside `ormconfig.js` file
3. Setup .env using .env.sample
4. `yarn build`
5. "start": "NODE_ENV=production node dist/index.js",

16
ormconfig.js Normal file
View File

@ -0,0 +1,16 @@
require('dotenv').config();
module.exports = {
type: 'postgres',
url: process.env.TRADE_HISTORY_DB_URL,
synchronize: false,
logging: false,
entities: ['dist/entity/**/*.js'],
migrations: ['dist/migration/**/*.js'],
subscribers: ['dist/subscribers/**/*.js'],
extra: {
ssl: {
rejectUnauthorized: false,
},
},
};

51
package.json Normal file
View File

@ -0,0 +1,51 @@
{
"name": "trade-history",
"version": "1.0.0",
"main": "index.js",
"license": "MIT",
"scripts": {
"build": "tsc",
"dev": "ts-node-dev src/index.ts",
"clean": "rm -rf dist",
"prepare": "yarn clean && yarn build",
"start": "node dist/index.js",
"lint": "eslint src/**/*.ts",
"format": "eslint src/**/*.ts --fix",
"noop": "",
"migrate": "yarn build && typeorm migration:run",
"migrate:revert": "yarn build && typeorm migration:revert"
},
"prettier": {
"singleQuote": true,
"arrowParens": "always",
"printWidth": 100
},
"dependencies": {
"@blockworks-foundation/mango-client": "^3.2.23",
"@project-serum/serum": "^0.13.54",
"@solana/web3.js": "^1.18.0",
"bn.js": "^5.2.0",
"bs58": "^4.0.1",
"buffer-layout": "^1.2.0",
"cors": "^2.8.5",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"mysql": "^2.14.1",
"pg": "^8.5.1",
"reflect-metadata": "^0.1.10",
"typeorm": "0.2.31",
"ws": "^7.4.4"
},
"devDependencies": {
"@types/express": "^4.17.11",
"@types/node": "^8.0.29",
"@types/ws": "^7.4.0",
"@typescript-eslint/eslint-plugin": "^4.17.0",
"@typescript-eslint/parser": "^4.17.0",
"eslint": "^7.21.0",
"prettier": "^2.2.1",
"ts-node": "3.3.0",
"ts-node-dev": "^1.1.6",
"typescript": "4.1.5"
}
}

View File

@ -0,0 +1,16 @@
import { Entity, Column, PrimaryColumn } from 'typeorm';
@Entity()
export default class CurrencyMeta {
@PrimaryColumn('text')
address: string;
@PrimaryColumn('text')
programId: string;
@PrimaryColumn('text')
currency: string;
@Column()
MintDecimals: number;
}

64
src/entity/Event.ts Normal file
View File

@ -0,0 +1,64 @@
import { Entity, Column, Index, PrimaryColumn } from 'typeorm';
@Entity()
export default class Event {
@PrimaryColumn('timestamptz')
loadTimestamp: string;
@Column()
address: string;
@Column()
programId: string;
@Column()
baseCurrency: string;
@Column()
quoteCurrency: string;
@Column()
fill: boolean;
@Column()
out: boolean;
@Column()
bid: boolean;
@Column()
maker: boolean;
@Column()
openOrderSlot: string;
@Column()
feeTier: string;
@Column()
nativeQuantityReleased: string;
@Column()
nativeQuantityPaid: string;
@Column()
nativeFeeOrRebate: string;
@Column()
orderId: string;
@Column()
openOrders: string;
@Column()
clientOrderId: string;
@Column()
uuid: string;
@Column()
source: number;
@Column()
seqNum: number;
}

9
src/entity/Owner.ts Normal file
View File

@ -0,0 +1,9 @@
import { Entity, PrimaryColumn, Column } from 'typeorm';
@Entity()
export default class Owner {
@PrimaryColumn()
openOrders: string;
@Column()
owner: string;
}

40
src/entity/PerpEvent.ts Normal file
View File

@ -0,0 +1,40 @@
import { Entity, Column, PrimaryColumn } from 'typeorm';
@Entity()
export default class PerpEvent {
@PrimaryColumn('timestamptz')
loadTimestamp: string;
@Column()
address: string;
@Column()
seqNum: number;
@Column()
makerFee: number;
@Column()
takerFee: number;
@Column()
takerSide: string;
@Column()
maker: string;
@Column()
makerOrderId: string;
@Column()
taker: string;
@Column()
takerOrderId: number;
@Column()
price: number;
@Column()
quantity: number;
}

View File

@ -0,0 +1,28 @@
import { Entity, Column, PrimaryColumn } from 'typeorm';
@Entity()
export default class PerpLiquidationEvent {
@PrimaryColumn('timestamptz')
loadTimestamp: string;
@Column()
address: string;
@Column()
liqee: string;
@Column()
liqor: string;
@Column()
liquidationFee: number;
@Column()
seqNum: number;
@Column()
price: number;
@Column()
quantity: number;
}

View File

@ -0,0 +1,13 @@
import { Entity, Column, PrimaryColumn } from 'typeorm';
@Entity()
export default class PerpSequenceNumber {
@Column()
address: string;
@Column()
seqNum: number;
@PrimaryColumn('timestamptz')
loadTimestamp: string;
}

View File

@ -0,0 +1,13 @@
import { Entity, Column, PrimaryColumn } from 'typeorm';
@Entity()
export default class SequenceNumber {
@Column()
address: string;
@Column()
seqNum: number;
@PrimaryColumn('timestamptz')
loadTimestamp: string;
}

7
src/entity/index.ts Normal file
View File

@ -0,0 +1,7 @@
export { default as Owner } from './Owner';
export { default as CurrencyMeta } from './CurrencyMeta';
export { default as SerumEvent } from './Event';
export { default as PerpEvent } from './PerpEvent';
export { default as PerpLiquidationEvent } from './PerpLiquidationEvent';
export { default as PerpSequenceNumber } from './PerpSequenceNumber';
export { default as SequenceNumber } from './SequenceNumber';

52
src/index.ts Normal file
View File

@ -0,0 +1,52 @@
import dotenv from 'dotenv';
dotenv.config();
import { createConnection } from 'typeorm';
import { Connection } from '@solana/web3.js';
import { startEventParsing } from './scrapeSerumQueue';
import { TRACKED_MARKETS, wait, WEBSOCKET_MARKETS } from './utils';
import { watchMarketEventQueue } from './watchQueue';
import { getAllPerpMarkets, startPerpEventParsing } from './scrapePerpQueue';
const SECONDS = 1000;
let connection = new Connection(
process.env.ENDPOINT_URL || 'https://mango.rpcpool.com/d73005e4c66b26f49515b15f09a3'
);
// let devnetConnection = new Connection('https://mango.devnet.rpcpool.com');
export const SERUM_DEX_V3_PK = '9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin';
async function scrapeSerumMarkets(db) {
for (const trackedMarket of TRACKED_MARKETS) {
const scrapeInterval = trackedMarket?.interval || 6 * SECONDS;
startEventParsing(db, connection, trackedMarket, scrapeInterval);
await wait(5000);
}
}
async function scrapePerpMarkets(db) {
const scrapeInterval = 4 * SECONDS;
const mangoGroupPerpMarkets = await getAllPerpMarkets(connection);
for (const perpMarket of mangoGroupPerpMarkets) {
startPerpEventParsing(db, connection, perpMarket, scrapeInterval);
await wait(5000);
}
}
createConnection().then(async (db) => {
if (process.env.USE_WEBSOCKET === 'TRUE') {
// TODO - use TRACKED_MARKETS once websocket isn't rate limited
for (const trackedMarket of WEBSOCKET_MARKETS) {
trackedMarket['baseSymbol'] = trackedMarket['name'].split('/')[0];
trackedMarket['quoteSymbol'] = trackedMarket['name'].split('/')[1];
console.log('Starting connection to:', trackedMarket.name);
watchMarketEventQueue(db, trackedMarket);
await wait(3000);
}
} else {
scrapeSerumMarkets(db);
scrapePerpMarkets(db);
}
});

View File

@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddExtension1627606235010 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;');
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query('DROP EXTENSION timescaledb;');
}
}

View File

@ -0,0 +1,164 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
export class CreateEvent1627606235070 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'event',
columns: [
{
name: 'fill',
type: 'bool',
},
{
name: 'out',
type: 'bool',
},
{
name: 'bid',
type: 'bool',
},
{
name: 'maker',
type: 'bool',
},
{
name: 'address',
type: 'text',
},
{
name: 'programId',
type: 'text',
},
{
name: 'baseCurrency',
type: 'text',
},
{
name: 'quoteCurrency',
type: 'text',
},
{
name: 'openOrderSlot',
type: 'text',
},
{
name: 'feeTier',
type: 'text',
},
{
name: 'nativeQuantityReleased',
type: 'text',
},
{
name: 'nativeQuantityPaid',
type: 'text',
},
{
name: 'nativeFeeOrRebate',
type: 'text',
},
{
name: 'orderId',
type: 'text',
},
{
name: 'openOrders',
type: 'text',
},
{
name: 'clientOrderId',
type: 'text',
},
{
name: 'uuid',
type: 'text',
},
{
name: 'source',
type: 'text',
},
{
name: 'seqNum',
type: 'int8',
},
{
name: 'loadTimestamp',
type: 'timestamptz',
},
],
}),
true
);
await queryRunner.createTable(
new Table({
name: 'currency_meta',
columns: [
{
name: 'address',
type: 'text',
},
{
name: 'programId',
type: 'text',
},
{
name: 'currency',
type: 'text',
},
{
name: 'MintDecimals',
type: 'int4',
},
],
}),
true
);
await queryRunner.createTable(
new Table({
name: 'owner',
columns: [
{
name: 'openOrders',
type: 'text',
},
{
name: 'owner',
type: 'text',
},
],
}),
true
);
await queryRunner.createTable(
new Table({
name: 'sequence_number',
columns: [
{
name: 'address',
type: 'text',
},
{
name: 'loadTimestamp',
type: 'timestamptz',
},
{
name: 'seqNum',
type: 'int8',
},
],
}),
true
);
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable('event');
await queryRunner.dropTable('currency_meta');
await queryRunner.dropTable('sequence_number');
await queryRunner.dropTable('owner');
}
}

View File

@ -0,0 +1,10 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class MakeHypertable1627606235170 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query("SELECT create_hypertable('sequence_number', 'loadTimestamp');");
await queryRunner.query("SELECT create_hypertable('event', 'loadTimestamp');");
}
async down(queryRunner: QueryRunner): Promise<void> {}
}

View File

@ -0,0 +1,88 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
export class CreatePerpEvent1627606236070 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'perp_event',
columns: [
{
name: 'seqNum',
type: 'int8',
},
{
name: 'address',
type: 'text',
},
{
name: 'loadTimestamp',
type: 'timestamptz',
},
{
name: 'makerFee',
type: 'decimal',
},
{
name: 'takerFee',
type: 'decimal',
},
{
name: 'takerSide',
type: 'varchar',
},
{
name: 'maker',
type: 'text',
},
{
name: 'makerOrderId',
type: 'numeric',
},
{
name: 'taker',
type: 'text',
},
{
name: 'takerOrderId',
type: 'numeric',
},
{
name: 'price',
type: 'decimal',
},
{
name: 'quantity',
type: 'decimal',
},
],
}),
true
);
await queryRunner.createTable(
new Table({
name: 'perp_sequence_number',
columns: [
{
name: 'address',
type: 'text',
},
{
name: 'loadTimestamp',
type: 'timestamptz',
},
{
name: 'seqNum',
type: 'int8',
},
],
}),
true
);
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable('perp_event');
await queryRunner.dropTable('perp_sequence_number');
}
}

View File

@ -0,0 +1,10 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class MakePerpHypertable1627606236170 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query("SELECT create_hypertable('perp_sequence_number', 'loadTimestamp');");
await queryRunner.query("SELECT create_hypertable('perp_event', 'loadTimestamp');");
}
async down(queryRunner: QueryRunner): Promise<void> {}
}

View File

@ -0,0 +1,50 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
export class CreatePerpLiquidationEvent1627606237070 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'perp_liquidation_event',
columns: [
{
name: 'seqNum',
type: 'int8',
},
{
name: 'loadTimestamp',
type: 'timestamptz',
},
{
name: 'address',
type: 'text',
},
{
name: 'liqee',
type: 'text',
},
{
name: 'liqor',
type: 'text',
},
{
name: 'liquidationFee',
type: 'decimal',
},
{
name: 'price',
type: 'decimal',
},
{
name: 'quantity',
type: 'decimal',
},
],
}),
true
);
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable('perp_liquidation_event');
}
}

173
src/scrapePerpQueue.ts Normal file
View File

@ -0,0 +1,173 @@
import {
getMultipleAccounts,
IDS,
PerpMarket,
PerpMarketLayout,
ZERO_BN,
} from '@blockworks-foundation/mango-client';
import BN from 'bn.js';
import { Connection as DbConnection } from 'typeorm';
import { Connection, PublicKey } from '@solana/web3.js';
import { wait } from './utils';
import { PerpEvent, PerpLiquidationEvent, PerpSequenceNumber } from './entity';
export async function getAllPerpMarkets(connection: Connection) {
const DEFAULT_MANGO_GROUP_NAME = process.env.GROUP || 'mainnet.1';
const defaultMangoGroupIds = IDS['groups'].find(
(group) => group.name === DEFAULT_MANGO_GROUP_NAME
);
const perpMarketInfos = defaultMangoGroupIds.perpMarkets;
const perpMarketPks = perpMarketInfos.map((mkt) => new PublicKey(mkt.publicKey));
const allMarketAccountInfos = await getMultipleAccounts(connection, perpMarketPks);
return perpMarketInfos.map((perpInfo, i) => {
const decoded = PerpMarketLayout.decode(allMarketAccountInfos[i].accountInfo.data);
return new PerpMarket(
new PublicKey(perpInfo.publicKey),
perpInfo.baseDecimals,
perpInfo.quoteDecimals,
decoded
);
});
}
function formatFillEvents(address, fillEvents) {
return fillEvents.map((fillEvent) => {
return {
loadTimestamp: new Date(fillEvent.timestamp.toNumber() * 1000).toISOString(),
address,
takerSide: fillEvent.takerSide,
seqNum: fillEvent.seqNum.toNumber(),
makerFee: fillEvent.makerFee.toNumber(),
takerFee: fillEvent.takerFee.toNumber(),
maker: fillEvent.maker.toString(),
makerOrderId: fillEvent.makerOrderId.toString(),
taker: fillEvent.taker.toString(),
takerOrderId: fillEvent.takerOrderId.toString(),
price: fillEvent.price,
quantity: fillEvent.quantity,
};
});
}
function formatLiquidateEvent(address, liquidationEvents) {
return liquidationEvents.map((event) => {
return {
seqNum: event.seqNum.toNumber(),
loadTimestamp: new Date(event.timestamp.toNumber() * 1000).toISOString(),
address,
liqee: event.liqee.toString(),
liqor: event.liqor.toString(),
price: event.price,
quantity: event.quantity,
liquidationFee: event.liquidationFee.toNumber(),
};
});
}
const getLastSeqNumForMarket = async (address, db) => {
try {
return await db
.getRepository(PerpSequenceNumber)
.createQueryBuilder('perp_sequence_number')
.where('perp_sequence_number.address = :address', { address })
.addOrderBy('perp_sequence_number.seqNum', 'DESC')
.select('"seqNum"')
.limit(1)
.execute();
} catch (error) {
console.error('Unable to fetch perp_sequence_number', error);
}
};
async function insertPerpEvents(perpMarketPk, allEvents, fillEvents, liquidateEvents, db) {
if (allEvents.length === 0) return;
if (fillEvents.length) {
const newRecords = formatFillEvents(perpMarketPk, fillEvents);
await db.getRepository(PerpEvent).createQueryBuilder().insert().values(newRecords).execute();
}
if (liquidateEvents.length) {
const newRecords = formatLiquidateEvent(perpMarketPk, liquidateEvents);
await db
.getRepository(PerpLiquidationEvent)
.createQueryBuilder()
.insert()
.values(newRecords)
.execute();
}
const sequenceNumbers = allEvents
.map((e) => e.fill || e.liquidate || e.out)
.map((event) => ({
address: perpMarketPk,
seqNum: event.seqNum.toNumber(),
loadTimestamp: new Date().toISOString(),
}));
await db
.getRepository(PerpSequenceNumber)
.createQueryBuilder()
.insert()
.values(sequenceNumbers)
.execute();
}
async function captureEventsForPerpMarket(
db: DbConnection,
connection: Connection,
perpMarket: PerpMarket
) {
let lastSeqNum;
const perpMarketPk = perpMarket.publicKey.toString();
const lastSeqNumRecord = await getLastSeqNumForMarket(perpMarketPk, db);
if (lastSeqNumRecord.length) {
lastSeqNum = new BN(lastSeqNumRecord[0]?.seqNum);
}
const eventQueue = await perpMarket.loadEventQueue(connection);
const allEvents = eventQueue.eventsSince(lastSeqNum);
const fillEvents = allEvents
.map((e) => e.fill)
.filter((e) => !!e)
.map((e) => perpMarket.parseFillEvent(e));
const liquidateEvents = allEvents
.map((e) => e.liquidate)
.filter((e) => !!e)
.map((e) => ({
...e,
price: e.price.toNumber(),
quantity: perpMarket.baseLotsToNumber(e.quantity),
}));
console.log(
`market ${perpMarketPk} lastSeqNum: ${lastSeqNum?.toNumber()}, fill events: ${
fillEvents.length
}, all events: ${allEvents.length}`
);
try {
await insertPerpEvents(perpMarketPk, allEvents, fillEvents, liquidateEvents, db);
} catch (error) {
console.error('Error inserting event queue data:', error);
}
}
export const startPerpEventParsing = async (
db: DbConnection,
connection: Connection,
perpMarket,
waitTime
) => {
while (true) {
try {
await captureEventsForPerpMarket(db, connection, perpMarket);
} catch (e) {
console.log(`Error in startPerpEventParsing() ${e}`);
}
await wait(waitTime);
}
};

298
src/scrapeSerumQueue.ts Normal file
View File

@ -0,0 +1,298 @@
import 'reflect-metadata';
import { Connection, PublicKey } from '@solana/web3.js';
import { Market, OpenOrders, EVENT_QUEUE_LAYOUT } from '@project-serum/serum';
import { CurrencyMeta, SerumEvent, Owner, SequenceNumber } from './entity';
import { wait, SOURCES, formatFilledEvents } from './utils';
import { Event } from '@project-serum/serum/lib/queue';
import { SERUM_DEX_V3_PK } from '.';
import { Connection as DbConnection } from 'typeorm';
function throwIfNull<T>(value: T | null, message = 'account not found'): T {
if (value === null) {
throw new Error(message);
}
return value;
}
function decodeQueueItem(headerLayout, nodeLayout, buffer: Buffer, nodeIndex) {
return nodeLayout.decode(buffer, headerLayout.span + nodeIndex * nodeLayout.span);
}
function decodeEventsSince(buffer: Buffer, lastSeqNum: number): Event[] {
const { HEADER, NODE } = EVENT_QUEUE_LAYOUT;
const header = HEADER.decode(buffer);
if (lastSeqNum > header.seqNum) {
return [];
}
const allocLen = Math.floor((buffer.length - HEADER.span) / NODE.span);
// calculate number of missed events
// account for u32 & ringbuffer overflows
const modulo32Uint = 0x100000000;
let missedEvents = (header.seqNum - lastSeqNum + modulo32Uint) % modulo32Uint;
if (missedEvents > allocLen) {
missedEvents = allocLen;
}
const startSeq = (header.seqNum - missedEvents + modulo32Uint) % modulo32Uint;
// define boundary indexes in ring buffer [start;end)
const endIndex = (header.head + header.count) % allocLen;
const startIndex = (endIndex - missedEvents + allocLen) % allocLen;
const results: Event[] = [];
for (let i = 0; i < missedEvents; ++i) {
const nodeIndex = (startIndex + i) % allocLen;
const event = decodeQueueItem(HEADER, NODE, buffer, nodeIndex);
event.seqNum = (startSeq + i) % modulo32Uint;
results.push(event);
}
return results;
}
const loadEventQueue = async (
connection: Connection,
eventQueuePk: PublicKey,
lastSeqNum: number
) => {
const { data } = throwIfNull(await connection.getAccountInfo(eventQueuePk));
const events = decodeEventsSince(data, lastSeqNum);
events.shift(); // remove the first element in the array since it is the lastSeqNum recorded
return events;
};
const addOwnerMappings = async (events, db, connection) => {
if (events.length === 0) return;
const ownerRepository = db.getRepository(Owner);
const allOpenOrders: string[] = events.map((e) => e.openOrders.toString());
const uniqueOpenOrders: string[] = [...new Set(allOpenOrders)];
let records;
try {
records = await ownerRepository
.createQueryBuilder('owner')
.where('owner.openOrders IN (:...uniqueOpenOrders)', { uniqueOpenOrders })
.getMany();
} catch (error) {
console.error('Error fetching owners', error);
}
if (records.length === uniqueOpenOrders.length) return;
const openOrdersToAdd = uniqueOpenOrders.filter((o) => !records.find((r) => r.openOrders === o));
try {
const newRecords = await Promise.all(
openOrdersToAdd.map(async (openOrders) => {
const o = await OpenOrders.load(
connection,
new PublicKey(openOrders),
new PublicKey(SERUM_DEX_V3_PK)
);
return { owner: o.owner.toString(), openOrders };
})
);
await ownerRepository.save(newRecords);
} catch (error) {
console.error('Error updating Owner table:', `${error}`);
console.error('Tried adding open orders:', openOrdersToAdd);
}
};
const insertCurrencyMeta = async (marketMeta, db) => {
const { address, baseSymbol, quoteSymbol, baseDecimals, quoteDecimals } = marketMeta;
const currencyRepo = db.getRepository(CurrencyMeta);
const baseCurrencyRow = await currencyRepo.find({
where: { address, programId: SERUM_DEX_V3_PK, currency: baseSymbol },
});
const quoteCurrencyRow = await currencyRepo.find({
where: { address, programId: SERUM_DEX_V3_PK, currency: quoteSymbol },
});
let newCurrencies = [];
if (baseCurrencyRow.length === 0) {
newCurrencies.push({
address,
programId: SERUM_DEX_V3_PK,
currency: baseSymbol,
MintDecimals: baseDecimals,
});
}
if (quoteCurrencyRow.length === 0) {
newCurrencies.push({
address,
programId: SERUM_DEX_V3_PK,
currency: quoteSymbol,
MintDecimals: quoteDecimals,
});
}
if (newCurrencies.length) {
try {
await currencyRepo.save(newCurrencies);
} catch (error) {
console.error('Error inserting currency meta data:', error);
}
}
};
const insertEvents = async (
allEvents,
fillEvents: Array<any>,
marketMeta,
loadTimestamp,
db: DbConnection
) => {
if (allEvents.length === 0) return;
// persist fill events first and then string events to prevent missing events in case of a restart
if (fillEvents.length) {
const newRecords = formatFilledEvents(
fillEvents,
marketMeta,
loadTimestamp,
SOURCES.loadEventQueue
);
await db
.getRepository(SerumEvent)
.createQueryBuilder()
.insert()
.values(newRecords)
.orIgnore()
.execute();
}
const sequenceNumbers = allEvents.map((event) => ({
address: marketMeta.address,
seqNum: event.seqNum,
loadTimestamp,
}));
await db
.getRepository(SequenceNumber)
.createQueryBuilder()
.insert()
.values(sequenceNumbers)
.orIgnore()
.execute();
};
const getLastSeqNumForMarket = async (marketMeta, db) => {
try {
return await db
.getRepository(SequenceNumber)
.createQueryBuilder('sequence_number')
.where('sequence_number.address = :address', { address: marketMeta.address })
.orderBy('sequence_number.loadTimestamp', 'DESC')
.addOrderBy('sequence_number.seqNum', 'DESC')
.select('"seqNum"')
.limit(1)
.execute();
} catch (error) {
console.error('Unable to fetch sequence_number', error);
}
};
export const parseAndCaptureEventsForMarket = async (db, connection, marketMeta) => {
let lastSeqNum = 0;
try {
const record = await getLastSeqNumForMarket(marketMeta, db);
if (record.length) {
lastSeqNum = record[0]?.seqNum;
}
} catch (err) {
console.log('Unable to load lastSeqNum', `${err}`);
}
const loadTimestamp = new Date().toISOString();
let events = [];
try {
events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
} catch (error) {
console.error('Error loading event queue', error);
}
const fillEvents = events.filter((e) => e.eventFlags['fill']);
console.log(
`Scraping ${marketMeta.name}. Last seq num: ${lastSeqNum}. Events loaded: ${events.length}. Fill events: ${fillEvents.length}`
);
try {
// track openOrder account public keys and the public key of the owner of the open orders account
await addOwnerMappings(fillEvents, db, connection);
await insertEvents(events, fillEvents, marketMeta, loadTimestamp, db);
} catch (error) {
console.error('Error inserting event queue data:', error);
}
};
export const parseAndCapturePerpEventsForMarket = async (
db: DbConnection,
connection,
marketMeta
) => {
let lastSeqNum = 0;
try {
const record = await getLastSeqNumForMarket(marketMeta, db);
if (record.length) {
lastSeqNum = record[0]?.seqNum;
}
} catch (err) {
console.log('Unable to load lastSeqNum', `${err}`);
}
const loadTimestamp = new Date().toISOString();
let events = [];
try {
events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
} catch (error) {
console.error('Error loading event queue', error);
}
const fillEvents = events.filter((e) => e.eventFlags['fill']);
console.log(
`Scraping ${marketMeta.name}. Events loaded: ${events.length}. Last seq num: ${lastSeqNum}. Fill events: ${fillEvents.length}`
);
try {
await addOwnerMappings(fillEvents, db, connection);
} catch (error) {
console.error('Error inserting owner mappings data:', error);
}
try {
await insertEvents(events, fillEvents, marketMeta, loadTimestamp, db);
} catch (error) {
console.error('Error inserting event queue data:', error);
}
};
export const startEventParsing = async (
db: DbConnection,
connection: Connection,
marketMeta,
waitTime
) => {
const marketPk = new PublicKey(marketMeta.address);
const programID = new PublicKey(SERUM_DEX_V3_PK);
const market = await Market.load(connection, marketPk, {}, programID);
marketMeta.baseSymbol = marketMeta['name'].split('/')[0];
marketMeta.quoteSymbol = marketMeta['name'].split('/')[1];
marketMeta.baseDecimals = market['_baseSplTokenDecimals'];
marketMeta.quoteDecimals = market['_quoteSplTokenDecimals'];
marketMeta.eventQueuePk = market['_decoded'].eventQueue;
// stores mint decimals and base/quote currency symbols (might not need)
await insertCurrencyMeta(marketMeta, db);
while (true) {
try {
await parseAndCaptureEventsForMarket(db, connection, marketMeta);
} catch (err) {
console.error(`Error in startEventParsing() ${err}`);
}
await wait(waitTime);
}
};

164
src/utils.ts Normal file
View File

@ -0,0 +1,164 @@
import crypto from 'crypto';
import { blob, struct, u32 } from 'buffer-layout';
import { accountFlagsLayout, zeros } from '@project-serum/serum/lib/layout';
import { AccountInfo, Commitment, Connection, PublicKey } from '@solana/web3.js';
import { SERUM_DEX_V3_PK } from '.';
export const TEST_TRACKED_MARKETS = ['BTC/USDT', 'BTC/USDC'];
export const SOURCES = {
websocket: 1,
loadEventQueue: 2,
};
export const WEBSOCKET_MARKETS = [
{ name: 'BTC/USDT', address: 'C1EuT9VokAKLiW7i2ASnZUvxDoKuKkCpDDeNxAptuNe4' },
{ name: 'BTC/USDC', address: 'A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw' },
{ name: 'ETH/USDT', address: '7dLVkUfBVfCGkFhSXDCq1ukM9usathSgS716t643iFGF' },
{ name: 'ETH/USDC', address: '4tSvZvnbyzHXLMTiFonMyxZoHmFqau1XArcRCVHLZ5gX' },
{ name: 'SOL/USDC', address: '9wFFyRfZBsuAha4YcuxcXLKwMxJR43S7fPfQLusDBzvT' },
{ name: 'SRM/USDC', address: 'ByRys5tuUWDgL73G8JBAEfkdFf8JWBzPBDHsBVQ5vbQA' },
{ name: 'MNGO/USDC', address: '3d4rzwpy9iGdCZvgxcu7B1YocYffVLsQXPXkBZKt2zLc' },
];
export const TRACKED_MARKETS = [
// { name: 'BTC/WUSDT', address: '5r8FfnbNYcQbS1m4CYmoHYGjBtu6bxfo6UJHNRfzPiYH' },
// { name: 'ETH/WUSDT', address: '71CtEComq2XdhGNbXBuYPmosAjMCPSedcgbNi5jDaGbR' },
{ name: 'BTC/USDT', address: 'C1EuT9VokAKLiW7i2ASnZUvxDoKuKkCpDDeNxAptuNe4' },
{ name: 'ETH/USDT', address: '7dLVkUfBVfCGkFhSXDCq1ukM9usathSgS716t643iFGF' },
// { name: 'FIDA/USDT', address: 'EbV7pPpEvheLizuYX3gUCvWM8iySbSRAhu2mQ5Vz2Mxf' },
// { name: 'HGET/USDT', address: 'ErQXxiNfJgd4fqQ58PuEw5xY35TZG84tHT6FXf5s4UxY' },
// { name: 'KIN/USDT', address: '4nCFQr8sahhhL4XJ7kngGFBmpkmyf3xLzemuMhn6mWTm' },
{ name: 'RAY/USDT', address: 'teE55QrL4a4QSfydR9dnHF97jgCfptpuigbb53Lo95g' },
{ name: 'SOL/USDT', address: 'HWHvQhFmJB3NUcu1aihKmrKegfVxBEHzwVX6yZCKEsi1' },
{ name: 'SRM/USDT', address: 'AtNnsY1AyRERWJ8xCskfz38YdvruWVJQUVXgScC1iPb' },
// { name: 'SUSHI/USDT', address: '6DgQRTpJTnAYBSShngAVZZDq7j9ogRN1GfSQ3cq9tubW' },
{ name: 'ALEPH/USDC', address: 'GcoKtAmTy5QyuijXSmJKBtFdt99e6Buza18Js7j9AJ6e' },
{ name: 'APEX/USDC', address: 'GX26tyJyDxiFj5oaKvNB9npAHNgdoV9ZYHs5ijs5yG2U' },
// { name: 'BOP/USDC', address: '7MmPwD1K56DthW14P1PnWZ4zPCbPWemGs3YggcT1KzsM' },
{ name: 'BTC/USDC', address: 'A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw' },
// { name: 'CATO/USDC', address: '9fe1MWiKqUdwift3dEpxuRHWftG72rysCRHbxDy6i9xB' },
// { name: 'CCAI/USDC', address: '7gZNLDbWE73ueAoHuAeFoSu7JqmorwCLpNTBXHtYSFTa' },
{ name: 'COPE/USDC', address: '6fc7v3PmjZG9Lk2XTot6BywGyYLkBQuzuFKd4FpCsPxk' },
{ name: 'ETH/USDC', address: '4tSvZvnbyzHXLMTiFonMyxZoHmFqau1XArcRCVHLZ5gX' },
// { name: 'FAB/USDC', address: 'Cud48DK2qoxsWNzQeTL5D8sAiHsGwG8Ev1VMNcYLayxt' },
{ name: 'FIDA/USDC', address: 'E14BKBhDWD4EuTkWj1ooZezesGxMW8LPCps4W5PuzZJo' },
{ name: 'FTT/USDC', address: '2Pbh1CvRVku1TgewMfycemghf6sU9EyuFDcNXqvRmSxc' },
// { name: 'HXRO/USDC', address: '6Pn1cSiRos3qhBf54uBP9ZQg8x3JTardm1dL3n4p29tA' },
// { name: 'KIN/USDC', address: 'Bn6NPyr6UzrFAwC4WmvPvDr2Vm8XSUnFykM2aQroedgn' },
// { name: 'LINK/USDC', address: '3hwH1txjJVS8qv588tWrjHfRxdqNjBykM1kMcit484up' },
// { name: 'LIQ/USDC', address: 'FLKUQGh9VAG4otn4njLPUf5gaUPx5aAZ2Q6xWiD3hH5u' },
// { name: 'MAPS/USDC', address: '3A8XQRWXC7BjLpgLDDBhQJLT5yPCzS16cGYRKHkKxvYo' },
// { name: 'MEDIA/USDC', address: 'FfiqqvJcVL7oCCu8WQUMHLUC2dnHQPAPjTdSzsERFWjb' },
// { name: 'MER/USDC', address: 'G4LcexdCzzJUKZfqyVDQFzpkjhB1JoCNL8Kooxi9nJz5' },
{ name: 'MNGO/USDC', address: '3d4rzwpy9iGdCZvgxcu7B1YocYffVLsQXPXkBZKt2zLc' },
// { name: 'MOLA/USDC', address: 'HSpeWWRqBJ4HH2FPyfDhoN1AUq3gYoDenQGZASSqzYW1' },
// { name: 'NINJA/USDC', address: 'J4oPt5Q3FYxrznkXLkbosAWrJ4rZLqJpGqz7vZUL4eMM' },
// { name: 'OXY/USDC', address: 'GZ3WBFsqntmERPwumFEYgrX2B7J7G11MzNZAy7Hje27X' },
// { name: 'OXYPOOL/USDC', address: 'G1uoNqQzdasMUvXV66Eki5dwjWv5N9YU8oHKJrE4mfka' },
{ name: 'RAY/USDC', address: '2xiv8A5xrJ7RnGdxXB42uFEkYHJjszEhaJyKKt4WaLep' },
// { name: 'ROPE/USDC', address: '4Sg1g8U2ZuGnGYxAhc6MmX9MX7yZbrrraPkCQ9MdCPtF' },
// { name: 'renBTC/USDC', address: '74Ciu5yRzhe8TFTHvQuEVbFZJrbnCMRoohBK33NNiPtv' },
// { name: 'renDOGE/USDC', address: '5FpKCWYXgHWZ9CdDMHjwxAfqxJLdw2PRXuAmtECkzADk' },
{ name: 'SBR/USDC', address: 'HXBi8YBwbh4TXF6PjVw81m8Z3Cc4WBofvauj5SBFdgUs' },
{ name: 'SAMO/USDC', address: 'FR3SPJmgfRSKKQ2ysUZBu7vJLpzTixXnjzb84bY3Diif' },
{ name: 'SLRS/USDC', address: '2Gx3UfV831BAh8uQv1FKSPKS9yajfeeD8GJ4ZNb2o2YP' },
{ name: 'SNY/USDC', address: 'DPfj2jYwPaezkCmUNm5SSYfkrkz8WFqwGLcxDDUsN3gA' },
{ name: 'SOL/USDC', address: '9wFFyRfZBsuAha4YcuxcXLKwMxJR43S7fPfQLusDBzvT', interval: '4000' },
// { name: 'SOLAPE/USDC', address: '4zffJaPyeXZ2wr4whHgP39QyTfurqZ2BEd4M5W6SEuon' },
{ name: 'SRM/USDC', address: 'ByRys5tuUWDgL73G8JBAEfkdFf8JWBzPBDHsBVQ5vbQA' },
{ name: 'STEP/USDC', address: '97qCB4cAVSTthvJu3eNoEx6AY6DLuRDtCoPm5Tdyg77S' },
{ name: 'SUSHI/USDC', address: 'A1Q9iJDVVS8Wsswr9ajeZugmj64bQVCYLZQLra2TMBMo' },
// { name: 'TOMO/USDC', address: '8BdpjpSD5n3nk8DQLqPUyTZvVqFu6kcff5bzUX5dqDpy' },
// { name: 'TULIP/USDC', address: '8GufnKq7YnXKhnB3WNhgy5PzU9uvHbaaRrZWQK6ixPxW' },
// { name: 'UNI/USDC', address: '6JYHjaQBx6AtKSSsizDMwozAEDEZ5KBsSUzH7kRjGJon' },
{ name: 'USDT/USDC', address: '77quYg4MGneUdjgXCunt9GgM1usmrxKY31twEy3WHwcS' },
// { name: 'WOO/USDC', address: '2Ux1EYeWsxywPKouRCNiALCZ1y3m563Tc4hq1kQganiq' },
// { name: 'YFI/USDC', address: '7qcCo8jqepnjjvB5swP4Afsr3keVBs6gNpBTNubd1Kr2' },
{ name: 'MSOL/USDC', address: '6oGsL2puUgySccKzn9XA9afqF217LfxP5ocq4B3LWsjy' },
{ name: 'BNB/USDC', address: '3zzTxtDCt9PimwzGrgWJEbxZfSLetDMkdYegPanGNpMf' },
{ name: 'AVAX/USDC', address: 'E8JQstcwjuqN5kdMyUJLNuaectymnhffkvfg1j286UCr' },
{ name: 'LUNA/USDC', address: 'HBTu8hNaoT3VyiSSzJYa8jwt9sDGKtJviSwFa11iXdmE' },
];
export const wait = async (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
export const formatEvent = (uuid: string, event, marketMeta, loadTimestamp, source) => {
return {
address: marketMeta.address,
programId: SERUM_DEX_V3_PK,
baseCurrency: marketMeta.baseSymbol,
quoteCurrency: marketMeta.quoteSymbol,
fill: event.eventFlags['fill'],
out: event.eventFlags['out'],
bid: event.eventFlags['bid'],
maker: event.eventFlags['maker'],
openOrderSlot: event.openOrdersSlot.toString(),
feeTier: event.feeTier.toString(),
nativeQuantityReleased: parseInt(event.nativeQuantityReleased.toString()),
nativeQuantityPaid: parseInt(event.nativeQuantityPaid.toString()),
nativeFeeOrRebate: event.nativeFeeOrRebate.toString(),
orderId: event.orderId.toString(),
openOrders: event.openOrders.toString(),
clientOrderId: event.clientOrderId.toString(),
seqNum: event.seqNum,
uuid,
source,
loadTimestamp,
};
};
export const formatFilledEvents = (events, marketMeta, loadTimestamp, source) => {
const newRecords = events.map((event) => {
const uuid = crypto.createHash('sha256').update(JSON.stringify(event)).digest('hex');
return formatEvent(uuid, event, marketMeta, loadTimestamp, source);
});
return newRecords;
};
const EVENT_QUEUE_HEADER = struct([
blob(5),
accountFlagsLayout('accountFlags'),
u32('head'),
zeros(4),
u32('count'),
zeros(4),
u32('seqNum'),
zeros(4),
]);
export const decodeEventHeader = (eventDataBuffer) => {
return EVENT_QUEUE_HEADER.decode(eventDataBuffer);
};
export async function getMultipleAccounts(
connection: Connection,
publicKeys: PublicKey[],
commitment?: Commitment
): Promise<{ publicKey: PublicKey; accountInfo: AccountInfo<Buffer> }[]> {
const publicKeyStrs = publicKeys.map((pk) => pk.toBase58());
// load connection commitment as a default
commitment ||= connection.commitment;
const args = commitment ? [publicKeyStrs, { commitment }] : [publicKeyStrs];
// @ts-ignore
const resp = await connection._rpcRequest('getMultipleAccounts', args);
if (resp.error) {
throw new Error(resp.error.message);
}
return resp.result.value.map(({ data, executable, lamports, owner }, i: number) => ({
publicKey: publicKeys[i],
accountInfo: {
data: Buffer.from(data[0], 'base64'),
executable,
owner: new PublicKey(owner),
lamports,
},
}));
}

43
src/watchQueue.ts Normal file
View File

@ -0,0 +1,43 @@
import 'reflect-metadata';
import { Connection, PublicKey } from '@solana/web3.js';
import { Market, decodeEventQueue } from '@project-serum/serum';
import { SerumEvent } from './entity';
import { formatFilledEvents, SOURCES } from './utils';
import { SERUM_DEX_V3_PK } from '.';
const insertEvents = async (events: Array<any>, marketMeta, loadTimestamp, db) => {
const eventRepo = db.getRepository(SerumEvent);
if (events.length) {
try {
await eventRepo.createQueryBuilder().insert().values(events).orIgnore().execute();
} catch (error) {
console.error('Error inserting into Event table:', error);
}
}
};
export const watchMarketEventQueue = async (db, marketMeta) => {
let connection = new Connection('https://mango.rpcpool.com/');
let marketPk = new PublicKey(marketMeta.address);
let programID = new PublicKey(SERUM_DEX_V3_PK);
let market = await Market.load(connection, marketPk, {}, programID);
const eventQueueAccount = market['_decoded'].eventQueue;
connection.onAccountChange(eventQueueAccount, (event) => {
const loadTimestamp = new Date().toISOString();
const decodedEventQueue = decodeEventQueue(event.data);
const filledEvents = decodedEventQueue.filter((e) => e.eventFlags['fill']);
if (filledEvents.length > 0) {
console.log('event inserted for', marketMeta.name);
const formattedFilledEvents = formatFilledEvents(
filledEvents,
marketMeta,
loadTimestamp,
SOURCES.websocket
);
insertEvents(formattedFilledEvents, marketMeta, loadTimestamp, db);
}
});
};

18
tsconfig.json Normal file
View File

@ -0,0 +1,18 @@
{
"compilerOptions": {
"target": "es2019",
"lib": ["es2019"],
"module": "commonjs",
"moduleResolution": "node",
"outDir": "dist",
"allowJs": true,
"sourceMap": false,
"resolveJsonModule": true,
"skipLibCheck": true,
"esModuleInterop": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
},
"include": ["src/**/*.ts", "src/**/*.json"],
"exclude": ["node_modules"],
}

35
utils.ts Normal file
View File

@ -0,0 +1,35 @@
import { PublicKey } from '@solana/web3.js';
import { MintInfo, MintLayout, AccountInfo, AccountLayout, u64 } from '@solana/spl-token';
import * as BufferLayout from 'buffer-layout';
import * as bs58 from 'bs58';
export const ACCOUNT_LAYOUT = BufferLayout.struct([
BufferLayout.blob(32, 'mint'),
BufferLayout.blob(32, 'owner'),
BufferLayout.nu64('amount'),
BufferLayout.blob(93),
]);
export const MINT_LAYOUT = BufferLayout.struct([
BufferLayout.blob(44),
BufferLayout.u8('decimals'),
BufferLayout.blob(37),
]);
export function parseTokenAccountData(data: any) {
// @ts-ignore
let { mint, owner, amount } = ACCOUNT_LAYOUT.decode(data);
return {
mint: new PublicKey(mint),
owner: new PublicKey(owner),
amount,
};
}
export function parseTokenAccount(data: Buffer): AccountInfo {
const accountInfo = AccountLayout.decode(data);
accountInfo.mint = new PublicKey(accountInfo.mint);
accountInfo.owner = new PublicKey(accountInfo.owner);
accountInfo.amount = u64.fromBuffer(accountInfo.amount);
return accountInfo;
}

3073
yarn.lock Normal file

File diff suppressed because it is too large Load Diff