Add limit to getNewAddressTransactions (encountered memory issues when ran against too many transactions).

This commit is contained in:
Nicholas Clarke 2021-11-17 23:19:06 -08:00
parent 5ac0dc673d
commit d728692eb7
1 changed files with 29 additions and 16 deletions

View File

@ -310,11 +310,11 @@ function parseOracleTransactions(transactions) {
return [processStates, transactionSummaries, oracleTransactions] return [processStates, transactionSummaries, oracleTransactions]
} }
async function getUnprocessedSignatures(pool, account) { async function getUnprocessedSignatures(pool, account, limit) {
const client = await pool.connect(); const client = await pool.connect();
let signatures; let signatures;
try { try {
const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc", [account]) const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc limit $2", [account, limit])
signatures = res.rows.map(e => e['signature']) signatures = res.rows.map(e => e['signature'])
} finally { } finally {
client.release() client.release()
@ -508,9 +508,9 @@ async function insertOracleTransactions(pool, processStates, transactionSummarie
} }
} }
async function getNewAddressTransactions(connection, address, requestWaitTime, pool) { async function getNewAddressTransactions(connection, address, requestWaitTime, pool, limit) {
let signaturesToProcess = (await getUnprocessedSignatures(pool, address)) let signaturesToProcess = (await getUnprocessedSignatures(pool, address, limit))
let promises: Promise<void>[] = []; let promises: Promise<void>[] = [];
let transactions: any[] = []; let transactions: any[] = [];
@ -532,22 +532,33 @@ async function getNewAddressTransactions(connection, address, requestWaitTime, p
return transactions return transactions
} }
async function processOracleTransactions(connection, address, pool, requestWaitTime) { async function processOracleTransactions(connection, address, pool, requestWaitTime, limit) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool) while (true) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit)
let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions) let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions)
await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions) await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions)
if (transactions.length < limit) {
break
}
}
} }
async function processMangoTransactions(connection, address, pool, requestWaitTime) { async function processMangoTransactions(connection, address, pool, requestWaitTime, limit) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool) while (true) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit)
let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions) let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions)
await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts) await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts)
if (transactions.length < limit) {
break
}
}
} }
async function consumeTransactions() { async function consumeTransactions() {
@ -579,14 +590,16 @@ async function consumeTransactions() {
// Order of inserting transactions important - inserting deposit_withdraw relies on having all oracle prices available // Order of inserting transactions important - inserting deposit_withdraw relies on having all oracle prices available
// So get new signatures of oracle transactions after mango transactions and insert oracle transactions first // So get new signatures of oracle transactions after mango transactions and insert oracle transactions first
let limit = 10000;
await insertNewSignatures(mangoProgramPk, connection, pool, requestWaitTime); await insertNewSignatures(mangoProgramPk, connection, pool, requestWaitTime);
await insertNewSignatures(oracleProgramPk, connection, pool, requestWaitTime); await insertNewSignatures(oracleProgramPk, connection, pool, requestWaitTime);
await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime); await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime, limit);
await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime); await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime, limit);
console.log('Refresh complete') console.log('Refresh complete')
// Unnecessary but let's give the servers a break // Unnecessary but let's give the servers a break
await sleep(20*1000) await sleep(1000)
} }
} }