diff --git a/third_party/pyth/price-service/src/helpers.ts b/third_party/pyth/price-service/src/helpers.ts index 2f8d1202..10b16675 100644 --- a/third_party/pyth/price-service/src/helpers.ts +++ b/third_party/pyth/price-service/src/helpers.ts @@ -1,3 +1,7 @@ +// Time in seconds +export type TimestampInSec = number; +export type DurationInSec = number; + export function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/third_party/pyth/price-service/src/index.ts b/third_party/pyth/price-service/src/index.ts index deaf6f70..68a96543 100644 --- a/third_party/pyth/price-service/src/index.ts +++ b/third_party/pyth/price-service/src/index.ts @@ -21,7 +21,7 @@ setDefaultWasm("node"); initLogger({logLevel: process.env.LOG_LEVEL}); const promClient = new PromClient({ - name: "pyth_relay", + name: "price_service", port: parseInt(envOrErr("PROM_PORT")) }); @@ -39,7 +39,7 @@ const isReady = () => listener.isReady(); const restAPI = new RestAPI({ port: parseInt(envOrErr("REST_PORT")) -}, listener, isReady); +}, listener, isReady, promClient); listener.run(); restAPI.run(); diff --git a/third_party/pyth/price-service/src/listen.ts b/third_party/pyth/price-service/src/listen.ts index 20408cb4..0261c88d 100644 --- a/third_party/pyth/price-service/src/listen.ts +++ b/third_party/pyth/price-service/src/listen.ts @@ -11,19 +11,17 @@ import { import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm"; -import { envOrErr, sleep } from "./helpers"; +import { envOrErr, sleep, TimestampInSec } from "./helpers"; import { PromClient } from "./promClient"; import { getBatchSummary, parseBatchPriceAttestation } from "@certusone/p2w-sdk"; import { ClientReadableStream } from "@grpc/grpc-js"; import { FilterEntry, SubscribeSignedVAAResponse } from "@certusone/wormhole-spydk/lib/cjs/proto/spy/v1/spy"; import { logger } from "./logging"; -// Timestamp (in seconds) -type Timestamp = number; - export type VaaInfo = { vaaBytes: string, - seqNum: number; + seqNum: number, + receiveTime: TimestampInSec, }; export interface PriceFeedVaaInfo { @@ -44,13 +42,13 @@ type ListenerConfig = { export class Listener implements PriceFeedVaaInfo { // Mapping of Price Feed Id to Vaa private priceFeedVaaMap = new Map(); - private promClient: PromClient; + private promClient: PromClient | undefined; private spyServiceHost: string; private filters: FilterEntry[] = []; - private spyConnectionTime: Timestamp | undefined; + private spyConnectionTime: TimestampInSec | undefined; private readinessConfig: ListenerReadinessConfig; - constructor(config: ListenerConfig, promClient: PromClient) { + constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; this.spyServiceHost = config.spyServiceHost; this.loadFilters(config.filtersRaw); @@ -178,7 +176,8 @@ export class Listener implements PriceFeedVaaInfo { if (lastSeqNum === undefined || lastSeqNum < parsedVAA.sequence) { this.priceFeedVaaMap.set(key, { seqNum: parsedVAA.sequence, - vaaBytes: vaaBytes + vaaBytes: vaaBytes, + receiveTime: (new Date()).getTime() / 1000, }); } } @@ -194,7 +193,7 @@ export class Listener implements PriceFeedVaaInfo { getBatchSummary(batchAttestation) ); - this.promClient.incIncoming(); + this.promClient?.incReceivedVaa(); } getLatestVaaForPriceFeed(priceFeedId: string): VaaInfo | undefined { @@ -202,7 +201,7 @@ export class Listener implements PriceFeedVaaInfo { } isReady(): boolean { - let currentTime: Timestamp = (new Date()).getTime() / 1000; + let currentTime: TimestampInSec = (new Date()).getTime() / 1000; if (this.spyConnectionTime === undefined || currentTime < this.spyConnectionTime + this.readinessConfig.spySyncTimeSeconds) { return false; diff --git a/third_party/pyth/price-service/src/promClient.ts b/third_party/pyth/price-service/src/promClient.ts index 6e62309e..0f317b8d 100644 --- a/third_party/pyth/price-service/src/promClient.ts +++ b/third_party/pyth/price-service/src/promClient.ts @@ -1,5 +1,6 @@ import http = require("http"); import client = require("prom-client"); +import { DurationInSec } from "./helpers"; import { logger } from "./logging"; // NOTE: To create a new metric: @@ -7,16 +8,34 @@ import { logger } from "./logging"; // 2) Create a method to set the metric to a value (such as `incIncoming` function below) // 3) Register the metric using `register.registerMetric` function. +const SERVICE_PREFIX = "price_service__"; + export class PromClient { private register = new client.Registry(); - private walletReg = new client.Registry(); private collectDefaultMetrics = client.collectDefaultMetrics; // Actual metrics - private listenCounter = new client.Counter({ - name: "VAAs_received", + private receivedVaaCounter = new client.Counter({ + name: `${SERVICE_PREFIX}vaas_received`, help: "number of Pyth VAAs received", }); + private apiLatestVaaRequestsCounter = new client.Counter({ + name: `${SERVICE_PREFIX}api_latest_vaa_requests_received`, + help: "Number of requests for latest vaa of a price feed" + }); + private apiLatestVaaNotFoundResponseCounter = new client.Counter({ + name: `${SERVICE_PREFIX}api_latest_vaa_not_found_response`, + help: "Number of not found responses for latest vaa of a price feed request" + }); + private apiLatestVaaSuccessResponseCounter = new client.Counter({ + name: `${SERVICE_PREFIX}api_latest_vaa_not_found`, + help: "Number of successful responses for latest vaa of a price feed request" + }); + private apiLatestVaaFreshnessHistogram = new client.Histogram({ + name: `${SERVICE_PREFIX}api_latest_vaa_freshness`, + help: "Freshness time of Vaa (time difference of Vaa and request time)", + buckets: [1, 5, 10, 15, 30, 60, 120, 180] + }); // End metrics private server = http.createServer(async (req, res) => { @@ -24,7 +43,7 @@ export class PromClient { // Return all metrics in the Prometheus exposition format res.setHeader("Content-Type", this.register.contentType); res.write(await this.register.metrics()); - res.end(await this.walletReg.metrics()); + res.end(); } }); @@ -32,16 +51,36 @@ export class PromClient { this.register.setDefaultLabels({ app: config.name, }); - this.collectDefaultMetrics({ register: this.register }); + this.collectDefaultMetrics({ register: this.register, prefix: SERVICE_PREFIX }); // Register each metric - this.register.registerMetric(this.listenCounter); + this.register.registerMetric(this.receivedVaaCounter); + this.register.registerMetric(this.apiLatestVaaRequestsCounter); + this.register.registerMetric(this.apiLatestVaaNotFoundResponseCounter); + this.register.registerMetric(this.apiLatestVaaSuccessResponseCounter); + this.register.registerMetric(this.apiLatestVaaFreshnessHistogram); // End registering metric logger.info("prometheus client listening on port " + config.port); this.server.listen(config.port); } - incIncoming() { - this.listenCounter.inc(); + incReceivedVaa() { + this.receivedVaaCounter.inc(); + } + + incApiLatestVaaRequests() { + this.apiLatestVaaRequestsCounter.inc(); + } + + incApiLatestVaaNotFoundResponse() { + this.apiLatestVaaNotFoundResponseCounter.inc(); + } + + incApiLatestVaaSuccessResponse() { + this.apiLatestVaaSuccessResponseCounter.inc(); + } + + addApiLatestVaaFreshness(duration: DurationInSec) { + this.apiLatestVaaFreshnessHistogram.observe(duration); } } diff --git a/third_party/pyth/price-service/src/rest.ts b/third_party/pyth/price-service/src/rest.ts index 1e90ca9d..f160de05 100644 --- a/third_party/pyth/price-service/src/rest.ts +++ b/third_party/pyth/price-service/src/rest.ts @@ -3,19 +3,24 @@ import cors from "cors"; import { Request, Response } from "express"; import { PriceFeedVaaInfo } from "./listen"; import { logger } from "./logging"; +import { PromClient } from "./promClient"; +import { DurationInSec } from "./helpers"; export class RestAPI { private port: number; private priceFeedVaaInfo: PriceFeedVaaInfo; - private isReady: () => boolean; + private isReady: (() => boolean) | undefined; + private promClient: PromClient | undefined; constructor(config: { port: number; }, priceFeedVaaInfo: PriceFeedVaaInfo, - isReady: () => boolean) { + isReady?: () => boolean, + promClient?: PromClient) { this.port = config.port; this.priceFeedVaaInfo = priceFeedVaaInfo; this.isReady = isReady; + this.promClient = promClient; } // Run this function without blocking (`await`) if you want to run it async. @@ -27,29 +32,25 @@ export class RestAPI { logger.debug("listening on REST port " + this.port) ); - app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => { - let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id); - - if (latestVaa === undefined) { - res.sendStatus(404); - return; - } - - res.status(200); - res.write(latestVaa.vaaBytes); - res.end(); - }); - let endpoints: string[] = []; - + app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => { + this.promClient?.incApiLatestVaaRequests(); + logger.info(`Received latest_vaa_bytes request for ${req.params.price_feed_id}`) + let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id); if (latestVaa === undefined) { + this.promClient?.incApiLatestVaaNotFoundResponse(); res.sendStatus(404); return; } + this.promClient?.incApiLatestVaaSuccessResponse(); + + const freshness: DurationInSec = (new Date).getTime()/1000 - latestVaa.receiveTime; + this.promClient?.addApiLatestVaaFreshness(freshness); + res.status(200); res.write(latestVaa.vaaBytes); res.end();