mango-transaction-scraper/src/index.ts

621 lines
26 KiB
TypeScript

import { Connection, PublicKey, ConfirmedTransaction, Transaction, ConfirmedSignatureInfo } from '@solana/web3.js';
import { MangoInstructionLayout, sleep, IDS, MangoClient, awaitTransactionSignatureConfirmation} from '@blockworks-foundation/mango-client';
const schema_1 = require("@blockworks-foundation/mango-client/lib/schema.js");
import {createReverseIdsMap} from './maps';
import { insertNewSignatures } from './signatures';
import { Pool } from 'pg'
import { notify } from './utils';
const bs58 = require('bs58')
const pgp = require('pg-promise')({
capSQL: true
});
const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH'
var reverseIds;
function ParseLiquidationData(instruction, instructionNum, confirmedTransaction) {
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
let transactionAccounts = confirmedTransaction.transaction.message.accountKeys.map(e => e.pubkey.toBase58())
let instructionAccounts = instruction.accounts.map(e => e.toBase58())
let mangoGroup, liqor, liqorInTokenWallet, liqorOutTokenWallet, liqeeMarginAccount, inTokenVault, outTokenVault, signerKey;
[mangoGroup, liqor, liqorInTokenWallet, liqorOutTokenWallet, liqeeMarginAccount, inTokenVault, outTokenVault, signerKey] = instructionAccounts.slice(0, 8);
let innerInstructions = confirmedTransaction.meta.innerInstructions.find(e => e.index === instructionNum-1).instructions
let inTokenAmount
let outTokenAmount
let inTokenSymbol
let outTokenSymbol
let inTokenFound: boolean = false
let outTokenFound: boolean = false
for (let innerInstruction of innerInstructions) {
let info = innerInstruction.parsed.info
if (info.destination === inTokenVault) {
inTokenSymbol = reverseIds.vault_symbol[inTokenVault]
let decimals = reverseIds.mango_groups[mangoGroup].mint_decimals[inTokenSymbol]
inTokenAmount = parseInt(info.amount) / Math.pow(10, decimals)
inTokenFound = true
} else if (info.source === outTokenVault) {
outTokenSymbol = reverseIds.vault_symbol[outTokenVault]
let decimals = reverseIds.mango_groups[mangoGroup].mint_decimals[outTokenSymbol]
outTokenAmount = parseInt(info.amount) / Math.pow(10, decimals)
outTokenFound = true
}
}
if (!inTokenFound || !outTokenFound) { throw 'liquidation transfers not found'}
let symbols;
let startAssets;
let startLiabs;
let endAssets;
let endLiabs;
let socializedLoss;
let totalDeposits;
let prices;
let socializedLossPercentages: number[] = [];
let startAssetsVal = 0;
let startLiabsVal = 0;
for (let logMessage of confirmedTransaction.meta.logMessages) {
if (logMessage.startsWith('Program log: liquidation details: ')) {
let liquidationDetails = JSON.parse(logMessage.slice('Program log: liquidation details: '.length));
prices = liquidationDetails.prices;
startAssets = liquidationDetails.start.assets;
startLiabs = liquidationDetails.start.liabs;
endAssets = liquidationDetails.end.assets;
endLiabs = liquidationDetails.end.liabs;
socializedLoss = liquidationDetails.socialized_losses;
totalDeposits = liquidationDetails.total_deposits;
symbols = reverseIds.mango_groups[mangoGroup].symbols
// symbols = mangoGroupSymbolMap[mangoGroup]
let quoteDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbols[symbols.length - 1]]
for (let i = 0; i < startAssets.length; i++) {
let symbol = symbols[i]
let mintDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbol]
prices[i] = prices[i] * Math.pow(10, mintDecimals - quoteDecimals)
startAssets[i] = startAssets[i] / Math.pow(10, mintDecimals)
startLiabs[i] = startLiabs[i] / Math.pow(10, mintDecimals)
endAssets[i] = endAssets[i] / Math.pow(10, mintDecimals)
endLiabs[i] = endLiabs[i] / Math.pow(10, mintDecimals)
totalDeposits[i] = totalDeposits[i] / Math.pow(10, mintDecimals)
socializedLossPercentages.push(endLiabs[i] / totalDeposits[i])
startAssetsVal += startAssets[i] * prices[i];
startLiabsVal += startLiabs[i] * prices[i];
}
break;
}
}
// add epsilon to prevent exploding coll ratio
let collRatio = startAssetsVal / (startLiabsVal+0.0001);
let inTokenPrice = prices[symbols.indexOf(inTokenSymbol)];
let outTokenPrice = prices[symbols.indexOf(outTokenSymbol)];
let inTokenUsd = inTokenAmount * inTokenPrice;
let outTokenUsd = outTokenAmount * outTokenPrice;
let liquidationFeeUsd = outTokenUsd - inTokenUsd;
let liquidation = {
mango_group: mangoGroup,
liqor: liqor,
liqee: liqeeMarginAccount,
coll_ratio: collRatio,
in_token_symbol: inTokenSymbol,
in_token_amount: inTokenAmount,
in_token_price: inTokenPrice,
in_token_usd: inTokenUsd,
out_token_symbol: outTokenSymbol,
out_token_amount: outTokenAmount,
out_token_price: outTokenPrice,
out_token_usd: outTokenUsd,
liquidation_fee_usd: liquidationFeeUsd,
socialized_losses: socializedLoss,
block_datetime: blockDatetime
}
let liquidationHoldings: any = [];
for (let i= 0; i < startAssets.length; i++) {
if ((startAssets[i] > 0) || (startLiabs[i] > 0)) {
liquidationHoldings.push({
symbol: symbols[i],
start_assets: startAssets[i],
start_liabs: startLiabs[i],
end_assets: endAssets[i],
end_liabs: endLiabs[i],
price: prices[i]
})
}
}
let socializedLosses: any[] = [];
if (socializedLoss) {
for (let i= 0; i < totalDeposits.length; i++) {
if (endLiabs[i] > 0) {
socializedLosses.push({
symbol: symbols[i],
symbol_price: prices[i],
loss: endLiabs[i],
total_deposits: totalDeposits[i],
loss_percentage: socializedLossPercentages[i],
loss_usd: endLiabs[i] * prices[i],
total_deposits_usd: totalDeposits[i] * prices[i]
})
}
}
}
return [liquidation, liquidationHoldings, socializedLosses]
}
function parseOracleData(confirmedTransaction) {
let instructions = confirmedTransaction.transaction.message.instructions
if (instructions.length > 1) {throw 'Unexpected oracle instruction'}
let oracleInstruction = instructions[0]
let oraclePk = oracleInstruction.accounts[1].toBase58()
let slot = confirmedTransaction.slot;
let instructionData = schema_1.Instruction.deserialize(bs58.decode(oracleInstruction.data));
let roundId = instructionData.Submit.round_id.toNumber();
let submitValue = instructionData.Submit.value.toNumber()
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
return {slot: slot, oracle_pk: oraclePk, round_id: roundId, submit_value: submitValue, block_datetime: blockDatetime}
}
function parseDepositWithdrawData(instruction, confirmedTransaction) {
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
let decodedInstruction = MangoInstructionLayout.decode(bs58.decode(instruction.data));
let instructionName = Object.keys(decodedInstruction)[0];
let mangoGroup = instruction.accounts[0].toBase58()
let marginAccount = instruction.accounts[1].toBase58()
let owner = instruction.accounts[2].toBase58()
let vault = instruction.accounts[4].toBase58()
let symbol = reverseIds.vault_symbol[vault]
let mintDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbol]
let quantity = decodedInstruction[instructionName].quantity.toNumber() / Math.pow(10, mintDecimals)
let oraclePk = reverseIds.mango_groups[mangoGroup].oracles[symbol]
return {mango_group: mangoGroup, owner: owner, quantity: quantity, symbol: symbol,
side: instructionName, margin_account: marginAccount, oracle_pk: oraclePk, block_datetime: blockDatetime}
}
function parseMangoTransactions(transactions) {
let processStates: any[] = [];
let transactionSummaries: any[] = [];
let depositWithdrawInserts: any[] = [];
let liquidationInserts: any[] = [];
let liquidationHoldingsInserts: any[] = [];
let socializedLossInserts: any[] = [];
for (let transaction of transactions) {
let [signature, confirmedTransaction] = transaction;
try {
let transactionSummary = parseTransactionSummary(confirmedTransaction)
transactionSummary['signature'] = signature
transactionSummaries.push(transactionSummary)
if (confirmedTransaction.meta.err !== null) {
processStates.push({signature: signature, process_state: 'transaction error'});
} else {
let slot = confirmedTransaction.slot;
let instructions = confirmedTransaction.transaction.message.instructions;
// Can have multiple inserts per signature so add instructionNum column to allow a primary key
let instructionNum = 1;
for (let instruction of instructions) {
if (instruction.programId == mangoProgramId) {
let decodeData = bs58.decode(instruction.data);
let decodedInstruction = MangoInstructionLayout.decode(decodeData);
let instructionName = Object.keys(decodedInstruction)[0];
if ((instructionName === 'Deposit') || (instructionName === 'Withdraw')) {
// Luckily Deposit and Withdraw have the same layout
let depositWithdrawData = parseDepositWithdrawData(instruction, confirmedTransaction)
depositWithdrawData['signature'] = signature
depositWithdrawData['slot'] = slot
depositWithdrawData['instruction_num'] = instructionNum
depositWithdrawInserts.push(depositWithdrawData);
} else if (instructionName === 'PartialLiquidate') {
if (confirmedTransaction.meta.logMessages.includes('Program log: Account above init_coll_ratio after settling borrows')) {
// If the coll ratio can after settling borrows then there won't be any liquidation details
// TODO: is there a better way to identify the above scenario?
// pass
} else {
let [liquidation, liquidationHoldings, socializedLosses] = ParseLiquidationData(instruction, instructionNum, confirmedTransaction)
liquidation['signature'] = signature
liquidationInserts.push(liquidation)
for (let liquidationHolding of liquidationHoldings) {
liquidationHolding['signature'] = signature
liquidationHoldingsInserts.push(liquidationHolding)
}
for (let socializedLoss of socializedLosses) {
socializedLoss['signature'] = signature
socializedLossInserts.push(socializedLoss)
}
}
}
}
instructionNum++;
}
processStates.push({signature: signature, process_state: 'processed'});
}
} catch(e: any) {
console.log(e.stack)
processStates.push({signature: signature, process_state: 'processing error'});
}
}
return [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts]
}
function parseOracleTransactions(transactions) {
let processStates: any[] = [];
let transactionSummaries: any[] = [];
let oracleTransactions: any[] = [];
for (let transaction of transactions) {
let [signature, confirmedTransaction] = transaction;
try {
let transactionSummary = parseTransactionSummary(confirmedTransaction)
transactionSummary['signature'] = signature
transactionSummaries.push(transactionSummary)
if (confirmedTransaction.meta.err !== null) {
processStates.push({signature: signature, process_state: 'transaction error'});
} else {
let oracleTransaction = parseOracleData(confirmedTransaction)
oracleTransaction['signature'] = signature
oracleTransactions.push(oracleTransaction)
processStates.push({signature: signature, process_state: 'processed'});
}
} catch(e) {
processStates.push({signature: signature, process_state: 'processing error'});
}
}
return [processStates, transactionSummaries, oracleTransactions]
}
async function getUnprocessedSignatures(pool, account, limit) {
const client = await pool.connect();
let signatures;
try {
const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc limit $2", [account, limit])
signatures = res.rows.map(e => e['signature'])
} finally {
client.release()
}
return signatures;
}
function parseTransactionSummary(confirmedTransaction) {
let maxCompute = 0;
for (let logMessage of confirmedTransaction.meta!.logMessages!) {
if (logMessage.endsWith('compute units')) {
let re = new RegExp(/(\d+)\sof/);
let matches = re.exec(logMessage);
if (matches) {
let compute = parseInt(matches[1]);
if (compute > maxCompute) {
maxCompute = compute;
}
}
}
}
let logMessages = confirmedTransaction.meta!.logMessages!.join('\n');
return {log_messages: logMessages, compute: maxCompute}
}
async function insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts) {
const processStateCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: 'all_transactions'});
const transactionSummaryCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'compute'], {table: 'all_transactions'});
const depositWithdrawCs = new pgp.helpers.ColumnSet(
['signature', 'mango_group', 'instruction_num', 'slot', 'owner', 'side', 'quantity',
'symbol', 'margin_account', 'oracle_pk', 'block_datetime'],
{table: 'deposit_withdraw'});
const liquidationsCs = new pgp.helpers.ColumnSet(
['signature', 'mango_group', 'liqor', 'liqee', 'coll_ratio', 'in_token_symbol', 'in_token_amount', 'in_token_price',
'in_token_usd', 'out_token_symbol', 'out_token_amount', 'out_token_price', 'out_token_usd', 'liquidation_fee_usd',
'socialized_losses', 'block_datetime'],
{table: 'liquidations'});
const liquidationHoldingsCs = new pgp.helpers.ColumnSet(
['signature', 'symbol', 'start_assets', 'start_liabs', 'end_assets', 'end_liabs', 'price'],
{table: 'liquidation_holdings'});
const socializedLossesCs = new pgp.helpers.ColumnSet(
['signature', 'symbol', 'symbol_price', 'loss', 'total_deposits', 'loss_percentage', 'loss_usd', 'total_deposits_usd'],
{table: 'socialized_losses'});
const depositWithdrawPricesUSD = "update deposit_withdraw set symbol_price = 1, usd_equivalent = quantity where symbol_price is null and symbol in ('USDT', 'USDC')"
const depositWithdrawPricesNonUSD = `
UPDATE deposit_withdraw t1
SET
symbol_price = t2.symbol_price,
usd_equivalent = t2.usd_equivalent,
oracle_slot = t2.slot
from
(
select t1.signature,
t1.oracle_pk,
t2.slot,
t2.submit_value / power(10, t3.decimals) as symbol_price,
t2.submit_value / power(10, t3.decimals) * t1.quantity as usd_equivalent
from
(
select
dw.signature,
dw.oracle_pk,
dw.slot,
dw.quantity,
max(ot.slot) as max_slot
from deposit_withdraw dw
left join oracle_transactions ot
on dw.oracle_pk = ot.oracle_pk
and dw.slot >= ot.slot
where dw.symbol_price is null
group by
dw.signature,
dw.oracle_pk,
dw.slot,
dw.quantity
) t1
left join oracle_transactions t2
on t2.oracle_pk = t1.oracle_pk
and t2.slot = t1.max_slot
left join oracle_meta t3 on
t3.oracle_pk = t1.oracle_pk
) t2
where
t2.signature = t1.signature
and t2.oracle_pk = t1.oracle_pk
and t1.symbol_price is null
`
let batchSize = 1000;
let client = await pool.connect()
try {
await client.query('BEGIN')
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, processStateCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
}
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, transactionSummaryCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
}
for (let i = 0, j = depositWithdrawInserts.length; i < j; i += batchSize) {
let insertsBatch = depositWithdrawInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, depositWithdrawCs);
await client.query(insertsSql)
}
console.log('deposits and withdraws inserted')
// Small performance increase is USD query executed first (less rows with symbol price = null)
await client.query(depositWithdrawPricesUSD);
await client.query(depositWithdrawPricesNonUSD);
console.log('deposits and withdraw prices updated')
for (let i = 0, j = liquidationInserts.length; i < j; i += batchSize) {
let insertsBatch = liquidationInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, liquidationsCs);
await client.query(insertsSql)
}
for (let i = 0, j = liquidationHoldingsInserts.length; i < j; i += batchSize) {
let insertsBatch = liquidationHoldingsInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, liquidationHoldingsCs);
await client.query(insertsSql)
}
for (let i = 0, j = socializedLossInserts.length; i < j; i += batchSize) {
let insertsBatch = socializedLossInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, socializedLossesCs);
await client.query(insertsSql)
}
console.log('liquidations inserted')
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
client.release()
}
}
async function insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions) {
// Oracle transactions are quite frequent - so update in batches here for performance
const processStartCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: 'all_transactions'});
const transactionSummaryCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'compute'], {table: 'all_transactions'});
const oracleCs = new pgp.helpers.ColumnSet(['signature', 'slot', 'oracle_pk', 'round_id', 'submit_value', 'block_datetime'], {table: 'oracle_transactions'});
let batchSize = 1000;
let client = await pool.connect()
try {
await client.query('BEGIN')
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, processStartCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
}
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, transactionSummaryCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
}
for (let i = 0, j = oracleTransactions.length; i < j; i += batchSize) {
let insertsBatch = oracleTransactions.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, oracleCs);
await client.query(insertsSql)
}
console.log('oracle prices inserted')
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
client.release()
}
}
async function getNewAddressTransactions(connection, address, requestWaitTime, pool, limit) {
let signaturesToProcess = (await getUnprocessedSignatures(pool, address, limit))
let promises: Promise<void>[] = [];
let transactions: any[] = [];
let counter = 1;
for (let signature of signaturesToProcess) {
// let promise = connection.getConfirmedTransaction(signature).then(confirmedTransaction => transactions.push([signature, confirmedTransaction]));
let promise = connection.getParsedConfirmedTransaction(signature).then(confirmedTransaction => transactions.push([signature, confirmedTransaction]));
console.log('requested ', counter, ' of ', signaturesToProcess.length);
counter++;
promises.push(promise);
// Limit request frequency to avoid request failures due to rate limiting
await sleep(requestWaitTime);
}
await (Promise as any).allSettled(promises);
return transactions
}
async function processOracleTransactions(connection, address, pool, requestWaitTime, limit) {
while (true) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit)
let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions)
await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions)
if (transactions.length < limit) {
break
}
}
}
async function processMangoTransactions(connection, address, pool, requestWaitTime, limit) {
while (true) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool, limit)
let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions)
await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts)
if (transactions.length < limit) {
break
}
}
}
async function consumeTransactions() {
const cluster = process.env.CLUSTER || 'mainnet-beta';
const clusterUrl = process.env.CLUSTER_URL || "https://api.mainnet-beta.solana.com";
let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500;
const connectionString = process.env.TRANSACTIONS_CONNECTION_STRING
const oracleProgramId = process.env.ORACLE_PROGRAM_ID || 'FjJ5mhdRWeuaaremiHjeQextaAw1sKWDqr3D7pXjgztv';
let connection = new Connection(clusterUrl, 'finalized');
const pool = new Pool(
{
connectionString: connectionString,
ssl: {
rejectUnauthorized: false,
}
}
)
const oracleProgramPk = new PublicKey(oracleProgramId);
const mangoProgramPk = new PublicKey(mangoProgramId);
reverseIds = await createReverseIdsMap(cluster, new MangoClient(), connection);
console.log('Initialized')
notify('Initialized')
while (true) {
console.log('Refreshing transactions')
// Order of inserting transactions important - inserting deposit_withdraw relies on having all oracle prices available
// So get new signatures of oracle transactions after mango transactions and insert oracle transactions first
let limit = 10000;
await insertNewSignatures(mangoProgramPk, connection, pool, requestWaitTime);
await insertNewSignatures(oracleProgramPk, connection, pool, requestWaitTime);
await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime, limit);
await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime, limit);
console.log('Refresh complete')
// Unnecessary but let's give the servers a break
await sleep(1000)
}
}
async function main() {
while (true) {
try {
await consumeTransactions()
}
catch(e: any) {
notify(e.toString())
console.log(e, e.stack)
// Wait for 10 mins
await sleep(10*60*1000)
}
}
}
main()