mango-transaction-scraper-v3/src/signatures.ts

142 lines
4.8 KiB
TypeScript

import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js';
import { sleep } from '@blockworks-foundation/mango-client';
import { bulkBatchInsert } from './utils';
/**
 * Collects the signatures for `address` that are newer than the highest slot
 * already stored in `<schema>.transactions` for that program.
 *
 * Pages backwards in time (newest -> oldest) through
 * `getConfirmedSignaturesForAddress2`, `limit` signatures per request, until a
 * page contains the stored slot. If no slot is stored yet (empty table for
 * this program) every signature the RPC node still has is returned.
 *
 * @param connection RPC connection used to page through the signature history.
 * @param address Address whose signature history is fetched; also matched
 *   against the `program_pk` column.
 * @param requestWaitTime Passed to `sleep` between consecutive page requests.
 * @param pool Connection pool exposing `connect()`; the returned client must
 *   provide `query()` and `release()`.
 * @param schema Schema holding the `transactions` table. It is concatenated
 *   into the SQL text, so it must come from trusted configuration.
 * @returns Signature infos newer than the stored slot, ordered newest first.
 * @throws Error when the newest signature of a page is older than the stored
 *   slot, or when the RPC history ends before the stored slot is reached.
 */
export async function getNewSignatures(
  connection: Connection,
  address: string,
  requestWaitTime: number,
  pool,
  schema: string
): Promise<ConfirmedSignatureInfo[]> {
  const client = await pool.connect();
  let latestSlotRows;
  try {
    latestSlotRows = await client.query(
      'select max(slot) as max_slot from ' +
        schema +
        '.transactions where program_pk = $1',
      [address],
    );
  } finally {
    // Always hand the client back to the pool, even if the query fails
    client.release();
  }

  // max() yields null when there are no rows yet. The driver may also hand
  // back a bigint column as a string, which would never strictly equal the
  // numeric slots returned by the RPC node - so normalise to a number.
  const storedMaxSlot = latestSlotRows.rows[0]['max_slot'];
  const afterSlot: number | null =
    storedMaxSlot == null ? null : Number(storedMaxSlot);
  console.log('Fetching all signatures after slot ' + afterSlot);

  const insufficientHistory = () =>
    new Error(
      'rpc endpoint does not have sufficient signature history to reach slot ' +
        afterSlot,
    );

  // Fetches all signatures associated with the account - working backwards in
  // time until it encounters the "afterSlot" slot
  const limit = 1000;
  const programKey = new PublicKey(address);
  const allSignaturesInfo: ConfirmedSignatureInfo[] = [];
  let before: string | null = null;

  while (true) {
    const options: { limit: number; before?: string } =
      before === null ? { limit } : { limit, before };
    const signaturesInfo = await connection.getConfirmedSignaturesForAddress2(
      programKey,
      options,
    );

    if (signaturesInfo.length === 0) {
      // The node has no (more) history. That is only acceptable when there is
      // no stored slot we are still trying to reach.
      if (afterSlot === null) {
        break;
      }
      throw insufficientHistory();
    }

    const newest = signaturesInfo[0];
    const oldest = signaturesInfo[signaturesInfo.length - 1];

    if (afterSlot !== null && newest.slot < afterSlot) {
      // Possibly this happens when validators are switched at the endpoint?
      // In the absence of throwing this error the code would keep fetching
      // signatures until it reaches the end of the signatures stored by the rpc node
      throw new Error(
        'most recent fetched signature has slot less than afterSlot',
      );
    }

    console.log('slots ' + newest.slot + ' to ' + oldest.slot);
    // blockTime can be missing; only log the timestamp when there is one
    if (oldest.blockTime != null) {
      console.log(new Date(oldest.blockTime * 1000).toISOString());
    }

    // Stop when we reach a slot we have already stored in the database.
    // Use slot instead of signature here as there can be multiple signatures
    // per slot and signatures are stored in an arbitrary order per slot -
    // leading to attempting to insert a duplicate signature.
    // If a slot is already seen - all signatures in that slot are in the db.
    const afterSlotIndex =
      afterSlot === null
        ? -1
        : signaturesInfo.findIndex((info) => info.slot === afterSlot);
    if (afterSlotIndex !== -1) {
      allSignaturesInfo.push(...signaturesInfo.slice(0, afterSlotIndex));
      break;
    }

    allSignaturesInfo.push(...signaturesInfo);

    // A short page means the end of the node's history was reached.
    if (signaturesInfo.length !== limit) {
      if (afterSlot === null) {
        break;
      }
      // The stored slot was not found, so the rpc endpoint does not have
      // enough history.
      throw insufficientHistory();
    }

    before = oldest.signature;
    await sleep(requestWaitTime);
  }

  return allSignaturesInfo;
}
/**
 * Inserts newly fetched signatures into `<schema>.transactions` as
 * 'unprocessed' rows via `bulkBatchInsert`.
 *
 * @param address Stored in the `program_pk` column of every inserted row.
 * @param pool Connection pool handed through to `bulkBatchInsert`.
 * @param schema Schema holding the `transactions` table.
 * @param newSignatures Signature infos ordered newest -> oldest (the order
 *   returned by `getConfirmedSignaturesForAddress2`). Not modified.
 */
export async function insertSignatures(
  address: string,
  pool,
  schema: string,
  newSignatures: ConfirmedSignatureInfo[]
): Promise<void> {
  // By default the signatures returned by getConfirmedSignaturesForAddress2
  // are ordered newest -> oldest. We insert oldest -> newest instead:
  //  - if inserts are interrupted the process can pick up where it left off
  //    seamlessly (with no gaps)
  //  - the auto increment id in our table is incremented oldest -> newest
  // Reverse a copy so the caller's array is left untouched.
  const oldestFirst = [...newSignatures].reverse();

  const inserts = oldestFirst.map((signatureInfo) => {
    const blockTime = signatureInfo.blockTime;
    return {
      signature: signatureInfo.signature,
      program_pk: address,
      block_time: blockTime,
      // A missing blockTime would otherwise become a bogus 1970 timestamp
      // (null) or throw on toISOString (undefined); store null instead.
      block_datetime:
        blockTime == null ? null : new Date(blockTime * 1000).toISOString(),
      slot: signatureInfo.slot,
      err: signatureInfo.err === null ? 0 : 1,
      process_state: 'unprocessed',
      worker_partition: signatureInfo.slot % 10,
    };
  });

  // Seems to be a bug in getConfirmedSignaturesForAddress2 where very rarely
  // duplicate signatures are returned (separated by a few signatures in
  // between), so dedup here
  const uniqueInserts = uniqueOnSignature(inserts);

  const columns = [
    'signature',
    'program_pk',
    'block_time',
    'block_datetime',
    'slot',
    'err',
    'process_state',
    'worker_partition',
  ];
  const table = 'transactions';
  const batchSize = 10000;

  await bulkBatchInsert(pool, table, columns, uniqueInserts, batchSize, schema);

  // Report what was actually written, i.e. the count after deduplication
  console.log('inserted ' + uniqueInserts.length + ' signatures');
}
/**
 * Returns the rows of `inserts` with duplicate signatures removed, keeping the
 * first occurrence of each signature and preserving the original order.
 * The input array is not modified.
 *
 * A Set is used for the "seen" bookkeeping rather than a plain object so that
 * signatures which collide with Object.prototype members (e.g. '__proto__',
 * 'hasOwnProperty') are handled like any other string.
 *
 * @param inserts Rows carrying a `signature` property.
 * @returns A new array containing one row per distinct signature.
 */
function uniqueOnSignature<T extends { signature: string }>(inserts: T[]): T[] {
  const seen = new Set<string>();
  return inserts.filter((row) => {
    if (seen.has(row.signature)) {
      return false;
    }
    seen.add(row.signature);
    return true;
  });
}