249 lines
7.0 KiB
TypeScript
249 lines
7.0 KiB
TypeScript
import { Connection } from '@solana/web3.js';
|
|
import { sleep } from '@blockworks-foundation/mango-client';
|
|
import { insertSignatures, getNewSignatures } from './signatures';
|
|
import { Pool } from 'pg';
|
|
import { notify } from './utils';
|
|
import { getTransactions, getUnprocessedSignatures } from './getTransactions';
|
|
import { parseTransactions } from './parseTransactions';
|
|
|
|
// pg-promise instance; this file uses it through `pgp.helpers` to build
// multi-row INSERT/UPDATE statements. `capSQL` asks the library to capitalise
// the SQL it generates.
const pgp = require('pg-promise')({ capSQL: true });
|
|
|
|
// Mango program id used throughout this file. MANGO_PROGRAM_ID overrides the
// built-in value; an unset or empty variable falls back to it.
const mangoProgramId = process.env.MANGO_PROGRAM_ID
  ? process.env.MANGO_PROGRAM_ID
  : '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH';
|
|
|
|
/**
 * Persists the outcome of one parsing run.
 *
 * Inside a single transaction on a client from `rawTransactionsPool`, the
 * `transactions` table in `schema` gets its `process_state` and `log_messages`
 * updated for every summary, matched on `signature`. Before that transaction
 * is committed, the parsed rows are written through insertParsedTransactions,
 * which opens and commits its own transaction on `parsedTransactionsPool`.
 *
 * The process states can't share a db transaction with the parsed rows because
 * they live on different databases, so they are only committed once the parsed
 * transactions have been inserted successfully. Committing the two back to
 * back reduces - but cannot remove - the risk that one side is committed and
 * the code is interrupted before the other is.
 *
 * @param rawTransactionsPool pool whose clients can reach the `transactions` table
 * @param parsedTransactionsPool pool handed on to insertParsedTransactions
 * @param schema schema that contains the tables being written
 * @param transactionSummaries rows carrying `signature`, `process_state` and `log_messages`
 * @param parsedTransactions handed on to insertParsedTransactions unchanged
 * @throws the first error raised while updating, inserting or committing; a
 *   failing ROLLBACK is logged and never replaces that error
 */
async function insertMangoTransactions(
  rawTransactionsPool,
  parsedTransactionsPool,
  schema,
  transactionSummaries,
  parsedTransactions,
) {
  console.log('starting inserts')

  const batchSize = 1000;

  const table = new pgp.helpers.TableName({ table: 'transactions', schema: schema });
  // The '?' prefix marks `signature` as a condition-only column for pg-promise:
  // it is available to the WHERE clause but is not part of the SET list.
  const transactionSummariesCs = new pgp.helpers.ColumnSet(
    ['?signature', 'process_state', 'log_messages'],
    { table: table },
  );

  const rawClient = await rawTransactionsPool.connect();
  // Set when ROLLBACK itself fails: the client may still be inside the aborted
  // transaction, so it must be destroyed rather than handed back to the pool.
  let destroyClient = false;
  try {
    await rawClient.query('BEGIN');

    for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
      const updatesBatch = transactionSummaries.slice(i, i + batchSize);
      const updatedSql =
        pgp.helpers.update(updatesBatch, transactionSummariesCs) +
        ' WHERE v.signature = t.signature';
      await rawClient.query(updatedSql);
    }

    // Keep the two databases as consistent as possible: the parsed rows are
    // committed here, immediately before the process states are.
    await insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions);

    console.log(transactionSummaries.length + ' process states updated');
    await rawClient.query('COMMIT');
  } catch (e) {
    try {
      await rawClient.query('ROLLBACK');
      console.log('transaction rolled back: transaction summary')
    } catch (rollbackError) {
      // Keep the original failure as the error the caller sees.
      destroyClient = true;
      console.log('rollback failed: transaction summary', rollbackError);
    }
    throw e;
  } finally {
    rawClient.release(destroyClient);
  }

  console.log('finished inserts')
}
|
|
|
|
/**
 * Inserts parsed rows into their destination tables within one transaction.
 *
 * `parsedTransactions` maps a table name to an array of row objects. Tables
 * with no rows are skipped. For every other table the column list is taken
 * from the keys of its first row, and the rows are inserted in batches of 1000
 * with multi-row INSERT statements built by pg-promise.
 *
 * @param parsedTransactionsPool pool whose clients can reach the parsed tables
 * @param schema schema that contains the destination tables
 * @param parsedTransactions object of `tableName -> rows[]`
 * @throws the first error raised while inserting or committing; a failing
 *   ROLLBACK is logged and never replaces that error
 */
async function insertParsedTransactions(parsedTransactionsPool, schema, parsedTransactions) {
  const batchSize = 1000;

  // Build every ColumnSet up front so that malformed input fails before a
  // connection is taken from the pool.
  const pendingTables: {
    tableName: string;
    rows: Record<string, unknown>[];
    columnSet: unknown;
  }[] = [];
  for (const [tableName, rows] of Object.entries<Record<string, unknown>[]>(parsedTransactions)) {
    if (rows.length > 0) {
      const table = new pgp.helpers.TableName({
        table: tableName,
        schema: schema,
      });
      const columnSet = new pgp.helpers.ColumnSet(Object.keys(rows[0]), { table: table });
      pendingTables.push({ tableName, rows, columnSet });
    }
  }

  const parsedClient = await parsedTransactionsPool.connect();
  // Set when ROLLBACK itself fails: the client may still be inside the aborted
  // transaction, so it must be destroyed rather than handed back to the pool.
  let destroyClient = false;
  try {
    await parsedClient.query('BEGIN');

    for (const { tableName, rows, columnSet } of pendingTables) {
      console.log(tableName + ' insert started');
      for (let i = 0, j = rows.length; i < j; i += batchSize) {
        const insertsBatch = rows.slice(i, i + batchSize);
        const insertsSql = pgp.helpers.insert(insertsBatch, columnSet);
        await parsedClient.query(insertsSql);
      }
      console.log(rows.length + ' records inserted into ' + tableName);
    }

    console.log('parsed transactions inserted');
    await parsedClient.query('COMMIT');
  } catch (e) {
    try {
      await parsedClient.query('ROLLBACK');
      console.log('transaction rolled back: parsed transactions')
    } catch (rollbackError) {
      // Keep the original failure as the error the caller sees.
      destroyClient = true;
      console.log('rollback failed: parsed transactions', rollbackError);
    }
    // TODO: check settle fees
    throw e;
  } finally {
    parsedClient.release(destroyClient);
  }
}
|
|
|
|
/**
 * Runs one fetch -> parse -> store pass.
 *
 * 1. getUnprocessedSignatures selects the signatures to work on, given the
 *    address, schema, worker partition bounds and batch size.
 * 2. getTransactions loads those transactions through `clusterConnection`.
 * 3. parseTransactions turns them into summaries plus parsed rows. Note that
 *    it is given the module-level `mangoProgramId`, not `address`.
 * 4. insertMangoTransactions stores both results.
 *
 * Any error from these steps propagates to the caller.
 */
async function processMangoTransactions(
  clusterConnection,
  requestWaitTime,
  address,
  rawTransactionsPool,
  parsedTransactionsPool,
  schema,
  workerPartitionStart,
  workerPartitionEnd,
  parsingBatchSize,
) {
  const pendingSignatures = await getUnprocessedSignatures(
    rawTransactionsPool,
    address,
    schema,
    workerPartitionStart,
    workerPartitionEnd,
    parsingBatchSize,
  );

  const fetchedTransactions = await getTransactions(
    pendingSignatures,
    clusterConnection,
    requestWaitTime,
  );

  const [summaries, parsedRows] = parseTransactions(fetchedTransactions, mangoProgramId);

  await insertMangoTransactions(
    rawTransactionsPool,
    parsedTransactionsPool,
    schema,
    summaries,
    parsedRows,
  );
}
|
|
|
|
/**
 * Reads the configuration from the environment, opens the cluster connection
 * and both database pools, then loops forever: optionally record new
 * signatures, process a batch of transactions, sleep 10 seconds.
 *
 * Environment variables read here: CLUSTER_URL, REQUEST_WAIT_TIME,
 * PARSING_BATCH_SIZE, CONNECTION_STRING_RAW, CONNECTION_STRING_PARSED,
 * WORKER_PARTITION_START, WORKER_PARTITION_END and INSERT_SIGNATURES.
 *
 * The function only returns by throwing. Both pools are shut down on the way
 * out, because the caller is expected to call it again, which creates new ones.
 */
async function consumeTransactions() {
  const clusterUrl =
    process.env.CLUSTER_URL || 'https://api.mainnet-beta.solana.com';
  // Missing, non-numeric or zero values fall back to the defaults.
  const requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500;
  const parsingBatchSize = parseInt(process.env.PARSING_BATCH_SIZE!) || 10000;
  const rawConnectionString = process.env.CONNECTION_STRING_RAW;
  const parsedConnectionString = process.env.CONNECTION_STRING_PARSED;

  // Allow transaction parsers to work independently - worker_partition is a
  // number from 0 - 9. Can be divided amongst parsers.
  // NOTE(review): both values are NaN when the variables are unset - confirm
  // how getUnprocessedSignatures treats that.
  const workerPartitionStart = parseInt(process.env.WORKER_PARTITION_START!);
  const workerPartitionEnd = parseInt(process.env.WORKER_PARTITION_END!);

  // Only one process should insert new signatures. 1 for true, 0 for false.
  const insertSignaturesBool = parseInt(process.env.INSERT_SIGNATURES!) === 1;

  const schema = 'transactions_v3';

  console.log(clusterUrl);
  const clusterConnection = new Connection(clusterUrl, 'finalized');
  // NOTE(review): rejectUnauthorized: false turns off TLS certificate
  // verification for both database connections - confirm this is intended.
  const rawTransactionsPool = new Pool({
    connectionString: rawConnectionString,
    ssl: {
      rejectUnauthorized: false,
    },
  });
  const parsedTransactionsPool = new Pool({
    connectionString: parsedConnectionString,
    ssl: {
      rejectUnauthorized: false,
    },
  });

  try {
    console.log('Initialized');
    notify('v3: Initialized');
    while (true) {
      console.log('Refreshing transactions ' + Date());

      if (insertSignaturesBool) {
        const newSignatures = await getNewSignatures(
          clusterConnection,
          mangoProgramId,
          requestWaitTime,
          rawTransactionsPool,
          schema,
        );

        await insertSignatures(
          mangoProgramId,
          rawTransactionsPool,
          schema,
          newSignatures,
        );
      }

      await processMangoTransactions(
        clusterConnection,
        requestWaitTime,
        mangoProgramId,
        rawTransactionsPool,
        parsedTransactionsPool,
        schema,
        workerPartitionStart,
        workerPartitionEnd,
        parsingBatchSize,
      );

      console.log('Refresh complete');
      await sleep(10 * 1000);
    }
  } finally {
    // Every restart builds new pools, so close these instead of abandoning
    // them. Deliberately not awaited: end() may wait for checked-out clients
    // to be returned, and propagating the failure must not stall on that.
    for (const pool of [rawTransactionsPool, parsedTransactionsPool]) {
      pool.end().catch((closeError: unknown) => {
        console.log('failed to close connection pool', closeError);
      });
    }
  }
}
|
|
|
|
/**
 * Supervises consumeTransactions: whenever it fails, the failure is sent to
 * notify, logged, and the consumer is started again after a 60 second pause.
 * The loop has no exit condition.
 */
async function main() {
  while (true) {
    try {
      await consumeTransactions();
    } catch (e: unknown) {
      // A rejection value is not guaranteed to be an Error (it can even be
      // null or undefined), so format it without dereferencing it blindly -
      // an exception thrown in here would end the supervising loop.
      const stack = e == null ? undefined : (e as { stack?: unknown }).stack;
      notify('v3: ' + String(e));
      console.log(e, stack);
      // Wait for 60 seconds
      await sleep(60 * 1000);
    }
  }
}
|
|
|
|
// Keep the process alive when a promise rejection goes unhandled: log the
// reason and the promise instead of letting Node terminate the program.
process.on('unhandledRejection', (reason, rejectedPromise) => {
  console.error('unhandled rejection', reason, rejectedPromise);
});

// Entry point.
void main();
|