mango-transaction-scraper/src/signatures.ts

133 lines
5.5 KiB
TypeScript

import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js';
import { sleep} from '@blockworks-foundation/mango-client';
import { bulkBatchInsert } from './utils';
export async function getNewSignatures(afterSignature: string, connection: Connection, address: PublicKey, requestWaitTime) {
// Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSignature" signature
let signatures;
const limit = 1000;
let before = null;
let options;
let allSignaturesInfo: ConfirmedSignatureInfo[] = [];
while (true) {
if (before === null) {
options = {limit: limit};
} else {
options = {limit: limit, before: before};
}
let signaturesInfo = (await connection.getConfirmedSignaturesForAddress2(address, options));
signatures = signaturesInfo.map(x => x['signature']);
let afterSignatureIndex = signatures.indexOf(afterSignature);
if (afterSignatureIndex !== -1) {
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo.slice(0, afterSignatureIndex));
break
} else {
// if afterSignatureIndex is not found then we should have gotten signaturesInfo of length limit
// otherwise we have an issue where the rpc endpoint does not have enough history
if (signaturesInfo.length !== limit) {
throw 'rpc endpoint does not have sufficient signature history to reach afterSignature ' + afterSignature
}
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo);
}
before = signatures[signatures.length-1];
console.log(new Date(signaturesInfo[signaturesInfo.length-1].blockTime! * 1000).toISOString());
await sleep(requestWaitTime);
}
return allSignaturesInfo
}
export async function getLatestSignatureBeforeUnixEpoch(connection, address, unixEpoch, requestWaitTime) {
let signaturesInfo;
let limit = 1000;
let options;
let earliestSignature = null;
let signature;
let found = false;
while (true) {
if (earliestSignature === null) {
options = {limit: limit};
} else {
options = {limit: limit, before: earliestSignature};
}
signaturesInfo = await connection.getConfirmedSignaturesForAddress2(address, options);
for (let signatureInfo of signaturesInfo) {
if (signatureInfo.blockTime < unixEpoch) {
signature = signatureInfo.signature;
found = true;
break
}
}
if (found) {break}
earliestSignature = signaturesInfo[signaturesInfo.length - 1].signature;
console.log(new Date(signaturesInfo[signaturesInfo.length-1].blockTime! * 1000).toISOString());
await sleep(requestWaitTime);
}
return signature;
}
export async function insertNewSignatures(address, connection, pool, requestWaitTime) {
let client = await pool.connect()
let latestDbSignatureRows = await client.query('select signature from all_transactions where id = (select max(id) from all_transactions where account = $1)', [address.toBase58()])
client.release();
// let latestDbSignatureRow = db.prepare('select signature from transactions where id = (select max(id) from transactions where account = ?)').get(account.toBase58());
let latestDbSignature;
if (latestDbSignatureRows.rows.length === 0) {
let currentUnixEpoch = Math.round(Date.now()/1000);
// If tranasctions table is empty - initialise by getting all signatures in the 6 hours
let latestSignatureUnixEpoch = currentUnixEpoch - 6 * 60 * 60;
console.log('Current time ', new Date(currentUnixEpoch * 1000).toISOString());
console.log('Getting all signatures after ', new Date(latestSignatureUnixEpoch * 1000).toISOString());
latestDbSignature = await getLatestSignatureBeforeUnixEpoch(connection, address, latestSignatureUnixEpoch, requestWaitTime);
} else {
latestDbSignature = latestDbSignatureRows.rows[0]['signature'];
}
let newSignatures = await getNewSignatures(latestDbSignature, connection, address, requestWaitTime);
// By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest
// We reverse the order to oldest -> newest here
// This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps)
// Also ensures that the auto increment id in our table is incremented oldest -> newest
newSignatures = newSignatures.reverse();
const inserts = newSignatures.map(signatureInfo => ({
signature: signatureInfo.signature,
account: address.toBase58(),
block_time: signatureInfo.blockTime,
block_datetime: (new Date(signatureInfo.blockTime! * 1000)).toISOString(),
slot: signatureInfo.slot,
err: signatureInfo.err === null ? 0 : 1,
process_state: 'unprocessed'
}))
let columns = ['signature', 'account', 'block_time', 'block_datetime', 'slot', 'err', 'process_state'];
let table = 'all_transactions'
let batchSize = 10000
await bulkBatchInsert(pool, table, columns, inserts, batchSize);
console.log('inserted ' + newSignatures.length + ' signatures')
}