switchboard crank improvements (#987)

dont retry oracle updates to prevent writing old state
submit all oracle updates in parallel to prevent writing old state
reduce average latency by 1s through prefetching of shared results
reduce p90 latency by 1m through async refresh of mango group & oracle definitions
handle promise rejection on tx submission
improved logging
This commit is contained in:
Maximilian Schneider 2024-08-07 02:35:32 +01:00 committed by GitHub
parent 70369ec672
commit 7e94bf6a3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 292 additions and 137 deletions

View File

@ -1,6 +1,7 @@
import {
AccountInfo,
Cluster,
Commitment,
Connection,
Keypair,
PublicKey,
@ -9,13 +10,15 @@ import {
import {
CrossbarClient,
PullFeed,
Queue,
RecentSlotHashes,
SB_ON_DEMAND_PID,
} from '@switchboard-xyz/on-demand';
import fs from 'fs';
import chunk from 'lodash/chunk';
import shuffle from 'lodash/shuffle';
import uniqWith from 'lodash/uniqWith';
import { Program as Anchor30Program, Idl } from 'switchboard-anchor';
import { Program as Anchor30Program, BN, Idl } from 'switchboard-anchor';
import { SequenceType } from '@blockworks-foundation/mangolana/lib/globalTypes';
import { sendSignAndConfirmTransactions } from '@blockworks-foundation/mangolana/lib/transactions';
@ -37,10 +40,10 @@ const LITE_RPC_URL = process.env.LITE_RPC_URL;
const USER_KEYPAIR =
process.env.USER_KEYPAIR_OVERRIDE || process.env.MB_PAYER_KEYPAIR;
const GROUP = process.env.GROUP_OVERRIDE || MANGO_V4_MAIN_GROUP.toBase58();
const SLEEP_MS = Number(process.env.SLEEP_MS) || 50_000;
const SLEEP_MS = Number(process.env.SLEEP_MS) || 20_000;
console.log(
`Starting with sleep ${SLEEP_MS}ms, cluster ${CLUSTER_URL}, cluster2 ${CLUSTER_URL_2}, liteRpcUrl ${LITE_RPC_URL}`,
`[start] config: sleep ${SLEEP_MS}ms, cluster ${CLUSTER_URL}, cluster2 ${CLUSTER_URL_2}, liteRpcUrl ${LITE_RPC_URL}`,
);
let lamportsPerCu: number | null = null;
@ -53,7 +56,7 @@ try {
lamportsPerCu = mean;
});
} catch (error) {
console.error('Error in main execution:', error);
console.error('[start]', error);
}
interface OracleInterface {
@ -61,150 +64,239 @@ interface OracleInterface {
oraclePk: PublicKey;
name: string;
};
decodedPullFeed: any;
ai: AccountInfo<Buffer> | null;
decodedPullFeed: any;
parsedConfigs: {
queue: any;
maxVariance: number;
minResponses: any;
feedHash: any;
ipfsHash: any;
};
jobs: any[];
gatewayUrl: string;
}
/// refresh mango group to detect new oracles added through governance
/// without a restart within 1 minute, result object will be dynamically
/// updated
async function setupBackgroundRefresh(
client: MangoClient,
group: Group,
sbOnDemandProgram: Anchor30Program<Idl>,
crossbarClient: CrossbarClient,
): Promise<{ oracles: OracleInterface[] }> {
// note: group was already reloaded before
const oracles = await prepareCandidateOracles(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
const result = { oracles };
const GROUP_REFRESH_INTERVAL = 60_000;
const refreshGroup = async function (): Promise<void> {
try {
await group.reloadAll(client);
result.oracles = await prepareCandidateOracles(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
} catch (e) {
console.error('[group]', e);
}
setTimeout(refreshGroup, GROUP_REFRESH_INTERVAL);
};
setTimeout(refreshGroup, GROUP_REFRESH_INTERVAL);
return result;
}
(async function main(): Promise<never> {
const { group, client, connection, user, userProvider } = await setupMango();
const { group, client, connection, user } = await setupMango();
const { sbOnDemandProgram, crossbarClient, queue } =
await setupSwitchboard(client);
const { sbOnDemandProgram, crossbarClient } = await setupSwitchboard(client);
const refresh = await setupBackgroundRefresh(
client,
group,
sbOnDemandProgram,
crossbarClient,
);
// eslint-disable-next-line no-constant-condition
while (true) {
try {
// periodically check if we have new candidates on the group
const filteredOracles = await prepareCandidateOracles(group, client);
// pull a fresh reference to the oracles from the background refresher
const { oracles } = refresh;
for (let i = 0; i < 10; i++) {
const start = Date.now();
const slot = await client.connection.getSlot('finalized');
const startedAt = Date.now();
const [block, slot] = await Promise.all([
// use finalized blockhash for faster timeouts on transactions
client.connection.getLatestBlockhash('finalized'),
// use processed slot for accurate staleness measurement
client.connection.getSlot('processed'),
]);
await updateFilteredOraclesAis(
client.connection,
sbOnDemandProgram,
filteredOracles,
);
await updateFilteredOraclesAis(
client.connection,
sbOnDemandProgram,
oracles,
);
const staleOracles = await filterForStaleOracles(
filteredOracles,
client,
slot,
);
const aisUpdatedAt = Date.now();
const crossBarSims = await Promise.all(
filteredOracles.map(
async (fo) =>
await crossbarClient.simulateFeeds([
new Buffer(fo.decodedPullFeed.feedHash).toString('hex'),
]),
),
);
const varianceThresholdCrossedOracles =
await filterForVarianceThresholdOracles(
filteredOracles,
client,
crossBarSims,
);
const oraclesToCrank: OracleInterface[] = uniqWith(
[...staleOracles, ...varianceThresholdCrossedOracles],
function (a, b) {
return a.oracle.oraclePk.equals(b.oracle.oraclePk);
},
);
const staleOracles = await filterForStaleOracles(oracles, client, slot);
console.log(
`- round candidates | Stale: ${staleOracles
.map((o) => o.oracle.name)
.join(', ')} | Variance: ${varianceThresholdCrossedOracles
.map((o) => o.oracle.name)
.join(', ')}`,
);
const staleFilteredAt = Date.now();
// todo use chunk
// todo use luts
const crossBarSims = await Promise.all(
oracles.map((o) =>
crossbarClient.simulateFeeds([
new Buffer(o.parsedConfigs.feedHash).toString('hex'),
]),
),
);
// const [pullIxs, luts] = await PullFeed.fetchUpdateManyIx(
// sbOnDemandProgram as any,
// {
// feeds: oraclesToCrank.map((o) => new PublicKey(o.oracle.oraclePk)),
// numSignatures: 3,
// },
// );
const simulatedAt = Date.now();
const pullIxs = (
await Promise.all(
oraclesToCrank.map(async (oracle) => {
const pullIx = await preparePullIx(sbOnDemandProgram, oracle);
return pullIx !== undefined ? pullIx : null;
}),
)
).filter((pullIx) => pullIx !== null);
const varianceThresholdCrossedOracles =
await filterForVarianceThresholdOracles(oracles, client, crossBarSims);
const ixsChunks = chunk(shuffle(pullIxs), 2, false);
const lamportsPerCu_ = Math.min(
Math.max(lamportsPerCu ?? 150_000, 150_000),
500_000,
);
try {
// dont await, fire and forget
// TODO use our own ALTs
sendSignAndConfirmTransactions({
connection,
wallet: new Wallet(user),
backupConnections: [
...(CLUSTER_URL_2
? [new Connection(LITE_RPC_URL!, 'recent')]
: []),
...(CLUSTER_URL_2
? [new Connection(CLUSTER_URL_2!, 'recent')]
: []),
],
transactionInstructions: ixsChunks.map((txChunk) => ({
instructionsSet: [
{
signers: [],
transactionInstruction: createComputeBudgetIx(lamportsPerCu_),
},
...txChunk.map((tx) => ({
signers: [],
transactionInstruction: tx,
})),
],
sequenceType: SequenceType.Parallel,
const varianceFilteredAt = Date.now();
const oraclesToCrank: OracleInterface[] = uniqWith(
[...staleOracles, ...varianceThresholdCrossedOracles],
function (a, b) {
return a.oracle.oraclePk.equals(b.oracle.oraclePk);
},
);
console.log(
`[main] round candidates | Stale: ${staleOracles
.map((o) => o.oracle.name)
.join(', ')} | Variance: ${varianceThresholdCrossedOracles
.map((o) => o.oracle.name)
.join(', ')}`,
);
// todo use chunk
// todo use luts
// const [pullIxs, luts] = await PullFeed.fetchUpdateManyIx(
// sbOnDemandProgram as any,
// {
// feeds: oraclesToCrank.map((o) => new PublicKey(o.oracle.oraclePk)),
// numSignatures: 3,
// },
// );
const recentSlothashes = await RecentSlotHashes.fetchLatestNSlothashes(
connection as any,
30,
);
const pullIxs = (
await Promise.all(
oraclesToCrank.map(async (oracle) => {
const pullIx = await preparePullIx(
sbOnDemandProgram,
oracle,
recentSlothashes,
);
return pullIx !== undefined ? pullIx : null;
}),
)
).filter((pullIx) => pullIx !== null);
const ixPreparedAt = Date.now();
const ixsChunks = chunk(shuffle(pullIxs), 2, false);
const lamportsPerCu_ = Math.min(
Math.max(lamportsPerCu ?? 150_000, 150_000),
500_000,
);
// dont await, fire and forget
// TODO use our own ALTs
sendSignAndConfirmTransactions({
connection,
wallet: new Wallet(user),
backupConnections: [
...(CLUSTER_URL_2 ? [new Connection(LITE_RPC_URL!, 'recent')] : []),
...(CLUSTER_URL_2 ? [new Connection(CLUSTER_URL_2!, 'recent')] : []),
],
// fail rather quickly and retry submission from scratch
// timeout using finalized to stay below switchboard oracle staleness limit
timeoutStrategy: { block, startBlockCheckAfterSecs: 20 },
transactionInstructions: ixsChunks.map((txChunk) => ({
instructionsSet: [
{
signers: [],
transactionInstruction: createComputeBudgetIx(lamportsPerCu_),
},
...txChunk.map((tx) => ({
signers: [],
transactionInstruction: tx,
})),
config: {
maxTxesInBatch: 1,
maxRetries: 5,
autoRetry: true,
logFlowInfo: false,
},
callbacks: {
afterEveryTxSend: function (data) {
console.log(
` - https://solscan.io/tx/${data['txid']}, in ${(Date.now() - start) / 1000}s, lamportsPerCu_ ${lamportsPerCu_}, lamportsPerCu ${lamportsPerCu}`,
);
},
},
});
} catch (error) {
console.log(
`Error in sending tx, ${JSON.stringify(error.message)}, https://solscan.io/tx/${error['txid']}, in ${(Date.now() - start) / 1000}s, lamportsPerCu_ ${lamportsPerCu_}, lamportsPerCu ${lamportsPerCu}`,
);
}
],
sequenceType: SequenceType.Parallel,
})),
config: {
maxTxesInBatch: 10,
autoRetry: false,
logFlowInfo: false,
},
callbacks: {
afterEveryTxSend: function (data) {
const sentAt = Date.now();
const total = (sentAt - startedAt) / 1000;
const aiUpdate = (aisUpdatedAt - startedAt) / 1000;
const staleFilter = (staleFilteredAt - aisUpdatedAt) / 1000;
const simulate = (simulatedAt - staleFilteredAt) / 1000;
const varianceFilter = (varianceFilteredAt - simulatedAt) / 1000;
const ixPrepare = (ixPreparedAt - varianceFilteredAt) / 1000;
const timing = {
aiUpdate,
staleFilter,
simulate,
varianceFilter,
ixPrepare,
};
await new Promise((r) => setTimeout(r, SLEEP_MS));
}
console.log(
`[tx send] https://solscan.io/tx/${data['txid']}, in ${total}s, lamportsPerCu_ ${lamportsPerCu_}, lamportsPerCu ${lamportsPerCu}, timiming ${JSON.stringify(timing)}`,
);
},
onError: function (e, notProcessedTransactions) {
console.error(
`[tx send] ${notProcessedTransactions.length} error(s) after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(e)}`,
);
},
},
}).catch((reason) =>
console.error(
`[tx send] promise rejected after ${(Date.now() - ixPreparedAt) / 1000}s ${JSON.stringify(reason)}`,
),
);
await new Promise((r) => setTimeout(r, SLEEP_MS));
} catch (error) {
console.log(error);
console.error('[main]', error);
}
}
})();
/**
* prepares the instruction to update an individual oracle using the cached data on oracle
*/
async function preparePullIx(
sbOnDemandProgram,
oracle: OracleInterface,
recentSlothashes?: Array<[BN, string]>,
): Promise<TransactionInstruction | undefined> {
const pullFeed = new PullFeed(
sbOnDemandProgram as any,
@ -212,11 +304,16 @@ async function preparePullIx(
);
const conf = {
numSignatures: oracle.decodedPullFeed.minResponses,
numSignatures: oracle.parsedConfigs.minResponses,
feed: oracle.oracle.oraclePk,
feedConfigs: oracle.parsedConfigs,
gateway: oracle.gatewayUrl,
};
// TODO use fetchUpdateMany
const [pullIx, responses, success] = await pullFeed.fetchUpdateIx(conf);
const [pullIx] = await pullFeed.fetchUpdateIx(
conf,
recentSlothashes,
);
return pullIx;
}
@ -242,17 +339,16 @@ async function filterForVarianceThresholdOracles(
crossBarSim[0].results.length;
const changePct = (Math.abs(res.price - simPrice) * 100) / res.price;
const changeBps = changePct * 100;
if (changePct > item.decodedPullFeed.maxVariance / 1000000000) {
console.log(
`- ${item.oracle.name}, candidate, ${
`[filter variance] ${item.oracle.name}, candidate, ${
item.decodedPullFeed.maxVariance / 1000000000
}, ${simPrice}, ${res.price}, ${changePct}`,
);
varianceThresholdCrossedOracles.push(item);
} else {
console.log(
`- ${item.oracle.name}, non-candidate, ${
`[filter variance] ${item.oracle.name}, non-candidate, ${
item.decodedPullFeed.maxVariance / 1000000000
}, ${simPrice}, ${res.price}, ${changePct}`,
);
@ -276,29 +372,39 @@ async function filterForStaleOracles(
const diff = slot - res.lastUpdatedSlot;
if (
slot > res.lastUpdatedSlot &&
slot - res.lastUpdatedSlot > (item.decodedPullFeed.maxStaleness * 1) / 2
// maxStaleness will usually be 250 (=100s)
// one iteration takes 10s, retry is every 20s
// this allows for 2 retries until the oracle becomes stale
diff >
item.decodedPullFeed.maxStaleness * 0.3
) {
console.log(
`- ${item.oracle.name}, candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
`[filter stale] ${item.oracle.name}, candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
);
staleOracles.push(item);
} else {
console.log(
`- ${item.oracle.name}, non-candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
`[filter stale] ${item.oracle.name}, non-candidate, ${item.decodedPullFeed.maxStaleness}, ${slot}, ${res.lastUpdatedSlot}, ${diff}`,
);
}
}
return staleOracles;
}
/**
* fetch all on-demand oracles used on mango group and parse their configuration
*/
async function prepareCandidateOracles(
group: Group,
client: MangoClient,
group: Group,
sbOnDemandProgram: Anchor30Program<Idl>,
crossbarClient: CrossbarClient,
): Promise<OracleInterface[]> {
// collect
const oracles = getOraclesForMangoGroup(group);
oracles.push(...extendOraclesManually(CLUSTER));
// load all oracle account infos
const ais = (
await Promise.all(
chunk(
@ -314,6 +420,7 @@ async function prepareCandidateOracles(
)
).flat();
// ensure rpc response is correct
for (const [idx, ai] of ais.entries()) {
if (ai == null || ai.data == null) {
throw new Error(
@ -327,13 +434,52 @@ async function prepareCandidateOracles(
);
}
const filteredOracles = oracles
// combine account info
const sbodOracles = oracles
.map((o, i) => {
return { oracle: o, ai: ais[i], decodedPullFeed: undefined };
return { oracle: o, ai: ais[i] };
})
.filter((item) => item.ai?.owner.equals(SB_ON_DEMAND_PID));
return filteredOracles;
// parse account info data
const parsedOracles = sbodOracles.map((item) => {
const d = sbOnDemandProgram.coder.accounts.decode(
'pullFeedAccountData',
item.ai!.data,
);
return {
decodedPullFeed: d,
parsedConfigs: {
queue: d.queue,
maxVariance: d.maxVariance / 1e9,
minResponses: d.minResponses,
feedHash: d.feedHash,
ipfsHash: d.ipfsHash,
},
};
});
const jobs = await Promise.all(
parsedOracles.map((o) =>
crossbarClient
.fetch(Buffer.from(o.parsedConfigs.feedHash).toString('hex'))
.then((r) => r.jobs),
),
);
const gateways = await Promise.all(
parsedOracles.map((o) =>
new Queue(sbOnDemandProgram, o.parsedConfigs.queue).fetchAllGateways(),
),
);
// assemble all data together
return sbodOracles.map((o, i) => ({
...o,
...parsedOracles[i],
jobs: jobs[i],
gatewayUrl: gateways[i][0].gatewayUrl,
}));
}
function extendOraclesManually(cluster: Cluster): {
@ -371,7 +517,9 @@ async function setupMango(): Promise<{
user: Keypair;
userProvider: AnchorProvider;
}> {
const options = AnchorProvider.defaultOptions();
// the connection needs to be set to confirmed so that we never
// submit an oracle update with a processed -> forked away slot hash
const options = { commitment: 'confirmed' as Commitment };
const connection = new Connection(CLUSTER_URL!, options);
const user = Keypair.fromSecretKey(
Buffer.from(
@ -396,6 +544,10 @@ async function setupMango(): Promise<{
return { group, client, connection, user, userProvider };
}
/**
* scans mango group for all oracles that need updating
* includes bank oracle, fallback oracle and perp market oracles
*/
function getOraclesForMangoGroup(
group: Group,
): { oraclePk: PublicKey; name: string }[] {
@ -469,6 +621,9 @@ async function setupSwitchboard(client: MangoClient): Promise<{
return { sbOnDemandProgram, crossbarClient, queue };
}
/**
* reloads the account states for each oracle passed through the provided connection
*/
async function updateFilteredOraclesAis(
connection: Connection,
sbOnDemandProgram: Anchor30Program<Idl>,
@ -480,7 +635,7 @@ async function updateFilteredOraclesAis(
filteredOracles.map((item) => item.oracle.oraclePk),
50,
false,
).map(async (chunk) => await connection.getMultipleAccountsInfo(chunk)),
).map((chunk) => connection.getMultipleAccountsInfo(chunk)),
)
).flat();

View File

@ -949,7 +949,7 @@ export class HealthCache {
// - be careful about finding the minFnValue: the function isn't convex
const initialRatio = this.healthRatio(HealthType.init);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const healthCacheClone: HealthCache = deepClone<HealthCache>(this);
const sourceIndex = healthCacheClone.getOrCreateTokenInfoIndex(sourceBank);