From 9ac61742c39f2303143a53ab97d93bca919b0dd2 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 29 Dec 2022 18:35:12 +0900 Subject: [PATCH] add binary option to subscribe to binary vaas (#444) * add binary option to subscrube to binary vaas * change property name * combine verbosity and binary to a single config * add tests --- .../price-service/src/__tests__/ws.test.ts | 84 ++++++++++++++++++- third_party/pyth/price-service/src/ws.ts | 67 +++++++++------ 2 files changed, 126 insertions(+), 25 deletions(-) diff --git a/third_party/pyth/price-service/src/__tests__/ws.test.ts b/third_party/pyth/price-service/src/__tests__/ws.test.ts index b0e8868e..bf53b1a0 100644 --- a/third_party/pyth/price-service/src/__tests__/ws.test.ts +++ b/third_party/pyth/price-service/src/__tests__/ws.test.ts @@ -1,6 +1,5 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; import { Server } from "http"; -import { number } from "joi"; import { WebSocket, WebSocketServer } from "ws"; import { sleep } from "../helpers"; import { PriceInfo, PriceStore } from "../listen"; @@ -250,6 +249,89 @@ describe("Client receives data", () => { await waitForSocketState(client, client.CLOSED); }); + test("When subscribes with valid ids and binary flag set to true, returns correct price feed with vaa", async () => { + const [client, serverMessages] = await createSocketClient(); + + const message: ClientMessage = { + ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id], + type: "subscribe", + binary: true, + }; + + client.send(JSON.stringify(message)); + + await waitForMessages(serverMessages, 1); + + expect(serverMessages[0]).toStrictEqual({ + type: "response", + status: "success", + }); + + api.dispatchPriceFeedUpdate(priceInfos[0]); + + await waitForMessages(serverMessages, 2); + + expect(serverMessages[1]).toEqual({ + type: "price_update", + price_feed: priceInfos[0].priceFeed.toJson(), + }); + + api.dispatchPriceFeedUpdate(priceInfos[1]); + + await waitForMessages(serverMessages, 3); + + expect(serverMessages[2]).toEqual({ + type: "price_update", + price_feed: { + ...priceInfos[1].priceFeed.toJson(), + vaa: priceInfos[1].vaa.toString("base64"), + }, + }); + + client.close(); + await waitForSocketState(client, client.CLOSED); + }); + + test("When subscribes with valid ids and binary flag set to false, returns correct price feed without vaa", async () => { + const [client, serverMessages] = await createSocketClient(); + + const message: ClientMessage = { + ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id], + type: "subscribe", + binary: false, + }; + + client.send(JSON.stringify(message)); + + await waitForMessages(serverMessages, 1); + + expect(serverMessages[0]).toStrictEqual({ + type: "response", + status: "success", + }); + + api.dispatchPriceFeedUpdate(priceInfos[0]); + + await waitForMessages(serverMessages, 2); + + expect(serverMessages[1]).toEqual({ + type: "price_update", + price_feed: priceInfos[0].priceFeed.toJson(), + }); + + api.dispatchPriceFeedUpdate(priceInfos[1]); + + await waitForMessages(serverMessages, 3); + + expect(serverMessages[2]).toEqual({ + type: "price_update", + price_feed: priceInfos[1].priceFeed.toJson(), + }); + + client.close(); + await waitForSocketState(client, client.CLOSED); + }); + test("When subscribes with invalid ids, returns error", async () => { const [client, serverMessages] = await createSocketClient(); diff --git a/third_party/pyth/price-service/src/ws.ts b/third_party/pyth/price-service/src/ws.ts index 356a2209..4885bd91 100644 --- a/third_party/pyth/price-service/src/ws.ts +++ b/third_party/pyth/price-service/src/ws.ts @@ -12,12 +12,14 @@ const ClientMessageSchema: Joi.Schema = Joi.object({ .items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/)) .required(), verbose: Joi.boolean(), + binary: Joi.boolean(), }).required(); export type ClientMessage = { type: "subscribe" | "unsubscribe"; ids: HexString[]; verbose?: boolean; + binary?: boolean; }; export type ServerResponse = { @@ -31,12 +33,20 @@ export type ServerPriceUpdate = { price_feed: any; }; +export type PriceFeedConfig = { + verbose: boolean; + binary: boolean; +}; + export type ServerMessage = ServerResponse | ServerPriceUpdate; export class WebSocketAPI { private wsCounter: number; private priceFeedClients: Map>; - private priceFeedClientsVerbosity: Map>; + private priceFeedClientsConfig: Map< + HexString, + Map + >; private aliveClients: Set; private wsId: Map; private priceFeedVaaInfo: PriceStore; @@ -45,7 +55,7 @@ export class WebSocketAPI { constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) { this.priceFeedVaaInfo = priceFeedVaaInfo; this.priceFeedClients = new Map(); - this.priceFeedClientsVerbosity = new Map(); + this.priceFeedClientsConfig = new Map(); this.aliveClients = new Set(); this.wsCounter = 0; this.wsId = new Map(); @@ -55,13 +65,14 @@ export class WebSocketAPI { private addPriceFeedClient( ws: WebSocket, id: HexString, - verbose: boolean = false + verbose: boolean = false, + binary: boolean = false ) { if (!this.priceFeedClients.has(id)) { this.priceFeedClients.set(id, new Set()); - this.priceFeedClientsVerbosity.set(id, new Map([[ws, verbose]])); + this.priceFeedClientsConfig.set(id, new Map([[ws, { verbose, binary }]])); } else { - this.priceFeedClientsVerbosity.get(id)!.set(ws, verbose); + this.priceFeedClientsConfig.get(id)!.set(ws, { verbose, binary }); } this.priceFeedClients.get(id)!.add(ws); } @@ -71,7 +82,7 @@ export class WebSocketAPI { return; } this.priceFeedClients.get(id)!.delete(ws); - this.priceFeedClientsVerbosity.get(id)!.delete(ws); + this.priceFeedClientsConfig.get(id)!.delete(ws); } dispatchPriceFeedUpdate(priceInfo: PriceInfo) { @@ -96,27 +107,30 @@ export class WebSocketAPI { for (const client of clients.values()) { this.promClient?.addWebSocketInteraction("server_update", "ok"); - const verbose = this.priceFeedClientsVerbosity + const config = this.priceFeedClientsConfig .get(priceInfo.priceFeed.id)! .get(client); - const priceUpdate: ServerPriceUpdate = verbose - ? { - type: "price_update", - price_feed: { - ...priceInfo.priceFeed.toJson(), - metadata: { - emitter_chain: priceInfo.emitterChainId, - attestation_time: priceInfo.attestationTime, - sequence_number: priceInfo.seqNum, - price_service_receive_time: priceInfo.priceServiceReceiveTime, - }, + const verbose = config?.verbose; + const binary = config?.binary; + + const priceUpdate: ServerPriceUpdate = { + type: "price_update", + price_feed: { + ...priceInfo.priceFeed.toJson(), + ...(verbose && { + metadata: { + emitter_chain: priceInfo.emitterChainId, + attestation_time: priceInfo.attestationTime, + sequence_number: priceInfo.seqNum, + price_service_receive_time: priceInfo.priceServiceReceiveTime, }, - } - : { - type: "price_update", - price_feed: priceInfo.priceFeed.toJson(), - }; + }), + ...(binary && { + vaa: priceInfo.vaa.toString("base64"), + }), + }, + }; client.send(JSON.stringify(priceUpdate)); } @@ -161,7 +175,12 @@ export class WebSocketAPI { if (message.type === "subscribe") { message.ids.forEach((id) => - this.addPriceFeedClient(ws, id, message.verbose === true) + this.addPriceFeedClient( + ws, + id, + message.verbose === true, + message.binary === true + ) ); } else { message.ids.forEach((id) => this.delPriceFeedClient(ws, id));