add binary option to subscribe to binary vaas (#444)

* add binary option to subscrube to binary vaas

* change property name

* combine verbosity and binary to a single config

* add tests
This commit is contained in:
Daniel Chew 2022-12-29 18:35:12 +09:00 committed by GitHub
parent 6b07ae607c
commit 9ac61742c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 126 additions and 25 deletions

View File

@ -1,6 +1,5 @@
import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import { Server } from "http";
import { number } from "joi";
import { WebSocket, WebSocketServer } from "ws";
import { sleep } from "../helpers";
import { PriceInfo, PriceStore } from "../listen";
@ -250,6 +249,89 @@ describe("Client receives data", () => {
await waitForSocketState(client, client.CLOSED);
});
test("When subscribes with valid ids and binary flag set to true, returns correct price feed with vaa", async () => {
const [client, serverMessages] = await createSocketClient();
const message: ClientMessage = {
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
type: "subscribe",
binary: true,
};
client.send(JSON.stringify(message));
await waitForMessages(serverMessages, 1);
expect(serverMessages[0]).toStrictEqual({
type: "response",
status: "success",
});
api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toEqual({
type: "price_update",
price_feed: priceInfos[0].priceFeed.toJson(),
});
api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages, 3);
expect(serverMessages[2]).toEqual({
type: "price_update",
price_feed: {
...priceInfos[1].priceFeed.toJson(),
vaa: priceInfos[1].vaa.toString("base64"),
},
});
client.close();
await waitForSocketState(client, client.CLOSED);
});
test("When subscribes with valid ids and binary flag set to false, returns correct price feed without vaa", async () => {
const [client, serverMessages] = await createSocketClient();
const message: ClientMessage = {
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
type: "subscribe",
binary: false,
};
client.send(JSON.stringify(message));
await waitForMessages(serverMessages, 1);
expect(serverMessages[0]).toStrictEqual({
type: "response",
status: "success",
});
api.dispatchPriceFeedUpdate(priceInfos[0]);
await waitForMessages(serverMessages, 2);
expect(serverMessages[1]).toEqual({
type: "price_update",
price_feed: priceInfos[0].priceFeed.toJson(),
});
api.dispatchPriceFeedUpdate(priceInfos[1]);
await waitForMessages(serverMessages, 3);
expect(serverMessages[2]).toEqual({
type: "price_update",
price_feed: priceInfos[1].priceFeed.toJson(),
});
client.close();
await waitForSocketState(client, client.CLOSED);
});
test("When subscribes with invalid ids, returns error", async () => {
const [client, serverMessages] = await createSocketClient();

View File

@ -12,12 +12,14 @@ const ClientMessageSchema: Joi.Schema = Joi.object({
.items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/))
.required(),
verbose: Joi.boolean(),
binary: Joi.boolean(),
}).required();
export type ClientMessage = {
type: "subscribe" | "unsubscribe";
ids: HexString[];
verbose?: boolean;
binary?: boolean;
};
export type ServerResponse = {
@ -31,12 +33,20 @@ export type ServerPriceUpdate = {
price_feed: any;
};
export type PriceFeedConfig = {
verbose: boolean;
binary: boolean;
};
export type ServerMessage = ServerResponse | ServerPriceUpdate;
export class WebSocketAPI {
private wsCounter: number;
private priceFeedClients: Map<HexString, Set<WebSocket>>;
private priceFeedClientsVerbosity: Map<HexString, Map<WebSocket, boolean>>;
private priceFeedClientsConfig: Map<
HexString,
Map<WebSocket, PriceFeedConfig>
>;
private aliveClients: Set<WebSocket>;
private wsId: Map<WebSocket, number>;
private priceFeedVaaInfo: PriceStore;
@ -45,7 +55,7 @@ export class WebSocketAPI {
constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) {
this.priceFeedVaaInfo = priceFeedVaaInfo;
this.priceFeedClients = new Map();
this.priceFeedClientsVerbosity = new Map();
this.priceFeedClientsConfig = new Map();
this.aliveClients = new Set();
this.wsCounter = 0;
this.wsId = new Map();
@ -55,13 +65,14 @@ export class WebSocketAPI {
private addPriceFeedClient(
ws: WebSocket,
id: HexString,
verbose: boolean = false
verbose: boolean = false,
binary: boolean = false
) {
if (!this.priceFeedClients.has(id)) {
this.priceFeedClients.set(id, new Set());
this.priceFeedClientsVerbosity.set(id, new Map([[ws, verbose]]));
this.priceFeedClientsConfig.set(id, new Map([[ws, { verbose, binary }]]));
} else {
this.priceFeedClientsVerbosity.get(id)!.set(ws, verbose);
this.priceFeedClientsConfig.get(id)!.set(ws, { verbose, binary });
}
this.priceFeedClients.get(id)!.add(ws);
}
@ -71,7 +82,7 @@ export class WebSocketAPI {
return;
}
this.priceFeedClients.get(id)!.delete(ws);
this.priceFeedClientsVerbosity.get(id)!.delete(ws);
this.priceFeedClientsConfig.get(id)!.delete(ws);
}
dispatchPriceFeedUpdate(priceInfo: PriceInfo) {
@ -96,27 +107,30 @@ export class WebSocketAPI {
for (const client of clients.values()) {
this.promClient?.addWebSocketInteraction("server_update", "ok");
const verbose = this.priceFeedClientsVerbosity
const config = this.priceFeedClientsConfig
.get(priceInfo.priceFeed.id)!
.get(client);
const priceUpdate: ServerPriceUpdate = verbose
? {
type: "price_update",
price_feed: {
...priceInfo.priceFeed.toJson(),
metadata: {
emitter_chain: priceInfo.emitterChainId,
attestation_time: priceInfo.attestationTime,
sequence_number: priceInfo.seqNum,
price_service_receive_time: priceInfo.priceServiceReceiveTime,
},
const verbose = config?.verbose;
const binary = config?.binary;
const priceUpdate: ServerPriceUpdate = {
type: "price_update",
price_feed: {
...priceInfo.priceFeed.toJson(),
...(verbose && {
metadata: {
emitter_chain: priceInfo.emitterChainId,
attestation_time: priceInfo.attestationTime,
sequence_number: priceInfo.seqNum,
price_service_receive_time: priceInfo.priceServiceReceiveTime,
},
}
: {
type: "price_update",
price_feed: priceInfo.priceFeed.toJson(),
};
}),
...(binary && {
vaa: priceInfo.vaa.toString("base64"),
}),
},
};
client.send(JSON.stringify(priceUpdate));
}
@ -161,7 +175,12 @@ export class WebSocketAPI {
if (message.type === "subscribe") {
message.ids.forEach((id) =>
this.addPriceFeedClient(ws, id, message.verbose === true)
this.addPriceFeedClient(
ws,
id,
message.verbose === true,
message.binary === true
)
);
} else {
message.ids.forEach((id) => this.delPriceFeedClient(ws, id));