using prettier; implemented update funding

This commit is contained in:
dd 2021-10-07 16:47:08 -07:00
parent e0a8e971e9
commit 22f22cb016
6 changed files with 1898 additions and 1193 deletions

View File

@ -5,7 +5,9 @@
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
"start": "ts-node src/index.ts", "start": "ts-node src/index.ts",
"test": "echo \"Error: no test specified\" && exit 1" "test-txs": "ts-node src/tests/testParseTransactions.ts",
"test": "echo \"Error: no test specified\" && exit 1",
"format": "prettier --check ."
}, },
"repository": "https://github.com/blockworks-foundation/mango-transaction-scraper-v3", "repository": "https://github.com/blockworks-foundation/mango-transaction-scraper-v3",
"author": "Nicholas Clarke", "author": "Nicholas Clarke",
@ -19,10 +21,15 @@
"package.json": "^2.0.1", "package.json": "^2.0.1",
"pg": "^8.6.0", "pg": "^8.6.0",
"pg-format": "^1.0.4", "pg-format": "^1.0.4",
"pg-promise": "^10.10.2" "pg-promise": "^10.10.2",
"prettier": "^2.0.5"
}, },
"devDependencies": { "devDependencies": {
"ts-node": "^10.0.0", "ts-node": "^10.0.0",
"typescript": "^4.3.2" "typescript": "^4.3.2"
},
"prettier": {
"singleQuote": true,
"trailingComma": "all"
} }
} }

View File

@ -1,151 +1,205 @@
import {Connection, PublicKey} from '@solana/web3.js'; import { Connection, PublicKey } from '@solana/web3.js';
import {sleep} from '@blockworks-foundation/mango-client'; import { Cluster, sleep } from '@blockworks-foundation/mango-client';
import {insertNewSignatures} from './signatures'; import { insertNewSignatures } from './signatures';
import { Pool } from 'pg' import { Pool } from 'pg';
import { notify } from './utils'; import { notify } from './utils';
import {populateTransactions} from './insertTransactions'; import { populateTransactions } from './insertTransactions';
import {parseTransactions} from './parseTransactions'; import { parseTransactions } from './parseTransactions';
const pgp = require('pg-promise')({ const pgp = require('pg-promise')({
capSQL: true capSQL: true,
}); });
const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH' const mangoProgramId =
process.env.MANGO_PROGRAM_ID ||
'5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH';
async function insertMangoTransactions(rawTransactionsPool, parsedTransactionsPool, schema, processStates, parsedTransactions) { async function insertMangoTransactions(
// Insert parsed transactions to appropriate tables on timescaledb rawTransactionsPool,
// Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases) parsedTransactionsPool,
schema,
processStates,
parsedTransactions,
) {
// Insert parsed transactions to appropriate tables on timescaledb
// Update process states on transactions table - only once parsed transactions are sucessfully completed (can't use the same db transaction as they are on different databases)
let columnSets = {};
let tableName;
let inserts;
for ([tableName, inserts] of Object.entries(parsedTransactions)) {
if (inserts.length > 0) {
let table = new pgp.helpers.TableName({
table: tableName,
schema: schema,
});
columnSets[tableName] = new pgp.helpers.ColumnSet(
Object.keys(inserts[0]),
{ table: table },
);
}
}
let batchSize = 1000;
let client = await parsedTransactionsPool.connect();
try {
await client.query('BEGIN');
let columnSets = {}
let tableName
let inserts
for ([tableName, inserts] of Object.entries(parsedTransactions)) { for ([tableName, inserts] of Object.entries(parsedTransactions)) {
if (inserts.length > 0) { if (inserts.length > 0) {
let table = new pgp.helpers.TableName({table: tableName, schema: schema}) console.log(tableName + ' insert started');
columnSets[tableName] = new pgp.helpers.ColumnSet(Object.keys(inserts[0]), {table: table}); for (let i = 0, j = inserts.length; i < j; i += batchSize) {
let insertsBatch = inserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(
insertsBatch,
columnSets[tableName],
);
await client.query(insertsSql);
} }
console.log(tableName + ' inserted');
}
} }
await client.query('COMMIT');
let batchSize = 1000; } catch (e) {
let client = await parsedTransactionsPool.connect() await client.query('ROLLBACK');
try { throw e;
await client.query('BEGIN') } finally {
client.release();
}
for ([tableName, inserts] of Object.entries(parsedTransactions)) { tableName = 'transactions';
if (inserts.length > 0) { let table = new pgp.helpers.TableName({ table: tableName, schema: schema });
console.log(tableName + ' insert started') const processStateCs = new pgp.helpers.ColumnSet(
for (let i = 0, j = inserts.length; i < j; i += batchSize) { ['?signature', 'process_state'],
let insertsBatch = inserts.slice(i, i + batchSize); { table: table },
let insertsSql = pgp.helpers.insert(insertsBatch, columnSets[tableName]); );
await client.query(insertsSql)
} client = await rawTransactionsPool.connect();
console.log(tableName + ' inserted') try {
} await client.query('BEGIN');
}
await client.query('COMMIT') for (let i = 0, j = processStates.length; i < j; i += batchSize) {
} catch (e) { let updatesBatch = processStates.slice(i, i + batchSize);
await client.query('ROLLBACK') let updatedSql =
throw e pgp.helpers.update(updatesBatch, processStateCs) +
} finally { ' WHERE v.signature = t.signature';
client.release() await client.query(updatedSql);
} }
tableName = 'transactions' console.log('process states updated');
let table = new pgp.helpers.TableName({table: tableName, schema: schema}) await client.query('COMMIT');
const processStateCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: table}); } catch (e) {
await client.query('ROLLBACK');
client = await rawTransactionsPool.connect() throw e;
try { } finally {
await client.query('BEGIN') client.release();
}
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, processStateCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
}
console.log('process states updated')
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
client.release()
}
} }
async function processMangoTransactions(
address,
rawTransactionsPool,
parsedTransactionsPool,
schema,
limit,
) {
const client = await rawTransactionsPool.connect();
let res;
try {
res = await client.query(
'select transaction, signature from ' +
schema +
".transactions where process_state = 'ready for parsing' and program_pk = $1 order by id asc limit $2",
[address, limit],
);
} finally {
client.release();
}
async function processMangoTransactions(address, rawTransactionsPool, parsedTransactionsPool, schema, limit) { let transactions = res.rows.map((e) => [e.transaction, e.signature]);
const client = await rawTransactionsPool.connect(); let [processStates, parsedTransactions] = parseTransactions(
let res; transactions,
try { mangoProgramId,
res = await client.query("select transaction, signature from " + schema + ".transactions where process_state = 'ready for parsing' and program_pk = $1 order by id asc limit $2", [address, limit]) );
} finally { await insertMangoTransactions(
client.release() rawTransactionsPool,
} parsedTransactionsPool,
schema,
let transactions = res.rows.map(e => [e.transaction, e.signature]); processStates,
let [processStates, parsedTransactions] = parseTransactions(transactions, mangoProgramId); parsedTransactions,
await insertMangoTransactions(rawTransactionsPool, parsedTransactionsPool, schema, processStates, parsedTransactions) );
} }
async function consumeTransactions() { async function consumeTransactions() {
const clusterUrl = process.env.CLUSTER_URL || "https://api.mainnet-beta.solana.com"; const clusterUrl =
let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500; process.env.CLUSTER_URL || 'https://api.mainnet-beta.solana.com';
const rawConnectionString = process.env.CONNECTION_STRING_RAW let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500;
const parsedConnectionString = process.env.CONNECTION_STRING_PARSED const rawConnectionString = process.env.CONNECTION_STRING_RAW;
const parsedConnectionString = process.env.CONNECTION_STRING_PARSED;
let schema = 'transactions_v3'; let schema = 'transactions_v3';
console.log(clusterUrl); console.log(clusterUrl);
let connection = new Connection(clusterUrl, 'finalized');
const rawTransactionsPool = new Pool({
connectionString: rawConnectionString,
ssl: {
rejectUnauthorized: false,
},
});
const parsedTransactionsPool = new Pool({
connectionString: parsedConnectionString,
ssl: {
rejectUnauthorized: false,
},
});
let connection = new Connection(clusterUrl, 'finalized'); console.log('Initialized');
const rawTransactionsPool = new Pool( notify('v3: Initialized');
{ while (true) {
connectionString: rawConnectionString, console.log('Refreshing transactions ' + Date());
ssl: {
rejectUnauthorized: false,
}
}
)
const parsedTransactionsPool = new Pool(
{
connectionString: parsedConnectionString,
ssl: {
rejectUnauthorized: false,
}
}
)
console.log('Initialized') await insertNewSignatures(
notify('v3: Initialized') mangoProgramId,
while (true) { connection,
console.log('Refreshing transactions ' + Date()) rawTransactionsPool,
requestWaitTime,
await insertNewSignatures(mangoProgramId, connection, rawTransactionsPool, requestWaitTime, schema) schema,
await populateTransactions(connection, mangoProgramId, rawTransactionsPool, requestWaitTime, schema); );
await populateTransactions(
connection,
mangoProgramId,
rawTransactionsPool,
requestWaitTime,
schema,
);
let transactionsParsingLimit = 50000; let transactionsParsingLimit = 50000;
await processMangoTransactions(mangoProgramId, rawTransactionsPool, parsedTransactionsPool,schema, transactionsParsingLimit); await processMangoTransactions(
mangoProgramId,
console.log('Refresh complete') rawTransactionsPool,
// Probably unnecessary but let's give the servers a break parsedTransactionsPool,
await sleep(5*1000) schema,
} transactionsParsingLimit,
);
console.log('Refresh complete');
// Probably unnecessary but let's give the servers a break
await sleep(5 * 1000);
}
} }
async function main() { async function main() {
while (true) { while (true) {
try { try {
await consumeTransactions() await consumeTransactions();
} } catch (e: any) {
catch(e: any) { notify('v3: ' + e.toString());
notify('v3: ' + e.toString()) console.log(e, e.stack);
console.log(e, e.stack) // Wait for 10 mins
// Wait for 10 mins await sleep(10 * 60 * 1000);
await sleep(10*60*1000)
}
} }
}
} }
main() main();

View File

@ -1,113 +1,172 @@
const pgp = require('pg-promise')({ const pgp = require('pg-promise')({
capSQL: true capSQL: true,
}); });
import {sleep} from '@blockworks-foundation/mango-client'; import { sleep } from '@blockworks-foundation/mango-client';
export async function populateTransactions(connection, address, pool, requestWaitTime, schema) { export async function populateTransactions(
connection,
address,
pool,
requestWaitTime,
schema,
) {
let transactions = await getNewAddressSignaturesWithoutTransactions(
connection,
address,
requestWaitTime,
pool,
schema,
);
let transactions = await getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) let [transactionInserts, transactionErrors] =
getTransactionInserts(transactions);
let [transactionInserts, transactionErrors] = getTransactionInserts(transactions)
await insertTransactions(pool, schema, transactionInserts, transactionErrors)
await insertTransactions(pool, schema, transactionInserts, transactionErrors);
} }
async function getNewAddressSignaturesWithoutTransactions(connection, address, requestWaitTime, pool, schema) { async function getNewAddressSignaturesWithoutTransactions(
connection,
address,
requestWaitTime,
pool,
schema,
) {
let limit = 25000;
let limit = 25000; let signaturesToProcess = await getSignaturesWithoutTransactions(
pool,
address,
schema,
limit,
);
let signaturesToProcess = (await getSignaturesWithoutTransactions(pool, address, schema, limit)) let promises: Promise<void>[] = [];
let transactions: any[] = [];
let promises: Promise<void>[] = []; let counter = 1;
let transactions: any[] = []; for (let signature of signaturesToProcess) {
let counter = 1; // Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json)
for (let signature of signaturesToProcess) { let args = [signature, { encoding: 'jsonParsed', commitment: 'finalized' }];
// Want to store the raw json returned from the rpc - so have to bypass the regular client methods here (which transform the json) let promise = connection
let args = [signature, {encoding: 'jsonParsed', commitment: 'finalized'}] ._rpcRequest('getConfirmedTransaction', args)
let promise = connection._rpcRequest('getConfirmedTransaction', args).then(confirmedTransaction => transactions.push([signature, confirmedTransaction])); .then((confirmedTransaction) =>
transactions.push([signature, confirmedTransaction]),
);
console.log('requested ', counter, ' of ', signaturesToProcess.length); console.log('requested ', counter, ' of ', signaturesToProcess.length);
counter++; counter++;
promises.push(promise);
// Limit request frequency to avoid request failures due to rate limiting promises.push(promise);
await sleep(requestWaitTime);
} // Limit request frequency to avoid request failures due to rate limiting
await (Promise as any).allSettled(promises); await sleep(requestWaitTime);
}
return transactions await (Promise as any).allSettled(promises);
return transactions;
} }
async function getSignaturesWithoutTransactions(pool, programPk, schema, limit) { async function getSignaturesWithoutTransactions(
const client = await pool.connect(); pool,
let signatures; programPk,
try { schema,
// TODO: add back in order by id asc - but why does it make it so much slower? limit,
const res = await client.query("select signature from " + schema + ".transactions where process_state = 'unprocessed' and program_pk = $1 limit " + limit, [programPk]) ) {
const client = await pool.connect();
let signatures;
try {
// TODO: add back in order by id asc - but why does it make it so much slower?
const res = await client.query(
'select signature from ' +
schema +
".transactions where process_state = 'unprocessed' and program_pk = $1 limit " +
limit,
[programPk],
);
signatures = res.rows.map(e => e['signature']) signatures = res.rows.map((e) => e['signature']);
} finally { } finally {
client.release() client.release();
} }
return signatures; return signatures;
} }
function getTransactionInserts(transactions) { function getTransactionInserts(transactions) {
let transactionInserts: any[] = []; let transactionInserts: any[] = [];
let processStates: any[] = []; let processStates: any[] = [];
for (let transaction of transactions) { for (let transaction of transactions) {
let [signature, confirmedTransaction] = transaction; let [signature, confirmedTransaction] = transaction;
try {
let transactionInsert = {
transaction: JSON.stringify(confirmedTransaction),
log_messages: confirmedTransaction.result!.meta!.logMessages!.join('\n'),
signature: signature
}
transactionInserts.push(transactionInsert)
processStates.push({signature: signature, process_state: 'ready for parsing'})
} catch(e: any) {
console.log(e.stack)
processStates.push({signature: signature, process_state: 'transaction download error'})
}
}
return [transactionInserts, processStates]
}
async function insertTransactions(pool, schema, transactionInserts, processStates) {
const transactionsTable = new pgp.helpers.TableName({table: 'transactions', schema: schema})
const transactionCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'transaction'], {table: transactionsTable});
const processStatesCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: transactionsTable});
let batchSize = 1000;
let client = await pool.connect()
try { try {
await client.query('BEGIN') let transactionInsert = {
transaction: JSON.stringify(confirmedTransaction),
for (let i = 0, j = transactionInserts.length; i < j; i += batchSize) { log_messages:
let updatesBatch = transactionInserts.slice(i, i + batchSize); confirmedTransaction.result!.meta!.logMessages!.join('\n'),
let updatedSql = pgp.helpers.update(updatesBatch, transactionCs) + ' WHERE v.signature = t.signature'; signature: signature,
await client.query(updatedSql) };
} transactionInserts.push(transactionInsert);
processStates.push({
for (let i = 0, j = processStates.length; i < j; i += batchSize) { signature: signature,
let updatesBatch = processStates.slice(i, i + batchSize); process_state: 'ready for parsing',
let updatedSql = pgp.helpers.update(updatesBatch, processStatesCs) + ' WHERE v.signature = t.signature'; });
await client.query(updatedSql) } catch (e: any) {
} console.log(e.stack);
processStates.push({
await client.query('COMMIT') signature: signature,
} catch (e) { process_state: 'transaction download error',
await client.query('ROLLBACK') });
throw e
} finally {
client.release()
} }
}
return [transactionInserts, processStates];
}
async function insertTransactions(
pool,
schema,
transactionInserts,
processStates,
) {
const transactionsTable = new pgp.helpers.TableName({
table: 'transactions',
schema: schema,
});
const transactionCs = new pgp.helpers.ColumnSet(
['?signature', 'log_messages', 'transaction'],
{ table: transactionsTable },
);
const processStatesCs = new pgp.helpers.ColumnSet(
['?signature', 'process_state'],
{ table: transactionsTable },
);
let batchSize = 1000;
let client = await pool.connect();
try {
await client.query('BEGIN');
for (let i = 0, j = transactionInserts.length; i < j; i += batchSize) {
let updatesBatch = transactionInserts.slice(i, i + batchSize);
let updatedSql =
pgp.helpers.update(updatesBatch, transactionCs) +
' WHERE v.signature = t.signature';
await client.query(updatedSql);
}
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql =
pgp.helpers.update(updatesBatch, processStatesCs) +
' WHERE v.signature = t.signature';
await client.query(updatedSql);
}
await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,96 +1,136 @@
import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js'; import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js';
import { sleep} from '@blockworks-foundation/mango-client'; import { sleep } from '@blockworks-foundation/mango-client';
import { bulkBatchInsert } from './utils'; import { bulkBatchInsert } from './utils';
export async function getNewSignatures(
export async function getNewSignatures(afterSlot: number, connection: Connection, addressPk: PublicKey, requestWaitTime: number) { afterSlot: number,
// Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSlot" slot connection: Connection,
addressPk: PublicKey,
let signatures; requestWaitTime: number,
let slots; ) {
const limit = 1000; // Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSlot" slot
let before = null; let signatures;
let options; let slots;
let allSignaturesInfo: ConfirmedSignatureInfo[] = []; const limit = 1000;
while (true) { let before = null;
let options;
if (before === null) { let allSignaturesInfo: ConfirmedSignatureInfo[] = [];
options = {limit: limit}; while (true) {
} else { if (before === null) {
options = {limit: limit, before: before}; options = { limit: limit };
} } else {
options = { limit: limit, before: before };
let signaturesInfo = (await connection.getConfirmedSignaturesForAddress2(addressPk, options));
signatures = signaturesInfo.map(x => x['signature'])
slots = signaturesInfo.map(x => x['slot']);
// Stop when we reach a slot we have already stored in the database
// Use slot instead of signature here as can have multiple signatures per slot and signatures are
// stored in a arbitray order per slot - leading to attempting to insert a duplicate signature
// If a slot is already seen - will have all signatures in that slot in the db
let afterSlotIndex = slots.indexOf(afterSlot);
if (afterSlotIndex !== -1) {
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo.slice(0, afterSlotIndex));
break
} else {
// if afterSignatureIndex is not found then we should have gotten signaturesInfo of length limit
// otherwise we have an issue where the rpc endpoint does not have enough history
if (signaturesInfo.length !== limit) {
throw 'rpc endpoint does not have sufficient signature history to reach afterSignature ' + afterSlot
}
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo);
}
before = signatures[signatures.length-1];
console.log(new Date(signaturesInfo[signaturesInfo.length-1].blockTime! * 1000).toISOString());
await sleep(requestWaitTime);
} }
return allSignaturesInfo let signaturesInfo = await connection.getConfirmedSignaturesForAddress2(
addressPk,
options,
);
signatures = signaturesInfo.map((x) => x['signature']);
slots = signaturesInfo.map((x) => x['slot']);
// Stop when we reach a slot we have already stored in the database
// Use slot instead of signature here as can have multiple signatures per slot and signatures are
// stored in a arbitray order per slot - leading to attempting to insert a duplicate signature
// If a slot is already seen - will have all signatures in that slot in the db
let afterSlotIndex = slots.indexOf(afterSlot);
if (afterSlotIndex !== -1) {
allSignaturesInfo = allSignaturesInfo.concat(
signaturesInfo.slice(0, afterSlotIndex),
);
break;
} else {
// if afterSignatureIndex is not found then we should have gotten signaturesInfo of length limit
// otherwise we have an issue where the rpc endpoint does not have enough history
if (signaturesInfo.length !== limit) {
throw (
'rpc endpoint does not have sufficient signature history to reach afterSignature ' +
afterSlot
);
}
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo);
}
before = signatures[signatures.length - 1];
console.log(
new Date(
signaturesInfo[signaturesInfo.length - 1].blockTime! * 1000,
).toISOString(),
);
await sleep(requestWaitTime);
}
return allSignaturesInfo;
} }
export async function insertNewSignatures(address: string, connection: Connection, pool, requestWaitTime: number, schema: string) { export async function insertNewSignatures(
let client = await pool.connect() address: string,
let latestSlotRows = await client.query('select max(slot) as max_slot from ' + schema + '.transactions where program_pk = $1', [address]) connection: Connection,
pool,
client.release(); requestWaitTime: number,
schema: string,
) {
let client = await pool.connect();
let latestSlotRows = await client.query(
'select max(slot) as max_slot from ' +
schema +
'.transactions where program_pk = $1',
[address],
);
let latestSlot = latestSlotRows.rows[0]['max_slot']; client.release();
let newSignatures = await getNewSignatures(latestSlot, connection, new PublicKey(address), requestWaitTime); let latestSlot = latestSlotRows.rows[0]['max_slot'];
// By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest let newSignatures = await getNewSignatures(
// We reverse the order to oldest -> newest here latestSlot,
// This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps) connection,
// Also ensures that the auto increment id in our table is incremented oldest -> newest new PublicKey(address),
newSignatures = newSignatures.reverse(); requestWaitTime,
);
const inserts = newSignatures.map(signatureInfo => ({ // By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest
signature: signatureInfo.signature, // We reverse the order to oldest -> newest here
program_pk: address, // This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps)
block_time: signatureInfo.blockTime, // Also ensures that the auto increment id in our table is incremented oldest -> newest
block_datetime: (new Date(signatureInfo.blockTime! * 1000)).toISOString(), newSignatures = newSignatures.reverse();
slot: signatureInfo.slot,
err: signatureInfo.err === null ? 0 : 1,
process_state: 'unprocessed'
}))
// Seems to be a bug in getConfirmedSignaturesForAddress2 where very rarely I can get duplicate signatures (separated by a few signatures in between)
// So redup here
let uniqueInserts = uniqueOnSignature(inserts);
let columns = ['signature', 'program_pk', 'block_time', 'block_datetime', 'slot', 'err', 'process_state'];
let table = 'transactions'
let batchSize = 10000
await bulkBatchInsert(pool, table, columns, uniqueInserts, batchSize, schema); const inserts = newSignatures.map((signatureInfo) => ({
signature: signatureInfo.signature,
program_pk: address,
block_time: signatureInfo.blockTime,
block_datetime: new Date(signatureInfo.blockTime! * 1000).toISOString(),
slot: signatureInfo.slot,
err: signatureInfo.err === null ? 0 : 1,
process_state: 'unprocessed',
}));
// Seems to be a bug in getConfirmedSignaturesForAddress2 where very rarely I can get duplicate signatures (separated by a few signatures in between)
// So redup here
let uniqueInserts = uniqueOnSignature(inserts);
console.log('inserted ' + newSignatures.length + ' signatures') let columns = [
'signature',
'program_pk',
'block_time',
'block_datetime',
'slot',
'err',
'process_state',
];
let table = 'transactions';
let batchSize = 10000;
await bulkBatchInsert(pool, table, columns, uniqueInserts, batchSize, schema);
console.log('inserted ' + newSignatures.length + ' signatures');
} }
function uniqueOnSignature(inserts) { function uniqueOnSignature(inserts) {
var seen = {}; var seen = {};
return inserts.filter(function(e) { return inserts.filter(function (e) {
return seen.hasOwnProperty(e.signature) ? false : (seen[e.signature] = true); return seen.hasOwnProperty(e.signature)
}); ? false
} : (seen[e.signature] = true);
});
}

View File

@ -1,77 +1,86 @@
import { Pool } from 'pg' import { Pool } from 'pg';
import {parseTransactions} from '../parseTransactions'; import { parseTransactions } from '../parseTransactions';
const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH' const mangoProgramId =
process.env.MANGO_PROGRAM_ID || 'mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68';
async function processMangoTransactions(rawTransactionsPool, schema, limit) { async function processMangoTransactions(rawTransactionsPool, schema, limit) {
const client = await rawTransactionsPool.connect();
let res;
try {
// Below are examples of different types of transactions
let signatures = [
'4cYaxHEMAycyjRBfD2Va1cnaNcKSCxsLaYF47EnpUH4aqCEsZR9c9BJMeMr9NiPR7grN6puBYqWL6FdeX3ZgHrjj', // Deposit
'49ajp59Gtpr5Q4kD4y2rMZrQSgkFNrferoC5nUq6mAZsveFf3e12DXtCR1GMFSyhR6Aypd6tJG7SKveNwGHGBDYQ', // Withdraw
const client = await rawTransactionsPool.connect(); '5E9Jqz3nPtTx5rHTHmWyvw5RGTodPp4jRBx158dpe24b4QtpyAEVcEPuwvL6cwQ5MAcKHDiukU45L5xJ8fLR9Y1J', // CachePrices and CacheRootBanks
let res; '2o3hH59r6Pggg8oTTEjtXyJ9xHmx2z1pPXG4SWoPeogW95sjQoxps5fu4LbaTWfTyMvK1epsvz1nLiZ9CTPAgyD1', // LiquidateTokenAndToken
try { '1F47ZsgeLNpDsHCqnNRbG8hi6C2rEeyT8YME61g84HSTRHrheXX69Zya2Dz3fW14SC1y84cAbLVg7jDae38Vp3a', // LiquidateTokenAndPerp
'4RBxzncCHW8XSHR1mFJrxtUMitYyYiSc9jvHQ4CPA8mqCukgw3dQNoYpgm5GRozyxMJP1j4ew9gNPkzCnrnkmaud', // LiquidatePerpMarket
// Below are examples of different types of transactions
let signatures = [
"4cYaxHEMAycyjRBfD2Va1cnaNcKSCxsLaYF47EnpUH4aqCEsZR9c9BJMeMr9NiPR7grN6puBYqWL6FdeX3ZgHrjj", // Deposit
"49ajp59Gtpr5Q4kD4y2rMZrQSgkFNrferoC5nUq6mAZsveFf3e12DXtCR1GMFSyhR6Aypd6tJG7SKveNwGHGBDYQ", // Withdraw
"5E9Jqz3nPtTx5rHTHmWyvw5RGTodPp4jRBx158dpe24b4QtpyAEVcEPuwvL6cwQ5MAcKHDiukU45L5xJ8fLR9Y1J", // CachePrices and CacheRootBanks '56y2iUGUyQ8BcqLpL5N1QwR3QQF37NtaWRnd5XjQ25BNWKfNxvpnhwwqtD7B88dAvJEykTHNCLvUKdxY4jhQ2uhQ', //LiquidateTokenAndPerp example 2
"2o3hH59r6Pggg8oTTEjtXyJ9xHmx2z1pPXG4SWoPeogW95sjQoxps5fu4LbaTWfTyMvK1epsvz1nLiZ9CTPAgyD1", // LiquidateTokenAndToken '4ErqDTV11imwDnoA6AAj3VMZC9FQvyq2GA4emfpiSxVBifAQE9EDwPnHxoTMxULHfyyCf8GBBzoczGSqYJMFEV5D', //LiquidateTokenAndPerp example 3
"1F47ZsgeLNpDsHCqnNRbG8hi6C2rEeyT8YME61g84HSTRHrheXX69Zya2Dz3fW14SC1y84cAbLVg7jDae38Vp3a", // LiquidateTokenAndPerp '59DFSbsN1DbnqUiMsStS2xQn4tBncG8w3gg8aikBvQZKnWhf1yRt87EU4WMvXtnnVe18zWaRDFVfnVYp3ASgF7tV', // ResolveTokenBankruptcy
"4RBxzncCHW8XSHR1mFJrxtUMitYyYiSc9jvHQ4CPA8mqCukgw3dQNoYpgm5GRozyxMJP1j4ew9gNPkzCnrnkmaud", // LiquidatePerpMarket '4Xx7gVesMQQZqprJYLu5gNEYRLA5GTXKURrkc8jG3CLKynRwhEM93MownyAMhxpdFTvfXQ9kkxkRcemZ8Fn5WHyk', // ResolvePerpBankruptcy
'3bzj3KkA3FSZHJuCmRgHhSgqeaEzD32sCHkYdRLcZm1vcuB4ra5NbU5EZqBhW6QjeKRV9QRWC4SHxK2hS54s79Zx', // settle_pnl
"56y2iUGUyQ8BcqLpL5N1QwR3QQF37NtaWRnd5XjQ25BNWKfNxvpnhwwqtD7B88dAvJEykTHNCLvUKdxY4jhQ2uhQ", //LiquidateTokenAndPerp example 2 '5TmhvKQJmjUD9dZgCszBF8gNKUohpxwjrYu1RngZVh1hEToGMtjPtXJF89QLHXzANMWWQRfMomsgCg8353CpYgBb', // settle_fees
"4ErqDTV11imwDnoA6AAj3VMZC9FQvyq2GA4emfpiSxVBifAQE9EDwPnHxoTMxULHfyyCf8GBBzoczGSqYJMFEV5D", //LiquidateTokenAndPerp example 3 '4qV6PTD1nGj5qq89FQK8QKwN231pGgtayD7uX4B6y83b19gcVXB5ByLCvApSJjCRrboiCg7RVT2p2e1CtP3zuXDb', // force_settle_quote_positions
"59DFSbsN1DbnqUiMsStS2xQn4tBncG8w3gg8aikBvQZKnWhf1yRt87EU4WMvXtnnVe18zWaRDFVfnVYp3ASgF7tV", // ResolveTokenBankruptcy '5qDPBrFjCcaZthjRCqisHRw1mFEkHFHRFWi5jbKCmpAgpAXNdEkSv8L472D12VB5AukYaGsWhAy5bcvvUGJ1Sgtv', // FillEvent
"4Xx7gVesMQQZqprJYLu5gNEYRLA5GTXKURrkc8jG3CLKynRwhEM93MownyAMhxpdFTvfXQ9kkxkRcemZ8Fn5WHyk", // ResolvePerpBankruptcy '3YXaEG95w5eG7jBBjz8hW9auXVAv9z2MH8yw51tL8nqSqmKgXtrD1hgE7LCqK2hpFwcrpjeWtBeVqGsbCHLh3kSe', // redeem mango
"3bzj3KkA3FSZHJuCmRgHhSgqeaEzD32sCHkYdRLcZm1vcuB4ra5NbU5EZqBhW6QjeKRV9QRWC4SHxK2hS54s79Zx", // settle_pnl '2HNnZmThkFUsG1pw9bNaJoeeGUZJun3hkcpwBJt3ZU9FKe3CY17wrJgk1BZ8txm13RJ512ThbZVZxaqsxNFn4xVs', // checked_add_net details
"5TmhvKQJmjUD9dZgCszBF8gNKUohpxwjrYu1RngZVh1hEToGMtjPtXJF89QLHXzANMWWQRfMomsgCg8353CpYgBb", // settle_fees '4ebib6h5kQHpcpK4A4UpH7ThJVEtui2X7vVvTfCW8iuJgjHMocH7nymN3zVrrbwZL9HQYJY1tHdnGjo7ZSgrL7M6', // error example
"4qV6PTD1nGj5qq89FQK8QKwN231pGgtayD7uX4B6y83b19gcVXB5ByLCvApSJjCRrboiCg7RVT2p2e1CtP3zuXDb", // force_settle_quote_positions '59DFSbsN1DbnqUiMsStS2xQn4tBncG8w3gg8aikBvQZKnWhf1yRt87EU4WMvXtnnVe18zWaRDFVfnVYp3ASgF7tV', //token socialized loss
"5qDPBrFjCcaZthjRCqisHRw1mFEkHFHRFWi5jbKCmpAgpAXNdEkSv8L472D12VB5AukYaGsWhAy5bcvvUGJ1Sgtv", // FillEvent '4RCvRY8BWPB6FixyfufYKojUdnE91DiqmFE2b8e4FCyuWCdT1ipSzPBaWUgaajKucFr1jsveiMvTft5iiWbctCFk', // settle_pnl_multiple
"3YXaEG95w5eG7jBBjz8hW9auXVAv9z2MH8yw51tL8nqSqmKgXtrD1hgE7LCqK2hpFwcrpjeWtBeVqGsbCHLh3kSe", // redeem mango 'KDv62AKFqyrvULdUUeShdYK9zCeUDX8yb6kuUadoa6dJKrqjDvKU3JQj9t8e4H1FTEaEMsyLnYAT4HbTr8ikSGq', // multiple update funding
"2HNnZmThkFUsG1pw9bNaJoeeGUZJun3hkcpwBJt3ZU9FKe3CY17wrJgk1BZ8txm13RJ512ThbZVZxaqsxNFn4xVs", // checked_add_net details ];
'4ebib6h5kQHpcpK4A4UpH7ThJVEtui2X7vVvTfCW8iuJgjHMocH7nymN3zVrrbwZL9HQYJY1tHdnGjo7ZSgrL7M6', // error example
'59DFSbsN1DbnqUiMsStS2xQn4tBncG8w3gg8aikBvQZKnWhf1yRt87EU4WMvXtnnVe18zWaRDFVfnVYp3ASgF7tV', //token socialized loss
'4RCvRY8BWPB6FixyfufYKojUdnE91DiqmFE2b8e4FCyuWCdT1ipSzPBaWUgaajKucFr1jsveiMvTft5iiWbctCFk' // settle_pnl_multiple
]
let signaturesSql = signatures.map(e => "'" + e + "'").join(',') let signaturesSql = signatures.map((e) => "'" + e + "'").join(',');
res = await client.query("select transaction, signature from " + schema + ".transactions where signature in (" + signaturesSql + ") order by id desc limit $1", [limit]) res = await client.query(
'select transaction, signature from ' +
schema +
'.transactions where signature in (' +
signaturesSql +
') order by id desc limit $1',
[limit],
);
} finally {
client.release();
}
let transactions = res.rows.map((e) => [e.transaction, e.signature]);
let [processStates, parsedTransactions] = parseTransactions(
transactions,
mangoProgramId,
);
} finally { // Set a breakpoint here to examine parsed transactions
client.release() console.log(parsedTransactions);
}
let transactions = res.rows.map(e => [e.transaction, e.signature]);
let [processStates, parsedTransactions] = parseTransactions(transactions, mangoProgramId);
// Set a breakpoint here to examine parsed transactions
console.log(parsedTransactions)
} }
async function consumeTransactions() { async function consumeTransactions() {
const rawConnectionString = process.env.CONNECTION_STRING_RAW const rawConnectionString = process.env.CONNECTION_STRING_RAW;
let schema = 'transactions_v3';
const rawTransactionsPool = new Pool( let schema = 'transactions_v3';
{
connectionString: rawConnectionString,
ssl: {
rejectUnauthorized: false,
}
}
)
console.log('Initialized') const rawTransactionsPool = new Pool({
while (true) { connectionString: rawConnectionString,
console.log('Refreshing transactions ' + Date()) ssl: {
rejectUnauthorized: false,
},
});
let transactionsParsingLimit = 50000; console.log('Initialized');
await processMangoTransactions(rawTransactionsPool, schema, transactionsParsingLimit); while (true) {
} console.log('Refreshing transactions ' + Date());
let transactionsParsingLimit = 50000;
await processMangoTransactions(
rawTransactionsPool,
schema,
transactionsParsingLimit,
);
}
} }
consumeTransactions() consumeTransactions();