From 5bf322ef00031ca4fd9d9121c7d2cdc8a19af1c7 Mon Sep 17 00:00:00 2001 From: Nicholas Clarke Date: Thu, 13 Jan 2022 14:27:38 -0800 Subject: [PATCH] Add new command "init" to populate some rows into the transactions table (required before the scraper can start). --- package.json | 1 + src/index.ts | 23 ++++++++++++++--------- src/init.ts | 43 +++++++++++++++++++++++++++++++++++++++++++ src/signatures.ts | 46 ++++++++++++++++++++-------------------------- 4 files changed, 78 insertions(+), 35 deletions(-) create mode 100644 src/init.ts diff --git a/package.json b/package.json index 5db76c8..f34dc80 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "clean": "rm -rf dist", "prepare": "yarn clean && yarn build", "start": "node dist/index.js", + "init": "node dist/init.js", "test": "echo \"Error: no test specified\" && exit 1", "noop": "" }, diff --git a/src/index.ts b/src/index.ts index 0df3814..08091b9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ -import { Connection, PublicKey } from '@solana/web3.js'; -import { Cluster, sleep } from '@blockworks-foundation/mango-client'; -import { insertNewSignatures } from './signatures'; +import { Connection } from '@solana/web3.js'; +import { sleep } from '@blockworks-foundation/mango-client'; +import { insertSignatures, getNewSignatures } from './signatures'; import { Pool } from 'pg'; import { notify } from './utils'; import { getTransactions, getUnprocessedSignatures } from './getTransactions'; @@ -170,7 +170,7 @@ async function consumeTransactions() { const workerPartitionEnd = parseInt(process.env.WORKER_PARTITION_END!); // Only one process should insert new signatures. 1 for true, 0 for false. - const insertSignatures = parseInt(process.env.INSERT_SIGNATURES!) === 1; + const insertSignaturesBool = parseInt(process.env.INSERT_SIGNATURES!) 
=== 1; let schema = 'transactions_v3'; @@ -194,14 +194,19 @@ async function consumeTransactions() { while (true) { console.log('Refreshing transactions ' + Date()); - if (insertSignatures) { - await insertNewSignatures( + if (insertSignaturesBool) { + let newSignatures = await getNewSignatures(clusterConnection, mangoProgramId, - clusterConnection, - rawTransactionsPool, requestWaitTime, + rawTransactionsPool, + schema) + + await insertSignatures( + mangoProgramId, + rawTransactionsPool, schema, - ); + newSignatures + ) } await processMangoTransactions( diff --git a/src/init.ts b/src/init.ts new file mode 100644 index 0000000..f694b11 --- /dev/null +++ b/src/init.ts @@ -0,0 +1,43 @@ +import { Connection, PublicKey } from '@solana/web3.js'; +import { insertSignatures } from './signatures'; +import { Pool } from 'pg'; + + +const pgp = require('pg-promise')({ + capSQL: true, +}); + +const mangoProgramId = + process.env.MANGO_PROGRAM_ID || + '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH'; + +async function initTransactions() { + const clusterUrl = + process.env.CLUSTER_URL || 'https://api.mainnet-beta.solana.com'; + const rawConnectionString = process.env.CONNECTION_STRING_RAW; + + let schema = 'transactions_v3'; + + console.log(clusterUrl); + let clusterConnection = new Connection(clusterUrl, 'finalized'); + const rawTransactionsPool = new Pool({ + connectionString: rawConnectionString, + ssl: { + rejectUnauthorized: false, + }, + }); + + let signaturesInfo = await clusterConnection.getConfirmedSignaturesForAddress2(new PublicKey(mangoProgramId)); + + await insertSignatures( + mangoProgramId, + rawTransactionsPool, + schema, + signaturesInfo + ) + + console.log('transactions table sucessfully initialised') +} + +initTransactions(); + \ No newline at end of file diff --git a/src/signatures.ts b/src/signatures.ts index 67d62ae..463bae8 100644 --- a/src/signatures.ts +++ b/src/signatures.ts @@ -3,12 +3,24 @@ import { sleep } from '@blockworks-foundation/mango-client'; 
import { bulkBatchInsert } from './utils'; export async function getNewSignatures( - afterSlot: number, connection: Connection, - addressPk: PublicKey, + address: string, requestWaitTime: number, + pool, + schema ) { + let client = await pool.connect(); + let latestSlotRows = await client.query( + 'select max(slot) as max_slot from ' + + schema + + '.transactions where program_pk = $1', + [address], + ); + client.release(); + + let afterSlot = latestSlotRows.rows[0]['max_slot']; + console.log('Fetching all signatures after slot ' + afterSlot) // Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSlot" slot @@ -26,7 +38,7 @@ export async function getNewSignatures( } let signaturesInfo = await connection.getConfirmedSignaturesForAddress2( - addressPk, + new PublicKey(address), options, ); signatures = signaturesInfo.map((x) => x['signature']); @@ -34,7 +46,8 @@ export async function getNewSignatures( if (signaturesInfo[0].slot < afterSlot) { // Possibly this happens when validators are switched at the endpoint? 
- throw 'most recent fetched signature has slot less than afterSlot' + // In the absence of throwing this error the code will keep fetching signatures until it reaches the end of the signatures stored by the rpc node + throw ('most recent fetched signature has slot less than afterSlot') } console.log('slots ' + signaturesInfo[0].slot + ' to ' + signaturesInfo[signaturesInfo.length - 1].slot) @@ -73,32 +86,13 @@ export async function getNewSignatures( return allSignaturesInfo; } -export async function insertNewSignatures( +export async function insertSignatures( address: string, - connection: Connection, pool, - requestWaitTime: number, schema: string, + newSignatures: ConfirmedSignatureInfo[] ) { - let client = await pool.connect(); - let latestSlotRows = await client.query( - 'select max(slot) as max_slot from ' + - schema + - '.transactions where program_pk = $1', - [address], - ); - - client.release(); - - let latestSlot = latestSlotRows.rows[0]['max_slot']; - - let newSignatures = await getNewSignatures( - latestSlot, - connection, - new PublicKey(address), - requestWaitTime, - ); - + // By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest // We reverse the order to oldest -> newest here // This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps)