diff --git a/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts b/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts new file mode 100644 index 00000000..d4f2f248 --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts @@ -0,0 +1,27 @@ +import fs from "fs"; +import { MetadataRepository } from "../../domain/repositories"; + +export class FileMetadataRepo implements MetadataRepository { + private readonly dirPath: string; + + constructor(dirPath: string) { + this.dirPath = dirPath; + if (!fs.existsSync(this.dirPath)) { + fs.mkdirSync(this.dirPath, { recursive: true }); + } + } + + async get(id: string): Promise { + const filePath = `${this.dirPath}/${id}.json`; + + return fs.promises + .readFile(filePath, "utf8") + .then(JSON.parse) + .catch((err) => null); + } + + async save(id: string, metadata: any): Promise { + const filePath = `${this.dirPath}/${id}.json`; + return fs.promises.writeFile(filePath, JSON.stringify(metadata), "utf8"); + } +} diff --git a/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts b/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts new file mode 100644 index 00000000..8b1f6e4d --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts @@ -0,0 +1,130 @@ +import axios, { AxiosError, AxiosInstance } from "axios"; +import { setTimeout } from "timers/promises"; + +/** + * A simple HTTP client with exponential backoff retries and 429 handling. + */ +export class HttpClient { + private initialDelay: number = 1_000; + private maxDelay: number = 60_000; + private retries: number = 0; + private timeout: number = 5_000; + private axios: AxiosInstance; + + constructor(options?: HttpClientOptions) { + options?.initialDelay && (this.initialDelay = options.initialDelay); + options?.maxDelay && (this.maxDelay = options.maxDelay); + options?.retries && (this.retries = options.retries); + options?.timeout && (this.timeout = options.timeout); + this.axios = axios.create(); + } + + public async post(url: string, body: any, opts?: HttpClientOptions): Promise { + return this.executeWithRetry(url, "POST", body, opts); + } + + private async execute( + url: string, + method: string, + body?: any, + opts?: HttpClientOptions + ): Promise { + let response; + try { + response = await this.axios.request({ + url: url, + method: method, + data: body, + timeout: opts?.timeout ?? this.timeout, + signal: AbortSignal.timeout(opts?.timeout ?? this.timeout), + }); + } catch (err: AxiosError | any) { + // Connection / timeout error: + if (err instanceof AxiosError) { + throw new HttpClientError(err.message ?? err.code, { status: err?.status ?? 0 }, err); + } + + throw new HttpClientError(err.message ?? err.code, undefined, err); + } + + if (!(response.status > 200) && !(response.status < 300)) { + throw new HttpClientError(undefined, response, response.data); + } + + return response.data; + } + + private async executeWithRetry( + url: string, + method: string, + body?: any, + opts?: HttpClientOptions + ): Promise { + const maxRetries = opts?.retries ?? this.retries; + let retries = 0; + const initialDelay = opts?.initialDelay ?? this.initialDelay; + const maxDelay = opts?.maxDelay ?? this.maxDelay; + while (maxRetries >= 0) { + try { + return await this.execute(url, method, body, opts); + } catch (err) { + if (err instanceof HttpClientError) { + if (retries < maxRetries) { + const retryAfter = err.getRetryAfter(maxDelay, err); + if (retryAfter) { + await setTimeout(retryAfter, { ref: false }); + } else { + const timeout = Math.min(initialDelay * 2 ** maxRetries, maxDelay); + await setTimeout(timeout, { ref: false }); + } + retries++; + continue; + } + } + throw err; + } + } + + throw new Error(`Failed to reach ${url}`); + } +} + +export type HttpClientOptions = { + initialDelay?: number; + maxDelay?: number; + retries?: number; + timeout?: number; +}; + +export class HttpClientError extends Error { + public readonly status?: number; + public readonly data?: any; + public readonly headers?: any; + + constructor(message?: string, response?: { status: number; headers?: any }, data?: any) { + super(message ?? `Unexpected status code: ${response?.status}`); + this.status = response?.status; + this.data = data; + this.headers = response?.headers; + Error.captureStackTrace(this, this.constructor); + } + + /** + * Parses the Retry-After header and returns the value in milliseconds. + * @param maxDelay + * @param error + * @throws {HttpClientError} if retry-after is bigger than maxDelay. + * @returns the retry-after value in milliseconds. + */ + public getRetryAfter(maxDelay: number, error: HttpClientError): number | undefined { + const retryAfter = this.headers?.get("Retry-After"); + if (retryAfter) { + const value = parseInt(retryAfter) * 1000; // header value is in seconds + if (value <= maxDelay) { + return value; + } + + throw error; + } + } +} diff --git a/blockchain-watcher/test/domain/HandleEvmLogs.test.ts b/blockchain-watcher/test/domain/HandleEvmLogs.test.ts new file mode 100644 index 00000000..5ea99017 --- /dev/null +++ b/blockchain-watcher/test/domain/HandleEvmLogs.test.ts @@ -0,0 +1,98 @@ +import { afterEach, describe, it, expect, jest } from "@jest/globals"; +import { HandleEvmLogs, HandleEvmLogsConfig } from "../../src/domain/actions/HandleEvmLogs"; +import { EvmLog, LogFoundEvent } from "../../src/domain/entities"; + +const ABI = + "event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)"; +const mapper = (log: EvmLog, args: ReadonlyArray) => { + return { + name: "send-event", + address: log.address, + chainId: 1, + txHash: "0x0", + blockHeight: 0n, + blockTime: 0, + attributes: { + sequence: args[0].toString(), + deliveryQuote: args[1].toString(), + paymentForExtraReceiverValue: args[2].toString(), + }, + }; +}; +const targetRepo = { + save: async (events: LogFoundEvent>[]) => { + Promise.resolve(); + }, + failingSave: async (events: LogFoundEvent>[]) => { + Promise.reject(); + }, +}; + +let targetRepoSpy: jest.SpiedFunction<(typeof targetRepo)["save"]>; + +let evmLogs: EvmLog[]; +let cfg: HandleEvmLogsConfig; +let handleEvmLogs: HandleEvmLogs>>; + +describe("HandleEvmLogs", () => { + afterEach(async () => {}); + + it("should be able to map logs", async () => { + const expectedLength = 5; + givenConfig(ABI); + givenEvmLogs(expectedLength, expectedLength); + givenHandleEvmLogs(); + + const result = await handleEvmLogs.handle(evmLogs); + + expect(result).toHaveLength(expectedLength); + expect(result[0].attributes.sequence).toBe("3389"); + expect(result[0].attributes.deliveryQuote).toBe("75150000000000000"); + expect(result[0].attributes.paymentForExtraReceiverValue).toBe("0"); + expect(targetRepoSpy).toBeCalledWith(result); + }); +}); + +const givenHandleEvmLogs = (targetFn: "save" | "failingSave" = "save") => { + targetRepoSpy = jest.spyOn(targetRepo, targetFn); + handleEvmLogs = new HandleEvmLogs(cfg, mapper, targetRepo[targetFn]); +}; + +const givenConfig = (abi: string) => { + cfg = { + filter: { + addresses: ["0x28D8F1Be96f97C1387e94A53e00eCcFb4E75175a"], + topics: ["0xda8540426b64ece7b164a9dce95448765f0a7263ef3ff85091c9c7361e485364"], + }, + abi, + }; +}; + +const givenEvmLogs = (length: number, matchingFilterOnes: number) => { + evmLogs = []; + let matchingCount = 0; + for (let i = 0; i < length; i++) { + let address = "0x392f472048681816e91026cd768c60958b55352add2837adea9ea6249178b8a8"; + let topic: string | undefined = undefined; + if (matchingCount < matchingFilterOnes) { + address = cfg.filter.addresses![0].toUpperCase(); + topic = cfg.filter.topics![0]; + matchingCount++; + } + + evmLogs.push({ + blockTime: 0, + blockNumber: BigInt(i + 1), + blockHash: "0x1a07d0bd31c84f0dab36eac31a2f3aa801852bf8240ffba19113c62463f694fa", + address: address, + removed: false, + data: "0x000000000000000000000000000000000000000000000000010afc86dedee0000000000000000000000000000000000000000000000000000000000000000000", + transactionHash: "0x2077dbd0c685c264dfa4e8e048ff15b03775043070216644258bf1bd3e419aa8", + transactionIndex: "0x4", + topics: topic + ? [topic, "0x0000000000000000000000000000000000000000000000000000000000000d3d"] + : [], + logIndex: 0, + }); + } +}; diff --git a/blockchain-watcher/test/domain/PollEvmLogs.test.ts b/blockchain-watcher/test/domain/PollEvmLogs.test.ts new file mode 100644 index 00000000..f9fd0c08 --- /dev/null +++ b/blockchain-watcher/test/domain/PollEvmLogs.test.ts @@ -0,0 +1,180 @@ +import { afterEach, describe, it, expect, jest } from "@jest/globals"; +import { setTimeout } from "timers/promises"; +import { + PollEvmLogsMetadata, + PollEvmLogs, + PollEvmLogsConfig, +} from "../../src/domain/actions/PollEvmLogs"; +import { + EvmBlockRepository, + MetadataRepository, + StatRepository, +} from "../../src/domain/repositories"; +import { EvmBlock, EvmLog } from "../../src/domain/entities"; + +let cfg = PollEvmLogsConfig.fromBlock(0n); + +let getBlocksSpy: jest.SpiedFunction; +let getLogsSpy: jest.SpiedFunction; +let handlerSpy: jest.SpiedFunction<(logs: EvmLog[]) => Promise>; +let metadataSaveSpy: jest.SpiedFunction["save"]>; + +let metadataRepo: MetadataRepository; +let evmBlockRepo: EvmBlockRepository; +let statsRepo: StatRepository; + +let handlers = { + working: (logs: EvmLog[]) => Promise.resolve(), + failing: (logs: EvmLog[]) => Promise.reject(), +}; +let pollEvmLogs: PollEvmLogs; + +describe("PollEvmLogs", () => { + afterEach(async () => { + await pollEvmLogs.stop(); + }); + + it("should be able to read logs from latest block when no fromBlock is configured", async () => { + const currentHeight = 10n; + const blocksAhead = 1n; + givenEvmBlockRepository(currentHeight, blocksAhead); + givenMetadataRepository(); + givenStatsRepository(); + givenPollEvmLogs(); + + await whenPollEvmLogsStarts(); + + await thenWaitForAssertion( + () => expect(getBlocksSpy).toHaveReturnedTimes(1), + () => expect(getBlocksSpy).toHaveBeenCalledWith(new Set([currentHeight, currentHeight + 1n])), + () => + expect(getLogsSpy).toBeCalledWith({ + addresses: cfg.addresses, + topics: cfg.topics, + fromBlock: currentHeight + blocksAhead, + toBlock: currentHeight + blocksAhead, + }) + ); + }); + + it("should be able to read logs from last known block when configured from is before", async () => { + const lastExtractedBlock = 10n; + const blocksAhead = 10n; + givenEvmBlockRepository(lastExtractedBlock, blocksAhead); + givenMetadataRepository({ lastBlock: lastExtractedBlock }); + givenStatsRepository(); + givenPollEvmLogs(lastExtractedBlock - 10n); + + await whenPollEvmLogsStarts(); + + await thenWaitForAssertion( + () => () => + expect(getBlocksSpy).toHaveBeenCalledWith( + new Set([lastExtractedBlock, lastExtractedBlock + 1n]) + ), + () => + expect(getLogsSpy).toBeCalledWith({ + addresses: cfg.addresses, + topics: cfg.topics, + fromBlock: lastExtractedBlock + 1n, + toBlock: lastExtractedBlock + blocksAhead, + }) + ); + }); + + it("should pass logs to handlers and persist metadata", async () => { + const currentHeight = 10n; + const blocksAhead = 1n; + givenEvmBlockRepository(currentHeight, blocksAhead); + givenMetadataRepository(); + givenStatsRepository(); + givenPollEvmLogs(currentHeight); + + await whenPollEvmLogsStarts(); + + await thenWaitForAssertion( + () => expect(handlerSpy).toHaveBeenCalledWith(expect.any(Array)), + () => + expect(metadataSaveSpy).toBeCalledWith("watch-evm-logs", { + lastBlock: currentHeight + blocksAhead, + }) + ); + }); +}); + +const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => { + const logsResponse: EvmLog[] = []; + const blocksResponse: Record = {}; + if (height) { + for (let index = 0n; index <= (blocksAhead ?? 1n); index++) { + logsResponse.push({ + blockNumber: height + index, + blockHash: `0x0${index}`, + blockTime: 0, + address: "", + removed: false, + data: "", + transactionHash: "", + transactionIndex: "", + topics: [], + logIndex: 0, + }); + blocksResponse[`0x0${index}`] = { + timestamp: 0, + hash: `0x0${index}`, + number: height + index, + }; + } + } + + evmBlockRepo = { + getBlocks: () => Promise.resolve(blocksResponse), + getBlockHeight: () => Promise.resolve(height ? height + (blocksAhead ?? 10n) : 10n), + getFilteredLogs: () => Promise.resolve(logsResponse), + }; + + getBlocksSpy = jest.spyOn(evmBlockRepo, "getBlocks"); + getLogsSpy = jest.spyOn(evmBlockRepo, "getFilteredLogs"); + handlerSpy = jest.spyOn(handlers, "working"); +}; + +const givenMetadataRepository = (data?: PollEvmLogsMetadata) => { + metadataRepo = { + get: () => Promise.resolve(data), + save: () => Promise.resolve(), + }; + metadataSaveSpy = jest.spyOn(metadataRepo, "save"); +}; + +const givenStatsRepository = () => { + statsRepo = { + count: () => {}, + measure: () => {}, + report: () => Promise.resolve(""), + }; +}; + +const givenPollEvmLogs = (from?: bigint) => { + cfg.setFromBlock(from); + pollEvmLogs = new PollEvmLogs(evmBlockRepo, metadataRepo, statsRepo, cfg); +}; + +const whenPollEvmLogsStarts = async () => { + pollEvmLogs.run([handlers.working]); +}; + +const thenWaitForAssertion = async (...assertions: (() => void)[]) => { + for (let index = 1; index < 5; index++) { + try { + for (const assertion of assertions) { + assertion(); + } + break; + } catch (error) { + if (index === 4) { + throw error; + } + await setTimeout(10, undefined, { ref: false }); + } + } +}; diff --git a/deploy/blockchain-watcher/env/staging-testnet.env b/deploy/blockchain-watcher/env/staging-testnet.env index ad445672..efa4e7e4 100644 --- a/deploy/blockchain-watcher/env/staging-testnet.env +++ b/deploy/blockchain-watcher/env/staging-testnet.env @@ -18,4 +18,4 @@ SNS_TOPIC_ARN= SNS_REGION= SOLANA_RPCS='[""]' -AWS_IAM_ROLE= \ No newline at end of file +AWS_IAM_ROLE=