From 41bc5abe882dc4378b0459dfa48f56f99f8902c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C3=ADas=20Mart=C3=ADnez?= <131624652+mat1asm@users.noreply.github.com> Date: Thu, 16 Nov 2023 12:39:00 -0300 Subject: [PATCH] [Blockchain Watcher] Add StartJob action + rate limit handling (#799) * using configured default winston logger * wait for new block * adding basic metrics * staging mainnet config * send hexa block number * simpler log format * group entities in different files * grouping watchers * adding 429 handling http client * abstracting polling iteration * Load jobs dinamically * changing deployment strategy to jobs per pod --- .../config/custom-environment-variables.json | 3 + blockchain-watcher/config/default.json | 3 + blockchain-watcher/package-lock.json | 15 -- blockchain-watcher/package.json | 1 - .../src/domain/actions/PollEvmLogs.ts | 105 ++++++------ .../src/domain/actions/RunPollingJob.ts | 48 ++++++ .../src/domain/actions/StartJobs.ts | 41 +++++ .../src/domain/actions/index.ts | 2 + .../src/domain/entities/events.ts | 17 ++ .../domain/{entities.ts => entities/evm.ts} | 18 -- .../src/domain/entities/index.ts | 3 + .../src/domain/entities/jobs.ts | 28 ++++ blockchain-watcher/src/domain/repositories.ts | 9 +- blockchain-watcher/src/index.ts | 2 +- .../src/infrastructure/RepositoriesBuilder.ts | 47 ++++-- .../src/infrastructure/config.ts | 6 + blockchain-watcher/src/infrastructure/log.ts | 6 +- .../repositories/EvmJsonRPCBlockRepository.ts | 84 ++++------ .../repositories/FileMetadataRepo.ts | 6 +- .../infrastructure/repositories/HttpClient.ts | 130 ++++++++++++++ .../repositories/SnsEventRepository.ts | 11 ++ .../repositories/StaticJobRepository.ts | 120 +++++++++++++ .../src/infrastructure/repositories/index.ts | 1 + .../src/infrastructure/rpc/Server.ts | 2 +- .../watchers/AbstractWatcher.ts | 2 +- .../src/infrastructure/watchers/EvmWatcher.ts | 2 +- .../{ => watchers}/environment.ts | 4 +- .../handlers/AbstractHandler.ts | 0 .../handlers/LogMessagePublishedHandler.ts | 0 .../handlers/deliveryEventHandler.ts | 0 .../handlers/sendEventHandler.ts | 0 .../{ => watchers}/utils/websocket.ts | 0 blockchain-watcher/src/start.ts | 75 +-------- .../test/domain/PollEvmLogs.test.ts | 2 +- ...JsonRPCBlockRepository.integration.test.ts | 4 +- .../repositories/StaticJobRepository.test.ts | 77 +++++++++ .../env/staging-mainnet.env | 4 +- .../env/staging-testnet.env | 6 +- deploy/blockchain-watcher/metadata-pv.yaml | 27 --- deploy/blockchain-watcher/sa.yaml | 7 + deploy/blockchain-watcher/stateful-set.yaml | 71 -------- .../blockchain-watcher/workers/ethereum.yaml | 158 ++++++++++++++++++ 42 files changed, 812 insertions(+), 335 deletions(-) create mode 100644 blockchain-watcher/src/domain/actions/RunPollingJob.ts create mode 100644 blockchain-watcher/src/domain/actions/StartJobs.ts create mode 100644 blockchain-watcher/src/domain/entities/events.ts rename blockchain-watcher/src/domain/{entities.ts => entities/evm.ts} (66%) create mode 100644 blockchain-watcher/src/domain/entities/index.ts create mode 100644 blockchain-watcher/src/domain/entities/jobs.ts create mode 100644 blockchain-watcher/src/infrastructure/repositories/HttpClient.ts create mode 100644 blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts rename blockchain-watcher/src/infrastructure/{ => watchers}/environment.ts (97%) rename blockchain-watcher/src/infrastructure/{ => watchers}/handlers/AbstractHandler.ts (100%) rename blockchain-watcher/src/infrastructure/{ => watchers}/handlers/LogMessagePublishedHandler.ts (100%) rename blockchain-watcher/src/infrastructure/{ => watchers}/handlers/deliveryEventHandler.ts (100%) rename blockchain-watcher/src/infrastructure/{ => watchers}/handlers/sendEventHandler.ts (100%) rename blockchain-watcher/src/infrastructure/{ => watchers}/utils/websocket.ts (100%) create mode 100644 blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts delete mode 100644 deploy/blockchain-watcher/metadata-pv.yaml create mode 100644 deploy/blockchain-watcher/sa.yaml delete mode 100644 deploy/blockchain-watcher/stateful-set.yaml create mode 100644 deploy/blockchain-watcher/workers/ethereum.yaml diff --git a/blockchain-watcher/config/custom-environment-variables.json b/blockchain-watcher/config/custom-environment-variables.json index 52f7d480..afb60088 100644 --- a/blockchain-watcher/config/custom-environment-variables.json +++ b/blockchain-watcher/config/custom-environment-variables.json @@ -3,6 +3,9 @@ "port": "PORT", "logLevel": "LOG_LEVEL", "dryRun": "DRY_RUN_ENABLED", + "jobs": { + "dir": "JOBS_DIR" + }, "sns": { "topicArn": "SNS_TOPIC_ARN", "region": "SNS_REGION" diff --git a/blockchain-watcher/config/default.json b/blockchain-watcher/config/default.json index 8cb2816b..fa7377e8 100644 --- a/blockchain-watcher/config/default.json +++ b/blockchain-watcher/config/default.json @@ -13,6 +13,9 @@ "metadata": { "dir": "metadata-repo" }, + "jobs": { + "dir": "metadata-repo/jobs" + }, "platforms": { "ethereum": { "name": "ethereum", diff --git a/blockchain-watcher/package-lock.json b/blockchain-watcher/package-lock.json index f6069c63..a91be9e4 100644 --- a/blockchain-watcher/package-lock.json +++ b/blockchain-watcher/package-lock.json @@ -13,7 +13,6 @@ "@certusone/wormhole-sdk": "^0.9.21-beta.0", "@types/config": "^3.3.3", "axios": "^1.6.0", - "axios-rate-limit": "^1.3.0", "config": "^3.3.9", "dotenv": "^16.3.1", "ethers": "^5", @@ -5329,14 +5328,6 @@ "proxy-from-env": "^1.1.0" } }, - "node_modules/axios-rate-limit": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/axios-rate-limit/-/axios-rate-limit-1.3.0.tgz", - "integrity": "sha512-cKR5wTbU/CeeyF1xVl5hl6FlYsmzDVqxlN4rGtfO5x7J83UxKDckudsW0yW21/ZJRcO0Qrfm3fUFbhEbWTLayw==", - "peerDependencies": { - "axios": "*" - } - }, "node_modules/babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -15695,12 +15686,6 @@ "proxy-from-env": "^1.1.0" } }, - "axios-rate-limit": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/axios-rate-limit/-/axios-rate-limit-1.3.0.tgz", - "integrity": "sha512-cKR5wTbU/CeeyF1xVl5hl6FlYsmzDVqxlN4rGtfO5x7J83UxKDckudsW0yW21/ZJRcO0Qrfm3fUFbhEbWTLayw==", - "requires": {} - }, "babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", diff --git a/blockchain-watcher/package.json b/blockchain-watcher/package.json index 706d8f71..0bd3e4ff 100644 --- a/blockchain-watcher/package.json +++ b/blockchain-watcher/package.json @@ -18,7 +18,6 @@ "@certusone/wormhole-sdk": "^0.9.21-beta.0", "@types/config": "^3.3.3", "axios": "^1.6.0", - "axios-rate-limit": "^1.3.0", "config": "^3.3.9", "dotenv": "^16.3.1", "ethers": "^5", diff --git a/blockchain-watcher/src/domain/actions/PollEvmLogs.ts b/blockchain-watcher/src/domain/actions/PollEvmLogs.ts index 09d412ec..9adfbd16 100644 --- a/blockchain-watcher/src/domain/actions/PollEvmLogs.ts +++ b/blockchain-watcher/src/domain/actions/PollEvmLogs.ts @@ -1,16 +1,15 @@ import { EvmLog } from "../entities"; import { EvmBlockRepository, MetadataRepository, StatRepository } from "../repositories"; -import { setTimeout } from "timers/promises"; import winston from "winston"; +import { RunPollingJob } from "./RunPollingJob"; const ID = "watch-evm-logs"; -let ref: any; /** * PollEvmLogs is an action that watches for new blocks and extracts logs from them. */ -export class PollEvmLogs { - private readonly logger: winston.Logger = winston.child({ module: "PollEvmLogs" }); +export class PollEvmLogs extends RunPollingJob { + protected readonly logger: winston.Logger; private readonly blockRepo: EvmBlockRepository; private readonly metadataRepo: MetadataRepository; @@ -19,7 +18,7 @@ export class PollEvmLogs { private latestBlockHeight?: bigint; private blockHeightCursor?: bigint; - private started: boolean = false; + private lastRange?: { fromBlock: bigint; toBlock: bigint }; constructor( blockRepo: EvmBlockRepository, @@ -27,64 +26,67 @@ export class PollEvmLogs { statsRepository: StatRepository, cfg: PollEvmLogsConfig ) { + super(cfg.interval ?? 1_000); this.blockRepo = blockRepo; this.metadataRepo = metadataRepo; this.statsRepository = statsRepository; this.cfg = cfg; + this.logger = winston.child({ module: "PollEvmLogs", label: this.cfg.id }); } - public async start(handlers: ((logs: EvmLog[]) => Promise)[]): Promise { + protected async preHook(): Promise { const metadata = await this.metadataRepo.get(this.cfg.id); if (metadata) { this.blockHeightCursor = BigInt(metadata.lastBlock); } - - this.started = true; - this.watch(handlers); } - private async watch(handlers: ((logs: EvmLog[]) => Promise)[]): Promise { - while (this.started) { - this.report(); - if (this.cfg.hasFinished(this.blockHeightCursor)) { - this.logger.info( - `PollEvmLogs: (${this.cfg.id}) Finished processing all blocks from ${this.cfg.fromBlock} to ${this.cfg.toBlock}` - ); - await this.stop(); - break; - } + protected async hasNext(): Promise { + const hasFinished = this.cfg.hasFinished(this.blockHeightCursor); + if (hasFinished) { + this.logger.info( + `PollEvmLogs: (${this.cfg.id}) Finished processing all blocks from ${this.cfg.fromBlock} to ${this.cfg.toBlock}` + ); + } - this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment()); + return !hasFinished; + } - const range = this.getBlockRange(this.latestBlockHeight); + protected async get(): Promise { + this.report(); - if (range.fromBlock > this.latestBlockHeight) { - this.logger.info(`Next range is after latest block height, waiting...`); - ref = await setTimeout(this.cfg.interval ?? 1_000, undefined); - continue; - } + this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment()); - const logs = await this.blockRepo.getFilteredLogs({ - fromBlock: range.fromBlock, - toBlock: range.toBlock, - addresses: this.cfg.addresses, // Works when sending multiple addresses, but not multiple topics. - topics: [], // this.cfg.topics => will be applied by handlers - }); + const range = this.getBlockRange(this.latestBlockHeight); - const blockNumbers = new Set(logs.map((log) => log.blockNumber)); - const blocks = await this.blockRepo.getBlocks(blockNumbers); - logs.forEach((log) => { - const block = blocks[log.blockHash]; - log.blockTime = block.timestamp; - }); + if (range.fromBlock > this.latestBlockHeight) { + this.logger.info(`Next range is after latest block height, waiting...`); + return []; + } - // TODO: add error handling. - await Promise.all(handlers.map((handler) => handler(logs))); + const logs = await this.blockRepo.getFilteredLogs({ + fromBlock: range.fromBlock, + toBlock: range.toBlock, + addresses: this.cfg.addresses, // Works when sending multiple addresses, but not multiple topics. + topics: [], // this.cfg.topics => will be applied by handlers + }); - await this.metadataRepo.save(this.cfg.id, { lastBlock: range.toBlock }); - this.blockHeightCursor = range.toBlock; + const blockNumbers = new Set(logs.map((log) => log.blockNumber)); + const blocks = await this.blockRepo.getBlocks(blockNumbers); + logs.forEach((log) => { + const block = blocks[log.blockHash]; + log.blockTime = block.timestamp; + }); - ref = await setTimeout(this.cfg.interval ?? 1_000, undefined); + this.lastRange = range; + + return logs; + } + + protected async persist(): Promise { + this.blockHeightCursor = this.lastRange?.toBlock ?? this.blockHeightCursor; + if (this.blockHeightCursor) { + await this.metadataRepo.save(this.cfg.id, { lastBlock: this.blockHeightCursor }); } } @@ -109,7 +111,7 @@ export class PollEvmLogs { fromBlock = this.cfg.fromBlock; } - let toBlock = fromBlock + BigInt(this.cfg.getBlockBatchSize()); + let toBlock = BigInt(fromBlock) + BigInt(this.cfg.getBlockBatchSize()); // limit toBlock to obtained block height if (toBlock > fromBlock && toBlock > latestBlockHeight) { toBlock = latestBlockHeight; @@ -132,11 +134,6 @@ export class PollEvmLogs { this.statsRepository.measure("block_height", this.latestBlockHeight ?? 0n, labels); this.statsRepository.measure("block_cursor", this.blockHeightCursor ?? 0n, labels); } - - public async stop(): Promise { - clearTimeout(ref); - this.started = false; - } } export type PollEvmLogsMetadata = { @@ -174,12 +171,16 @@ export class PollEvmLogsConfig { return this.props.commitment ?? "latest"; } - public hasFinished(currentFromBlock?: bigint) { - return currentFromBlock && this.props.toBlock && currentFromBlock >= this.props.toBlock; + public hasFinished(currentFromBlock?: bigint): boolean { + return ( + currentFromBlock != undefined && + this.props.toBlock != undefined && + currentFromBlock >= this.props.toBlock + ); } public get fromBlock() { - return this.props.fromBlock; + return this.props.fromBlock ? BigInt(this.props.fromBlock) : undefined; } public setFromBlock(fromBlock: bigint | undefined) { diff --git a/blockchain-watcher/src/domain/actions/RunPollingJob.ts b/blockchain-watcher/src/domain/actions/RunPollingJob.ts new file mode 100644 index 00000000..5b884e75 --- /dev/null +++ b/blockchain-watcher/src/domain/actions/RunPollingJob.ts @@ -0,0 +1,48 @@ +import { setTimeout } from "timers/promises"; +import winston from "winston"; +import { Handler } from "../entities"; + +export abstract class RunPollingJob { + private interval: number; + private running: boolean = false; + protected abstract logger: winston.Logger; + protected abstract preHook(): Promise; + protected abstract hasNext(): Promise; + protected abstract get(): Promise; + protected abstract persist(): Promise; + + constructor(interval: number) { + this.interval = interval; + this.running = true; + } + + public async run(handlers: Handler[]): Promise { + this.logger.info("Starting polling job"); + await this.preHook(); + while (this.running) { + if (!(await this.hasNext())) { + this.logger.info("Finished processing"); + await this.stop(); + break; + } + + let items: any[]; + + try { + items = await this.get(); + await Promise.all(handlers.map((handler) => handler(items))); + } catch (e) { + this.logger.error("Error processing items", e); + await setTimeout(this.interval); + continue; + } + + await this.persist(); + await setTimeout(this.interval); + } + } + + public async stop(): Promise { + this.running = false; + } +} diff --git a/blockchain-watcher/src/domain/actions/StartJobs.ts b/blockchain-watcher/src/domain/actions/StartJobs.ts new file mode 100644 index 00000000..586fe244 --- /dev/null +++ b/blockchain-watcher/src/domain/actions/StartJobs.ts @@ -0,0 +1,41 @@ +import winston from "winston"; +import { JobDefinition } from "../entities"; +import { JobRepository } from "../repositories"; + +export class StartJobs { + private readonly logger = winston.child({ module: "StartJobs" }); + private readonly repo: JobRepository; + private runnables: Map Promise> = new Map(); + + constructor(repo: JobRepository) { + this.repo = repo; + } + + public async runSingle(job: JobDefinition): Promise { + if (this.runnables.has(job.id)) { + throw new Error(`Job ${job.id} already exists. Ids must be unique`); + } + + const handlers = await this.repo.getHandlers(job); + if (handlers.length === 0) { + this.logger.error(`No handlers for job ${job.id}`); + throw new Error("No handlers for job"); + } + + const source = this.repo.getSource(job); + + this.runnables.set(job.id, () => source.run(handlers)); + this.runnables.get(job.id)!(); + + return job; + } + + public async run(): Promise { + const jobs = await this.repo.getJobDefinitions(); + for (const job of jobs) { + await this.runSingle(job); + } + + return jobs; + } +} diff --git a/blockchain-watcher/src/domain/actions/index.ts b/blockchain-watcher/src/domain/actions/index.ts index 2b23f913..a82de1f9 100644 --- a/blockchain-watcher/src/domain/actions/index.ts +++ b/blockchain-watcher/src/domain/actions/index.ts @@ -1,2 +1,4 @@ export * from "./HandleEvmLogs"; export * from "./PollEvmLogs"; +export * from "./RunPollingJob"; +export * from "./StartJobs"; diff --git a/blockchain-watcher/src/domain/entities/events.ts b/blockchain-watcher/src/domain/entities/events.ts new file mode 100644 index 00000000..ec1c2d7e --- /dev/null +++ b/blockchain-watcher/src/domain/entities/events.ts @@ -0,0 +1,17 @@ +export type LogFoundEvent = { + name: string; + address: string; + chainId: number; + txHash: string; + blockHeight: bigint; + blockTime: number; + attributes: T; +}; + +export type LogMessagePublished = { + sequence: number; + sender: string; + nonce: number; + payload: string; + consistencyLevel: number; +}; diff --git a/blockchain-watcher/src/domain/entities.ts b/blockchain-watcher/src/domain/entities/evm.ts similarity index 66% rename from blockchain-watcher/src/domain/entities.ts rename to blockchain-watcher/src/domain/entities/evm.ts index e0895c18..ef318cfa 100644 --- a/blockchain-watcher/src/domain/entities.ts +++ b/blockchain-watcher/src/domain/entities/evm.ts @@ -30,21 +30,3 @@ export type EvmLogFilter = { addresses: string[]; topics: string[]; }; - -export type LogFoundEvent = { - name: string; - address: string; - chainId: number; - txHash: string; - blockHeight: bigint; - blockTime: number; - attributes: T; -}; - -export type LogMessagePublished = { - sequence: number; - sender: string; - nonce: number; - payload: string; - consistencyLevel: number; -}; diff --git a/blockchain-watcher/src/domain/entities/index.ts b/blockchain-watcher/src/domain/entities/index.ts new file mode 100644 index 00000000..7e93b44c --- /dev/null +++ b/blockchain-watcher/src/domain/entities/index.ts @@ -0,0 +1,3 @@ +export * from "./evm"; +export * from "./events"; +export * from "./jobs"; diff --git a/blockchain-watcher/src/domain/entities/jobs.ts b/blockchain-watcher/src/domain/entities/jobs.ts new file mode 100644 index 00000000..54549a66 --- /dev/null +++ b/blockchain-watcher/src/domain/entities/jobs.ts @@ -0,0 +1,28 @@ +export class JobDefinition { + id: string; + chain: string; + source: { + action: string; + config: Record; + }; + handlers: { + action: string; + target: string; + mapper: string; + config: Record; + }[]; + + constructor( + id: string, + chain: string, + source: { action: string; config: Record }, + handlers: { action: string; target: string; mapper: string; config: Record }[] + ) { + this.id = id; + this.chain = chain; + this.source = source; + this.handlers = handlers; + } +} + +export type Handler = (items: any[]) => Promise; diff --git a/blockchain-watcher/src/domain/repositories.ts b/blockchain-watcher/src/domain/repositories.ts index f5b034af..6bfd64d4 100644 --- a/blockchain-watcher/src/domain/repositories.ts +++ b/blockchain-watcher/src/domain/repositories.ts @@ -1,4 +1,5 @@ -import { EvmBlock, EvmLog, EvmLogFilter } from "./entities"; +import { RunPollingJob } from "./actions/RunPollingJob"; +import { EvmBlock, EvmLog, EvmLogFilter, Handler, JobDefinition } from "./entities"; export interface EvmBlockRepository { getBlockHeight(finality: string): Promise; @@ -16,3 +17,9 @@ export interface StatRepository { measure(id: string, value: bigint, labels: Record): void; report: () => Promise; } + +export interface JobRepository { + getJobDefinitions(): Promise; + getSource(jobDef: JobDefinition): RunPollingJob; + getHandlers(jobDef: JobDefinition): Promise; +} diff --git a/blockchain-watcher/src/index.ts b/blockchain-watcher/src/index.ts index 3f2a1f64..b37813ea 100644 --- a/blockchain-watcher/src/index.ts +++ b/blockchain-watcher/src/index.ts @@ -3,7 +3,7 @@ import { createWatchers, getEnvironment, initializeEnvironment, -} from "./infrastructure/environment"; +} from "./infrastructure/watchers/environment"; import AbstractWatcher from "./infrastructure/watchers/AbstractWatcher"; async function run() { diff --git a/blockchain-watcher/src/infrastructure/RepositoriesBuilder.ts b/blockchain-watcher/src/infrastructure/RepositoriesBuilder.ts index 1362d895..f81e4d82 100644 --- a/blockchain-watcher/src/infrastructure/RepositoriesBuilder.ts +++ b/blockchain-watcher/src/infrastructure/RepositoriesBuilder.ts @@ -6,14 +6,15 @@ import { EvmJsonRPCBlockRepositoryCfg, FileMetadataRepo, PromStatRepository, + StaticJobRepository, } from "./repositories"; -import axios, { AxiosInstance } from "axios"; -import axiosRateLimit from "axios-rate-limit"; + +import { HttpClient } from "./repositories/HttpClient"; +import { JobRepository } from "../domain/repositories"; export class RepositoriesBuilder { private cfg: Config; private snsClient?: SNSClient; - private axiosInstance?: AxiosInstance; private repositories = new Map(); constructor(cfg: Config) { @@ -23,7 +24,6 @@ export class RepositoriesBuilder { private build() { this.snsClient = this.createSnsClient(); - this.axiosInstance = this.createAxios(); this.repositories.set("sns", new SnsEventRepository(this.snsClient, this.cfg.sns)); this.repositories.set("metrics", new PromStatRepository()); @@ -32,16 +32,28 @@ export class RepositoriesBuilder { this.repositories.set("metadata", new FileMetadataRepo(this.cfg.metadata.dir)); this.cfg.supportedChains.forEach((chain) => { + const httpClient = this.createHttpClient(this.cfg.platforms[chain].timeout); const repoCfg: EvmJsonRPCBlockRepositoryCfg = { chain, rpc: this.cfg.platforms[chain].rpcs[0], timeout: this.cfg.platforms[chain].timeout, }; - this.repositories.set( - `${chain}-evmRepo`, - new EvmJsonRPCBlockRepository(repoCfg, this.axiosInstance!) - ); + this.repositories.set(`${chain}-evmRepo`, new EvmJsonRPCBlockRepository(repoCfg, httpClient)); }); + + this.repositories.set( + "jobs", + new StaticJobRepository( + this.cfg.jobs.dir, + this.cfg.dryRun, + (chain: string) => this.getEvmBlockRepository(chain), + { + metadataRepo: this.getMetadataRepository(), + statsRepo: this.getStatsRepository(), + snsRepo: this.getSnsEventRepository(), + } + ) + ); } public getEvmBlockRepository(chain: string): EvmJsonRPCBlockRepository { @@ -72,6 +84,13 @@ export class RepositoriesBuilder { return repo; } + public getJobsRepository(): JobRepository { + const repo = this.repositories.get("jobs"); + if (!repo) throw new Error(`No JobRepository`); + + return repo; + } + public close(): void { this.snsClient?.destroy(); } @@ -89,10 +108,12 @@ export class RepositoriesBuilder { return new SNSClient(snsCfg); } - private createAxios() { - return axiosRateLimit(axios.create(), { - perMilliseconds: 1000, - maxRequests: 1_000, - }); // TODO: configurable per repo + private createHttpClient(timeout?: number, retries?: number): HttpClient { + return new HttpClient({ + retries: retries ?? 3, + timeout: timeout ?? 5_000, + initialDelay: 1_000, + maxDelay: 30_000, + }); } } diff --git a/blockchain-watcher/src/infrastructure/config.ts b/blockchain-watcher/src/infrastructure/config.ts index b383a1d7..150d825d 100644 --- a/blockchain-watcher/src/infrastructure/config.ts +++ b/blockchain-watcher/src/infrastructure/config.ts @@ -10,6 +10,9 @@ export type Config = { metadata?: { dir: string; }; + jobs: { + dir: string; + }; platforms: Record; supportedChains: string[]; }; @@ -36,6 +39,9 @@ export const configuration = { metadata: { dir: config.get("metadata.dir"), }, + jobs: { + dir: config.get("jobs.dir"), + }, platforms: config.get>("platforms"), supportedChains: config.get("supportedChains"), } as Config; diff --git a/blockchain-watcher/src/infrastructure/log.ts b/blockchain-watcher/src/infrastructure/log.ts index e1f4041b..ca24ec1c 100644 --- a/blockchain-watcher/src/infrastructure/log.ts +++ b/blockchain-watcher/src/infrastructure/log.ts @@ -13,8 +13,10 @@ winston.configure({ winston.format.splat(), winston.format.errors({ stack: true }), winston.format.printf( - ({ level, message, module, chain }) => - `${level} [${module ?? ""}] ${chain ? `[${chain}]` : ""} ${message}` + ({ level, message, module, chain, label }) => + `${level} [${module ?? ""}]${chain ? `[${chain}]` : ""}${ + label ? `[${label}]` : "" + } ${message}` ) ), }); diff --git a/blockchain-watcher/src/infrastructure/repositories/EvmJsonRPCBlockRepository.ts b/blockchain-watcher/src/infrastructure/repositories/EvmJsonRPCBlockRepository.ts index da1dc11f..8551f708 100644 --- a/blockchain-watcher/src/infrastructure/repositories/EvmJsonRPCBlockRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/EvmJsonRPCBlockRepository.ts @@ -2,25 +2,20 @@ import { EvmBlock, EvmLogFilter, EvmLog, EvmTag } from "../../domain/entities"; import { EvmBlockRepository } from "../../domain/repositories"; import { AxiosInstance } from "axios"; import winston from "../log"; - -const headers = { - "Content-Type": "application/json", -}; +import { HttpClient, HttpClientError } from "./HttpClient"; /** * EvmJsonRPCBlockRepository is a repository that uses a JSON RPC endpoint to fetch blocks. * On the reliability side, only knows how to timeout. */ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { - private axios: AxiosInstance; + private httpClient: HttpClient; private rpc: URL; - private timeout: number; private readonly logger = winston.child({ module: "EvmJsonRPCBlockRepository" }); - constructor(cfg: EvmJsonRPCBlockRepositoryCfg, axios: AxiosInstance) { - this.axios = axios; + constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) { + this.httpClient = httpClient; this.rpc = new URL(cfg.rpc); - this.timeout = cfg.timeout ?? 10_000; this.logger = winston.child({ module: "EvmJsonRPCBlockRepository", chain: cfg.chain }); } @@ -47,16 +42,15 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { params: [blockNumberStr, false], }); } - const response = await this.axios.post(this.rpc.href, reqs, this.getRequestOptions()); - if (response.status !== 200) { - this.logger.error( - `Got ${response.status} from ${this.rpc.hostname}/eth_getBlockByNumber. ${ - response?.data?.error?.message ?? `${response?.data?.error.message}` - }` - ); + + let results: (undefined | { id: string; result?: EvmBlock; error?: ErrorBlock })[]; + try { + results = await this.httpClient.post(this.rpc.href, reqs); + } catch (e: HttpClientError | any) { + this.handleError(e, "eth_getBlockByNumber"); + throw e; } - const results = response?.data; if (results && results.length) { return results .map( @@ -124,26 +118,20 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { toBlock: `0x${filter.toBlock.toString(16)}`, }; - let response = await this.axios.post<{ result: Log[]; error?: ErrorBlock }>( - this.rpc.href, - { + let response: { result: Log[]; error?: ErrorBlock }; + try { + response = await this.httpClient.post(this.rpc.href, { jsonrpc: "2.0", method: "eth_getLogs", params: [parsedFilters], id: 1, - }, - this.getRequestOptions() - ); - - if (response.status !== 200 || response?.data.error) { - const msg = `Got error ${response?.data?.error?.message} for ${this.describeFilter( - filter - )} from ${this.rpc.hostname}/eth_getLogs`; - this.logger.error(`Got ${response.status} from ${this.rpc.hostname}. ${msg}`); - - throw new Error(msg); + }); + } catch (e: HttpClientError | any) { + this.handleError(e, "eth_getLogs"); + throw e; } - const logs = response?.data?.result; + + const logs = response?.result; this.logger.info( `Got ${logs?.length} logs for ${this.describeFilter(filter)} from ${this.rpc.hostname}` ); @@ -163,26 +151,20 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { * Loosely based on the wormhole-dashboard implementation (minus some specially crafted blocks when null result is obtained) */ private async getBlock(blockNumberOrTag: bigint | EvmTag): Promise { - let response = await this.axios.post( - this.rpc.href, - { + let response: { result?: EvmBlock; error?: ErrorBlock }; + try { + response = await this.httpClient.post(this.rpc.href, { jsonrpc: "2.0", method: "eth_getBlockByNumber", params: [blockNumberOrTag.toString(), false], // this means we'll get a light block (no txs) id: 1, - }, - this.getRequestOptions() - ); - - if (response.status !== 200 || response?.data?.error) { - this.logger.error( - `Got ${response.status} from ${this.rpc.hostname}/eth_getBlockByNumber. ${ - response?.data?.error?.message ?? `${response?.data?.error.message}` - }` - ); + }); + } catch (e: HttpClientError | any) { + this.handleError(e, "eth_getBlockByNumber"); + throw e; } - const result = response?.data?.result; + const result = response?.result; if (result && result.hash && result.number && result.timestamp) { // Convert to our domain compatible type @@ -197,8 +179,14 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { ); } - private getRequestOptions() { - return { headers, timeout: this.timeout, signal: AbortSignal.timeout(this.timeout) }; + private handleError(e: any, method: string) { + if (e instanceof HttpClientError) { + this.logger.error( + `Got ${e.status} from ${this.rpc.hostname}/${method}. ${e?.message ?? `${e?.message}`}` + ); + } else { + this.logger.error(`Got error ${e} from ${this.rpc.hostname}/${method}`); + } } } diff --git a/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts b/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts index 929851b3..d4f2f248 100644 --- a/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts +++ b/blockchain-watcher/src/infrastructure/repositories/FileMetadataRepo.ts @@ -6,13 +6,13 @@ export class FileMetadataRepo implements MetadataRepository { constructor(dirPath: string) { this.dirPath = dirPath; + if (!fs.existsSync(this.dirPath)) { + fs.mkdirSync(this.dirPath, { recursive: true }); + } } async get(id: string): Promise { const filePath = `${this.dirPath}/${id}.json`; - if (!fs.existsSync(this.dirPath)) { - fs.mkdirSync(this.dirPath); - } return fs.promises .readFile(filePath, "utf8") diff --git a/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts b/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts new file mode 100644 index 00000000..8b1f6e4d --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/HttpClient.ts @@ -0,0 +1,130 @@ +import axios, { AxiosError, AxiosInstance } from "axios"; +import { setTimeout } from "timers/promises"; + +/** + * A simple HTTP client with exponential backoff retries and 429 handling. + */ +export class HttpClient { + private initialDelay: number = 1_000; + private maxDelay: number = 60_000; + private retries: number = 0; + private timeout: number = 5_000; + private axios: AxiosInstance; + + constructor(options?: HttpClientOptions) { + options?.initialDelay && (this.initialDelay = options.initialDelay); + options?.maxDelay && (this.maxDelay = options.maxDelay); + options?.retries && (this.retries = options.retries); + options?.timeout && (this.timeout = options.timeout); + this.axios = axios.create(); + } + + public async post(url: string, body: any, opts?: HttpClientOptions): Promise { + return this.executeWithRetry(url, "POST", body, opts); + } + + private async execute( + url: string, + method: string, + body?: any, + opts?: HttpClientOptions + ): Promise { + let response; + try { + response = await this.axios.request({ + url: url, + method: method, + data: body, + timeout: opts?.timeout ?? this.timeout, + signal: AbortSignal.timeout(opts?.timeout ?? this.timeout), + }); + } catch (err: AxiosError | any) { + // Connection / timeout error: + if (err instanceof AxiosError) { + throw new HttpClientError(err.message ?? err.code, { status: err?.status ?? 0 }, err); + } + + throw new HttpClientError(err.message ?? err.code, undefined, err); + } + + if (!(response.status > 200) && !(response.status < 300)) { + throw new HttpClientError(undefined, response, response.data); + } + + return response.data; + } + + private async executeWithRetry( + url: string, + method: string, + body?: any, + opts?: HttpClientOptions + ): Promise { + const maxRetries = opts?.retries ?? this.retries; + let retries = 0; + const initialDelay = opts?.initialDelay ?? this.initialDelay; + const maxDelay = opts?.maxDelay ?? this.maxDelay; + while (maxRetries >= 0) { + try { + return await this.execute(url, method, body, opts); + } catch (err) { + if (err instanceof HttpClientError) { + if (retries < maxRetries) { + const retryAfter = err.getRetryAfter(maxDelay, err); + if (retryAfter) { + await setTimeout(retryAfter, { ref: false }); + } else { + const timeout = Math.min(initialDelay * 2 ** maxRetries, maxDelay); + await setTimeout(timeout, { ref: false }); + } + retries++; + continue; + } + } + throw err; + } + } + + throw new Error(`Failed to reach ${url}`); + } +} + +export type HttpClientOptions = { + initialDelay?: number; + maxDelay?: number; + retries?: number; + timeout?: number; +}; + +export class HttpClientError extends Error { + public readonly status?: number; + public readonly data?: any; + public readonly headers?: any; + + constructor(message?: string, response?: { status: number; headers?: any }, data?: any) { + super(message ?? `Unexpected status code: ${response?.status}`); + this.status = response?.status; + this.data = data; + this.headers = response?.headers; + Error.captureStackTrace(this, this.constructor); + } + + /** + * Parses the Retry-After header and returns the value in milliseconds. + * @param maxDelay + * @param error + * @throws {HttpClientError} if retry-after is bigger than maxDelay. + * @returns the retry-after value in milliseconds. + */ + public getRetryAfter(maxDelay: number, error: HttpClientError): number | undefined { + const retryAfter = this.headers?.get("Retry-After"); + if (retryAfter) { + const value = parseInt(retryAfter) * 1000; // header value is in seconds + if (value <= maxDelay) { + return value; + } + + throw error; + } + } +} diff --git a/blockchain-watcher/src/infrastructure/repositories/SnsEventRepository.ts b/blockchain-watcher/src/infrastructure/repositories/SnsEventRepository.ts index cee35321..eabdb1ce 100644 --- a/blockchain-watcher/src/infrastructure/repositories/SnsEventRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/SnsEventRepository.ts @@ -86,6 +86,17 @@ export class SnsEventRepository { status: "success", }; } + + async asTarget(): Promise<(events: LogFoundEvent[]) => Promise> { + return async (events: LogFoundEvent[]) => { + const result = await this.publish(events); + if (result.status === "error") { + this.logger.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`); + throw new Error(`Error publishing events to SNS: ${result.reason}`); + } + this.logger.info(`Published ${events.length} events to SNS`); + }; + } } export class SnsEvent { diff --git a/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts new file mode 100644 index 00000000..dbd96d31 --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts @@ -0,0 +1,120 @@ +import { + HandleEvmLogs, + PollEvmLogs, + PollEvmLogsConfig, + PollEvmLogsConfigProps, + RunPollingJob, +} from "../../domain/actions"; +import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities"; +import { + EvmBlockRepository, + JobRepository, + MetadataRepository, + StatRepository, +} from "../../domain/repositories"; +import { FileMetadataRepo, SnsEventRepository } from "./index"; +import { evmLogMessagePublishedMapper } from "../mappers/evmLogMessagePublishedMapper"; +import log from "../log"; + +export class StaticJobRepository implements JobRepository { + private fileRepo: FileMetadataRepo; + private dryRun: boolean = false; + private sources: Map RunPollingJob> = new Map(); + private handlers: Map Promise> = + new Map(); + private mappers: Map = new Map(); + private targets: Map Promise<(items: any[]) => Promise>> = new Map(); + private blockRepoProvider: (chain: string) => EvmBlockRepository; + private metadataRepo: MetadataRepository; + private statsRepo: StatRepository; + private snsRepo: SnsEventRepository; + + constructor( + path: string, + dryRun: boolean, + blockRepoProvider: (chain: string) => EvmBlockRepository, + repos: { + metadataRepo: MetadataRepository; + statsRepo: StatRepository; + snsRepo: SnsEventRepository; + } + ) { + this.fileRepo = new FileMetadataRepo(path); + this.blockRepoProvider = blockRepoProvider; + this.metadataRepo = repos.metadataRepo; + this.statsRepo = repos.statsRepo; + this.snsRepo = repos.snsRepo; + this.dryRun = dryRun; + this.fill(); + } + + async getJobDefinitions(): Promise { + const persisted = await this.fileRepo.get("jobs"); + if (!persisted) { + return Promise.resolve([]); + } + + return persisted; + } + + getSource(jobDef: JobDefinition): RunPollingJob { + const src = this.sources.get(jobDef.source.action); + if (!src) { + throw new Error(`Source ${jobDef.source.action} not found`); + } + + return src(jobDef); + } + + async getHandlers(jobDef: JobDefinition): Promise { + const result: Handler[] = []; + for (const handler of jobDef.handlers) { + const maybeHandler = this.handlers.get(handler.action); + if (!maybeHandler) { + throw new Error(`Handler ${handler.action} not found`); + } + const mapper = this.mappers.get(handler.mapper); + if (!mapper) { + throw new Error(`Handler ${handler.action} not found`); + } + result.push((await maybeHandler(handler.config, handler.target, mapper)).bind(maybeHandler)); + } + + return result; + } + + private fill() { + const pollEvmLogs = (jobDef: JobDefinition) => + new PollEvmLogs( + this.blockRepoProvider(jobDef.source.config.chain), + this.metadataRepo, + this.statsRepo, + new PollEvmLogsConfig({ + ...(jobDef.source.config as PollEvmLogsConfigProps), + id: jobDef.id, + }) + ); + this.sources.set("PollEvmLogs", pollEvmLogs); + + this.mappers.set("evmLogMessagePublishedMapper", evmLogMessagePublishedMapper); + + const snsTarget = () => this.snsRepo.asTarget(); + const dummyTarget = async () => async (events: any[]) => { + log.info(`Got ${events.length} events`); + }; + this.targets.set("sns", snsTarget); + this.targets.set("dummy", dummyTarget); + + const handleEvmLogs = async (config: any, target: string, mapper: any) => { + const instance = new HandleEvmLogs>( + config, + mapper, + await this.targets.get(this.dryRun ? "dummy" : target)!() + ); + + return instance.handle.bind(instance); + }; + + this.handlers.set("HandleEvmLogs", handleEvmLogs); + } +} diff --git a/blockchain-watcher/src/infrastructure/repositories/index.ts b/blockchain-watcher/src/infrastructure/repositories/index.ts index f79ba8d1..e1e89b26 100644 --- a/blockchain-watcher/src/infrastructure/repositories/index.ts +++ b/blockchain-watcher/src/infrastructure/repositories/index.ts @@ -11,3 +11,4 @@ export * from "./FileMetadataRepo"; export * from "./SnsEventRepository"; export * from "./EvmJsonRPCBlockRepository"; export * from "./PromStatRepository"; +export * from "./StaticJobRepository"; diff --git a/blockchain-watcher/src/infrastructure/rpc/Server.ts b/blockchain-watcher/src/infrastructure/rpc/Server.ts index bbb2e606..c3f2dd57 100644 --- a/blockchain-watcher/src/infrastructure/rpc/Server.ts +++ b/blockchain-watcher/src/infrastructure/rpc/Server.ts @@ -30,7 +30,7 @@ export class WebServer { start() { this.server.listen(this.port, () => { - log.info(`Server started on port 8080`); + log.info(`Server started on port ${this.port}`); }); } diff --git a/blockchain-watcher/src/infrastructure/watchers/AbstractWatcher.ts b/blockchain-watcher/src/infrastructure/watchers/AbstractWatcher.ts index db43f9a3..5b39b0e4 100644 --- a/blockchain-watcher/src/infrastructure/watchers/AbstractWatcher.ts +++ b/blockchain-watcher/src/infrastructure/watchers/AbstractWatcher.ts @@ -1,5 +1,5 @@ import { ChainId, Network } from "@certusone/wormhole-sdk"; -import AbstractHandler from "../handlers/AbstractHandler"; +import AbstractHandler from "./handlers/AbstractHandler"; export default abstract class AbstractWatcher { //store class fields from constructor diff --git a/blockchain-watcher/src/infrastructure/watchers/EvmWatcher.ts b/blockchain-watcher/src/infrastructure/watchers/EvmWatcher.ts index a6de4f38..b6172d37 100644 --- a/blockchain-watcher/src/infrastructure/watchers/EvmWatcher.ts +++ b/blockchain-watcher/src/infrastructure/watchers/EvmWatcher.ts @@ -1,6 +1,6 @@ import { ChainId, Network } from "@certusone/wormhole-sdk"; import AbstractWatcher from "./AbstractWatcher"; -import AbstractHandler from "../handlers/AbstractHandler"; +import AbstractHandler from "./handlers/AbstractHandler"; export default class EvmWatcher extends AbstractWatcher { constructor( diff --git a/blockchain-watcher/src/infrastructure/environment.ts b/blockchain-watcher/src/infrastructure/watchers/environment.ts similarity index 97% rename from blockchain-watcher/src/infrastructure/environment.ts rename to blockchain-watcher/src/infrastructure/watchers/environment.ts index acd084f7..50da5a73 100644 --- a/blockchain-watcher/src/infrastructure/environment.ts +++ b/blockchain-watcher/src/infrastructure/watchers/environment.ts @@ -1,7 +1,7 @@ import { ChainId, ChainName, Network, toChainName } from "@certusone/wormhole-sdk"; -import AbstractWatcher from "./watchers/AbstractWatcher"; +import AbstractWatcher from "./AbstractWatcher"; import winston from "winston"; -import EvmWatcher from "./watchers/EvmWatcher"; +import EvmWatcher from "./EvmWatcher"; import AbstractHandler from "./handlers/AbstractHandler"; const MAINNET_RPCS: { [key in ChainName]?: string } = { diff --git a/blockchain-watcher/src/infrastructure/handlers/AbstractHandler.ts b/blockchain-watcher/src/infrastructure/watchers/handlers/AbstractHandler.ts similarity index 100% rename from blockchain-watcher/src/infrastructure/handlers/AbstractHandler.ts rename to blockchain-watcher/src/infrastructure/watchers/handlers/AbstractHandler.ts diff --git a/blockchain-watcher/src/infrastructure/handlers/LogMessagePublishedHandler.ts b/blockchain-watcher/src/infrastructure/watchers/handlers/LogMessagePublishedHandler.ts similarity index 100% rename from blockchain-watcher/src/infrastructure/handlers/LogMessagePublishedHandler.ts rename to blockchain-watcher/src/infrastructure/watchers/handlers/LogMessagePublishedHandler.ts diff --git a/blockchain-watcher/src/infrastructure/handlers/deliveryEventHandler.ts b/blockchain-watcher/src/infrastructure/watchers/handlers/deliveryEventHandler.ts similarity index 100% rename from blockchain-watcher/src/infrastructure/handlers/deliveryEventHandler.ts rename to blockchain-watcher/src/infrastructure/watchers/handlers/deliveryEventHandler.ts diff --git a/blockchain-watcher/src/infrastructure/handlers/sendEventHandler.ts b/blockchain-watcher/src/infrastructure/watchers/handlers/sendEventHandler.ts similarity index 100% rename from blockchain-watcher/src/infrastructure/handlers/sendEventHandler.ts rename to blockchain-watcher/src/infrastructure/watchers/handlers/sendEventHandler.ts diff --git a/blockchain-watcher/src/infrastructure/utils/websocket.ts b/blockchain-watcher/src/infrastructure/watchers/utils/websocket.ts similarity index 100% rename from blockchain-watcher/src/infrastructure/utils/websocket.ts rename to blockchain-watcher/src/infrastructure/watchers/utils/websocket.ts diff --git a/blockchain-watcher/src/start.ts b/blockchain-watcher/src/start.ts index d44d433f..8f287f50 100644 --- a/blockchain-watcher/src/start.ts +++ b/blockchain-watcher/src/start.ts @@ -1,11 +1,9 @@ -import { PollEvmLogs, PollEvmLogsConfig, HandleEvmLogs } from "./domain/actions"; -import { LogFoundEvent } from "./domain/entities"; import { configuration } from "./infrastructure/config"; -import { evmLogMessagePublishedMapper } from "./infrastructure/mappers/evmLogMessagePublishedMapper"; import { RepositoriesBuilder } from "./infrastructure/RepositoriesBuilder"; import log from "./infrastructure/log"; import { WebServer } from "./infrastructure/rpc/Server"; import { HealthController } from "./infrastructure/rpc/HealthController"; +import { StartJobs } from "./domain/actions"; let repos: RepositoriesBuilder; let server: WebServer; @@ -14,9 +12,10 @@ async function run(): Promise { log.info(`Starting: dryRunEnabled -> ${configuration.dryRun}`); repos = new RepositoriesBuilder(configuration); + const startJobs = new StartJobs(repos.getJobsRepository()); - await startServer(repos); - await startJobs(repos); + await startServer(repos, startJobs); + await startJobs.run(); // Just keep this running until killed setInterval(() => { @@ -30,74 +29,10 @@ async function run(): Promise { process.on("SIGTERM", handleShutdown); } -const startServer = async (repos: RepositoriesBuilder) => { +const startServer = async (repos: RepositoriesBuilder, startJobs: StartJobs) => { server = new WebServer(configuration.port, new HealthController(repos.getStatsRepository())); }; -const startJobs = async (repos: RepositoriesBuilder) => { - /** Job definition is hardcoded, but should be loaded from cfg or a data store soon enough */ - const jobs = [ - { - id: "poll-log-message-published-ethereum", - chain: "ethereum", - source: { - action: "PollEvmLogs", - config: { - fromBlock: 10012499n, - blockBatchSize: 100, - commitment: "latest", - interval: 15_000, - addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], - chain: "ethereum", - topics: [], - }, - }, - handlers: [ - { - action: "HandleEvmLogs", - target: "sns", - mapper: "evmLogMessagePublishedMapper", - config: { - abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", - filter: { - addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], - topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"], - }, - }, - }, - ], - }, - ]; - - const pollEvmLogs = new PollEvmLogs( - repos.getEvmBlockRepository("ethereum"), - repos.getMetadataRepository(), - repos.getStatsRepository(), - new PollEvmLogsConfig({ ...jobs[0].source.config, id: jobs[0].id }) - ); - - const snsTarget = async (events: LogFoundEvent[]) => { - const result = await repos.getSnsEventRepository().publish(events); - if (result.status === "error") { - log.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`); - throw new Error(`Error publishing events to SNS: ${result.reason}`); - } - log.info(`Published ${events.length} events to SNS`); - }; - - const handleEvmLogs = new HandleEvmLogs>( - jobs[0].handlers[0].config, - evmLogMessagePublishedMapper, - configuration.dryRun - ? async (events) => { - log.info(`Got ${events.length} events`); - } - : snsTarget - ); - - pollEvmLogs.start([handleEvmLogs.handle.bind(handleEvmLogs)]); -}; - const handleShutdown = async () => { try { await Promise.allSettled([repos.close(), server.stop()]); diff --git a/blockchain-watcher/test/domain/PollEvmLogs.test.ts b/blockchain-watcher/test/domain/PollEvmLogs.test.ts index d3b3bc54..f9fd0c08 100644 --- a/blockchain-watcher/test/domain/PollEvmLogs.test.ts +++ b/blockchain-watcher/test/domain/PollEvmLogs.test.ts @@ -160,7 +160,7 @@ const givenPollEvmLogs = (from?: bigint) => { }; const whenPollEvmLogsStarts = async () => { - await pollEvmLogs.start([handlers.working]); + pollEvmLogs.run([handlers.working]); }; const thenWaitForAssertion = async (...assertions: (() => void)[]) => { diff --git a/blockchain-watcher/test/infrastructure/repositories/EvmJsonRPCBlockRepository.integration.test.ts b/blockchain-watcher/test/infrastructure/repositories/EvmJsonRPCBlockRepository.integration.test.ts index 0c96d49e..ddaff3a2 100644 --- a/blockchain-watcher/test/infrastructure/repositories/EvmJsonRPCBlockRepository.integration.test.ts +++ b/blockchain-watcher/test/infrastructure/repositories/EvmJsonRPCBlockRepository.integration.test.ts @@ -3,9 +3,9 @@ import { EvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositor import axios from "axios"; import nock from "nock"; import { EvmLogFilter, EvmTag } from "../../../src/domain/entities"; +import { HttpClient } from "../../../src/infrastructure/repositories/HttpClient"; axios.defaults.adapter = "http"; // needed by nock -const axiosInstance = axios.create(); const rpc = "http://localhost"; const address = "0x98f3c9e6e3face36baad05fe09d375ef1464288b"; const topic = "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"; @@ -65,7 +65,7 @@ describe("EvmJsonRPCBlockRepository", () => { }); const givenARepo = () => { - repo = new EvmJsonRPCBlockRepository({ rpc, timeout: 100, chain: "ethereum" }, axiosInstance); + repo = new EvmJsonRPCBlockRepository({ rpc, timeout: 100, chain: "ethereum" }, new HttpClient()); }; const givenBlockHeightIs = (height: bigint, commitment: EvmTag) => { diff --git a/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts b/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts new file mode 100644 index 00000000..dc69a4cd --- /dev/null +++ b/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts @@ -0,0 +1,77 @@ +import { beforeEach, describe, expect, it } from "@jest/globals"; +import fs from "fs"; +import { SnsEventRepository, StaticJobRepository } from "../../../src/infrastructure/repositories"; +import { + EvmBlockRepository, + MetadataRepository, + StatRepository, +} from "../../../src/domain/repositories"; + +const dirPath = "./metadata-repo/jobs"; +const blockRepo: EvmBlockRepository = {} as any as EvmBlockRepository; +const metadataRepo = {} as MetadataRepository; +const statsRepo = {} as any as StatRepository; +const snsRepo = {} as any as SnsEventRepository; + +let repo: StaticJobRepository; + +describe("StaticJobRepository", () => { + beforeEach(() => { + if (fs.existsSync(dirPath)) { + fs.rmSync(dirPath, { recursive: true, force: true }); + } + repo = new StaticJobRepository(dirPath, false, () => blockRepo, { + metadataRepo, + statsRepo, + snsRepo, + }); + }); + + it("should return empty when no file available", async () => { + const jobs = await repo.getJobDefinitions(); + expect(jobs).toHaveLength(0); + }); + + it("should read jobs from file", async () => { + givenJobsPresent(); + const jobs = await repo.getJobDefinitions(); + expect(jobs).toHaveLength(1); + expect(jobs[0].id).toEqual("poll-log-message-published-ethereum"); + }); +}); + +const givenJobsPresent = () => { + const jobs = [ + { + id: "poll-log-message-published-ethereum", + chain: "ethereum", + source: { + action: "PollEvmLogs", + config: { + fromBlock: 10012499n, + blockBatchSize: 100, + commitment: "latest", + interval: 15_000, + addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], + chain: "ethereum", + topics: [], + }, + }, + handlers: [ + { + action: "HandleEvmLogs", + target: "sns", + mapper: "evmLogMessagePublishedMapper", + config: { + abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", + filter: { + addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], + topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"], + }, + }, + }, + ], + }, + ]; + fs.writeFileSync(dirPath + "/jobs.json", JSON.stringify(jobs)); +}; diff --git a/deploy/blockchain-watcher/env/staging-mainnet.env b/deploy/blockchain-watcher/env/staging-mainnet.env index 72a488d2..490972ba 100644 --- a/deploy/blockchain-watcher/env/staging-mainnet.env +++ b/deploy/blockchain-watcher/env/staging-mainnet.env @@ -8,9 +8,9 @@ IMAGE_NAME= PORT=3005 LOG_LEVEL=debug -RESOURCES_LIMITS_MEMORY=256Mi +RESOURCES_LIMITS_MEMORY=128Mi RESOURCES_LIMITS_CPU=200m -RESOURCES_REQUESTS_MEMORY=128Mi +RESOURCES_REQUESTS_MEMORY=96Mi RESOURCES_REQUESTS_CPU=100m SNS_TOPIC_ARN= diff --git a/deploy/blockchain-watcher/env/staging-testnet.env b/deploy/blockchain-watcher/env/staging-testnet.env index 133c196e..c6e6e8cf 100644 --- a/deploy/blockchain-watcher/env/staging-testnet.env +++ b/deploy/blockchain-watcher/env/staging-testnet.env @@ -6,11 +6,11 @@ DRY_RUN_ENABLED=false REPLICAS=1 IMAGE_NAME= PORT=3005 -LOG_LEVEL=info +LOG_LEVEL=debug -RESOURCES_LIMITS_MEMORY=256Mi +RESOURCES_LIMITS_MEMORY=128Mi RESOURCES_LIMITS_CPU=200m -RESOURCES_REQUESTS_MEMORY=128Mi +RESOURCES_REQUESTS_MEMORY=96Mi RESOURCES_REQUESTS_CPU=100m SNS_TOPIC_ARN= diff --git a/deploy/blockchain-watcher/metadata-pv.yaml b/deploy/blockchain-watcher/metadata-pv.yaml deleted file mode 100644 index d7675f58..00000000 --- a/deploy/blockchain-watcher/metadata-pv.yaml +++ /dev/null @@ -1,27 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: blockchain-watcher-metadata-pvc - namespace: {{ .NAMESPACE }} -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 10Mi - storageClassName: gp2 ---- -apiVersion: v1 -kind: PersistentVolume -metadata: - name: blockchain-watcher-metadata-pv - namespace: {{ .NAMESPACE }} -spec: - accessModes: - - ReadWriteOnce - - ReadWriteMany - capacity: - storage: 100Mi - storageClassName: gp2 - hostPath: - path: /home/node/app/metadata-repo \ No newline at end of file diff --git a/deploy/blockchain-watcher/sa.yaml b/deploy/blockchain-watcher/sa.yaml new file mode 100644 index 00000000..d846fb49 --- /dev/null +++ b/deploy/blockchain-watcher/sa.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: event-watcher + namespace: {{ .NAMESPACE }} + annotations: + eks.amazonaws.com/role-arn: {{ .AWS_IAM_ROLE }} diff --git a/deploy/blockchain-watcher/stateful-set.yaml b/deploy/blockchain-watcher/stateful-set.yaml deleted file mode 100644 index d5224905..00000000 --- a/deploy/blockchain-watcher/stateful-set.yaml +++ /dev/null @@ -1,71 +0,0 @@ ---- -apiVersion: v1 -kind: Service -metadata: - name: {{ .NAME }} - namespace: {{ .NAMESPACE }} - labels: - app: {{ .NAME }} -spec: - selector: - app: {{ .NAME }} - ports: - - port: {{ .PORT }} - targetPort: {{ .PORT }} - name: {{ .NAME }} - protocol: TCP ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: {{ .NAME }} - namespace: {{ .NAMESPACE }} -spec: - replicas: {{ .REPLICAS }} - serviceName: {{ .NAME }}-service - selector: - matchLabels: - app: {{ .NAME }} - template: - metadata: - labels: - app: {{ .NAME }} - annotations: - prometheus.io/scrape: "true" - prometheus.io/port: "{{ .PORT }}" - spec: - restartPolicy: Always - terminationGracePeriodSeconds: 30 - serviceAccountName: event-watcher - containers: - - name: {{ .NAME }} - image: {{ .IMAGE_NAME }} - env: - - name: NODE_ENV - value: {{ .NODE_ENV }} - - name: PORT - value: "{{ .PORT }}" - - name: LOG_LEVEL - value: {{ .LOG_LEVEL }} - - name: BLOCKCHAIN_ENV - value: {{ .BLOCKCHAIN_ENV }} - - name: DRY_RUN_ENABLED - value: "{{ .DRY_RUN_ENABLED }}" - - name: SNS_TOPIC_ARN - value: {{ .SNS_TOPIC_ARN }} - - name: SNS_REGION - value: {{ .SNS_REGION }} - resources: - limits: - memory: {{ .RESOURCES_LIMITS_MEMORY }} - cpu: {{ .RESOURCES_LIMITS_CPU }} - requests: - memory: {{ .RESOURCES_REQUESTS_MEMORY }} - cpu: {{ .RESOURCES_REQUESTS_CPU }} - volumeMounts: - - name: metadata-volume - mountPath: /home/node/app/metadata-repo - volumes: - - name: metadata-volume - persistentVolumeClaim: - claimName: blockchain-watcher-metadata-pvc diff --git a/deploy/blockchain-watcher/workers/ethereum.yaml b/deploy/blockchain-watcher/workers/ethereum.yaml new file mode 100644 index 00000000..1c827ddf --- /dev/null +++ b/deploy/blockchain-watcher/workers/ethereum.yaml @@ -0,0 +1,158 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ .NAME }}-eth + namespace: {{ .NAMESPACE }} + labels: + app: {{ .NAME }}-eth +spec: + selector: + app: {{ .NAME }}-eth + ports: + - port: {{ .PORT }} + targetPort: {{ .PORT }} + name: {{ .NAME }}-eth + protocol: TCP +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: blockchain-watcher-eth-pvc + namespace: {{ .NAMESPACE }} +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Mi + storageClassName: gp2 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .NAME }}-eth-jobs + namespace: {{ .NAMESPACE }} +data: + testnet-jobs.json: |- + [ + { + "id": "poll-log-message-published-ethereum", + "chain": "ethereum", + "source": { + "action": "PollEvmLogs", + "config": { + "fromBlock": "10012499", + "blockBatchSize": 100, + "commitment": "latest", + "interval": 15000, + "addresses": ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], + "chain": "ethereum", + "topics": [] + } + }, + "handlers": [ + { + "action": "HandleEvmLogs", + "target": "sns", + "mapper": "evmLogMessagePublishedMapper", + "config": { + "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", + "filter": { + "addresses": ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"], + "topics": ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"] + } + } + } + ] + } + ] + mainnet-jobs.json: |- + [ + { + "id": "poll-log-message-published-ethereum", + "chain": "ethereum", + "source": { + "action": "PollEvmLogs", + "config": { + "blockBatchSize": 100, + "commitment": "latest", + "interval": 15000, + "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"], + "chain": "ethereum", + "topics": [] + } + }, + "handlers": [ + { + "action": "HandleEvmLogs", + "target": "sns", + "mapper": "evmLogMessagePublishedMapper", + "config": { + "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", + "filter": { + "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"], + "topics": ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"] + } + } + } + ] + } + ] +--- +apiVersion: v1 +kind: Pod +metadata: + name: {{ .NAME }}-eth + namespace: {{ .NAMESPACE }} + labels: + app: {{ .NAME }}-eth + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "{{ .PORT }}" +spec: + restartPolicy: Always + terminationGracePeriodSeconds: 30 + serviceAccountName: event-watcher + containers: + - name: {{ .NAME }} + image: {{ .IMAGE_NAME }} + env: + - name: NODE_ENV + value: {{ .NODE_ENV }} + - name: PORT + value: "{{ .PORT }}" + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: BLOCKCHAIN_ENV + value: {{ .BLOCKCHAIN_ENV }} + - name: DRY_RUN_ENABLED + value: "{{ .DRY_RUN_ENABLED }}" + - name: SNS_TOPIC_ARN + value: {{ .SNS_TOPIC_ARN }} + - name: SNS_REGION + value: {{ .SNS_REGION }} + - name: JOBS_DIR + value: /home/node/app/jobs + resources: + limits: + memory: {{ .RESOURCES_LIMITS_MEMORY }} + cpu: {{ .RESOURCES_LIMITS_CPU }} + requests: + memory: {{ .RESOURCES_REQUESTS_MEMORY }} + cpu: {{ .RESOURCES_REQUESTS_CPU }} + volumeMounts: + - name: metadata-volume + mountPath: /home/node/app/metadata-repo + - name: jobs-volume + mountPath: /home/node/app/jobs + volumes: + - name: metadata-volume + persistentVolumeClaim: + claimName: blockchain-watcher-eth-pvc + - name: jobs-volume + configMap: + name: {{ .NAME }}-eth-jobs + items: + - key: {{ .BLOCKCHAIN_ENV }}-jobs.json + path: jobs.json