From 27f9f75b79fdcc83f9ed70eabd6777f82c03a0ba Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 1 Dec 2022 18:37:02 +0100 Subject: [PATCH] [price-service] Fine-tune metrics (#401) - Remove nodejs default metrics. We don't use them. - Remove response time metric. - Remove freshness metric and add gap metric for attestationTime and publishTime. They are similar; however, freshness was measured upon user request but gap is measured upon receiving the next update. - Change receivedVaa to actually represent distinct vaa received. Prior to this, the older vaas, or vaas with same attestation time were not counted in this metric. This will also improve the performance. - Refactors the code a little. `vaaBytes` type was not string and was Buffer. It is fixed now. --- prometheus_config.yaml | 4 + .../pyth/price-service/package-lock.json | 90 +++++++++++++++---- third_party/pyth/price-service/package.json | 1 + .../price-service/src/__tests__/rest.test.ts | 3 +- .../price-service/src/__tests__/ws.test.ts | 3 +- third_party/pyth/price-service/src/listen.ts | 68 ++++++++++---- .../pyth/price-service/src/promClient.ts | 65 ++++++-------- third_party/pyth/price-service/src/rest.ts | 34 +------ 8 files changed, 161 insertions(+), 107 deletions(-) diff --git a/prometheus_config.yaml b/prometheus_config.yaml index 2c3ad33d..d7181ebc 100644 --- a/prometheus_config.yaml +++ b/prometheus_config.yaml @@ -3,3 +3,7 @@ scrape_configs: scrape_interval: 5s static_configs: - targets: ["p2w-attest:3000"] + - job_name: price_service + scrape_interval: 5s + static_configs: + - targets: ["pyth-price-service:8081"] diff --git a/third_party/pyth/price-service/package-lock.json b/third_party/pyth/price-service/package-lock.json index 2183b11a..8657d2c2 100644 --- a/third_party/pyth/price-service/package-lock.json +++ b/third_party/pyth/price-service/package-lock.json @@ -25,6 +25,7 @@ "express-validation": "^4.0.1", "http-status-codes": "^2.2.0", "joi": "^17.6.0", + "lru-cache": "^7.14.1", "morgan": "^1.10.0", "prom-client": "^14.0.1", "response-time": "^2.3.2", @@ -56,7 +57,7 @@ "dependencies": { "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", - "@pythnetwork/pyth-sdk-js": "^1.0.0" + "@pythnetwork/pyth-sdk-js": "^1.1.0" }, "devDependencies": { "@openzeppelin/contracts": "^4.2.0", @@ -6455,6 +6456,18 @@ "node": ">=8" } }, + "node_modules/jest-snapshot/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/jest-snapshot/node_modules/semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -6996,15 +7009,11 @@ } }, "node_modules/lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "dependencies": { - "yallist": "^4.0.0" - }, + "version": "7.14.1", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.14.1.tgz", + "integrity": "sha512-ysxwsnTKdAx96aTRdhDOCQfDgbHnt8SK0KY8SEjO0wHinhWOFTESbjVCMPbU1uGXg/ch4lifqx0wfjOawU2+WA==", "engines": { - "node": ">=10" + "node": ">=12" } }, "node_modules/make-dir": { @@ -8317,6 +8326,18 @@ } } }, + "node_modules/superagent/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/superagent/node_modules/mime": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/mime/-/mime-2.6.0.tgz", @@ -8618,6 +8639,18 @@ } } }, + "node_modules/ts-jest/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/ts-jest/node_modules/semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -10532,7 +10565,7 @@ "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", "@openzeppelin/contracts": "^4.2.0", - "@pythnetwork/pyth-sdk-js": "^1.0.0", + "@pythnetwork/pyth-sdk-js": "^1.1.0", "@typechain/ethers-v5": "^7.1.2", "@types/long": "^4.0.1", "@types/node": "^16.6.1", @@ -13857,6 +13890,15 @@ "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", "dev": true }, + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -14272,13 +14314,9 @@ } }, "lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "requires": { - "yallist": "^4.0.0" - } + "version": "7.14.1", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.14.1.tgz", + "integrity": "sha512-ysxwsnTKdAx96aTRdhDOCQfDgbHnt8SK0KY8SEjO0wHinhWOFTESbjVCMPbU1uGXg/ch4lifqx0wfjOawU2+WA==" }, "make-dir": { "version": "3.1.0", @@ -15251,6 +15289,15 @@ "ms": "2.1.2" } }, + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "mime": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/mime/-/mime-2.6.0.tgz", @@ -15467,6 +15514,15 @@ "yargs-parser": "^20.x" }, "dependencies": { + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", diff --git a/third_party/pyth/price-service/package.json b/third_party/pyth/price-service/package.json index 3646b061..0ffefd41 100644 --- a/third_party/pyth/price-service/package.json +++ b/third_party/pyth/price-service/package.json @@ -44,6 +44,7 @@ "express-validation": "^4.0.1", "http-status-codes": "^2.2.0", "joi": "^17.6.0", + "lru-cache": "^7.14.1", "morgan": "^1.10.0", "prom-client": "^14.0.1", "response-time": "^2.3.2", diff --git a/third_party/pyth/price-service/src/__tests__/rest.test.ts b/third_party/pyth/price-service/src/__tests__/rest.test.ts index b2a6ae86..d5a1c724 100644 --- a/third_party/pyth/price-service/src/__tests__/rest.test.ts +++ b/third_party/pyth/price-service/src/__tests__/rest.test.ts @@ -39,9 +39,10 @@ function dummyPriceInfoPair( id, { priceFeed: dummyPriceFeed(id), + publishTime: 0, attestationTime: 0, seqNum, - vaaBytes: Buffer.from(vaa, "hex").toString("binary"), + vaa: Buffer.from(vaa, "hex"), emitterChainId: 0, priceServiceReceiveTime: 0, }, diff --git a/third_party/pyth/price-service/src/__tests__/ws.test.ts b/third_party/pyth/price-service/src/__tests__/ws.test.ts index 8c4135b1..b0e8868e 100644 --- a/third_party/pyth/price-service/src/__tests__/ws.test.ts +++ b/third_party/pyth/price-service/src/__tests__/ws.test.ts @@ -40,10 +40,11 @@ function dummyPriceInfo( ): PriceInfo { return { seqNum: dummyPriceMetadataValue.sequence_number, + publishTime: 0, attestationTime: dummyPriceMetadataValue.attestation_time, emitterChainId: dummyPriceMetadataValue.emitter_chain, priceFeed: dummyPriceFeed(id), - vaaBytes: Buffer.from(vaa, "hex").toString("binary"), + vaa: Buffer.from(vaa, "hex"), priceServiceReceiveTime: dummyPriceMetadataValue.price_service_receive_time, }; } diff --git a/third_party/pyth/price-service/src/listen.ts b/third_party/pyth/price-service/src/listen.ts index e26bbb78..0a7e233b 100644 --- a/third_party/pyth/price-service/src/listen.ts +++ b/third_party/pyth/price-service/src/listen.ts @@ -1,8 +1,4 @@ -import { - ChainId, - hexToUint8Array, - uint8ArrayToHex, -} from "@certusone/wormhole-sdk"; +import { ChainId, uint8ArrayToHex } from "@certusone/wormhole-sdk"; import { createSpyRPCServiceClient, @@ -11,6 +7,8 @@ import { import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm"; +import { createHash } from "crypto"; + import { getBatchSummary, parseBatchPriceAttestation, @@ -25,10 +23,12 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; import { sleep, TimestampInSec } from "./helpers"; import { logger } from "./logging"; import { PromClient } from "./promClient"; +import LRUCache from "lru-cache"; export type PriceInfo = { - vaaBytes: string; + vaa: Buffer; seqNum: number; + publishTime: TimestampInSec; attestationTime: TimestampInSec; priceFeed: PriceFeed; emitterChainId: number; @@ -52,6 +52,8 @@ type ListenerConfig = { readiness: ListenerReadinessConfig; }; +type VaaHash = string; + export class Listener implements PriceStore { // Mapping of Price Feed Id to Vaa private priceFeedVaaMap = new Map(); @@ -61,6 +63,7 @@ export class Listener implements PriceStore { private spyConnectionTime: TimestampInSec | undefined; private readinessConfig: ListenerReadinessConfig; private updateCallbacks: ((priceInfo: PriceInfo) => any)[]; + private observedVaas: LRUCache; constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; @@ -68,6 +71,10 @@ export class Listener implements PriceStore { this.loadFilters(config.filtersRaw); this.readinessConfig = config.readiness; this.updateCallbacks = []; + this.observedVaas = new LRUCache({ + max: 10000, // At most 10000 items + ttl: 60 * 1000, // 60 seconds + }); } private loadFilters(filtersRaw?: string) { @@ -114,7 +121,7 @@ export class Listener implements PriceStore { ); stream = await subscribeSignedVAA(client, { filters: this.filters }); - stream!.on("data", ({ vaaBytes }: { vaaBytes: string }) => { + stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => { this.processVaa(vaaBytes); }); @@ -150,19 +157,29 @@ export class Listener implements PriceStore { } } - async processVaa(vaaBytes: string) { + async processVaa(vaa: Buffer) { const { parse_vaa } = await importCoreWasm(); - const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes)); + + const vaaHash: VaaHash = createHash("md5").update(vaa).digest("base64"); + + if (this.observedVaas.has(vaaHash)) { + return; + } + + this.observedVaas.set(vaaHash, true); + this.promClient?.incReceivedVaa(); + + const parsedVaa = parse_vaa(vaa); let batchAttestation; try { batchAttestation = await parseBatchPriceAttestation( - Buffer.from(parsedVAA.payload) + Buffer.from(parsedVaa.payload) ); } catch (e: any) { logger.error(e, e.stack); - logger.error("Parsing failed. Dropping vaa: %o", parsedVAA); + logger.error("Parsing failed. Dropping vaa: %o", parsedVaa); return; } @@ -194,15 +211,30 @@ export class Listener implements PriceStore { ) { const priceFeed = priceAttestationToPriceFeed(priceAttestation); const priceInfo = { - seqNum: parsedVAA.sequence, - vaaBytes, + seqNum: parsedVaa.sequence, + vaa, + publishTime: priceAttestation.publishTime, attestationTime: priceAttestation.attestationTime, priceFeed, - emitterChainId: parsedVAA.emitter_chain, + emitterChainId: parsedVaa.emitter_chain, priceServiceReceiveTime: Math.floor(new Date().getTime() / 1000), }; this.priceFeedVaaMap.set(key, priceInfo); + if (lastAttestationTime !== undefined) { + this.promClient?.addPriceUpdatesAttestationTimeGap( + priceAttestation.attestationTime - lastAttestationTime + ); + } + + const lastPublishTime = this.priceFeedVaaMap.get(key)?.publishTime; + + if (lastPublishTime !== undefined) { + this.promClient?.addPriceUpdatesPublishTimeGap( + priceAttestation.publishTime - lastPublishTime + ); + } + for (const callback of this.updateCallbacks) { callback(priceInfo); } @@ -211,16 +243,14 @@ export class Listener implements PriceStore { logger.info( "Parsed a new Batch Price Attestation: [" + - parsedVAA.emitter_chain + + parsedVaa.emitter_chain + ":" + - uint8ArrayToHex(parsedVAA.emitter_address) + + uint8ArrayToHex(parsedVaa.emitter_address) + "], seqNum: " + - parsedVAA.sequence + + parsedVaa.sequence + ", Batch Summary: " + getBatchSummary(batchAttestation) ); - - this.promClient?.incReceivedVaa(); } getLatestPriceInfo(priceFeedId: string): PriceInfo | undefined { diff --git a/third_party/pyth/price-service/src/promClient.ts b/third_party/pyth/price-service/src/promClient.ts index f3b69d03..c9b00e3d 100644 --- a/third_party/pyth/price-service/src/promClient.ts +++ b/third_party/pyth/price-service/src/promClient.ts @@ -1,4 +1,3 @@ -import { stat } from "fs"; import http = require("http"); import client = require("prom-client"); import { DurationInMs, DurationInSec } from "./helpers"; @@ -11,25 +10,30 @@ import { logger } from "./logging"; const SERVICE_PREFIX = "pyth__price_service__"; +type WebSocketInteractionType = + | "connection" + | "close" + | "timeout" + | "server_update" + | "client_message"; + export class PromClient { private register = new client.Registry(); - private collectDefaultMetrics = client.collectDefaultMetrics; // Actual metrics private receivedVaaCounter = new client.Counter({ name: `${SERVICE_PREFIX}vaas_received`, help: "number of Pyth VAAs received", }); - private apiResponseTimeSummary = new client.Summary({ - name: `${SERVICE_PREFIX}api_response_time_ms`, - help: "Response time of a VAA", - labelNames: ["path", "status"], + private priceUpdatesPublishTimeGapHistogram = new client.Histogram({ + name: `${SERVICE_PREFIX}price_updates_publish_time_gap_seconds`, + help: "Summary of publish time gaps between price updates", + buckets: [1, 3, 5, 10, 15, 30, 60, 120], }); - private apiRequestsPriceFreshnessHistogram = new client.Histogram({ - name: `${SERVICE_PREFIX}api_requests_price_freshness_seconds`, - help: "Freshness time of Vaa (time difference of Vaa and request time)", - buckets: [1, 5, 10, 15, 30, 60, 120, 180], - labelNames: ["path", "price_id"], + private priceUpdatesAttestationTimeGapHistogram = new client.Histogram({ + name: `${SERVICE_PREFIX}price_updates_attestation_time_gap_seconds`, + help: "Summary of attestation time gaps between price updates", + buckets: [1, 3, 5, 10, 15, 30, 60, 120], }); private webSocketInteractionCounter = new client.Counter({ name: `${SERVICE_PREFIX}websocket_interaction`, @@ -51,14 +55,10 @@ export class PromClient { this.register.setDefaultLabels({ app: config.name, }); - this.collectDefaultMetrics({ - register: this.register, - prefix: SERVICE_PREFIX, - }); // Register each metric this.register.registerMetric(this.receivedVaaCounter); - this.register.registerMetric(this.apiResponseTimeSummary); - this.register.registerMetric(this.apiRequestsPriceFreshnessHistogram); + this.register.registerMetric(this.priceUpdatesPublishTimeGapHistogram); + this.register.registerMetric(this.priceUpdatesAttestationTimeGapHistogram); this.register.registerMetric(this.webSocketInteractionCounter); // End registering metric @@ -70,31 +70,18 @@ export class PromClient { this.receivedVaaCounter.inc(); } - addResponseTime(path: string, status: number, duration: DurationInMs) { - this.apiResponseTimeSummary.observe( - { - path, - status, - }, - duration - ); + addPriceUpdatesPublishTimeGap(gap: DurationInSec) { + this.priceUpdatesPublishTimeGapHistogram.observe(gap); } - addApiRequestsPriceFreshness( - path: string, - priceId: string, - duration: DurationInSec + addPriceUpdatesAttestationTimeGap(gap: DurationInSec) { + this.priceUpdatesAttestationTimeGapHistogram.observe(gap); + } + + addWebSocketInteraction( + type: WebSocketInteractionType, + status: "ok" | "err" ) { - this.apiRequestsPriceFreshnessHistogram.observe( - { - path, - price_id: priceId, - }, - duration - ); - } - - addWebSocketInteraction(type: string, status: "ok" | "err") { this.webSocketInteractionCounter.inc({ type, status, diff --git a/third_party/pyth/price-service/src/rest.ts b/third_party/pyth/price-service/src/rest.ts index 2661eb01..2e29361c 100644 --- a/third_party/pyth/price-service/src/rest.ts +++ b/third_party/pyth/price-service/src/rest.ts @@ -63,14 +63,6 @@ export class RestAPI { app.use(morgan(MORGAN_LOG_FORMAT, { stream: winstonStream })); - app.use( - responseTime((req: Request, res: Response, time: DurationInMs) => { - if (res.statusCode !== StatusCodes.NOT_FOUND) { - this.promClient?.addResponseTime(req.path, res.statusCode, time); - } - }) - ); - const endpoints: string[] = []; const latestVaasInputSchema: schema = { @@ -88,7 +80,7 @@ export class RestAPI { // Multiple price ids might share same vaa, we use sequence number as // key of a vaa and deduplicate using a map of seqnum to vaa bytes. - const vaaMap = new Map(); + const vaaMap = new Map(); const notFoundIds: string[] = []; @@ -104,24 +96,15 @@ export class RestAPI { continue; } - const freshness: DurationInSec = - new Date().getTime() / 1000 - - latestPriceInfo.priceFeed.getPriceUnchecked().publishTime; - this.promClient?.addApiRequestsPriceFreshness( - req.path, - id, - freshness - ); - - vaaMap.set(latestPriceInfo.seqNum, latestPriceInfo.vaaBytes); + vaaMap.set(latestPriceInfo.seqNum, latestPriceInfo.vaa); } if (notFoundIds.length > 0) { throw RestException.PriceFeedIdNotFound(notFoundIds); } - const jsonResponse = Array.from(vaaMap.values(), (vaaBytes) => - Buffer.from(vaaBytes, "binary").toString("base64") + const jsonResponse = Array.from(vaaMap.values(), (vaa) => + vaa.toString("base64") ); res.json(jsonResponse); @@ -163,15 +146,6 @@ export class RestAPI { continue; } - const freshness: DurationInSec = - new Date().getTime() / 1000 - - latestPriceInfo.priceFeed.getEmaPriceUnchecked().publishTime; - this.promClient?.addApiRequestsPriceFreshness( - req.path, - id, - freshness - ); - if (verbose) { responseJson.push({ ...latestPriceInfo.priceFeed.toJson(),