diff --git a/price-service/src/__tests__/listen.test.ts b/price-service/src/__tests__/listen.test.ts new file mode 100644 index 00000000..e747caaf --- /dev/null +++ b/price-service/src/__tests__/listen.test.ts @@ -0,0 +1,116 @@ +import { VaaConfig, VaaCache } from "../listen"; + +describe("VAA Cache works", () => { + test("Setting and getting works as expected", async () => { + const cache = new VaaCache(); + + expect(cache.get("a", 3)).toBeUndefined(); + + cache.set("a", 1, "a-1"); + + expect(cache.get("a", 3)).toBeUndefined(); + + cache.set("a", 4, "a-2"); + + expect(cache.get("a", 3)).toEqual({ + publishTime: 4, + vaa: "a-2", + }); + + cache.set("a", 10, "a-3"); + + // Adding some elements with other keys to make sure + // they are not stored separately. + cache.set("b", 3, "b-1"); + cache.set("b", 7, "b-2"); + cache.set("b", 9, "b-3"); + + expect(cache.get("a", 3)).toEqual({ + publishTime: 4, + vaa: "a-2", + }); + expect(cache.get("a", 4)).toEqual({ + publishTime: 4, + vaa: "a-2", + }); + expect(cache.get("a", 5)).toEqual({ + publishTime: 10, + vaa: "a-3", + }); + expect(cache.get("a", 10)).toEqual({ + publishTime: 10, + vaa: "a-3", + }); + + expect(cache.get("b", 3)).toEqual({ + publishTime: 3, + vaa: "b-1", + }); + expect(cache.get("b", 4)).toEqual({ + publishTime: 7, + vaa: "b-2", + }); + + // When no item item more recent than asked pubTime is asked it should return undefined + expect(cache.get("a", 11)).toBeUndefined(); + expect(cache.get("b", 10)).toBeUndefined(); + + // When the asked pubTime is less than the first existing pubTime we are not sure that + // this is the first vaa after that time, so we should return undefined. + expect(cache.get("a", 0)).toBeUndefined(); + expect(cache.get("b", 1)).toBeUndefined(); + expect(cache.get("b", 2)).toBeUndefined(); + }); + + test("removeExpiredValues clears the old values", async () => { + jest.useFakeTimers(); + + // TTL of 500 seconds for the cache + const cache = new VaaCache(500); + + cache.set("a", 300, "a-1"); + cache.set("a", 700, "a-2"); + cache.set("a", 900, "a-3"); + + expect(cache.get("a", 300)).toEqual({ + publishTime: 300, + vaa: "a-1", + }); + + expect(cache.get("a", 500)).toEqual({ + publishTime: 700, + vaa: "a-2", + }); + + // Set time to second 1000 + jest.setSystemTime(1000 * 1000); + + cache.removeExpiredValues(); + + expect(cache.get("a", 300)).toBeUndefined(); + expect(cache.get("a", 500)).toBeUndefined(); + }); + + test("the cache clean loop works", async () => { + jest.useFakeTimers(); + + // TTL of 500 seconds for the cache and cleanup of every 100 seconds + const cache = new VaaCache(500, 100); + cache.runRemoveExpiredValuesLoop(); + + cache.set("a", 300, "a-1"); + cache.set("a", 700, "a-2"); + cache.set("a", 900, "a-3"); + + expect(cache.get("a", 900)).toEqual({ + publishTime: 900, + vaa: "a-3", + }); + + // Set time to second 2000. Everything should be evicted from cache now. + jest.setSystemTime(2000 * 1000); + jest.advanceTimersToNextTimer(); + + expect(cache.get("a", 900)).toBeUndefined(); + }); +}); diff --git a/price-service/src/__tests__/rest.test.ts b/price-service/src/__tests__/rest.test.ts index b1ed4737..709ecfa8 100644 --- a/price-service/src/__tests__/rest.test.ts +++ b/price-service/src/__tests__/rest.test.ts @@ -1,10 +1,16 @@ -import { HexString, Price, PriceFeed } from "@pythnetwork/pyth-sdk-js"; -import { Express } from "express"; +import { + HexString, + Price, + PriceFeed, + PriceFeedMetadata, +} from "@pythnetwork/pyth-sdk-js"; +import express, { Express } from "express"; import { StatusCodes } from "http-status-codes"; import request from "supertest"; -import { PriceInfo, PriceStore, VaaCache } from "../listen"; +import { PriceInfo, PriceStore, VaaCache, VaaConfig } from "../listen"; import { RestAPI } from "../rest"; +let priceInfo: PriceStore; let app: Express; let priceInfoMap: Map; let vaasCache: VaaCache; @@ -40,8 +46,8 @@ function dummyPriceInfoPair( id, { priceFeed: dummyPriceFeed(id), - publishTime: 0, - attestationTime: 0, + publishTime: 1, + attestationTime: 2, seqNum, vaa: Buffer.from(vaa, "hex"), emitterChainId: 0, @@ -57,14 +63,10 @@ beforeAll(async () => { dummyPriceInfoPair(expandTo64Len("3456"), 2, "bad01bad"), dummyPriceInfoPair(expandTo64Len("10101"), 3, "bidbidbid"), ]); - vaasCache = new VaaCache(); - vaasCache.set( - expandTo64Len("abcd"), - 1, - Buffer.from("a1b2c3d4", "hex").toString("base64") - ); - const priceInfo: PriceStore = { + vaasCache = new VaaCache(); + + priceInfo = { getLatestPriceInfo: (priceFeedId: string) => { return priceInfoMap.get(priceFeedId); }, @@ -76,7 +78,6 @@ beforeAll(async () => { }; const api = new RestAPI({ port: 8889 }, priceInfo, () => true); - app = await api.createApp(); }); @@ -92,6 +93,48 @@ describe("Latest Price Feed Endpoint", () => { expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson()); }); + test("When called with valid ids with leading 0x, returns correct price feed", async () => { + const ids = [expandTo64Len("abcd"), expandTo64Len("3456")]; + const resp = await request(app) + .get("/api/latest_price_feeds") + .query({ + ids: ids.map((id) => "0x" + id), // Add 0x to the queries + }); + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body.length).toBe(2); + + // Please note that the response id is without 0x + expect(resp.body).toContainEqual(dummyPriceFeed(ids[0]).toJson()); + expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson()); + }); + + test("When called with valid ids and verbose flag set to true, returns correct price feed with verbose information", async () => { + const ids = [expandTo64Len("abcd"), expandTo64Len("3456")]; + const resp = await request(app) + .get("/api/latest_price_feeds") + .query({ ids, verbose: true }); + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body.length).toBe(2); + expect(resp.body).toContainEqual({ + ...priceInfoMap.get(ids[0])!.priceFeed.toJson(), + metadata: new PriceFeedMetadata({ + attestationTime: priceInfoMap.get(ids[0])!.attestationTime, + emitterChain: priceInfoMap.get(ids[0])!.emitterChainId, + receiveTime: priceInfoMap.get(ids[0])!.priceServiceReceiveTime, + sequenceNumber: priceInfoMap.get(ids[0])!.seqNum, + }).toJson(), + }); + expect(resp.body).toContainEqual({ + ...priceInfoMap.get(ids[1])!.priceFeed.toJson(), + metadata: new PriceFeedMetadata({ + attestationTime: priceInfoMap.get(ids[1])!.attestationTime, + emitterChain: priceInfoMap.get(ids[1])!.emitterChainId, + receiveTime: priceInfoMap.get(ids[1])!.priceServiceReceiveTime, + sequenceNumber: priceInfoMap.get(ids[1])!.seqNum, + }).toJson(), + }); + }); + test("When called with valid ids and binary flag set to true, returns correct price feed with binary vaa", async () => { const ids = [expandTo64Len("abcd"), expandTo64Len("3456")]; const resp = await request(app) @@ -143,6 +186,29 @@ describe("Latest Vaa Bytes Endpoint", () => { ); }); + test("When called with valid ids with leading 0x, returns vaa bytes as array, merged if necessary", async () => { + const ids = [ + expandTo64Len("abcd"), + expandTo64Len("ef01"), + expandTo64Len("3456"), + ]; + + const resp = await request(app) + .get("/api/latest_vaas") + .query({ + ids: ids.map((id) => "0x" + id), // Add 0x to the queries + }); + + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body.length).toBe(2); + expect(resp.body).toContain( + Buffer.from("a1b2c3d4", "hex").toString("base64") + ); + expect(resp.body).toContain( + Buffer.from("bad01bad", "hex").toString("base64") + ); + }); + test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => { const ids = [ expandTo64Len("ab01"), @@ -156,3 +222,251 @@ describe("Latest Vaa Bytes Endpoint", () => { expect(resp.body.message).toContain(ids[2]); }); }); + +describe("Get VAA endpoint and Get VAA CCIP", () => { + test("When called with valid id and timestamp in the cache returns the correct answer", async () => { + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(app).get("/api/get_vaa").query({ + id, + publish_time: 16, + }); + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body).toEqual({ + vaa: "abcd20", + publishTime: 20, + }); + + const pubTime16AsHex64Bit = "0000000000000010"; + const ccipResp = await request(app) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime16AsHex64Bit, + }); + const pubTime20AsHex64Bit = "0000000000000014"; + expect(ccipResp.status).toBe(StatusCodes.OK); + expect(ccipResp.body).toEqual({ + data: + "0x" + + pubTime20AsHex64Bit + + Buffer.from("abcd20", "base64").toString("hex"), + }); + }); + + test("When called with valid id with leading 0x and timestamp in the cache returns the correct answer", async () => { + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(app) + .get("/api/get_vaa") + .query({ + id: "0x" + id, + publish_time: 16, + }); + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body).toEqual({ + vaa: "abcd20", + publishTime: 20, + }); + }); + + test("When called with invalid id returns price id found", async () => { + // dead does not exist in the ids + const id = expandTo64Len("dead"); + + const resp = await request(app).get("/api/get_vaa").query({ + id, + publish_time: 16, + }); + expect(resp.status).toBe(StatusCodes.BAD_REQUEST); + expect(resp.body.message).toContain(id); + + const pubTime16AsHex64Bit = "0000000000000010"; + const ccipResp = await request(app) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime16AsHex64Bit, + }); + expect(ccipResp.status).toBe(StatusCodes.BAD_REQUEST); + expect(ccipResp.body.message).toContain(id); + }); + + test("When called with valid id and timestamp not in the cache without db returns vaa not found", async () => { + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(app) + .get("/api/get_vaa") + .query({ + id: "0x" + id, + publish_time: 5, + }); + expect(resp.status).toBe(StatusCodes.NOT_FOUND); + + const pubTime5AsHex64Bit = "0000000000000005"; + const ccipResp = await request(app) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime5AsHex64Bit, + }); + // On CCIP we expect bad gateway so the client want to retry other ccip endpoints. + expect(ccipResp.status).toBe(StatusCodes.BAD_GATEWAY); + }); + + test("When called with valid id and timestamp not in the cache with db returns ok", async () => { + const dbBackend = express(); + dbBackend.get("/vaa", (req, res) => { + const priceId = req.query.id; + const pubTime = Number(req.query.publishTime); + const cluster = req.query.cluster; + + res.json([ + { + vaa: `${cluster}${priceId}${pubTime}`, + publishTime: new Date(pubTime * 1000).toISOString(), + }, + ]); + }); + const dbApp = dbBackend.listen({ port: 37777 }); + + const apiWithDb = new RestAPI( + { + port: 8889, + dbApiCluster: "pythnet", + dbApiEndpoint: "http://localhost:37777", + }, + priceInfo, + () => true + ); + const appWithDb = await apiWithDb.createApp(); + + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(appWithDb) + .get("/api/get_vaa") + .query({ + id: "0x" + id, + publish_time: 5, + }); + expect(resp.status).toBe(StatusCodes.OK); + expect(resp.body).toEqual({ + vaa: `pythnet${id}5`, + publishTime: 5, + }); + + const pubTime5AsHex64Bit = "0000000000000005"; + const ccipResp = await request(appWithDb) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime5AsHex64Bit, + }); + expect(ccipResp.status).toBe(StatusCodes.OK); + expect(ccipResp.body).toEqual({ + data: + "0x" + + pubTime5AsHex64Bit + + Buffer.from(`pythnet${id}5`, "base64").toString("hex"), + }); + + dbApp.close(); + }); + + test( + "When called with valid id and timestamp not in the cache" + + "and not in the db returns vaa not found", + async () => { + const dbBackend = express(); + dbBackend.get("/vaa", (_req, res) => { + // Return an empty array when vaa is not there, this is the same + // behaviour as our api. + res.json([]); + }); + + const dbApp = dbBackend.listen({ port: 37777 }); + + const apiWithDb = new RestAPI( + { + port: 8889, + dbApiCluster: "pythnet", + dbApiEndpoint: "http://localhost:37777", + }, + priceInfo, + () => true + ); + const appWithDb = await apiWithDb.createApp(); + + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(appWithDb) + .get("/api/get_vaa") + .query({ + id: "0x" + id, + publish_time: 5, + }); + expect(resp.status).toBe(StatusCodes.NOT_FOUND); + + const pubTime5AsHex64Bit = "0000000000000005"; + const ccipResp = await request(appWithDb) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime5AsHex64Bit, + }); + + // On CCIP we expect bad gateway so the client want to retry other ccip endpoints. + expect(ccipResp.status).toBe(StatusCodes.BAD_GATEWAY); + + dbApp.close(); + } + ); + + test( + "When called with valid id and timestamp not in the cache" + + "and db is not available returns internal server error", + async () => { + const apiWithDb = new RestAPI( + { + port: 8889, + dbApiCluster: "pythnet", + dbApiEndpoint: "http://localhost:37777", + }, + priceInfo, + () => true + ); + const appWithDb = await apiWithDb.createApp(); + + const id = expandTo64Len("abcd"); + vaasCache.set(id, 10, "abcd10"); + vaasCache.set(id, 20, "abcd20"); + vaasCache.set(id, 30, "abcd30"); + + const resp = await request(appWithDb) + .get("/api/get_vaa") + .query({ + id: "0x" + id, + publish_time: 5, + }); + expect(resp.status).toBe(StatusCodes.INTERNAL_SERVER_ERROR); + + const pubTime5AsHex64Bit = "0000000000000005"; + const ccipResp = await request(appWithDb) + .get("/api/get_vaa_ccip") + .query({ + data: "0x" + id + pubTime5AsHex64Bit, + }); + expect(ccipResp.status).toBe(StatusCodes.INTERNAL_SERVER_ERROR); + } + ); +}); diff --git a/price-service/src/__tests__/ws.test.ts b/price-service/src/__tests__/ws.test.ts index aad27090..74922e09 100644 --- a/price-service/src/__tests__/ws.test.ts +++ b/price-service/src/__tests__/ws.test.ts @@ -1,4 +1,9 @@ -import { HexString, Price, PriceFeed } from "@pythnetwork/pyth-sdk-js"; +import { + HexString, + Price, + PriceFeed, + PriceFeedMetadata, +} from "@pythnetwork/pyth-sdk-js"; import { Server } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { sleep } from "../helpers"; @@ -12,39 +17,20 @@ let server: Server; let wss: WebSocketServer; let priceInfos: PriceInfo[]; -let priceMetadata: any; function expandTo64Len(id: string): string { return id.repeat(64).substring(0, 64); } -function dummyPriceMetadata( - attestationTime: number, - emitterChainId: number, - seqNum: number, - priceServiceReceiveTime: number -): any { +function dummyPriceInfo(id: HexString, vaa: HexString): PriceInfo { return { - attestation_time: attestationTime, - emitter_chain: emitterChainId, - sequence_number: seqNum, - price_service_receive_time: priceServiceReceiveTime, - }; -} - -function dummyPriceInfo( - id: HexString, - vaa: HexString, - dummyPriceMetadataValue: any -): PriceInfo { - return { - seqNum: dummyPriceMetadataValue.sequence_number, + seqNum: 1, publishTime: 0, - attestationTime: dummyPriceMetadataValue.attestation_time, - emitterChainId: dummyPriceMetadataValue.emitter_chain, + attestationTime: 2, + emitterChainId: 3, priceFeed: dummyPriceFeed(id), vaa: Buffer.from(vaa, "hex"), - priceServiceReceiveTime: dummyPriceMetadataValue.price_service_receive_time, + priceServiceReceiveTime: 4, }; } @@ -96,12 +82,11 @@ async function createSocketClient(): Promise<[WebSocket, any[]]> { } beforeAll(async () => { - priceMetadata = dummyPriceMetadata(0, 0, 0, 0); priceInfos = [ - dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4", priceMetadata), - dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4", priceMetadata), - dummyPriceInfo(expandTo64Len("2345"), "bad01bad", priceMetadata), - dummyPriceInfo(expandTo64Len("6789"), "bidbidbid", priceMetadata), + dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4"), + dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4"), + dummyPriceInfo(expandTo64Len("2345"), "bad01bad"), + dummyPriceInfo(expandTo64Len("6789"), "bidbidbid"), ]; const priceInfo: PriceStore = { @@ -190,7 +175,12 @@ describe("Client receives data", () => { type: "price_update", price_feed: { ...priceInfos[0].priceFeed.toJson(), - metadata: priceMetadata, + metadata: new PriceFeedMetadata({ + attestationTime: 2, + emitterChain: 3, + receiveTime: 4, + sequenceNumber: 1, + }).toJson(), }, }); @@ -202,7 +192,12 @@ describe("Client receives data", () => { type: "price_update", price_feed: { ...priceInfos[1].priceFeed.toJson(), - metadata: priceMetadata, + metadata: new PriceFeedMetadata({ + attestationTime: 2, + emitterChain: 3, + receiveTime: 4, + sequenceNumber: 1, + }).toJson(), }, }); diff --git a/price-service/src/listen.ts b/price-service/src/listen.ts index aca94166..9a906263 100644 --- a/price-service/src/listen.ts +++ b/price-service/src/listen.ts @@ -62,14 +62,19 @@ export type VaaConfig = { export class VaaCache { private cache: Map; - private ttl: number; + private ttl: DurationInSec; + private cacheCleanupLoopInterval: DurationInSec; - constructor(ttl: DurationInSec = 300) { + constructor( + ttl: DurationInSec = 300, + cacheCleanupLoopInterval: DurationInSec = 60 + ) { this.cache = new Map(); this.ttl = ttl; + this.cacheCleanupLoopInterval = cacheCleanupLoopInterval; } - set(key: VaaKey, publishTime: number, vaa: string): void { + set(key: VaaKey, publishTime: TimestampInSec, vaa: string): void { if (this.cache.has(key)) { this.cache.get(key)!.push({ publishTime, vaa }); } else { @@ -77,7 +82,7 @@ export class VaaCache { } } - get(key: VaaKey, publishTime: number): VaaConfig | undefined { + get(key: VaaKey, publishTime: TimestampInSec): VaaConfig | undefined { if (!this.cache.has(key)) { return undefined; } else { @@ -86,7 +91,10 @@ export class VaaCache { } } - find(arr: VaaConfig[], publishTime: number): VaaConfig | undefined { + private find( + arr: VaaConfig[], + publishTime: TimestampInSec + ): VaaConfig | undefined { // If the publishTime is less than the first element we are // not sure that this VAA is actually the first VAA after that // time. @@ -123,6 +131,13 @@ export class VaaCache { ); } } + + runRemoveExpiredValuesLoop() { + setInterval( + this.removeExpiredValues.bind(this), + this.cacheCleanupLoopInterval * 1000 + ); + } } export class Listener implements PriceStore { @@ -136,7 +151,6 @@ export class Listener implements PriceStore { private updateCallbacks: ((priceInfo: PriceInfo) => any)[]; private observedVaas: LRUCache; private vaasCache: VaaCache; - private cacheCleanupLoopInterval: DurationInSec; constructor(config: ListenerConfig, promClient?: PromClient) { this.promClient = promClient; @@ -148,8 +162,10 @@ export class Listener implements PriceStore { max: 10000, // At most 10000 items ttl: 60 * 1000, // 60 seconds }); - this.vaasCache = new VaaCache(config.cacheTtl); - this.cacheCleanupLoopInterval = config.cacheCleanupLoopInterval ?? 60; + this.vaasCache = new VaaCache( + config.cacheTtl, + config.cacheCleanupLoopInterval + ); } private loadFilters(filtersRaw?: string) { @@ -188,10 +204,7 @@ export class Listener implements PriceStore { this.spyServiceHost ); - setInterval( - this.vaasCache.removeExpiredValues.bind(this.vaasCache), - this.cacheCleanupLoopInterval * 1000 - ); + this.vaasCache.runRemoveExpiredValuesLoop(); while (true) { let stream: ClientReadableStream | undefined; diff --git a/tilt-devnet/k8s/pyth-price-service.yaml b/tilt-devnet/k8s/pyth-price-service.yaml index 35cfdcac..d8db5770 100644 --- a/tilt-devnet/k8s/pyth-price-service.yaml +++ b/tilt-devnet/k8s/pyth-price-service.yaml @@ -26,7 +26,7 @@ spec: matchLabels: app: pyth-price-service serviceName: pyth-price-service - replicas: 2 + replicas: 1 template: metadata: labels: @@ -76,3 +76,14 @@ spec: value: "60" - name: CACHE_TTL_SECONDS value: "300" + - name: tests + image: pyth-price-service + command: + - /bin/sh + - -c + - "npm run test && nc -lkp 2358 0.0.0.0" + readinessProbe: + periodSeconds: 5 + failureThreshold: 300 + tcpSocket: + port: 2358