import { Connection } from '@solana/web3.js'; import { sleep } from '@blockworks-foundation/mango-client'; import { insertSignatures, getNewSignatures } from './signatures'; import { Pool } from 'pg'; import { notify } from './utils'; import { getTransactions, getUnprocessedSignatures } from './getTransactions'; import { parseTransactions } from './parseTransactions'; const pgp = require('pg-promise')({ capSQL: true, }); const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH'; async function insertMangoTransactions( rawTransactionsPool, parsedTransactionsPool, schema, transactionSummaries, parsedTransactions, ) { // Insert parsed transactions to appropriate tables on timescaledb // Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases) console.log('starting inserts') const batchSize = 1000; let tableName; tableName = 'transactions'; let table = new pgp.helpers.TableName({ table: tableName, schema: schema }); const transactionSummariesCs = new pgp.helpers.ColumnSet( ['?signature', 'process_state', 'log_messages'], { table: table }, ); let rawClient = await rawTransactionsPool.connect(); try { await rawClient.query('BEGIN'); for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) { let updatesBatch = transactionSummaries.slice(i, i + batchSize); let updatedSql = pgp.helpers.update(updatesBatch, transactionSummariesCs) + ' WHERE v.signature = t.signature'; await rawClient.query(updatedSql); } // We need to keep the two databases we are updating consistent - so I've placed this here to reduce // the risk of a state where one transaction is committed and then the code is interrupted before the other is committed await insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions); console.log(transactionSummaries.length + ' process states updated'); await rawClient.query('COMMIT'); } catch (e) { await rawClient.query('ROLLBACK'); console.log('transaction rolled back: transaction summary') throw e; } finally { rawClient.release(); } console.log('finished inserts') } async function insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions) { const batchSize = 1000; let tableName; let columnSets = {}; let inserts; for ([tableName, inserts] of Object.entries(parsedTransactions)) { if (inserts.length > 0) { let table = new pgp.helpers.TableName({ table: tableName, schema: schema, }); columnSets[tableName] = new pgp.helpers.ColumnSet( Object.keys(inserts[0]), { table: table }, ); } } let parsedClient = await parsedTransactionsPool.connect(); try { await parsedClient.query('BEGIN'); for ([tableName, inserts] of Object.entries(parsedTransactions)) { if (inserts.length > 0) { console.log(tableName + ' insert started'); for (let i = 0, j = inserts.length; i < j; i += batchSize) { let insertsBatch = inserts.slice(i, i + batchSize); let insertsSql = pgp.helpers.insert( insertsBatch, columnSets[tableName], ); await parsedClient.query(insertsSql); } console.log(inserts.length + ' records inserted into ' + tableName); } } console.log('parsed transactions inserted'); await parsedClient.query('COMMIT'); } catch (e) { await parsedClient.query('ROLLBACK'); console.log('transaction rolled back: parsed transactions') // TODO: check settle fees throw e; } finally { parsedClient.release(); } } async function processMangoTransactions( clusterConnection, requestWaitTime, address, rawTransactionsPool, parsedTransactionsPool, schema, workerPartitionStart, workerPartitionEnd, parsingBatchSize, ) { let signaturesToProcess = await getUnprocessedSignatures( rawTransactionsPool, address, schema, workerPartitionStart, workerPartitionEnd, parsingBatchSize ); let transactions = await getTransactions( signaturesToProcess, clusterConnection, requestWaitTime ); let [transactionSummaries, parsedTransactions] = parseTransactions( transactions, mangoProgramId, ); await insertMangoTransactions( rawTransactionsPool, parsedTransactionsPool, schema, transactionSummaries, parsedTransactions, ); } async function consumeTransactions() { const clusterUrl = process.env.CLUSTER_URL || 'https://api.mainnet-beta.solana.com'; let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500; let parsingBatchSize = parseInt(process.env.PARSING_BATCH_SIZE!) || 10000; const rawConnectionString = process.env.CONNECTION_STRING_RAW; const parsedConnectionString = process.env.CONNECTION_STRING_PARSED; // Allow transaction parsers to work independently - worker_partition is a number from 0 - 9. Can be divided amongst parsers. const workerPartitionStart = parseInt(process.env.WORKER_PARTITION_START!); const workerPartitionEnd = parseInt(process.env.WORKER_PARTITION_END!); // Only one process should insert new signatures. 1 for true, 0 for false. const insertSignaturesBool = parseInt(process.env.INSERT_SIGNATURES!) === 1; let schema = 'transactions_v3'; console.log(clusterUrl); let clusterConnection = new Connection(clusterUrl, 'finalized'); const rawTransactionsPool = new Pool({ connectionString: rawConnectionString, ssl: { rejectUnauthorized: false, }, }); const parsedTransactionsPool = new Pool({ connectionString: parsedConnectionString, ssl: { rejectUnauthorized: false, }, }); console.log('Initialized'); notify('v3: Initialized'); while (true) { console.log('Refreshing transactions ' + Date()); if (insertSignaturesBool) { let newSignatures = await getNewSignatures(clusterConnection, mangoProgramId, requestWaitTime, rawTransactionsPool, schema) await insertSignatures( mangoProgramId, rawTransactionsPool, schema, newSignatures ) } await processMangoTransactions( clusterConnection, requestWaitTime, mangoProgramId, rawTransactionsPool, parsedTransactionsPool, schema, workerPartitionStart, workerPartitionEnd, parsingBatchSize, ); console.log('Refresh complete'); await sleep(10 * 1000); } } async function main() { while (true) { try { await consumeTransactions(); } catch (e: any) { notify('v3: ' + e.toString()); console.log(e, e.stack); // Wait for 60 seconds await sleep(60 * 1000); } } } // Stop program crashing on rejected promises process.on('unhandledRejection', function (err, promise) { console.error('unhandled rejection', err, promise); }); main();