[price-service] Add get_vaa_ccip endpoint (#500)

* [price-service] Refactor + bugfix

* [price-service] Add get_vaa_ccip endpoint

* [price-service] Refactor + add retry

* [price-service] Address feedback + handle api error
This commit is contained in:
Ali Behjati 2023-01-17 18:14:07 +01:00 committed by GitHub
parent 9d2bd87f95
commit 5e8b9f868a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 184 additions and 42 deletions

View File

@ -30,6 +30,7 @@
"node-fetch": "^2.6.1",
"prom-client": "^14.0.1",
"response-time": "^2.3.2",
"ts-retry-promise": "^0.7.0",
"winston": "^3.3.3",
"ws": "^8.6.0"
},
@ -9861,6 +9862,14 @@
"node": ">=10"
}
},
"node_modules/ts-retry-promise": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/ts-retry-promise/-/ts-retry-promise-0.7.0.tgz",
"integrity": "sha512-x6yWZXC4BfXy4UyMweOFvbS1yJ/Y5biSz/mEPiILtJZLrqD3ZxIpzVOGGgifHHdaSe3WxzFRtsRbychI6zofOg==",
"engines": {
"node": ">=6"
}
},
"node_modules/tslib": {
"version": "2.4.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz",
@ -17811,6 +17820,11 @@
}
}
},
"ts-retry-promise": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/ts-retry-promise/-/ts-retry-promise-0.7.0.tgz",
"integrity": "sha512-x6yWZXC4BfXy4UyMweOFvbS1yJ/Y5biSz/mEPiILtJZLrqD3ZxIpzVOGGgifHHdaSe3WxzFRtsRbychI6zofOg=="
},
"tslib": {
"version": "2.4.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz",

View File

@ -50,6 +50,7 @@
"node-fetch": "^2.6.1",
"prom-client": "^14.0.1",
"response-time": "^2.3.2",
"ts-retry-promise": "^0.7.0",
"winston": "^3.3.3",
"ws": "^8.6.0"
},

View File

@ -15,3 +15,21 @@ export function envOrErr(env: string): string {
}
return String(process.env[env]);
}
export function parseToOptionalNumber(
s: string | undefined
): number | undefined {
if (s === undefined) {
return undefined;
}
return parseInt(s, 10);
}
export function removeLeading0x(s: string): string {
if (s.startsWith("0x")) {
return s.substring(2);
}
return s;
}

View File

@ -1,4 +1,4 @@
import { envOrErr } from "./helpers";
import { envOrErr, parseToOptionalNumber } from "./helpers";
import { Listener } from "./listen";
import { initLogger } from "./logging";
import { PromClient } from "./promClient";
@ -38,6 +38,10 @@ async function run() {
10
),
},
cacheCleanupLoopInterval: parseToOptionalNumber(
process.env.REMOVE_EXPIRED_VALUES_INTERVAL_SECONDS
),
cacheTtl: parseToOptionalNumber(process.env.CACHE_TTL_SECONDS),
},
promClient
);
@ -59,7 +63,7 @@ async function run() {
const wsAPI = new WebSocketAPI(listener, promClient);
listener.run();
listener.runCacheCleanupLoop();
const server = await restAPI.run();
wsAPI.run(server);
}

View File

@ -17,7 +17,7 @@ import {
} from "@pythnetwork/p2w-sdk-js";
import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import LRUCache from "lru-cache";
import { sleep, TimestampInSec } from "./helpers";
import { DurationInSec, sleep, TimestampInSec } from "./helpers";
import { logger } from "./logging";
import { PromClient } from "./promClient";
@ -49,11 +49,13 @@ type ListenerConfig = {
readiness: ListenerReadinessConfig;
webApiEndpoint?: string;
webApiCluster?: string;
cacheCleanupLoopInterval?: DurationInSec;
cacheTtl?: DurationInSec;
};
type VaaKey = string;
type VaaConfig = {
export type VaaConfig = {
publishTime: number;
vaa: string;
};
@ -62,7 +64,7 @@ export class VaaCache {
private cache: Map<string, VaaConfig[]>;
private ttl: number;
constructor(ttl: number = 300) {
constructor(ttl: DurationInSec = 300) {
this.cache = new Map();
this.ttl = ttl;
}
@ -85,6 +87,9 @@ export class VaaCache {
}
find(arr: VaaConfig[], publishTime: number): VaaConfig | undefined {
// If the publishTime is less than the first element we are
// not sure that this VAA is actually the first VAA after that
// time.
if (arr.length === 0 || publishTime < arr[0].publishTime) {
return undefined;
}
@ -126,6 +131,7 @@ export class Listener implements PriceStore {
private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
private observedVaas: LRUCache<VaaKey, boolean>;
private vaasCache: VaaCache;
private cacheCleanupLoopInterval: DurationInSec;
constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient;
@ -137,7 +143,8 @@ export class Listener implements PriceStore {
max: 10000, // At most 10000 items
ttl: 60 * 1000, // 60 seconds
});
this.vaasCache = new VaaCache();
this.vaasCache = new VaaCache(config.cacheTtl);
this.cacheCleanupLoopInterval = config.cacheCleanupLoopInterval ?? 60;
}
private loadFilters(filtersRaw?: string) {
@ -170,22 +177,21 @@ export class Listener implements PriceStore {
logger.info("loaded " + this.filters.length + " filters");
}
async runCacheCleanupLoop(interval: number = 60) {
setInterval(this.vaasCache.removeExpiredValues, interval * 1000);
}
async run() {
logger.info(
"pyth_relay starting up, will listen for signed VAAs from " +
this.spyServiceHost
);
setInterval(
this.vaasCache.removeExpiredValues.bind(this.vaasCache),
this.cacheCleanupLoopInterval * 1000
);
while (true) {
let stream: ClientReadableStream<SubscribeSignedVAAResponse> | undefined;
try {
const client = createSpyRPCServiceClient(
process.env.SPY_SERVICE_HOST || ""
);
const client = createSpyRPCServiceClient(this.spyServiceHost);
stream = await subscribeSignedVAA(client, { filters: this.filters });
stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => {

View File

@ -6,10 +6,11 @@ import { Server } from "http";
import { StatusCodes } from "http-status-codes";
import morgan from "morgan";
import fetch from "node-fetch";
import { TimestampInSec } from "./helpers";
import { PriceStore } from "./listen";
import { removeLeading0x, TimestampInSec } from "./helpers";
import { PriceStore, VaaConfig } from "./listen";
import { logger } from "./logging";
import { PromClient } from "./promClient";
import { retry } from "ts-retry-promise";
const MORGAN_LOG_FORMAT =
':remote-addr - :remote-user ":method :url HTTP/:http-version"' +
@ -27,9 +28,25 @@ export class RestException extends Error {
static PriceFeedIdNotFound(notFoundIds: string[]): RestException {
return new RestException(
StatusCodes.BAD_REQUEST,
`Price Feeds with ids ${notFoundIds.join(", ")} not found`
`Price Feed(s) with id(s) ${notFoundIds.join(", ")} not found.`
);
}
static DbApiError(): RestException {
return new RestException(StatusCodes.INTERNAL_SERVER_ERROR, `DB API Error`);
}
static VaaNotFound(): RestException {
return new RestException(StatusCodes.NOT_FOUND, "VAA not found.");
}
}
function asyncWrapper(
callback: (req: Request, res: Response, next: NextFunction) => Promise<any>
) {
return function (req: Request, res: Response, next: NextFunction) {
callback(req, res, next).catch(next);
};
}
export class RestAPI {
@ -54,6 +71,39 @@ export class RestAPI {
this.promClient = promClient;
}
async getVaaWithDbLookup(priceFeedId: string, publishTime: TimestampInSec) {
// Try to fetch the vaa from the local cache
let vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime);
// if publishTime is older than cache ttl or vaa is not found, fetch from db
if (vaa === undefined && this.dbApiEndpoint && this.dbApiCluster) {
const priceFeedWithoutLeading0x = removeLeading0x(priceFeedId);
try {
const data = (await retry(
() =>
fetch(
`${this.dbApiEndpoint}/vaa?id=${priceFeedWithoutLeading0x}&publishTime=${publishTime}&cluster=${this.dbApiCluster}`
).then((res) => res.json()),
{ retries: 3 }
)) as any[];
if (data.length > 0) {
vaa = {
vaa: data[0].vaa,
publishTime: Math.floor(
new Date(data[0].publishTime).getTime() / 1000
),
};
}
} catch (e: any) {
logger.error(`DB API Error: ${e}`);
throw RestException.DbApiError();
}
}
return vaa;
}
// Run this function without blocking (`await`) if you want to run it async.
async createApp() {
const app = express();
@ -126,43 +176,92 @@ export class RestAPI {
publish_time: Joi.number().required(),
}).required(),
};
app.get(
"/api/get_vaa",
validate(getVaaInputSchema),
(req: Request, res: Response) => {
asyncWrapper(async (req: Request, res: Response) => {
const priceFeedId = req.query.id as string;
const publishTime = Number(req.query.publish_time as string);
const vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime);
// if publishTime is older than cache ttl or vaa is not found, fetch from db
if (!vaa) {
// cache miss
if (this.dbApiEndpoint && this.dbApiCluster) {
fetch(
`${this.dbApiEndpoint}/vaa?id=${priceFeedId}&publishTime=${publishTime}&cluster=${this.dbApiCluster}`
)
.then((r: any) => r.json())
.then((arr: any) => {
if (arr.length > 0 && arr[0]) {
res.json(arr[0]);
} else {
res.status(StatusCodes.NOT_FOUND).send("VAA not found");
}
});
}
} else {
// cache hit
const processedVaa = {
publishTime: new Date(vaa.publishTime),
vaa: vaa.vaa,
};
res.json(processedVaa);
if (
this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined
) {
throw RestException.PriceFeedIdNotFound([priceFeedId]);
}
}
const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime);
if (vaa === undefined) {
throw RestException.VaaNotFound();
} else {
res.json(vaa);
}
})
);
endpoints.push(
"api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>"
);
const getVaaCcipInputSchema: schema = {
query: Joi.object({
data: Joi.string()
.regex(/^0x[a-f0-9]{80}$/)
.required(),
}).required(),
};
// CCIP compatible endpoint. Read more information about it from
// https://eips.ethereum.org/EIPS/eip-3668
app.get(
"/api/get_vaa_ccip",
validate(getVaaCcipInputSchema),
asyncWrapper(async (req: Request, res: Response) => {
const dataHex = req.query.data as string;
const data = Buffer.from(removeLeading0x(dataHex), "hex");
const priceFeedId = data.slice(0, 32).toString("hex");
const publishTime = Number(data.readBigInt64BE(32));
if (
this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined
) {
throw RestException.PriceFeedIdNotFound([priceFeedId]);
}
const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime);
if (vaa === undefined) {
// Returning Bad Gateway error because CCIP expects a 5xx error if it needs to
// retry or try other endpoints. Bad Gateway seems the best choice here as this
// is not an internal error and could happen on two scenarios:
// 1. DB Api is not responding well (Bad Gateway is appropriate here)
// 2. Publish time is a few seconds before current time and a VAA
// Will be available in a few seconds. So we want the client to retry.
res
.status(StatusCodes.BAD_GATEWAY)
.json({ "message:": "VAA not found." });
} else {
const pubTimeBuffer = Buffer.alloc(8);
pubTimeBuffer.writeBigInt64BE(BigInt(vaa.publishTime));
const resData =
"0x" +
pubTimeBuffer.toString("hex") +
Buffer.from(vaa.vaa, "base64").toString("hex");
res.json({
data: resData,
});
}
})
);
endpoints.push(
"api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>"
);
const latestPriceFeedsInputSchema: schema = {
query: Joi.object({
ids: Joi.array()