diff --git a/src/index.ts b/src/index.ts index 5a3009d..551e442 100644 --- a/src/index.ts +++ b/src/index.ts @@ -310,11 +310,11 @@ function parseOracleTransactions(transactions) { return [processStates, transactionSummaries, oracleTransactions] } -async function getUnprocessedSignatures(pool, account) { +async function getUnprocessedSignatures(pool, account, limit) { const client = await pool.connect(); let signatures; try { - const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc", [account]) + const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc limit $2", [account, limit]) signatures = res.rows.map(e => e['signature']) } finally { client.release() @@ -508,9 +508,9 @@ async function insertOracleTransactions(pool, processStates, transactionSummarie } } -async function getNewAddressTransactions(connection, address, requestWaitTime, pool) { +async function getNewAddressTransactions(connection, address, requestWaitTime, pool, limit) { - let signaturesToProcess = (await getUnprocessedSignatures(pool, address)) + let signaturesToProcess = (await getUnprocessedSignatures(pool, address, limit)) let promises: Promise[] = []; let transactions: any[] = []; @@ -532,22 +532,33 @@ async function getNewAddressTransactions(connection, address, requestWaitTime, p return transactions } -async function processOracleTransactions(connection, address, pool, requestWaitTime) { +async function processOracleTransactions(connection, address, pool, requestWaitTime, limit) { - let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool) + while (true) { + let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit) - let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions) - - await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions) + let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions) + + await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions) + + if (transactions.length < limit) { + break + } + } } -async function processMangoTransactions(connection, address, pool, requestWaitTime) { +async function processMangoTransactions(connection, address, pool, requestWaitTime, limit) { - let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool) + while (true) { + let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit) - let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions) + let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions) - await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts) + await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts) + if (transactions.length < limit) { + break + } + } } async function consumeTransactions() { @@ -579,14 +590,16 @@ async function consumeTransactions() { // Order of inserting transactions important - inserting deposit_withdraw relies on having all oracle prices available // So get new signatures of oracle transactions after mango transactions and insert oracle transactions first + let limit = 10000; + await insertNewSignatures(mangoProgramPk, connection, pool, requestWaitTime); await insertNewSignatures(oracleProgramPk, connection, pool, requestWaitTime); - await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime); - await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime); + await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime, limit); + await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime, limit); console.log('Refresh complete') // Unnecessary but let's give the servers a break - await sleep(20*1000) + await sleep(1000) } }