solana-flux-aggregator/src/feeds.ts

60 lines
1.3 KiB
TypeScript
Raw Normal View History

2021-02-17 05:39:03 -08:00
import WebSocket from "ws"
import EventEmitter from "events"
2021-02-20 01:05:05 -08:00
import { eventsIter } from "./utils"
2021-02-17 05:39:03 -08:00
2021-02-20 02:00:17 -08:00
import { log } from "./log"
2021-02-17 05:39:03 -08:00
export const UPDATE = "UPDATE"
export interface IPrice {
decimals: number
value: number
}
export interface IPriceFeed {
[Symbol.asyncIterator]: () => AsyncIterator<IPrice>
}
export function coinbase(pair: string): IPriceFeed {
2021-02-19 05:13:56 -08:00
// TODO: can subscribe to many pairs with one connection
2021-02-17 05:39:03 -08:00
const emitter = new EventEmitter()
const ws = new WebSocket("wss://ws-feed.pro.coinbase.com")
2021-02-19 05:13:56 -08:00
// "btc:usd" => "BTC-USD"
pair = pair.replace(":", "-").toUpperCase()
2021-02-17 05:39:03 -08:00
ws.on("open", () => {
2021-02-20 02:00:17 -08:00
log.debug(`price feed connected`, { pair })
2021-02-17 05:39:03 -08:00
ws.send(
JSON.stringify({
type: "subscribe",
2021-02-19 05:13:56 -08:00
product_ids: [pair],
2021-02-17 05:39:03 -08:00
channels: ["ticker"],
})
)
})
ws.on("message", async (data) => {
const json = JSON.parse(data)
2021-02-20 02:00:17 -08:00
log.debug("price update", json)
2021-02-17 05:39:03 -08:00
if (!json || !json.price) {
2021-02-20 02:00:17 -08:00
return
2021-02-17 05:39:03 -08:00
}
const price: IPrice = {
decimals: 2,
value: Math.floor(json.price * 100),
}
emitter.emit(UPDATE, price)
// console.log("current price:", json.price)
})
ws.on("close", (err) => {
// TODO: automatic reconnect
2021-02-20 02:00:17 -08:00
log.debug(`price feed closed`, { pair, err: err.toString() })
2021-02-17 05:39:03 -08:00
process.exit(1)
})
2021-02-20 01:05:05 -08:00
return eventsIter(emitter, UPDATE)
2021-02-17 05:39:03 -08:00
}