Reduce risk of inconsistent DBs on code interruption.
This commit is contained in:
parent
b6ff171a3d
commit
319153140d
75
src/index.ts
75
src/index.ts
|
@ -26,9 +26,49 @@ async function insertMangoTransactions(
|
|||
|
||||
const batchSize = 1000;
|
||||
|
||||
let columnSets = {};
|
||||
let tableName;
|
||||
tableName = 'transactions';
|
||||
let table = new pgp.helpers.TableName({ table: tableName, schema: schema });
|
||||
const transactionSummariesCs = new pgp.helpers.ColumnSet(
|
||||
['?signature', 'process_state', 'log_messages'],
|
||||
{ table: table },
|
||||
);
|
||||
|
||||
let rawClient = await rawTransactionsPool.connect();
|
||||
try {
|
||||
await rawClient.query('BEGIN');
|
||||
|
||||
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
|
||||
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
|
||||
let updatedSql =
|
||||
pgp.helpers.update(updatesBatch, transactionSummariesCs) +
|
||||
' WHERE v.signature = t.signature';
|
||||
await rawClient.query(updatedSql);
|
||||
}
|
||||
|
||||
// We need to keep the two databases we are updating consistent - so I've placed this here to reduce
|
||||
// the risk of a state where one transaction is committed and then the code is interrupted before the other is committed
|
||||
await insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions);
|
||||
|
||||
console.log(transactionSummaries.length + ' process states updated');
|
||||
await rawClient.query('COMMIT');
|
||||
} catch (e) {
|
||||
await rawClient.query('ROLLBACK');
|
||||
console.log('transaction rolled back: transaction summary')
|
||||
throw e;
|
||||
} finally {
|
||||
rawClient.release();
|
||||
}
|
||||
}
|
||||
|
||||
async function insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions) {
|
||||
|
||||
const batchSize = 1000;
|
||||
|
||||
let tableName;
|
||||
let columnSets = {};
|
||||
let inserts;
|
||||
|
||||
for ([tableName, inserts] of Object.entries(parsedTransactions)) {
|
||||
if (inserts.length > 0) {
|
||||
let table = new pgp.helpers.TableName({
|
||||
|
@ -60,44 +100,17 @@ async function insertMangoTransactions(
|
|||
console.log(inserts.length + ' records inserted into ' + tableName);
|
||||
}
|
||||
}
|
||||
|
||||
console.log('parsed transactions inserted');
|
||||
await parsedClient.query('COMMIT');
|
||||
} catch (e) {
|
||||
await parsedClient.query('ROLLBACK');
|
||||
console.log('transaction rolled back: parsed')
|
||||
console.log('transaction rolled back: parsed transactions')
|
||||
// TODO: check settle fees
|
||||
throw e;
|
||||
} finally {
|
||||
parsedClient.release();
|
||||
}
|
||||
|
||||
tableName = 'transactions';
|
||||
let table = new pgp.helpers.TableName({ table: tableName, schema: schema });
|
||||
const transactionSummariesCs = new pgp.helpers.ColumnSet(
|
||||
['?signature', 'process_state', 'log_messages'],
|
||||
{ table: table },
|
||||
);
|
||||
|
||||
let rawClient = await rawTransactionsPool.connect();
|
||||
try {
|
||||
await rawClient.query('BEGIN');
|
||||
|
||||
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
|
||||
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
|
||||
let updatedSql =
|
||||
pgp.helpers.update(updatesBatch, transactionSummariesCs) +
|
||||
' WHERE v.signature = t.signature';
|
||||
await rawClient.query(updatedSql);
|
||||
}
|
||||
console.log(transactionSummaries.length + ' process states updated');
|
||||
|
||||
await rawClient.query('COMMIT');
|
||||
} catch (e) {
|
||||
await rawClient.query('ROLLBACK');
|
||||
console.log('transaction rolled back: raw')
|
||||
throw e;
|
||||
} finally {
|
||||
rawClient.release();
|
||||
}
|
||||
}
|
||||
|
||||
async function processMangoTransactions(
|
||||
|
|
Loading…
Reference in New Issue