From 4f3316e1e443a3c07e957a7a6f9196c0f5cf4156 Mon Sep 17 00:00:00 2001 From: De Facto Date: Tue, 16 Mar 2021 13:38:35 +0800 Subject: [PATCH 1/3] comments --- program/src/processor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/program/src/processor.rs b/program/src/processor.rs index 54541dc..571de23 100644 --- a/program/src/processor.rs +++ b/program/src/processor.rs @@ -155,10 +155,10 @@ impl<'a> RemoveOracleContext<'a> { struct SubmitContext<'a> { clock: Clock, - aggregator: &'a AccountInfo<'a>, - round_submissions: &'a AccountInfo<'a>, - answer_submissions: &'a AccountInfo<'a>, - oracle: &'a AccountInfo<'a>, + aggregator: &'a AccountInfo<'a>, // write + round_submissions: &'a AccountInfo<'a>, // write + answer_submissions: &'a AccountInfo<'a>, // write + oracle: &'a AccountInfo<'a>, // write oracle_owner: &'a AccountInfo<'a>, // signed // NOTE: 5.84942*10^11 years even if 1 sec per round. don't bother with handling wrapparound. From 51932068d04e52cea1b28771bcaf2b28babdb5fa Mon Sep 17 00:00:00 2001 From: De Facto Date: Tue, 16 Mar 2021 16:04:38 +0800 Subject: [PATCH 2/3] aggregated feeds --- sandbox/aggregated-feeds.ts | 25 +++ src/feeds.ts | 316 +++++++++++++++++++++++++++++++++++- src/utils.ts | 14 ++ 3 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 sandbox/aggregated-feeds.ts diff --git a/sandbox/aggregated-feeds.ts b/sandbox/aggregated-feeds.ts new file mode 100644 index 0000000..9453078 --- /dev/null +++ b/sandbox/aggregated-feeds.ts @@ -0,0 +1,25 @@ +import { AggregatedFeed, BitStamp, CoinBase, FTX } from "../src/feeds" + +// print the median of the current prices of three CEXes +async function main() { + const feeds = [new CoinBase(), new BitStamp(), new FTX()] + for (let feed of feeds) { + feed.connect() + } + + // const aggfeed = new AggregatedFeed(feeds, "btc:usd") + // const aggfeed2 = new AggregatedFeed(feeds, "eth:usd") + + for (let pair of ["btc:usd", "eth:usd"]) { + const aggfeed = new AggregatedFeed(feeds, pair) + + setImmediate(async () => { + for await (let _ of aggfeed.updates()) { + console.log(aggfeed.prices) + console.log(aggfeed.median) + } + }) + } +} + +main().catch((err) => console.log(err)) diff --git a/src/feeds.ts b/src/feeds.ts index da74e62..1e415c4 100644 --- a/src/feeds.ts +++ b/src/feeds.ts @@ -1,12 +1,15 @@ import WebSocket from "ws" import EventEmitter from "events" -import { eventsIter } from "./utils" +import { eventsIter, median } from "./utils" import { log } from "./log" +import winston from "winston" export const UPDATE = "UPDATE" export interface IPrice { + source: string + pair: string decimals: number value: number } @@ -15,6 +18,315 @@ export interface IPriceFeed { [Symbol.asyncIterator]: () => AsyncIterator } +export abstract class PriceFeed { + public emitter = new EventEmitter() + + protected conn!: WebSocket + protected connected!: Promise + + protected abstract get log(): winston.Logger + protected abstract get baseurl(): string + + // subscribed pairs. should re-subscribe on reconnect + public pairs: string[] = [] + + async connect() { + this.log.debug("connecting", { baseurl: this.baseurl }) + + this.connected = new Promise((resolve) => { + const conn = new WebSocket(this.baseurl) + conn.on("open", () => { + this.log.debug("connected") + + this.conn = conn + + for (let pair of this.pairs) { + this.handleSubscribe(pair) + } + + resolve() + }) + + conn.on("close", () => { + // TODO: auto-reconnect & re-subscribe + }) + + conn.on("message", async (data) => { + // this.log.debug("raw price update", { data }) + + const price = this.parseMessage(data) + + if (price) { + this.onMessage(price) + } + }) + }) + + return this.connected + } + + subscribe(pair: string) { + if (this.pairs.includes(pair)) { + // already subscribed + return + } + + this.pairs.push(pair) + + if (this.conn) { + // if already connected immediately subscribe + this.handleSubscribe(pair) + } + } + + onMessage(price: IPrice) { + this.log.debug("emit price update", { price }) + + this.emitter.emit(UPDATE, price) + } + + abstract parseMessage(data: any): IPrice | undefined + // abstract parseMessage(pair: string) + // abstract parseMessage(pair: string) + abstract handleSubscribe(pair: string): Promise +} + +export class BitStamp extends PriceFeed { + protected log = log.child({ class: BitStamp.name }) + protected baseurl = "wss://ws.bitstamp.net" + + parseMessage(data) { + const payload = JSON.parse(data) + + // { + // "channel": "live_trades_btcusd", + // "data": { + // "amount": 0.02, + // "amount_str": "0.02000000", + // "buy_order_id": 1339567984607234, + // "id": 157699738, + // "microtimestamp": "1615877939649000", + // "price": 55008.3, + // "price_str": "55008.30", + // "sell_order_id": 1339567982141443, + // "timestamp": "1615877939", + // "type": 0 + // }, + // "event": "trade" + // } + + if (payload.event != "trade") { + return + } + + const channel = (payload.channel as string).replace("live_trades_", "") + + // assume that the symbols for the pair are 3 letters + const pair = channel.slice(0, 3) + ":" + channel.slice(3) + + const price: IPrice = { + source: BitStamp.name, + pair, + decimals: 2, + value: Math.floor(payload.data.price * 100), + } + + return price + } + + async handleSubscribe(pair: string) { + // "btc:usd" => "BTCUSD" + const targetPair = pair.replace(":", "").toUpperCase() + + this.conn.send( + JSON.stringify({ + event: "bts:subscribe", + data: { + channel: `live_trades_${targetPair.replace("/", "").toLowerCase()}`, + }, + }) + ) + } +} + +export class FTX extends PriceFeed { + protected log = log.child({ class: FTX.name }) + protected baseurl = "wss://ftx.com/ws/" + + parseMessage(data) { + const payload = JSON.parse(data) + + // { + // "channel": "ticker", + // "market": "BTC/USD", + // "type": "update", + // "data": { + // "bid": 54567, + // "ask": 54577, + // "bidSize": 0.0583, + // "askSize": 0.2051, + // "last": 54582, + // "time": 1615877027.551234 + // } + // } + + if (payload.type != "update" || payload.channel != "ticker") { + return + } + + const pair = (payload.market as string).replace("/", ":").toLowerCase() + + const price: IPrice = { + source: FTX.name, + pair, + decimals: 2, + value: Math.floor(payload.data.last * 100), + } + + return price + } + + async handleSubscribe(pair: string) { + // "btc:usd" => "BTC-USD" + const targetPair = pair.replace(":", "/").toUpperCase() + + this.conn.send( + JSON.stringify({ + op: "subscribe", + channel: "ticker", + market: targetPair, + }) + ) + } +} + +export class CoinBase extends PriceFeed { + protected log = log.child({ class: CoinBase.name }) + protected baseurl = "wss://ws-feed.pro.coinbase.com" + + parseMessage(data) { + const payload = JSON.parse(data) + + // { + // "type": "ticker", + // "sequence": 22772426228, + // "product_id": "BTC-USD", + // "price": "53784.59", + // "open_24h": "58795.78", + // "volume_24h": "35749.39437842", + // "low_24h": "53221", + // "high_24h": "58799.66", + // "volume_30d": "733685.27275521", + // "best_bid": "53784.58", + // "best_ask": "53784.59", + // "side": "buy", + // "time": "2021-03-16T06:26:06.791440Z", + // "trade_id": 145698988, + // "last_size": "0.00474597" + // } + + if (payload.type != "ticker") { + return + } + + // "BTC-USD" => "btc:usd" + const pair = (payload.product_id as string).replace("-", ":").toLowerCase() + + const price: IPrice = { + source: CoinBase.name, + pair, + decimals: 2, + value: Math.floor(payload.price * 100), + } + + return price + } + + async handleSubscribe(pair: string) { + // "btc:usd" => "BTC-USD" + const targetPair = pair.replace(":", "-").toUpperCase() + + this.conn.send( + JSON.stringify({ + type: "subscribe", + product_ids: [targetPair], + channels: ["ticker"], + }) + ) + } +} + +export class AggregatedFeed { + public emitter = new EventEmitter() + public prices: IPrice[] = [] + + // assume that the feeds are already connected + constructor(public feeds: PriceFeed[], public pair: string) { + this.subscribe() + } + + private subscribe() { + const pair = this.pair + + let i = 0 + for (let feed of this.feeds) { + feed.subscribe(pair) + + const index = i + i++ + + // store the price updates in the ith position of `this.prices` + feed.emitter.on(UPDATE, (price: IPrice) => { + if (price.pair != pair) { + return + } + + this.prices[index] = price + + this.onPriceUpdate(price) + }) + } + } + + private onPriceUpdate(price: IPrice) { + // log.debug("aggregated price update", { + // prices: this.prices, + // median: this.median, + // }) + this.emitter.emit(UPDATE, this) + } + + async* medians() { + for await (let _ of this.updates()) { + yield this.median + } + } + + async* updates() { + for await (let _ of eventsIter(this.emitter, "UPDATE")) { + yield this + } + } + + get median(): IPrice | undefined { + const prices = this.prices.filter((price) => price != undefined) + + if (prices.length == 0) { + return + } + + const values = prices.map((price) => price.value) + + return { + source: "median", + pair: prices[0].pair, + decimals: prices[0].decimals, + value: median(values), + } + } +} + +// TODO remove export function coinbase(pair: string): IPriceFeed { // TODO: can subscribe to many pairs with one connection const emitter = new EventEmitter() @@ -42,6 +354,8 @@ export function coinbase(pair: string): IPriceFeed { return } const price: IPrice = { + source: "coinbase", + pair, decimals: 2, value: Math.floor(json.price * 100), } diff --git a/src/utils.ts b/src/utils.ts index 53fd59c..cf80c7f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,6 +3,20 @@ import EventEmitter from "events" import { solana, Wallet } from "solray" +export function median(values: number[]): number { + if (values.length === 0) return 0 + + values.sort(function (a, b) { + return a - b + }) + + var half = Math.floor(values.length / 2) + + if (values.length % 2) return values[half] + + return Math.floor((values[half - 1] + values[half]) / 2.0) +} + export function getMedian(submissions: number[]): number { const values = submissions .filter((s: any) => s.value != 0) From e36d977e4072601254595bdbefa8f5f7c262d7bb Mon Sep 17 00:00:00 2001 From: De Facto Date: Tue, 16 Mar 2021 16:16:56 +0800 Subject: [PATCH 3/3] use aggregated cex median in PriceFeeder --- src/PriceFeeder.ts | 25 ++++++++++++++++++++++--- src/feeds.ts | 9 ++++++--- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/PriceFeeder.ts b/src/PriceFeeder.ts index cba7afc..2a9beed 100644 --- a/src/PriceFeeder.ts +++ b/src/PriceFeeder.ts @@ -2,7 +2,14 @@ import fs from "fs" import { Wallet } from "solray" import { AggregatorDeployFile } from "./Deployer" import { loadJSONFile } from "./json" -import { coinbase } from "./feeds" +import { + AggregatedFeed, + BitStamp, + CoinBase, + coinbase, + FTX, + PriceFeed, +} from "./feeds" import { Submitter, SubmitterConfig } from "./Submitter" import { log } from "./log" import { conn } from "./context" @@ -10,12 +17,21 @@ import { conn } from "./context" // Look at all the available aggregators and submit to those that the wallet can // act as an oracle. export class PriceFeeder { + private feeds: PriceFeed[] + constructor( private deployInfo: AggregatorDeployFile, private wallet: Wallet - ) {} + ) { + this.feeds = [new CoinBase(), new BitStamp(), new FTX()] + } async start() { + // connect to the price feeds + for (const feed of this.feeds) { + feed.connect() + } + // find aggregators that this wallet can act as oracle this.startAccessibleAggregators() } @@ -40,7 +56,10 @@ export class PriceFeeder { continue } - const priceFeed = coinbase(name) + const feed = new AggregatedFeed(this.feeds, name) + const priceFeed = feed.medians() + // const priceFeed = coinbase(name) + const submitter = new Submitter( this.deployInfo.programID, aggregatorInfo.pubkey, diff --git a/src/feeds.ts b/src/feeds.ts index 1e415c4..425d798 100644 --- a/src/feeds.ts +++ b/src/feeds.ts @@ -296,13 +296,16 @@ export class AggregatedFeed { this.emitter.emit(UPDATE, this) } - async* medians() { + async *medians() { for await (let _ of this.updates()) { - yield this.median + const price = this.median + if (price) { + yield price + } } } - async* updates() { + async *updates() { for await (let _ of eventsIter(this.emitter, "UPDATE")) { yield this }