mango-v4/ts/client/scripts/sb-on-demand-crank.ts

602 lines
18 KiB
TypeScript

import {
AccountInfo,
Cluster,
Commitment,
Connection,
Keypair,
PublicKey,
TransactionInstruction,
} from '@solana/web3.js';
import {
CrossbarClient,
PullFeed,
Queue,
RecentSlotHashes,
SB_ON_DEMAND_PID,
} from '@switchboard-xyz/on-demand';
import fs from 'fs';
import chunk from 'lodash/chunk';
import shuffle from 'lodash/shuffle';
import uniqWith from 'lodash/uniqWith';
import { Program as Anchor30Program, BN, Idl } from 'switchboard-anchor';
import { SequenceType } from '@blockworks-foundation/mangolana/lib/globalTypes';
import { sendSignAndConfirmTransactions } from '@blockworks-foundation/mangolana/lib/transactions';
import { AnchorProvider, Wallet } from 'switchboard-anchor';
import { Group } from '../src/accounts/group';
import { parseSwitchboardOracle } from '../src/accounts/oracle';
import { MangoClient } from '../src/client';
import { MANGO_V4_ID, MANGO_V4_MAIN_GROUP } from '../src/constants';
import { createComputeBudgetIx } from '../src/utils/rpc';
import { manageFeeWebSocket } from './manageFeeWs';
import { getOraclesForMangoGroup } from './sb-on-demand-crank-utils';
const CLUSTER: Cluster =
(process.env.CLUSTER_OVERRIDE as Cluster) || 'mainnet-beta';
const CLUSTER_URL =
process.env.CLUSTER_URL_OVERRIDE || process.env.MB_CLUSTER_URL;
const CLUSTER_URL_2 = process.env.MB_CLUSTER_URL_2;
const LITE_RPC_URL = process.env.LITE_RPC_URL;
const USER_KEYPAIR =
process.env.USER_KEYPAIR_OVERRIDE || process.env.MB_PAYER_KEYPAIR;
const GROUP = process.env.GROUP_OVERRIDE || MANGO_V4_MAIN_GROUP.toBase58();
const SLEEP_MS = Number(process.env.SLEEP_MS) || 20_000;
console.log(
`[start] config: sleep ${SLEEP_MS}ms, cluster ${CLUSTER_URL}, cluster2 ${CLUSTER_URL_2}, liteRpcUrl ${LITE_RPC_URL}`,
);
let lamportsPerCu: number | null = null;
try {
const wsUrl = new URL(
process.env.LITE_RPC_URL!.replace('https', 'wss'),
).toString();
manageFeeWebSocket(wsUrl, 10, (mean) => {
lamportsPerCu = mean;
});
} catch (error) {
console.error('[start]', error);
}
interface OracleInterface {
oracle: {
oraclePk: PublicKey;
name: string;
};
ai: AccountInfo<Buffer> | null;
decodedPullFeed: any;
parsedConfigs: {
queue: any;
maxVariance: number;
minResponses: any;
feedHash: any;
ipfsHash: any;
};
jobs: any[];
gatewayUrl: string;
}
/// refresh mango group to detect new oracles added through governance
/// without a restart within 1 minute, result object will be dynamically
/// updated
async function setupBackgroundRefresh(
client: MangoClient,
group: Group,
sbOnDemandProgram: Anchor30Program<Idl>,
crossbarClient: CrossbarClient,
): Promise<{ oracles: OracleInterface[] }> {
// note: group was already reloaded before
const oracles = await prepareCandidateOracles(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
const result = { oracles };
const GROUP_REFRESH_INTERVAL = 60_000;
const refreshGroup = async function (): Promise<void> {
try {
await group.reloadAll(client);
result.oracles = await prepareCandidateOracles(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
} catch (e) {
console.error('[group]', e);
}
setTimeout(refreshGroup, GROUP_REFRESH_INTERVAL);
};
setTimeout(refreshGroup, GROUP_REFRESH_INTERVAL);
return result;
}
(async function main(): Promise<never> {
const { group, client, connection, user } = await setupMango();
const { sbOnDemandProgram, crossbarClient } = await setupSwitchboard(client);
const refresh = await setupBackgroundRefresh(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
while (true) {
try {
// pull a fresh reference to the oracles from the background refresher
const { oracles } = refresh;
const startedAt = Date.now();
const [block, slot] = await Promise.all([
// use finalized blockhash for faster timeouts on transactions
client.connection.getLatestBlockhash('finalized'),
// use processed slot for accurate staleness measurement
client.connection.getSlot('processed'),
]);
await updateFilteredOraclesAis(
client.connection,
sbOnDemandProgram,
oracles,
);
const aisUpdatedAt = Date.now();
const staleOracles = await filterForStaleOracles(oracles, client, slot);
const staleFilteredAt = Date.now();
const crossBarSims = await Promise.all(
oracles.map((o) =>
crossbarClient.simulateFeeds([
new Buffer(o.parsedConfigs.feedHash).toString('hex'),
]),
),
);
const simulatedAt = Date.now();
const varianceThresholdCrossedOracles =
await filterForVarianceThresholdOracles(oracles, client, crossBarSims);
const varianceFilteredAt = Date.now();
const oraclesToCrank: OracleInterface[] = uniqWith(
[...staleOracles, ...varianceThresholdCrossedOracles],
function (a, b) {
return a.oracle.oraclePk.equals(b.oracle.oraclePk);
},
);
console.log(
`[main] round candidates | Stale: ${staleOracles
.map((o) => o.oracle.name)
.join(', ')} | Variance: ${varianceThresholdCrossedOracles
.map((o) => o.oracle.name)
.join(', ')}`,
);
// todo use chunk
// todo use luts
// const [pullIxs, luts] = await PullFeed.fetchUpdateManyIx(
// sbOnDemandProgram as any,
// {
// feeds: oraclesToCrank.map((o) => new PublicKey(o.oracle.oraclePk)),
// numSignatures: 3,
// },
// );
const recentSlothashes = await RecentSlotHashes.fetchLatestNSlothashes(
connection as any,
30,
);
const pullIxs = (
await Promise.all(
oraclesToCrank.map(async (oracle) => {
const pullIx = await preparePullIx(
sbOnDemandProgram,
oracle,
recentSlothashes,
);
return pullIx !== undefined ? pullIx : null;
}),
)
).filter((pullIx) => pullIx !== null);
const ixPreparedAt = Date.now();
const ixsChunks = chunk(shuffle(pullIxs), 2, false);
const lamportsPerCu_ = Math.min(
Math.max(lamportsPerCu ?? 150_000, 150_000),
500_000,
);
// dont await, fire and forget
sendSignAndConfirmTransactions({
connection,
wallet: new Wallet(user),
backupConnections: [
...(CLUSTER_URL_2 ? [new Connection(LITE_RPC_URL!, 'recent')] : []),
...(CLUSTER_URL_2 ? [new Connection(CLUSTER_URL_2!, 'recent')] : []),
],
// fail rather quickly and retry submission from scratch
// timeout using finalized to stay below switchboard oracle staleness limit
timeoutStrategy: { block, startBlockCheckAfterSecs: 20 },
transactionInstructions: ixsChunks.map((txChunk) => ({
instructionsSet: [
{
signers: [],
transactionInstruction: createComputeBudgetIx(lamportsPerCu_),
},
...txChunk.map((tx) => ({
signers: [],
transactionInstruction: tx,
// TODO disable alts for now
// alts: SBOD_ORACLE_LUTS.map(x=>new PublicKey(x)),
})),
],
sequenceType: SequenceType.Parallel,
})),
config: {
maxTxesInBatch: 10,
autoRetry: false,
logFlowInfo: false,
// TODO disable alts for now
// useVersionedTransactions: true,
},
callbacks: {
afterEveryTxSend: function (data) {
const sentAt = Date.now();
const total = (sentAt - startedAt) / 1000;
const aiUpdate = (aisUpdatedAt - startedAt) / 1000;
const staleFilter = (staleFilteredAt - aisUpdatedAt) / 1000;
const simulate = (simulatedAt - staleFilteredAt) / 1000;
const varianceFilter = (varianceFilteredAt - simulatedAt) / 1000;
const ixPrepare = (ixPreparedAt - varianceFilteredAt) / 1000;
const timing = {
aiUpdate,
staleFilter,
simulate,
varianceFilter,
ixPrepare,
};
console.log(
`[tx send] https://solscan.io/tx/${data['txid']}, in ${total}s, lamportsPerCu_ ${lamportsPerCu_}, lamportsPerCu ${lamportsPerCu}, timiming ${JSON.stringify(timing)}`,
);
},
onError: function (e, notProcessedTransactions) {
console.error(
`[tx send] ${notProcessedTransactions.length} error(s) after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(e)}`,
);
},
},
}).catch((reason) =>
console.error(
`[tx send] promise rejected after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(reason)}`,
),
);
await new Promise((r) => setTimeout(r, SLEEP_MS));
} catch (error) {
console.error('[main]', error);
}
}
})();
/**
* prepares the instruction to update an individual oracle using the cached data on oracle
*/
async function preparePullIx(
sbOnDemandProgram,
oracle: OracleInterface,
recentSlothashes?: Array<[BN, string]>,
): Promise<TransactionInstruction | undefined | null> {
const pullFeed = new PullFeed(
sbOnDemandProgram as any,
new PublicKey(oracle.oracle.oraclePk),
);
const conf = {
numSignatures: oracle.parsedConfigs.minResponses,
feed: oracle.oracle.oraclePk,
feedConfigs: oracle.parsedConfigs,
gateway: oracle.gatewayUrl,
};
// TODO use fetchUpdateMany
try {
const [pullIx] = await pullFeed.fetchUpdateIx(conf, recentSlothashes);
return pullIx;
} catch (error) {
console.log(`[preparePullIx] ${oracle.oracle.name} error ${error}`);
return null;
}
}
async function filterForVarianceThresholdOracles(
filteredOracles: OracleInterface[],
client: MangoClient,
crossBarSims,
): Promise<OracleInterface[]> {
const varianceThresholdCrossedOracles = new Array<OracleInterface>();
for (const [index, item] of filteredOracles.entries()) {
const res = await parseSwitchboardOracle(
item.oracle.oraclePk,
item.ai!,
client.connection,
);
// console.log(`${item.oracle.name} ${JSON.stringify(res)}`);
const crossBarSim = crossBarSims[index];
const simPrice =
crossBarSim[0].results.reduce((a, b) => a + b, 0) /
crossBarSim[0].results.length;
const changePct = (Math.abs(res.price - simPrice) * 100) / res.price;
if (changePct > item.decodedPullFeed.maxVariance / 1000000000) {
console.log(
`[filter variance] ${item.oracle.name}, candidate, ${
item.decodedPullFeed.maxVariance / 1000000000
}, ${simPrice}, ${res.price}, ${changePct}`,
);
varianceThresholdCrossedOracles.push(item);
} else {
console.log(
`[filter variance] ${item.oracle.name}, non-candidate, ${
item.decodedPullFeed.maxVariance / 1000000000
}, ${simPrice}, ${res.price}, ${changePct}`,
);
}
}
return varianceThresholdCrossedOracles;
}
async function filterForStaleOracles(
filteredOracles: OracleInterface[],
client: MangoClient,
slot: number,
): Promise<OracleInterface[]> {
const staleOracles = new Array<OracleInterface>();
for (const item of filteredOracles) {
const res = await parseSwitchboardOracle(
item.oracle.oraclePk,
item.ai!,
client.connection,
);
const diff = slot - res.lastUpdatedSlot;
if (
// maxStaleness will usually be 250 (=100s)
// one iteration takes 10s, retry is every 20s
// this allows for 2 retries until the oracle becomes stale
diff >
item.decodedPullFeed.maxStaleness * 0.3
) {
console.log(
`[filter stale] ${item.oracle.name}, candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
);
staleOracles.push(item);
} else {
console.log(
`[filter stale] ${item.oracle.name}, non-candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
);
}
}
return staleOracles;
}
/**
* fetch all on-demand oracles used on mango group and parse their configuration
*/
async function prepareCandidateOracles(
client: MangoClient,
group: Group,
sbOnDemandProgram: Anchor30Program<Idl>,
crossbarClient: CrossbarClient,
): Promise<OracleInterface[]> {
// collect
const oracles = getOraclesForMangoGroup(group);
oracles.push(...extendOraclesManually(CLUSTER));
// load all oracle account infos
const ais = (
await Promise.all(
chunk(
oracles.map((item) => item.oraclePk),
50,
false,
).map(
async (chunk) =>
await client.program.provider.connection.getMultipleAccountsInfo(
chunk,
),
),
)
).flat();
// ensure rpc response is correct
for (const [idx, ai] of ais.entries()) {
if (ai == null || ai.data == null) {
throw new Error(
`AI returned null for ${oracles[idx].name} ${oracles[idx].oraclePk}!`,
);
}
}
if (ais.length != oracles.length) {
throw new Error(
`Expected ${oracles.length}, but gMA returned ${ais.length}!`,
);
}
// combine account info
const sbodOracles = oracles
.map((o, i) => {
return { oracle: o, ai: ais[i] };
})
.filter((item) => item.ai?.owner.equals(SB_ON_DEMAND_PID));
// parse account info data
const parsedOracles = sbodOracles.map((item) => {
const d = sbOnDemandProgram.coder.accounts.decode(
'pullFeedAccountData',
item.ai!.data,
);
return {
decodedPullFeed: d,
parsedConfigs: {
queue: d.queue,
maxVariance: d.maxVariance / 1e9,
minResponses: d.minResponses,
feedHash: d.feedHash,
ipfsHash: d.ipfsHash,
},
};
});
const jobs = await Promise.all(
parsedOracles.map((o) =>
crossbarClient
.fetch(Buffer.from(o.parsedConfigs.feedHash).toString('hex'))
.then((r) => r.jobs),
),
);
const gateways = await Promise.all(
parsedOracles.map((o) =>
new Queue(sbOnDemandProgram, o.parsedConfigs.queue).fetchAllGateways(),
),
);
// assemble all data together
return sbodOracles.map((o, i) => ({
...o,
...parsedOracles[i],
jobs: jobs[i],
gatewayUrl: gateways[i][0].gatewayUrl,
}));
}
function extendOraclesManually(cluster: Cluster): {
oraclePk: PublicKey;
name: string;
}[] {
if (cluster == 'devnet') {
return [
{
oraclePk: new PublicKey('EtbG8PSDCyCSmDH8RE4Nf2qTV9d6P6zShzHY2XWvjFJf'),
name: 'BTC/USD',
},
];
}
return [
// These are now set on the group, and fetched from there
// ['JSOL/USD', 'Dnn9fKeB3rA2bor6Fys7FBPqXneAK8brxNfsBfZ32939'],
// ['compassSOL/USD', 'GzBpasKMSTLkytXpyo6NesDGpe2mLjPSovECWsebQpu5'],
// ['dualSOL/USD', 'D6UqFgtVC1yADBxw2EZFmUCTNuoqFoUXD3NW4NqRn8v3'],
// ['hubSOL/USD', '7LRVXc8zdPpzXNdknU2kRTYt7BizYs7BaM6Ft2zv8E4h'],
// ['hubSOL/USD', '137fd2LnDEPVAALhPFjRyvh2MD9DxSHPFaod7a5tmMox'],
// ['digitSOL/USD', '7skmP8qLf8KKJ61cpPiw91GXYfoGvGWekzSDQ78T3z1f'],
// ['mangoSOL/USD', '7pD4Y1hCsU4M6rfoJvL8fAmmrB2LwrJYxvWz4S6Cc24T'],
].map((item) => {
return {
oraclePk: new PublicKey(item[1]),
name: item[0],
};
});
}
async function setupMango(): Promise<{
group: Group;
client: MangoClient;
connection: Connection;
user: Keypair;
userProvider: AnchorProvider;
}> {
// the connection needs to be set to confirmed so that we never
// submit an oracle update with a processed -> forked away slot hash
const options = { commitment: 'confirmed' as Commitment };
const connection = new Connection(CLUSTER_URL!, options);
const user = Keypair.fromSecretKey(
Buffer.from(
JSON.parse(
process.env.KEYPAIR || fs.readFileSync(USER_KEYPAIR!, 'utf-8'),
),
),
);
const userWallet = new Wallet(user);
const userProvider = new AnchorProvider(connection, userWallet, options);
const client = await MangoClient.connect(
userProvider,
CLUSTER,
MANGO_V4_ID[CLUSTER],
{
idsSource: 'api',
},
);
const group = await client.getGroup(new PublicKey(GROUP));
await group.reloadAll(client);
return { group, client, connection, user, userProvider };
}
async function setupSwitchboard(client: MangoClient): Promise<{
sbOnDemandProgram: Anchor30Program<Idl>;
crossbarClient: CrossbarClient;
queue: PublicKey;
}> {
const idl = await Anchor30Program.fetchIdl(
SB_ON_DEMAND_PID,
client.program.provider,
);
const sbOnDemandProgram = new Anchor30Program(idl!, client.program.provider);
let queue = new PublicKey('A43DyUGA7s8eXPxqEjJY6EBu1KKbNgfxF8h17VAHn13w');
if (CLUSTER == 'devnet') {
queue = new PublicKey('FfD96yeXs4cxZshoPPSKhSPgVQxLAJUT3gefgh84m1Di');
}
const crossbarClient = new CrossbarClient(
'https://crossbar.switchboard.xyz',
false,
);
return { sbOnDemandProgram, crossbarClient, queue };
}
/**
* reloads the account states for each oracle passed through the provided connection
*/
async function updateFilteredOraclesAis(
connection: Connection,
sbOnDemandProgram: Anchor30Program<Idl>,
filteredOracles: OracleInterface[],
): Promise<void> {
const ais = (
await Promise.all(
chunk(
filteredOracles.map((item) => item.oracle.oraclePk),
50,
false,
).map((chunk) => connection.getMultipleAccountsInfo(chunk)),
)
).flat();
filteredOracles.forEach((fo, idx) => {
fo.ai = ais[idx];
const decodedPullFeed = sbOnDemandProgram.coder.accounts.decode(
'pullFeedAccountData',
fo.ai!.data,
);
fo.decodedPullFeed = decodedPullFeed;
});
}