mango-transaction-scraper-v3/src/index.ts

249 lines
7.0 KiB
TypeScript

import { Connection } from '@solana/web3.js';
import { sleep } from '@blockworks-foundation/mango-client';
import { insertSignatures, getNewSignatures } from './signatures';
import { Pool } from 'pg';
import { notify } from './utils';
import { getTransactions, getUnprocessedSignatures } from './getTransactions';
import { parseTransactions } from './parseTransactions';
const pgp = require('pg-promise')({
capSQL: true,
});
const mangoProgramId =
process.env.MANGO_PROGRAM_ID ||
'5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH';
async function insertMangoTransactions(
rawTransactionsPool,
parsedTransactionsPool,
schema,
transactionSummaries,
parsedTransactions,
) {
// Insert parsed transactions to appropriate tables on timescaledb
// Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases)
console.log('starting inserts')
const batchSize = 1000;
let tableName;
tableName = 'transactions';
let table = new pgp.helpers.TableName({ table: tableName, schema: schema });
const transactionSummariesCs = new pgp.helpers.ColumnSet(
['?signature', 'process_state', 'log_messages'],
{ table: table },
);
let rawClient = await rawTransactionsPool.connect();
try {
await rawClient.query('BEGIN');
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
let updatedSql =
pgp.helpers.update(updatesBatch, transactionSummariesCs) +
' WHERE v.signature = t.signature';
await rawClient.query(updatedSql);
}
// We need to keep the two databases we are updating consistent - so I've placed this here to reduce
// the risk of a state where one transaction is committed and then the code is interrupted before the other is committed
await insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions);
console.log(transactionSummaries.length + ' process states updated');
await rawClient.query('COMMIT');
} catch (e) {
await rawClient.query('ROLLBACK');
console.log('transaction rolled back: transaction summary')
throw e;
} finally {
rawClient.release();
}
console.log('finished inserts')
}
async function insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions) {
const batchSize = 1000;
let tableName;
let columnSets = {};
let inserts;
for ([tableName, inserts] of Object.entries(parsedTransactions)) {
if (inserts.length > 0) {
let table = new pgp.helpers.TableName({
table: tableName,
schema: schema,
});
columnSets[tableName] = new pgp.helpers.ColumnSet(
Object.keys(inserts[0]),
{ table: table },
);
}
}
let parsedClient = await parsedTransactionsPool.connect();
try {
await parsedClient.query('BEGIN');
for ([tableName, inserts] of Object.entries(parsedTransactions)) {
if (inserts.length > 0) {
console.log(tableName + ' insert started');
for (let i = 0, j = inserts.length; i < j; i += batchSize) {
let insertsBatch = inserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(
insertsBatch,
columnSets[tableName],
);
await parsedClient.query(insertsSql);
}
console.log(inserts.length + ' records inserted into ' + tableName);
}
}
console.log('parsed transactions inserted');
await parsedClient.query('COMMIT');
} catch (e) {
await parsedClient.query('ROLLBACK');
console.log('transaction rolled back: parsed transactions')
// TODO: check settle fees
throw e;
} finally {
parsedClient.release();
}
}
async function processMangoTransactions(
clusterConnection,
requestWaitTime,
address,
rawTransactionsPool,
parsedTransactionsPool,
schema,
workerPartitionStart,
workerPartitionEnd,
parsingBatchSize,
) {
let signaturesToProcess = await getUnprocessedSignatures(
rawTransactionsPool,
address,
schema,
workerPartitionStart,
workerPartitionEnd,
parsingBatchSize
);
let transactions = await getTransactions(
signaturesToProcess,
clusterConnection,
requestWaitTime
);
let [transactionSummaries, parsedTransactions] = parseTransactions(
transactions,
mangoProgramId,
);
await insertMangoTransactions(
rawTransactionsPool,
parsedTransactionsPool,
schema,
transactionSummaries,
parsedTransactions,
);
}
async function consumeTransactions() {
const clusterUrl =
process.env.CLUSTER_URL || 'https://api.mainnet-beta.solana.com';
let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500;
let parsingBatchSize = parseInt(process.env.PARSING_BATCH_SIZE!) || 10000;
const rawConnectionString = process.env.CONNECTION_STRING_RAW;
const parsedConnectionString = process.env.CONNECTION_STRING_PARSED;
// Allow transaction parsers to work independently - worker_partition is a number from 0 - 9. Can be divided amongst parsers.
const workerPartitionStart = parseInt(process.env.WORKER_PARTITION_START!);
const workerPartitionEnd = parseInt(process.env.WORKER_PARTITION_END!);
// Only one process should insert new signatures. 1 for true, 0 for false.
const insertSignaturesBool = parseInt(process.env.INSERT_SIGNATURES!) === 1;
let schema = 'transactions_v3';
console.log(clusterUrl);
let clusterConnection = new Connection(clusterUrl, 'finalized');
const rawTransactionsPool = new Pool({
connectionString: rawConnectionString,
ssl: {
rejectUnauthorized: false,
},
});
const parsedTransactionsPool = new Pool({
connectionString: parsedConnectionString,
ssl: {
rejectUnauthorized: false,
},
});
console.log('Initialized');
notify('v3: Initialized');
while (true) {
console.log('Refreshing transactions ' + Date());
if (insertSignaturesBool) {
let newSignatures = await getNewSignatures(clusterConnection,
mangoProgramId,
requestWaitTime,
rawTransactionsPool,
schema)
await insertSignatures(
mangoProgramId,
rawTransactionsPool,
schema,
newSignatures
)
}
await processMangoTransactions(
clusterConnection,
requestWaitTime,
mangoProgramId,
rawTransactionsPool,
parsedTransactionsPool,
schema,
workerPartitionStart,
workerPartitionEnd,
parsingBatchSize,
);
console.log('Refresh complete');
await sleep(10 * 1000);
}
}
async function main() {
while (true) {
try {
await consumeTransactions();
} catch (e: any) {
notify('v3: ' + e.toString());
console.log(e, e.stack);
// Wait for 60 seconds
await sleep(60 * 1000);
}
}
}
// Stop program crashing on rejected promises
process.on('unhandledRejection', function (err, promise) {
console.error('unhandled rejection', err, promise);
});
main();