Initial commit

This commit is contained in:
Nicholas Clarke 2021-07-18 17:25:03 -07:00
commit 77726a6f38
11 changed files with 1358 additions and 0 deletions

.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@

Procfile Normal file
View File

@ -0,0 +1 @@
web: npm start

7 Normal file
View File

@ -0,0 +1,7 @@
# Mango transaction scraper
### Run
yarn install
yarn start

package.json Normal file
View File

@ -0,0 +1,33 @@
"name": "mango-transaction-scraper",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"start": "ts-node src/index.ts",
"update-pnl-cache": "ts-node src/updatePnlCache/index.ts",
"test": "echo \"Error: no test specified\" && exit 1"
"repository": "",
"author": "Nicholas Clarke",
"license": "MIT",
"homepage": "",
"dependencies": {
"@blockworks-foundation/mango-client": "^2.1.3",
"@solana/web3.js": "^1.2.7",
"axios": "^0.21.1",
"bs58": "^4.0.1",
"package.json": "^2.0.1",
"pg": "^8.6.0",
"pg-format": "^1.0.4",
"pg-promise": "^10.10.2",
"node-cron": "^3.0.0"
"devDependencies": {
"ts-node": "^10.0.0",
"typescript": "^4.3.2"
"engines": {
"node": "14.x"

src/index.ts Normal file
View File

@ -0,0 +1,606 @@
import { Connection, PublicKey, ConfirmedTransaction, Transaction, ConfirmedSignatureInfo } from '@solana/web3.js';
import { MangoInstructionLayout, sleep, IDS, MangoClient, awaitTransactionSignatureConfirmation} from '@blockworks-foundation/mango-client';
const schema_1 = require("@blockworks-foundation/mango-client/lib/schema.js");
import {createReverseIdsMap} from './maps';
import { insertNewSignatures } from './signatures';
import { Pool } from 'pg'
import { notify } from './utils';
const bs58 = require('bs58')
const pgp = require('pg-promise')({
capSQL: true
const mangoProgramId = process.env.MANGO_PROGRAM_ID || '5fNfvyp5czQVX77yoACa3JJVEhdRaWjPuazuWgjhTqEH'
var reverseIds;
function ParseLiquidationData(instruction, instructionNum, confirmedTransaction) {
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
let transactionAccounts = => e.pubkey.toBase58())
let instructionAccounts = => e.toBase58())
let mangoGroup, liqor, liqorInTokenWallet, liqorOutTokenWallet, liqeeMarginAccount, inTokenVault, outTokenVault, signerKey;
[mangoGroup, liqor, liqorInTokenWallet, liqorOutTokenWallet, liqeeMarginAccount, inTokenVault, outTokenVault, signerKey] = instructionAccounts.slice(0, 8);
let innerInstructions = confirmedTransaction.meta.innerInstructions.find(e => e.index === instructionNum-1).instructions
let inTokenAmount
let outTokenAmount
let inTokenSymbol
let outTokenSymbol
let inTokenFound: boolean = false
let outTokenFound: boolean = false
for (let innerInstruction of innerInstructions) {
let info =
if (info.destination === inTokenVault) {
inTokenSymbol = reverseIds.vault_symbol[inTokenVault]
let decimals = reverseIds.mango_groups[mangoGroup].mint_decimals[inTokenSymbol]
inTokenAmount = parseInt(info.amount) / Math.pow(10, decimals)
inTokenFound = true
} else if (info.source === outTokenVault) {
outTokenSymbol = reverseIds.vault_symbol[outTokenVault]
let decimals = reverseIds.mango_groups[mangoGroup].mint_decimals[outTokenSymbol]
outTokenAmount = parseInt(info.amount) / Math.pow(10, decimals)
outTokenFound = true
if (!inTokenFound || !outTokenFound) { throw 'liquidation transfers not found'}
let symbols;
let startAssets;
let startLiabs;
let endAssets;
let endLiabs;
let socializedLoss;
let totalDeposits;
let prices;
let socializedLossPercentages: number[] = [];
let startAssetsVal = 0;
let startLiabsVal = 0;
for (let logMessage of confirmedTransaction.meta.logMessages) {
if (logMessage.startsWith('Program log: liquidation details: ')) {
let liquidationDetails = JSON.parse(logMessage.slice('Program log: liquidation details: '.length));
prices = liquidationDetails.prices;
startAssets = liquidationDetails.start.assets;
startLiabs = liquidationDetails.start.liabs;
endAssets = liquidationDetails.end.assets;
endLiabs = liquidationDetails.end.liabs;
socializedLoss = liquidationDetails.socialized_losses;
totalDeposits = liquidationDetails.total_deposits;
symbols = reverseIds.mango_groups[mangoGroup].symbols
// symbols = mangoGroupSymbolMap[mangoGroup]
let quoteDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbols[symbols.length - 1]]
for (let i = 0; i < startAssets.length; i++) {
let symbol = symbols[i]
let mintDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbol]
prices[i] = prices[i] * Math.pow(10, mintDecimals - quoteDecimals)
startAssets[i] = startAssets[i] / Math.pow(10, mintDecimals)
startLiabs[i] = startLiabs[i] / Math.pow(10, mintDecimals)
endAssets[i] = endAssets[i] / Math.pow(10, mintDecimals)
endLiabs[i] = endLiabs[i] / Math.pow(10, mintDecimals)
totalDeposits[i] = totalDeposits[i] / Math.pow(10, mintDecimals)
socializedLossPercentages.push(endLiabs[i] / totalDeposits[i])
startAssetsVal += startAssets[i] * prices[i];
startLiabsVal += startLiabs[i] * prices[i];
let collRatio = startAssetsVal / startLiabsVal;
let inTokenPrice = prices[symbols.indexOf(inTokenSymbol)];
let outTokenPrice = prices[symbols.indexOf(outTokenSymbol)];
let inTokenUsd = inTokenAmount * inTokenPrice;
let outTokenUsd = outTokenAmount * outTokenPrice;
let liquidationFeeUsd = outTokenUsd - inTokenUsd;
let liquidation = {
mango_group: mangoGroup,
liqor: liqor,
liqee: liqeeMarginAccount,
coll_ratio: collRatio,
in_token_symbol: inTokenSymbol,
in_token_amount: inTokenAmount,
in_token_price: inTokenPrice,
in_token_usd: inTokenUsd,
out_token_symbol: outTokenSymbol,
out_token_amount: outTokenAmount,
out_token_price: outTokenPrice,
out_token_usd: outTokenUsd,
liquidation_fee_usd: liquidationFeeUsd,
socialized_losses: socializedLoss,
block_datetime: blockDatetime
let liquidationHoldings: any = [];
for (let i= 0; i < startAssets.length; i++) {
if ((startAssets[i] > 0) || (startLiabs[i] > 0)) {
symbol: symbols[i],
start_assets: startAssets[i],
start_liabs: startLiabs[i],
end_assets: endAssets[i],
end_liabs: endLiabs[i],
price: prices[i]
let socializedLosses: any[] = [];
if (socializedLoss) {
for (let i= 0; i < totalDeposits.length; i++) {
if (endLiabs[i] > 0) {
symbol: symbols[i],
symbol_price: prices[i],
loss: endLiabs[i],
total_deposits: totalDeposits[i],
loss_percentage: socializedLossPercentages[i],
loss_usd: endLiabs[i] * prices[i],
total_deposits_usd: totalDeposits[i] * prices[i]
return [liquidation, liquidationHoldings, socializedLosses]
function parseOracleData(confirmedTransaction) {
let instructions = confirmedTransaction.transaction.message.instructions
if (instructions.length > 1) {throw 'Unexpected oracle instruction'}
let oracleInstruction = instructions[0]
let oraclePk = oracleInstruction.accounts[1].toBase58()
let slot = confirmedTransaction.slot;
let instructionData = schema_1.Instruction.deserialize(bs58.decode(;
let roundId = instructionData.Submit.round_id.toNumber();
let submitValue = instructionData.Submit.value.toNumber()
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
return {slot: slot, oracle_pk: oraclePk, round_id: roundId, submit_value: submitValue, block_datetime: blockDatetime}
function parseDepositWithdrawData(instruction, confirmedTransaction) {
let blockDatetime = (new Date(confirmedTransaction.blockTime * 1000)).toISOString()
let decodedInstruction = MangoInstructionLayout.decode(bs58.decode(;
let instructionName = Object.keys(decodedInstruction)[0];
let mangoGroup = instruction.accounts[0].toBase58()
let marginAccount = instruction.accounts[1].toBase58()
let owner = instruction.accounts[2].toBase58()
let vault = instruction.accounts[4].toBase58()
let symbol = reverseIds.vault_symbol[vault]
let mintDecimals = reverseIds.mango_groups[mangoGroup].mint_decimals[symbol]
let quantity = decodedInstruction[instructionName].quantity.toNumber() / Math.pow(10, mintDecimals)
let oraclePk = reverseIds.mango_groups[mangoGroup].oracles[symbol]
return {mango_group: mangoGroup, owner: owner, quantity: quantity, symbol: symbol,
side: instructionName, margin_account: marginAccount, oracle_pk: oraclePk, block_datetime: blockDatetime}
function parseMangoTransactions(transactions) {
let processStates: any[] = [];
let transactionSummaries: any[] = [];
let depositWithdrawInserts: any[] = [];
let liquidationInserts: any[] = [];
let liquidationHoldingsInserts: any[] = [];
let socializedLossInserts: any[] = [];
for (let transaction of transactions) {
let [signature, confirmedTransaction] = transaction;
try {
let transactionSummary = parseTransactionSummary(confirmedTransaction)
transactionSummary['signature'] = signature
if (confirmedTransaction.meta.err !== null) {
processStates.push({signature: signature, process_state: 'transaction error'});
} else {
let slot = confirmedTransaction.slot;
let instructions = confirmedTransaction.transaction.message.instructions;
// Can have multiple inserts per signature so add instructionNum column to allow a primary key
let instructionNum = 1;
for (let instruction of instructions) {
if (instruction.programId == mangoProgramId) {
let decodeData = bs58.decode(;
let decodedInstruction = MangoInstructionLayout.decode(decodeData);
let instructionName = Object.keys(decodedInstruction)[0];
if ((instructionName === 'Deposit') || (instructionName === 'Withdraw')) {
// Luckily Deposit and Withdraw have the same layout
let depositWithdrawData = parseDepositWithdrawData(instruction, confirmedTransaction)
depositWithdrawData['signature'] = signature
depositWithdrawData['slot'] = slot
depositWithdrawData['instruction_num'] = instructionNum
} else if (instructionName === 'PartialLiquidate') {
if (confirmedTransaction.meta.logMessages.includes('Program log: Account above init_coll_ratio after settling borrows')) {
// If the coll ratio can after settling borrows then there won't be any liquidation details
// TODO: is there a better way to identify the above scenario?
// pass
} else {
let [liquidation, liquidationHoldings, socializedLosses] = ParseLiquidationData(instruction, instructionNum, confirmedTransaction)
liquidation['signature'] = signature
for (let liquidationHolding of liquidationHoldings) {
liquidationHolding['signature'] = signature
for (let socializedLoss of socializedLosses) {
socializedLoss['signature'] = signature
processStates.push({signature: signature, process_state: 'processed'});
} catch(e) {
processStates.push({signature: signature, process_state: 'processing error'});
return [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts]
function parseOracleTransactions(transactions) {
let processStates: any[] = [];
let transactionSummaries: any[] = [];
let oracleTransactions: any[] = [];
for (let transaction of transactions) {
let [signature, confirmedTransaction] = transaction;
try {
let transactionSummary = parseTransactionSummary(confirmedTransaction)
transactionSummary['signature'] = signature
if (confirmedTransaction.meta.err !== null) {
processStates.push({signature: signature, process_state: 'transaction error'});
} else {
let oracleTransaction = parseOracleData(confirmedTransaction)
oracleTransaction['signature'] = signature
processStates.push({signature: signature, process_state: 'processed'});
} catch(e) {
processStates.push({signature: signature, process_state: 'processing error'});
return [processStates, transactionSummaries, oracleTransactions]
async function getUnprocessedSignatures(pool, account) {
const client = await pool.connect();
let signatures;
try {
const res = await client.query("select signature from all_transactions where process_state = 'unprocessed' and account = $1 order by id asc", [account])
signatures = => e['signature'])
} finally {
return signatures;
function parseTransactionSummary(confirmedTransaction) {
let maxCompute = 0;
for (let logMessage of confirmedTransaction.meta!.logMessages!) {
if (logMessage.endsWith('compute units')) {
let re = new RegExp(/(\d+)\sof/);
let matches = re.exec(logMessage);
if (matches) {
let compute = parseInt(matches[1]);
if (compute > maxCompute) {
maxCompute = compute;
let logMessages = confirmedTransaction.meta!.logMessages!.join('\n');
return {log_messages: logMessages, compute: maxCompute}
async function insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts) {
const processStateCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: 'all_transactions'});
const transactionSummaryCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'compute'], {table: 'all_transactions'});
const depositWithdrawCs = new pgp.helpers.ColumnSet(
['signature', 'mango_group', 'instruction_num', 'slot', 'owner', 'side', 'quantity',
'symbol', 'margin_account', 'oracle_pk', 'block_datetime'],
{table: 'deposit_withdraw'});
const liquidationsCs = new pgp.helpers.ColumnSet(
['signature', 'mango_group', 'liqor', 'liqee', 'coll_ratio', 'in_token_symbol', 'in_token_amount', 'in_token_price',
'in_token_usd', 'out_token_symbol', 'out_token_amount', 'out_token_price', 'out_token_usd', 'liquidation_fee_usd',
'socialized_losses', 'block_datetime'],
{table: 'liquidations'});
const liquidationHoldingsCs = new pgp.helpers.ColumnSet(
['signature', 'symbol', 'start_assets', 'start_liabs', 'end_assets', 'end_liabs', 'price'],
{table: 'liquidation_holdings'});
const socializedLossesCs = new pgp.helpers.ColumnSet(
['signature', 'symbol', 'symbol_price', 'loss', 'total_deposits', 'loss_percentage', 'loss_usd', 'total_deposits_usd'],
{table: 'socialized_losses'});
const depositWithdrawPricesUSD = "update deposit_withdraw set symbol_price = 1, usd_equivalent = quantity where symbol_price is null and symbol in ('USDT', 'USDC')"
const depositWithdrawPricesNonUSD = `
UPDATE deposit_withdraw t1
symbol_price = t2.symbol_price,
usd_equivalent = t2.usd_equivalent,
oracle_slot = t2.slot
select t1.signature,
t2.submit_value / power(10, t3.decimals) as symbol_price,
t2.submit_value / power(10, t3.decimals) * t1.quantity as usd_equivalent
max(ot.slot) as max_slot
from deposit_withdraw dw
left join oracle_transactions ot
on dw.oracle_pk = ot.oracle_pk
and dw.slot >= ot.slot
where dw.symbol_price is null
group by
) t1
left join oracle_transactions t2
on t2.oracle_pk = t1.oracle_pk
and t2.slot = t1.max_slot
left join oracle_meta t3 on
t3.oracle_pk = t1.oracle_pk
) t2
t2.signature = t1.signature
and t2.oracle_pk = t1.oracle_pk
and t1.symbol_price is null
let batchSize = 1000;
let client = await pool.connect()
try {
await client.query('BEGIN')
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, processStateCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, transactionSummaryCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
for (let i = 0, j = depositWithdrawInserts.length; i < j; i += batchSize) {
let insertsBatch = depositWithdrawInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, depositWithdrawCs);
await client.query(insertsSql)
console.log('deposits and withdraws inserted')
// Small performance increase is USD query executed first (less rows with symbol price = null)
await client.query(depositWithdrawPricesUSD);
await client.query(depositWithdrawPricesNonUSD);
console.log('deposits and withdraw prices updated')
for (let i = 0, j = liquidationInserts.length; i < j; i += batchSize) {
let insertsBatch = liquidationInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, liquidationsCs);
await client.query(insertsSql)
for (let i = 0, j = liquidationHoldingsInserts.length; i < j; i += batchSize) {
let insertsBatch = liquidationHoldingsInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, liquidationHoldingsCs);
await client.query(insertsSql)
for (let i = 0, j = socializedLossInserts.length; i < j; i += batchSize) {
let insertsBatch = socializedLossInserts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, socializedLossesCs);
await client.query(insertsSql)
console.log('liquidations inserted')
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
async function insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions) {
// Oracle transactions are quite frequent - so update in batches here for performance
const processStartCs = new pgp.helpers.ColumnSet(['?signature', 'process_state'], {table: 'all_transactions'});
const transactionSummaryCs = new pgp.helpers.ColumnSet(['?signature', 'log_messages', 'compute'], {table: 'all_transactions'});
const oracleCs = new pgp.helpers.ColumnSet(['signature', 'slot', 'oracle_pk', 'round_id', 'submit_value', 'block_datetime'], {table: 'oracle_transactions'});
let batchSize = 1000;
let client = await pool.connect()
try {
await client.query('BEGIN')
for (let i = 0, j = processStates.length; i < j; i += batchSize) {
let updatesBatch = processStates.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, processStartCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
for (let i = 0, j = transactionSummaries.length; i < j; i += batchSize) {
let updatesBatch = transactionSummaries.slice(i, i + batchSize);
let updatedSql = pgp.helpers.update(updatesBatch, transactionSummaryCs) + ' WHERE v.signature = t.signature';
await client.query(updatedSql)
for (let i = 0, j = oracleTransactions.length; i < j; i += batchSize) {
let insertsBatch = oracleTransactions.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, oracleCs);
await client.query(insertsSql)
console.log('oracle prices inserted')
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
async function getNewAddressTransactions(connection, address, requestWaitTime, pool) {
let signaturesToProcess = (await getUnprocessedSignatures(pool, address))
let promises: Promise<void>[] = [];
let transactions: any[] = [];
let counter = 1;
for (let signature of signaturesToProcess) {
// let promise = connection.getConfirmedTransaction(signature).then(confirmedTransaction => transactions.push([signature, confirmedTransaction]));
let promise = connection.getParsedConfirmedTransaction(signature).then(confirmedTransaction => transactions.push([signature, confirmedTransaction]));
console.log('requested ', counter, ' of ', signaturesToProcess.length);
// Limit request frequency to avoid request failures due to rate limiting
await sleep(requestWaitTime);
await (Promise as any).allSettled(promises);
return transactions
async function processOracleTransactions(connection, address, pool, requestWaitTime) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool)
let [processStates, transactionSummaries, oracleTransactions] = parseOracleTransactions(transactions)
await insertOracleTransactions(pool, processStates, transactionSummaries, oracleTransactions)
async function processMangoTransactions(connection, address, pool, requestWaitTime) {
let transactions = await getNewAddressTransactions(connection, address, requestWaitTime, pool)
let [processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts] = parseMangoTransactions(transactions)
await insertMangoTransactions(pool, processStates, transactionSummaries, depositWithdrawInserts, liquidationInserts, liquidationHoldingsInserts, socializedLossInserts)
async function consumeTransactions() {
const cluster = process.env.CLUSTER || 'mainnet-beta';
const clusterUrl = process.env.CLUSTER_URL || "";
let requestWaitTime = parseInt(process.env.REQUEST_WAIT_TIME!) || 500;
const connectionString = process.env.TRANSACTIONS_CONNECTION_STRING
const oracleProgramId = process.env.ORACLE_PROGRAM_ID || 'FjJ5mhdRWeuaaremiHjeQextaAw1sKWDqr3D7pXjgztv';
let connection = new Connection(clusterUrl, 'finalized');
const pool = new Pool(
connectionString: connectionString,
ssl: {
rejectUnauthorized: false,
const oracleProgramPk = new PublicKey(oracleProgramId);
const mangoProgramPk = new PublicKey(mangoProgramId);
reverseIds = await createReverseIdsMap(cluster, new MangoClient(), connection);
while (true) {
console.log('Refreshing transactions')
// Order of inserting transactions important - inserting deposit_withdraw relies on having all oracle prices available
// So get new signatures of oracle transactions after mango transactions and insert oracle transactions first
await insertNewSignatures(mangoProgramPk, connection, pool, requestWaitTime);
await insertNewSignatures(oracleProgramPk, connection, pool, requestWaitTime);
await processOracleTransactions(connection, oracleProgramId, pool, requestWaitTime);
await processMangoTransactions(connection, mangoProgramId, pool, requestWaitTime);
console.log('Refresh complete')
// Unnecessary but let's give the servers a break
await sleep(20*1000)
async function main() {
while (true) {
try {
await consumeTransactions()
catch(e) {
console.log(e, e.stack)
// Wait for 10 mins
await sleep(10*60*1000)

src/maps.ts Normal file
View File

@ -0,0 +1,92 @@
import { IDS} from '@blockworks-foundation/mango-client';
import { PublicKey} from '@solana/web3.js';
export async function createReverseIdsMap(cluster, client, connection) {
let reverseIds = {}
let ids = IDS;
// vault - symbol map
let vaultSymbolMap = {};
for (let mangoGroupName in ids[cluster].mango_groups) {
let mangoGroupObj = ids[cluster].mango_groups[mangoGroupName]
for (let symbol in mangoGroupObj.symbols) {
let mintPk = mangoGroupObj.symbols[symbol];
let mintIndex = mangoGroupObj.mint_pks.indexOf(mintPk);
let vaultPk = mangoGroupObj.vault_pks[mintIndex];
vaultSymbolMap[vaultPk] = symbol;
reverseIds['vault_symbol'] = vaultSymbolMap;
// oracles - symbol map
let oracleSymbolMap = {};
for (let mangoGroupName in ids[cluster].mango_groups) {
let mangoGroupObj = ids[cluster].mango_groups[mangoGroupName]
for (let symbol in mangoGroupObj.symbols) {
let mintPk = mangoGroupObj.symbols[symbol];
let mintIndex = mangoGroupObj.mint_pks.indexOf(mintPk);
// There are one less oracle than the number of tokens in the mango group
if (mintIndex < mangoGroupObj.mint_pks.length - 1) {
let oraclePk = mangoGroupObj.oracle_pks[mintIndex];
oracleSymbolMap[oraclePk] = symbol;
reverseIds['oracle_symbol'] = oracleSymbolMap;
// mangoGroup - symbols-array map
let mangoGroupMap = {};
for (let mangoGroupName in ids[cluster].mango_groups) {
let mangoGroupObj = ids[cluster].mango_groups[mangoGroupName]
let mangoGroupPk = mangoGroupObj["mango_group_pk"]
mangoGroupMap[mangoGroupPk] = {};
let symbols: string[] = []
for (let mintPk of mangoGroupObj.mint_pks) {
for (let symbol of Object.keys(mangoGroupObj.symbols)) {
if (mangoGroupObj.symbols[symbol] === mintPk) {
let oracles = {}
for (let i = 0; i < mangoGroupObj.oracle_pks.length; i++) {
oracles[symbols[i]] = mangoGroupObj.oracle_pks[i]
mangoGroupMap[mangoGroupPk]['symbols'] = symbols
mangoGroupMap[mangoGroupPk]['oracles'] = oracles
reverseIds['mango_groups'] = mangoGroupMap;
let oracleDecimalsMap = {};
for (let mangoGroupName in ids[cluster].mango_groups) {
let mangoGroupObj = ids[cluster].mango_groups[mangoGroupName]
let mangoGroupPk = mangoGroupObj.mango_group_pk;
let mangoGroup = await client.getMangoGroup(connection, new PublicKey(mangoGroupPk));
for (let i = 0; i < mangoGroup.oracles.length; i++) {
oracleDecimalsMap[mangoGroup.oracles[i].toBase58()] = mangoGroup.oracleDecimals[i]
let symbols = reverseIds['mango_groups'][mangoGroupPk].symbols;
let map = {}
for (let i = 0; i < symbols.length; i++) {
map[symbols[i]] = mangoGroup.mintDecimals[i]
reverseIds['mango_groups'][mangoGroupPk]['mint_decimals'] = map;
reverseIds['oracle_decimals'] = oracleDecimalsMap;
return reverseIds

src/signatures.ts Normal file
View File

@ -0,0 +1,132 @@
import { Connection, PublicKey, ConfirmedSignatureInfo } from '@solana/web3.js';
import { sleep} from '@blockworks-foundation/mango-client';
import { bulkBatchInsert } from './utils';
export async function getNewSignatures(afterSignature: string, connection: Connection, address: PublicKey, requestWaitTime) {
// Fetches all signatures associated with the account - working backwards in time until it encounters the "afterSignature" signature
let signatures;
const limit = 1000;
let before = null;
let options;
let allSignaturesInfo: ConfirmedSignatureInfo[] = [];
while (true) {
if (before === null) {
options = {limit: limit};
} else {
options = {limit: limit, before: before};
let signaturesInfo = (await connection.getConfirmedSignaturesForAddress2(address, options));
signatures = => x['signature']);
let afterSignatureIndex = signatures.indexOf(afterSignature);
if (afterSignatureIndex !== -1) {
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo.slice(0, afterSignatureIndex));
} else {
// if afterSignatureIndex is not found then we should have gotten signaturesInfo of length limit
// otherwise we have an issue where the rpc endpoint does not have enough history
if (signaturesInfo.length !== limit) {
throw 'rpc endpoint does not have sufficient signature history to reach afterSignature ' + afterSignature
allSignaturesInfo = allSignaturesInfo.concat(signaturesInfo);
before = signatures[signatures.length-1];
console.log(new Date(signaturesInfo[signaturesInfo.length-1].blockTime! * 1000).toISOString());
await sleep(requestWaitTime);
return allSignaturesInfo
export async function getLatestSignatureBeforeUnixEpoch(connection, address, unixEpoch, requestWaitTime) {
let signaturesInfo;
let limit = 1000;
let options;
let earliestSignature = null;
let signature;
let found = false;
while (true) {
if (earliestSignature === null) {
options = {limit: limit};
} else {
options = {limit: limit, before: earliestSignature};
signaturesInfo = await connection.getConfirmedSignaturesForAddress2(address, options);
for (let signatureInfo of signaturesInfo) {
if (signatureInfo.blockTime < unixEpoch) {
signature = signatureInfo.signature;
found = true;
if (found) {break}
earliestSignature = signaturesInfo[signaturesInfo.length - 1].signature;
console.log(new Date(signaturesInfo[signaturesInfo.length-1].blockTime! * 1000).toISOString());
await sleep(requestWaitTime);
return signature;
export async function insertNewSignatures(address, connection, pool, requestWaitTime) {
let client = await pool.connect()
let latestDbSignatureRows = await client.query('select signature from all_transactions where id = (select max(id) from all_transactions where account = $1)', [address.toBase58()])
// let latestDbSignatureRow = db.prepare('select signature from transactions where id = (select max(id) from transactions where account = ?)').get(account.toBase58());
let latestDbSignature;
if (latestDbSignatureRows.rows.length === 0) {
let currentUnixEpoch = Math.round(;
// If tranasctions table is empty - initialise by getting all signatures in the 6 hours
let latestSignatureUnixEpoch = currentUnixEpoch - 6 * 60 * 60;
console.log('Current time ', new Date(currentUnixEpoch * 1000).toISOString());
console.log('Getting all signatures after ', new Date(latestSignatureUnixEpoch * 1000).toISOString());
latestDbSignature = await getLatestSignatureBeforeUnixEpoch(connection, address, latestSignatureUnixEpoch, requestWaitTime);
} else {
latestDbSignature = latestDbSignatureRows.rows[0]['signature'];
let newSignatures = await getNewSignatures(latestDbSignature, connection, address, requestWaitTime);
// By default the signatures returned by getConfirmedSignaturesForAddress2 will be ordered newest -> oldest
// We reverse the order to oldest -> newest here
// This is useful for our purposes as by inserting oldest -> newest if inserts are interrupted for some reason the process can pick up where it left off seamlessly (with no gaps)
// Also ensures that the auto increment id in our table is incremented oldest -> newest
newSignatures = newSignatures.reverse();
const inserts = => ({
signature: signatureInfo.signature,
account: address.toBase58(),
block_time: signatureInfo.blockTime,
block_datetime: (new Date(signatureInfo.blockTime! * 1000)).toISOString(),
slot: signatureInfo.slot,
err: signatureInfo.err === null ? 0 : 1,
process_state: 'unprocessed'
let columns = ['signature', 'account', 'block_time', 'block_datetime', 'slot', 'err', 'process_state'];
let table = 'all_transactions'
let batchSize = 10000
await bulkBatchInsert(pool, table, columns, inserts, batchSize);
console.log('inserted ' + newSignatures.length + ' signatures')

src/updatePnlCache/index.ts Normal file
View File

@ -0,0 +1,154 @@
import { Connection, PublicKey } from '@solana/web3.js';
import {
} from '@blockworks-foundation/mango-client';
import * as cron from "node-cron";
import { Pool } from 'pg'
const pgp = require('pg-promise')({
capSQL: true
import axios from 'axios';
import {getPricesSql, updatePnlCacheSql, getLiquidationsSql} from './updatePnlCacheSqlStatements';
async function getOpenOrdersMarginAccounts(connection, mangoClient, mangoGroup, programId) {
const marginAccounts = await mangoClient.getAllMarginAccounts(connection, programId, mangoGroup)
let openOrdersMarginAccounts: any[] = []
for (let m of marginAccounts) {
let marginAccountPk = m.publicKey.toString()
let ownerPk = m.owner.toString()
let name = String.fromCharCode( => x !== 0))
for (let o of m.openOrdersAccounts) {
if (o !== undefined) {
'open_orders_account': o.publicKey.toString(),
'margin_account': marginAccountPk,
'owner': ownerPk,
'name': name === '' ? null : name
return openOrdersMarginAccounts
async function getPrices(pool, mangoGroupPk) {
const client = await pool.connect();
let prices = (await client.query(getPricesSql, [mangoGroupPk])).rows
return prices
async function getLiquidations(pool, mangoGroupPk) {
const client = await pool.connect();
let liquidations = (await client.query(getLiquidationsSql, [mangoGroupPk])).rows
return liquidations
async function updateCache() {
const cluster = 'mainnet-beta'
const clusterUrl = process.env.CLUSTER_URL || IDS.cluster_urls[cluster]
const connection = new Connection(clusterUrl, 'singleGossip')
const programId = new PublicKey(IDS[cluster].mango_program_id)
const dexProgramId = new PublicKey(IDS[cluster].dex_program_id)
const mangoGroupPk = new PublicKey(IDS[cluster].mango_groups['BTC_ETH_SOL_SRM_USDC'].mango_group_pk)
const tradeHistoryConnectionString = process.env.TRADE_HISTORY_CONNECTION_STRING
const tradesPool = new Pool({connectionString: tradeHistoryConnectionString,ssl: {rejectUnauthorized: false,}})
const transactionsConnectionString = process.env.TRANSACTIONS_CONNECTION_STRING
const transactionsPool = new Pool({connectionString: transactionsConnectionString,ssl: {rejectUnauthorized: false,}})
const mangoClient: MangoClient = new MangoClient()
const priceCs = new pgp.helpers.ColumnSet(['price_date', 'currency', 'price', 'block_datetime'], {table: 'prices'});
const liquidationsCs = new pgp.helpers.ColumnSet([ 'signature', 'liqor', 'liqee', 'coll_ratio', 'in_token_symbol', 'in_token_amount', 'in_token_price', 'out_token_symbol', 'out_token_amount', 'out_token_price', 'socialized_losses', 'in_token_usd', 'out_token_usd', 'liquidation_fee_usd', 'mango_group', 'block_datetime'], {table: 'liquidations'});
const openOrdersMarginAccountsCs = new pgp.helpers.ColumnSet(['open_orders_account', 'margin_account', 'owner', 'name'], {table: 'open_orders_meta'});
let batchSize = 1000;
const tradesClient = await tradesPool.connect();
try {
await tradesClient.query('BEGIN')
// TODO: this could be improved by only inserting new prices - prices >= max date for each currency
// If I cached the hourly prices query for the deposit/withdraw history api it could alternatively pull from that - just take max price timestamp from each day
let prices = await getPrices(transactionsPool, mangoGroupPk.toString())
await tradesClient.query('truncate table prices')
for (let i = 0, j = prices.length; i < j; i += batchSize) {
let insertsBatch = prices.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, priceCs);
await tradesClient.query(insertsSql)
let liquidations = await getLiquidations(transactionsPool, mangoGroupPk.toString())
await tradesClient.query('truncate table liquidations')
for (let i = 0, j = liquidations.length; i < j; i += batchSize) {
let insertsBatch = liquidations.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, liquidationsCs);
await tradesClient.query(insertsSql)
// Could use an upsert here - insert with on constraint do nothing
let mangoGroup = await mangoClient.getMangoGroup(connection, mangoGroupPk)
let openOrdersMarginAccounts = await getOpenOrdersMarginAccounts(connection, mangoClient, mangoGroup, programId)
await tradesClient.query('truncate table open_orders_meta')
for (let i = 0, j = openOrdersMarginAccounts.length; i < j; i += batchSize) {
let insertsBatch = openOrdersMarginAccounts.slice(i, i + batchSize);
let insertsSql = pgp.helpers.insert(insertsBatch, openOrdersMarginAccountsCs);
await tradesClient.query(insertsSql)
await tradesClient.query('truncate table pnl_cache')
await tradesClient.query(updatePnlCacheSql)
await tradesClient.query('COMMIT')
} catch (e) {
await tradesClient.query('ROLLBACK')
throw e
} finally {
function notify(content) {
if (process.env.UPDATE_PNL_CACHE_WEBHOOK_URL) {, {content});
async function runCron() {
let hrstart
let hrend
notify('Initialized mango-pnl')
console.log('Initialized mango-pnl')
// cron.schedule("*/10 * * * *", async () => {
try {
console.log('updating cache')
console.log((new Date()).toISOString())
hrstart = process.hrtime()
await updateCache()
hrend = process.hrtime(hrstart)
console.log('Execution time (hr): %ds %dms', hrend[0], hrend[1] / 1000000)
} catch(e) {
throw e
// })

View File

@ -0,0 +1,209 @@
export const getPricesSql = `
-- fill in null values with last available price before date
select to_char(t4.date_value, 'YYYY-MM-DD') as price_date, om.symbol as currency, ot3.submit_value / power(10, om.decimals) as price, corrected_max_block_datetime as block_datetime
select oracle_pk, date_value, max_block_datetime, first_value(max_block_datetime) over (partition by oracle_pk, group_id order by date_value) as corrected_max_block_datetime
select t1.oracle_pk, dc.date_value, max(ot2.block_datetime) as max_block_datetime, sum(case when max(ot2.block_datetime) is not null then 1 end) over (order by dc.date_value) as group_id
ot.oracle_pk, date_trunc('day', min(block_datetime)) as min_block_datetime, date_trunc('day', max(block_datetime)) as max_block_datetime
from oracle_transactions ot
inner join mango_group_oracles mgo
on mgo.oracle_pk = ot.oracle_pk
mgo.mango_group_pk = $1
group by ot.oracle_pk
) t1
inner join date_calendar dc
on dc.date_value between t1.min_block_datetime and t1.max_block_datetime
left join
oracle_transactions ot2
on ot2.oracle_pk = t1.oracle_pk
and date_trunc('day', ot2.block_datetime) = dc.date_value
group by t1.oracle_pk, dc.date_value
order by t1.oracle_pk , dc.date_value
) t3
) t4
inner join
oracle_transactions ot3
on ot3.oracle_pk = t4.oracle_pk
and ot3.block_datetime = t4.corrected_max_block_datetime
inner join oracle_meta om
on om.oracle_pk = t4.oracle_pk
// TODO: look at explain plan - add indexes, etc
export const updatePnlCacheSql = `
with parsed_event as
distinct ON (e."uuid")
e."loadTimestamp"::date as load_date,
when = true then e."quoteCurrency"
as sent_currency,
when = true then e."baseCurrency"
as received_currency,
when = true then cast(e."nativeQuantityPaid" as bigint) / power(10, quote."MintDecimals")
cast(e."nativeQuantityPaid" as bigint) / power(10, base."MintDecimals")
* -1
as sent_quantity,
WHEN = true then cast(e."nativeQuantityReleased" as bigint) / power(10, base."MintDecimals")
cast(e."nativeQuantityReleased" as bigint) / power(10, quote."MintDecimals")
as received_quantity,
case when e.maker = true then -1 else 1 end * cast(e."nativeFeeOrRebate" as bigint) / power(10, quote."MintDecimals") as fee
from event e
inner join currency_meta quote on
quote.currency = e."quoteCurrency"
and quote.address = e.address
and quote."programId" = e."programId"
inner join currency_meta base on
base.currency = e."baseCurrency"
and base.address = e.address
and base."programId" = e."programId"
inner join open_orders_meta ma on
ma.open_orders_account = e."openOrders"
-- only including owner here for the purpose of ensuring the query engine uses an index scan
-- (the join on open_orders_margin_accounts doesn't trigger it for some reason - probably because there's no where condition)
inner join "owner" o on o."openOrders" = e."openOrders"
where o."owner" = 'EgRvS8NJGNDYngccUG38sb4zmkjC3dAyLKuNuLn3dX6w'
union all
-- add liquidation transactions from liqee side (treat them like trades)
block_datetime::date as load_date,
liqee as margin_account,
out_token_symbol as sent_currency,
in_token_symbol as received_currency,
out_token_amount as sent_quantity,
in_token_amount as received_quantity,
0 as fee
from liquidations l
left join
(select distinct margin_account, owner from open_orders_meta) oom
on l.liqee = oom.margin_account
-- exclude accounts that have never made a trade (null owner as they have no openorder accounts)
where oom."owner" is not null
t1 as
sum(sent_quantity) as sent_quantity,
sum(received_quantity) as received_quantity,
sum(fee) as fee
from parsed_event
group by
t2 as
-- instead of having 1 row with 2 currencies, let's have 2 rows with 1 currency
sent_currency as currency,
sent_quantity as quantity,
--sum(sent_quantity) over (partition by margin_account, received_currency, sent_currency order by load_date asc) as cumulative_quantity,
fee/2 as fee
from t1
union all
received_currency as currency,
received_quantity as quantity,
--sum(received_quantity) over (partition by margin_account, received_currency, sent_currency order by load_date asc) as cumulative_quantity,
fee/2 as fee
from t1
t3 as
-- sent and received currencies can be swapped (could have BTC as sent in one row and BTC as received in another)
select load_date, margin_account, currency, sum(quantity) as quantity,
--sum(cumulative_quantity) as cumulative_quantity,
sum(fee) as fee
from t2
group by load_date, margin_account, currency
t4 as
select margin_account, min(load_date) as min_load_date, max(load_date) as max_load_date
from t3
group by margin_account
prices_with_usdc as
select * from prices
union all
select distinct price_date, 'USDC', 1, null::timestamp from prices
t5 as
select t4.margin_account, p.price_date, p.currency, p.price, t3.quantity, t3.fee,
sum(t3.quantity) over (partition by t4.margin_account, p.currency order by p.price_date asc) as cumulative_quantity
from prices_with_usdc p
inner join t4 on t4.min_load_date <= p.price_date
left join t3 on
t3.margin_account = t4.margin_account
and t3.load_date = p.price_date
and t3.currency = p.currency
insert into pnl_cache
select t5.margin_account, t6.owner,, t5.price_date, sum(t5.price * t5.cumulative_quantity) as cumulative_pnl
from t5
left join (select distinct margin_account, owner, name from open_orders_meta oom) t6 on
t5.margin_account = t6.margin_account
group by t5.margin_account, t6.owner,, t5.price_date
export const getLiquidationsSql = `
from liquidations
where mango_group = $1

src/utils.ts Normal file
View File

@ -0,0 +1,44 @@
import axios from 'axios';
const pgp = require('pg-promise')({
capSQL: true
export async function bulkBatchInsert(pool, table, columns, inserts, batchSize) {
// Creates bulk insert statements from an array of inserts - avoids performance cost of insert roundtrips to the server
// Batches the inserts to stop the individual statements getting too large
// All inserts are done in a transaction - so if one fails they all will
if (inserts.length === 0) {
} else if (batchSize < 1) {
throw 'batchSize must be at least 1'
let client = await pool.connect()
const cs = new pgp.helpers.ColumnSet(columns, {table: table});
try {
await client.query('BEGIN')
for (let i = 0, j = inserts.length; i < j; i += batchSize) {
let insertsBatch = inserts.slice(i, i + batchSize);
let updatesSql = pgp.helpers.insert(insertsBatch, cs);
await client.query(updatesSql)
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
export function notify(content) {
if (process.env.TRANSACTIONS_SCRAPER_WEBHOOK_URL) {, {content});

tsconfig.json Normal file
View File

@ -0,0 +1,72 @@
"compileOnSave": true,
"compilerOptions": {
/* Visit to read more about this file */
/* Basic Options */
// "incremental": true, /* Enable incremental compilation */
"target": "es2019", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */
"module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */
"lib": ["es2019"], /* Specify library files to be included in the compilation. */
"allowJs": true, /* Allow javascript files to be compiled. */
// "checkJs": true, /* Report errors in .js files. */
// "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', or 'react'. */
// "declaration": true, /* Generates corresponding '.d.ts' file. */
// "declarationMap": true, /* Generates a sourcemap for each corresponding '.d.ts' file. */
"sourceMap": true, /* Generates corresponding '.map' file. */
// "outFile": "./", /* Concatenate and emit output to single file. */
"outDir": "dist", /* Redirect output structure to the directory. */
"rootDir": "src", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */
// "composite": true, /* Enable project compilation */
// "tsBuildInfoFile": "./", /* Specify file to store incremental compilation information */
// "removeComments": true, /* Do not emit comments to output. */
// "noEmit": true, /* Do not emit outputs. */
// "importHelpers": true, /* Import emit helpers from 'tslib'. */
// "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */
// "isolatedModules": true, /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */
/* Strict Type-Checking Options */
"strict": true, /* Enable all strict type-checking options. */
"noImplicitAny": false, /* Raise error on expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* Enable strict null checks. */
// "strictFunctionTypes": true, /* Enable strict checking of function types. */
// "strictBindCallApply": true, /* Enable strict 'bind', 'call', and 'apply' methods on functions. */
// "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */
"noImplicitThis": false, /* Raise error on 'this' expressions with an implied 'any' type. */
// "alwaysStrict": true, /* Parse in strict mode and emit "use strict" for each source file. */
/* Additional Checks */
// "noUnusedLocals": true, /* Report errors on unused locals. */
// "noUnusedParameters": true, /* Report errors on unused parameters. */
// "noImplicitReturns": true, /* Report error when not all code paths in function return a value. */
// "noFallthroughCasesInSwitch": true, /* Report errors for fallthrough cases in switch statement. */
// "noUncheckedIndexedAccess": true, /* Include 'undefined' in index signature results */
/* Module Resolution Options */
// "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */
// "baseUrl": "./", /* Base directory to resolve non-absolute module names. */
// "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
// "typeRoots": [], /* List of folders to include type definitions from. */
// "types": [], /* Type declaration files to be included in compilation. */
// "allowSyntheticDefaultImports": true, /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */
"esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */
// "preserveSymlinks": true, /* Do not resolve the real path of symlinks. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
/* Source Map Options */
// "sourceRoot": "", /* Specify the location where debugger should locate TypeScript files instead of source locations. */
// "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */
// "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */
// "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */
/* Experimental Options */
// "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */
// "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */
/* Advanced Options */
"skipLibCheck": true, /* Skip type checking of declaration files. */
"forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */