// TypeScript source file (152 lines, 5.4 KiB)
|
import {Connection, PublicKey} from '@solana/web3.js';
|
||
|
import {sleep} from '@blockworks-foundation/mango-client';
|
||
|
import {insertNewSignatures} from './signatures';
|
||
|
import { Pool } from 'pg'
|
||
|
import { notify } from './utils';
|
||
|
import {populateTransactions} from './insertTransactions';
|
||
|
import {parseTransactions} from './parseTransactions';
|
||
|
|
||
|
// pg-promise is initialised once for the whole module. In this file it is used
// only through `pgp.helpers` to generate SQL text; the queries themselves are
// executed on `pg` Pool clients.
const pgpInitOptions = {
    // pg-promise initialization option: capitalise the SQL it generates.
    capSQL: true,
};
const pgp = require('pg-promise')(pgpInitOptions);

// Program id to scrape. Taken from MANGO_PROGRAM_ID when that variable is set
// (and non-empty); otherwise the hard-coded id below is used.
const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH';
|
||
|
|
||
|
/**
 * Runs `work(client)` inside a BEGIN/COMMIT transaction on a client checked out
 * of `pool`.
 *
 * If anything between BEGIN and COMMIT throws, a ROLLBACK is attempted and the
 * ORIGINAL error is re-thrown. A failure of the ROLLBACK itself is only logged,
 * so it can never hide the error that caused it. The client is always released.
 *
 * @param pool - object exposing `connect()` that resolves to a client with `query()` and `release()`
 * @param work - async callback receiving the checked-out client
 */
async function withTransaction(pool, work) {
    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        await work(client);
        await client.query('COMMIT');
    } catch (e) {
        try {
            await client.query('ROLLBACK');
        } catch (rollbackError) {
            // Keep going: the caller needs the root cause, not the rollback failure.
            console.log('ROLLBACK failed', rollbackError);
        }
        throw e;
    } finally {
        client.release();
    }
}

/**
 * Writes one batch of parsed transactions and then records their process states.
 *
 * Step 1: every non-empty entry of `parsedTransactions` is inserted into the
 *         table `<schema>.<key>` through `parsedTransactionsPool`, all tables in
 *         a single database transaction.
 * Step 2: `process_state` in `<schema>.transactions` is updated (matched on
 *         `signature`) through `rawTransactionsPool`, in a second transaction.
 *
 * Step 2 only runs after step 1 has committed. The two pools are separate
 * connections, so one database transaction cannot span both writes; if step 2
 * fails, the rows from step 1 stay committed.
 *
 * @param rawTransactionsPool - pool used for the process-state update
 * @param parsedTransactionsPool - pool used for the parsed-row inserts
 * @param schema - schema name used for every table touched here
 * @param processStates - array of `{signature, process_state}` rows
 * @param parsedTransactions - object mapping table name -> array of row objects;
 *        the keys of the first row of each array define the inserted columns
 * @throws whatever the underlying query throws (after a rollback attempt)
 */
async function insertMangoTransactions(rawTransactionsPool, parsedTransactionsPool, schema, processStates, parsedTransactions) {
    const batchSize = 1000;

    // Build every column set before touching the database, so a malformed
    // first row fails fast without opening a connection.
    const pendingInserts = [];
    for (const [tableName, inserts] of Object.entries(parsedTransactions)) {
        if (inserts.length > 0) {
            const table = new pgp.helpers.TableName({table: tableName, schema: schema});
            const columnSet = new pgp.helpers.ColumnSet(Object.keys(inserts[0]), {table: table});
            pendingInserts.push({tableName, inserts, columnSet});
        }
    }

    await withTransaction(parsedTransactionsPool, async (client) => {
        for (const {tableName, inserts, columnSet} of pendingInserts) {
            console.log(tableName + ' insert started');
            for (let i = 0; i < inserts.length; i += batchSize) {
                const insertsBatch = inserts.slice(i, i + batchSize);
                await client.query(pgp.helpers.insert(insertsBatch, columnSet));
            }
            console.log(tableName + ' inserted');
        }
    });

    const transactionsTable = new pgp.helpers.TableName({table: 'transactions', schema: schema});
    // The '?' prefix marks `signature` as a match-only column: it appears in the
    // generated values list (for the WHERE clause) but is not itself updated.
    const processStateCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: transactionsTable});

    await withTransaction(rawTransactionsPool, async (client) => {
        for (let i = 0; i < processStates.length; i += batchSize) {
            const updatesBatch = processStates.slice(i, i + batchSize);
            const updatedSql = pgp.helpers.update(updatesBatch, processStateCs) + ' WHERE v.signature = t.signature';
            await client.query(updatedSql);
        }
        console.log('process states updated');
    });
}
|
||
|
|
||
|
|
||
|
/**
 * Parses the next batch of raw transactions and stores the results.
 *
 * Reads up to `limit` rows (oldest `id` first) from `<schema>.transactions`
 * whose `process_state` is 'ready for parsing' and whose `program_pk` equals
 * `address`, passes them to `parseTransactions`, and hands the outcome to
 * `insertMangoTransactions`. Returns early without parsing or writing when no
 * rows are ready.
 *
 * @param address - program public key; used both to select rows and as the
 *        program id handed to the parser
 * @param rawTransactionsPool - pool holding the raw `transactions` table
 * @param parsedTransactionsPool - pool the parsed rows are written to
 * @param schema - schema name; concatenated into the SQL text, so it must be a
 *        trusted identifier (never user input)
 * @param limit - maximum number of rows to fetch in this pass
 */
async function processMangoTransactions(address, rawTransactionsPool, parsedTransactionsPool, schema, limit) {
    const client = await rawTransactionsPool.connect();
    let res;
    try {
        res = await client.query("select transaction, signature from " + schema + ".transactions where process_state = 'ready for parsing' and program_pk = $1 order by id asc limit $2", [address, limit]);
    } finally {
        // Hand the client back before the (potentially long) parsing step.
        client.release();
    }

    // Nothing to do: skip parsing and the two empty database transactions
    // that insertMangoTransactions would otherwise open.
    if (res.rows.length === 0) {
        return;
    }

    const transactions = res.rows.map((row) => [row.transaction, row.signature]);
    // Parse with the same program id the rows were selected by, rather than a
    // module-level constant that merely happens to hold the same value.
    const [processStates, parsedTransactions] = parseTransactions(transactions, address);
    await insertMangoTransactions(rawTransactionsPool, parsedTransactionsPool, schema, processStates, parsedTransactions);
}
|
||
|
|
||
|
/**
 * Runs the scrape -> populate -> parse refresh loop until something throws.
 *
 * Configuration is read from the environment:
 *   CLUSTER_URL              RPC endpoint (defaults to the mainnet-beta URL below)
 *   REQUEST_WAIT_TIME        passed through to the scraping helpers; 500 when
 *                            unset, non-numeric or 0
 *   CONNECTION_STRING_RAW    connection string for the raw-transactions pool
 *   CONNECTION_STRING_PARSED connection string for the parsed-transactions pool
 *
 * The function never returns normally. When any step throws, both pools are
 * closed before the error propagates, so a caller that restarts this function
 * (each call creates two fresh pools) does not accumulate open pools.
 *
 * @throws whatever a refresh step throws
 */
async function consumeTransactions() {
    const clusterUrl = process.env.CLUSTER_URL || "https://api.mainnet-beta.solana.com";
    // Explicit radix; `|| ''` covers the unset case without a non-null
    // assertion (parseInt('') is NaN, which falls through to the default).
    const requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME || '', 10) || 500;
    const rawConnectionString = process.env.CONNECTION_STRING_RAW;
    const parsedConnectionString = process.env.CONNECTION_STRING_PARSED;

    const schema = 'transactions_v3';
    // Upper bound on rows parsed per refresh.
    const transactionsParsingLimit = 50000;

    console.log(clusterUrl);

    const connection = new Connection(clusterUrl, 'finalized');
    // NOTE(review): `rejectUnauthorized: false` turns off TLS certificate
    // verification for both database connections - confirm this is intended.
    const rawTransactionsPool = new Pool({
        connectionString: rawConnectionString,
        ssl: {
            rejectUnauthorized: false,
        },
    });
    const parsedTransactionsPool = new Pool({
        connectionString: parsedConnectionString,
        ssl: {
            rejectUnauthorized: false,
        },
    });

    try {
        console.log('Initialized');
        notify('v3: Initialized');

        while (true) {
            console.log('Refreshing transactions ' + Date());

            await insertNewSignatures(mangoProgramId, connection, rawTransactionsPool, requestWaitTime, schema);
            await populateTransactions(connection, mangoProgramId, rawTransactionsPool, requestWaitTime, schema);
            await processMangoTransactions(mangoProgramId, rawTransactionsPool, parsedTransactionsPool, schema, transactionsParsingLimit);

            console.log('Refresh complete');
            // Probably unnecessary but let's give the servers a break
            await sleep(5 * 1000);
        }
    } finally {
        // Reached only when the loop throws. A failure to close a pool is
        // logged rather than thrown so it cannot replace the original error.
        for (const pool of [rawTransactionsPool, parsedTransactionsPool]) {
            try {
                await pool.end();
            } catch (closeError) {
                console.log('Failed to close pool', closeError);
            }
        }
    }
}
|
||
|
|
||
|
/**
 * Supervisor loop: runs `consumeTransactions()` forever.
 *
 * Whenever it throws, the error is reported through `notify` and the console,
 * and the loop waits 10 minutes before starting it again. The returned promise
 * never resolves; it rejects only if the reporting/waiting code itself throws.
 */
async function main() {
    const retryDelayMs = 10 * 60 * 1000;

    while (true) {
        try {
            await consumeTransactions();
        } catch (e) {
            // String(e) instead of e.toString(): the thrown value is not
            // guaranteed to be an Error (it may even be null/undefined), and a
            // TypeError raised here would escape the loop and stop the service.
            notify('v3: ' + String(e));
            console.log(e, e instanceof Error ? e.stack : undefined);
            // Wait for 10 mins
            await sleep(retryDelayMs);
        }
    }
}
|
||
|
|
||
|
// Entry point. main() loops forever, so its promise can only settle by
// rejecting; handle that explicitly instead of leaving the promise floating.
main().catch((e) => {
    console.error(e);
    process.exit(1);
});
|