flux submitter

This commit is contained in:
De Facto 2021-02-17 21:39:03 +08:00
parent a1a7656cd7
commit b76aef53ce
7 changed files with 354 additions and 15 deletions

View File

@ -18,6 +18,7 @@
"commander": "^6.2.0",
"dotenv": "^8.2.0",
"solray": "git+https://github.com/czl1378/solray.git",
"winston": "^3.3.3",
"ws": "^7.4.1"
},
"devDependencies": {

View File

@ -7,6 +7,8 @@ import {
SPLToken,
} from "solray"
import BN from "bn.js"
import {
SYSVAR_RENT_PUBKEY,
SYSVAR_CLOCK_PUBKEY,
@ -83,8 +85,8 @@ interface SubmitParams {
oracle_owner: Account
}
round_id: BigInt
value: BigInt
round_id: BN
value: BN
}
interface WithdrawParams {

78
src/PriceFeed.ts Normal file
View File

@ -0,0 +1,78 @@
import WebSocket from "ws"
import EventEmitter from "events"
export const UPDATE = "UPDATE"
export interface IPrice {
decimals: number
value: number
}
export interface IPriceFeed {
[Symbol.asyncIterator]: () => AsyncIterator<IPrice>
}
// events convert an particular event type of event emitter to an async iterator
function events<T>(emitter: EventEmitter, key: string) {
// TODO support cancel
let resolve
let p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
emitter.on(key, (value) => {
resolve(value)
p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
})
return {
[Symbol.asyncIterator]: () => {
return {
next() {
return p.then((info) => ({ done: false, value: info }))
},
}
},
}
}
export function coinbase(pair: string): IPriceFeed {
const emitter = new EventEmitter()
const ws = new WebSocket("wss://ws-feed.pro.coinbase.com")
ws.on("open", () => {
console.log(`${pair} price feed connected`)
ws.send(
JSON.stringify({
type: "subscribe",
product_ids: [pair.replace("/", "-").toUpperCase()],
channels: ["ticker"],
})
)
})
ws.on("message", async (data) => {
const json = JSON.parse(data)
if (!json || !json.price) {
return console.log(data)
}
const price: IPrice = {
decimals: 2,
value: Math.floor(json.price * 100),
}
emitter.emit(UPDATE, price)
// console.log("current price:", json.price)
})
ws.on("close", (err) => {
// TODO: automatic reconnect
console.error(`websocket closed: ${err}`)
process.exit(1)
})
return events(emitter, UPDATE)
}

158
src/Submitter.ts Normal file
View File

@ -0,0 +1,158 @@
import { Connection } from "@solana/web3.js"
import { PublicKey, Wallet } from "solray"
import { conn } from "./context"
import { Aggregator, Submissions, Oracle } from "./schema"
import BN from "bn.js"
import { sleep } from "./utils"
import FluxAggregator from "./FluxAggregator"
import { createLogger, Logger } from "winston"
import logger from "winston"
logger.add(
new logger.transports.Console({
format: logger.format.simple(),
level: "debug",
})
)
import { IPriceFeed } from "./PriceFeed"
// allow oracle to start a new round after this many slots. each slot is about 500ms
const MAX_ROUND_STALENESS = 10
export class Submitter {
public aggregator!: Aggregator
public submissions!: Submissions
public program: FluxAggregator
public logger!: Logger
public currentValue: BN
constructor(
programID: PublicKey,
public aggregatorPK: PublicKey,
public oraclePK: PublicKey,
private oracleOwnerWallet: Wallet,
private priceFeed: IPriceFeed
) {
this.program = new FluxAggregator(this.oracleOwnerWallet, programID)
this.currentValue = new BN(0)
}
// TODO: harvest rewards if > n
public async start() {
// make sure the states are initialized
this.aggregator = await Aggregator.load(this.aggregatorPK)
this.submissions = await Submissions.load(this.aggregator.roundSubmissions)
this.logger = logger.child({
aggregator: this.aggregator.config.description,
})
await Promise.all([this.observeAggregatorState(), this.observePriceFlux()])
}
private async observeAggregatorState() {
conn.onAccountChange(this.aggregatorPK, async (info) => {
this.aggregator = Aggregator.deserialize(info.data)
this.submissions = await Submissions.load(
this.aggregator.roundSubmissions
)
// TODO: load answer
this.logger.debug("state updated", {
aggregator: this.aggregator,
submissions: this.submissions,
})
this.onAggregatorStateUpdate()
})
}
// TODO: immediately submit to current round if not submitted yet
private async observePriceFlux() {
for await (let price of this.priceFeed) {
if (price.decimals != this.aggregator.config.decimals) {
throw new Error(
`Expect price with decimals of ${this.aggregator.config.decimals} got: ${price.decimals}`
)
}
this.currentValue = new BN(price.value)
// TODO: check flux against current answer
await this.trySubmit()
}
}
// compare with current answer
private async trySubmit() {
// TODO: make it possible to be triggered by chainlink task
// TODO: If from chainlink node, update state before running
const { round } = this.aggregator
const epoch = await conn.getEpochInfo()
const sinceLastUpdate = new BN(epoch.absoluteSlot).sub(round.updatedAt)
// console.log("slot", epoch.absoluteSlot, sinceLastUpdate.toString())
if (!this.hadSubmitted) {
this.logger.info("Submit to current round")
await this.submitCurrentValue(round.id)
return
}
if (!sinceLastUpdate.gtn(MAX_ROUND_STALENESS)) {
return
}
// The round is stale. start a new round if possible
this.logger.info("Starting a new round")
const oracle = await Oracle.load(this.oraclePK)
if (oracle.canStartNewRound(round.id)) {
return this.submitCurrentValue(round.id.addn(1))
}
}
private async onAggregatorStateUpdate() {
// don't try to submit if already submitted...
if (this.hadSubmitted) {
return
}
this.logger.info("Another oracle started a new round")
await this.trySubmit()
}
get hadSubmitted(): boolean {
return this.submissions.hadSubmitted(this.oraclePK)
}
private async submitCurrentValue(round: BN) {
// guard zero value
const value = this.currentValue
if (value.isZero()) {
this.logger.warn("current value is zero. skip submit.")
return
}
this.logger.info("submit", {
round: round.toString(),
value: value.toString(),
})
await this.program.submit({
accounts: {
aggregator: { write: this.aggregatorPK },
roundSubmissions: { write: this.aggregator.roundSubmissions },
answerSubmissions: { write: this.aggregator.answerSubmissions },
oracle: { write: this.oraclePK },
oracle_owner: this.oracleOwnerWallet.account,
},
round_id: round,
value,
})
}
}

62
src/context.ts Normal file
View File

@ -0,0 +1,62 @@
import { walletFromEnv } from "./utils"
import FluxAggregator from "./FluxAggregator"
import { solana, Wallet, Deployer } from "solray"
export const network = (process.env.NETWORK || "local") as any
export const conn = solana.connect(network)
async function openDeployer(): Promise<Deployer> {
return Deployer.open(`deploy.${network}.json`)
}
export class AppContext {
// static readonly AGGREGATOR_PROGRAM = "aggregatorProgram"
// static async forAdmin() {
// const deployer = await openDeployer()
// const admin = await walletFromEnv("ADMIN_MNEMONIC", conn)
// return new AppContext(deployer, admin)
// }
// static async forOracle() {
// const deployer = await openDeployer()
// const wallet = await walletFromEnv("ORACLE_MNEMONIC", conn)
// return new AppContext(deployer, wallet)
// }
// constructor(public deployer: Deployer, public wallet: Wallet) {}
async deployer() {
return Deployer.open(`deploy.${network}.json`)
}
async adminWallet() {
return walletFromEnv("ADMIN_MNEMONIC", conn)
}
async oracleWallet() {
return walletFromEnv("ADMIN_MNEMONIC", conn)
}
// get aggregatorProgramID() {
// return this.aggregatorProgramAccount.publicKey
// }
// get aggregator() {
// return new FluxAggregator(this.wallet, this.aggregatorProgramID)
// }
// get aggregatorProgramAccount() {
// const program = this.deployer.account(AppContext.AGGREGATOR_PROGRAM)
// if (program == null) {
// throw new Error(`flux aggregator program is not yet deployed`)
// }
// return program
// }
}

View File

@ -1,6 +1,7 @@
import { PublicKey, Account } from "solray"
import BN from "bn.js"
import { deserialize, serialize } from "borsh"
import { conn } from "./context"
const MAX_ORACLES = 13
@ -37,7 +38,29 @@ const str32Mapper = {
},
}
abstract class Serialization {
const u64Date = {
encode: (date: Date) => {
return new BN(Math.floor(date.getTime() / 1000))
},
decode: (unixtime: BN) => {
return new Date(unixtime.toNumber() * 1000)
},
}
export abstract class Serialization {
public static async load<T>(
this: { new (data: any): T },
key: PublicKey
): Promise<T> {
const info = await conn.getAccountInfo(key, "recent")
if (!info) {
throw new Error("account does not exist")
}
return deserialize(schema, this, info.data)
}
public static deserialize<T>(this: { new (data: any): T }, data: Buffer): T {
return deserialize(schema, this, data)
}
@ -103,6 +126,9 @@ export class AggregatorConfig extends Serialization {
}
export class Submissions extends Serialization {
public isInitialized!: boolean
public submissions!: Submission[]
public static size = 625
public static schema = {
kind: "struct",
@ -111,19 +137,33 @@ export class Submissions extends Serialization {
["submissions", [Submission, MAX_ORACLES]],
],
}
public hadSubmitted(pk: PublicKey): boolean {
return !!this.submissions.find((s) => {
return s.oracle.equals(pk)
})
}
}
class Round extends Serialization {
public id!: BN
public createdAt!: BN
public updatedAt!: BN
public static schema = {
kind: "struct",
fields: [
["id", "u64"],
["created_at", "u64"],
["updated_at", "u64"],
["createdAt", "u64"],
["updatedAt", "u64"],
],
}
}
class Answer extends Serialization {
public round_id!: BN
public created_at!: BN
public updated_at!: BN
public static schema = {
kind: "struct",
fields: [
@ -140,6 +180,8 @@ export class Aggregator extends Serialization {
public config!: AggregatorConfig
public roundSubmissions!: PublicKey
public answerSubmissions!: PublicKey
public answer!: Answer
public round!: Round
public static schema = {
kind: "struct",
@ -260,6 +302,7 @@ function boolToInt(t: boolean) {
export class Oracle extends Serialization {
public static size = 113
public allowStartRound!: BN
public static schema = {
kind: "struct",
@ -267,11 +310,15 @@ export class Oracle extends Serialization {
["description", [32], str32Mapper],
["isInitialized", "u8", boolMapper],
["withdrawable", "u64"],
["allow_start_round", "u64"],
["allowStartRound", "u64"],
["aggregator", [32], pubkeyMapper],
["owner", [32], pubkeyMapper],
],
}
public canStartNewRound(round: BN): boolean {
return this.allowStartRound.lte(round)
}
}
// if there is optional or variable length items, what is: borsh_utils::get_packed_len::<Submission>()?

View File

@ -108,12 +108,3 @@ export async function walletFromEnv(
return wallet
}
export async function openDeployer(): Promise<Deployer> {
const deployFile = process.env.DEPLOY_FILE
if (!deployFile) {
throw new Error(`Set DEPLOY_FILE in .env`)
}
return Deployer.open(deployFile)
}