rpc optimizations

This commit is contained in:
bartosz-lipinski 2021-02-24 20:41:36 -06:00
parent 6e28864719
commit 73d3146eb0
4 changed files with 129 additions and 41 deletions

5
.gitignore vendored
View File

@ -9,4 +9,7 @@ dist
deploy.json deploy.json
program/Cargo.lock program/Cargo.lock
integration-example/Cargo.lock integration-example/Cargo.lock
.DS_Store .DS_Store
.vscode/launch.json
config/setup.private.json
deploy.private.json

View File

@ -5,6 +5,7 @@ import { loadJSONFile } from "./json"
import { coinbase } from "./feeds" import { coinbase } from "./feeds"
import { Submitter, SubmitterConfig } from "./Submitter" import { Submitter, SubmitterConfig } from "./Submitter"
import { log } from "./log" import { log } from "./log"
import { conn } from "./context"
// Look at all the available aggregators and submit to those that the wallet can // Look at all the available aggregators and submit to those that the wallet can
// act as an oracle. // act as an oracle.
@ -20,7 +21,12 @@ export class PriceFeeder {
this.startAccessibleAggregators() this.startAccessibleAggregators()
} }
private startAccessibleAggregators() { private async startAccessibleAggregators() {
let slot = await conn.getSlot()
conn.onSlotChange(slotInfo => {
slot = slotInfo.slot
})
for (let [name, aggregatorInfo] of Object.entries( for (let [name, aggregatorInfo] of Object.entries(
this.deployInfo.aggregators this.deployInfo.aggregators
)) { )) {
@ -46,7 +52,8 @@ export class PriceFeeder {
// TODO: errrrr... probably make configurable on chain. hardwire for // TODO: errrrr... probably make configurable on chain. hardwire for
// now, don't submit value unless btc changes at least a dollar // now, don't submit value unless btc changes at least a dollar
minValueChangeForNewRound: 100, minValueChangeForNewRound: 100,
} },
() => slot,
) )
submitter.start() submitter.start()

View File

@ -1,10 +1,10 @@
import { Connection } from "@solana/web3.js" import { AccountInfo, Connection, EpochInfo } from "@solana/web3.js"
import { PublicKey, Wallet } from "solray" import { PublicKey, Wallet } from "solray"
import { conn } from "./context" import { conn } from "./context"
import { Aggregator, Submissions, Oracle } from "./schema" import { Aggregator, Submissions, Oracle } from "./schema"
import BN from "bn.js" import BN from "bn.js"
import { sleep } from "./utils" import { getMultipleAccounts, sleep } from "./utils"
import FluxAggregator from "./FluxAggregator" import FluxAggregator from "./FluxAggregator"
import { createLogger, Logger } from "winston" import { createLogger, Logger } from "winston"
@ -28,6 +28,8 @@ export class Submitter {
public program: FluxAggregator public program: FluxAggregator
public logger!: Logger public logger!: Logger
public currentValue: BN public currentValue: BN
private epoch?: EpochInfo
private refreshAccounts: () => Promise<void> = async () => { }
public reportedRound: BN public reportedRound: BN
@ -37,7 +39,8 @@ export class Submitter {
public oraclePK: PublicKey, public oraclePK: PublicKey,
private oracleOwnerWallet: Wallet, private oracleOwnerWallet: Wallet,
private priceFeed: IPriceFeed, private priceFeed: IPriceFeed,
private cfg: SubmitterConfig private cfg: SubmitterConfig,
private getSlot: () => number,
) { ) {
this.program = new FluxAggregator(this.oracleOwnerWallet, programID) this.program = new FluxAggregator(this.oracleOwnerWallet, programID)
@ -48,43 +51,64 @@ export class Submitter {
// TODO: harvest rewards if > n // TODO: harvest rewards if > n
public async start() { public async start() {
// make sure the states are initialized await this.observeAggregatorState()
await this.reloadState()
this.logger = log.child({ this.logger = log.child({
aggregator: this.aggregator.config.description, aggregator: this.aggregator.config.description,
}) })
await Promise.all([this.observeAggregatorState(), this.observePriceFeed()]) await this.observePriceFeed()
} }
public async withdrawRewards() {} public async withdrawRewards() {}
private async reloadState(loadAggregator = true) { private async observeAggregatorState() {
if (loadAggregator) { // load state
if (!this.aggregator) {
this.aggregator = await Aggregator.load(this.aggregatorPK) this.aggregator = await Aggregator.load(this.aggregatorPK)
} }
let registeredHooks = false
const keysToQuery: { [keys: string]: (key: string, acc: AccountInfo<Buffer>) => void } = {
[this.oraclePK.toBase58()]: (key, acc) => {
this.oracle = Oracle.deserialize(acc.data)
log.debug(`Update oracle: ${key}`)
},
[this.aggregator.answerSubmissions.toBase58()]: (key, acc) => {
this.answerSubmissions = Submissions.deserialize(acc.data);
log.debug(`Update answerSubmissions: ${key}`)
},
[this.aggregator.roundSubmissions.toBase58()]: (key, acc) => {
this.roundSubmissions = Submissions.deserialize(acc.data);
log.debug(`Update roundSubmissions: ${key}`)
},
}
this.roundSubmissions = await Submissions.load( this.refreshAccounts = async () => {
this.aggregator.roundSubmissions const { keys, array } = await getMultipleAccounts(conn, Object.keys(keysToQuery));
) keys.forEach((key, i) => {
this.answerSubmissions = await Submissions.load( keysToQuery[key](key, array[i])
this.aggregator.answerSubmissions
)
this.oracle = await Oracle.load(this.oraclePK) if(!registeredHooks) {
} conn.onAccountChange(new PublicKey(key), async (info) => {
keysToQuery[key](key, info)
})
}
})
registeredHooks = true;
};
await this.refreshAccounts()
private async observeAggregatorState() {
conn.onAccountChange(this.aggregatorPK, async (info) => { conn.onAccountChange(this.aggregatorPK, async (info) => {
this.aggregator = Aggregator.deserialize(info.data) this.aggregator = Aggregator.deserialize(info.data)
await this.reloadState(false)
this.logger.debug("state updated", { const roundID = this.aggregator.round.id
aggregator: this.aggregator, if (!roundID.isZero() && roundID.lte(this.reportedRound)) {
submissions: this.roundSubmissions, this.logger.debug("don't report to the same round twice")
answerSubmissions: this.answerSubmissions, return
}) }
this.onAggregatorStateUpdate() this.onAggregatorStateUpdate()
}) })
@ -117,8 +141,6 @@ export class Submitter {
private async trySubmit() { private async trySubmit() {
// TODO: make it possible to be triggered by chainlink task // TODO: make it possible to be triggered by chainlink task
// TODO: If from chainlink node, update state before running // TODO: If from chainlink node, update state before running
this.oracle = await Oracle.load(this.oraclePK)
this.logger.debug("oracle", { oracle: this.oracle }) this.logger.debug("oracle", { oracle: this.oracle })
const { round } = this.aggregator const { round } = this.aggregator
@ -130,10 +152,7 @@ export class Submitter {
} }
// or, see if oracle can start a new round // or, see if oracle can start a new round
const epoch = await conn.getEpochInfo() const sinceLastUpdate = new BN(this.getSlot()).sub(round.updatedAt)
const sinceLastUpdate = new BN(epoch.absoluteSlot).sub(round.updatedAt)
// console.log("slot", epoch.absoluteSlot, sinceLastUpdate.toString())
if (sinceLastUpdate.ltn(MAX_ROUND_STALENESS)) { if (sinceLastUpdate.ltn(MAX_ROUND_STALENESS)) {
// round is not stale yet. don't submit new round // round is not stale yet. don't submit new round
return return
@ -141,8 +160,7 @@ export class Submitter {
// The round is stale. start a new round if possible, or wait for another // The round is stale. start a new round if possible, or wait for another
// oracle to start // oracle to start
const oracle = await Oracle.load(this.oraclePK) if (this.oracle.canStartNewRound(round.id)) {
if (oracle.canStartNewRound(round.id)) {
let newRoundID = round.id.addn(1) let newRoundID = round.id.addn(1)
this.logger.info("Starting a new round", { this.logger.info("Starting a new round", {
round: newRoundID.toString(), round: newRoundID.toString(),
@ -152,6 +170,12 @@ export class Submitter {
} }
private async onAggregatorStateUpdate() { private async onAggregatorStateUpdate() {
this.logger.debug("state updated", {
aggregator: this.aggregator,
submissions: this.roundSubmissions,
answerSubmissions: this.answerSubmissions,
});
if (!this.canSubmitToCurrentRound) { if (!this.canSubmitToCurrentRound) {
return return
} }
@ -203,12 +227,7 @@ export class Submitter {
value, value,
}) })
await this.reloadState() this.logger.info("Submit OK");
this.logger.info("Submit OK", {
withdrawable: this.oracle.withdrawable.toString(),
rewardToken: this.aggregator.config.rewardTokenAccount.toString(),
})
} catch (err) { } catch (err) {
console.log(err) console.log(err)
this.logger.error("Submit error", { this.logger.error("Submit error", {

View File

@ -1,4 +1,4 @@
import { Connection, PublicKey } from "@solana/web3.js" import { AccountInfo, Connection, PublicKey } from "@solana/web3.js"
import EventEmitter from "events" import EventEmitter from "events"
import { solana, Wallet, NetworkName, Deployer } from "solray" import { solana, Wallet, NetworkName, Deployer } from "solray"
@ -67,3 +67,62 @@ export function eventsIter<T>(emitter: EventEmitter, key: string) {
}, },
} }
} }
export function chunks<T>(array: T[], size: number): T[][] {
return Array.apply<number, T[], T[][]>(
0,
new Array(Math.ceil(array.length / size))
).map((_, index) => array.slice(index * size, (index + 1) * size));
}
export const getMultipleAccounts = async (
connection: Connection,
keys: string[],
commitment: string = 'single'
) => {
const result = await Promise.all(
chunks(keys, 99).map((chunk) =>
getMultipleAccountsCore(connection, chunk, commitment)
)
);
const array = result
.map(
(a) =>
a.array
.filter((acc) => !!acc)
.map((acc) => {
const { data, ...rest } = acc;
const obj = {
...rest,
data: Buffer.from(data[0], "base64"),
} as AccountInfo<Buffer>;
return obj;
}) as AccountInfo<Buffer>[]
)
.flat();
return { keys, array };
};
const getMultipleAccountsCore = async (
connection: any,
keys: string[],
commitment: string
) => {
const args = connection._buildArgs([keys], commitment, "base64");
const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args);
if (unsafeRes.error) {
throw new Error(
"failed to get info about account " + unsafeRes.error.message
);
}
if (unsafeRes.result.value) {
const array = unsafeRes.result.value as AccountInfo<string[]>[];
return { keys, array };
}
throw new Error('Unable to get account');
};