diff --git a/src/getTransactions.ts b/src/getTransactions.ts new file mode 100644 index 0000000..024721a --- /dev/null +++ b/src/getTransactions.ts @@ -0,0 +1,72 @@ +const pgp = require('pg-promise')({ + capSQL: true, +}); + +import { sleep } from '@blockworks-foundation/mango-client'; + +export async function getUnprocessedTransactions( + connection, + address, + requestWaitTime, + pool, + schema, + numTransactions +) { + + let signaturesToProcess = await getUnprocessedSignatures( + pool, + address, + schema, + numTransactions, + ); + + let promises: Promise[] = []; + let transactions: any[] = []; + let counter = 1; + for (let signature of signaturesToProcess) { + // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json) + let args = [signature, { encoding: 'jsonParsed', commitment: 'finalized' }]; + let promise = connection + ._rpcRequest('getConfirmedTransaction', args) + .then((confirmedTransaction) => + transactions.push([signature, confirmedTransaction]), + ); + + console.log('requested ', counter, ' of ', signaturesToProcess.length); + counter++; + + promises.push(promise); + + // Limit request frequency to avoid request failures due to rate limiting + await sleep(requestWaitTime); + } + await (Promise as any).allSettled(promises); + + return transactions; +} + +async function getUnprocessedSignatures( + pool, + programPk, + schema, + limit, +) { + const client = await pool.connect(); + let signatures; + try { + // TODO: add back in order by id asc - but why does it make it so much slower? + const res = await client.query( + 'select signature from ' + + schema + + ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + + limit, + [programPk], + ); + + signatures = res.rows.map((e) => e['signature']); + } finally { + client.release(); + } + + return signatures; +} diff --git a/src/index.ts b/src/index.ts index 10decd4..99ecfe3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,7 @@ import { Cluster, sleep } from '@blockworks-foundation/mango-client'; import { insertNewSignatures } from './signatures'; import { Pool } from 'pg'; import { notify } from './utils'; -import { populateTransactions } from './insertTransactions'; +import { getUnprocessedTransactions } from './getTransactions'; import { parseTransactions } from './parseTransactions'; const pgp = require('pg-promise')({ @@ -18,12 +18,14 @@ async function insertMangoTransactions( rawTransactionsPool, parsedTransactionsPool, schema, - processStates, + transactionSummaries, parsedTransactions, ) { // Insert parsed transactions to appropriate tables on timescaledb // Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases) + const batchSize = 1000; + let columnSets = {}; let tableName; let inserts; @@ -40,10 +42,9 @@ async function insertMangoTransactions( } } - let batchSize = 1000; - let client = await parsedTransactionsPool.connect(); + let parsedClient = await parsedTransactionsPool.connect(); try { - await client.query('BEGIN'); + await parsedClient.query('BEGIN'); for ([tableName, inserts] of Object.entries(parsedTransactions)) { if (inserts.length > 0) { @@ -54,78 +55,64 @@ async function insertMangoTransactions( insertsBatch, columnSets[tableName], ); - await client.query(insertsSql); + await parsedClient.query(insertsSql); } console.log(inserts.length + ' records inserted into ' + tableName); } } - await client.query('COMMIT'); + await parsedClient.query('COMMIT'); } catch (e) { - await client.query('ROLLBACK'); - console.log('transaction rolled back') + await parsedClient.query('ROLLBACK'); + console.log('transaction rolled back: parsed') // TODO: check settle fees throw e; } finally { - client.release(); + parsedClient.release(); } tableName = 'transactions'; let table = new pgp.helpers.TableName({ table: tableName, schema: schema }); - const processStateCs = new pgp.helpers.ColumnSet( - ['?signature', 'process_state'], + const transactionSummariesCs = new pgp.helpers.ColumnSet( + ['?signature', 'process_state', 'log_messages'], { table: table }, ); - if (processStates.length > 0) { - client = await rawTransactionsPool.connect(); - try { - await client.query('BEGIN'); + let rawClient = await rawTransactionsPool.connect(); + try { + await rawClient.query('BEGIN'); - for (let i = 0, j = processStates.length; i < j; i += batchSize) { - let updatesBatch = processStates.slice(i, i + batchSize); - let updatedSql = - pgp.helpers.update(updatesBatch, processStateCs) + - ' WHERE v.signature = t.signature'; - await client.query(updatedSql); - } - console.log(processStates.length + ' process states updated'); - - await client.query('COMMIT'); - } catch (e) { - await client.query('ROLLBACK'); - console.log('transaction rolled back') - throw e; - } finally { - client.release(); - } + for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) { + let updatesBatch = transactionSummaries.slice(i, i + batchSize); + let updatedSql = + pgp.helpers.update(updatesBatch, transactionSummariesCs) + + ' WHERE v.signature = t.signature'; + await rawClient.query(updatedSql); + } + console.log(transactionSummaries.length + ' process states updated'); + + await rawClient.query('COMMIT'); + } catch (e) { + await rawClient.query('ROLLBACK'); + console.log('transaction rolled back: raw') + throw e; + } finally { + rawClient.release(); } } - async function processMangoTransactions( + clusterConnection, + requestWaitTime, address, rawTransactionsPool, parsedTransactionsPool, schema, limit, ) { - const client = await rawTransactionsPool.connect(); - let res; - try { - res = await client.query( - 'select transaction, signature from ' + - schema + - ".transactions where process_state = 'ready for parsing' and program_pk = $1 limit $2", - [address, limit], - ); - } finally { - client.release(); - } - console.log('Fetched ' + res.rows.length + ' records to parse.') - - let transactions = res.rows.map((e) => [e.transaction, e.signature]); - let [processStates, parsedTransactions] = parseTransactions( + let transactions = await getUnprocessedTransactions(clusterConnection, address, requestWaitTime, rawTransactionsPool, schema, limit) + + let [transactionSummaries, parsedTransactions] = parseTransactions( transactions, mangoProgramId, ); @@ -133,7 +120,7 @@ async function processMangoTransactions( rawTransactionsPool, parsedTransactionsPool, schema, - processStates, + transactionSummaries, parsedTransactions, ); } @@ -148,7 +135,7 @@ async function consumeTransactions() { let schema = 'transactions_v3'; console.log(clusterUrl); - let connection = new Connection(clusterUrl, 'finalized'); + let clusterConnection = new Connection(clusterUrl, 'finalized'); const rawTransactionsPool = new Pool({ connectionString: rawConnectionString, ssl: { @@ -169,14 +156,7 @@ async function consumeTransactions() { await insertNewSignatures( mangoProgramId, - connection, - rawTransactionsPool, - requestWaitTime, - schema, - ); - await populateTransactions( - connection, - mangoProgramId, + clusterConnection, rawTransactionsPool, requestWaitTime, schema, @@ -184,6 +164,8 @@ async function consumeTransactions() { let transactionsParsingLimit = 50000; await processMangoTransactions( + clusterConnection, + requestWaitTime, mangoProgramId, rawTransactionsPool, parsedTransactionsPool, @@ -210,4 +192,10 @@ async function main() { } } +// Stop program crashing on rejected promises +process.on('unhandledRejection', function (err, promise) { + console.error('unhandled rejection', err, promise); +}); + + main(); diff --git a/src/insertTransactions.ts b/src/insertTransactions.ts index dd16c99..a4b769f 100644 --- a/src/insertTransactions.ts +++ b/src/insertTransactions.ts @@ -1,174 +1,113 @@ const pgp = require('pg-promise')({ - capSQL: true, -}); + capSQL: true + }); -import { sleep } from '@blockworks-foundation/mango-client'; + import {sleep} from '@blockworks-foundation/mango-client'; -export async function populateTransactions( - connection, - address, - pool, - requestWaitTime, - schema, -) { - let transactions = await getNewAddressSignaturesWithoutTransactions( - connection, - address, - requestWaitTime, - pool, - schema, - ); +export async function populateTransactions(connection, address, pool, requestWaitTime, schema) { - let [transactionInserts, transactionErrors] = - getTransactionInserts(transactions); + let transactions = await getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) + + let [transactionInserts, transactionErrors] = getTransactionInserts(transactions) + + await insertTransactions(pool, schema, transactionInserts, transactionErrors) - await insertTransactions(pool, schema, transactionInserts, transactionErrors); } -async function getNewAddressSignaturesWithoutTransactions( - connection, - address, - requestWaitTime, - pool, - schema, -) { - let limit = 25000; +async function getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) { - let signaturesToProcess = await getSignaturesWithoutTransactions( - pool, - address, - schema, - limit, - ); + let limit = 25000; - let promises: Promise[] = []; - let transactions: any[] = []; - let counter = 1; - for (let signature of signaturesToProcess) { - // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json) - let args = [signature, { encoding: 'jsonParsed', commitment: 'finalized' }]; - let promise = connection - ._rpcRequest('getConfirmedTransaction', args) - .then((confirmedTransaction) => - transactions.push([signature, confirmedTransaction]), - ); + let signaturesToProcess = (await getSignaturesWithoutTransactions(pool, address, schema, limit)) + + let promises: Promise[] = []; + let transactions: any[] = []; + let counter = 1; + for (let signature of signaturesToProcess) { + // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json) + let args = [signature, {encoding: 'jsonParsed', commitment: 'finalized'}] + let promise = connection._rpcRequest('getConfirmedTransaction', args).then(confirmedTransaction => transactions.push([signature, confirmedTransaction])); - console.log('requested ', counter, ' of ', signaturesToProcess.length); - counter++; + console.log('requested ', counter, ' of ', signaturesToProcess.length); + counter++; + + promises.push(promise); - promises.push(promise); + // Limit request frequency to avoid request failures due to rate limiting + await sleep(requestWaitTime); - // Limit request frequency to avoid request failures due to rate limiting - await sleep(requestWaitTime); - } - await (Promise as any).allSettled(promises); + } + await (Promise as any).allSettled(promises); + + return transactions - return transactions; } -async function getSignaturesWithoutTransactions( - pool, - programPk, - schema, - limit, -) { - const client = await pool.connect(); - let signatures; - try { - // TODO: add back in order by id asc - but why does it make it so much slower? - const res = await client.query( - 'select signature from ' + - schema + - ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + - limit, - [programPk], - ); +async function getSignaturesWithoutTransactions(pool, programPk, schema, limit) { + const client = await pool.connect(); + let signatures; + try { + // TODO: add back in order by id asc - but why does it make it so much slower? + const res = await client.query("select signature from " + schema + ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + limit, [programPk]) - signatures = res.rows.map((e) => e['signature']); - } finally { - client.release(); - } + signatures = res.rows.map(e => e['signature']) + } finally { + client.release() + } - return signatures; + return signatures; } function getTransactionInserts(transactions) { - let transactionInserts: any[] = []; - let processStates: any[] = []; + let transactionInserts: any[] = []; + let processStates: any[] = []; - for (let transaction of transactions) { - let [signature, confirmedTransaction] = transaction; + for (let transaction of transactions) { + let [signature, confirmedTransaction] = transaction; + try { + let transactionInsert = { + transaction: JSON.stringify(confirmedTransaction), + log_messages: confirmedTransaction.result!.meta!.logMessages!.join('\n'), + signature: signature + } + transactionInserts.push(transactionInsert) + processStates.push({signature: signature, process_state: 'ready for parsing'}) + } catch(e: any) { + console.log(e.stack) + processStates.push({signature: signature, process_state: 'transaction download error'}) + } + } + return [transactionInserts, processStates] +} + +async function insertTransactions(pool, schema, transactionInserts, processStates) { + const transactionsTable = new pgp.helpers.TableName({table: 'transactions', schema: schema}) + + const transactionCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'transaction'], {table: transactionsTable}); + const processStatesCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: transactionsTable}); + + let batchSize = 1000; + let client = await pool.connect() try { - let transactionInsert = { - transaction: JSON.stringify(confirmedTransaction), - log_messages: - confirmedTransaction.result!.meta!.logMessages!.join('\n'), - signature: signature, - }; - transactionInserts.push(transactionInsert); - processStates.push({ - signature: signature, - process_state: 'ready for parsing', - }); - } catch (e: any) { - console.log(e.stack); - processStates.push({ - signature: signature, - process_state: 'transaction download error', - }); + await client.query('BEGIN') + + for (let i = 0, j = transactionInserts.length; i < j; i += batchSize) { + let updatesBatch = transactionInserts.slice(i, i + batchSize); + let updatedSql = pgp.helpers.update(updatesBatch, transactionCs) + ' WHERE v.signature = t.signature'; + await client.query(updatedSql) + } + + for (let i = 0, j = processStates.length; i < j; i += batchSize) { + let updatesBatch = processStates.slice(i, i + batchSize); + let updatedSql = pgp.helpers.update(updatesBatch, processStatesCs) + ' WHERE v.signature = t.signature'; + await client.query(updatedSql) + } + + await client.query('COMMIT') + } catch (e) { + await client.query('ROLLBACK') + throw e + } finally { + client.release() } - } - return [transactionInserts, processStates]; -} - -async function insertTransactions( - pool, - schema, - transactionInserts, - processStates, -) { - const transactionsTable = new pgp.helpers.TableName({ - table: 'transactions', - schema: schema, - }); - - const transactionCs = new pgp.helpers.ColumnSet( - ['?signature', 'log_messages', 'transaction'], - { table: transactionsTable }, - ); - const processStatesCs = new pgp.helpers.ColumnSet( - ['?signature', 'process_state'], - { table: transactionsTable }, - ); - - console.log('Starting transaction inserts') - - let batchSize = 1000; - let client = await pool.connect(); - try { - await client.query('BEGIN'); - - for (let i = 0, j = transactionInserts.length; i < j; i += batchSize) { - let updatesBatch = transactionInserts.slice(i, i + batchSize); - let updatedSql = - pgp.helpers.update(updatesBatch, transactionCs) + - ' WHERE v.signature = t.signature'; - await client.query(updatedSql); - } - - for (let i = 0, j = processStates.length; i < j; i += batchSize) { - let updatesBatch = processStates.slice(i, i + batchSize); - let updatedSql = - pgp.helpers.update(updatesBatch, processStatesCs) + - ' WHERE v.signature = t.signature'; - await client.query(updatedSql); - } - - await client.query('COMMIT'); - } catch (e) { - await client.query('ROLLBACK'); - throw e; - } finally { - client.release(); - } } diff --git a/src/jsonParsers.ts b/src/jsonParsers.ts index 189ee83..62d7020 100644 --- a/src/jsonParsers.ts +++ b/src/jsonParsers.ts @@ -194,7 +194,7 @@ export function jsonParser(parsedTransactions, result, instructions, signature, blockDatetime, ), ); - } else if (instructionName === 'LiquidatePerpMarket') { + } else if (instructionName === 'LiquidatePerpMarket') { parsedTransactions.liquidate_perp_market.push( ...parseLiquidatePerpMarket( result.meta.logMessages, diff --git a/src/parseTransactions.ts b/src/parseTransactions.ts index 53fa7b9..7319a21 100644 --- a/src/parseTransactions.ts +++ b/src/parseTransactions.ts @@ -1,19 +1,12 @@ const bs58 = require('bs58'); import { MangoInstructionLayout, - IDS, - PerpMarket, - Config, - Cluster, - GroupConfig, - awaitTransactionSignatureConfirmation, - PerpMarketConfig, } from '@blockworks-foundation/mango-client'; import { jsonParser } from './jsonParsers'; import { anchorParser } from './anchorParsers'; export function parseTransactions(transactionsResult, mangoProgramId) { - let processStates: any = []; + let transactionSummaries: any = []; let parsedTransactions: any = { deposits_withdraws: [], @@ -46,73 +39,89 @@ export function parseTransactions(transactionsResult, mangoProgramId) { }; for (let transactionResult of transactionsResult) { - let [transactionJson, signature] = transactionResult; + let [signature, transaction] = transactionResult; + try { - let transaction = JSON.parse(transactionJson); let result = transaction.result; + let logMessages = result!.meta!.logMessages!.join('\n'); - if (result.meta.err !== null) { - processStates.push({ - signature: signature, - process_state: 'transaction error', - }); - } else { - let slot = result.slot; - let blockTime = result.blockTime; - let blockDatetime = new Date(blockTime * 1000).toISOString(); + try { - // only look at cases where instruction is for mango program id - // Skip attempting to parse serum instructions, etc - let instructions = result.transaction.message.instructions; - let innerInstructions = result.meta.innerInstructions.map(e => e.instructions).flat(); - instructions.push(...innerInstructions) - - instructions = instructions.filter( - (ix) => - ix.programId === mangoProgramId && - !( - // Old mango group - not in ids.json so have to hardcode here to ignore - ( - ix.accounts && - ix.accounts[0] === - '2WNLfEMzhgwBPn6QptT43SdZy9cXTUbVJCMdCfimg4oi' - ) - ), - ); - - // Anchor logging was deployed at slot 100936906 - use different parsing code before and after this slot - const ancorDeploymentSlot = 100936906 - if (slot < ancorDeploymentSlot) { - - // Populate instruction num and name for each instruction - let ixNum = 1; - for (const ix of instructions) { - let decodeData = bs58.decode(ix.data); - let decodedInstruction = MangoInstructionLayout.decode(decodeData, 0); - let instructionName = Object.keys(decodedInstruction)[0]; - ix.instructionNum = ixNum; - ix.instructionName = instructionName; - ixNum++; + if (result.meta.err !== null) { + transactionSummaries.push({ + signature: signature, + process_state: 'transaction error', + log_messages: logMessages + }); + } else { + let slot = result.slot; + let blockTime = result.blockTime; + let blockDatetime = new Date(blockTime * 1000).toISOString(); + + // only look at cases where instruction is for mango program id + // Skip attempting to parse serum instructions, etc + let instructions = result.transaction.message.instructions; + let innerInstructions = result.meta.innerInstructions.map(e => e.instructions).flat(); + instructions.push(...innerInstructions) + + instructions = instructions.filter( + (ix) => + ix.programId === mangoProgramId && + !( + // Old mango group - not in ids.json so have to hardcode here to ignore + ( + ix.accounts && + ix.accounts[0] === + '2WNLfEMzhgwBPn6QptT43SdZy9cXTUbVJCMdCfimg4oi' + ) + ), + ); + + // Anchor logging was deployed at slot 100936906 - use different parsing code before and after this slot + const ancorDeploymentSlot = 100936906 + if (slot < ancorDeploymentSlot) { + + // Populate instruction num and name for each instruction + let ixNum = 1; + for (const ix of instructions) { + let decodeData = bs58.decode(ix.data); + let decodedInstruction = MangoInstructionLayout.decode(decodeData, 0); + let instructionName = Object.keys(decodedInstruction)[0]; + ix.instructionNum = ixNum; + ix.instructionName = instructionName; + ixNum++; + } + + jsonParser(parsedTransactions, result, instructions, signature, blockTime, slot, blockDatetime) + } else { + anchorParser(parsedTransactions, result, signature, blockTime, slot, blockDatetime) } - jsonParser(parsedTransactions, result, instructions, signature, blockTime, slot, blockDatetime) - } else { - anchorParser(parsedTransactions, result, signature, blockTime, slot, blockDatetime) + transactionSummaries.push({ + signature: signature, + process_state: 'processed', + log_messages: logMessages + }); } - - processStates.push({ + } catch (e: any) { + console.log(e.stack); + transactionSummaries.push({ signature: signature, - process_state: 'processed', + process_state: 'parsing error', + log_messages: logMessages }); } - } catch (e: any) { - console.log(e.stack); - processStates.push({ + + } catch { + transactionSummaries.push({ signature: signature, - process_state: 'parsing error', + process_state: 'transaction download error', + log_messages: null }); } + + } - return [processStates, parsedTransactions]; + return [transactionSummaries, parsedTransactions]; } diff --git a/src/tests/testParseTransactions.ts b/src/tests/testParseTransactions.ts index 39c4088..51de54e 100644 --- a/src/tests/testParseTransactions.ts +++ b/src/tests/testParseTransactions.ts @@ -57,7 +57,7 @@ async function processMangoTransactions(rawTransactionsPool, schema, limit) { schema + '.transactions where signature in (' + signaturesSql + - ') order by id desc limit $1', + ') limit $1', [limit], ); } finally {