diff --git a/prometheus_config.yaml b/prometheus_config.yaml index 2c3ad33d..d7181ebc 100644 --- a/prometheus_config.yaml +++ b/prometheus_config.yaml @@ -3,3 +3,7 @@ scrape_configs: scrape_interval: 5s static_configs: - targets: ["p2w-attest:3000"] + - job_name: price_service + scrape_interval: 5s + static_configs: + - targets: ["pyth-price-service:8081"] diff --git a/third_party/pyth/price-service/package-lock.json b/third_party/pyth/price-service/package-lock.json index 2183b11a..8657d2c2 100644 --- a/third_party/pyth/price-service/package-lock.json +++ b/third_party/pyth/price-service/package-lock.json @@ -25,6 +25,7 @@ "express-validation": "^4.0.1", "http-status-codes": "^2.2.0", "joi": "^17.6.0", + "lru-cache": "^7.14.1", "morgan": "^1.10.0", "prom-client": "^14.0.1", "response-time": "^2.3.2", @@ -56,7 +57,7 @@ "dependencies": { "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", - "@pythnetwork/pyth-sdk-js": "^1.0.0" + "@pythnetwork/pyth-sdk-js": "^1.1.0" }, "devDependencies": { "@openzeppelin/contracts": "^4.2.0", @@ -6455,6 +6456,18 @@ "node": ">=8" } }, + "node_modules/jest-snapshot/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/jest-snapshot/node_modules/semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -6996,15 +7009,11 @@ } }, "node_modules/lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "dependencies": { - "yallist": "^4.0.0" - }, + "version": "7.14.1", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.14.1.tgz", + "integrity": "sha512-ysxwsnTKdAx96aTRdhDOCQfDgbHnt8SK0KY8SEjO0wHinhWOFTESbjVCMPbU1uGXg/ch4lifqx0wfjOawU2+WA==", "engines": { - "node": ">=10" + "node": ">=12" } }, "node_modules/make-dir": { @@ -8317,6 +8326,18 @@ } } }, + "node_modules/superagent/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/superagent/node_modules/mime": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/mime/-/mime-2.6.0.tgz", @@ -8618,6 +8639,18 @@ } } }, + "node_modules/ts-jest/node_modules/lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "dependencies": { + "yallist": "^4.0.0" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/ts-jest/node_modules/semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -10532,7 +10565,7 @@ "@certusone/wormhole-sdk": "0.2.1", "@improbable-eng/grpc-web-node-http-transport": "^0.14.1", "@openzeppelin/contracts": "^4.2.0", - "@pythnetwork/pyth-sdk-js": "^1.0.0", + "@pythnetwork/pyth-sdk-js": "^1.1.0", "@typechain/ethers-v5": "^7.1.2", "@types/long": "^4.0.1", "@types/node": "^16.6.1", @@ -13857,6 +13890,15 @@ "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", "dev": true }, + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", @@ -14272,13 +14314,9 @@ } }, "lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "requires": { - "yallist": "^4.0.0" - } + "version": "7.14.1", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.14.1.tgz", + "integrity": "sha512-ysxwsnTKdAx96aTRdhDOCQfDgbHnt8SK0KY8SEjO0wHinhWOFTESbjVCMPbU1uGXg/ch4lifqx0wfjOawU2+WA==" }, "make-dir": { "version": "3.1.0", @@ -15251,6 +15289,15 @@ "ms": "2.1.2" } }, + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "mime": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/mime/-/mime-2.6.0.tgz", @@ -15467,6 +15514,15 @@ "yargs-parser": "^20.x" }, "dependencies": { + "lru-cache": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", + "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, + "requires": { + "yallist": "^4.0.0" + } + }, "semver": { "version": "7.3.7", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.7.tgz", diff --git a/third_party/pyth/price-service/package.json b/third_party/pyth/price-service/package.json index 3646b061..0ffefd41 100644 --- a/third_party/pyth/price-service/package.json +++ b/third_party/pyth/price-service/package.json @@ -44,6 +44,7 @@ "express-validation": "^4.0.1", "http-status-codes": "^2.2.0", "joi": "^17.6.0", + "lru-cache": "^7.14.1", "morgan": "^1.10.0", "prom-client": "^14.0.1", "response-time": "^2.3.2", diff --git a/third_party/pyth/price-service/src/__tests__/rest.test.ts b/third_party/pyth/price-service/src/__tests__/rest.test.ts index b2a6ae86..d5a1c724 100644 --- a/third_party/pyth/price-service/src/__tests__/rest.test.ts +++ b/third_party/pyth/price-service/src/__tests__/rest.test.ts @@ -39,9 +39,10 @@ function dummyPriceInfoPair( id, { priceFeed: dummyPriceFeed(id), + publishTime: 0, attestationTime: 0, seqNum, - vaaBytes: Buffer.from(vaa, "hex").toString("binary"), + vaa: Buffer.from(vaa, "hex"), emitterChainId: 0, priceServiceReceiveTime: 0, }, diff --git a/third_party/pyth/price-service/src/__tests__/ws.test.ts b/third_party/pyth/price-service/src/__tests__/ws.test.ts index 8c4135b1..b0e8868e 100644 --- a/third_party/pyth/price-service/src/__tests__/ws.test.ts +++ b/third_party/pyth/price-service/src/__tests__/ws.test.ts @@ -40,10 +40,11 @@ function dummyPriceInfo( ): PriceInfo { return { seqNum: dummyPriceMetadataValue.sequence_number, + publishTime: 0, attestationTime: dummyPriceMetadataValue.attestation_time, emitterChainId: dummyPriceMetadataValue.emitter_chain, priceFeed: dummyPriceFeed(id), - vaaBytes: Buffer.from(vaa, "hex").toString("binary"), + vaa: Buffer.from(vaa, "hex"), priceServiceReceiveTime: dummyPriceMetadataValue.price_service_receive_time, }; } diff --git a/third_party/pyth/price-service/src/listen.ts b/third_party/pyth/price-service/src/listen.ts index e26bbb78..0a7e233b 100644 --- a/third_party/pyth/price-service/src/listen.ts +++ b/third_party/pyth/price-service/src/listen.ts @@ -1,8 +1,4 @@ -import { - ChainId, - hexToUint8Array, - uint8ArrayToHex, -} from "@certusone/wormhole-sdk"; +import { ChainId, uint8ArrayToHex } from "@certusone/wormhole-sdk"; import { createSpyRPCServiceClient, @@ -11,6 +7,8 @@ import { import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm"; +import { createHash } from "crypto"; + import { getBatchSummary, parseBatchPriceAttestation, @@ -25,10 +23,12 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; import { sleep, TimestampInSec } from "./helpers"; import { logger } from "./logging"; import { PromClient } from "./promClient"; +import LRUCache from "lru-cache"; export type PriceInfo = { - vaaBytes: string; + vaa: Buffer; seqNum: number; + publishTime: TimestampInSec; attestationTime: TimestampInSec; priceFeed: PriceFeed; emitterChainId: number; @@ -52,6 +52,8 @@ type ListenerConfig = { readiness: ListenerReadinessConfig; }; +type VaaHash = string; + export class Listener implements PriceStore { // Mapping of Price Feed Id to Vaa private priceFeedVaaMap = new Map(); @@ -61,6 +63,7 @@ export class Listener implements PriceStore { private spyConnectionTime: TimestampInSec | undefined; private readinessConfig: ListenerReadinessConfig; private updateCallbacks: ((priceInfo: PriceInfo) => any)[]; + private observedVaas: LRUCache; constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; @@ -68,6 +71,10 @@ export class Listener implements PriceStore { this.loadFilters(config.filtersRaw); this.readinessConfig = config.readiness; this.updateCallbacks = []; + this.observedVaas = new LRUCache({ + max: 10000, // At most 10000 items + ttl: 60 * 1000, // 60 seconds + }); } private loadFilters(filtersRaw?: string) { @@ -114,7 +121,7 @@ export class Listener implements PriceStore { ); stream = await subscribeSignedVAA(client, { filters: this.filters }); - stream!.on("data", ({ vaaBytes }: { vaaBytes: string }) => { + stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => { this.processVaa(vaaBytes); }); @@ -150,19 +157,29 @@ export class Listener implements PriceStore { } } - async processVaa(vaaBytes: string) { + async processVaa(vaa: Buffer) { const { parse_vaa } = await importCoreWasm(); - const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes)); + + const vaaHash: VaaHash = createHash("md5").update(vaa).digest("base64"); + + if (this.observedVaas.has(vaaHash)) { + return; + } + + this.observedVaas.set(vaaHash, true); + this.promClient?.incReceivedVaa(); + + const parsedVaa = parse_vaa(vaa); let batchAttestation; try { batchAttestation = await parseBatchPriceAttestation( - Buffer.from(parsedVAA.payload) + Buffer.from(parsedVaa.payload) ); } catch (e: any) { logger.error(e, e.stack); - logger.error("Parsing failed. Dropping vaa: %o", parsedVAA); + logger.error("Parsing failed. Dropping vaa: %o", parsedVaa); return; } @@ -194,15 +211,30 @@ export class Listener implements PriceStore { ) { const priceFeed = priceAttestationToPriceFeed(priceAttestation); const priceInfo = { - seqNum: parsedVAA.sequence, - vaaBytes, + seqNum: parsedVaa.sequence, + vaa, + publishTime: priceAttestation.publishTime, attestationTime: priceAttestation.attestationTime, priceFeed, - emitterChainId: parsedVAA.emitter_chain, + emitterChainId: parsedVaa.emitter_chain, priceServiceReceiveTime: Math.floor(new Date().getTime() / 1000), }; this.priceFeedVaaMap.set(key, priceInfo); + if (lastAttestationTime !== undefined) { + this.promClient?.addPriceUpdatesAttestationTimeGap( + priceAttestation.attestationTime - lastAttestationTime + ); + } + + const lastPublishTime = this.priceFeedVaaMap.get(key)?.publishTime; + + if (lastPublishTime !== undefined) { + this.promClient?.addPriceUpdatesPublishTimeGap( + priceAttestation.publishTime - lastPublishTime + ); + } + for (const callback of this.updateCallbacks) { callback(priceInfo); } @@ -211,16 +243,14 @@ export class Listener implements PriceStore { logger.info( "Parsed a new Batch Price Attestation: [" + - parsedVAA.emitter_chain + + parsedVaa.emitter_chain + ":" + - uint8ArrayToHex(parsedVAA.emitter_address) + + uint8ArrayToHex(parsedVaa.emitter_address) + "], seqNum: " + - parsedVAA.sequence + + parsedVaa.sequence + ", Batch Summary: " + getBatchSummary(batchAttestation) ); - - this.promClient?.incReceivedVaa(); } getLatestPriceInfo(priceFeedId: string): PriceInfo | undefined { diff --git a/third_party/pyth/price-service/src/promClient.ts b/third_party/pyth/price-service/src/promClient.ts index f3b69d03..c9b00e3d 100644 --- a/third_party/pyth/price-service/src/promClient.ts +++ b/third_party/pyth/price-service/src/promClient.ts @@ -1,4 +1,3 @@ -import { stat } from "fs"; import http = require("http"); import client = require("prom-client"); import { DurationInMs, DurationInSec } from "./helpers"; @@ -11,25 +10,30 @@ import { logger } from "./logging"; const SERVICE_PREFIX = "pyth__price_service__"; +type WebSocketInteractionType = + | "connection" + | "close" + | "timeout" + | "server_update" + | "client_message"; + export class PromClient { private register = new client.Registry(); - private collectDefaultMetrics = client.collectDefaultMetrics; // Actual metrics private receivedVaaCounter = new client.Counter({ name: `${SERVICE_PREFIX}vaas_received`, help: "number of Pyth VAAs received", }); - private apiResponseTimeSummary = new client.Summary({ - name: `${SERVICE_PREFIX}api_response_time_ms`, - help: "Response time of a VAA", - labelNames: ["path", "status"], + private priceUpdatesPublishTimeGapHistogram = new client.Histogram({ + name: `${SERVICE_PREFIX}price_updates_publish_time_gap_seconds`, + help: "Summary of publish time gaps between price updates", + buckets: [1, 3, 5, 10, 15, 30, 60, 120], }); - private apiRequestsPriceFreshnessHistogram = new client.Histogram({ - name: `${SERVICE_PREFIX}api_requests_price_freshness_seconds`, - help: "Freshness time of Vaa (time difference of Vaa and request time)", - buckets: [1, 5, 10, 15, 30, 60, 120, 180], - labelNames: ["path", "price_id"], + private priceUpdatesAttestationTimeGapHistogram = new client.Histogram({ + name: `${SERVICE_PREFIX}price_updates_attestation_time_gap_seconds`, + help: "Summary of attestation time gaps between price updates", + buckets: [1, 3, 5, 10, 15, 30, 60, 120], }); private webSocketInteractionCounter = new client.Counter({ name: `${SERVICE_PREFIX}websocket_interaction`, @@ -51,14 +55,10 @@ export class PromClient { this.register.setDefaultLabels({ app: config.name, }); - this.collectDefaultMetrics({ - register: this.register, - prefix: SERVICE_PREFIX, - }); // Register each metric this.register.registerMetric(this.receivedVaaCounter); - this.register.registerMetric(this.apiResponseTimeSummary); - this.register.registerMetric(this.apiRequestsPriceFreshnessHistogram); + this.register.registerMetric(this.priceUpdatesPublishTimeGapHistogram); + this.register.registerMetric(this.priceUpdatesAttestationTimeGapHistogram); this.register.registerMetric(this.webSocketInteractionCounter); // End registering metric @@ -70,31 +70,18 @@ export class PromClient { this.receivedVaaCounter.inc(); } - addResponseTime(path: string, status: number, duration: DurationInMs) { - this.apiResponseTimeSummary.observe( - { - path, - status, - }, - duration - ); + addPriceUpdatesPublishTimeGap(gap: DurationInSec) { + this.priceUpdatesPublishTimeGapHistogram.observe(gap); } - addApiRequestsPriceFreshness( - path: string, - priceId: string, - duration: DurationInSec + addPriceUpdatesAttestationTimeGap(gap: DurationInSec) { + this.priceUpdatesAttestationTimeGapHistogram.observe(gap); + } + + addWebSocketInteraction( + type: WebSocketInteractionType, + status: "ok" | "err" ) { - this.apiRequestsPriceFreshnessHistogram.observe( - { - path, - price_id: priceId, - }, - duration - ); - } - - addWebSocketInteraction(type: string, status: "ok" | "err") { this.webSocketInteractionCounter.inc({ type, status, diff --git a/third_party/pyth/price-service/src/rest.ts b/third_party/pyth/price-service/src/rest.ts index 2661eb01..2e29361c 100644 --- a/third_party/pyth/price-service/src/rest.ts +++ b/third_party/pyth/price-service/src/rest.ts @@ -63,14 +63,6 @@ export class RestAPI { app.use(morgan(MORGAN_LOG_FORMAT, { stream: winstonStream })); - app.use( - responseTime((req: Request, res: Response, time: DurationInMs) => { - if (res.statusCode !== StatusCodes.NOT_FOUND) { - this.promClient?.addResponseTime(req.path, res.statusCode, time); - } - }) - ); - const endpoints: string[] = []; const latestVaasInputSchema: schema = { @@ -88,7 +80,7 @@ export class RestAPI { // Multiple price ids might share same vaa, we use sequence number as // key of a vaa and deduplicate using a map of seqnum to vaa bytes. - const vaaMap = new Map(); + const vaaMap = new Map(); const notFoundIds: string[] = []; @@ -104,24 +96,15 @@ export class RestAPI { continue; } - const freshness: DurationInSec = - new Date().getTime() / 1000 - - latestPriceInfo.priceFeed.getPriceUnchecked().publishTime; - this.promClient?.addApiRequestsPriceFreshness( - req.path, - id, - freshness - ); - - vaaMap.set(latestPriceInfo.seqNum, latestPriceInfo.vaaBytes); + vaaMap.set(latestPriceInfo.seqNum, latestPriceInfo.vaa); } if (notFoundIds.length > 0) { throw RestException.PriceFeedIdNotFound(notFoundIds); } - const jsonResponse = Array.from(vaaMap.values(), (vaaBytes) => - Buffer.from(vaaBytes, "binary").toString("base64") + const jsonResponse = Array.from(vaaMap.values(), (vaa) => + vaa.toString("base64") ); res.json(jsonResponse); @@ -163,15 +146,6 @@ export class RestAPI { continue; } - const freshness: DurationInSec = - new Date().getTime() / 1000 - - latestPriceInfo.priceFeed.getEmaPriceUnchecked().publishTime; - this.promClient?.addApiRequestsPriceFreshness( - req.path, - id, - freshness - ); - if (verbose) { responseJson.push({ ...latestPriceInfo.priceFeed.toJson(),