enable verbose property to return metadata for ws endpoint (#271)

* enable verbose property to return metadata for ws endpoint

* remove console log

* fix test cases failing

* fix spelling errors

* update callback and listener to use PriceInfo

* isolate verbose flag to each client and add tests

* isolate verbose flag from clients
This commit is contained in:
Daniel Chew 2022-09-07 23:27:44 +08:00 committed by GitHub
parent bd90203cdf
commit 303385bfbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 252 additions and 101 deletions

View File

@ -11,7 +11,7 @@
"dependencies": { "dependencies": {
"@certusone/wormhole-sdk": "0.2.1", "@certusone/wormhole-sdk": "0.2.1",
"@improbable-eng/grpc-web-node-http-transport": "^0.14.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1",
"@pythnetwork/pyth-sdk-js": "^0.1.0" "@pythnetwork/pyth-sdk-js": "^0.3.0"
}, },
"devDependencies": { "devDependencies": {
"@openzeppelin/contracts": "^4.2.0", "@openzeppelin/contracts": "^4.2.0",
@ -913,9 +913,9 @@
"integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA="
}, },
"node_modules/@pythnetwork/pyth-sdk-js": { "node_modules/@pythnetwork/pyth-sdk-js": {
"version": "0.1.0", "version": "0.3.0",
"resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz",
"integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg=="
}, },
"node_modules/@solana/buffer-layout": { "node_modules/@solana/buffer-layout": {
"version": "4.0.0", "version": "4.0.0",
@ -3236,9 +3236,9 @@
"integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA="
}, },
"@pythnetwork/pyth-sdk-js": { "@pythnetwork/pyth-sdk-js": {
"version": "0.1.0", "version": "0.3.0",
"resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz",
"integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg=="
}, },
"@solana/buffer-layout": { "@solana/buffer-layout": {
"version": "4.0.0", "version": "4.0.0",

View File

@ -41,7 +41,7 @@
"dependencies": { "dependencies": {
"@certusone/wormhole-sdk": "0.2.1", "@certusone/wormhole-sdk": "0.2.1",
"@improbable-eng/grpc-web-node-http-transport": "^0.14.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1",
"@pythnetwork/pyth-sdk-js": "^0.1.0" "@pythnetwork/pyth-sdk-js": "^0.3.0"
}, },
"bugs": { "bugs": {
"url": "https://github.com/pyth-network/pyth-crosschain/issues" "url": "https://github.com/pyth-network/pyth-crosschain/issues"

View File

@ -1,18 +1,18 @@
{ {
"name": "@pythnetwork/price-service", "name": "@pythnetwork/pyth-price-service",
"version": "1.0.0", "version": "1.0.0",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@pythnetwork/price-service", "name": "@pythnetwork/pyth-price-service",
"version": "1.0.0", "version": "1.0.0",
"license": "Apache-2.0", "license": "Apache-2.0",
"dependencies": { "dependencies": {
"@certusone/p2w-sdk": "file:../p2w-sdk/js", "@certusone/p2w-sdk": "file:../p2w-sdk/js",
"@certusone/wormhole-sdk": "^0.1.4", "@certusone/wormhole-sdk": "^0.1.4",
"@certusone/wormhole-spydk": "^0.0.1", "@certusone/wormhole-spydk": "^0.0.1",
"@pythnetwork/pyth-sdk-js": "^0.1.0", "@pythnetwork/pyth-sdk-js": "^0.3.0",
"@types/cors": "^2.8.12", "@types/cors": "^2.8.12",
"@types/express": "^4.17.13", "@types/express": "^4.17.13",
"@types/morgan": "^1.9.3", "@types/morgan": "^1.9.3",
@ -2201,9 +2201,9 @@
"integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA="
}, },
"node_modules/@pythnetwork/pyth-sdk-js": { "node_modules/@pythnetwork/pyth-sdk-js": {
"version": "0.1.0", "version": "0.3.0",
"resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz",
"integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg=="
}, },
"node_modules/@sideway/address": { "node_modules/@sideway/address": {
"version": "4.1.4", "version": "4.1.4",
@ -10545,9 +10545,9 @@
"integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA="
}, },
"@pythnetwork/pyth-sdk-js": { "@pythnetwork/pyth-sdk-js": {
"version": "0.1.0", "version": "0.3.0",
"resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.1.0.tgz", "resolved": "https://registry.npmjs.org/@pythnetwork/pyth-sdk-js/-/pyth-sdk-js-0.3.0.tgz",
"integrity": "sha512-fsGx2vkXncoIpsrcjx6WY7JN0R74YG/lX2UA1Wz/m6MPgJrde1LHkmikOSdMZUU3KkpWGHT1ZdYoW+Ikv7Nv9g==" "integrity": "sha512-7xsSM5PWD8+ez8lB5R0ofpaP1J1bRrtVkp9zm7Ry8QtKq5dOFfQqSqOjh9tLTX2h8i2xD93//0EnXXw35pzCkg=="
}, },
"@sideway/address": { "@sideway/address": {
"version": "4.1.4", "version": "4.1.4",

View File

@ -28,7 +28,7 @@
"@certusone/p2w-sdk": "file:../p2w-sdk/js", "@certusone/p2w-sdk": "file:../p2w-sdk/js",
"@certusone/wormhole-sdk": "^0.1.4", "@certusone/wormhole-sdk": "^0.1.4",
"@certusone/wormhole-spydk": "^0.0.1", "@certusone/wormhole-spydk": "^0.0.1",
"@pythnetwork/pyth-sdk-js": "^0.1.0", "@pythnetwork/pyth-sdk-js": "^0.3.0",
"@types/cors": "^2.8.12", "@types/cors": "^2.8.12",
"@types/express": "^4.17.13", "@types/express": "^4.17.13",
"@types/morgan": "^1.9.3", "@types/morgan": "^1.9.3",

View File

@ -60,7 +60,7 @@ beforeAll(async () => {
getLatestPriceInfo: (priceFeedId: string) => { getLatestPriceInfo: (priceFeedId: string) => {
return priceInfoMap.get(priceFeedId); return priceInfoMap.get(priceFeedId);
}, },
addUpdateListener: (_callback: (priceFeed: PriceFeed) => any) => {}, addUpdateListener: (_callback: (priceInfo: PriceInfo) => any) => {},
getPriceIds: () => new Set(), getPriceIds: () => new Set(),
}; };
@ -72,20 +72,20 @@ beforeAll(async () => {
describe("Latest Price Feed Endpoint", () => { describe("Latest Price Feed Endpoint", () => {
test("When called with valid ids, returns correct price feed", async () => { test("When called with valid ids, returns correct price feed", async () => {
const ids = [expandTo64Len("abcd"), expandTo64Len("3456")]; const ids = [expandTo64Len("abcd"), expandTo64Len("3456")];
const resp = await request(app).get("/latest_price_feeds").query({ ids }); const resp = await request(app).get("/api/latest_price_feeds").query({ ids });
expect(resp.status).toBe(StatusCodes.OK); expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body.length).toBe(2); expect(resp.body.length).toBe(2);
expect(resp.body).toContainEqual(dummyPriceFeed(ids[0]).toJson()); expect(resp.body).toContainEqual(dummyPriceFeed(ids[0]).toJson());
expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson()); expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson());
}); });
test("When called with some non-existant ids within ids, returns error mentioning non-existant ids", async () => { test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => {
const ids = [ const ids = [
expandTo64Len("ab01"), expandTo64Len("ab01"),
expandTo64Len("3456"), expandTo64Len("3456"),
expandTo64Len("effe"), expandTo64Len("effe"),
]; ];
const resp = await request(app).get("/latest_price_feeds").query({ ids }); const resp = await request(app).get("/api/latest_price_feeds").query({ ids });
expect(resp.status).toBe(StatusCodes.BAD_REQUEST); expect(resp.status).toBe(StatusCodes.BAD_REQUEST);
expect(resp.body.message).toContain(ids[0]); expect(resp.body.message).toContain(ids[0]);
expect(resp.body.message).not.toContain(ids[1]); expect(resp.body.message).not.toContain(ids[1]);
@ -100,7 +100,7 @@ describe("Latest Vaa Bytes Endpoint", () => {
expandTo64Len("ef01"), expandTo64Len("ef01"),
expandTo64Len("3456"), expandTo64Len("3456"),
]; ];
const resp = await request(app).get("/latest_vaas").query({ ids }); const resp = await request(app).get("/api/latest_vaas").query({ ids });
expect(resp.status).toBe(StatusCodes.OK); expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body.length).toBe(2); expect(resp.body.length).toBe(2);
expect(resp.body).toContain( expect(resp.body).toContain(
@ -111,13 +111,13 @@ describe("Latest Vaa Bytes Endpoint", () => {
); );
}); });
test("When called with some non-existant ids within ids, returns error mentioning non-existant ids", async () => { test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => {
const ids = [ const ids = [
expandTo64Len("ab01"), expandTo64Len("ab01"),
expandTo64Len("3456"), expandTo64Len("3456"),
expandTo64Len("effe"), expandTo64Len("effe"),
]; ];
const resp = await request(app).get("/latest_vaas").query({ ids }); const resp = await request(app).get("/api/latest_vaas").query({ ids });
expect(resp.status).toBe(StatusCodes.BAD_REQUEST); expect(resp.status).toBe(StatusCodes.BAD_REQUEST);
expect(resp.body.message).toContain(ids[0]); expect(resp.body.message).toContain(ids[0]);
expect(resp.body.message).not.toContain(ids[1]); expect(resp.body.message).not.toContain(ids[1]);

View File

@ -1,9 +1,9 @@
import { HexString, PriceFeed, PriceStatus } from "@pythnetwork/pyth-sdk-js"; import { HexString, PriceFeed, PriceStatus } from "@pythnetwork/pyth-sdk-js";
import { PriceStore, PriceInfo } from "../listen";
import { WebSocketAPI, ClientMessage } from "../ws";
import { Server } from "http"; import { Server } from "http";
import { WebSocket, WebSocketServer } from "ws"; import { WebSocket, WebSocketServer } from "ws";
import { sleep } from "../helpers"; import { sleep } from "../helpers";
import { PriceInfo, PriceStore } from "../listen";
import { ClientMessage, WebSocketAPI } from "../ws";
const port = 2524; const port = 2524;
@ -11,27 +11,54 @@ let api: WebSocketAPI;
let server: Server; let server: Server;
let wss: WebSocketServer; let wss: WebSocketServer;
let priceFeeds: PriceFeed[]; let priceInfos: PriceInfo[];
let priceMetadata: any;
function expandTo64Len(id: string): string { function expandTo64Len(id: string): string {
return id.repeat(64).substring(0, 64); return id.repeat(64).substring(0, 64);
} }
function dummyPriceMetadata(
attestationTime: number,
emitterChainId: number,
seqNum: number
): any {
return {
attestation_time: attestationTime,
emitter_chain: emitterChainId,
sequence_number: seqNum,
};
}
function dummyPriceInfo(
id: HexString,
vaa: HexString,
priceMetadata: any
): PriceInfo {
return {
seqNum: priceMetadata.sequence_number,
attestationTime: priceMetadata.attestation_time,
emitterChainId: priceMetadata.emitter_chain,
priceFeed: dummyPriceFeed(id),
vaaBytes: Buffer.from(vaa, "hex").toString("binary"),
};
}
function dummyPriceFeed(id: string): PriceFeed { function dummyPriceFeed(id: string): PriceFeed {
return new PriceFeed({ return PriceFeed.fromJson({
conf: "0", conf: "0",
emaConf: "1", ema_conf: "1",
emaPrice: "2", ema_price: "2",
expo: 4, expo: 3,
id, id,
maxNumPublishers: 7, max_num_publishers: 5,
numPublishers: 6, num_publishers: 6,
prevConf: "8", prev_conf: "7",
prevPrice: "9", prev_price: "8",
prevPublishTime: 10, prev_publish_time: 9,
price: "11", price: "10",
productId: "def456", product_id: "def456",
publishTime: 13, publish_time: 12,
status: PriceStatus.Trading, status: PriceStatus.Trading,
}); });
} }
@ -66,17 +93,19 @@ async function createSocketClient(): Promise<[WebSocket, any[]]> {
} }
beforeAll(async () => { beforeAll(async () => {
priceFeeds = [ priceMetadata = dummyPriceMetadata(0, 0, 0);
dummyPriceFeed(expandTo64Len("abcd")), priceInfos = [
dummyPriceFeed(expandTo64Len("ef01")), dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4", priceMetadata),
dummyPriceFeed(expandTo64Len("2345")), dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4", priceMetadata),
dummyPriceFeed(expandTo64Len("6789")), dummyPriceInfo(expandTo64Len("2345"), "bad01bad", priceMetadata),
dummyPriceInfo(expandTo64Len("6789"), "bidbidbid", priceMetadata),
]; ];
let priceInfo: PriceStore = { let priceInfo: PriceStore = {
getLatestPriceInfo: (_priceFeedId: string) => undefined, getLatestPriceInfo: (_priceFeedId: string) => undefined,
addUpdateListener: (_callback: (priceFeed: PriceFeed) => any) => undefined, addUpdateListener: (_callback: (priceInfo: PriceInfo) => any) => undefined,
getPriceIds: () => new Set(priceFeeds.map((priceFeed) => priceFeed.id)), getPriceIds: () =>
new Set(priceInfos.map((priceInfo) => priceInfo.priceFeed.id)),
}; };
api = new WebSocketAPI(priceInfo); api = new WebSocketAPI(priceInfo);
@ -93,11 +122,11 @@ afterAll(async () => {
}); });
describe("Client receives data", () => { describe("Client receives data", () => {
test("When subscribes with valid ids, returns correct price feed", async () => { test("When subscribes with valid ids without verbose flag, returns correct price feed", async () => {
let [client, serverMessages] = await createSocketClient(); let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = { let message: ClientMessage = {
ids: [priceFeeds[0].id, priceFeeds[1].id], ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
type: "subscribe", type: "subscribe",
}; };
@ -110,22 +139,108 @@ describe("Client receives data", () => {
status: "success", status: "success",
}); });
api.dispatchPriceFeedUpdate(priceFeeds[0]); api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2); await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toStrictEqual({ expect(serverMessages[1]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[0].toJson(), price_feed: priceInfos[0].priceFeed.toJson(),
}); });
api.dispatchPriceFeedUpdate(priceFeeds[1]); api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages, 3); await waitForMessages(serverMessages, 3);
expect(serverMessages[2]).toStrictEqual({ expect(serverMessages[2]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[1].toJson(), price_feed: priceInfos[1].priceFeed.toJson(),
});
client.close();
await waitForSocketState(client, client.CLOSED);
});
test("When subscribes with valid ids and verbose flag set to true, returns correct price feed with metadata", async () => {
let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = {
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
type: "subscribe",
verbose: true,
};
client.send(JSON.stringify(message));
await waitForMessages(serverMessages, 1);
expect(serverMessages[0]).toStrictEqual({
type: "response",
status: "success",
});
api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toEqual({
type: "price_update",
price_feed: {
...priceInfos[0].priceFeed.toJson(),
metadata: priceMetadata,
},
});
api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages, 3);
expect(serverMessages[2]).toEqual({
type: "price_update",
price_feed: {
...priceInfos[1].priceFeed.toJson(),
metadata: priceMetadata,
},
});
client.close();
await waitForSocketState(client, client.CLOSED);
});
test("When subscribes with valid ids and verbose flag set to false, returns correct price feed without metadata", async () => {
let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = {
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
type: "subscribe",
verbose: false,
};
client.send(JSON.stringify(message));
await waitForMessages(serverMessages, 1);
expect(serverMessages[0]).toStrictEqual({
type: "response",
status: "success",
});
api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toEqual({
type: "price_update",
price_feed: priceInfos[0].priceFeed.toJson(),
});
api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages, 3);
expect(serverMessages[2]).toEqual({
type: "price_update",
price_feed: priceInfos[1].priceFeed.toJson(),
}); });
client.close(); client.close();
@ -156,7 +271,7 @@ describe("Client receives data", () => {
let [client, serverMessages] = await createSocketClient(); let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = { let message: ClientMessage = {
ids: [priceFeeds[0].id], ids: [priceInfos[0].priceFeed.id],
type: "subscribe", type: "subscribe",
}; };
@ -169,17 +284,17 @@ describe("Client receives data", () => {
status: "success", status: "success",
}); });
api.dispatchPriceFeedUpdate(priceFeeds[1]); api.dispatchPriceFeedUpdate(priceInfos[1]);
await sleep(100); await sleep(100);
api.dispatchPriceFeedUpdate(priceFeeds[0]); api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2); await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toStrictEqual({ expect(serverMessages[1]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[0].toJson(), price_feed: priceInfos[0].priceFeed.toJson(),
}); });
await sleep(100); await sleep(100);
@ -193,7 +308,7 @@ describe("Client receives data", () => {
let [client, serverMessages] = await createSocketClient(); let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = { let message: ClientMessage = {
ids: [priceFeeds[0].id], ids: [priceInfos[0].priceFeed.id],
type: "subscribe", type: "subscribe",
}; };
@ -206,17 +321,17 @@ describe("Client receives data", () => {
status: "success", status: "success",
}); });
api.dispatchPriceFeedUpdate(priceFeeds[0]); api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2); await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toStrictEqual({ expect(serverMessages[1]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[0].toJson(), price_feed: priceInfos[0].priceFeed.toJson(),
}); });
message = { message = {
ids: [priceFeeds[0].id], ids: [priceInfos[0].priceFeed.id],
type: "unsubscribe", type: "unsubscribe",
}; };
@ -229,7 +344,7 @@ describe("Client receives data", () => {
status: "success", status: "success",
}); });
api.dispatchPriceFeedUpdate(priceFeeds[0]); api.dispatchPriceFeedUpdate(priceInfos[0]);
await sleep(100); await sleep(100);
@ -243,7 +358,7 @@ describe("Client receives data", () => {
let [client, serverMessages] = await createSocketClient(); let [client, serverMessages] = await createSocketClient();
let message: ClientMessage = { let message: ClientMessage = {
ids: [priceFeeds[0].id], ids: [priceInfos[0].priceFeed.id],
type: "unsubscribe", type: "unsubscribe",
}; };
@ -265,14 +380,14 @@ describe("Client receives data", () => {
let [client2, serverMessages2] = await createSocketClient(); let [client2, serverMessages2] = await createSocketClient();
let message1: ClientMessage = { let message1: ClientMessage = {
ids: [priceFeeds[0].id], ids: [priceInfos[0].priceFeed.id],
type: "subscribe", type: "subscribe",
}; };
client1.send(JSON.stringify(message1)); client1.send(JSON.stringify(message1));
let message2: ClientMessage = { let message2: ClientMessage = {
ids: [priceFeeds[1].id], ids: [priceInfos[1].priceFeed.id],
type: "subscribe", type: "subscribe",
}; };
@ -291,20 +406,20 @@ describe("Client receives data", () => {
status: "success", status: "success",
}); });
api.dispatchPriceFeedUpdate(priceFeeds[0]); api.dispatchPriceFeedUpdate(priceInfos[0]);
api.dispatchPriceFeedUpdate(priceFeeds[1]); api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages1, 2); await waitForMessages(serverMessages1, 2);
await waitForMessages(serverMessages2, 2); await waitForMessages(serverMessages2, 2);
expect(serverMessages1[1]).toStrictEqual({ expect(serverMessages1[1]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[0].toJson(), price_feed: priceInfos[0].priceFeed.toJson(),
}); });
expect(serverMessages2[1]).toStrictEqual({ expect(serverMessages2[1]).toEqual({
type: "price_update", type: "price_update",
price_feed: priceFeeds[1].toJson(), price_feed: priceInfos[1].priceFeed.toJson(),
}); });
client1.close(); client1.close();

View File

@ -37,7 +37,7 @@ export type PriceInfo = {
export interface PriceStore { export interface PriceStore {
getPriceIds(): Set<HexString>; getPriceIds(): Set<HexString>;
getLatestPriceInfo(priceFeedId: HexString): PriceInfo | undefined; getLatestPriceInfo(priceFeedId: HexString): PriceInfo | undefined;
addUpdateListener(callback: (priceFeed: PriceFeed) => any): void; addUpdateListener(callback: (priceInfo: PriceInfo) => any): void;
} }
type ListenerReadinessConfig = { type ListenerReadinessConfig = {
@ -59,7 +59,7 @@ export class Listener implements PriceStore {
private filters: FilterEntry[] = []; private filters: FilterEntry[] = [];
private spyConnectionTime: TimestampInSec | undefined; private spyConnectionTime: TimestampInSec | undefined;
private readinessConfig: ListenerReadinessConfig; private readinessConfig: ListenerReadinessConfig;
private updateCallbacks: ((priceFeed: PriceFeed) => any)[]; private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
constructor(config: ListenerConfig, promClient?: PromClient) { constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient; this.promClient = promClient;
@ -192,16 +192,17 @@ export class Listener implements PriceStore {
lastAttestationTime < priceAttestation.attestationTime lastAttestationTime < priceAttestation.attestationTime
) { ) {
const priceFeed = priceAttestationToPriceFeed(priceAttestation); const priceFeed = priceAttestationToPriceFeed(priceAttestation);
this.priceFeedVaaMap.set(key, { const priceInfo = {
seqNum: parsedVAA.sequence, seqNum: parsedVAA.sequence,
vaaBytes: vaaBytes, vaaBytes: vaaBytes,
attestationTime: priceAttestation.attestationTime, attestationTime: priceAttestation.attestationTime,
priceFeed, priceFeed,
emitterChainId: parsedVAA.emitter_chain, emitterChainId: parsedVAA.emitter_chain,
}); }
this.priceFeedVaaMap.set(key, priceInfo);
for (let callback of this.updateCallbacks) { for (let callback of this.updateCallbacks) {
callback(priceFeed); callback(priceInfo);
} }
} }
} }
@ -224,7 +225,7 @@ export class Listener implements PriceStore {
return this.priceFeedVaaMap.get(priceFeedId); return this.priceFeedVaaMap.get(priceFeedId);
} }
addUpdateListener(callback: (priceFeed: PriceFeed) => any) { addUpdateListener(callback: (priceInfo: PriceInfo) => any) {
this.updateCallbacks.push(callback); this.updateCallbacks.push(callback);
} }

View File

@ -1,9 +1,8 @@
import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; import { HexString } from "@pythnetwork/pyth-sdk-js";
import express from "express";
import * as http from "http"; import * as http from "http";
import Joi from "joi"; import Joi from "joi";
import WebSocket, { RawData, WebSocketServer } from "ws"; import WebSocket, { RawData, WebSocketServer } from "ws";
import { PriceStore } from "./listen"; import { PriceInfo, PriceStore } from "./listen";
import { logger } from "./logging"; import { logger } from "./logging";
import { PromClient } from "./promClient"; import { PromClient } from "./promClient";
@ -12,11 +11,13 @@ const ClientMessageSchema: Joi.Schema = Joi.object({
ids: Joi.array() ids: Joi.array()
.items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/)) .items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/))
.required(), .required(),
verbose: Joi.boolean(),
}).required(); }).required();
export type ClientMessage = { export type ClientMessage = {
type: "subscribe" | "unsubscribe"; type: "subscribe" | "unsubscribe";
ids: HexString[]; ids: HexString[];
verbose?: boolean;
}; };
export type ServerResponse = { export type ServerResponse = {
@ -35,6 +36,7 @@ export type ServerMessage = ServerResponse | ServerPriceUpdate;
export class WebSocketAPI { export class WebSocketAPI {
private wsCounter: number; private wsCounter: number;
private priceFeedClients: Map<HexString, Set<WebSocket>>; private priceFeedClients: Map<HexString, Set<WebSocket>>;
private priceFeedClientsVerbosity: Map<HexString, Map<WebSocket, boolean>>;
private aliveClients: Set<WebSocket>; private aliveClients: Set<WebSocket>;
private wsId: Map<WebSocket, number>; private wsId: Map<WebSocket, number>;
private priceFeedVaaInfo: PriceStore; private priceFeedVaaInfo: PriceStore;
@ -43,47 +45,78 @@ export class WebSocketAPI {
constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) { constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) {
this.priceFeedVaaInfo = priceFeedVaaInfo; this.priceFeedVaaInfo = priceFeedVaaInfo;
this.priceFeedClients = new Map(); this.priceFeedClients = new Map();
this.priceFeedClientsVerbosity = new Map();
this.aliveClients = new Set(); this.aliveClients = new Set();
this.wsCounter = 0; this.wsCounter = 0;
this.wsId = new Map(); this.wsId = new Map();
this.promClient = promClient; this.promClient = promClient;
} }
private addPriceFeedClient(ws: WebSocket, id: HexString) { private addPriceFeedClient(
ws: WebSocket,
id: HexString,
verbose: boolean = false
) {
if (!this.priceFeedClients.has(id)) { if (!this.priceFeedClients.has(id)) {
this.priceFeedClients.set(id, new Set()); this.priceFeedClients.set(id, new Set());
this.priceFeedClientsVerbosity.set(id, new Map([[ws, verbose]]));
} else {
this.priceFeedClientsVerbosity.get(id)!.set(ws, verbose);
} }
this.priceFeedClients.get(id)!.add(ws); this.priceFeedClients.get(id)!.add(ws);
} }
private delPriceFeedClient(ws: WebSocket, id: HexString) { private delPriceFeedClient(ws: WebSocket, id: HexString) {
this.priceFeedClients.get(id)?.delete(ws); if (!this.priceFeedClients.has(id)) {
return;
}
this.priceFeedClients.get(id)!.delete(ws);
this.priceFeedClientsVerbosity.get(id)!.delete(ws);
} }
dispatchPriceFeedUpdate(priceFeed: PriceFeed) { dispatchPriceFeedUpdate(priceInfo: PriceInfo) {
if (this.priceFeedClients.get(priceFeed.id) === undefined) { if (this.priceFeedClients.get(priceInfo.priceFeed.id) === undefined) {
logger.info(`Sending ${priceFeed.id} price update to no clients.`); logger.info(
`Sending ${priceInfo.priceFeed.id} price update to no clients.`
);
return; return;
} }
logger.info( logger.info(
`Sending ${priceFeed.id} price update to ${ `Sending ${priceInfo.priceFeed.id} price update to ${
this.priceFeedClients.get(priceFeed.id)!.size this.priceFeedClients.get(priceInfo.priceFeed.id)!.size
} clients` } clients`
); );
for (let client of this.priceFeedClients.get(priceFeed.id)!.values()) { for (let client of this.priceFeedClients
.get(priceInfo.priceFeed.id)!
.values()) {
logger.info( logger.info(
`Sending ${priceFeed.id} price update to client ${this.wsId.get( `Sending ${
client priceInfo.priceFeed.id
)}` } price update to client ${this.wsId.get(client)}`
); );
this.promClient?.addWebSocketInteraction("server_update", "ok"); this.promClient?.addWebSocketInteraction("server_update", "ok");
let priceUpdate: ServerPriceUpdate = { let verbose = this.priceFeedClientsVerbosity
.get(priceInfo.priceFeed.id)!
.get(client);
let priceUpdate: ServerPriceUpdate = verbose
? {
type: "price_update", type: "price_update",
price_feed: priceFeed.toJson(), price_feed: {
...priceInfo.priceFeed.toJson(),
metadata: {
emitter_chain: priceInfo.emitterChainId,
attestation_time: priceInfo.attestationTime,
sequence_number: priceInfo.seqNum,
},
},
}
: {
type: "price_update",
price_feed: priceInfo.priceFeed.toJson(),
}; };
client.send(JSON.stringify(priceUpdate)); client.send(JSON.stringify(priceUpdate));
@ -128,7 +161,9 @@ export class WebSocketAPI {
} }
if (message.type == "subscribe") { if (message.type == "subscribe") {
message.ids.forEach((id) => this.addPriceFeedClient(ws, id)); message.ids.forEach((id) =>
this.addPriceFeedClient(ws, id, message.verbose === true)
);
} else { } else {
message.ids.forEach((id) => this.delPriceFeedClient(ws, id)); message.ids.forEach((id) => this.delPriceFeedClient(ws, id));
} }