Refactoring to reduce DB space requirements - no longer store entire transaction.

Nicholas Clarke 2021-12-07 00:19:50 -08:00
parent 067ca2b087
commit aef85a6530
6 changed files with 282 additions and 274 deletions

src/getTransactions.ts Normal file
View File

@@ -0,0 +1,72 @@
const pgp = require('pg-promise')({
capSQL: true,
});
import { sleep } from '@blockworks-foundation/mango-client';
export async function getUnprocessedTransactions(
connection,
address,
requestWaitTime,
pool,
schema,
numTransactions
) {
let signaturesToProcess = await getUnprocessedSignatures(
pool,
address,
schema,
numTransactions,
);
let promises: Promise<void>[] = [];
let transactions: any[] = [];
let counter = 1;
for (let signature of signaturesToProcess) {
// Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json)
let args = [signature, { encoding: 'jsonParsed', commitment: 'finalized' }];
let promise = connection
._rpcRequest('getConfirmedTransaction', args)
.then((confirmedTransaction) =>
transactions.push([signature, confirmedTransaction]),
);
console.log('requested ', counter, ' of ', signaturesToProcess.length);
counter++;
promises.push(promise);
// Limit request frequency to avoid request failures due to rate limiting
await sleep(requestWaitTime);
}
await (Promise as any).allSettled(promises);
return transactions;
}
async function getUnprocessedSignatures(
pool,
programPk,
schema,
limit,
) {
const client = await pool.connect();
let signatures;
try {
// TODO: add back in order by id asc - but why does it make it so much slower?
const res = await client.query(
'select signature from ' +
schema +
".transactions where process_state = 'unprocessed' and program_pk = $1 limit " +
limit,
[programPk],
);
signatures = res.rows.map((e) => e['signature']);
} finally {
client.release();
}
return signatures;
}
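
For reference, the new helper can be exercised on its own along these lines. This is a minimal sketch rather than code from the commit: the RPC endpoint, connection string, program address and limits are all placeholders, wired in the parameter order the function declares above.

import { Connection } from '@solana/web3.js';
import { Pool } from 'pg';
import { getUnprocessedTransactions } from './getTransactions';

async function demo() {
  // Placeholders - substitute a real RPC endpoint and database
  const connection = new Connection('https://api.mainnet-beta.solana.com', 'finalized');
  const pool = new Pool({ connectionString: process.env.RAW_CONNECTION_STRING });

  // Fetch up to 100 transactions whose signatures are still 'unprocessed',
  // pausing 500ms between RPC requests to stay under provider rate limits
  const transactions = await getUnprocessedTransactions(
    connection,
    '<program pk>', // hypothetical placeholder for the program address
    500,
    pool,
    'transactions_v3',
    100,
  );
  console.log('fetched', transactions.length, 'raw transactions');
}

demo();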

View File

@@ -3,7 +3,7 @@ import { Cluster, sleep } from '@blockworks-foundation/mango-client';
 import { insertNewSignatures } from './signatures';
 import { Pool } from 'pg';
 import { notify } from './utils';
-import { populateTransactions } from './insertTransactions';
+import { getUnprocessedTransactions } from './getTransactions';
 import { parseTransactions } from './parseTransactions';

 const pgp = require('pg-promise')({
@@ -18,12 +18,14 @@ async function insertMangoTransactions(
   rawTransactionsPool,
   parsedTransactionsPool,
   schema,
-  processStates,
+  transactionSummaries,
   parsedTransactions,
 ) {
   // Insert parsed transactions to appropriate tables on timescaledb
   // Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases)
+  const batchSize = 1000;
+
   let columnSets = {};
   let tableName;
   let inserts;
@@ -40,10 +42,9 @@ async function insertMangoTransactions(
     }
   }

-  let batchSize = 1000;
-  let client = await parsedTransactionsPool.connect();
+  let parsedClient = await parsedTransactionsPool.connect();
   try {
-    await client.query('BEGIN');
+    await parsedClient.query('BEGIN');
     for ([tableName, inserts] of Object.entries(parsedTransactions)) {
       if (inserts.length > 0) {
@@ -54,78 +55,64 @@ async function insertMangoTransactions(
             insertsBatch,
             columnSets[tableName],
           );
-          await client.query(insertsSql);
+          await parsedClient.query(insertsSql);
         }
         console.log(inserts.length + ' records inserted into ' + tableName);
       }
     }
-    await client.query('COMMIT');
+    await parsedClient.query('COMMIT');
   } catch (e) {
-    await client.query('ROLLBACK');
-    console.log('transaction rolled back')
+    await parsedClient.query('ROLLBACK');
+    console.log('transaction rolled back: parsed')
     // TODO: check settle fees
     throw e;
   } finally {
-    client.release();
+    parsedClient.release();
  }

   tableName = 'transactions';
   let table = new pgp.helpers.TableName({ table: tableName, schema: schema });
-  const processStateCs = new pgp.helpers.ColumnSet(
-    ['?signature', 'process_state'],
+  const transactionSummariesCs = new pgp.helpers.ColumnSet(
+    ['?signature', 'process_state', 'log_messages'],
     { table: table },
   );
-  if (processStates.length > 0) {
-    client = await rawTransactionsPool.connect();
-    try {
-      await client.query('BEGIN');
-      for (let i = 0, j = processStates.length; i < j; i += batchSize) {
-        let updatesBatch = processStates.slice(i, i + batchSize);
-        let updatedSql =
-          pgp.helpers.update(updatesBatch, processStateCs) +
-          ' WHERE v.signature = t.signature';
-        await client.query(updatedSql);
-      }
-      console.log(processStates.length + ' process states updated');
-      await client.query('COMMIT');
-    } catch (e) {
-      await client.query('ROLLBACK');
-      console.log('transaction rolled back')
-      throw e;
-    } finally {
-      client.release();
-    }
-  }
+  let rawClient = await rawTransactionsPool.connect();
+  try {
+    await rawClient.query('BEGIN');
+    for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
+      let updatesBatch = transactionSummaries.slice(i, i + batchSize);
+      let updatedSql =
+        pgp.helpers.update(updatesBatch, transactionSummariesCs) +
+        ' WHERE v.signature = t.signature';
+      await rawClient.query(updatedSql);
+    }
+    console.log(transactionSummaries.length + ' process states updated');
+    await rawClient.query('COMMIT');
+  } catch (e) {
+    await rawClient.query('ROLLBACK');
+    console.log('transaction rolled back: raw')
+    throw e;
+  } finally {
+    rawClient.release();
+  }
 }

 async function processMangoTransactions(
+  clusterConnection,
+  requestWaitTime,
   address,
   rawTransactionsPool,
   parsedTransactionsPool,
   schema,
   limit,
 ) {
-  const client = await rawTransactionsPool.connect();
-  let res;
-  try {
-    res = await client.query(
-      'select transaction, signature from ' +
-        schema +
-        ".transactions where process_state = 'ready for parsing' and program_pk = $1 limit $2",
-      [address, limit],
-    );
-  } finally {
-    client.release();
-  }
-  console.log('Fetched ' + res.rows.length + ' records to parse.')
-  let transactions = res.rows.map((e) => [e.transaction, e.signature]);
-  let [processStates, parsedTransactions] = parseTransactions(
+  let transactions = await getUnprocessedTransactions(clusterConnection, address, requestWaitTime, rawTransactionsPool, schema, limit)
+  let [transactionSummaries, parsedTransactions] = parseTransactions(
     transactions,
     mangoProgramId,
   );
@@ -133,7 +120,7 @@ async function processMangoTransactions(
     rawTransactionsPool,
     parsedTransactionsPool,
     schema,
-    processStates,
+    transactionSummaries,
     parsedTransactions,
   );
 }
@@ -148,7 +135,7 @@ async function consumeTransactions() {
   let schema = 'transactions_v3';
   console.log(clusterUrl);
-  let connection = new Connection(clusterUrl, 'finalized');
+  let clusterConnection = new Connection(clusterUrl, 'finalized');
   const rawTransactionsPool = new Pool({
     connectionString: rawConnectionString,
     ssl: {
@@ -169,14 +156,7 @@ async function consumeTransactions() {
     await insertNewSignatures(
       mangoProgramId,
-      connection,
-      rawTransactionsPool,
-      requestWaitTime,
-      schema,
-    );
-    await populateTransactions(
-      connection,
-      mangoProgramId,
+      clusterConnection,
       rawTransactionsPool,
       requestWaitTime,
       schema,
@@ -184,6 +164,8 @@ async function consumeTransactions() {
     let transactionsParsingLimit = 50000;
     await processMangoTransactions(
+      clusterConnection,
+      requestWaitTime,
       mangoProgramId,
       rawTransactionsPool,
       parsedTransactionsPool,
@@ -210,4 +192,10 @@ async function main() {
   }
 }

+// Stop program crashing on rejected promises
+process.on('unhandledRejection', function (err, promise) {
+  console.error('unhandled rejection', err, promise);
+});
+
 main();
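
One detail worth noting in the rewritten insertMangoTransactions above: the parsed-side COMMIT happens before the raw-side process_state update, and, as the comment in the diff says, the two databases cannot share a single transaction. A stripped-down sketch of that write-then-mark ordering (the function name is illustrative and the actual SQL is elided):

import { Pool } from 'pg';

// Commit to the destination (parsed) database first, then mark rows done on
// the raw side. A crash between the two steps only means rows get re-parsed
// on the next run, so the parsing work has to be safe to repeat.
async function writeThenMark(parsedPool: Pool, rawPool: Pool) {
  const parsedClient = await parsedPool.connect();
  try {
    await parsedClient.query('BEGIN');
    // ... insert parsed rows here ...
    await parsedClient.query('COMMIT');
  } catch (e) {
    await parsedClient.query('ROLLBACK');
    throw e; // never mark rows processed if the write failed
  } finally {
    parsedClient.release();
  }

  const rawClient = await rawPool.connect();
  try {
    await rawClient.query('BEGIN');
    // ... update process_state for the same rows here ...
    await rawClient.query('COMMIT');
  } catch (e) {
    await rawClient.query('ROLLBACK');
    throw e;
  } finally {
    rawClient.release();
  }
}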

View File

@@ -1,174 +1,113 @@
 const pgp = require('pg-promise')({
-  capSQL: true,
+  capSQL: true
 });
-import { sleep } from '@blockworks-foundation/mango-client';
+import {sleep} from '@blockworks-foundation/mango-client';

-export async function populateTransactions(
-  connection,
-  address,
-  pool,
-  requestWaitTime,
-  schema,
-) {
-  let transactions = await getNewAddressSignaturesWithoutTransactions(
-    connection,
-    address,
-    requestWaitTime,
-    pool,
-    schema,
-  );
-  let [transactionInserts, transactionErrors] =
-    getTransactionInserts(transactions);
-  await insertTransactions(pool, schema, transactionInserts, transactionErrors);
+export async function populateTransactions(connection, address, pool, requestWaitTime, schema) {
+  let transactions = await getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema)
+  let [transactionInserts, transactionErrors] = getTransactionInserts(transactions)
+  await insertTransactions(pool, schema, transactionInserts, transactionErrors)
 }

-async function getNewAddressSignaturesWithoutTransactions(
-  connection,
-  address,
-  requestWaitTime,
-  pool,
-  schema,
-) {
+async function getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) {
   let limit = 25000;
-  let signaturesToProcess = await getSignaturesWithoutTransactions(
-    pool,
-    address,
-    schema,
-    limit,
-  );
+  let signaturesToProcess = (await getSignaturesWithoutTransactions(pool, address, schema, limit))
   let promises: Promise<void>[] = [];
   let transactions: any[] = [];
   let counter = 1;
   for (let signature of signaturesToProcess) {
     // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json)
-    let args = [signature, { encoding: 'jsonParsed', commitment: 'finalized' }];
-    let promise = connection
-      ._rpcRequest('getConfirmedTransaction', args)
-      .then((confirmedTransaction) =>
-        transactions.push([signature, confirmedTransaction]),
-      );
+    let args = [signature, {encoding: 'jsonParsed', commitment: 'finalized'}]
+    let promise = connection._rpcRequest('getConfirmedTransaction', args).then(confirmedTransaction => transactions.push([signature, confirmedTransaction]));
     console.log('requested ', counter, ' of ', signaturesToProcess.length);
     counter++;
     promises.push(promise);
     // Limit request frequency to avoid request failures due to rate limiting
     await sleep(requestWaitTime);
   }
   await (Promise as any).allSettled(promises);
-  return transactions;
+  return transactions
 }

-async function getSignaturesWithoutTransactions(
-  pool,
-  programPk,
-  schema,
-  limit,
-) {
+async function getSignaturesWithoutTransactions(pool, programPk, schema, limit) {
   const client = await pool.connect();
   let signatures;
   try {
     // TODO: add back in order by id asc - but why does it make it so much slower?
-    const res = await client.query(
-      'select signature from ' +
-        schema +
-        ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " +
-        limit,
-      [programPk],
-    );
-    signatures = res.rows.map((e) => e['signature']);
+    const res = await client.query("select signature from " + schema + ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + limit, [programPk])
+    signatures = res.rows.map(e => e['signature'])
   } finally {
-    client.release();
+    client.release()
   }
   return signatures;
 }

 function getTransactionInserts(transactions) {
   let transactionInserts: any[] = [];
   let processStates: any[] = [];
   for (let transaction of transactions) {
     let [signature, confirmedTransaction] = transaction;
     try {
       let transactionInsert = {
         transaction: JSON.stringify(confirmedTransaction),
-        log_messages:
-          confirmedTransaction.result!.meta!.logMessages!.join('\n'),
-        signature: signature,
-      };
-      transactionInserts.push(transactionInsert);
-      processStates.push({
-        signature: signature,
-        process_state: 'ready for parsing',
-      });
-    } catch (e: any) {
-      console.log(e.stack);
-      processStates.push({
-        signature: signature,
-        process_state: 'transaction download error',
-      });
+        log_messages: confirmedTransaction.result!.meta!.logMessages!.join('\n'),
+        signature: signature
+      }
+      transactionInserts.push(transactionInsert)
+      processStates.push({signature: signature, process_state: 'ready for parsing'})
+    } catch(e: any) {
+      console.log(e.stack)
+      processStates.push({signature: signature, process_state: 'transaction download error'})
     }
   }
-  return [transactionInserts, processStates];
+  return [transactionInserts, processStates]
 }

-async function insertTransactions(
-  pool,
-  schema,
-  transactionInserts,
-  processStates,
-) {
-  const transactionsTable = new pgp.helpers.TableName({
-    table: 'transactions',
-    schema: schema,
-  });
-  const transactionCs = new pgp.helpers.ColumnSet(
-    ['?signature', 'log_messages', 'transaction'],
-    { table: transactionsTable },
-  );
-  const processStatesCs = new pgp.helpers.ColumnSet(
-    ['?signature', 'process_state'],
-    { table: transactionsTable },
-  );
-
-  console.log('Starting transaction inserts')
+async function insertTransactions(pool, schema, transactionInserts, processStates) {
+  const transactionsTable = new pgp.helpers.TableName({table: 'transactions', schema: schema})
+  const transactionCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'transaction'], {table: transactionsTable});
+  const processStatesCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: transactionsTable});
+
   let batchSize = 1000;
-  let client = await pool.connect();
+  let client = await pool.connect()
   try {
-    await client.query('BEGIN');
+    await client.query('BEGIN')
     for (let i = 0, j = transactionInserts.length; i < j; i += batchSize) {
       let updatesBatch = transactionInserts.slice(i, i + batchSize);
-      let updatedSql =
-        pgp.helpers.update(updatesBatch, transactionCs) +
-        ' WHERE v.signature = t.signature';
-      await client.query(updatedSql);
+      let updatedSql = pgp.helpers.update(updatesBatch, transactionCs) + ' WHERE v.signature = t.signature';
+      await client.query(updatedSql)
     }
     for (let i = 0, j = processStates.length; i < j; i += batchSize) {
       let updatesBatch = processStates.slice(i, i + batchSize);
-      let updatedSql =
-        pgp.helpers.update(updatesBatch, processStatesCs) +
-        ' WHERE v.signature = t.signature';
-      await client.query(updatedSql);
+      let updatedSql = pgp.helpers.update(updatesBatch, processStatesCs) + ' WHERE v.signature = t.signature';
+      await client.query(updatedSql)
     }
-    await client.query('COMMIT');
+    await client.query('COMMIT')
   } catch (e) {
-    await client.query('ROLLBACK');
-    throw e;
+    await client.query('ROLLBACK')
+    throw e
   } finally {
-    client.release();
+    client.release()
   }
 }
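
Both the old and new code lean on the same pg-promise idiom: a ColumnSet in which '?signature' marks a condition-only column, and helpers.update over an array of row objects, which expands to a single multi-row UPDATE ... FROM (VALUES ...) statement. A self-contained sketch of that idiom (the row values here are illustrative; the table and schema names match the ones used above):

const pgp = require('pg-promise')({ capSQL: true });

// '?signature' marks the column as condition-only: it is referenced through
// the v/t aliases in the WHERE clause but never SET by the update.
const cs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {
  table: new pgp.helpers.TableName({ table: 'transactions', schema: 'transactions_v3' }),
});

const rows = [
  { signature: 'sigA', process_state: 'processed' },
  { signature: 'sigB', process_state: 'parsing error' },
];

// Expands to one statement, roughly:
//   UPDATE "transactions_v3"."transactions" AS "t"
//   SET "process_state" = "v"."process_state"
//   FROM (VALUES ('sigA', 'processed'), ('sigB', 'parsing error'))
//     AS "v"("signature", "process_state")
//   WHERE v.signature = t.signature
const sql = pgp.helpers.update(rows, cs) + ' WHERE v.signature = t.signature';
console.log(sql);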

View File

@@ -194,7 +194,7 @@ export function jsonParser(parsedTransactions, result, instructions, signature,
         blockDatetime,
       ),
     );
   } else if (instructionName === 'LiquidatePerpMarket') {
     parsedTransactions.liquidate_perp_market.push(
       ...parseLiquidatePerpMarket(
         result.meta.logMessages,

View File

@@ -1,19 +1,12 @@
 const bs58 = require('bs58');
 import {
   MangoInstructionLayout,
-  IDS,
-  PerpMarket,
-  Config,
-  Cluster,
-  GroupConfig,
-  awaitTransactionSignatureConfirmation,
-  PerpMarketConfig,
 } from '@blockworks-foundation/mango-client';
 import { jsonParser } from './jsonParsers';
 import { anchorParser } from './anchorParsers';

 export function parseTransactions(transactionsResult, mangoProgramId) {
-  let processStates: any = [];
+  let transactionSummaries: any = [];
   let parsedTransactions: any = {
     deposits_withdraws: [],
@@ -46,73 +39,89 @@ export function parseTransactions(transactionsResult, mangoProgramId) {
   };

   for (let transactionResult of transactionsResult) {
-    let [transactionJson, signature] = transactionResult;
+    let [signature, transaction] = transactionResult;
     try {
-      let transaction = JSON.parse(transactionJson);
       let result = transaction.result;
-      if (result.meta.err !== null) {
-        processStates.push({
-          signature: signature,
-          process_state: 'transaction error',
-        });
-      } else {
-        let slot = result.slot;
-        let blockTime = result.blockTime;
-        let blockDatetime = new Date(blockTime * 1000).toISOString();
-
-        // only look at cases where instruction is for mango program id
-        // Skip attempting to parse serum instructions, etc
-        let instructions = result.transaction.message.instructions;
-        let innerInstructions = result.meta.innerInstructions.map(e => e.instructions).flat();
-        instructions.push(...innerInstructions)
-
-        instructions = instructions.filter(
-          (ix) =>
-            ix.programId === mangoProgramId &&
-            !(
-              // Old mango group - not in ids.json so have to hardcode here to ignore
-              (
-                ix.accounts &&
-                ix.accounts[0] ===
-                  '2WNLfEMzhgwBPn6QptT43SdZy9cXTUbVJCMdCfimg4oi'
-              )
-            ),
-        );
-
-        // Anchor logging was deployed at slot 100936906 - use different parsing code before and after this slot
-        const ancorDeploymentSlot = 100936906
-        if (slot < ancorDeploymentSlot) {
-
-          // Populate instruction num and name for each instruction
-          let ixNum = 1;
-          for (const ix of instructions) {
-            let decodeData = bs58.decode(ix.data);
-            let decodedInstruction = MangoInstructionLayout.decode(decodeData, 0);
-            let instructionName = Object.keys(decodedInstruction)[0];
-            ix.instructionNum = ixNum;
-            ix.instructionName = instructionName;
-            ixNum++;
-          }
-          jsonParser(parsedTransactions, result, instructions, signature, blockTime, slot, blockDatetime)
-        } else {
-          anchorParser(parsedTransactions, result, signature, blockTime, slot, blockDatetime)
-        }
-
-        processStates.push({
-          signature: signature,
-          process_state: 'processed',
-        });
-      }
-    } catch (e: any) {
-      console.log(e.stack);
-      processStates.push({
-        signature: signature,
-        process_state: 'parsing error',
-      });
+      let logMessages = result!.meta!.logMessages!.join('\n');
+
+      try {
+        if (result.meta.err !== null) {
+          transactionSummaries.push({
+            signature: signature,
+            process_state: 'transaction error',
+            log_messages: logMessages
+          });
+        } else {
+          let slot = result.slot;
+          let blockTime = result.blockTime;
+          let blockDatetime = new Date(blockTime * 1000).toISOString();
+
+          // only look at cases where instruction is for mango program id
+          // Skip attempting to parse serum instructions, etc
+          let instructions = result.transaction.message.instructions;
+          let innerInstructions = result.meta.innerInstructions.map(e => e.instructions).flat();
+          instructions.push(...innerInstructions)
+
+          instructions = instructions.filter(
+            (ix) =>
+              ix.programId === mangoProgramId &&
+              !(
+                // Old mango group - not in ids.json so have to hardcode here to ignore
+                (
+                  ix.accounts &&
+                  ix.accounts[0] ===
+                    '2WNLfEMzhgwBPn6QptT43SdZy9cXTUbVJCMdCfimg4oi'
+                )
+              ),
+          );
+
+          // Anchor logging was deployed at slot 100936906 - use different parsing code before and after this slot
+          const ancorDeploymentSlot = 100936906
+          if (slot < ancorDeploymentSlot) {
+            // Populate instruction num and name for each instruction
+            let ixNum = 1;
+            for (const ix of instructions) {
+              let decodeData = bs58.decode(ix.data);
+              let decodedInstruction = MangoInstructionLayout.decode(decodeData, 0);
+              let instructionName = Object.keys(decodedInstruction)[0];
+              ix.instructionNum = ixNum;
+              ix.instructionName = instructionName;
+              ixNum++;
+            }
+            jsonParser(parsedTransactions, result, instructions, signature, blockTime, slot, blockDatetime)
+          } else {
+            anchorParser(parsedTransactions, result, signature, blockTime, slot, blockDatetime)
+          }
+
+          transactionSummaries.push({
+            signature: signature,
+            process_state: 'processed',
+            log_messages: logMessages
+          });
+        }
+      } catch (e: any) {
+        console.log(e.stack);
+        transactionSummaries.push({
+          signature: signature,
+          process_state: 'parsing error',
+          log_messages: logMessages
+        });
+      }
+    } catch {
+      transactionSummaries.push({
+        signature: signature,
+        process_state: 'transaction download error',
+        log_messages: null
+      });
     }
   }
-  return [processStates, parsedTransactions];
+  return [transactionSummaries, parsedTransactions];
 }
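
Net effect of this change: every signature handed to parseTransactions now yields exactly one summary row carrying its log messages, whatever happens during parsing. Restated as a type for clarity (this interface is not in the source; it just names the shapes pushed above):

// Not in the source: a restatement of the summary shapes pushed above.
type ProcessState =
  | 'processed'
  | 'transaction error'           // result.meta.err was set
  | 'parsing error'               // instruction decoding/parsing threw
  | 'transaction download error'; // the RPC response was missing/malformed

interface TransactionSummary {
  signature: string;
  process_state: ProcessState;
  log_messages: string | null;    // null only for download errors
}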

View File

@@ -57,7 +57,7 @@ async function processMangoTransactions(rawTransactionsPool, schema, limit) {
         schema +
         '.transactions where signature in (' +
         signaturesSql +
-        ') order by id desc limit $1',
+        ') limit $1',
       [limit],
     );
   } finally {