From 5e8b9f868a1d51406ef90d9e761787561b38bb51 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Tue, 17 Jan 2023 18:14:07 +0100 Subject: [PATCH] [price-service] Add get_vaa_ccip endpoint (#500) * [price-service] Refactor + bugfix * [price-service] Add get_vaa_ccip endpoint * [price-service] Refactor + add retry * [price-service] Address feedback + handle api error --- price-service/package-lock.json | 14 +++ price-service/package.json | 1 + price-service/src/helpers.ts | 18 ++++ price-service/src/index.ts | 8 +- price-service/src/listen.ts | 28 +++--- price-service/src/rest.ts | 157 ++++++++++++++++++++++++++------ 6 files changed, 184 insertions(+), 42 deletions(-) diff --git a/price-service/package-lock.json b/price-service/package-lock.json index db5c4363..b11eb704 100644 --- a/price-service/package-lock.json +++ b/price-service/package-lock.json @@ -30,6 +30,7 @@ "node-fetch": "^2.6.1", "prom-client": "^14.0.1", "response-time": "^2.3.2", + "ts-retry-promise": "^0.7.0", "winston": "^3.3.3", "ws": "^8.6.0" }, @@ -9861,6 +9862,14 @@ "node": ">=10" } }, + "node_modules/ts-retry-promise": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/ts-retry-promise/-/ts-retry-promise-0.7.0.tgz", + "integrity": "sha512-x6yWZXC4BfXy4UyMweOFvbS1yJ/Y5biSz/mEPiILtJZLrqD3ZxIpzVOGGgifHHdaSe3WxzFRtsRbychI6zofOg==", + "engines": { + "node": ">=6" + } + }, "node_modules/tslib": { "version": "2.4.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz", @@ -17811,6 +17820,11 @@ } } }, + "ts-retry-promise": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/ts-retry-promise/-/ts-retry-promise-0.7.0.tgz", + "integrity": "sha512-x6yWZXC4BfXy4UyMweOFvbS1yJ/Y5biSz/mEPiILtJZLrqD3ZxIpzVOGGgifHHdaSe3WxzFRtsRbychI6zofOg==" + }, "tslib": { "version": "2.4.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz", diff --git a/price-service/package.json b/price-service/package.json index 14d83ba6..9307ae70 100644 --- a/price-service/package.json +++ b/price-service/package.json @@ -50,6 +50,7 @@ "node-fetch": "^2.6.1", "prom-client": "^14.0.1", "response-time": "^2.3.2", + "ts-retry-promise": "^0.7.0", "winston": "^3.3.3", "ws": "^8.6.0" }, diff --git a/price-service/src/helpers.ts b/price-service/src/helpers.ts index 2793327b..068b414a 100644 --- a/price-service/src/helpers.ts +++ b/price-service/src/helpers.ts @@ -15,3 +15,21 @@ export function envOrErr(env: string): string { } return String(process.env[env]); } + +export function parseToOptionalNumber( + s: string | undefined +): number | undefined { + if (s === undefined) { + return undefined; + } + + return parseInt(s, 10); +} + +export function removeLeading0x(s: string): string { + if (s.startsWith("0x")) { + return s.substring(2); + } + + return s; +} diff --git a/price-service/src/index.ts b/price-service/src/index.ts index 0f329a42..d4463d28 100644 --- a/price-service/src/index.ts +++ b/price-service/src/index.ts @@ -1,4 +1,4 @@ -import { envOrErr } from "./helpers"; +import { envOrErr, parseToOptionalNumber } from "./helpers"; import { Listener } from "./listen"; import { initLogger } from "./logging"; import { PromClient } from "./promClient"; @@ -38,6 +38,10 @@ async function run() { 10 ), }, + cacheCleanupLoopInterval: parseToOptionalNumber( + process.env.REMOVE_EXPIRED_VALUES_INTERVAL_SECONDS + ), + cacheTtl: parseToOptionalNumber(process.env.CACHE_TTL_SECONDS), }, promClient ); @@ -59,7 +63,7 @@ async function run() { const wsAPI = new WebSocketAPI(listener, promClient); listener.run(); - listener.runCacheCleanupLoop(); + const server = await restAPI.run(); wsAPI.run(server); } diff --git a/price-service/src/listen.ts b/price-service/src/listen.ts index 750c9765..bcb164f3 100644 --- a/price-service/src/listen.ts +++ b/price-service/src/listen.ts @@ -17,7 +17,7 @@ import { } from "@pythnetwork/p2w-sdk-js"; import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js"; import LRUCache from "lru-cache"; -import { sleep, TimestampInSec } from "./helpers"; +import { DurationInSec, sleep, TimestampInSec } from "./helpers"; import { logger } from "./logging"; import { PromClient } from "./promClient"; @@ -49,11 +49,13 @@ type ListenerConfig = { readiness: ListenerReadinessConfig; webApiEndpoint?: string; webApiCluster?: string; + cacheCleanupLoopInterval?: DurationInSec; + cacheTtl?: DurationInSec; }; type VaaKey = string; -type VaaConfig = { +export type VaaConfig = { publishTime: number; vaa: string; }; @@ -62,7 +64,7 @@ export class VaaCache { private cache: Map; private ttl: number; - constructor(ttl: number = 300) { + constructor(ttl: DurationInSec = 300) { this.cache = new Map(); this.ttl = ttl; } @@ -85,6 +87,9 @@ export class VaaCache { } find(arr: VaaConfig[], publishTime: number): VaaConfig | undefined { + // If the publishTime is less than the first element we are + // not sure that this VAA is actually the first VAA after that + // time. if (arr.length === 0 || publishTime < arr[0].publishTime) { return undefined; } @@ -126,6 +131,7 @@ export class Listener implements PriceStore { private updateCallbacks: ((priceInfo: PriceInfo) => any)[]; private observedVaas: LRUCache; private vaasCache: VaaCache; + private cacheCleanupLoopInterval: DurationInSec; constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; @@ -137,7 +143,8 @@ export class Listener implements PriceStore { max: 10000, // At most 10000 items ttl: 60 * 1000, // 60 seconds }); - this.vaasCache = new VaaCache(); + this.vaasCache = new VaaCache(config.cacheTtl); + this.cacheCleanupLoopInterval = config.cacheCleanupLoopInterval ?? 60; } private loadFilters(filtersRaw?: string) { @@ -170,22 +177,21 @@ export class Listener implements PriceStore { logger.info("loaded " + this.filters.length + " filters"); } - async runCacheCleanupLoop(interval: number = 60) { - setInterval(this.vaasCache.removeExpiredValues, interval * 1000); - } - async run() { logger.info( "pyth_relay starting up, will listen for signed VAAs from " + this.spyServiceHost ); + setInterval( + this.vaasCache.removeExpiredValues.bind(this.vaasCache), + this.cacheCleanupLoopInterval * 1000 + ); + while (true) { let stream: ClientReadableStream | undefined; try { - const client = createSpyRPCServiceClient( - process.env.SPY_SERVICE_HOST || "" - ); + const client = createSpyRPCServiceClient(this.spyServiceHost); stream = await subscribeSignedVAA(client, { filters: this.filters }); stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => { diff --git a/price-service/src/rest.ts b/price-service/src/rest.ts index aeaf9f29..870885f1 100644 --- a/price-service/src/rest.ts +++ b/price-service/src/rest.ts @@ -6,10 +6,11 @@ import { Server } from "http"; import { StatusCodes } from "http-status-codes"; import morgan from "morgan"; import fetch from "node-fetch"; -import { TimestampInSec } from "./helpers"; -import { PriceStore } from "./listen"; +import { removeLeading0x, TimestampInSec } from "./helpers"; +import { PriceStore, VaaConfig } from "./listen"; import { logger } from "./logging"; import { PromClient } from "./promClient"; +import { retry } from "ts-retry-promise"; const MORGAN_LOG_FORMAT = ':remote-addr - :remote-user ":method :url HTTP/:http-version"' + @@ -27,9 +28,25 @@ export class RestException extends Error { static PriceFeedIdNotFound(notFoundIds: string[]): RestException { return new RestException( StatusCodes.BAD_REQUEST, - `Price Feeds with ids ${notFoundIds.join(", ")} not found` + `Price Feed(s) with id(s) ${notFoundIds.join(", ")} not found.` ); } + + static DbApiError(): RestException { + return new RestException(StatusCodes.INTERNAL_SERVER_ERROR, `DB API Error`); + } + + static VaaNotFound(): RestException { + return new RestException(StatusCodes.NOT_FOUND, "VAA not found."); + } +} + +function asyncWrapper( + callback: (req: Request, res: Response, next: NextFunction) => Promise +) { + return function (req: Request, res: Response, next: NextFunction) { + callback(req, res, next).catch(next); + }; } export class RestAPI { @@ -54,6 +71,39 @@ export class RestAPI { this.promClient = promClient; } + async getVaaWithDbLookup(priceFeedId: string, publishTime: TimestampInSec) { + // Try to fetch the vaa from the local cache + let vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime); + + // if publishTime is older than cache ttl or vaa is not found, fetch from db + if (vaa === undefined && this.dbApiEndpoint && this.dbApiCluster) { + const priceFeedWithoutLeading0x = removeLeading0x(priceFeedId); + + try { + const data = (await retry( + () => + fetch( + `${this.dbApiEndpoint}/vaa?id=${priceFeedWithoutLeading0x}&publishTime=${publishTime}&cluster=${this.dbApiCluster}` + ).then((res) => res.json()), + { retries: 3 } + )) as any[]; + if (data.length > 0) { + vaa = { + vaa: data[0].vaa, + publishTime: Math.floor( + new Date(data[0].publishTime).getTime() / 1000 + ), + }; + } + } catch (e: any) { + logger.error(`DB API Error: ${e}`); + throw RestException.DbApiError(); + } + } + + return vaa; + } + // Run this function without blocking (`await`) if you want to run it async. async createApp() { const app = express(); @@ -126,43 +176,92 @@ export class RestAPI { publish_time: Joi.number().required(), }).required(), }; + app.get( "/api/get_vaa", validate(getVaaInputSchema), - (req: Request, res: Response) => { + asyncWrapper(async (req: Request, res: Response) => { const priceFeedId = req.query.id as string; const publishTime = Number(req.query.publish_time as string); - const vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime); - // if publishTime is older than cache ttl or vaa is not found, fetch from db - if (!vaa) { - // cache miss - if (this.dbApiEndpoint && this.dbApiCluster) { - fetch( - `${this.dbApiEndpoint}/vaa?id=${priceFeedId}&publishTime=${publishTime}&cluster=${this.dbApiCluster}` - ) - .then((r: any) => r.json()) - .then((arr: any) => { - if (arr.length > 0 && arr[0]) { - res.json(arr[0]); - } else { - res.status(StatusCodes.NOT_FOUND).send("VAA not found"); - } - }); - } - } else { - // cache hit - const processedVaa = { - publishTime: new Date(vaa.publishTime), - vaa: vaa.vaa, - }; - res.json(processedVaa); + + if ( + this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined + ) { + throw RestException.PriceFeedIdNotFound([priceFeedId]); } - } + + const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime); + + if (vaa === undefined) { + throw RestException.VaaNotFound(); + } else { + res.json(vaa); + } + }) ); + endpoints.push( "api/get_vaa?id=&publish_time=" ); + const getVaaCcipInputSchema: schema = { + query: Joi.object({ + data: Joi.string() + .regex(/^0x[a-f0-9]{80}$/) + .required(), + }).required(), + }; + + // CCIP compatible endpoint. Read more information about it from + // https://eips.ethereum.org/EIPS/eip-3668 + app.get( + "/api/get_vaa_ccip", + validate(getVaaCcipInputSchema), + asyncWrapper(async (req: Request, res: Response) => { + const dataHex = req.query.data as string; + const data = Buffer.from(removeLeading0x(dataHex), "hex"); + + const priceFeedId = data.slice(0, 32).toString("hex"); + const publishTime = Number(data.readBigInt64BE(32)); + + if ( + this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined + ) { + throw RestException.PriceFeedIdNotFound([priceFeedId]); + } + + const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime); + + if (vaa === undefined) { + // Returning Bad Gateway error because CCIP expects a 5xx error if it needs to + // retry or try other endpoints. Bad Gateway seems the best choice here as this + // is not an internal error and could happen on two scenarios: + // 1. DB Api is not responding well (Bad Gateway is appropriate here) + // 2. Publish time is a few seconds before current time and a VAA + // Will be available in a few seconds. So we want the client to retry. + res + .status(StatusCodes.BAD_GATEWAY) + .json({ "message:": "VAA not found." }); + } else { + const pubTimeBuffer = Buffer.alloc(8); + pubTimeBuffer.writeBigInt64BE(BigInt(vaa.publishTime)); + + const resData = + "0x" + + pubTimeBuffer.toString("hex") + + Buffer.from(vaa.vaa, "base64").toString("hex"); + + res.json({ + data: resData, + }); + } + }) + ); + + endpoints.push( + "api/get_vaa_ccip?data=<0x+>" + ); + const latestPriceFeedsInputSchema: schema = { query: Joi.object({ ids: Joi.array()