simplify RPC optimization. subscription unreliable

This commit is contained in:
De Facto 2021-02-25 11:49:02 +08:00
parent 73d3146eb0
commit 885d9c0259
3 changed files with 84 additions and 65 deletions

View File

@ -13,8 +13,7 @@ export class PriceFeeder {
constructor( constructor(
private deployInfo: AggregatorDeployFile, private deployInfo: AggregatorDeployFile,
private wallet: Wallet private wallet: Wallet
) { ) {}
}
async start() { async start() {
// find aggregators that this wallet can act as oracle // find aggregators that this wallet can act as oracle
@ -23,7 +22,7 @@ export class PriceFeeder {
private async startAccessibleAggregators() { private async startAccessibleAggregators() {
let slot = await conn.getSlot() let slot = await conn.getSlot()
conn.onSlotChange(slotInfo => { conn.onSlotChange((slotInfo) => {
slot = slotInfo.slot slot = slotInfo.slot
}) })
@ -53,7 +52,7 @@ export class PriceFeeder {
// now, don't submit value unless btc changes at least a dollar // now, don't submit value unless btc changes at least a dollar
minValueChangeForNewRound: 100, minValueChangeForNewRound: 100,
}, },
() => slot, () => slot
) )
submitter.start() submitter.start()

View File

@ -4,7 +4,7 @@ import { conn } from "./context"
import { Aggregator, Submissions, Oracle } from "./schema" import { Aggregator, Submissions, Oracle } from "./schema"
import BN from "bn.js" import BN from "bn.js"
import { getMultipleAccounts, sleep } from "./utils" import { getAccounts, getMultipleAccounts, sleep } from "./utils"
import FluxAggregator from "./FluxAggregator" import FluxAggregator from "./FluxAggregator"
import { createLogger, Logger } from "winston" import { createLogger, Logger } from "winston"
@ -29,7 +29,7 @@ export class Submitter {
public logger!: Logger public logger!: Logger
public currentValue: BN public currentValue: BN
private epoch?: EpochInfo private epoch?: EpochInfo
private refreshAccounts: () => Promise<void> = async () => { } private refreshAccounts: () => Promise<void> = async () => {}
public reportedRound: BN public reportedRound: BN
@ -40,7 +40,7 @@ export class Submitter {
private oracleOwnerWallet: Wallet, private oracleOwnerWallet: Wallet,
private priceFeed: IPriceFeed, private priceFeed: IPriceFeed,
private cfg: SubmitterConfig, private cfg: SubmitterConfig,
private getSlot: () => number, private getSlot: () => number
) { ) {
this.program = new FluxAggregator(this.oracleOwnerWallet, programID) this.program = new FluxAggregator(this.oracleOwnerWallet, programID)
@ -62,54 +62,42 @@ export class Submitter {
public async withdrawRewards() {} public async withdrawRewards() {}
private async observeAggregatorState() { private async updateStates() {
// load state
if (!this.aggregator) { if (!this.aggregator) {
this.aggregator = await Aggregator.load(this.aggregatorPK) this.aggregator = await Aggregator.load(this.aggregatorPK)
} }
let registeredHooks = false
const keysToQuery: { [keys: string]: (key: string, acc: AccountInfo<Buffer>) => void } = {
[this.oraclePK.toBase58()]: (key, acc) => {
this.oracle = Oracle.deserialize(acc.data)
log.debug(`Update oracle: ${key}`)
},
[this.aggregator.answerSubmissions.toBase58()]: (key, acc) => {
this.answerSubmissions = Submissions.deserialize(acc.data);
log.debug(`Update answerSubmissions: ${key}`)
},
[this.aggregator.roundSubmissions.toBase58()]: (key, acc) => {
this.roundSubmissions = Submissions.deserialize(acc.data);
log.debug(`Update roundSubmissions: ${key}`)
},
}
this.refreshAccounts = async () => { const [
const { keys, array } = await getMultipleAccounts(conn, Object.keys(keysToQuery)); oracle,
keys.forEach((key, i) => { roundSubmissions,
keysToQuery[key](key, array[i]) answerSubmissions,
] = await getAccounts(conn, [
this.oraclePK,
this.aggregator.roundSubmissions,
this.aggregator.answerSubmissions,
])
if(!registeredHooks) { this.oracle = Oracle.deserialize(oracle.data)
conn.onAccountChange(new PublicKey(key), async (info) => { this.answerSubmissions = Submissions.deserialize(answerSubmissions.data)
keysToQuery[key](key, info) this.roundSubmissions = Submissions.deserialize(roundSubmissions.data)
}) }
}
})
registeredHooks = true; private isRoundReported(roundID: BN): boolean {
}; return !roundID.isZero() && roundID.lte(this.reportedRound)
}
await this.refreshAccounts() private async observeAggregatorState() {
await this.updateStates()
conn.onAccountChange(this.aggregatorPK, async (info) => { conn.onAccountChange(this.aggregatorPK, async (info) => {
this.aggregator = Aggregator.deserialize(info.data) this.aggregator = Aggregator.deserialize(info.data)
const roundID = this.aggregator.round.id if (this.isRoundReported(this.aggregator.round.id)) {
if (!roundID.isZero() && roundID.lte(this.reportedRound)) {
this.logger.debug("don't report to the same round twice")
return return
} }
// only update states if actually reporting to save RPC calls
await this.updateStates()
this.onAggregatorStateUpdate() this.onAggregatorStateUpdate()
}) })
} }
@ -174,7 +162,7 @@ export class Submitter {
aggregator: this.aggregator, aggregator: this.aggregator,
submissions: this.roundSubmissions, submissions: this.roundSubmissions,
answerSubmissions: this.answerSubmissions, answerSubmissions: this.answerSubmissions,
}); })
if (!this.canSubmitToCurrentRound) { if (!this.canSubmitToCurrentRound) {
return return
@ -227,14 +215,12 @@ export class Submitter {
value, value,
}) })
this.logger.info("Submit OK"); this.logger.info("Submit OK")
} catch (err) { } catch (err) {
console.log(err) console.log(err)
this.logger.error("Submit error", { this.logger.error("Submit error", {
err: err.toString(), err: err.toString(),
}) })
} }
} }
} }

View File

@ -72,19 +72,25 @@ export function chunks<T>(array: T[], size: number): T[][] {
return Array.apply<number, T[], T[][]>( return Array.apply<number, T[], T[][]>(
0, 0,
new Array(Math.ceil(array.length / size)) new Array(Math.ceil(array.length / size))
).map((_, index) => array.slice(index * size, (index + 1) * size)); ).map((_, index) => array.slice(index * size, (index + 1) * size))
} }
export const getMultipleAccounts = async ( export const getAccounts = async (
connection: Connection, connection: Connection,
keys: string[], keys: PublicKey[],
commitment: string = 'single' commitment: string = "single"
) => { ) => {
// const [a, b, c] = getAccounts(pk1, pk2, pk3)
const result = await Promise.all( const result = await Promise.all(
chunks(keys, 99).map((chunk) => chunks(keys, 99).map((chunk) =>
getMultipleAccountsCore(connection, chunk, commitment) getMultipleAccountsCore(
connection,
chunk.map((key) => key.toBase58()),
commitment
)
) )
); )
const array = result const array = result
.map( .map(
@ -92,37 +98,65 @@ export const getMultipleAccounts = async (
a.array a.array
.filter((acc) => !!acc) .filter((acc) => !!acc)
.map((acc) => { .map((acc) => {
const { data, ...rest } = acc; const { data, ...rest } = acc
const obj = { const obj = {
...rest, ...rest,
data: Buffer.from(data[0], "base64"), data: Buffer.from(data[0], "base64"),
} as AccountInfo<Buffer>; } as AccountInfo<Buffer>
return obj; return obj
}) as AccountInfo<Buffer>[] }) as AccountInfo<Buffer>[]
) )
.flat(); .flat()
return { keys, array }; return array
}; }
export const getMultipleAccounts = async (
connection: Connection,
keys: string[],
commitment: string = "single"
) => {
const result = await Promise.all(
chunks(keys, 99).map((chunk) =>
getMultipleAccountsCore(connection, chunk, commitment)
)
)
const array = result
.map(
(a) =>
a.array
.filter((acc) => !!acc)
.map((acc) => {
const { data, ...rest } = acc
const obj = {
...rest,
data: Buffer.from(data[0], "base64"),
} as AccountInfo<Buffer>
return obj
}) as AccountInfo<Buffer>[]
)
.flat()
return { keys, array }
}
const getMultipleAccountsCore = async ( const getMultipleAccountsCore = async (
connection: any, connection: any,
keys: string[], keys: string[],
commitment: string commitment: string
) => { ) => {
const args = connection._buildArgs([keys], commitment, "base64"); const args = connection._buildArgs([keys], commitment, "base64")
const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args); const unsafeRes = await connection._rpcRequest("getMultipleAccounts", args)
if (unsafeRes.error) { if (unsafeRes.error) {
throw new Error( throw new Error(
"failed to get info about account " + unsafeRes.error.message "failed to get info about account " + unsafeRes.error.message
); )
} }
if (unsafeRes.result.value) { if (unsafeRes.result.value) {
const array = unsafeRes.result.value as AccountInfo<string[]>[]; const array = unsafeRes.result.value as AccountInfo<string[]>[]
return { keys, array }; return { keys, array }
} }
throw new Error('Unable to get account'); throw new Error("Unable to get account")
}; }