diff --git a/package.json b/package.json index 59b9ad9..9b38079 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "commander": "^6.2.0", "dotenv": "^8.2.0", "solray": "git+https://github.com/czl1378/solray.git", + "winston": "^3.3.3", "ws": "^7.4.1" }, "devDependencies": { diff --git a/src/FluxAggregator.ts b/src/FluxAggregator.ts index 30ef41f..9fe1dba 100644 --- a/src/FluxAggregator.ts +++ b/src/FluxAggregator.ts @@ -7,6 +7,8 @@ import { SPLToken, } from "solray" +import BN from "bn.js" + import { SYSVAR_RENT_PUBKEY, SYSVAR_CLOCK_PUBKEY, @@ -83,8 +85,8 @@ interface SubmitParams { oracle_owner: Account } - round_id: BigInt - value: BigInt + round_id: BN + value: BN } interface WithdrawParams { diff --git a/src/PriceFeed.ts b/src/PriceFeed.ts new file mode 100644 index 0000000..1751554 --- /dev/null +++ b/src/PriceFeed.ts @@ -0,0 +1,78 @@ +import WebSocket from "ws" +import EventEmitter from "events" + +export const UPDATE = "UPDATE" + +export interface IPrice { + decimals: number + value: number +} + +export interface IPriceFeed { + [Symbol.asyncIterator]: () => AsyncIterator +} + +// events convert an particular event type of event emitter to an async iterator +function events(emitter: EventEmitter, key: string) { + // TODO support cancel + + let resolve + let p = new Promise((resolveFn) => { + resolve = resolveFn + }) + + emitter.on(key, (value) => { + resolve(value) + p = new Promise((resolveFn) => { + resolve = resolveFn + }) + }) + + return { + [Symbol.asyncIterator]: () => { + return { + next() { + return p.then((info) => ({ done: false, value: info })) + }, + } + }, + } +} + +export function coinbase(pair: string): IPriceFeed { + const emitter = new EventEmitter() + + const ws = new WebSocket("wss://ws-feed.pro.coinbase.com") + + ws.on("open", () => { + console.log(`${pair} price feed connected`) + ws.send( + JSON.stringify({ + type: "subscribe", + product_ids: [pair.replace("/", "-").toUpperCase()], + channels: ["ticker"], + }) + ) + }) + + ws.on("message", async (data) => { + const json = JSON.parse(data) + if (!json || !json.price) { + return console.log(data) + } + const price: IPrice = { + decimals: 2, + value: Math.floor(json.price * 100), + } + emitter.emit(UPDATE, price) + // console.log("current price:", json.price) + }) + + ws.on("close", (err) => { + // TODO: automatic reconnect + console.error(`websocket closed: ${err}`) + process.exit(1) + }) + + return events(emitter, UPDATE) +} diff --git a/src/Submitter.ts b/src/Submitter.ts new file mode 100644 index 0000000..e82c6c2 --- /dev/null +++ b/src/Submitter.ts @@ -0,0 +1,158 @@ +import { Connection } from "@solana/web3.js" +import { PublicKey, Wallet } from "solray" +import { conn } from "./context" + +import { Aggregator, Submissions, Oracle } from "./schema" +import BN from "bn.js" +import { sleep } from "./utils" +import FluxAggregator from "./FluxAggregator" + +import { createLogger, Logger } from "winston" +import logger from "winston" +logger.add( + new logger.transports.Console({ + format: logger.format.simple(), + level: "debug", + }) +) + +import { IPriceFeed } from "./PriceFeed" + +// allow oracle to start a new round after this many slots. each slot is about 500ms +const MAX_ROUND_STALENESS = 10 + +export class Submitter { + public aggregator!: Aggregator + public submissions!: Submissions + public program: FluxAggregator + public logger!: Logger + public currentValue: BN + + constructor( + programID: PublicKey, + public aggregatorPK: PublicKey, + public oraclePK: PublicKey, + private oracleOwnerWallet: Wallet, + private priceFeed: IPriceFeed + ) { + this.program = new FluxAggregator(this.oracleOwnerWallet, programID) + + this.currentValue = new BN(0) + } + + // TODO: harvest rewards if > n + + public async start() { + // make sure the states are initialized + this.aggregator = await Aggregator.load(this.aggregatorPK) + this.submissions = await Submissions.load(this.aggregator.roundSubmissions) + this.logger = logger.child({ + aggregator: this.aggregator.config.description, + }) + + await Promise.all([this.observeAggregatorState(), this.observePriceFlux()]) + } + + private async observeAggregatorState() { + conn.onAccountChange(this.aggregatorPK, async (info) => { + this.aggregator = Aggregator.deserialize(info.data) + this.submissions = await Submissions.load( + this.aggregator.roundSubmissions + ) + // TODO: load answer + this.logger.debug("state updated", { + aggregator: this.aggregator, + submissions: this.submissions, + }) + + this.onAggregatorStateUpdate() + }) + } + + // TODO: immediately submit to current round if not submitted yet + + private async observePriceFlux() { + for await (let price of this.priceFeed) { + if (price.decimals != this.aggregator.config.decimals) { + throw new Error( + `Expect price with decimals of ${this.aggregator.config.decimals} got: ${price.decimals}` + ) + } + + this.currentValue = new BN(price.value) + // TODO: check flux against current answer + await this.trySubmit() + } + } + // compare with current answer + + private async trySubmit() { + // TODO: make it possible to be triggered by chainlink task + // TODO: If from chainlink node, update state before running + + const { round } = this.aggregator + + const epoch = await conn.getEpochInfo() + + const sinceLastUpdate = new BN(epoch.absoluteSlot).sub(round.updatedAt) + // console.log("slot", epoch.absoluteSlot, sinceLastUpdate.toString()) + + if (!this.hadSubmitted) { + this.logger.info("Submit to current round") + await this.submitCurrentValue(round.id) + return + } + + if (!sinceLastUpdate.gtn(MAX_ROUND_STALENESS)) { + return + } + + // The round is stale. start a new round if possible + this.logger.info("Starting a new round") + const oracle = await Oracle.load(this.oraclePK) + if (oracle.canStartNewRound(round.id)) { + return this.submitCurrentValue(round.id.addn(1)) + } + } + + private async onAggregatorStateUpdate() { + // don't try to submit if already submitted... + if (this.hadSubmitted) { + return + } + + this.logger.info("Another oracle started a new round") + await this.trySubmit() + } + + get hadSubmitted(): boolean { + return this.submissions.hadSubmitted(this.oraclePK) + } + + private async submitCurrentValue(round: BN) { + // guard zero value + const value = this.currentValue + if (value.isZero()) { + this.logger.warn("current value is zero. skip submit.") + return + } + + this.logger.info("submit", { + round: round.toString(), + value: value.toString(), + }) + + await this.program.submit({ + accounts: { + aggregator: { write: this.aggregatorPK }, + roundSubmissions: { write: this.aggregator.roundSubmissions }, + answerSubmissions: { write: this.aggregator.answerSubmissions }, + oracle: { write: this.oraclePK }, + oracle_owner: this.oracleOwnerWallet.account, + }, + + round_id: round, + value, + }) + } +} diff --git a/src/context.ts b/src/context.ts new file mode 100644 index 0000000..347daea --- /dev/null +++ b/src/context.ts @@ -0,0 +1,62 @@ +import { walletFromEnv } from "./utils" + +import FluxAggregator from "./FluxAggregator" + +import { solana, Wallet, Deployer } from "solray" + +export const network = (process.env.NETWORK || "local") as any +export const conn = solana.connect(network) + +async function openDeployer(): Promise { + return Deployer.open(`deploy.${network}.json`) +} + +export class AppContext { + // static readonly AGGREGATOR_PROGRAM = "aggregatorProgram" + + // static async forAdmin() { + // const deployer = await openDeployer() + // const admin = await walletFromEnv("ADMIN_MNEMONIC", conn) + + // return new AppContext(deployer, admin) + // } + + // static async forOracle() { + // const deployer = await openDeployer() + // const wallet = await walletFromEnv("ORACLE_MNEMONIC", conn) + + // return new AppContext(deployer, wallet) + // } + + // constructor(public deployer: Deployer, public wallet: Wallet) {} + + async deployer() { + return Deployer.open(`deploy.${network}.json`) + } + + async adminWallet() { + return walletFromEnv("ADMIN_MNEMONIC", conn) + } + + async oracleWallet() { + return walletFromEnv("ADMIN_MNEMONIC", conn) + } + + // get aggregatorProgramID() { + // return this.aggregatorProgramAccount.publicKey + // } + + // get aggregator() { + // return new FluxAggregator(this.wallet, this.aggregatorProgramID) + // } + + // get aggregatorProgramAccount() { + // const program = this.deployer.account(AppContext.AGGREGATOR_PROGRAM) + + // if (program == null) { + // throw new Error(`flux aggregator program is not yet deployed`) + // } + + // return program + // } +} diff --git a/src/schema.ts b/src/schema.ts index 6f5558c..45b3209 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -1,6 +1,7 @@ import { PublicKey, Account } from "solray" import BN from "bn.js" import { deserialize, serialize } from "borsh" +import { conn } from "./context" const MAX_ORACLES = 13 @@ -37,7 +38,29 @@ const str32Mapper = { }, } -abstract class Serialization { +const u64Date = { + encode: (date: Date) => { + return new BN(Math.floor(date.getTime() / 1000)) + }, + + decode: (unixtime: BN) => { + return new Date(unixtime.toNumber() * 1000) + }, +} + +export abstract class Serialization { + public static async load( + this: { new (data: any): T }, + key: PublicKey + ): Promise { + const info = await conn.getAccountInfo(key, "recent") + if (!info) { + throw new Error("account does not exist") + } + + return deserialize(schema, this, info.data) + } + public static deserialize(this: { new (data: any): T }, data: Buffer): T { return deserialize(schema, this, data) } @@ -103,6 +126,9 @@ export class AggregatorConfig extends Serialization { } export class Submissions extends Serialization { + public isInitialized!: boolean + public submissions!: Submission[] + public static size = 625 public static schema = { kind: "struct", @@ -111,19 +137,33 @@ export class Submissions extends Serialization { ["submissions", [Submission, MAX_ORACLES]], ], } + + public hadSubmitted(pk: PublicKey): boolean { + return !!this.submissions.find((s) => { + return s.oracle.equals(pk) + }) + } } class Round extends Serialization { + public id!: BN + public createdAt!: BN + public updatedAt!: BN + public static schema = { kind: "struct", fields: [ ["id", "u64"], - ["created_at", "u64"], - ["updated_at", "u64"], + ["createdAt", "u64"], + ["updatedAt", "u64"], ], } } class Answer extends Serialization { + public round_id!: BN + public created_at!: BN + public updated_at!: BN + public static schema = { kind: "struct", fields: [ @@ -140,6 +180,8 @@ export class Aggregator extends Serialization { public config!: AggregatorConfig public roundSubmissions!: PublicKey public answerSubmissions!: PublicKey + public answer!: Answer + public round!: Round public static schema = { kind: "struct", @@ -260,6 +302,7 @@ function boolToInt(t: boolean) { export class Oracle extends Serialization { public static size = 113 + public allowStartRound!: BN public static schema = { kind: "struct", @@ -267,11 +310,15 @@ export class Oracle extends Serialization { ["description", [32], str32Mapper], ["isInitialized", "u8", boolMapper], ["withdrawable", "u64"], - ["allow_start_round", "u64"], + ["allowStartRound", "u64"], ["aggregator", [32], pubkeyMapper], ["owner", [32], pubkeyMapper], ], } + + public canStartNewRound(round: BN): boolean { + return this.allowStartRound.lte(round) + } } // if there is optional or variable length items, what is: borsh_utils::get_packed_len::()? diff --git a/src/utils.ts b/src/utils.ts index 681f6ed..afe1a2a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -108,12 +108,3 @@ export async function walletFromEnv( return wallet } -export async function openDeployer(): Promise { - const deployFile = process.env.DEPLOY_FILE - - if (!deployFile) { - throw new Error(`Set DEPLOY_FILE in .env`) - } - - return Deployer.open(deployFile) -}