From 73d3146eb045ae1b553197d439204b468c85d929 Mon Sep 17 00:00:00 2001 From: bartosz-lipinski <264380+bartosz-lipinski@users.noreply.github.com> Date: Wed, 24 Feb 2021 20:41:36 -0600 Subject: [PATCH] rpc optimizations --- .gitignore | 5 ++- src/PriceFeeder.ts | 11 +++++- src/Submitter.ts | 93 ++++++++++++++++++++++++++++------------------ src/utils.ts | 61 +++++++++++++++++++++++++++++- 4 files changed, 129 insertions(+), 41 deletions(-) diff --git a/.gitignore b/.gitignore index 2afbb84..b208c22 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,7 @@ dist deploy.json program/Cargo.lock integration-example/Cargo.lock -.DS_Store \ No newline at end of file +.DS_Store +.vscode/launch.json +config/setup.private.json +deploy.private.json diff --git a/src/PriceFeeder.ts b/src/PriceFeeder.ts index 93fcfd8..71781ec 100644 --- a/src/PriceFeeder.ts +++ b/src/PriceFeeder.ts @@ -5,6 +5,7 @@ import { loadJSONFile } from "./json" import { coinbase } from "./feeds" import { Submitter, SubmitterConfig } from "./Submitter" import { log } from "./log" +import { conn } from "./context" // Look at all the available aggregators and submit to those that the wallet can // act as an oracle. @@ -20,7 +21,12 @@ export class PriceFeeder { this.startAccessibleAggregators() } - private startAccessibleAggregators() { + private async startAccessibleAggregators() { + let slot = await conn.getSlot() + conn.onSlotChange(slotInfo => { + slot = slotInfo.slot + }) + for (let [name, aggregatorInfo] of Object.entries( this.deployInfo.aggregators )) { @@ -46,7 +52,8 @@ export class PriceFeeder { // TODO: errrrr... probably make configurable on chain. hardwire for // now, don't submit value unless btc changes at least a dollar minValueChangeForNewRound: 100, - } + }, + () => slot, ) submitter.start() diff --git a/src/Submitter.ts b/src/Submitter.ts index 6a8c265..39bd928 100644 --- a/src/Submitter.ts +++ b/src/Submitter.ts @@ -1,10 +1,10 @@ -import { Connection } from "@solana/web3.js" +import { AccountInfo, Connection, EpochInfo } from "@solana/web3.js" import { PublicKey, Wallet } from "solray" import { conn } from "./context" import { Aggregator, Submissions, Oracle } from "./schema" import BN from "bn.js" -import { sleep } from "./utils" +import { getMultipleAccounts, sleep } from "./utils" import FluxAggregator from "./FluxAggregator" import { createLogger, Logger } from "winston" @@ -28,6 +28,8 @@ export class Submitter { public program: FluxAggregator public logger!: Logger public currentValue: BN + private epoch?: EpochInfo + private refreshAccounts: () => Promise = async () => { } public reportedRound: BN @@ -37,7 +39,8 @@ export class Submitter { public oraclePK: PublicKey, private oracleOwnerWallet: Wallet, private priceFeed: IPriceFeed, - private cfg: SubmitterConfig + private cfg: SubmitterConfig, + private getSlot: () => number, ) { this.program = new FluxAggregator(this.oracleOwnerWallet, programID) @@ -48,43 +51,64 @@ export class Submitter { // TODO: harvest rewards if > n public async start() { - // make sure the states are initialized - await this.reloadState() + await this.observeAggregatorState() this.logger = log.child({ aggregator: this.aggregator.config.description, }) - await Promise.all([this.observeAggregatorState(), this.observePriceFeed()]) + await this.observePriceFeed() } public async withdrawRewards() {} - private async reloadState(loadAggregator = true) { - if (loadAggregator) { + private async observeAggregatorState() { + // load state + if (!this.aggregator) { this.aggregator = await Aggregator.load(this.aggregatorPK) } + + let registeredHooks = false + const keysToQuery: { [keys: string]: (key: string, acc: AccountInfo) => void } = { + [this.oraclePK.toBase58()]: (key, acc) => { + this.oracle = Oracle.deserialize(acc.data) + log.debug(`Update oracle: ${key}`) + }, + [this.aggregator.answerSubmissions.toBase58()]: (key, acc) => { + this.answerSubmissions = Submissions.deserialize(acc.data); + log.debug(`Update answerSubmissions: ${key}`) + }, + [this.aggregator.roundSubmissions.toBase58()]: (key, acc) => { + this.roundSubmissions = Submissions.deserialize(acc.data); + log.debug(`Update roundSubmissions: ${key}`) + }, + } - this.roundSubmissions = await Submissions.load( - this.aggregator.roundSubmissions - ) - this.answerSubmissions = await Submissions.load( - this.aggregator.answerSubmissions - ) + this.refreshAccounts = async () => { + const { keys, array } = await getMultipleAccounts(conn, Object.keys(keysToQuery)); + keys.forEach((key, i) => { + keysToQuery[key](key, array[i]) - this.oracle = await Oracle.load(this.oraclePK) - } + if(!registeredHooks) { + conn.onAccountChange(new PublicKey(key), async (info) => { + keysToQuery[key](key, info) + }) + } + }) + + registeredHooks = true; + }; + + await this.refreshAccounts() - private async observeAggregatorState() { conn.onAccountChange(this.aggregatorPK, async (info) => { this.aggregator = Aggregator.deserialize(info.data) - await this.reloadState(false) - this.logger.debug("state updated", { - aggregator: this.aggregator, - submissions: this.roundSubmissions, - answerSubmissions: this.answerSubmissions, - }) + const roundID = this.aggregator.round.id + if (!roundID.isZero() && roundID.lte(this.reportedRound)) { + this.logger.debug("don't report to the same round twice") + return + } this.onAggregatorStateUpdate() }) @@ -117,8 +141,6 @@ export class Submitter { private async trySubmit() { // TODO: make it possible to be triggered by chainlink task // TODO: If from chainlink node, update state before running - - this.oracle = await Oracle.load(this.oraclePK) this.logger.debug("oracle", { oracle: this.oracle }) const { round } = this.aggregator @@ -130,10 +152,7 @@ export class Submitter { } // or, see if oracle can start a new round - const epoch = await conn.getEpochInfo() - const sinceLastUpdate = new BN(epoch.absoluteSlot).sub(round.updatedAt) - // console.log("slot", epoch.absoluteSlot, sinceLastUpdate.toString()) - + const sinceLastUpdate = new BN(this.getSlot()).sub(round.updatedAt) if (sinceLastUpdate.ltn(MAX_ROUND_STALENESS)) { // round is not stale yet. don't submit new round return @@ -141,8 +160,7 @@ export class Submitter { // The round is stale. start a new round if possible, or wait for another // oracle to start - const oracle = await Oracle.load(this.oraclePK) - if (oracle.canStartNewRound(round.id)) { + if (this.oracle.canStartNewRound(round.id)) { let newRoundID = round.id.addn(1) this.logger.info("Starting a new round", { round: newRoundID.toString(), @@ -152,6 +170,12 @@ export class Submitter { } private async onAggregatorStateUpdate() { + this.logger.debug("state updated", { + aggregator: this.aggregator, + submissions: this.roundSubmissions, + answerSubmissions: this.answerSubmissions, + }); + if (!this.canSubmitToCurrentRound) { return } @@ -203,12 +227,7 @@ export class Submitter { value, }) - await this.reloadState() - - this.logger.info("Submit OK", { - withdrawable: this.oracle.withdrawable.toString(), - rewardToken: this.aggregator.config.rewardTokenAccount.toString(), - }) + this.logger.info("Submit OK"); } catch (err) { console.log(err) this.logger.error("Submit error", { diff --git a/src/utils.ts b/src/utils.ts index 3c0fbf2..4ff22ef 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,4 @@ -import { Connection, PublicKey } from "@solana/web3.js" +import { AccountInfo, Connection, PublicKey } from "@solana/web3.js" import EventEmitter from "events" import { solana, Wallet, NetworkName, Deployer } from "solray" @@ -67,3 +67,62 @@ export function eventsIter(emitter: EventEmitter, key: string) { }, } } + +export function chunks(array: T[], size: number): T[][] { + return Array.apply( + 0, + new Array(Math.ceil(array.length / size)) + ).map((_, index) => array.slice(index * size, (index + 1) * size)); +} + +export const getMultipleAccounts = async ( + connection: Connection, + keys: string[], + commitment: string = 'single' +) => { + const result = await Promise.all( + chunks(keys, 99).map((chunk) => + getMultipleAccountsCore(connection, chunk, commitment) + ) + ); + + const array = result + .map( + (a) => + a.array + .filter((acc) => !!acc) + .map((acc) => { + const { data, ...rest } = acc; + const obj = { + ...rest, + data: Buffer.from(data[0], "base64"), + } as AccountInfo; + return obj; + }) as AccountInfo[] + ) + .flat(); + return { keys, array }; +}; + +const getMultipleAccountsCore = async ( + connection: any, + keys: string[], + commitment: string +) => { + const args = connection._buildArgs([keys], commitment, "base64"); + + const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args); + if (unsafeRes.error) { + throw new Error( + "failed to get info about account " + unsafeRes.error.message + ); + } + + if (unsafeRes.result.value) { + const array = unsafeRes.result.value as AccountInfo[]; + return { keys, array }; + } + + throw new Error('Unable to get account'); +}; +