diff --git a/src/PriceFeeder.ts b/src/PriceFeeder.ts index 71781ec..cba7afc 100644 --- a/src/PriceFeeder.ts +++ b/src/PriceFeeder.ts @@ -13,8 +13,7 @@ export class PriceFeeder { constructor( private deployInfo: AggregatorDeployFile, private wallet: Wallet - ) { - } + ) {} async start() { // find aggregators that this wallet can act as oracle @@ -23,7 +22,7 @@ export class PriceFeeder { private async startAccessibleAggregators() { let slot = await conn.getSlot() - conn.onSlotChange(slotInfo => { + conn.onSlotChange((slotInfo) => { slot = slotInfo.slot }) @@ -53,7 +52,7 @@ export class PriceFeeder { // now, don't submit value unless btc changes at least a dollar minValueChangeForNewRound: 100, }, - () => slot, + () => slot ) submitter.start() diff --git a/src/Submitter.ts b/src/Submitter.ts index 39bd928..db52b3d 100644 --- a/src/Submitter.ts +++ b/src/Submitter.ts @@ -4,7 +4,7 @@ import { conn } from "./context" import { Aggregator, Submissions, Oracle } from "./schema" import BN from "bn.js" -import { getMultipleAccounts, sleep } from "./utils" +import { getAccounts, getMultipleAccounts, sleep } from "./utils" import FluxAggregator from "./FluxAggregator" import { createLogger, Logger } from "winston" @@ -29,7 +29,7 @@ export class Submitter { public logger!: Logger public currentValue: BN private epoch?: EpochInfo - private refreshAccounts: () => Promise = async () => { } + private refreshAccounts: () => Promise = async () => {} public reportedRound: BN @@ -40,7 +40,7 @@ export class Submitter { private oracleOwnerWallet: Wallet, private priceFeed: IPriceFeed, private cfg: SubmitterConfig, - private getSlot: () => number, + private getSlot: () => number ) { this.program = new FluxAggregator(this.oracleOwnerWallet, programID) @@ -62,54 +62,42 @@ export class Submitter { public async withdrawRewards() {} - private async observeAggregatorState() { - // load state + private async updateStates() { if (!this.aggregator) { this.aggregator = await Aggregator.load(this.aggregatorPK) } - - let registeredHooks = false - const keysToQuery: { [keys: string]: (key: string, acc: AccountInfo) => void } = { - [this.oraclePK.toBase58()]: (key, acc) => { - this.oracle = Oracle.deserialize(acc.data) - log.debug(`Update oracle: ${key}`) - }, - [this.aggregator.answerSubmissions.toBase58()]: (key, acc) => { - this.answerSubmissions = Submissions.deserialize(acc.data); - log.debug(`Update answerSubmissions: ${key}`) - }, - [this.aggregator.roundSubmissions.toBase58()]: (key, acc) => { - this.roundSubmissions = Submissions.deserialize(acc.data); - log.debug(`Update roundSubmissions: ${key}`) - }, - } - this.refreshAccounts = async () => { - const { keys, array } = await getMultipleAccounts(conn, Object.keys(keysToQuery)); - keys.forEach((key, i) => { - keysToQuery[key](key, array[i]) + const [ + oracle, + roundSubmissions, + answerSubmissions, + ] = await getAccounts(conn, [ + this.oraclePK, + this.aggregator.roundSubmissions, + this.aggregator.answerSubmissions, + ]) - if(!registeredHooks) { - conn.onAccountChange(new PublicKey(key), async (info) => { - keysToQuery[key](key, info) - }) - } - }) + this.oracle = Oracle.deserialize(oracle.data) + this.answerSubmissions = Submissions.deserialize(answerSubmissions.data) + this.roundSubmissions = Submissions.deserialize(roundSubmissions.data) + } - registeredHooks = true; - }; + private isRoundReported(roundID: BN): boolean { + return !roundID.isZero() && roundID.lte(this.reportedRound) + } - await this.refreshAccounts() + private async observeAggregatorState() { + await this.updateStates() conn.onAccountChange(this.aggregatorPK, async (info) => { this.aggregator = Aggregator.deserialize(info.data) - const roundID = this.aggregator.round.id - if (!roundID.isZero() && roundID.lte(this.reportedRound)) { - this.logger.debug("don't report to the same round twice") + if (this.isRoundReported(this.aggregator.round.id)) { return } + // only update states if actually reporting to save RPC calls + await this.updateStates() this.onAggregatorStateUpdate() }) } @@ -174,7 +162,7 @@ export class Submitter { aggregator: this.aggregator, submissions: this.roundSubmissions, answerSubmissions: this.answerSubmissions, - }); + }) if (!this.canSubmitToCurrentRound) { return @@ -227,14 +215,12 @@ export class Submitter { value, }) - this.logger.info("Submit OK"); + this.logger.info("Submit OK") } catch (err) { console.log(err) this.logger.error("Submit error", { err: err.toString(), }) } - - } } diff --git a/src/utils.ts b/src/utils.ts index 4ff22ef..6642032 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -72,19 +72,25 @@ export function chunks(array: T[], size: number): T[][] { return Array.apply( 0, new Array(Math.ceil(array.length / size)) - ).map((_, index) => array.slice(index * size, (index + 1) * size)); + ).map((_, index) => array.slice(index * size, (index + 1) * size)) } -export const getMultipleAccounts = async ( +export const getAccounts = async ( connection: Connection, - keys: string[], - commitment: string = 'single' + keys: PublicKey[], + commitment: string = "single" ) => { + // const [a, b, c] = getAccounts(pk1, pk2, pk3) + const result = await Promise.all( chunks(keys, 99).map((chunk) => - getMultipleAccountsCore(connection, chunk, commitment) + getMultipleAccountsCore( + connection, + chunk.map((key) => key.toBase58()), + commitment + ) ) - ); + ) const array = result .map( @@ -92,37 +98,65 @@ export const getMultipleAccounts = async ( a.array .filter((acc) => !!acc) .map((acc) => { - const { data, ...rest } = acc; + const { data, ...rest } = acc const obj = { ...rest, data: Buffer.from(data[0], "base64"), - } as AccountInfo; - return obj; + } as AccountInfo + return obj }) as AccountInfo[] ) - .flat(); - return { keys, array }; -}; + .flat() + return array +} + +export const getMultipleAccounts = async ( + connection: Connection, + keys: string[], + commitment: string = "single" +) => { + const result = await Promise.all( + chunks(keys, 99).map((chunk) => + getMultipleAccountsCore(connection, chunk, commitment) + ) + ) + + const array = result + .map( + (a) => + a.array + .filter((acc) => !!acc) + .map((acc) => { + const { data, ...rest } = acc + const obj = { + ...rest, + data: Buffer.from(data[0], "base64"), + } as AccountInfo + return obj + }) as AccountInfo[] + ) + .flat() + return { keys, array } +} const getMultipleAccountsCore = async ( connection: any, keys: string[], commitment: string ) => { - const args = connection._buildArgs([keys], commitment, "base64"); + const args = connection._buildArgs([keys], commitment, "base64") - const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args); + const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args) if (unsafeRes.error) { throw new Error( "failed to get info about account " + unsafeRes.error.message - ); + ) } if (unsafeRes.result.value) { - const array = unsafeRes.result.value as AccountInfo[]; - return { keys, array }; + const array = unsafeRes.result.value as AccountInfo[] + return { keys, array } } - throw new Error('Unable to get account'); -}; - + throw new Error("Unable to get account") +}