From 6a46dfcb6e8e696e0ff5016fa1289220a25f4a18 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 30 Jun 2022 12:51:06 +0200 Subject: [PATCH] Use single server for both ws and http (#230) * Use single server for both ws and http * Format code --- devnet/pyth-price-service.yaml | 5 -- .../price-service/src/__tests__/ws.test.ts | 9 ++- third_party/pyth/price-service/src/index.ts | 68 +++++++++---------- third_party/pyth/price-service/src/listen.ts | 13 +++- third_party/pyth/price-service/src/rest.ts | 16 +++-- third_party/pyth/price-service/src/ws.ts | 21 ++---- 6 files changed, 65 insertions(+), 67 deletions(-) diff --git a/devnet/pyth-price-service.yaml b/devnet/pyth-price-service.yaml index c01fa1a9..b8595f0d 100644 --- a/devnet/pyth-price-service.yaml +++ b/devnet/pyth-price-service.yaml @@ -13,9 +13,6 @@ spec: - port: 4200 name: rest-api protocol: TCP - - port: 6200 - name: wss-api - protocol: TCP clusterIP: None selector: app: pyth-price-service @@ -71,8 +68,6 @@ spec: value: '[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]' - name: REST_PORT value: '4200' - - name: WS_PORT - value: '6200' - name: PROM_PORT value: '8081' - name: READINESS_SPY_SYNC_TIME_SECONDS diff --git a/third_party/pyth/price-service/src/__tests__/ws.test.ts b/third_party/pyth/price-service/src/__tests__/ws.test.ts index 96d30210..8e9daca0 100644 --- a/third_party/pyth/price-service/src/__tests__/ws.test.ts +++ b/third_party/pyth/price-service/src/__tests__/ws.test.ts @@ -52,7 +52,7 @@ async function waitForMessages(messages: any[], cnt: number): Promise { } async function createSocketClient(): Promise<[WebSocket, any[]]> { - const client = new WebSocket(`ws://localhost:${port}`); + const client = new WebSocket(`ws://localhost:${port}/ws`); await waitForSocketState(client, client.OPEN); @@ -79,9 +79,12 @@ beforeAll(async () => { getPriceIds: () => new Set(priceFeeds.map((priceFeed) => priceFeed.id)), }; - api = new WebSocketAPI({ port }, priceInfo); + api = new WebSocketAPI(priceInfo); - [wss, server] = api.run(); + server = new Server(); + server.listen(port); + + wss = api.run(server); }); afterAll(async () => { diff --git a/third_party/pyth/price-service/src/index.ts b/third_party/pyth/price-service/src/index.ts index a808934a..90f15571 100644 --- a/third_party/pyth/price-service/src/index.ts +++ b/third_party/pyth/price-service/src/index.ts @@ -20,43 +20,43 @@ setDefaultWasm("node"); // Set up the logger. initLogger({ logLevel: process.env.LOG_LEVEL }); -const promClient = new PromClient({ - name: "price_service", - port: parseInt(envOrErr("PROM_PORT")), -}); +async function run() { + const promClient = new PromClient({ + name: "price_service", + port: parseInt(envOrErr("PROM_PORT")), + }); -const listener = new Listener( - { - spyServiceHost: envOrErr("SPY_SERVICE_HOST"), - filtersRaw: process.env.SPY_SERVICE_FILTERS, - readiness: { - spySyncTimeSeconds: parseInt(envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")), - numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS")), + const listener = new Listener( + { + spyServiceHost: envOrErr("SPY_SERVICE_HOST"), + filtersRaw: process.env.SPY_SERVICE_FILTERS, + readiness: { + spySyncTimeSeconds: parseInt( + envOrErr("READINESS_SPY_SYNC_TIME_SECONDS") + ), + numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS")), + }, }, - }, - promClient -); + promClient + ); -// In future if we have more components we will modify it to include them all -const isReady = () => listener.isReady(); + // In future if we have more components we will modify it to include them all + const isReady = () => listener.isReady(); -const restAPI = new RestAPI( - { - port: parseInt(envOrErr("REST_PORT")), - }, - listener, - isReady, - promClient -); + const restAPI = new RestAPI( + { + port: parseInt(envOrErr("REST_PORT")), + }, + listener, + isReady, + promClient + ); -const wsAPI = new WebSocketAPI( - { - port: parseInt(envOrErr("WS_PORT")), - }, - listener, - promClient -); + const wsAPI = new WebSocketAPI(listener, promClient); -listener.run(); -restAPI.run(); -wsAPI.run(); + listener.run(); + const server = await restAPI.run(); + wsAPI.run(server); +} + +run(); diff --git a/third_party/pyth/price-service/src/listen.ts b/third_party/pyth/price-service/src/listen.ts index e687ef6a..96f0b9e5 100644 --- a/third_party/pyth/price-service/src/listen.ts +++ b/third_party/pyth/price-service/src/listen.ts @@ -168,8 +168,12 @@ export class Listener implements PriceStore { let isAnyPriceNew = batchAttestation.priceAttestations.some( (priceAttestation) => { const key = priceAttestation.priceId; - let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime; - return lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime; + let lastAttestationTime = + this.priceFeedVaaMap.get(key)?.attestationTime; + return ( + lastAttestationTime === undefined || + lastAttestationTime < priceAttestation.attestationTime + ); } ); @@ -182,7 +186,10 @@ export class Listener implements PriceStore { let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime; - if (lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime) { + if ( + lastAttestationTime === undefined || + lastAttestationTime < priceAttestation.attestationTime + ) { const priceFeed = priceAttestationToPriceFeed(priceAttestation); this.priceFeedVaaMap.set(key, { seqNum: parsedVAA.sequence, diff --git a/third_party/pyth/price-service/src/rest.ts b/third_party/pyth/price-service/src/rest.ts index 78ecb05a..728adabf 100644 --- a/third_party/pyth/price-service/src/rest.ts +++ b/third_party/pyth/price-service/src/rest.ts @@ -9,6 +9,7 @@ import { PromClient } from "./promClient"; import { DurationInMs, DurationInSec } from "./helpers"; import { StatusCodes } from "http-status-codes"; import { validate, ValidationError, Joi, schema } from "express-validation"; +import { Server } from "http"; const MORGAN_LOG_FORMAT = ':remote-addr - :remote-user ":method :url HTTP/:http-version"' + @@ -80,7 +81,7 @@ export class RestAPI { }).required(), }; app.get( - "/latest_vaas", + "/api/latest_vaas", validate(latestVaasInputSchema), (req: Request, res: Response) => { let priceIds = req.query.ids as string[]; @@ -126,7 +127,7 @@ export class RestAPI { } ); endpoints.push( - "latest_vaas?ids[]=&ids[]=&.." + "api/latest_vaas?ids[]=&ids[]=&.." ); const latestPriceFeedsInputSchema: schema = { @@ -137,7 +138,7 @@ export class RestAPI { }).required(), }; app.get( - "/latest_price_feeds", + "/api/latest_price_feeds", validate(latestPriceFeedsInputSchema), (req: Request, res: Response) => { let priceIds = req.query.ids as string[]; @@ -177,7 +178,7 @@ export class RestAPI { } ); endpoints.push( - "latest_price_feeds?ids[]=&ids[]=&.." + "api/latest_price_feeds?ids[]=&ids[]=&.." ); app.get("/ready", (_, res: Response) => { @@ -194,6 +195,9 @@ export class RestAPI { }); endpoints.push("live"); + // Websocket endpoint + endpoints.push("ws"); + app.get("/", (_, res: Response) => res.json(endpoints)); app.use(function (err: any, _: Request, res: Response, next: NextFunction) { @@ -211,9 +215,9 @@ export class RestAPI { return app; } - async run() { + async run(): Promise { let app = await this.createApp(); - app.listen(this.port, () => + return app.listen(this.port, () => logger.debug("listening on REST port " + this.port) ); } diff --git a/third_party/pyth/price-service/src/ws.ts b/third_party/pyth/price-service/src/ws.ts index e51a023b..f508e3e1 100644 --- a/third_party/pyth/price-service/src/ws.ts +++ b/third_party/pyth/price-service/src/ws.ts @@ -34,19 +34,13 @@ export type ServerMessage = ServerResponse | ServerPriceUpdate; export class WebSocketAPI { private wsCounter: number; - private port: number; private priceFeedClients: Map>; private aliveClients: Set; private wsId: Map; private priceFeedVaaInfo: PriceStore; private promClient: PromClient | undefined; - constructor( - config: { port: number }, - priceFeedVaaInfo: PriceStore, - promClient?: PromClient - ) { - this.port = config.port; + constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) { this.priceFeedVaaInfo = priceFeedVaaInfo; this.priceFeedClients = new Map(); this.aliveClients = new Set(); @@ -167,11 +161,8 @@ export class WebSocketAPI { ws.send(JSON.stringify(response)); } - run(): [WebSocketServer, http.Server] { - const app = express(); - const server = http.createServer(app); - - const wss = new WebSocketServer({ server }); + run(server: http.Server): WebSocketServer { + const wss = new WebSocketServer({ server, path: "/ws" }); wss.on("connection", (ws: WebSocket, request: http.IncomingMessage) => { logger.info( @@ -220,12 +211,10 @@ export class WebSocketAPI { clearInterval(pingInterval); }); - server.listen(this.port, () => - logger.debug("listening on WS port " + this.port) - ); this.priceFeedVaaInfo.addUpdateListener( this.dispatchPriceFeedUpdate.bind(this) ); - return [wss, server]; + + return wss; } }