diff --git a/src/Submitter.ts b/src/Submitter.ts index 9b0b627..083ab4c 100644 --- a/src/Submitter.ts +++ b/src/Submitter.ts @@ -23,7 +23,8 @@ const MAX_ROUND_STALENESS = 10 export class Submitter { public aggregator!: Aggregator - public submissions!: Submissions + public roundSubmissions!: Submissions + public answerSubmissions!: Submissions public program: FluxAggregator public logger!: Logger public currentValue: BN @@ -45,7 +46,17 @@ export class Submitter { public async start() { // make sure the states are initialized this.aggregator = await Aggregator.load(this.aggregatorPK) - this.submissions = await Submissions.load(this.aggregator.roundSubmissions) + this.roundSubmissions = await Submissions.load( + + this.aggregator.roundSubmissions + + ) + this.answerSubmissions = await Submissions.load( + + this.aggregator.answerSubmissions + + ) + this.logger = logger.child({ aggregator: this.aggregator.config.description, }) @@ -56,13 +67,17 @@ export class Submitter { private async observeAggregatorState() { conn.onAccountChange(this.aggregatorPK, async (info) => { this.aggregator = Aggregator.deserialize(info.data) - this.submissions = await Submissions.load( + this.roundSubmissions = await Submissions.load( this.aggregator.roundSubmissions ) + this.answerSubmissions = await Submissions.load( + this.aggregator.answerSubmissions + ) // TODO: load answer this.logger.debug("state updated", { aggregator: this.aggregator, - submissions: this.submissions, + submissions: this.roundSubmissions, + answerSubmissions: this.answerSubmissions, }) this.onAggregatorStateUpdate() @@ -89,18 +104,19 @@ export class Submitter { const { round } = this.aggregator - if (!this.hadSubmitted) { + if (this.canSubmitToCurrentRound) { this.logger.info("Submit to current round") await this.submitCurrentValue(round.id) return } + // or, see if oracle can start a new round const epoch = await conn.getEpochInfo() const sinceLastUpdate = new BN(epoch.absoluteSlot).sub(round.updatedAt) // console.log("slot", epoch.absoluteSlot, sinceLastUpdate.toString()) if (sinceLastUpdate.ltn(MAX_ROUND_STALENESS)) { - // round is not stale yet. don't submit + // round is not stale yet. don't submit new round return } @@ -114,8 +130,7 @@ export class Submitter { } private async onAggregatorStateUpdate() { - // don't try to submit if already submitted... - if (this.hadSubmitted) { + if (this.canSubmitToCurrentRound) { return } @@ -123,8 +138,8 @@ export class Submitter { await this.trySubmit() } - get hadSubmitted(): boolean { - return this.submissions.hadSubmitted(this.oraclePK) + get canSubmitToCurrentRound(): boolean { + return this.roundSubmissions.canSubmit(this.oraclePK, this.aggregator.config) } private async submitCurrentValue(round: BN) { @@ -140,17 +155,24 @@ export class Submitter { value: value.toString(), }) - await this.program.submit({ - accounts: { - aggregator: { write: this.aggregatorPK }, - roundSubmissions: { write: this.aggregator.roundSubmissions }, - answerSubmissions: { write: this.aggregator.answerSubmissions }, - oracle: { write: this.oraclePK }, - oracle_owner: this.oracleOwnerWallet.account, - }, + try { + await this.program.submit({ + accounts: { + aggregator: { write: this.aggregatorPK }, + roundSubmissions: { write: this.aggregator.roundSubmissions }, + answerSubmissions: { write: this.aggregator.answerSubmissions }, + oracle: { write: this.oraclePK }, + oracle_owner: this.oracleOwnerWallet.account, + }, - round_id: round, - value, - }) + round_id: round, + value, + }) + } catch (err) { + this.logger.error("submit error", { + err, + }) + // throw err + } } } diff --git a/src/schema.ts b/src/schema.ts index e385845..d92d2fd 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -86,14 +86,14 @@ export abstract class Serialization { } class Submission { - public time!: BN + public updatedAt!: BN public value!: BN public oracle!: PublicKey public static schema = { kind: "struct", fields: [ - ["updated_at", "u64"], + ["updatedAt", "u64"], ["value", "u64"], ["oracle", [32], pubkeyMapper], ], @@ -138,6 +138,19 @@ export class Submissions extends Serialization { ], } + // if not already submitted, and has empty spot + public canSubmit(pk: PublicKey, cfg: AggregatorConfig): boolean { + if (this.hadSubmitted(pk)) { + return false + } + + let emptyIndex = this.submissions.findIndex((s) => { + return s.updatedAt.isZero() + }) + + return emptyIndex > 0 && emptyIndex < cfg.maxSubmissions + } + public hadSubmitted(pk: PublicKey): boolean { return !!this.submissions.find((s) => { return s.oracle.equals(pk) diff --git a/test.ts b/test.ts new file mode 100644 index 0000000..eaa6147 --- /dev/null +++ b/test.ts @@ -0,0 +1,113 @@ +import dotenv from "dotenv" +dotenv.config() + +import BN from "bn.js" + +import { BPFLoader, Wallet } from "solray" +import { AppContext, conn, network } from "./src/context" + +import fs from "fs" +import path from "path" +import { AggregatorConfig } from "./src/schema" +import FluxAggregator from "./src/FluxAggregator" + +import * as encoding from "./src/schema" +import { Account, AccountInfo, Connection, PublicKey } from "@solana/web3.js" +import { coinbase } from "./src/PriceFeed" +import { Submitter } from "./src/Submitter" + +const FLUX_AGGREGATOR_SO = path.resolve(__dirname, "build/flux_aggregator.so") + +async function main() { + let ctx = new AppContext() + + let deployer = await ctx.deployer() + let adminWallet = await ctx.adminWallet() + let oracleWallet = await ctx.oracleWallet() + + console.log(network) + + await conn.requestAirdrop(adminWallet.pubkey, 10 * 1e9) + console.log((await conn.getBalance(adminWallet.pubkey)) / 1e9) + + let aggregatorProgram = await deployer.ensure( + "aggregatorProgram", + async () => { + const programBinary = fs.readFileSync(FLUX_AGGREGATOR_SO) + + console.log(`deploying ${FLUX_AGGREGATOR_SO}...`) + const bpfLoader = new BPFLoader(adminWallet) + + return bpfLoader.load(programBinary) + } + ) + + const program = new FluxAggregator(adminWallet, aggregatorProgram.publicKey) + + let aggregator = await deployer.ensure( + "create btc:usd aggregator", + async () => { + let name = "btc:usd" + return program.initialize({ + config: new AggregatorConfig({ + description: name, + decimals: 2, + minSubmissions: 1, + maxSubmissions: 3, + restartDelay: 0, + rewardAmount: BigInt(10), + }), + owner: adminWallet.account, + }) + } + ) + + const N_ORACLES = 4 + interface OracleRole { + owner: Account + oracle: PublicKey + } + + const oracleRoles: OracleRole[] = [] + + for (let i = 0; i < N_ORACLES; i++) { + // TODO: probably put the desired oracles in a config file... + let owner = await deployer.ensure(`create oracle[${i}] owner`, async () => { + return new Account() + }) + + let oracle = await deployer.ensure( + `add oracle[${i}] to btc:usd`, + async () => { + return program.addOracle({ + description: "test-oracle", + aggregator: aggregator.publicKey, + aggregatorOwner: adminWallet.account, + oracleOwner: owner.publicKey, + }) + } + ) + + oracleRoles.push({ owner, oracle: oracle.publicKey }) + } + + for (const role of oracleRoles) { + // const wallet = Wallet.from + const owner = Wallet.fromAccount(role.owner, conn) + await conn.requestAirdrop(owner.pubkey, 10 * 1e9) + console.log(owner.address, await conn.getBalance(owner.pubkey)) + + const priceFeed = coinbase("BTC/USD") + const submitter = new Submitter( + aggregatorProgram.publicKey, + aggregator.publicKey, + role.oracle, + owner, + priceFeed + ) + + submitter.start() + } +} + +main().catch((err) => console.log(err))