From 319153140d997602d3afc4e5b43663dbcb51fd94 Mon Sep 17 00:00:00 2001 From: Nicholas Clarke Date: Tue, 14 Dec 2021 09:24:40 -0800 Subject: [PATCH] Reduce risk of inconsistent DBs on code interruption. --- src/index.ts | 75 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/src/index.ts b/src/index.ts index 99ecfe3..1ec1ed8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,9 +26,49 @@ async function insertMangoTransactions( const batchSize = 1000; - let columnSets = {}; let tableName; + tableName = 'transactions'; + let table = new pgp.helpers.TableName({ table: tableName, schema: schema }); + const transactionSummariesCs = new pgp.helpers.ColumnSet( + ['?signature', 'process_state', 'log_messages'], + { table: table }, + ); + + let rawClient = await rawTransactionsPool.connect(); + try { + await rawClient.query('BEGIN'); + + for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) { + let updatesBatch = transactionSummaries.slice(i, i + batchSize); + let updatedSql = + pgp.helpers.update(updatesBatch, transactionSummariesCs) + + ' WHERE v.signature = t.signature'; + await rawClient.query(updatedSql); + } + + // We need to keep the two databases we are updating consistent - so I've placed this here to reduce + // the risk of a state where one transaction is committed and then the code is interrupted before the other is committed + await insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions); + + console.log(transactionSummaries.length + ' process states updated'); + await rawClient.query('COMMIT'); + } catch (e) { + await rawClient.query('ROLLBACK'); + console.log('transaction rolled back: transaction summary') + throw e; + } finally { + rawClient.release(); + } +} + +async function insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions) { + + const batchSize = 1000; + + let tableName; + let columnSets = {}; let inserts; + for ([tableName, inserts] of Object.entries(parsedTransactions)) { if (inserts.length > 0) { let table = new pgp.helpers.TableName({ @@ -60,44 +100,17 @@ async function insertMangoTransactions( console.log(inserts.length + ' records inserted into ' + tableName); } } + + console.log('parsed transactions inserted'); await parsedClient.query('COMMIT'); } catch (e) { await parsedClient.query('ROLLBACK'); - console.log('transaction rolled back: parsed') + console.log('transaction rolled back: parsed transactions') // TODO: check settle fees throw e; } finally { parsedClient.release(); } - - tableName = 'transactions'; - let table = new pgp.helpers.TableName({ table: tableName, schema: schema }); - const transactionSummariesCs = new pgp.helpers.ColumnSet( - ['?signature', 'process_state', 'log_messages'], - { table: table }, - ); - - let rawClient = await rawTransactionsPool.connect(); - try { - await rawClient.query('BEGIN'); - - for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) { - let updatesBatch = transactionSummaries.slice(i, i + batchSize); - let updatedSql = - pgp.helpers.update(updatesBatch, transactionSummariesCs) + - ' WHERE v.signature = t.signature'; - await rawClient.query(updatedSql); - } - console.log(transactionSummaries.length + ' process states updated'); - - await rawClient.query('COMMIT'); - } catch (e) { - await rawClient.query('ROLLBACK'); - console.log('transaction rolled back: raw') - throw e; - } finally { - rawClient.release(); - } } async function processMangoTransactions(