142 lines
4.8 KiB
TypeScript
142 lines
4.8 KiB
TypeScript
import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js';
|
|
import { sleep } from '@blockworks-foundation/mango-client';
|
|
import { bulkBatchInsert } from './utils';
|
|
|
|
export async function getNewSignatures(
|
|
connection: Connection,
|
|
address: string,
|
|
requestWaitTime: number,
|
|
pool,
|
|
schema
|
|
) {
|
|
|
|
let client = await pool.connect();
|
|
let latestSlotRows = await client.query(
|
|
'select max(slot) as max_slot from ' +
|
|
schema +
|
|
'.transactions where program_pk = $1',
|
|
[address],
|
|
);
|
|
client.release();
|
|
|
|
let afterSlot = latestSlotRows.rows[0]['max_slot'];
|
|
|
|
console.log('Fetching all signatures after slot ' + afterSlot)
|
|
|
|
// Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSlot" slot
|
|
let signatures;
|
|
let slots;
|
|
const limit = 1000;
|
|
let before = null;
|
|
let options;
|
|
let allSignaturesInfo: ConfirmedSignatureInfo[] = [];
|
|
while (true) {
|
|
if (before === null) {
|
|
options = { limit: limit };
|
|
} else {
|
|
options = { limit: limit, before: before };
|
|
}
|
|
|
|
let signaturesInfo = await connection.getConfirmedSignaturesForAddress2(
|
|
new PublicKey(address),
|
|
options,
|
|
);
|
|
signatures = signaturesInfo.map((x) => x['signature']);
|
|
slots = signaturesInfo.map((x) => x['slot']);
|
|
|
|
if (signaturesInfo[0].slot < afterSlot) {
|
|
// Possibly this happens when validators are switched at the endpoint?
|
|
// In the absense of throwing this error the code will keep fetching signatures until to reaches the end of the signatures stored by the rpc node
|
|
throw ('most recent fetched signature has slot less than afterSlot')
|
|
}
|
|
|
|
console.log('slots ' + signaturesInfo[0].slot + ' to ' + signaturesInfo[signaturesInfo.length - 1].slot)
|
|
console.log(
|
|
new Date(
|
|
signaturesInfo[signaturesInfo.length - 1].blockTime! * 1000,
|
|
).toISOString(),
|
|
);
|
|
|
|
// Stop when we reach a slot we have already stored in the database
|
|
// Use slot instead of signature here as can have multiple signatures per slot and signatures are
|
|
// stored in a arbitray order per slot - leading to attempting to insert a duplicate signature
|
|
// If a slot is already seen - will have all signatures in that slot in the db
|
|
let afterSlotIndex = slots.indexOf(afterSlot);
|
|
if (afterSlotIndex !== -1) {
|
|
allSignaturesInfo = allSignaturesInfo.concat(
|
|
signaturesInfo.slice(0, afterSlotIndex),
|
|
);
|
|
break;
|
|
} else {
|
|
// if afterSignatureIndex is not found then we should have gotten signaturesInfo of length limit
|
|
// otherwise we have an issue where the rpc endpoint does not have enough history
|
|
if (signaturesInfo.length !== limit) {
|
|
throw (
|
|
'rpc endpoint does not have sufficient signature history to reach slot ' +
|
|
afterSlot
|
|
);
|
|
}
|
|
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo);
|
|
}
|
|
before = signatures[signatures.length - 1];
|
|
|
|
await sleep(requestWaitTime);
|
|
}
|
|
|
|
return allSignaturesInfo;
|
|
}
|
|
|
|
export async function insertSignatures(
|
|
address: string,
|
|
pool,
|
|
schema: string,
|
|
newSignatures: ConfirmedSignatureInfo[]
|
|
) {
|
|
|
|
// By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest
|
|
// We reverse the order to oldest -> newest here
|
|
// This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps)
|
|
// Also ensures that the auto increment id in our table is incremented oldest -> newest
|
|
newSignatures = newSignatures.reverse();
|
|
|
|
const inserts = newSignatures.map((signatureInfo) => ({
|
|
signature: signatureInfo.signature,
|
|
program_pk: address,
|
|
block_time: signatureInfo.blockTime,
|
|
block_datetime: new Date(signatureInfo.blockTime! * 1000).toISOString(),
|
|
slot: signatureInfo.slot,
|
|
err: signatureInfo.err === null ? 0 : 1,
|
|
process_state: 'unprocessed',
|
|
worker_partition: signatureInfo.slot % 10
|
|
}));
|
|
// Seems to be a bug in getConfirmedSignaturesForAddress2 where very rarely I can get duplicate signatures (separated by a few signatures in between)
|
|
// So redup here
|
|
let uniqueInserts = uniqueOnSignature(inserts);
|
|
|
|
let columns = [
|
|
'signature',
|
|
'program_pk',
|
|
'block_time',
|
|
'block_datetime',
|
|
'slot',
|
|
'err',
|
|
'process_state',
|
|
'worker_partition'
|
|
];
|
|
let table = 'transactions';
|
|
let batchSize = 10000;
|
|
|
|
await bulkBatchInsert(pool, table, columns, uniqueInserts, batchSize, schema);
|
|
|
|
console.log('inserted ' + newSignatures.length + ' signatures');
|
|
}
|
|
|
|
function uniqueOnSignature(inserts) {
|
|
var seen = {};
|
|
return inserts.filter(function (e) {
|
|
return seen.hasOwnProperty(e.signature)
|
|
? false
|
|
: (seen[e.signature] = true);
|
|
});
|
|
}
|