114 lines
4.3 KiB
TypeScript
114 lines
4.3 KiB
TypeScript
const pgp = require('pg-promise')({
|
|
capSQL: true
|
|
});
|
|
|
|
import {sleep} from '@blockworks-foundation/mango-client';
|
|
|
|
/**
 * Downloads the raw transaction JSON for signatures of `address` that are still
 * marked unprocessed in `<schema>.transactions`, then writes the downloaded data
 * and the resulting process state back to that table.
 *
 * @param connection RPC connection used to request the transactions.
 * @param address Program address whose unprocessed signatures are looked up.
 * @param pool Postgres pool used for both the lookup and the write-back.
 * @param requestWaitTime Delay between consecutive RPC requests, passed to `sleep`.
 * @param schema Database schema that contains the `transactions` table.
 */
export async function populateTransactions(connection, address, pool, requestWaitTime, schema): Promise<void> {
    const transactions = await getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema);

    // The second element holds a state row for every signature (successes and
    // failures alike), not only the errors.
    const [transactionInserts, processStates] = getTransactionInserts(transactions);

    await insertTransactions(pool, schema, transactionInserts, processStates);
}
|
|
|
|
/**
 * Fetches the raw RPC response for every unprocessed signature of `address`.
 *
 * Requests are started one at a time with `requestWaitTime` between them, but
 * are not awaited individually, so several may be in flight at once.
 *
 * @returns `[signature, rawRpcResponse]` pairs in the order the responses
 *   arrived. Signatures whose request failed are logged and omitted.
 */
async function getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) {
    const limit = 25000;

    const signaturesToProcess = await getSignaturesWithoutTransactions(pool, address, schema, limit);

    const promises: Promise<void>[] = [];
    const transactions: any[] = [];
    let counter = 1;
    for (const signature of signaturesToProcess) {
        // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json)
        const args = [signature, {encoding: 'jsonParsed', commitment: 'finalized'}];
        const promise = connection
            ._rpcRequest('getConfirmedTransaction', args)
            .then(confirmedTransaction => {
                transactions.push([signature, confirmedTransaction]);
            })
            // Attach the rejection handler immediately: the loop below can run for a
            // long time before the promises are awaited, and a rejection with no
            // handler in the meantime is reported as an unhandled rejection.
            .catch(e => {
                console.log('request failed for ', signature, ': ', e);
            });

        console.log('requested ', counter, ' of ', signaturesToProcess.length);
        counter++;

        promises.push(promise);

        // Limit request frequency to avoid request failures due to rate limiting
        await sleep(requestWaitTime);
    }

    // Every promise already handles its own rejection, so this only waits for
    // the outstanding requests to finish.
    await Promise.all(promises);

    return transactions;
}
|
|
|
|
/**
 * Returns up to `limit` signatures from `<schema>.transactions` whose
 * `process_state` is 'unprocessed' and whose `program_pk` equals `programPk`.
 *
 * NOTE: `schema` and `limit` are concatenated directly into the SQL text (only
 * `programPk` is a bound parameter), so both must come from trusted input.
 *
 * @param pool Postgres pool; a client is checked out and always released.
 * @returns The `signature` column of each returned row.
 */
async function getSignaturesWithoutTransactions(pool, programPk, schema, limit) {
    const client = await pool.connect();
    try {
        // TODO: add back in order by id asc - but why does it make it so much slower?
        const sql = "select signature from " + schema + ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + limit;
        const res = await client.query(sql, [programPk]);

        return res.rows.map(row => row['signature']);
    } finally {
        client.release();
    }
}
|
|
|
|
/**
 * Converts downloaded `[signature, rawRpcResponse]` pairs into database rows.
 *
 * For each pair the response is serialised and its
 * `result.meta.logMessages` are joined with newlines. If that fails for any
 * reason (for example the response has no `result`), the error is logged and
 * the signature is marked as a download error instead.
 *
 * @param transactions Pairs of signature and raw RPC response.
 * @returns A tuple `[transactionInserts, processStates]`:
 *   - `transactionInserts`: `{transaction, log_messages, signature}` for each
 *     successfully converted response;
 *   - `processStates`: `{signature, process_state}` for every input pair, with
 *     state 'ready for parsing' or 'transaction download error'.
 */
function getTransactionInserts(transactions: ReadonlyArray<[string, any]>): [any[], any[]] {
    const transactionInserts: any[] = [];
    const processStates: any[] = [];

    for (const [signature, confirmedTransaction] of transactions) {
        try {
            // Building the row throws when the response lacks result/meta/logMessages;
            // nothing is pushed in that case and the catch below records the failure.
            transactionInserts.push({
                transaction: JSON.stringify(confirmedTransaction),
                log_messages: confirmedTransaction.result.meta.logMessages.join('\n'),
                signature: signature
            });
            processStates.push({signature: signature, process_state: 'ready for parsing'});
        } catch (e: unknown) {
            console.log(e instanceof Error ? e.stack : e);
            processStates.push({signature: signature, process_state: 'transaction download error'});
        }
    }

    return [transactionInserts, processStates];
}
|
|
|
|
/**
 * Writes downloaded transaction data and process states to
 * `<schema>.transactions` inside a single database transaction.
 *
 * Rows are matched on `signature` and sent as multi-row UPDATE statements in
 * batches of 1000. On any failure the transaction is rolled back and the
 * original error is rethrown.
 *
 * @param pool Postgres pool; a client is checked out and always released.
 * @param schema Schema that contains the `transactions` table.
 * @param transactionInserts Rows with `signature`, `log_messages`, `transaction`.
 * @param processStates Rows with `signature`, `process_state`.
 */
async function insertTransactions(pool, schema, transactionInserts, processStates): Promise<void> {
    const transactionsTable = new pgp.helpers.TableName({table: 'transactions', schema: schema});

    // The '?' prefix is pg-promise column syntax; `signature` is used in the
    // WHERE clause appended below to match rows (see pg-promise Column docs).
    const transactionCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'transaction'], {table: transactionsTable});
    const processStatesCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: transactionsTable});

    const batchSize = 1000;
    const client = await pool.connect();

    // Sends `rows` as multi-row UPDATEs of at most `batchSize` rows each.
    const applyUpdates = async (rows, columnSet) => {
        for (let start = 0; start < rows.length; start += batchSize) {
            const updatesBatch = rows.slice(start, start + batchSize);
            const updatedSql = pgp.helpers.update(updatesBatch, columnSet) + ' WHERE v.signature = t.signature';
            await client.query(updatedSql);
        }
    };

    try {
        await client.query('BEGIN');

        await applyUpdates(transactionInserts, transactionCs);
        await applyUpdates(processStates, processStatesCs);

        await client.query('COMMIT');
    } catch (e) {
        try {
            await client.query('ROLLBACK');
        } catch (rollbackError) {
            // Do not let a failed rollback hide the error that caused it.
            console.log(rollbackError);
        }
        throw e;
    } finally {
        client.release();
    }
}
|