299 lines
8.9 KiB
TypeScript
299 lines
8.9 KiB
TypeScript
import 'reflect-metadata';
|
|
import { Connection, PublicKey } from '@solana/web3.js';
|
|
import { Market, OpenOrders, EVENT_QUEUE_LAYOUT } from '@project-serum/serum';
|
|
import { CurrencyMeta, SerumEvent, Owner, SequenceNumber } from './entity';
|
|
import { wait, SOURCES, formatFilledEvents } from './utils';
|
|
import { Event } from '@project-serum/serum/lib/queue';
|
|
import { SERUM_DEX_V3_PK } from '.';
|
|
import { Connection as DbConnection } from 'typeorm';
|
|
|
|
/**
 * Unwraps a nullable value, failing loudly when it is `null`.
 *
 * @param value - Value to unwrap. Only `null` is rejected; any other value is returned as-is.
 * @param message - Text of the error raised when `value` is `null`.
 * @returns `value`, typed without `null`.
 * @throws Error carrying `message` when `value` is `null`.
 */
function throwIfNull<T>(value: T | null, message = 'account not found'): T {
  if (value !== null) {
    return value;
  }
  throw new Error(message);
}
|
|
|
|
/**
 * Decodes the node stored at position `nodeIndex` of a queue buffer.
 *
 * The node's byte offset is the header size plus `nodeIndex` node sizes.
 *
 * @param headerLayout - Layout of the queue header; only its `span` is read.
 * @param nodeLayout - Layout of a queue node; provides `span` and `decode(buffer, offset)`.
 * @param buffer - Raw queue data.
 * @param nodeIndex - Zero-based slot index of the node to decode.
 * @returns Whatever `nodeLayout.decode` returns for that offset.
 */
function decodeQueueItem(headerLayout, nodeLayout, buffer: Buffer, nodeIndex) {
  const nodeOffset = headerLayout.span + nodeIndex * nodeLayout.span;
  return nodeLayout.decode(buffer, nodeOffset);
}
|
|
|
|
/**
 * Decodes the events of an event-queue buffer that lie between `lastSeqNum`
 * and the sequence number stored in the queue header.
 *
 * The number of events is the u32-wrapped difference between the header's
 * `seqNum` and `lastSeqNum`, capped at the number of node slots that fit in
 * the buffer. Each decoded event gets a `seqNum` property assigned, counting
 * up from the first returned event.
 *
 * @param buffer - Raw event-queue account data (header followed by nodes).
 * @param lastSeqNum - Sequence number to resume from.
 * @returns Decoded events in queue order; empty when `lastSeqNum` is greater
 *          than the header's `seqNum`.
 */
function decodeEventsSince(buffer: Buffer, lastSeqNum: number): Event[] {
  const U32_MODULUS = 0x100000000;
  const headerLayout = EVENT_QUEUE_LAYOUT.HEADER;
  const nodeLayout = EVENT_QUEUE_LAYOUT.NODE;

  const queueHeader = headerLayout.decode(buffer);
  if (lastSeqNum > queueHeader.seqNum) {
    return [];
  }

  // Number of node slots that fit behind the header.
  const capacity = Math.floor((buffer.length - headerLayout.span) / nodeLayout.span);

  // Missed events, accounting for u32 wrap-around and for the ring buffer
  // holding at most `capacity` entries.
  const seqGap = (queueHeader.seqNum - lastSeqNum + U32_MODULUS) % U32_MODULUS;
  const eventCount = seqGap > capacity ? capacity : seqGap;
  const firstSeq = (queueHeader.seqNum - eventCount + U32_MODULUS) % U32_MODULUS;

  // Ring-buffer window [firstIndex; tailIndex).
  const tailIndex = (queueHeader.head + queueHeader.count) % capacity;
  const firstIndex = (tailIndex - eventCount + capacity) % capacity;

  const decoded: Event[] = [];
  let offset = 0;
  while (offset < eventCount) {
    const slot = (firstIndex + offset) % capacity;
    const event = decodeQueueItem(headerLayout, nodeLayout, buffer, slot);
    event.seqNum = (firstSeq + offset) % U32_MODULUS;
    decoded.push(event);
    offset += 1;
  }

  return decoded;
}
|
|
|
|
/**
 * Fetches the event-queue account and returns the events decoded after
 * `lastSeqNum`.
 *
 * @param connection - RPC connection used to fetch the account.
 * @param eventQueuePk - Public key of the event-queue account.
 * @param lastSeqNum - Last sequence number that was already recorded.
 * @returns The decoded events with the first one removed.
 * @throws Error ('account not found') when the account lookup yields `null`.
 */
const loadEventQueue = async (
  connection: Connection,
  eventQueuePk: PublicKey,
  lastSeqNum: number
) => {
  const accountInfo = throwIfNull(await connection.getAccountInfo(eventQueuePk));
  const decodedEvents = decodeEventsSince(accountInfo.data, lastSeqNum);
  // The first decoded element corresponds to the already-recorded lastSeqNum, so skip it.
  return decodedEvents.slice(1);
};
|
|
|
|
/**
 * Ensures the Owner table has a row for every open-orders account referenced
 * by `events`.
 *
 * Existing rows are looked up first; for each open-orders account without a
 * row, the account is loaded through `OpenOrders.load` and an
 * `{ owner, openOrders }` record is saved.
 *
 * This is best-effort: failures in the lookup or in the load/save step are
 * logged and the function returns without throwing.
 *
 * @param events - Events exposing an `openOrders` value with a `toString()`.
 * @param db - Database connection providing `getRepository`.
 * @param connection - RPC connection passed to `OpenOrders.load`.
 */
const addOwnerMappings = async (events, db, connection) => {
  if (events.length === 0) return;

  const ownerRepository = db.getRepository(Owner);
  const uniqueOpenOrders: string[] = [
    ...new Set<string>(events.map((e) => e.openOrders.toString())),
  ];

  let records;
  try {
    records = await ownerRepository
      .createQueryBuilder('owner')
      .where('owner.openOrders IN (:...uniqueOpenOrders)', { uniqueOpenOrders })
      .getMany();
  } catch (error) {
    console.error('Error fetching owners', error);
    // Without the existing rows we cannot tell which mappings are missing.
    // Previously execution continued and crashed on `records.length`.
    return;
  }

  if (records.length === uniqueOpenOrders.length) return;

  // Set lookup instead of scanning `records` once per open-orders key.
  const knownOpenOrders = new Set(records.map((r) => r.openOrders));
  const openOrdersToAdd = uniqueOpenOrders.filter((o) => !knownOpenOrders.has(o));
  if (openOrdersToAdd.length === 0) return;

  try {
    const newRecords = await Promise.all(
      openOrdersToAdd.map(async (openOrders) => {
        const o = await OpenOrders.load(
          connection,
          new PublicKey(openOrders),
          new PublicKey(SERUM_DEX_V3_PK)
        );
        return { owner: o.owner.toString(), openOrders };
      })
    );

    await ownerRepository.save(newRecords);
  } catch (error) {
    console.error('Error updating Owner table:', `${error}`);
    console.error('Tried adding open orders:', openOrdersToAdd);
  }
};
|
|
|
|
/**
 * Stores CurrencyMeta rows for the base and quote currency of a market when
 * they are not present yet.
 *
 * Each currency is looked up by market address, program id and symbol; a row
 * is created for every lookup that returns nothing. Save failures are logged
 * and not rethrown.
 *
 * @param marketMeta - Provides `address`, `baseSymbol`, `quoteSymbol`,
 *                     `baseDecimals` and `quoteDecimals`.
 * @param db - Database connection providing `getRepository`.
 */
const insertCurrencyMeta = async (marketMeta, db) => {
  const { address, baseSymbol, quoteSymbol, baseDecimals, quoteDecimals } = marketMeta;
  const currencyRepo = db.getRepository(CurrencyMeta);

  const findRows = (currency) =>
    currencyRepo.find({
      where: { address, programId: SERUM_DEX_V3_PK, currency },
    });

  const baseRows = await findRows(baseSymbol);
  const quoteRows = await findRows(quoteSymbol);

  const candidates = [
    { rows: baseRows, currency: baseSymbol, MintDecimals: baseDecimals },
    { rows: quoteRows, currency: quoteSymbol, MintDecimals: quoteDecimals },
  ];
  const missingCurrencies = candidates
    .filter(({ rows }) => rows.length === 0)
    .map(({ currency, MintDecimals }) => ({
      address,
      programId: SERUM_DEX_V3_PK,
      currency,
      MintDecimals,
    }));

  if (missingCurrencies.length === 0) return;

  try {
    await currencyRepo.save(missingCurrencies);
  } catch (error) {
    console.error('Error inserting currency meta data:', error);
  }
};
|
|
|
|
/**
 * Persists a batch of events for one market.
 *
 * Fill events are formatted with `formatFilledEvents` and inserted into the
 * SerumEvent table first; afterwards one SequenceNumber row per event in
 * `allEvents` is inserted. Both inserts are issued with `orIgnore()`.
 *
 * @param allEvents - Every event of the batch; each contributes its `seqNum`.
 * @param fillEvents - The subset of events that are fills.
 * @param marketMeta - Market description; `address` is stored with each sequence number.
 * @param loadTimestamp - Timestamp stored with every inserted row.
 * @param db - Database connection.
 */
const insertEvents = async (
  allEvents,
  fillEvents: Array<any>,
  marketMeta,
  loadTimestamp,
  db: DbConnection
) => {
  if (allEvents.length === 0) return;

  // Fill events go in before the sequence numbers to prevent missing events
  // in case of a restart.
  if (fillEvents.length > 0) {
    const fillRecords = formatFilledEvents(
      fillEvents,
      marketMeta,
      loadTimestamp,
      SOURCES.loadEventQueue
    );
    const fillInsert = db
      .getRepository(SerumEvent)
      .createQueryBuilder()
      .insert()
      .values(fillRecords)
      .orIgnore();
    await fillInsert.execute();
  }

  const { address } = marketMeta;
  const seqRows = allEvents.map(({ seqNum }) => ({ address, seqNum, loadTimestamp }));
  const seqInsert = db
    .getRepository(SequenceNumber)
    .createQueryBuilder()
    .insert()
    .values(seqRows)
    .orIgnore();
  await seqInsert.execute();
};
|
|
|
|
/**
 * Looks up the most recently stored sequence number for a market.
 *
 * Rows of the SequenceNumber table for `marketMeta.address` are ordered by
 * `loadTimestamp` and then `seqNum`, both descending, and only the first
 * row's `seqNum` column is selected.
 *
 * @param marketMeta - Market description; only `address` is read.
 * @param db - Database connection providing `getRepository`.
 * @returns The raw result rows of the query (at most one row). When the
 *          query fails the error is logged and an empty array is returned,
 *          so callers can always read `.length` on the result.
 */
const getLastSeqNumForMarket = async (marketMeta, db) => {
  try {
    return await db
      .getRepository(SequenceNumber)
      .createQueryBuilder('sequence_number')
      .where('sequence_number.address = :address', { address: marketMeta.address })
      .orderBy('sequence_number.loadTimestamp', 'DESC')
      .addOrderBy('sequence_number.seqNum', 'DESC')
      .select('"seqNum"')
      .limit(1)
      .execute();
  } catch (error) {
    console.error('Unable to fetch sequence_number', error);
    // Previously this path returned `undefined`, which made callers throw a
    // TypeError on `record.length`. An empty list means "nothing known".
    return [];
  }
};
|
|
|
|
/**
 * Runs one scrape pass for a market: reads the last stored sequence number,
 * loads the event queue from there, and persists the new events.
 *
 * Failures of the individual steps are logged rather than rethrown. If the
 * last sequence number cannot be read, the pass starts from 0; if the event
 * queue cannot be loaded, the pass continues with no events. Owner mappings
 * and event inserts share one try block, so an error thrown by the owner
 * mapping step skips the insert.
 *
 * @param db - Database connection.
 * @param connection - RPC connection used to fetch the event queue.
 * @param marketMeta - Market description; `name` and `eventQueuePk` are read here.
 */
export const parseAndCaptureEventsForMarket = async (db, connection, marketMeta) => {
  // Resume point: the most recently stored sequence number, or 0.
  let lastSeqNum = 0;
  try {
    const rows = await getLastSeqNumForMarket(marketMeta, db);
    if (rows.length > 0) {
      lastSeqNum = rows[0]?.seqNum;
    }
  } catch (err) {
    console.log('Unable to load lastSeqNum', `${err}`);
  }

  const loadTimestamp = new Date().toISOString();

  let events = [];
  try {
    events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
  } catch (error) {
    console.error('Error loading event queue', error);
  }

  const fillEvents = events.filter(({ eventFlags }) => eventFlags['fill']);
  const summary = `Scraping ${marketMeta.name}. Last seq num: ${lastSeqNum}. Events loaded: ${events.length}. Fill events: ${fillEvents.length}`;
  console.log(summary);

  try {
    // Record which owner each open-orders account belongs to before storing the events.
    await addOwnerMappings(fillEvents, db, connection);
    await insertEvents(events, fillEvents, marketMeta, loadTimestamp, db);
  } catch (error) {
    console.error('Error inserting event queue data:', error);
  }
};
|
|
|
|
/**
 * Runs one scrape pass for a perp market: reads the last stored sequence
 * number, loads the event queue from there, and persists the new events.
 *
 * Every step is guarded separately and failures are logged rather than
 * rethrown, so an error in the owner-mapping step does not prevent the
 * events from being inserted.
 *
 * @param db - Database connection.
 * @param connection - RPC connection used to fetch the event queue.
 * @param marketMeta - Market description; `name` and `eventQueuePk` are read here.
 */
export const parseAndCapturePerpEventsForMarket = async (
  db: DbConnection,
  connection,
  marketMeta
) => {
  // Resume point: the most recently stored sequence number, or 0.
  let lastSeqNum = 0;
  try {
    const rows = await getLastSeqNumForMarket(marketMeta, db);
    if (rows.length > 0) {
      lastSeqNum = rows[0]?.seqNum;
    }
  } catch (err) {
    console.log('Unable to load lastSeqNum', `${err}`);
  }

  const loadTimestamp = new Date().toISOString();

  let events = [];
  try {
    events = await loadEventQueue(connection, marketMeta.eventQueuePk, lastSeqNum);
  } catch (error) {
    console.error('Error loading event queue', error);
  }

  const fillEvents = events.filter(({ eventFlags }) => eventFlags['fill']);
  const summary = `Scraping ${marketMeta.name}. Events loaded: ${events.length}. Last seq num: ${lastSeqNum}. Fill events: ${fillEvents.length}`;
  console.log(summary);

  // Owner mappings are guarded on their own so a failure here still lets the events be stored.
  try {
    await addOwnerMappings(fillEvents, db, connection);
  } catch (error) {
    console.error('Error inserting owner mappings data:', error);
  }

  try {
    await insertEvents(events, fillEvents, marketMeta, loadTimestamp, db);
  } catch (error) {
    console.error('Error inserting event queue data:', error);
  }
};
|
|
|
|
/**
 * Prepares a market for scraping and then polls its event queue forever.
 *
 * The market is loaded through `Market.load`, and `marketMeta` is extended
 * in place with `baseSymbol` / `quoteSymbol` (the two parts of `name` around
 * '/'), `baseDecimals`, `quoteDecimals` and `eventQueuePk`. Currency
 * metadata is inserted once, after which each loop iteration runs
 * `parseAndCaptureEventsForMarket` and then waits.
 *
 * The returned promise never resolves; it only rejects if the setup before
 * the loop (or `wait`) throws.
 *
 * @param db - Database connection.
 * @param connection - RPC connection.
 * @param marketMeta - Market description with `address` and `name`; mutated as described above.
 * @param waitTime - Passed to `wait` between iterations (unit is defined by `wait`, not visible here).
 */
export const startEventParsing = async (
  db: DbConnection,
  connection: Connection,
  marketMeta,
  waitTime
) => {
  const marketPk = new PublicKey(marketMeta.address);
  const programID = new PublicKey(SERUM_DEX_V3_PK);
  const market = await Market.load(connection, marketPk, {}, programID);

  const [baseSymbol, quoteSymbol] = marketMeta['name'].split('/');
  marketMeta.baseSymbol = baseSymbol;
  marketMeta.quoteSymbol = quoteSymbol;
  marketMeta.baseDecimals = market['_baseSplTokenDecimals'];
  marketMeta.quoteDecimals = market['_quoteSplTokenDecimals'];
  marketMeta.eventQueuePk = market['_decoded'].eventQueue;

  // stores mint decimals and base/quote currency symbols (might not need)
  await insertCurrencyMeta(marketMeta, db);

  for (;;) {
    try {
      await parseAndCaptureEventsForMarket(db, connection, marketMeta);
    } catch (err) {
      console.error(`Error in startEventParsing() ${err}`);
    }

    await wait(waitTime);
  }
};
|