diff --git a/third_party/pyth/p2w-sdk/js/package-lock.json b/third_party/pyth/p2w-sdk/js/package-lock.json index 1e6dd413..586d0be0 100644 --- a/third_party/pyth/p2w-sdk/js/package-lock.json +++ b/third_party/pyth/p2w-sdk/js/package-lock.json @@ -11,7 +11,7 @@ "dependencies": { "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", - "@pythnetwork/pyth-sdk-js": "^0.1.0" + "@pythnetwork/pyth-sdk-js": "^0.3.0" }, "devDependencies": { "@openzeppelin/contracts": "^4.2.0", @@ -913,9 +913,9 @@ "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "node_modules/@pythnetwork/pyth-sdk-js": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", - "integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz", + "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg==" }, "node_modules/@solana/buffer-layout": { "version": "4.0.0", @@ -3236,9 +3236,9 @@ "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "@pythnetwork/pyth-sdk-js": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", - "integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz", + "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg==" }, "@solana/buffer-layout": { "version": "4.0.0", diff --git a/third_party/pyth/p2w-sdk/js/package.json b/third_party/pyth/p2w-sdk/js/package.json index 46c667c3..eb0f566a 100644 --- a/third_party/pyth/p2w-sdk/js/package.json +++ b/third_party/pyth/p2w-sdk/js/package.json @@ -41,7 +41,7 @@ "dependencies": { "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", - "@pythnetwork/pyth-sdk-js": "^0.1.0" + "@pythnetwork/pyth-sdk-js": "^0.3.0" }, "bugs": { "url": "https://github.com/pyth-network/pyth-crosschain/issues" diff --git a/third_party/pyth/price-service/package-lock.json b/third_party/pyth/price-service/package-lock.json index 7ce9de19..6407594d 100644 --- a/third_party/pyth/price-service/package-lock.json +++ b/third_party/pyth/price-service/package-lock.json @@ -1,18 +1,18 @@ { - "name": "@pythnetwork/price-service", + "name": "@pythnetwork/pyth-price-service", "version": "1.0.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "name": "@pythnetwork/price-service", + "name": "@pythnetwork/pyth-price-service", "version": "1.0.0", "license": "Apache-2.0", "dependencies": { "@certusone/p2w-sdk": "file:../p2w-sdk/js", "@certusone/wormhole-sdk": "^0.1.4", "@certusone/wormhole-spydk": "^0.0.1", - "@pythnetwork/pyth-sdk-js": "^0.1.0", + "@pythnetwork/pyth-sdk-js": "^0.3.0", "@types/cors": "^2.8.12", "@types/express": "^4.17.13", "@types/morgan": "^1.9.3", @@ -2201,9 +2201,9 @@ "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "node_modules/@pythnetwork/pyth-sdk-js": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", - "integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz", + "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg==" }, "node_modules/@sideway/address": { "version": "4.1.4", @@ -10545,9 +10545,9 @@ "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "@pythnetwork/pyth-sdk-js": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", - "integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz", + "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg==" }, "@sideway/address": { "version": "4.1.4", diff --git a/third_party/pyth/price-service/package.json b/third_party/pyth/price-service/package.json index 3262d82e..26929cf3 100644 --- a/third_party/pyth/price-service/package.json +++ b/third_party/pyth/price-service/package.json @@ -28,7 +28,7 @@ "@certusone/p2w-sdk": "file:../p2w-sdk/js", "@certusone/wormhole-sdk": "^0.1.4", "@certusone/wormhole-spydk": "^0.0.1", - "@pythnetwork/pyth-sdk-js": "^0.1.0", + "@pythnetwork/pyth-sdk-js": "^0.3.0", "@types/cors": "^2.8.12", "@types/express": "^4.17.13", "@types/morgan": "^1.9.3", diff --git a/third_party/pyth/price-service/src/__tests__/rest.test.ts b/third_party/pyth/price-service/src/__tests__/rest.test.ts index d8ec44ac..6e0b61f0 100644 --- a/third_party/pyth/price-service/src/__tests__/rest.test.ts +++ b/third_party/pyth/price-service/src/__tests__/rest.test.ts @@ -60,7 +60,7 @@ beforeAll(async () => { getLatestPriceInfo: (priceFeedId: string) => { return priceInfoMap.get(priceFeedId); }, - addUpdateListener: (_callback: (priceFeed: PriceFeed) => any) => {}, + addUpdateListener: (_callback: (priceInfo: PriceInfo) => any) => {}, getPriceIds: () => new Set(), }; @@ -72,20 +72,20 @@ beforeAll(async () => { describe("Latest Price Feed Endpoint", () => { test("When called with valid ids, returns correct price feed", async () => { const ids = [expandTo64Len("abcd"), expandTo64Len("3456")]; - const resp = await request(app).get("/latest_price_feeds").query({ ids }); + const resp = await request(app).get("/api/latest_price_feeds").query({ ids }); expect(resp.status).toBe(StatusCodes.OK); expect(resp.body.length).toBe(2); expect(resp.body).toContainEqual(dummyPriceFeed(ids[0]).toJson()); expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson()); }); - test("When called with some non-existant ids within ids, returns error mentioning non-existant ids", async () => { + test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => { const ids = [ expandTo64Len("ab01"), expandTo64Len("3456"), expandTo64Len("effe"), ]; - const resp = await request(app).get("/latest_price_feeds").query({ ids }); + const resp = await request(app).get("/api/latest_price_feeds").query({ ids }); expect(resp.status).toBe(StatusCodes.BAD_REQUEST); expect(resp.body.message).toContain(ids[0]); expect(resp.body.message).not.toContain(ids[1]); @@ -100,7 +100,7 @@ describe("Latest Vaa Bytes Endpoint", () => { expandTo64Len("ef01"), expandTo64Len("3456"), ]; - const resp = await request(app).get("/latest_vaas").query({ ids }); + const resp = await request(app).get("/api/latest_vaas").query({ ids }); expect(resp.status).toBe(StatusCodes.OK); expect(resp.body.length).toBe(2); expect(resp.body).toContain( @@ -111,13 +111,13 @@ describe("Latest Vaa Bytes Endpoint", () => { ); }); - test("When called with some non-existant ids within ids, returns error mentioning non-existant ids", async () => { + test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => { const ids = [ expandTo64Len("ab01"), expandTo64Len("3456"), expandTo64Len("effe"), ]; - const resp = await request(app).get("/latest_vaas").query({ ids }); + const resp = await request(app).get("/api/latest_vaas").query({ ids }); expect(resp.status).toBe(StatusCodes.BAD_REQUEST); expect(resp.body.message).toContain(ids[0]); expect(resp.body.message).not.toContain(ids[1]); diff --git a/third_party/pyth/price-service/src/__tests__/ws.test.ts b/third_party/pyth/price-service/src/__tests__/ws.test.ts index 8e9daca0..442c8743 100644 --- a/third_party/pyth/price-service/src/__tests__/ws.test.ts +++ b/third_party/pyth/price-service/src/__tests__/ws.test.ts @@ -1,9 +1,9 @@ import { HexString, PriceFeed, PriceStatus } from "@pythnetwork/pyth-sdk-js"; -import { PriceStore, PriceInfo } from "../listen"; -import { WebSocketAPI, ClientMessage } from "../ws"; import { Server } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { sleep } from "../helpers"; +import { PriceInfo, PriceStore } from "../listen"; +import { ClientMessage, WebSocketAPI } from "../ws"; const port = 2524; @@ -11,27 +11,54 @@ let api: WebSocketAPI; let server: Server; let wss: WebSocketServer; -let priceFeeds: PriceFeed[]; +let priceInfos: PriceInfo[]; +let priceMetadata: any; function expandTo64Len(id: string): string { return id.repeat(64).substring(0, 64); } +function dummyPriceMetadata( + attestationTime: number, + emitterChainId: number, + seqNum: number +): any { + return { + attestation_time: attestationTime, + emitter_chain: emitterChainId, + sequence_number: seqNum, + }; +} + +function dummyPriceInfo( + id: HexString, + vaa: HexString, + priceMetadata: any +): PriceInfo { + return { + seqNum: priceMetadata.sequence_number, + attestationTime: priceMetadata.attestation_time, + emitterChainId: priceMetadata.emitter_chain, + priceFeed: dummyPriceFeed(id), + vaaBytes: Buffer.from(vaa, "hex").toString("binary"), + }; +} + function dummyPriceFeed(id: string): PriceFeed { - return new PriceFeed({ + return PriceFeed.fromJson({ conf: "0", - emaConf: "1", - emaPrice: "2", - expo: 4, + ema_conf: "1", + ema_price: "2", + expo: 3, id, - maxNumPublishers: 7, - numPublishers: 6, - prevConf: "8", - prevPrice: "9", - prevPublishTime: 10, - price: "11", - productId: "def456", - publishTime: 13, + max_num_publishers: 5, + num_publishers: 6, + prev_conf: "7", + prev_price: "8", + prev_publish_time: 9, + price: "10", + product_id: "def456", + publish_time: 12, status: PriceStatus.Trading, }); } @@ -66,17 +93,19 @@ async function createSocketClient(): Promise<[WebSocket, any[]]> { } beforeAll(async () => { - priceFeeds = [ - dummyPriceFeed(expandTo64Len("abcd")), - dummyPriceFeed(expandTo64Len("ef01")), - dummyPriceFeed(expandTo64Len("2345")), - dummyPriceFeed(expandTo64Len("6789")), + priceMetadata = dummyPriceMetadata(0, 0, 0); + priceInfos = [ + dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4", priceMetadata), + dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4", priceMetadata), + dummyPriceInfo(expandTo64Len("2345"), "bad01bad", priceMetadata), + dummyPriceInfo(expandTo64Len("6789"), "bidbidbid", priceMetadata), ]; let priceInfo: PriceStore = { getLatestPriceInfo: (_priceFeedId: string) => undefined, - addUpdateListener: (_callback: (priceFeed: PriceFeed) => any) => undefined, - getPriceIds: () => new Set(priceFeeds.map((priceFeed) => priceFeed.id)), + addUpdateListener: (_callback: (priceInfo: PriceInfo) => any) => undefined, + getPriceIds: () => + new Set(priceInfos.map((priceInfo) => priceInfo.priceFeed.id)), }; api = new WebSocketAPI(priceInfo); @@ -93,11 +122,11 @@ afterAll(async () => { }); describe("Client receives data", () => { - test("When subscribes with valid ids, returns correct price feed", async () => { + test("When subscribes with valid ids without verbose flag, returns correct price feed", async () => { let [client, serverMessages] = await createSocketClient(); let message: ClientMessage = { - ids: [priceFeeds[0].id, priceFeeds[1].id], + ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id], type: "subscribe", }; @@ -110,22 +139,108 @@ describe("Client receives data", () => { status: "success", }); - api.dispatchPriceFeedUpdate(priceFeeds[0]); + api.dispatchPriceFeedUpdate(priceInfos[0]); await waitForMessages(serverMessages, 2); - expect(serverMessages[1]).toStrictEqual({ + expect(serverMessages[1]).toEqual({ type: "price_update", - price_feed: priceFeeds[0].toJson(), + price_feed: priceInfos[0].priceFeed.toJson(), }); - api.dispatchPriceFeedUpdate(priceFeeds[1]); + api.dispatchPriceFeedUpdate(priceInfos[1]); await waitForMessages(serverMessages, 3); - expect(serverMessages[2]).toStrictEqual({ + expect(serverMessages[2]).toEqual({ type: "price_update", - price_feed: priceFeeds[1].toJson(), + price_feed: priceInfos[1].priceFeed.toJson(), + }); + + client.close(); + await waitForSocketState(client, client.CLOSED); + }); + + test("When subscribes with valid ids and verbose flag set to true, returns correct price feed with metadata", async () => { + let [client, serverMessages] = await createSocketClient(); + + let message: ClientMessage = { + ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id], + type: "subscribe", + verbose: true, + }; + + client.send(JSON.stringify(message)); + + await waitForMessages(serverMessages, 1); + + expect(serverMessages[0]).toStrictEqual({ + type: "response", + status: "success", + }); + + api.dispatchPriceFeedUpdate(priceInfos[0]); + + await waitForMessages(serverMessages, 2); + + expect(serverMessages[1]).toEqual({ + type: "price_update", + price_feed: { + ...priceInfos[0].priceFeed.toJson(), + metadata: priceMetadata, + }, + }); + + api.dispatchPriceFeedUpdate(priceInfos[1]); + + await waitForMessages(serverMessages, 3); + + expect(serverMessages[2]).toEqual({ + type: "price_update", + price_feed: { + ...priceInfos[1].priceFeed.toJson(), + metadata: priceMetadata, + }, + }); + + client.close(); + await waitForSocketState(client, client.CLOSED); + }); + + test("When subscribes with valid ids and verbose flag set to false, returns correct price feed without metadata", async () => { + let [client, serverMessages] = await createSocketClient(); + + let message: ClientMessage = { + ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id], + type: "subscribe", + verbose: false, + }; + + client.send(JSON.stringify(message)); + + await waitForMessages(serverMessages, 1); + + expect(serverMessages[0]).toStrictEqual({ + type: "response", + status: "success", + }); + + api.dispatchPriceFeedUpdate(priceInfos[0]); + + await waitForMessages(serverMessages, 2); + + expect(serverMessages[1]).toEqual({ + type: "price_update", + price_feed: priceInfos[0].priceFeed.toJson(), + }); + + api.dispatchPriceFeedUpdate(priceInfos[1]); + + await waitForMessages(serverMessages, 3); + + expect(serverMessages[2]).toEqual({ + type: "price_update", + price_feed: priceInfos[1].priceFeed.toJson(), }); client.close(); @@ -156,7 +271,7 @@ describe("Client receives data", () => { let [client, serverMessages] = await createSocketClient(); let message: ClientMessage = { - ids: [priceFeeds[0].id], + ids: [priceInfos[0].priceFeed.id], type: "subscribe", }; @@ -169,17 +284,17 @@ describe("Client receives data", () => { status: "success", }); - api.dispatchPriceFeedUpdate(priceFeeds[1]); + api.dispatchPriceFeedUpdate(priceInfos[1]); await sleep(100); - api.dispatchPriceFeedUpdate(priceFeeds[0]); + api.dispatchPriceFeedUpdate(priceInfos[0]); await waitForMessages(serverMessages, 2); - expect(serverMessages[1]).toStrictEqual({ + expect(serverMessages[1]).toEqual({ type: "price_update", - price_feed: priceFeeds[0].toJson(), + price_feed: priceInfos[0].priceFeed.toJson(), }); await sleep(100); @@ -193,7 +308,7 @@ describe("Client receives data", () => { let [client, serverMessages] = await createSocketClient(); let message: ClientMessage = { - ids: [priceFeeds[0].id], + ids: [priceInfos[0].priceFeed.id], type: "subscribe", }; @@ -206,17 +321,17 @@ describe("Client receives data", () => { status: "success", }); - api.dispatchPriceFeedUpdate(priceFeeds[0]); + api.dispatchPriceFeedUpdate(priceInfos[0]); await waitForMessages(serverMessages, 2); - expect(serverMessages[1]).toStrictEqual({ + expect(serverMessages[1]).toEqual({ type: "price_update", - price_feed: priceFeeds[0].toJson(), + price_feed: priceInfos[0].priceFeed.toJson(), }); message = { - ids: [priceFeeds[0].id], + ids: [priceInfos[0].priceFeed.id], type: "unsubscribe", }; @@ -229,7 +344,7 @@ describe("Client receives data", () => { status: "success", }); - api.dispatchPriceFeedUpdate(priceFeeds[0]); + api.dispatchPriceFeedUpdate(priceInfos[0]); await sleep(100); @@ -243,7 +358,7 @@ describe("Client receives data", () => { let [client, serverMessages] = await createSocketClient(); let message: ClientMessage = { - ids: [priceFeeds[0].id], + ids: [priceInfos[0].priceFeed.id], type: "unsubscribe", }; @@ -265,14 +380,14 @@ describe("Client receives data", () => { let [client2, serverMessages2] = await createSocketClient(); let message1: ClientMessage = { - ids: [priceFeeds[0].id], + ids: [priceInfos[0].priceFeed.id], type: "subscribe", }; client1.send(JSON.stringify(message1)); let message2: ClientMessage = { - ids: [priceFeeds[1].id], + ids: [priceInfos[1].priceFeed.id], type: "subscribe", }; @@ -291,20 +406,20 @@ describe("Client receives data", () => { status: "success", }); - api.dispatchPriceFeedUpdate(priceFeeds[0]); - api.dispatchPriceFeedUpdate(priceFeeds[1]); + api.dispatchPriceFeedUpdate(priceInfos[0]); + api.dispatchPriceFeedUpdate(priceInfos[1]); await waitForMessages(serverMessages1, 2); await waitForMessages(serverMessages2, 2); - expect(serverMessages1[1]).toStrictEqual({ + expect(serverMessages1[1]).toEqual({ type: "price_update", - price_feed: priceFeeds[0].toJson(), + price_feed: priceInfos[0].priceFeed.toJson(), }); - expect(serverMessages2[1]).toStrictEqual({ + expect(serverMessages2[1]).toEqual({ type: "price_update", - price_feed: priceFeeds[1].toJson(), + price_feed: priceInfos[1].priceFeed.toJson(), }); client1.close(); diff --git a/third_party/pyth/price-service/src/listen.ts b/third_party/pyth/price-service/src/listen.ts index 34c00ede..21bc1ebd 100644 --- a/third_party/pyth/price-service/src/listen.ts +++ b/third_party/pyth/price-service/src/listen.ts @@ -37,7 +37,7 @@ export type PriceInfo = { export interface PriceStore { getPriceIds(): Set; getLatestPriceInfo(priceFeedId: HexString): PriceInfo | undefined; - addUpdateListener(callback: (priceFeed: PriceFeed) => any): void; + addUpdateListener(callback: (priceInfo: PriceInfo) => any): void; } type ListenerReadinessConfig = { @@ -59,7 +59,7 @@ export class Listener implements PriceStore { private filters: FilterEntry[] = []; private spyConnectionTime: TimestampInSec | undefined; private readinessConfig: ListenerReadinessConfig; - private updateCallbacks: ((priceFeed: PriceFeed) => any)[]; + private updateCallbacks: ((priceInfo: PriceInfo) => any)[]; constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; @@ -192,16 +192,17 @@ export class Listener implements PriceStore { lastAttestationTime < priceAttestation.attestationTime ) { const priceFeed = priceAttestationToPriceFeed(priceAttestation); - this.priceFeedVaaMap.set(key, { + const priceInfo = { seqNum: parsedVAA.sequence, vaaBytes: vaaBytes, attestationTime: priceAttestation.attestationTime, priceFeed, emitterChainId: parsedVAA.emitter_chain, - }); + } + this.priceFeedVaaMap.set(key, priceInfo); for (let callback of this.updateCallbacks) { - callback(priceFeed); + callback(priceInfo); } } } @@ -224,7 +225,7 @@ export class Listener implements PriceStore { return this.priceFeedVaaMap.get(priceFeedId); } - addUpdateListener(callback: (priceFeed: PriceFeed) => any) { + addUpdateListener(callback: (priceInfo: PriceInfo) => any) { this.updateCallbacks.push(callback); } diff --git a/third_party/pyth/price-service/src/ws.ts b/third_party/pyth/price-service/src/ws.ts index f508e3e1..e703c040 100644 --- a/third_party/pyth/price-service/src/ws.ts +++ b/third_party/pyth/price-service/src/ws.ts @@ -1,9 +1,8 @@ -import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; -import express from "express"; +import { HexString } from "@pythnetwork/pyth-sdk-js"; import * as http from "http"; import Joi from "joi"; import WebSocket, { RawData, WebSocketServer } from "ws"; -import { PriceStore } from "./listen"; +import { PriceInfo, PriceStore } from "./listen"; import { logger } from "./logging"; import { PromClient } from "./promClient"; @@ -12,11 +11,13 @@ const ClientMessageSchema: Joi.Schema = Joi.object({ ids: Joi.array() .items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/)) .required(), + verbose: Joi.boolean(), }).required(); export type ClientMessage = { type: "subscribe" | "unsubscribe"; ids: HexString[]; + verbose?: boolean; }; export type ServerResponse = { @@ -35,6 +36,7 @@ export type ServerMessage = ServerResponse | ServerPriceUpdate; export class WebSocketAPI { private wsCounter: number; private priceFeedClients: Map>; + private priceFeedClientsVerbosity: Map>; private aliveClients: Set; private wsId: Map; private priceFeedVaaInfo: PriceStore; @@ -43,48 +45,79 @@ export class WebSocketAPI { constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) { this.priceFeedVaaInfo = priceFeedVaaInfo; this.priceFeedClients = new Map(); + this.priceFeedClientsVerbosity = new Map(); this.aliveClients = new Set(); this.wsCounter = 0; this.wsId = new Map(); this.promClient = promClient; } - private addPriceFeedClient(ws: WebSocket, id: HexString) { + private addPriceFeedClient( + ws: WebSocket, + id: HexString, + verbose: boolean = false + ) { if (!this.priceFeedClients.has(id)) { this.priceFeedClients.set(id, new Set()); + this.priceFeedClientsVerbosity.set(id, new Map([[ws, verbose]])); + } else { + this.priceFeedClientsVerbosity.get(id)!.set(ws, verbose); } - this.priceFeedClients.get(id)!.add(ws); } private delPriceFeedClient(ws: WebSocket, id: HexString) { - this.priceFeedClients.get(id)?.delete(ws); + if (!this.priceFeedClients.has(id)) { + return; + } + this.priceFeedClients.get(id)!.delete(ws); + this.priceFeedClientsVerbosity.get(id)!.delete(ws); } - dispatchPriceFeedUpdate(priceFeed: PriceFeed) { - if (this.priceFeedClients.get(priceFeed.id) === undefined) { - logger.info(`Sending ${priceFeed.id} price update to no clients.`); + dispatchPriceFeedUpdate(priceInfo: PriceInfo) { + if (this.priceFeedClients.get(priceInfo.priceFeed.id) === undefined) { + logger.info( + `Sending ${priceInfo.priceFeed.id} price update to no clients.` + ); return; } logger.info( - `Sending ${priceFeed.id} price update to ${ - this.priceFeedClients.get(priceFeed.id)!.size + `Sending ${priceInfo.priceFeed.id} price update to ${ + this.priceFeedClients.get(priceInfo.priceFeed.id)!.size } clients` ); - for (let client of this.priceFeedClients.get(priceFeed.id)!.values()) { + for (let client of this.priceFeedClients + .get(priceInfo.priceFeed.id)! + .values()) { logger.info( - `Sending ${priceFeed.id} price update to client ${this.wsId.get( - client - )}` + `Sending ${ + priceInfo.priceFeed.id + } price update to client ${this.wsId.get(client)}` ); this.promClient?.addWebSocketInteraction("server_update", "ok"); - let priceUpdate: ServerPriceUpdate = { - type: "price_update", - price_feed: priceFeed.toJson(), - }; + let verbose = this.priceFeedClientsVerbosity + .get(priceInfo.priceFeed.id)! + .get(client); + + let priceUpdate: ServerPriceUpdate = verbose + ? { + type: "price_update", + price_feed: { + ...priceInfo.priceFeed.toJson(), + metadata: { + emitter_chain: priceInfo.emitterChainId, + attestation_time: priceInfo.attestationTime, + sequence_number: priceInfo.seqNum, + }, + }, + } + : { + type: "price_update", + price_feed: priceInfo.priceFeed.toJson(), + }; client.send(JSON.stringify(priceUpdate)); } @@ -128,7 +161,9 @@ export class WebSocketAPI { } if (message.type == "subscribe") { - message.ids.forEach((id) => this.addPriceFeedClient(ws, id)); + message.ids.forEach((id) => + this.addPriceFeedClient(ws, id, message.verbose === true) + ); } else { message.ids.forEach((id) => this.delPriceFeedClient(ws, id)); }