Use single server for both ws and http (#230)

* Use single server for both ws and http

* Format code
This commit is contained in:
Ali Behjati 2022-06-30 12:51:06 +02:00 committed by GitHub
parent da1f19bf0b
commit 6a46dfcb6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 67 deletions

View File

@ -13,9 +13,6 @@ spec:
- port: 4200
name: rest-api
protocol: TCP
- port: 6200
name: wss-api
protocol: TCP
clusterIP: None
selector:
app: pyth-price-service
@ -71,8 +68,6 @@ spec:
value: '[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]'
- name: REST_PORT
value: '4200'
- name: WS_PORT
value: '6200'
- name: PROM_PORT
value: '8081'
- name: READINESS_SPY_SYNC_TIME_SECONDS

View File

@ -52,7 +52,7 @@ async function waitForMessages(messages: any[], cnt: number): Promise<void> {
}
async function createSocketClient(): Promise<[WebSocket, any[]]> {
const client = new WebSocket(`ws://localhost:${port}`);
const client = new WebSocket(`ws://localhost:${port}/ws`);
await waitForSocketState(client, client.OPEN);
@ -79,9 +79,12 @@ beforeAll(async () => {
getPriceIds: () => new Set(priceFeeds.map((priceFeed) => priceFeed.id)),
};
api = new WebSocketAPI({ port }, priceInfo);
api = new WebSocketAPI(priceInfo);
[wss, server] = api.run();
server = new Server();
server.listen(port);
wss = api.run(server);
});
afterAll(async () => {

View File

@ -20,43 +20,43 @@ setDefaultWasm("node");
// Set up the logger.
initLogger({ logLevel: process.env.LOG_LEVEL });
const promClient = new PromClient({
async function run() {
const promClient = new PromClient({
name: "price_service",
port: parseInt(envOrErr("PROM_PORT")),
});
});
const listener = new Listener(
const listener = new Listener(
{
spyServiceHost: envOrErr("SPY_SERVICE_HOST"),
filtersRaw: process.env.SPY_SERVICE_FILTERS,
readiness: {
spySyncTimeSeconds: parseInt(envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")),
spySyncTimeSeconds: parseInt(
envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")
),
numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS")),
},
},
promClient
);
);
// In future if we have more components we will modify it to include them all
const isReady = () => listener.isReady();
// In future if we have more components we will modify it to include them all
const isReady = () => listener.isReady();
const restAPI = new RestAPI(
const restAPI = new RestAPI(
{
port: parseInt(envOrErr("REST_PORT")),
},
listener,
isReady,
promClient
);
);
const wsAPI = new WebSocketAPI(
{
port: parseInt(envOrErr("WS_PORT")),
},
listener,
promClient
);
const wsAPI = new WebSocketAPI(listener, promClient);
listener.run();
restAPI.run();
wsAPI.run();
listener.run();
const server = await restAPI.run();
wsAPI.run(server);
}
run();

View File

@ -168,8 +168,12 @@ export class Listener implements PriceStore {
let isAnyPriceNew = batchAttestation.priceAttestations.some(
(priceAttestation) => {
const key = priceAttestation.priceId;
let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime;
return lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime;
let lastAttestationTime =
this.priceFeedVaaMap.get(key)?.attestationTime;
return (
lastAttestationTime === undefined ||
lastAttestationTime < priceAttestation.attestationTime
);
}
);
@ -182,7 +186,10 @@ export class Listener implements PriceStore {
let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime;
if (lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime) {
if (
lastAttestationTime === undefined ||
lastAttestationTime < priceAttestation.attestationTime
) {
const priceFeed = priceAttestationToPriceFeed(priceAttestation);
this.priceFeedVaaMap.set(key, {
seqNum: parsedVAA.sequence,

View File

@ -9,6 +9,7 @@ import { PromClient } from "./promClient";
import { DurationInMs, DurationInSec } from "./helpers";
import { StatusCodes } from "http-status-codes";
import { validate, ValidationError, Joi, schema } from "express-validation";
import { Server } from "http";
const MORGAN_LOG_FORMAT =
':remote-addr - :remote-user ":method :url HTTP/:http-version"' +
@ -80,7 +81,7 @@ export class RestAPI {
}).required(),
};
app.get(
"/latest_vaas",
"/api/latest_vaas",
validate(latestVaasInputSchema),
(req: Request, res: Response) => {
let priceIds = req.query.ids as string[];
@ -126,7 +127,7 @@ export class RestAPI {
}
);
endpoints.push(
"latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
"api/latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
);
const latestPriceFeedsInputSchema: schema = {
@ -137,7 +138,7 @@ export class RestAPI {
}).required(),
};
app.get(
"/latest_price_feeds",
"/api/latest_price_feeds",
validate(latestPriceFeedsInputSchema),
(req: Request, res: Response) => {
let priceIds = req.query.ids as string[];
@ -177,7 +178,7 @@ export class RestAPI {
}
);
endpoints.push(
"latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
"api/latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
);
app.get("/ready", (_, res: Response) => {
@ -194,6 +195,9 @@ export class RestAPI {
});
endpoints.push("live");
// Websocket endpoint
endpoints.push("ws");
app.get("/", (_, res: Response) => res.json(endpoints));
app.use(function (err: any, _: Request, res: Response, next: NextFunction) {
@ -211,9 +215,9 @@ export class RestAPI {
return app;
}
async run() {
async run(): Promise<Server> {
let app = await this.createApp();
app.listen(this.port, () =>
return app.listen(this.port, () =>
logger.debug("listening on REST port " + this.port)
);
}

View File

@ -34,19 +34,13 @@ export type ServerMessage = ServerResponse | ServerPriceUpdate;
export class WebSocketAPI {
private wsCounter: number;
private port: number;
private priceFeedClients: Map<HexString, Set<WebSocket>>;
private aliveClients: Set<WebSocket>;
private wsId: Map<WebSocket, number>;
private priceFeedVaaInfo: PriceStore;
private promClient: PromClient | undefined;
constructor(
config: { port: number },
priceFeedVaaInfo: PriceStore,
promClient?: PromClient
) {
this.port = config.port;
constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) {
this.priceFeedVaaInfo = priceFeedVaaInfo;
this.priceFeedClients = new Map();
this.aliveClients = new Set();
@ -167,11 +161,8 @@ export class WebSocketAPI {
ws.send(JSON.stringify(response));
}
run(): [WebSocketServer, http.Server] {
const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ server });
run(server: http.Server): WebSocketServer {
const wss = new WebSocketServer({ server, path: "/ws" });
wss.on("connection", (ws: WebSocket, request: http.IncomingMessage) => {
logger.info(
@ -220,12 +211,10 @@ export class WebSocketAPI {
clearInterval(pingInterval);
});
server.listen(this.port, () =>
logger.debug("listening on WS port " + this.port)
);
this.priceFeedVaaInfo.addUpdateListener(
this.dispatchPriceFeedUpdate.bind(this)
);
return [wss, server];
return wss;
}
}