event-scraper-v3/src/scrapeSerumQueue.ts

299 lines
8.9 KiB
TypeScript

import 'reflect-metadata';
import { Connection, PublicKey } from '@solana/web3.js';
import { Market, OpenOrders, EVENT_QUEUE_LAYOUT } from '@project-serum/serum';
import { CurrencyMeta, SerumEvent, Owner, SequenceNumber } from './entity';
import { wait, SOURCES, formatFilledEvents } from './utils';
import { Event } from '@project-serum/serum/lib/queue';
import { SERUM_DEX_V3_PK } from '.';
import { Connection as DbConnection } from 'typeorm';
/**
 * Unwraps a nullable value, raising when it is `null`.
 *
 * @param value - Value to check.
 * @param message - Error text used when `value` is `null`.
 * @returns `value` unchanged when it is not `null` (`undefined` passes through).
 * @throws Error carrying `message` when `value` is strictly `null`.
 */
function throwIfNull<T>(value: T | null, message = 'account not found'): T {
  if (value !== null) {
    return value;
  }
  throw new Error(message);
}
/**
 * Decodes a single node out of a queue account buffer.
 *
 * The node is read at `headerLayout.span + nodeIndex * nodeLayout.span`,
 * i.e. nodes are laid out back to back right after the header.
 *
 * @param headerLayout - Layout of the queue header; only its `span` is used.
 * @param nodeLayout - Layout of one node; supplies `span` and `decode`.
 * @param buffer - Raw account data.
 * @param nodeIndex - Zero-based slot of the node to decode.
 * @returns Whatever `nodeLayout.decode` returns for that slot.
 */
function decodeQueueItem(headerLayout, nodeLayout, buffer: Buffer, nodeIndex) {
  const nodeOffset = headerLayout.span + nodeIndex * nodeLayout.span;
  return nodeLayout.decode(buffer, nodeOffset);
}
/**
 * Decodes the most recent events of a raw event-queue account, starting at
 * sequence number `lastSeqNum`.
 *
 * The returned events carry consecutive `seqNum` values ending just before
 * the header's `seqNum`. When the gap between the header's `seqNum` and
 * `lastSeqNum` exceeds the number of slots in the buffer, only the newest
 * `capacity` slots are returned, so the first event is then newer than
 * `lastSeqNum`.
 *
 * @param buffer - Raw event-queue account data (header followed by nodes).
 * @param lastSeqNum - Sequence number to start decoding from.
 * @returns Decoded events in ascending sequence order; empty when
 *   `lastSeqNum` is greater than the header's `seqNum`.
 */
function decodeEventsSince(buffer: Buffer, lastSeqNum: number): Event[] {
  // Sequence numbers are treated as u32 values, so arithmetic wraps at 2^32.
  const U32_MODULUS = 0x100000000;
  const { HEADER: headerLayout, NODE: nodeLayout } = EVENT_QUEUE_LAYOUT;
  const queueHeader = headerLayout.decode(buffer);

  // NOTE(review): with this guard in place the wrap-around branch of the
  // subtraction below is never exercised (lastSeqNum <= seqNum from here on).
  // Confirm whether a wrapped on-chain counter needs to be handled.
  if (lastSeqNum > queueHeader.seqNum) {
    return [];
  }

  // Number of node slots that fit after the header.
  const capacity = Math.floor((buffer.length - headerLayout.span) / nodeLayout.span);

  // How far behind the caller is, capped at what the ring buffer can hold.
  const behindBy = (queueHeader.seqNum - lastSeqNum + U32_MODULUS) % U32_MODULUS;
  const pending = behindBy > capacity ? capacity : behindBy;
  const firstSeqNum = (queueHeader.seqNum - pending + U32_MODULUS) % U32_MODULUS;

  // Ring-buffer window [firstSlot; endSlot) holding the pending events.
  const endSlot = (queueHeader.head + queueHeader.count) % capacity;
  const firstSlot = (endSlot - pending + capacity) % capacity;

  const decoded: Event[] = [];
  let offset = 0;
  while (offset < pending) {
    const slot = (firstSlot + offset) % capacity;
    const item = decodeQueueItem(headerLayout, nodeLayout, buffer, slot);
    item.seqNum = (firstSeqNum + offset) % U32_MODULUS;
    decoded.push(item);
    offset += 1;
  }
  return decoded;
}
/**
 * Fetches an event-queue account and returns the events that come after the
 * last recorded sequence number.
 *
 * @param connection - Solana RPC connection used to read the account.
 * @param eventQueuePk - Public key of the event-queue account.
 * @param lastSeqNum - Sequence number of the last event already persisted.
 * @returns Decoded events newer than `lastSeqNum`, in ascending order.
 * @throws Error ('account not found') when `getAccountInfo` resolves to null.
 */
const loadEventQueue = async (
  connection: Connection,
  eventQueuePk: PublicKey,
  lastSeqNum: number
) => {
  const { data } = throwIfNull(await connection.getAccountInfo(eventQueuePk));
  const events = decodeEventsSince(data, lastSeqNum);
  // decodeEventsSince normally starts AT lastSeqNum, so the first element is
  // the event that was already recorded and must be dropped. When the scraper
  // has fallen behind by more than the ring capacity, the window starts at a
  // newer event instead; dropping it unconditionally would lose that event.
  // Number() guards against a seqNum read from a raw DB row arriving as a
  // string.
  // NOTE(review): a caller default of lastSeqNum = 0 still drops a genuine
  // event with seqNum 0 — confirm whether that matters.
  if (events.length > 0 && events[0].seqNum === Number(lastSeqNum)) {
    events.shift();
  }
  return events;
};
/**
 * Ensures every open-orders account referenced by `events` has a row in the
 * Owner table mapping it to its owner's public key.
 *
 * Best effort: failures are logged and never thrown to the caller.
 *
 * @param events - Events exposing an `openOrders` value with `toString()`.
 * @param db - TypeORM connection providing the Owner repository.
 * @param connection - Solana RPC connection used to load open-orders accounts.
 */
const addOwnerMappings = async (events, db, connection) => {
  if (events.length === 0) return;
  const ownerRepository = db.getRepository(Owner);
  const uniqueOpenOrders: string[] = [
    ...new Set<string>(events.map((e) => e.openOrders.toString())),
  ];

  let records;
  try {
    records = await ownerRepository
      .createQueryBuilder('owner')
      .where('owner.openOrders IN (:...uniqueOpenOrders)', { uniqueOpenOrders })
      .getMany();
  } catch (error) {
    console.error('Error fetching owners', error);
    // Without the existing rows we cannot tell which mappings are missing;
    // skip this round instead of dereferencing an undefined result.
    return;
  }

  // Set lookup instead of a linear scan per candidate.
  const knownOpenOrders = new Set(records.map((r) => r.openOrders));
  const openOrdersToAdd = uniqueOpenOrders.filter((o) => !knownOpenOrders.has(o));
  if (openOrdersToAdd.length === 0) return;

  try {
    const newRecords = await Promise.all(
      openOrdersToAdd.map(async (openOrders) => {
        // Load the on-chain account to learn who owns it.
        const o = await OpenOrders.load(
          connection,
          new PublicKey(openOrders),
          new PublicKey(SERUM_DEX_V3_PK)
        );
        return { owner: o.owner.toString(), openOrders };
      })
    );
    await ownerRepository.save(newRecords);
  } catch (error) {
    console.error('Error updating Owner table:', `${error}`);
    console.error('Tried adding open orders:', openOrdersToAdd);
  }
};
/**
 * Stores the base and quote currency metadata (symbol and mint decimals) of a
 * market, skipping currencies that already have a row for this market address
 * and program id.
 *
 * Lookup errors propagate to the caller; a failing save is only logged.
 *
 * @param marketMeta - Market description providing `address`, `baseSymbol`,
 *   `quoteSymbol`, `baseDecimals` and `quoteDecimals`.
 * @param db - TypeORM connection providing the CurrencyMeta repository.
 */
const insertCurrencyMeta = async (marketMeta, db) => {
  const { address, baseSymbol, quoteSymbol, baseDecimals, quoteDecimals } = marketMeta;
  const currencyRepo = db.getRepository(CurrencyMeta);

  // Base is looked up first, then quote.
  const candidates = [
    { currency: baseSymbol, MintDecimals: baseDecimals },
    { currency: quoteSymbol, MintDecimals: quoteDecimals },
  ];

  const missing = [];
  for (const { currency, MintDecimals } of candidates) {
    const existing = await currencyRepo.find({
      where: { address, programId: SERUM_DEX_V3_PK, currency },
    });
    if (existing.length === 0) {
      missing.push({
        address,
        programId: SERUM_DEX_V3_PK,
        currency,
        MintDecimals,
      });
    }
  }

  if (!missing.length) return;
  try {
    await currencyRepo.save(missing);
  } catch (error) {
    console.error('Error inserting currency meta data:', error);
  }
};
/**
 * Persists one scrape of a market's event queue: the formatted fill events
 * and the sequence number of every event that was loaded.
 *
 * Both inserts go through the query builder with `orIgnore()`.
 *
 * @param allEvents - Every event decoded in this scrape; each needs `seqNum`.
 * @param fillEvents - The subset of `allEvents` that are fills.
 * @param marketMeta - Market description; `address` keys the sequence rows.
 * @param loadTimestamp - Timestamp attached to every inserted row.
 * @param db - TypeORM connection.
 */
const insertEvents = async (
  allEvents,
  fillEvents: Array<any>,
  marketMeta,
  loadTimestamp,
  db: DbConnection
) => {
  if (allEvents.length === 0) return;

  // Shared insert-with-orIgnore pipeline for both tables.
  const insertIgnoringConflicts = (entity, rows) =>
    db.getRepository(entity).createQueryBuilder().insert().values(rows).orIgnore().execute();

  // Fill events are written before the sequence numbers: if the process dies
  // in between, the next run starts from the old sequence number and loads
  // the fills again rather than skipping them.
  if (fillEvents.length > 0) {
    const fillRows = formatFilledEvents(
      fillEvents,
      marketMeta,
      loadTimestamp,
      SOURCES.loadEventQueue
    );
    await insertIgnoringConflicts(SerumEvent, fillRows);
  }

  const { address } = marketMeta;
  const sequenceRows = allEvents.map(({ seqNum }) => ({ address, seqNum, loadTimestamp }));
  await insertIgnoringConflicts(SequenceNumber, sequenceRows);
};
/**
 * Reads the most recently stored sequence number for a market.
 *
 * Rows are ordered by `loadTimestamp` and then `seqNum`, both descending, and
 * limited to one, so the result holds at most a single raw row exposing
 * `seqNum`.
 *
 * @param marketMeta - Market description; `address` selects the rows.
 * @param db - TypeORM connection providing the SequenceNumber repository.
 * @returns The raw result rows; an empty array when nothing is stored or the
 *   query fails (the failure is logged).
 */
const getLastSeqNumForMarket = async (marketMeta, db) => {
  try {
    return await db
      .getRepository(SequenceNumber)
      .createQueryBuilder('sequence_number')
      .where('sequence_number.address = :address', { address: marketMeta.address })
      .orderBy('sequence_number.loadTimestamp', 'DESC')
      .addOrderBy('sequence_number.seqNum', 'DESC')
      .select('"seqNum"')
      .limit(1)
      .execute();
  } catch (error) {
    console.error('Unable to fetch sequence_number', error);
    // Always hand callers an array so `record.length` is safe to read; the
    // previous implicit `undefined` made them throw a TypeError instead.
    return [];
  }
};
/**
 * Runs one scrape cycle for a market: resumes from the last stored sequence
 * number, loads the newer events from the on-chain event queue, records the
 * owners of the open-orders accounts seen in fills, and persists the events.
 *
 * Every step is best effort; failures are logged and never thrown.
 *
 * @param db - TypeORM connection.
 * @param connection - Solana RPC connection.
 * @param marketMeta - Market description providing `name`, `address` and
 *   `eventQueuePk`.
 */
export const parseAndCaptureEventsForMarket = async (db, connection, marketMeta) => {
  // Falls back to 0 when no sequence number is stored or it cannot be read.
  let lastSeqNum = 0;
  try {
    const record = await getLastSeqNumForMarket(marketMeta, db);
    if (record.length) {
      lastSeqNum = record[0]?.seqNum;
    }
  } catch (err) {
    console.log('Unable to load lastSeqNum', `${err}`);
  }
  const loadTimestamp = new Date().toISOString();
  let events = [];
  try {
    events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
  } catch (error) {
    console.error('Error loading event queue', error);
  }
  const fillEvents = events.filter((e) => e.eventFlags['fill']);
  console.log(
    `Scraping ${marketMeta.name}. Last seq num: ${lastSeqNum}. Events loaded: ${events.length}. Fill events: ${fillEvents.length}`
  );
  // Owner mappings are auxiliary: a failure here must not prevent the events
  // themselves from being persisted, so the two steps are guarded separately.
  try {
    // track openOrder account public keys and the public key of the owner of the open orders account
    await addOwnerMappings(fillEvents, db, connection);
  } catch (error) {
    console.error('Error inserting owner mappings data:', error);
  }
  try {
    await insertEvents(events, fillEvents, marketMeta, loadTimestamp, db);
  } catch (error) {
    console.error('Error inserting event queue data:', error);
  }
};
/**
 * Runs one scrape cycle for a market: resumes from the last stored sequence
 * number, loads newer events from the event queue, then stores owner mappings
 * and the events themselves. The last two steps are guarded independently, so
 * a failure in one is logged without skipping the other.
 *
 * @param db - TypeORM connection.
 * @param connection - Solana RPC connection.
 * @param marketMeta - Market description providing `name`, `address` and
 *   `eventQueuePk`.
 */
export const parseAndCapturePerpEventsForMarket = async (
  db: DbConnection,
  connection,
  marketMeta
) => {
  // Runs a step and logs (rather than propagates) its failure under `label`.
  const attempt = async (label: string, step) => {
    try {
      await step();
    } catch (error) {
      console.error(label, error);
    }
  };

  // Resume point; stays 0 when nothing is stored or the lookup fails.
  let lastSeqNum = 0;
  try {
    const rows = await getLastSeqNumForMarket(marketMeta, db);
    if (rows.length) {
      lastSeqNum = rows[0]?.seqNum;
    }
  } catch (err) {
    console.log('Unable to load lastSeqNum', `${err}`);
  }

  const loadTimestamp = new Date().toISOString();

  let events = [];
  try {
    events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
  } catch (error) {
    console.error('Error loading event queue', error);
  }

  const fillEvents = events.filter(({ eventFlags }) => eventFlags['fill']);
  console.log(
    `Scraping ${marketMeta.name}. Events loaded: ${events.length}. Last seq num: ${lastSeqNum}. Fill events: ${fillEvents.length}`
  );

  await attempt('Error inserting owner mappings data:', () =>
    addOwnerMappings(fillEvents, db, connection)
  );
  await attempt('Error inserting event queue data:', () =>
    insertEvents(events, fillEvents, marketMeta, loadTimestamp, db)
  );
};
/**
 * Entry point for scraping one market: loads the on-chain market, enriches
 * `marketMeta` in place with symbols, decimals and the event-queue key, stores
 * the currency metadata, then scrapes the event queue forever, pausing
 * `waitTime` between cycles.
 *
 * This function never resolves; it can reject only during the setup phase
 * (market load or currency metadata lookup).
 *
 * @param db - TypeORM connection.
 * @param connection - Solana RPC connection.
 * @param marketMeta - Market description with `address` and a `name` of the
 *   form `BASE/QUOTE`; mutated by this function.
 * @param waitTime - Delay handed to `wait` between scrape cycles.
 */
export const startEventParsing = async (
  db: DbConnection,
  connection: Connection,
  marketMeta,
  waitTime
) => {
  const market = await Market.load(
    connection,
    new PublicKey(marketMeta.address),
    {},
    new PublicKey(SERUM_DEX_V3_PK)
  );

  // The market name is "BASE/QUOTE".
  const [baseSymbol, quoteSymbol] = marketMeta['name'].split('/');
  marketMeta.baseSymbol = baseSymbol;
  marketMeta.quoteSymbol = quoteSymbol;
  marketMeta.baseDecimals = market['_baseSplTokenDecimals'];
  marketMeta.quoteDecimals = market['_quoteSplTokenDecimals'];
  marketMeta.eventQueuePk = market['_decoded'].eventQueue;

  // stores mint decimals and base/quote currency symbols (might not need)
  await insertCurrencyMeta(marketMeta, db);

  // Scrape loop: a failing cycle is logged and retried after the pause.
  for (;;) {
    try {
      await parseAndCaptureEventsForMarket(db, connection, marketMeta);
    } catch (err) {
      console.error(`Error in startEventParsing() ${err}`);
    }
    await wait(waitTime);
  }
};