[price-service] Improve and update tests (#521)

* Update/improve existing tests
* Add test for VaaCache + a little refactor
* Add tests for get_vaa endpoint
* Add tests to tilt
This commit is contained in:
Ali Behjati 2023-01-24 10:40:24 +01:00 committed by GitHub
parent d2411b7d7f
commit 58f3ddcb13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 507 additions and 58 deletions

View File

@ -0,0 +1,116 @@
import { VaaConfig, VaaCache } from "../listen";
describe("VAA Cache works", () => {
test("Setting and getting works as expected", async () => {
const cache = new VaaCache();
expect(cache.get("a", 3)).toBeUndefined();
cache.set("a", 1, "a-1");
expect(cache.get("a", 3)).toBeUndefined();
cache.set("a", 4, "a-2");
expect(cache.get("a", 3)).toEqual<VaaConfig>({
publishTime: 4,
vaa: "a-2",
});
cache.set("a", 10, "a-3");
// Adding some elements with other keys to make sure
// they are not stored separately.
cache.set("b", 3, "b-1");
cache.set("b", 7, "b-2");
cache.set("b", 9, "b-3");
expect(cache.get("a", 3)).toEqual<VaaConfig>({
publishTime: 4,
vaa: "a-2",
});
expect(cache.get("a", 4)).toEqual<VaaConfig>({
publishTime: 4,
vaa: "a-2",
});
expect(cache.get("a", 5)).toEqual<VaaConfig>({
publishTime: 10,
vaa: "a-3",
});
expect(cache.get("a", 10)).toEqual<VaaConfig>({
publishTime: 10,
vaa: "a-3",
});
expect(cache.get("b", 3)).toEqual<VaaConfig>({
publishTime: 3,
vaa: "b-1",
});
expect(cache.get("b", 4)).toEqual<VaaConfig>({
publishTime: 7,
vaa: "b-2",
});
// When no item item more recent than asked pubTime is asked it should return undefined
expect(cache.get("a", 11)).toBeUndefined();
expect(cache.get("b", 10)).toBeUndefined();
// When the asked pubTime is less than the first existing pubTime we are not sure that
// this is the first vaa after that time, so we should return undefined.
expect(cache.get("a", 0)).toBeUndefined();
expect(cache.get("b", 1)).toBeUndefined();
expect(cache.get("b", 2)).toBeUndefined();
});
test("removeExpiredValues clears the old values", async () => {
jest.useFakeTimers();
// TTL of 500 seconds for the cache
const cache = new VaaCache(500);
cache.set("a", 300, "a-1");
cache.set("a", 700, "a-2");
cache.set("a", 900, "a-3");
expect(cache.get("a", 300)).toEqual<VaaConfig>({
publishTime: 300,
vaa: "a-1",
});
expect(cache.get("a", 500)).toEqual<VaaConfig>({
publishTime: 700,
vaa: "a-2",
});
// Set time to second 1000
jest.setSystemTime(1000 * 1000);
cache.removeExpiredValues();
expect(cache.get("a", 300)).toBeUndefined();
expect(cache.get("a", 500)).toBeUndefined();
});
test("the cache clean loop works", async () => {
jest.useFakeTimers();
// TTL of 500 seconds for the cache and cleanup of every 100 seconds
const cache = new VaaCache(500, 100);
cache.runRemoveExpiredValuesLoop();
cache.set("a", 300, "a-1");
cache.set("a", 700, "a-2");
cache.set("a", 900, "a-3");
expect(cache.get("a", 900)).toEqual<VaaConfig>({
publishTime: 900,
vaa: "a-3",
});
// Set time to second 2000. Everything should be evicted from cache now.
jest.setSystemTime(2000 * 1000);
jest.advanceTimersToNextTimer();
expect(cache.get("a", 900)).toBeUndefined();
});
});

View File

@ -1,10 +1,16 @@
import { HexString, Price, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import { Express } from "express";
import {
HexString,
Price,
PriceFeed,
PriceFeedMetadata,
} from "@pythnetwork/pyth-sdk-js";
import express, { Express } from "express";
import { StatusCodes } from "http-status-codes";
import request from "supertest";
import { PriceInfo, PriceStore, VaaCache } from "../listen";
import { PriceInfo, PriceStore, VaaCache, VaaConfig } from "../listen";
import { RestAPI } from "../rest";
let priceInfo: PriceStore;
let app: Express;
let priceInfoMap: Map<string, PriceInfo>;
let vaasCache: VaaCache;
@ -40,8 +46,8 @@ function dummyPriceInfoPair(
id,
{
priceFeed: dummyPriceFeed(id),
publishTime: 0,
attestationTime: 0,
publishTime: 1,
attestationTime: 2,
seqNum,
vaa: Buffer.from(vaa, "hex"),
emitterChainId: 0,
@ -57,14 +63,10 @@ beforeAll(async () => {
dummyPriceInfoPair(expandTo64Len("3456"), 2, "bad01bad"),
dummyPriceInfoPair(expandTo64Len("10101"), 3, "bidbidbid"),
]);
vaasCache = new VaaCache();
vaasCache.set(
expandTo64Len("abcd"),
1,
Buffer.from("a1b2c3d4", "hex").toString("base64")
);
const priceInfo: PriceStore = {
vaasCache = new VaaCache();
priceInfo = {
getLatestPriceInfo: (priceFeedId: string) => {
return priceInfoMap.get(priceFeedId);
},
@ -76,7 +78,6 @@ beforeAll(async () => {
};
const api = new RestAPI({ port: 8889 }, priceInfo, () => true);
app = await api.createApp();
});
@ -92,6 +93,48 @@ describe("Latest Price Feed Endpoint", () => {
expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson());
});
test("When called with valid ids with leading 0x, returns correct price feed", async () => {
const ids = [expandTo64Len("abcd"), expandTo64Len("3456")];
const resp = await request(app)
.get("/api/latest_price_feeds")
.query({
ids: ids.map((id) => "0x" + id), // Add 0x to the queries
});
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body.length).toBe(2);
// Please note that the response id is without 0x
expect(resp.body).toContainEqual(dummyPriceFeed(ids[0]).toJson());
expect(resp.body).toContainEqual(dummyPriceFeed(ids[1]).toJson());
});
test("When called with valid ids and verbose flag set to true, returns correct price feed with verbose information", async () => {
const ids = [expandTo64Len("abcd"), expandTo64Len("3456")];
const resp = await request(app)
.get("/api/latest_price_feeds")
.query({ ids, verbose: true });
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body.length).toBe(2);
expect(resp.body).toContainEqual({
...priceInfoMap.get(ids[0])!.priceFeed.toJson(),
metadata: new PriceFeedMetadata({
attestationTime: priceInfoMap.get(ids[0])!.attestationTime,
emitterChain: priceInfoMap.get(ids[0])!.emitterChainId,
receiveTime: priceInfoMap.get(ids[0])!.priceServiceReceiveTime,
sequenceNumber: priceInfoMap.get(ids[0])!.seqNum,
}).toJson(),
});
expect(resp.body).toContainEqual({
...priceInfoMap.get(ids[1])!.priceFeed.toJson(),
metadata: new PriceFeedMetadata({
attestationTime: priceInfoMap.get(ids[1])!.attestationTime,
emitterChain: priceInfoMap.get(ids[1])!.emitterChainId,
receiveTime: priceInfoMap.get(ids[1])!.priceServiceReceiveTime,
sequenceNumber: priceInfoMap.get(ids[1])!.seqNum,
}).toJson(),
});
});
test("When called with valid ids and binary flag set to true, returns correct price feed with binary vaa", async () => {
const ids = [expandTo64Len("abcd"), expandTo64Len("3456")];
const resp = await request(app)
@ -143,6 +186,29 @@ describe("Latest Vaa Bytes Endpoint", () => {
);
});
test("When called with valid ids with leading 0x, returns vaa bytes as array, merged if necessary", async () => {
const ids = [
expandTo64Len("abcd"),
expandTo64Len("ef01"),
expandTo64Len("3456"),
];
const resp = await request(app)
.get("/api/latest_vaas")
.query({
ids: ids.map((id) => "0x" + id), // Add 0x to the queries
});
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body.length).toBe(2);
expect(resp.body).toContain(
Buffer.from("a1b2c3d4", "hex").toString("base64")
);
expect(resp.body).toContain(
Buffer.from("bad01bad", "hex").toString("base64")
);
});
test("When called with some non-existent ids within ids, returns error mentioning non-existent ids", async () => {
const ids = [
expandTo64Len("ab01"),
@ -156,3 +222,251 @@ describe("Latest Vaa Bytes Endpoint", () => {
expect(resp.body.message).toContain(ids[2]);
});
});
describe("Get VAA endpoint and Get VAA CCIP", () => {
test("When called with valid id and timestamp in the cache returns the correct answer", async () => {
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(app).get("/api/get_vaa").query({
id,
publish_time: 16,
});
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body).toEqual<VaaConfig>({
vaa: "abcd20",
publishTime: 20,
});
const pubTime16AsHex64Bit = "0000000000000010";
const ccipResp = await request(app)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime16AsHex64Bit,
});
const pubTime20AsHex64Bit = "0000000000000014";
expect(ccipResp.status).toBe(StatusCodes.OK);
expect(ccipResp.body).toEqual({
data:
"0x" +
pubTime20AsHex64Bit +
Buffer.from("abcd20", "base64").toString("hex"),
});
});
test("When called with valid id with leading 0x and timestamp in the cache returns the correct answer", async () => {
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(app)
.get("/api/get_vaa")
.query({
id: "0x" + id,
publish_time: 16,
});
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body).toEqual<VaaConfig>({
vaa: "abcd20",
publishTime: 20,
});
});
test("When called with invalid id returns price id found", async () => {
// dead does not exist in the ids
const id = expandTo64Len("dead");
const resp = await request(app).get("/api/get_vaa").query({
id,
publish_time: 16,
});
expect(resp.status).toBe(StatusCodes.BAD_REQUEST);
expect(resp.body.message).toContain(id);
const pubTime16AsHex64Bit = "0000000000000010";
const ccipResp = await request(app)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime16AsHex64Bit,
});
expect(ccipResp.status).toBe(StatusCodes.BAD_REQUEST);
expect(ccipResp.body.message).toContain(id);
});
test("When called with valid id and timestamp not in the cache without db returns vaa not found", async () => {
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(app)
.get("/api/get_vaa")
.query({
id: "0x" + id,
publish_time: 5,
});
expect(resp.status).toBe(StatusCodes.NOT_FOUND);
const pubTime5AsHex64Bit = "0000000000000005";
const ccipResp = await request(app)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime5AsHex64Bit,
});
// On CCIP we expect bad gateway so the client want to retry other ccip endpoints.
expect(ccipResp.status).toBe(StatusCodes.BAD_GATEWAY);
});
test("When called with valid id and timestamp not in the cache with db returns ok", async () => {
const dbBackend = express();
dbBackend.get("/vaa", (req, res) => {
const priceId = req.query.id;
const pubTime = Number(req.query.publishTime);
const cluster = req.query.cluster;
res.json([
{
vaa: `${cluster}${priceId}${pubTime}`,
publishTime: new Date(pubTime * 1000).toISOString(),
},
]);
});
const dbApp = dbBackend.listen({ port: 37777 });
const apiWithDb = new RestAPI(
{
port: 8889,
dbApiCluster: "pythnet",
dbApiEndpoint: "http://localhost:37777",
},
priceInfo,
() => true
);
const appWithDb = await apiWithDb.createApp();
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(appWithDb)
.get("/api/get_vaa")
.query({
id: "0x" + id,
publish_time: 5,
});
expect(resp.status).toBe(StatusCodes.OK);
expect(resp.body).toEqual<VaaConfig>({
vaa: `pythnet${id}5`,
publishTime: 5,
});
const pubTime5AsHex64Bit = "0000000000000005";
const ccipResp = await request(appWithDb)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime5AsHex64Bit,
});
expect(ccipResp.status).toBe(StatusCodes.OK);
expect(ccipResp.body).toEqual({
data:
"0x" +
pubTime5AsHex64Bit +
Buffer.from(`pythnet${id}5`, "base64").toString("hex"),
});
dbApp.close();
});
test(
"When called with valid id and timestamp not in the cache" +
"and not in the db returns vaa not found",
async () => {
const dbBackend = express();
dbBackend.get("/vaa", (_req, res) => {
// Return an empty array when vaa is not there, this is the same
// behaviour as our api.
res.json([]);
});
const dbApp = dbBackend.listen({ port: 37777 });
const apiWithDb = new RestAPI(
{
port: 8889,
dbApiCluster: "pythnet",
dbApiEndpoint: "http://localhost:37777",
},
priceInfo,
() => true
);
const appWithDb = await apiWithDb.createApp();
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(appWithDb)
.get("/api/get_vaa")
.query({
id: "0x" + id,
publish_time: 5,
});
expect(resp.status).toBe(StatusCodes.NOT_FOUND);
const pubTime5AsHex64Bit = "0000000000000005";
const ccipResp = await request(appWithDb)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime5AsHex64Bit,
});
// On CCIP we expect bad gateway so the client want to retry other ccip endpoints.
expect(ccipResp.status).toBe(StatusCodes.BAD_GATEWAY);
dbApp.close();
}
);
test(
"When called with valid id and timestamp not in the cache" +
"and db is not available returns internal server error",
async () => {
const apiWithDb = new RestAPI(
{
port: 8889,
dbApiCluster: "pythnet",
dbApiEndpoint: "http://localhost:37777",
},
priceInfo,
() => true
);
const appWithDb = await apiWithDb.createApp();
const id = expandTo64Len("abcd");
vaasCache.set(id, 10, "abcd10");
vaasCache.set(id, 20, "abcd20");
vaasCache.set(id, 30, "abcd30");
const resp = await request(appWithDb)
.get("/api/get_vaa")
.query({
id: "0x" + id,
publish_time: 5,
});
expect(resp.status).toBe(StatusCodes.INTERNAL_SERVER_ERROR);
const pubTime5AsHex64Bit = "0000000000000005";
const ccipResp = await request(appWithDb)
.get("/api/get_vaa_ccip")
.query({
data: "0x" + id + pubTime5AsHex64Bit,
});
expect(ccipResp.status).toBe(StatusCodes.INTERNAL_SERVER_ERROR);
}
);
});

View File

@ -1,4 +1,9 @@
import { HexString, Price, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import {
HexString,
Price,
PriceFeed,
PriceFeedMetadata,
} from "@pythnetwork/pyth-sdk-js";
import { Server } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { sleep } from "../helpers";
@ -12,39 +17,20 @@ let server: Server;
let wss: WebSocketServer;
let priceInfos: PriceInfo[];
let priceMetadata: any;
function expandTo64Len(id: string): string {
return id.repeat(64).substring(0, 64);
}
function dummyPriceMetadata(
attestationTime: number,
emitterChainId: number,
seqNum: number,
priceServiceReceiveTime: number
): any {
function dummyPriceInfo(id: HexString, vaa: HexString): PriceInfo {
return {
attestation_time: attestationTime,
emitter_chain: emitterChainId,
sequence_number: seqNum,
price_service_receive_time: priceServiceReceiveTime,
};
}
function dummyPriceInfo(
id: HexString,
vaa: HexString,
dummyPriceMetadataValue: any
): PriceInfo {
return {
seqNum: dummyPriceMetadataValue.sequence_number,
seqNum: 1,
publishTime: 0,
attestationTime: dummyPriceMetadataValue.attestation_time,
emitterChainId: dummyPriceMetadataValue.emitter_chain,
attestationTime: 2,
emitterChainId: 3,
priceFeed: dummyPriceFeed(id),
vaa: Buffer.from(vaa, "hex"),
priceServiceReceiveTime: dummyPriceMetadataValue.price_service_receive_time,
priceServiceReceiveTime: 4,
};
}
@ -96,12 +82,11 @@ async function createSocketClient(): Promise<[WebSocket, any[]]> {
}
beforeAll(async () => {
priceMetadata = dummyPriceMetadata(0, 0, 0, 0);
priceInfos = [
dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4", priceMetadata),
dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4", priceMetadata),
dummyPriceInfo(expandTo64Len("2345"), "bad01bad", priceMetadata),
dummyPriceInfo(expandTo64Len("6789"), "bidbidbid", priceMetadata),
dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4"),
dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4"),
dummyPriceInfo(expandTo64Len("2345"), "bad01bad"),
dummyPriceInfo(expandTo64Len("6789"), "bidbidbid"),
];
const priceInfo: PriceStore = {
@ -190,7 +175,12 @@ describe("Client receives data", () => {
type: "price_update",
price_feed: {
...priceInfos[0].priceFeed.toJson(),
metadata: priceMetadata,
metadata: new PriceFeedMetadata({
attestationTime: 2,
emitterChain: 3,
receiveTime: 4,
sequenceNumber: 1,
}).toJson(),
},
});
@ -202,7 +192,12 @@ describe("Client receives data", () => {
type: "price_update",
price_feed: {
...priceInfos[1].priceFeed.toJson(),
metadata: priceMetadata,
metadata: new PriceFeedMetadata({
attestationTime: 2,
emitterChain: 3,
receiveTime: 4,
sequenceNumber: 1,
}).toJson(),
},
});

View File

@ -62,14 +62,19 @@ export type VaaConfig = {
export class VaaCache {
private cache: Map<string, VaaConfig[]>;
private ttl: number;
private ttl: DurationInSec;
private cacheCleanupLoopInterval: DurationInSec;
constructor(ttl: DurationInSec = 300) {
constructor(
ttl: DurationInSec = 300,
cacheCleanupLoopInterval: DurationInSec = 60
) {
this.cache = new Map();
this.ttl = ttl;
this.cacheCleanupLoopInterval = cacheCleanupLoopInterval;
}
set(key: VaaKey, publishTime: number, vaa: string): void {
set(key: VaaKey, publishTime: TimestampInSec, vaa: string): void {
if (this.cache.has(key)) {
this.cache.get(key)!.push({ publishTime, vaa });
} else {
@ -77,7 +82,7 @@ export class VaaCache {
}
}
get(key: VaaKey, publishTime: number): VaaConfig | undefined {
get(key: VaaKey, publishTime: TimestampInSec): VaaConfig | undefined {
if (!this.cache.has(key)) {
return undefined;
} else {
@ -86,7 +91,10 @@ export class VaaCache {
}
}
find(arr: VaaConfig[], publishTime: number): VaaConfig | undefined {
private find(
arr: VaaConfig[],
publishTime: TimestampInSec
): VaaConfig | undefined {
// If the publishTime is less than the first element we are
// not sure that this VAA is actually the first VAA after that
// time.
@ -123,6 +131,13 @@ export class VaaCache {
);
}
}
runRemoveExpiredValuesLoop() {
setInterval(
this.removeExpiredValues.bind(this),
this.cacheCleanupLoopInterval * 1000
);
}
}
export class Listener implements PriceStore {
@ -136,7 +151,6 @@ export class Listener implements PriceStore {
private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
private observedVaas: LRUCache<VaaKey, boolean>;
private vaasCache: VaaCache;
private cacheCleanupLoopInterval: DurationInSec;
constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient;
@ -148,8 +162,10 @@ export class Listener implements PriceStore {
max: 10000, // At most 10000 items
ttl: 60 * 1000, // 60 seconds
});
this.vaasCache = new VaaCache(config.cacheTtl);
this.cacheCleanupLoopInterval = config.cacheCleanupLoopInterval ?? 60;
this.vaasCache = new VaaCache(
config.cacheTtl,
config.cacheCleanupLoopInterval
);
}
private loadFilters(filtersRaw?: string) {
@ -188,10 +204,7 @@ export class Listener implements PriceStore {
this.spyServiceHost
);
setInterval(
this.vaasCache.removeExpiredValues.bind(this.vaasCache),
this.cacheCleanupLoopInterval * 1000
);
this.vaasCache.runRemoveExpiredValuesLoop();
while (true) {
let stream: ClientReadableStream<SubscribeSignedVAAResponse> | undefined;

View File

@ -26,7 +26,7 @@ spec:
matchLabels:
app: pyth-price-service
serviceName: pyth-price-service
replicas: 2
replicas: 1
template:
metadata:
labels:
@ -76,3 +76,14 @@ spec:
value: "60"
- name: CACHE_TTL_SECONDS
value: "300"
- name: tests
image: pyth-price-service
command:
- /bin/sh
- -c
- "npm run test && nc -lkp 2358 0.0.0.0"
readinessProbe:
periodSeconds: 5
failureThreshold: 300
tcpSocket:
port: 2358