Add metrics to pyth price service (#185)

This commit is contained in:
Ali Behjati 2022-04-13 18:48:56 +01:00 committed by GitHub
parent 2dd5357b46
commit 58a5317bc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 37 deletions

View File

@ -1,3 +1,7 @@
// Time in seconds
export type TimestampInSec = number;
export type DurationInSec = number;
export function sleep(ms: number) { export function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }

View File

@ -21,7 +21,7 @@ setDefaultWasm("node");
initLogger({logLevel: process.env.LOG_LEVEL}); initLogger({logLevel: process.env.LOG_LEVEL});
const promClient = new PromClient({ const promClient = new PromClient({
name: "pyth_relay", name: "price_service",
port: parseInt(envOrErr("PROM_PORT")) port: parseInt(envOrErr("PROM_PORT"))
}); });
@ -39,7 +39,7 @@ const isReady = () => listener.isReady();
const restAPI = new RestAPI({ const restAPI = new RestAPI({
port: parseInt(envOrErr("REST_PORT")) port: parseInt(envOrErr("REST_PORT"))
}, listener, isReady); }, listener, isReady, promClient);
listener.run(); listener.run();
restAPI.run(); restAPI.run();

View File

@ -11,19 +11,17 @@ import {
import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm"; import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import { envOrErr, sleep } from "./helpers"; import { envOrErr, sleep, TimestampInSec } from "./helpers";
import { PromClient } from "./promClient"; import { PromClient } from "./promClient";
import { getBatchSummary, parseBatchPriceAttestation } from "@certusone/p2w-sdk"; import { getBatchSummary, parseBatchPriceAttestation } from "@certusone/p2w-sdk";
import { ClientReadableStream } from "@grpc/grpc-js"; import { ClientReadableStream } from "@grpc/grpc-js";
import { FilterEntry, SubscribeSignedVAAResponse } from "@certusone/wormhole-spydk/lib/cjs/proto/spy/v1/spy"; import { FilterEntry, SubscribeSignedVAAResponse } from "@certusone/wormhole-spydk/lib/cjs/proto/spy/v1/spy";
import { logger } from "./logging"; import { logger } from "./logging";
// Timestamp (in seconds)
type Timestamp = number;
export type VaaInfo = { export type VaaInfo = {
vaaBytes: string, vaaBytes: string,
seqNum: number; seqNum: number,
receiveTime: TimestampInSec,
}; };
export interface PriceFeedVaaInfo { export interface PriceFeedVaaInfo {
@ -44,13 +42,13 @@ type ListenerConfig = {
export class Listener implements PriceFeedVaaInfo { export class Listener implements PriceFeedVaaInfo {
// Mapping of Price Feed Id to Vaa // Mapping of Price Feed Id to Vaa
private priceFeedVaaMap = new Map<string, VaaInfo>(); private priceFeedVaaMap = new Map<string, VaaInfo>();
private promClient: PromClient; private promClient: PromClient | undefined;
private spyServiceHost: string; private spyServiceHost: string;
private filters: FilterEntry[] = []; private filters: FilterEntry[] = [];
private spyConnectionTime: Timestamp | undefined; private spyConnectionTime: TimestampInSec | undefined;
private readinessConfig: ListenerReadinessConfig; private readinessConfig: ListenerReadinessConfig;
constructor(config: ListenerConfig, promClient: PromClient) { constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient; this.promClient = promClient;
this.spyServiceHost = config.spyServiceHost; this.spyServiceHost = config.spyServiceHost;
this.loadFilters(config.filtersRaw); this.loadFilters(config.filtersRaw);
@ -178,7 +176,8 @@ export class Listener implements PriceFeedVaaInfo {
if (lastSeqNum === undefined || lastSeqNum < parsedVAA.sequence) { if (lastSeqNum === undefined || lastSeqNum < parsedVAA.sequence) {
this.priceFeedVaaMap.set(key, { this.priceFeedVaaMap.set(key, {
seqNum: parsedVAA.sequence, seqNum: parsedVAA.sequence,
vaaBytes: vaaBytes vaaBytes: vaaBytes,
receiveTime: (new Date()).getTime() / 1000,
}); });
} }
} }
@ -194,7 +193,7 @@ export class Listener implements PriceFeedVaaInfo {
getBatchSummary(batchAttestation) getBatchSummary(batchAttestation)
); );
this.promClient.incIncoming(); this.promClient?.incReceivedVaa();
} }
getLatestVaaForPriceFeed(priceFeedId: string): VaaInfo | undefined { getLatestVaaForPriceFeed(priceFeedId: string): VaaInfo | undefined {
@ -202,7 +201,7 @@ export class Listener implements PriceFeedVaaInfo {
} }
isReady(): boolean { isReady(): boolean {
let currentTime: Timestamp = (new Date()).getTime() / 1000; let currentTime: TimestampInSec = (new Date()).getTime() / 1000;
if (this.spyConnectionTime === undefined || if (this.spyConnectionTime === undefined ||
currentTime < this.spyConnectionTime + this.readinessConfig.spySyncTimeSeconds) { currentTime < this.spyConnectionTime + this.readinessConfig.spySyncTimeSeconds) {
return false; return false;

View File

@ -1,5 +1,6 @@
import http = require("http"); import http = require("http");
import client = require("prom-client"); import client = require("prom-client");
import { DurationInSec } from "./helpers";
import { logger } from "./logging"; import { logger } from "./logging";
// NOTE: To create a new metric: // NOTE: To create a new metric:
@ -7,16 +8,34 @@ import { logger } from "./logging";
// 2) Create a method to set the metric to a value (such as `incIncoming` function below) // 2) Create a method to set the metric to a value (such as `incIncoming` function below)
// 3) Register the metric using `register.registerMetric` function. // 3) Register the metric using `register.registerMetric` function.
const SERVICE_PREFIX = "price_service__";
export class PromClient { export class PromClient {
private register = new client.Registry(); private register = new client.Registry();
private walletReg = new client.Registry();
private collectDefaultMetrics = client.collectDefaultMetrics; private collectDefaultMetrics = client.collectDefaultMetrics;
// Actual metrics // Actual metrics
private listenCounter = new client.Counter({ private receivedVaaCounter = new client.Counter({
name: "VAAs_received", name: `${SERVICE_PREFIX}vaas_received`,
help: "number of Pyth VAAs received", help: "number of Pyth VAAs received",
}); });
private apiLatestVaaRequestsCounter = new client.Counter({
name: `${SERVICE_PREFIX}api_latest_vaa_requests_received`,
help: "Number of requests for latest vaa of a price feed"
});
private apiLatestVaaNotFoundResponseCounter = new client.Counter({
name: `${SERVICE_PREFIX}api_latest_vaa_not_found_response`,
help: "Number of not found responses for latest vaa of a price feed request"
});
private apiLatestVaaSuccessResponseCounter = new client.Counter({
name: `${SERVICE_PREFIX}api_latest_vaa_not_found`,
help: "Number of successful responses for latest vaa of a price feed request"
});
private apiLatestVaaFreshnessHistogram = new client.Histogram({
name: `${SERVICE_PREFIX}api_latest_vaa_freshness`,
help: "Freshness time of Vaa (time difference of Vaa and request time)",
buckets: [1, 5, 10, 15, 30, 60, 120, 180]
});
// End metrics // End metrics
private server = http.createServer(async (req, res) => { private server = http.createServer(async (req, res) => {
@ -24,7 +43,7 @@ export class PromClient {
// Return all metrics in the Prometheus exposition format // Return all metrics in the Prometheus exposition format
res.setHeader("Content-Type", this.register.contentType); res.setHeader("Content-Type", this.register.contentType);
res.write(await this.register.metrics()); res.write(await this.register.metrics());
res.end(await this.walletReg.metrics()); res.end();
} }
}); });
@ -32,16 +51,36 @@ export class PromClient {
this.register.setDefaultLabels({ this.register.setDefaultLabels({
app: config.name, app: config.name,
}); });
this.collectDefaultMetrics({ register: this.register }); this.collectDefaultMetrics({ register: this.register, prefix: SERVICE_PREFIX });
// Register each metric // Register each metric
this.register.registerMetric(this.listenCounter); this.register.registerMetric(this.receivedVaaCounter);
this.register.registerMetric(this.apiLatestVaaRequestsCounter);
this.register.registerMetric(this.apiLatestVaaNotFoundResponseCounter);
this.register.registerMetric(this.apiLatestVaaSuccessResponseCounter);
this.register.registerMetric(this.apiLatestVaaFreshnessHistogram);
// End registering metric // End registering metric
logger.info("prometheus client listening on port " + config.port); logger.info("prometheus client listening on port " + config.port);
this.server.listen(config.port); this.server.listen(config.port);
} }
incIncoming() { incReceivedVaa() {
this.listenCounter.inc(); this.receivedVaaCounter.inc();
}
incApiLatestVaaRequests() {
this.apiLatestVaaRequestsCounter.inc();
}
incApiLatestVaaNotFoundResponse() {
this.apiLatestVaaNotFoundResponseCounter.inc();
}
incApiLatestVaaSuccessResponse() {
this.apiLatestVaaSuccessResponseCounter.inc();
}
addApiLatestVaaFreshness(duration: DurationInSec) {
this.apiLatestVaaFreshnessHistogram.observe(duration);
} }
} }

View File

@ -3,19 +3,24 @@ import cors from "cors";
import { Request, Response } from "express"; import { Request, Response } from "express";
import { PriceFeedVaaInfo } from "./listen"; import { PriceFeedVaaInfo } from "./listen";
import { logger } from "./logging"; import { logger } from "./logging";
import { PromClient } from "./promClient";
import { DurationInSec } from "./helpers";
export class RestAPI { export class RestAPI {
private port: number; private port: number;
private priceFeedVaaInfo: PriceFeedVaaInfo; private priceFeedVaaInfo: PriceFeedVaaInfo;
private isReady: () => boolean; private isReady: (() => boolean) | undefined;
private promClient: PromClient | undefined;
constructor(config: { port: number; }, constructor(config: { port: number; },
priceFeedVaaInfo: PriceFeedVaaInfo, priceFeedVaaInfo: PriceFeedVaaInfo,
isReady: () => boolean) { isReady?: () => boolean,
promClient?: PromClient) {
this.port = config.port; this.port = config.port;
this.priceFeedVaaInfo = priceFeedVaaInfo; this.priceFeedVaaInfo = priceFeedVaaInfo;
this.isReady = isReady; this.isReady = isReady;
this.promClient = promClient;
} }
// Run this function without blocking (`await`) if you want to run it async. // Run this function without blocking (`await`) if you want to run it async.
@ -27,29 +32,25 @@ export class RestAPI {
logger.debug("listening on REST port " + this.port) logger.debug("listening on REST port " + this.port)
); );
app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => {
let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id);
if (latestVaa === undefined) {
res.sendStatus(404);
return;
}
res.status(200);
res.write(latestVaa.vaaBytes);
res.end();
});
let endpoints: string[] = []; let endpoints: string[] = [];
app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => { app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => {
this.promClient?.incApiLatestVaaRequests();
logger.info(`Received latest_vaa_bytes request for ${req.params.price_feed_id}`)
let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id); let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id);
if (latestVaa === undefined) { if (latestVaa === undefined) {
this.promClient?.incApiLatestVaaNotFoundResponse();
res.sendStatus(404); res.sendStatus(404);
return; return;
} }
this.promClient?.incApiLatestVaaSuccessResponse();
const freshness: DurationInSec = (new Date).getTime()/1000 - latestVaa.receiveTime;
this.promClient?.addApiLatestVaaFreshness(freshness);
res.status(200); res.status(200);
res.write(latestVaa.vaaBytes); res.write(latestVaa.vaaBytes);
res.end(); res.end();