print value updates

This commit is contained in:
De Facto 2021-02-20 17:05:05 +08:00
parent a65a03faf0
commit d9aecb68c2
6 changed files with 133 additions and 44 deletions

39
src/AggregatorObserver.ts Normal file
View File

@ -0,0 +1,39 @@
// event emitter of aggregator answer updates
import { Connection } from "@solana/web3.js"
import EventEmitter from "events"
import { PublicKey } from "solray"
import { Aggregator } from "./schema"
import { eventsIter } from "./utils"
import BN from "bn.js"
const ACCOUNT_CHANGE = "ACCOUNT_CHANGE"
export class AggregatorObserver {
constructor(private aggregatorPK: PublicKey, private conn: Connection) {}
// async iterator of updatedanswers
async *answers() {
let lastUpdate = new BN(0)
for await (let agg of this.stream()) {
if (agg.answer.updatedAt.gte(lastUpdate)) {
lastUpdate = agg.answer.updatedAt
yield agg.answer
}
}
}
// async iterator of updated aggregator states
stream() {
const ee = this.events()
return eventsIter<Aggregator>(ee, ACCOUNT_CHANGE)
}
events(): EventEmitter {
const ee = new EventEmitter()
this.conn.onAccountChange(this.aggregatorPK, (info) => {
ee.emit(ACCOUNT_CHANGE, Aggregator.deserialize(info.data))
})
return ee
}
}

37
src/cli.ts Normal file
View File

@ -0,0 +1,37 @@
import dotenv from "dotenv"
dotenv.config()
import { Command, option } from "commander"
import { jsonReplacer, loadJSONFile } from "./json"
import { AggregatorDeployFile } from "./Deployer"
import { conn, network } from "./context"
import { AggregatorObserver } from "./AggregatorObserver"
import { Aggregator, Answer } from "./schema"
const cli = new Command()
cli.command("observe <name>").action(async (name) => {
let deploy = loadJSONFile<AggregatorDeployFile>(process.env.DEPLOY_FILE!)
const aggregatorInfo = deploy.aggregators[name]
const observer = new AggregatorObserver(aggregatorInfo.pubkey, conn)
let agg = await Aggregator.load(aggregatorInfo.pubkey)
function printAnswer(answer: Answer) {
console.log({
description: aggregatorInfo.config.description,
decimals: aggregatorInfo.config.decimals,
roundID: answer.roundID.toString(),
median: answer.median.toString(),
updatedAt: answer.updatedAt.toString(),
createdAt: answer.createdAt.toString(),
})
}
printAnswer(agg.answer)
for await (let answer of observer.answers()) {
printAnswer(answer)
}
})
cli.parse(process.argv)

View File

@ -1,5 +1,6 @@
import WebSocket from "ws"
import EventEmitter from "events"
import { eventsIter } from "./utils"
export const UPDATE = "UPDATE"
@ -12,33 +13,6 @@ export interface IPriceFeed {
[Symbol.asyncIterator]: () => AsyncIterator<IPrice>
}
// events convert an particular event type of event emitter to an async iterator
function events<T>(emitter: EventEmitter, key: string) {
// TODO support cancel
let resolve
let p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
emitter.on(key, (value) => {
resolve(value)
p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
})
return {
[Symbol.asyncIterator]: () => {
return {
next() {
return p.then((info) => ({ done: false, value: info }))
},
}
},
}
}
export function coinbase(pair: string): IPriceFeed {
// TODO: can subscribe to many pairs with one connection
const emitter = new EventEmitter()
@ -77,5 +51,5 @@ export function coinbase(pair: string): IPriceFeed {
process.exit(1)
})
return events(emitter, UPDATE)
return eventsIter(emitter, UPDATE)
}

View File

@ -1,5 +1,6 @@
import { PublicKey } from "solray"
import fs from "fs"
import BN from "bn.js"
export function jsonReviver(_key: string, val: any) {
if (val && typeof val == "object") {
@ -11,14 +12,12 @@ export function jsonReviver(_key: string, val: any) {
}
export function jsonReplacer(key: string, value: any) {
if (value && typeof value != "object") {
return value
}
if (value.constructor == PublicKey) {
return {
type: "PublicKey",
base58: value.toBase58(),
if (value && typeof value == "object") {
if (value.constructor == PublicKey) {
return {
type: "PublicKey",
base58: value.toBase58(),
}
}
}

View File

@ -2,6 +2,7 @@ import { PublicKey, Account } from "solray"
import BN from "bn.js"
import { deserialize, serialize } from "borsh"
import { conn } from "./context"
import { jsonReplacer } from "./json"
const MAX_ORACLES = 13
@ -80,7 +81,18 @@ export abstract class Serialization {
return buf
}
// public toJSON(pretty = true) {
// return JSON.stringify(
// this[Serialization.DATA_KEY],
// jsonReplacer,
// pretty ? 2 : 0
// )
// }
// public static DATA_KEY = Symbol("DATA")
constructor(data) {
// this[Serialization.DATA_KEY] = data
Object.assign(this, data)
}
}
@ -171,7 +183,8 @@ export class Submissions extends Serialization {
})
}
}
class Round extends Serialization {
export class Round extends Serialization {
public id!: BN
public createdAt!: BN
public updatedAt!: BN
@ -186,19 +199,19 @@ class Round extends Serialization {
}
}
class Answer extends Serialization {
public round_id!: BN
export class Answer extends Serialization {
public roundID!: BN
public median!: BN
public created_at!: BN
public updated_at!: BN
public createdAt!: BN
public updatedAt!: BN
public static schema = {
kind: "struct",
fields: [
["round_id", "u64"],
["roundID", "u64"],
["median", "u64"],
["created_at", "u64"],
["updated_at", "u64"],
["createdAt", "u64"],
["updatedAt", "u64"],
],
}
}

View File

@ -1,4 +1,5 @@
import { Connection, PublicKey } from "@solana/web3.js"
import EventEmitter from "events"
import { solana, Wallet, NetworkName, Deployer } from "solray"
@ -41,3 +42,29 @@ export async function walletFromEnv(
return wallet
}
// events convert an particular event type of event emitter to an async iterator
export function eventsIter<T>(emitter: EventEmitter, key: string) {
// TODO support cancel
let resolve
let p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
emitter.on(key, (value) => {
resolve(value)
p = new Promise<T>((resolveFn) => {
resolve = resolveFn
})
})
return {
[Symbol.asyncIterator]: () => {
return {
next() {
return p.then((info) => ({ done: false, value: info }))
},
}
},
}
}