[Blockchain Watcher] Initial version (#806)

- Ethereum support for LogMessagePublished extraction
Matías Martínez 2023-11-28 16:00:45 -03:00 committed by GitHub
parent 695fd0dcd4
commit 2d9d0533f6
54 changed files with 23206 additions and 0 deletions


@ -0,0 +1,34 @@
name: Run tests
on:
push:
branches: ["main"]
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v3
- name: Prettify code
uses: creyD/prettier_action@master
with:
dry: True
prettier_options: --write ./blockchain-watcher
prettier_version: 2.8.7
- uses: actions/setup-node@v3
with:
node-version: 18
cache: "npm"
cache-dependency-path: |
./blockchain-watcher/package-lock.json
- name: npm ci
run: npm ci
working-directory: ./blockchain-watcher
- name: typecheck
run: npm run build
working-directory: ./blockchain-watcher
- name: Run tests
run: npm run test:coverage
working-directory: ./blockchain-watcher


@ -0,0 +1,6 @@
lib
node_modules
README.md
metadata-repo
coverage
docs

blockchain-watcher/.gitignore

@ -0,0 +1,6 @@
node_modules
lib
coverage
metadata-repo
config/dev.json


@ -0,0 +1,3 @@
{
"printWidth": 100
}


@ -0,0 +1,49 @@
# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2
FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d73b044f5f57cfc as builder
# npm wants to clone random Git repositories - lovely.
# RUN apk add git python make build-base
# RUN apk update && apk add bash
RUN apt-get update && apt-get -y install \
git python make curl netcat
USER 1000
RUN mkdir -p /home/node/app
RUN mkdir -p /home/node/.npm
WORKDIR /home/node/app
# Fix git ssh error
RUN git config --global url."https://".insteadOf ssh://
# Node
ENV NODE_EXTRA_CA_CERTS=/certs/cert.pem
ENV NODE_OPTIONS=--use-openssl-ca
# npm
RUN if [ -e /certs/cert.pem ]; then npm config set cafile /certs/cert.pem; fi
# git
RUN if [ -e /certs/cert.pem ]; then git config --global http.sslCAInfo /certs/cert.pem; fi
COPY --chown=node:node . .
RUN npm ci
RUN npm run build
FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d73b044f5f57cfc as runner
COPY --from=builder /home/node/app/config /home/node/app/config
COPY --from=builder /home/node/app/lib /home/node/app/lib
WORKDIR /home/node/app
COPY package.json .
COPY package-lock.json .
RUN npm install --omit=dev
CMD [ "npm", "start" ]


@ -0,0 +1,31 @@
# Explorer Event Watcher
The purpose of this process is to watch all Wormhole-connected blockchains for events emitted by Wormhole ecosystem contracts, and to produce database records for these events along with other important associated data.
For now, only EVM chains are supported, but the design should be readily extensible in the future.
## Installation
Run the following in the root of the project:

```bash
npm ci
npm run dev
```
## Deployment
This process is meant to be deployed as a Docker container. The Dockerfile is located in the root of the project.
## Configuration
Configuration is loaded from files in `config` directory.
There is a default file, and then a file for each environment. The environment is set by the NODE_ENV environment variable.
If NODE_ENV is not set, the default file is used.
Some values may be overridden by using environment variables. See `config/custom-environment-variables.json` for a list of these variables.
```bash
$ NODE_ENV=staging LOG_LEVEL=debug npm run dev
```
## Usage & Modification
A Handler should be created and registered in the handlers directory for each event type that needs to be handled. Handlers are registered inside the src/index.ts file. These handlers are treated as listeners, and thus require 100% uptime to ensure no events are missed.
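For illustration, here is a hedged sketch of a job definition as consumed by `StaticJobRepository` (the job id is hypothetical; the contract address, topic, and ABI are the ones used in this commit's tests):

```typescript
import { JobDefinition } from "./src/domain/entities";

// Hypothetical job: poll Ethereum logs and forward LogMessagePublished events to SNS.
const job = new JobDefinition(
  "poll-log-message-published-ethereum", // hypothetical id; ids must be unique
  "ethereum",
  {
    action: "PollEvmLogs",
    config: {
      chain: "ethereum",
      addresses: ["0x98f3c9e6e3face36baad05fe09d375ef1464288b"],
      topics: [],
    },
  },
  [
    {
      action: "HandleEvmLogs",
      target: "sns",
      mapper: "evmLogMessagePublishedMapper",
      config: {
        abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
        filter: {
          addresses: ["0x98f3c9e6e3face36baad05fe09d375ef1464288b"],
          topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
        },
      },
    },
  ]
);
```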


@ -0,0 +1,18 @@
Each blockchain has three watchers:
- A websocket watcher for low latency
- A querying watcher
- A sequence gap watcher

These three watchers all invoke the same handler callback.
The handler callback is responsible for:
- Providing the watchers with the necessary query filter information
- Parsing the event into a persistence object
- Invoking the persistence manager

The persistence manager is responsible for:
- Inserting records into the database in a safe manner, which takes into account that items will be seen multiple times.
- Last write wins should be the approach taken here.
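
A minimal sketch of this contract, with all names and types as illustrative assumptions rather than the actual implementation:

```typescript
// Hedged sketch of the handler callback / persistence manager contract described above.
type PersistenceObject = { id: string; payload: unknown };

const persistenceManager = {
  store: new Map<string, PersistenceObject>(),
  // Safe under repeated delivery: last write wins for a given id.
  async upsert(records: PersistenceObject[]): Promise<void> {
    for (const record of records) this.store.set(record.id, record);
  },
};

// The callback shared by the websocket, querying, and sequence gap watchers.
async function handleEvents(events: { txHash: string; data: unknown }[]): Promise<void> {
  const records = events.map((e) => ({ id: e.txHash, payload: e.data }));
  await persistenceManager.upsert(records);
}
```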


@ -0,0 +1,13 @@
{
"environment": "BLOCKCHAIN_ENV",
"port": "PORT",
"logLevel": "LOG_LEVEL",
"dryRun": "DRY_RUN_ENABLED",
"jobs": {
"dir": "JOBS_DIR"
},
"sns": {
"topicArn": "SNS_TOPIC_ARN",
"region": "SNS_REGION"
}
}


@ -0,0 +1,28 @@
{
"environment": "testnet",
"port": 9090,
"logLevel": "debug",
"dryRun": true,
"supportedChains": ["ethereum"],
"sns": {
"topicArn": "arn:aws:sns:us-east-1:000000000000:localstack-topic.fifo",
"region": "us-east-1",
"groupId": "blockchain-watcher",
"subject": "blockchain-watcher"
},
"metadata": {
"dir": "metadata-repo"
},
"jobs": {
"dir": "metadata-repo/jobs"
},
"platforms": {
"ethereum": {
"name": "ethereum",
"network": "goerli",
"chainId": 2,
"rpcs": ["https://rpc.ankr.com/eth_goerli"],
"timeout": 10000
}
}
}


@ -0,0 +1,11 @@
{
"platforms": {
"ethereum": {
"name": "ethereum",
"network": "mainnet",
"chainId": 2,
"rpcs": ["https://rpc.ankr.com/eth"],
"timeout": 10000
}
}
}


@ -0,0 +1 @@
{}


@ -0,0 +1,11 @@
{
"platforms": {
"ethereum": {
"name": "ethereum",
"network": "mainnet",
"chainId": 2,
"rpcs": ["https://rpc.ankr.com/eth"],
"timeout": 10000
}
}
}


@ -0,0 +1 @@
{}


@ -0,0 +1 @@
{}


@ -0,0 +1,110 @@
asyncapi: "2.6.0"
info:
title: Blockchain Watcher API
version: "0.0.1"
description: |
Platform service that extracts, transforms, and loads data from different blockchain platforms.
servers:
staging-testnet:
url: arn:aws:sns:us-east-2:581679387567:notification-chain-events-dev-testnet.fifo
protocol: sns
defaultContentType: application/json
channels:
LogMessagePublished:
description: Wormhole core contract emitted event
subscribe:
message:
$ref: "#/components/messages/logMessagePublished"
TransferRedeemed:
description: Token bridge emitted event
subscribe:
message:
$ref: "#/components/messages/transferRedeemed"
components:
messages:
logMessagePublished:
name: LogMessagePublished
title: LogMessagePublished
contentType: application/json
payload:
$ref: "#/components/schemas/logMessagePublished"
transferRedeemed:
name: TransferRedeemed
title: TransferRedeemed
contentType: application/json
payload:
$ref: "#/components/schemas/transferRedeemed"
schemas:
base:
type: object
properties:
trackId:
type: string
source:
type: string
event:
type: string
version:
type: number
timestamp:
$ref: "#/components/schemas/sentAt"
chainEventBase:
type: object
properties:
chainId:
type: number
emitter:
type: string
txHash:
type: string
blockHeight:
type: number
blockTime:
$ref: "#/components/schemas/sentAt"
logMessagePublished:
allOf:
- $ref: "#/components/schemas/base"
type: object
properties:
data:
allOf:
- $ref: "#/components/schemas/chainEventBase"
type: object
properties:
attributes:
type: object
properties:
sender:
type: string
sequence:
type: number
nonce:
type: number
payload:
type: string
consistencyLevel:
type: number
transferRedeemed:
allOf:
- $ref: "#/components/schemas/base"
type: object
properties:
data:
type: object
allOf:
- $ref: "#/components/schemas/chainEventBase"
properties:
attributes:
type: object
properties:
emitterChainId:
type: number
emitterAddress:
type: string
sequence:
type: number
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.
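
For illustration, a hedged example of a `logMessagePublished` message matching the schema above (the transaction values come from this commit's tests; the rest are made up):

```typescript
// Hedged example payload; all values are illustrative.
const exampleLogMessagePublished = {
  trackId: "chain-event-0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7-18521386",
  source: "blockchain-watcher",
  event: "log-message-published",
  version: 1,
  timestamp: "2023-11-08T11:34:47.000Z",
  data: {
    chainId: 2,
    emitter: "0x98f3c9e6e3face36baad05fe09d375ef1464288b",
    txHash: "0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7",
    blockHeight: 18521386,
    blockTime: "2023-11-08T11:34:47.000Z",
    attributes: {
      sender: "0x3ee18b2214aff97000d974cf647e7c347e8fa585",
      sequence: 135858,
      nonce: 0,
      payload: "0x01",
      consistencyLevel: 1,
    },
  },
};
```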


@ -0,0 +1,15 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
collectCoverageFrom: [
"./src/domain",
"./src/infrastructure/mappers",
"./src/infrastructure/repositories",
],
coverageThreshold: {
global: {
lines: 85,
},
},
};

blockchain-watcher/package-lock.json (generated): file diff suppressed because it is too large


@ -0,0 +1,44 @@
{
"name": "@wormhole-foundation/blockchain-watcher",
"version": "0.0.0",
"description": "A process for watching blockchain events and moving them to persistent storage",
"main": "index.js",
"scripts": {
"start": "node lib/start.js",
"test": "jest",
"test:coverage": "jest --collectCoverage=true",
"build": "tsc",
"dev": "USE_ENV_FILE=true ts-node src/start.ts",
"prettier": "prettier --write ."
},
"author": "chase-45",
"license": "ISC",
"dependencies": {
"@aws-sdk/client-sns": "^3.445.0",
"@certusone/wormhole-sdk": "^0.9.21-beta.0",
"@types/config": "^3.3.3",
"axios": "^1.6.0",
"config": "^3.3.9",
"dotenv": "^16.3.1",
"ethers": "^5",
"prom-client": "^15.0.0",
"uuid": "^9.0.1",
"winston": "3.8.2"
},
"devDependencies": {
"@jest/globals": "^29.7.0",
"@types/koa-router": "^7.4.4",
"@types/uuid": "^9.0.6",
"@types/yargs": "^17.0.23",
"jest": "^29.7.0",
"nock": "^13.3.8",
"prettier": "^2.8.7",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.1",
"tsx": "^3.12.7",
"typescript": "^4.8.4"
},
"engines": {
"node": ">=18.0.0"
}
}


@ -0,0 +1,55 @@
import { ethers } from "ethers";
import { EvmLog, EvmTopicFilter } from "../entities";
/**
* Handling means mapping and forward to a given target.
* As of today, only one type of event can be handled per each instance.
*/
export class HandleEvmLogs<T> {
cfg: HandleEvmLogsConfig;
mapper: (log: EvmLog, parsedArgs: ReadonlyArray<any>) => T;
target: (parsed: T[]) => Promise<void>;
constructor(
cfg: HandleEvmLogsConfig,
mapper: (log: EvmLog, args: ReadonlyArray<any>) => T,
target: (parsed: T[]) => Promise<void>
) {
this.cfg = this.normalizeCfg(cfg);
this.mapper = mapper;
this.target = target;
}
public async handle(logs: EvmLog[]): Promise<T[]> {
const mappedItems = logs
.filter(
(log) =>
this.cfg.filter.addresses.includes(log.address.toLowerCase()) &&
this.cfg.filter.topics.includes(log.topics[0].toLowerCase())
)
.map((log) => {
const iface = new ethers.utils.Interface([this.cfg.abi]);
const parsedLog = iface.parseLog(log);
return this.mapper(log, parsedLog.args);
});
await this.target(mappedItems);
// TODO: return a result specifying failures if any
return mappedItems;
}
private normalizeCfg(cfg: HandleEvmLogsConfig): HandleEvmLogsConfig {
return {
filter: {
addresses: cfg.filter.addresses.map((addr) => addr.toLowerCase()),
topics: cfg.filter.topics.map((topic) => topic.toLowerCase()),
},
abi: cfg.abi,
};
}
}
export type HandleEvmLogsConfig = {
filter: EvmTopicFilter;
abi: string;
};


@ -0,0 +1,219 @@
import { EvmLog } from "../entities";
import { EvmBlockRepository, MetadataRepository, StatRepository } from "../repositories";
import winston from "winston";
import { RunPollingJob } from "./RunPollingJob";
const ID = "watch-evm-logs";
/**
* PollEvmLogs is an action that watches for new blocks and extracts logs from them.
*/
export class PollEvmLogs extends RunPollingJob {
protected readonly logger: winston.Logger;
private readonly blockRepo: EvmBlockRepository;
private readonly metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
private readonly statsRepository: StatRepository;
private cfg: PollEvmLogsConfig;
private latestBlockHeight?: bigint;
private blockHeightCursor?: bigint;
private lastRange?: { fromBlock: bigint; toBlock: bigint };
constructor(
blockRepo: EvmBlockRepository,
metadataRepo: MetadataRepository<PollEvmLogsMetadata>,
statsRepository: StatRepository,
cfg: PollEvmLogsConfig
) {
super(cfg.interval ?? 1_000);
this.blockRepo = blockRepo;
this.metadataRepo = metadataRepo;
this.statsRepository = statsRepository;
this.cfg = cfg;
this.logger = winston.child({ module: "PollEvmLogs", label: this.cfg.id });
}
protected async preHook(): Promise<void> {
const metadata = await this.metadataRepo.get(this.cfg.id);
if (metadata) {
this.blockHeightCursor = BigInt(metadata.lastBlock);
}
}
protected async hasNext(): Promise<boolean> {
const hasFinished = this.cfg.hasFinished(this.blockHeightCursor);
if (hasFinished) {
this.logger.info(
`PollEvmLogs: (${this.cfg.id}) Finished processing all blocks from ${this.cfg.fromBlock} to ${this.cfg.toBlock}`
);
}
return !hasFinished;
}
protected async get(): Promise<EvmLog[]> {
this.report();
this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment());
const range = this.getBlockRange(this.latestBlockHeight);
if (range.fromBlock > this.latestBlockHeight) {
this.logger.info(`Next range is after latest block height, waiting...`);
return [];
}
const logs = await this.blockRepo.getFilteredLogs({
fromBlock: range.fromBlock,
toBlock: range.toBlock,
addresses: this.cfg.addresses, // Works when sending multiple addresses, but not multiple topics.
topics: [], // this.cfg.topics => will be applied by handlers
});
const blockNumbers = new Set(logs.map((log) => log.blockNumber));
const blocks = await this.blockRepo.getBlocks(blockNumbers);
logs.forEach((log) => {
const block = blocks[log.blockHash];
log.blockTime = block.timestamp;
});
this.lastRange = range;
return logs;
}
protected async persist(): Promise<void> {
this.blockHeightCursor = this.lastRange?.toBlock ?? this.blockHeightCursor;
if (this.blockHeightCursor) {
await this.metadataRepo.save(this.cfg.id, { lastBlock: this.blockHeightCursor });
}
}
/**
* Get the block range to extract.
* @param latestBlockHeight - the latest known height of the chain
* @returns an always valid range, in the sense that fromBlock is always <= toBlock
*/
private getBlockRange(latestBlockHeight: bigint): {
fromBlock: bigint;
toBlock: bigint;
} {
let fromBlock = this.blockHeightCursor
? this.blockHeightCursor + 1n
: this.cfg.fromBlock ?? latestBlockHeight;
// If fromBlock is configured and is greater than the current cursor, we allow skipping blocks.
if (
this.blockHeightCursor &&
this.cfg.fromBlock &&
this.cfg.fromBlock > this.blockHeightCursor
) {
fromBlock = this.cfg.fromBlock;
}
let toBlock = BigInt(fromBlock) + BigInt(this.cfg.getBlockBatchSize());
// limit toBlock to obtained block height
if (toBlock > fromBlock && toBlock > latestBlockHeight) {
toBlock = latestBlockHeight;
}
// limit toBlock to configured toBlock
if (this.cfg.toBlock && toBlock > this.cfg.toBlock) {
toBlock = this.cfg.toBlock;
}
return { fromBlock, toBlock };
}
private report(): void {
const labels = {
job: this.cfg.id,
chain: this.cfg.chain ?? "",
commitment: this.cfg.getCommitment(),
};
this.statsRepository.count("job_execution", labels);
this.statsRepository.measure("block_height", this.latestBlockHeight ?? 0n, labels);
this.statsRepository.measure("block_cursor", this.blockHeightCursor ?? 0n, labels);
}
}
export type PollEvmLogsMetadata = {
lastBlock: bigint;
};
export interface PollEvmLogsConfigProps {
fromBlock?: bigint;
toBlock?: bigint;
blockBatchSize?: number;
commitment?: string;
interval?: number;
addresses: string[];
topics: string[];
id?: string;
chain?: string;
}
export class PollEvmLogsConfig {
private props: PollEvmLogsConfigProps;
constructor(props: PollEvmLogsConfigProps = { addresses: [], topics: [] }) {
if (props.fromBlock && props.toBlock && props.fromBlock > props.toBlock) {
throw new Error("fromBlock must be less than or equal to toBlock");
}
this.props = props;
}
public getBlockBatchSize() {
return this.props.blockBatchSize ?? 100;
}
public getCommitment() {
return this.props.commitment ?? "latest";
}
public hasFinished(currentFromBlock?: bigint): boolean {
return (
currentFromBlock != undefined &&
this.props.toBlock != undefined &&
currentFromBlock >= this.props.toBlock
);
}
public get fromBlock() {
return this.props.fromBlock ? BigInt(this.props.fromBlock) : undefined;
}
public setFromBlock(fromBlock: bigint | undefined) {
this.props.fromBlock = fromBlock;
}
public get toBlock() {
return this.props.toBlock;
}
public get interval() {
return this.props.interval;
}
public get addresses() {
return this.props.addresses;
}
public get topics() {
return this.props.topics;
}
public get id() {
return this.props.id ?? ID;
}
public get chain() {
return this.props.chain;
}
static fromBlock(fromBlock: bigint) {
const cfg = new PollEvmLogsConfig();
cfg.props.fromBlock = fromBlock;
return cfg;
}
}
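
A hedged worked example of the range computation in `getBlockRange` (all numbers are illustrative):

```typescript
// Illustrative numbers: cursor at 100n, batch size 100, latest chain height 150n.
const cursor = 100n;
const batchSize = 100n;
const latestBlockHeight = 150n;

let fromBlock = cursor + 1n; // 101n: resume right after the last processed block
let toBlock = fromBlock + batchSize; // 201n
if (toBlock > fromBlock && toBlock > latestBlockHeight) {
  toBlock = latestBlockHeight; // clamped to 150n: never poll past the chain head
}
// Resulting range: [101n, 150n]
```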


@ -0,0 +1,48 @@
import { setTimeout } from "timers/promises";
import winston from "winston";
import { Handler } from "../entities";
export abstract class RunPollingJob {
private interval: number;
private running: boolean = false;
protected abstract logger: winston.Logger;
protected abstract preHook(): Promise<void>;
protected abstract hasNext(): Promise<boolean>;
protected abstract get(): Promise<any[]>;
protected abstract persist(): Promise<void>;
constructor(interval: number) {
this.interval = interval;
this.running = true;
}
public async run(handlers: Handler[]): Promise<void> {
this.logger.info("Starting polling job");
await this.preHook();
while (this.running) {
if (!(await this.hasNext())) {
this.logger.info("Finished processing");
await this.stop();
break;
}
let items: any[];
try {
items = await this.get();
await Promise.all(handlers.map((handler) => handler(items)));
} catch (e) {
this.logger.error("Error processing items", e);
await setTimeout(this.interval);
continue;
}
await this.persist();
await setTimeout(this.interval);
}
}
public async stop(): Promise<void> {
this.running = false;
}
}


@ -0,0 +1,41 @@
import winston from "winston";
import { JobDefinition } from "../entities";
import { JobRepository } from "../repositories";
export class StartJobs {
private readonly logger = winston.child({ module: "StartJobs" });
private readonly repo: JobRepository;
private runnables: Map<string, () => Promise<void>> = new Map();
constructor(repo: JobRepository) {
this.repo = repo;
}
public async runSingle(job: JobDefinition): Promise<JobDefinition> {
if (this.runnables.has(job.id)) {
throw new Error(`Job ${job.id} already exists. Ids must be unique`);
}
const handlers = await this.repo.getHandlers(job);
if (handlers.length === 0) {
this.logger.error(`No handlers for job ${job.id}`);
throw new Error("No handlers for job");
}
const source = this.repo.getSource(job);
this.runnables.set(job.id, () => source.run(handlers));
this.runnables.get(job.id)!();
return job;
}
public async run(): Promise<JobDefinition[]> {
const jobs = await this.repo.getJobDefinitions();
for (const job of jobs) {
await this.runSingle(job);
}
return jobs;
}
}


@ -0,0 +1,4 @@
export * from "./HandleEvmLogs";
export * from "./PollEvmLogs";
export * from "./RunPollingJob";
export * from "./StartJobs";


@ -0,0 +1,17 @@
export type LogFoundEvent<T> = {
name: string;
address: string;
chainId: number;
txHash: string;
blockHeight: bigint;
blockTime: number;
attributes: T;
};
export type LogMessagePublished = {
sequence: number;
sender: string;
nonce: number;
payload: string;
consistencyLevel: number;
};


@ -0,0 +1,32 @@
export type EvmBlock = {
number: bigint;
hash: string;
timestamp: number; // epoch seconds
};
export type EvmLog = {
blockTime?: number; // epoch seconds
blockNumber: bigint;
blockHash: string;
address: string;
removed: boolean;
data: string;
transactionHash: string;
transactionIndex: string;
topics: string[];
logIndex: number;
};
export type EvmTag = "finalized" | "latest" | "safe";
export type EvmTopicFilter = {
addresses: string[];
topics: string[];
};
export type EvmLogFilter = {
fromBlock: bigint | EvmTag;
toBlock: bigint | EvmTag;
addresses: string[];
topics: string[];
};


@ -0,0 +1,3 @@
export * from "./evm";
export * from "./events";
export * from "./jobs";


@ -0,0 +1,28 @@
export class JobDefinition {
id: string;
chain: string;
source: {
action: string;
config: Record<string, any>;
};
handlers: {
action: string;
target: string;
mapper: string;
config: Record<string, any>;
}[];
constructor(
id: string,
chain: string,
source: { action: string; config: Record<string, any> },
handlers: { action: string; target: string; mapper: string; config: Record<string, any> }[]
) {
this.id = id;
this.chain = chain;
this.source = source;
this.handlers = handlers;
}
}
export type Handler = (items: any[]) => Promise<any>;


@ -0,0 +1,25 @@
import { RunPollingJob } from "./actions/RunPollingJob";
import { EvmBlock, EvmLog, EvmLogFilter, Handler, JobDefinition } from "./entities";
export interface EvmBlockRepository {
getBlockHeight(finality: string): Promise<bigint>;
getBlocks(blockNumbers: Set<bigint>): Promise<Record<string, EvmBlock>>;
getFilteredLogs(filter: EvmLogFilter): Promise<EvmLog[]>;
}
export interface MetadataRepository<Metadata> {
get(id: string): Promise<Metadata | undefined>;
save(id: string, metadata: Metadata): Promise<void>;
}
export interface StatRepository {
count(id: string, labels: Record<string, any>): void;
measure(id: string, value: bigint, labels: Record<string, any>): void;
report: () => Promise<string>;
}
export interface JobRepository {
getJobDefinitions(): Promise<JobDefinition[]>;
getSource(jobDef: JobDefinition): RunPollingJob;
getHandlers(jobDef: JobDefinition): Promise<Handler[]>;
}


@ -0,0 +1,119 @@
import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns";
import { Config } from "./config";
import {
SnsEventRepository,
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
FileMetadataRepo,
PromStatRepository,
StaticJobRepository,
} from "./repositories";
import { HttpClient } from "./repositories/HttpClient";
import { JobRepository } from "../domain/repositories";
export class RepositoriesBuilder {
private cfg: Config;
private snsClient?: SNSClient;
private repositories = new Map();
constructor(cfg: Config) {
this.cfg = cfg;
this.build();
}
private build() {
this.snsClient = this.createSnsClient();
this.repositories.set("sns", new SnsEventRepository(this.snsClient, this.cfg.sns));
this.repositories.set("metrics", new PromStatRepository());
this.cfg.metadata?.dir &&
this.repositories.set("metadata", new FileMetadataRepo(this.cfg.metadata.dir));
this.cfg.supportedChains.forEach((chain) => {
const httpClient = this.createHttpClient(this.cfg.platforms[chain].timeout);
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chain,
rpc: this.cfg.platforms[chain].rpcs[0],
timeout: this.cfg.platforms[chain].timeout,
};
this.repositories.set(`${chain}-evmRepo`, new EvmJsonRPCBlockRepository(repoCfg, httpClient));
});
this.repositories.set(
"jobs",
new StaticJobRepository(
this.cfg.jobs.dir,
this.cfg.dryRun,
(chain: string) => this.getEvmBlockRepository(chain),
{
metadataRepo: this.getMetadataRepository(),
statsRepo: this.getStatsRepository(),
snsRepo: this.getSnsEventRepository(),
}
)
);
}
public getEvmBlockRepository(chain: string): EvmJsonRPCBlockRepository {
const repo = this.repositories.get(`${chain}-evmRepo`);
if (!repo) throw new Error(`No EvmJsonRPCBlockRepository for chain ${chain}`);
return repo;
}
public getSnsEventRepository(): SnsEventRepository {
const repo = this.repositories.get("sns");
if (!repo) throw new Error(`No SnsEventRepository`);
return repo;
}
public getMetadataRepository(): FileMetadataRepo {
const repo = this.repositories.get("metadata");
if (!repo) throw new Error(`No FileMetadataRepo`);
return repo;
}
public getStatsRepository(): PromStatRepository {
const repo = this.repositories.get("metrics");
if (!repo) throw new Error(`No PromStatRepository`);
return repo;
}
public getJobsRepository(): JobRepository {
const repo = this.repositories.get("jobs");
if (!repo) throw new Error(`No JobRepository`);
return repo;
}
public close(): void {
this.snsClient?.destroy();
}
private createSnsClient(): SNSClient {
const snsCfg: SNSClientConfig = { region: this.cfg.sns.region };
if (this.cfg.sns.credentials) {
snsCfg.credentials = {
accessKeyId: this.cfg.sns.credentials.accessKeyId,
secretAccessKey: this.cfg.sns.credentials.secretAccessKey,
};
snsCfg.endpoint = this.cfg.sns.credentials.url;
}
return new SNSClient(snsCfg);
}
private createHttpClient(timeout?: number, retries?: number): HttpClient {
return new HttpClient({
retries: retries ?? 3,
timeout: timeout ?? 5_000,
initialDelay: 1_000,
maxDelay: 30_000,
});
}
}


@ -0,0 +1,47 @@
import config from "config";
import { SnsConfig } from "./repositories/SnsEventRepository";
export type Config = {
environment: "testnet" | "mainnet";
port: number;
logLevel: "debug" | "info" | "warn" | "error";
dryRun: boolean;
sns: SnsConfig;
metadata?: {
dir: string;
};
jobs: {
dir: string;
};
platforms: Record<string, PlatformConfig>;
supportedChains: string[];
};
export type PlatformConfig = {
name: string;
network: string;
chainId: number;
rpcs: string[];
timeout?: number;
};
/*
By setting NODE_CONFIG_ENV we can point to a different config directory.
Default settings can be customized by defining NODE_ENV=staging|production.
Some options may be overridable by env variables, see: config/custom-environment-variables.json
*/
export const configuration = {
environment: config.get<string>("environment"),
port: config.get<number>("port") ?? 9090,
logLevel: config.get<string>("logLevel")?.toLowerCase() ?? "info",
dryRun: String(config.get("dryRun")) === "true", // handles both boolean config values and env var strings
sns: config.get<SnsConfig>("sns"),
metadata: {
dir: config.get<string>("metadata.dir"),
},
jobs: {
dir: config.get<string>("jobs.dir"),
},
platforms: config.get<Record<string, PlatformConfig>>("platforms"),
supportedChains: config.get<string[]>("supportedChains"),
} as Config;


@ -0,0 +1,24 @@
import winston from "winston";
import { configuration } from "./config";
winston.remove(winston.transports.Console);
winston.configure({
transports: [
new winston.transports.Console({
level: configuration.logLevel,
}),
],
format: winston.format.combine(
winston.format.colorize(),
winston.format.splat(),
winston.format.errors({ stack: true }),
winston.format.printf(
({ level, message, module, chain, label }) =>
`${level} [${module ?? ""}]${chain ? `[${chain}]` : ""}${
label ? `[${label}]` : ""
} ${message}`
)
),
});
export default winston;


@ -0,0 +1,27 @@
import { BigNumber } from "ethers";
import { EvmLog, LogFoundEvent, LogMessagePublished } from "../../domain/entities";
export const evmLogMessagePublishedMapper = (
log: EvmLog,
parsedArgs: ReadonlyArray<any>
): LogFoundEvent<LogMessagePublished> => {
if (!log.blockTime) {
throw new Error(`Block time is missing for log ${log.logIndex} in tx ${log.transactionHash}`);
}
return {
name: "log-message-published",
address: log.address,
chainId: 2, // TODO: get from config
txHash: log.transactionHash,
blockHeight: log.blockNumber,
blockTime: log.blockTime,
attributes: {
sender: parsedArgs[0], // log.topics[1]
sequence: (parsedArgs[1] as BigNumber).toNumber(),
payload: parsedArgs[3],
nonce: parsedArgs[2],
consistencyLevel: parsedArgs[4],
},
};
};


@ -0,0 +1,214 @@
import { EvmBlock, EvmLogFilter, EvmLog, EvmTag } from "../../domain/entities";
import { EvmBlockRepository } from "../../domain/repositories";
import winston from "../log";
import { HttpClient, HttpClientError } from "./HttpClient";
/**
* EvmJsonRPCBlockRepository is a repository that uses a JSON RPC endpoint to fetch blocks.
* On the reliability side, only knows how to timeout.
*/
export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
private httpClient: HttpClient;
private rpc: URL;
private readonly logger = winston.child({ module: "EvmJsonRPCBlockRepository" });
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) {
this.httpClient = httpClient;
this.rpc = new URL(cfg.rpc);
this.logger = winston.child({ module: "EvmJsonRPCBlockRepository", chain: cfg.chain });
}
async getBlockHeight(finality: EvmTag): Promise<bigint> {
const block: EvmBlock = await this.getBlock(finality);
return block.number;
}
/**
* Get blocks by block number.
* @param blockNumbers
* @returns a record of block hash -> EvmBlock
*/
async getBlocks(blockNumbers: Set<bigint>): Promise<Record<string, EvmBlock>> {
if (!blockNumbers.size) return {};
const reqs: any[] = [];
for (let blockNumber of blockNumbers) {
const blockNumberStr = blockNumber.toString();
reqs.push({
jsonrpc: "2.0",
id: blockNumberStr,
method: "eth_getBlockByNumber",
params: [blockNumberStr, false],
});
}
let results: (undefined | { id: string; result?: EvmBlock; error?: ErrorBlock })[];
try {
results = await this.httpClient.post<typeof results>(this.rpc.href, reqs);
} catch (e: HttpClientError | any) {
this.handleError(e, "eth_getBlockByNumber");
throw e;
}
if (results && results.length) {
return results
.map(
(
response: undefined | { id: string; result?: EvmBlock; error?: ErrorBlock },
idx: number
) => {
// Karura is getting 6969 errors for some blocks, so we'll just return empty blocks for those instead of throwing an error.
// We use the current time as the block timestamp, which is not ideal but should be fine.
if (
(response && response.result === null) ||
(response?.error && response.error?.code && response.error.code === 6969)
) {
this.logger.warn(`Block ${response?.id} not available on ${this.rpc.hostname}, returning a default block`);
return {
hash: "",
number: BigInt(response.id),
timestamp: Math.floor(Date.now() / 1000), // EvmBlock.timestamp is epoch seconds
};
}
if (
response?.result &&
response.result?.hash &&
response.result.number &&
response.result.timestamp
) {
return {
hash: response.result.hash,
number: BigInt(response.result.number),
timestamp: Number(response.result.timestamp),
};
}
const msg = `Got error ${response?.error?.message} for eth_getBlockByNumber for ${
response?.id ?? reqs[idx].id
} on ${this.rpc.hostname}`;
this.logger.error(msg);
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${
response?.id ?? reqs[idx].id
}: ${msg}`
);
}
)
.reduce((acc: Record<string, EvmBlock>, block: EvmBlock) => {
acc[block.hash] = block;
return acc;
}, {});
}
throw new Error(
`Unable to parse ${
results?.length ?? 0
} blocks for eth_getBlockByNumber for numbers ${blockNumbers} on ${this.rpc.hostname}`
);
}
async getFilteredLogs(filter: EvmLogFilter): Promise<EvmLog[]> {
const parsedFilters = {
topics: filter.topics,
address: filter.addresses,
fromBlock: `0x${filter.fromBlock.toString(16)}`,
toBlock: `0x${filter.toBlock.toString(16)}`,
};
let response: { result: Log[]; error?: ErrorBlock };
try {
response = await this.httpClient.post<typeof response>(this.rpc.href, {
jsonrpc: "2.0",
method: "eth_getLogs",
params: [parsedFilters],
id: 1,
});
} catch (e: HttpClientError | any) {
this.handleError(e, "eth_getLogs");
throw e;
}
const logs = response?.result;
this.logger.info(
`Got ${logs?.length} logs for ${this.describeFilter(filter)} from ${this.rpc.hostname}`
);
return logs.map((log) => ({
...log,
blockNumber: BigInt(log.blockNumber),
transactionIndex: log.transactionIndex.toString(),
}));
}
private describeFilter(filter: EvmLogFilter): string {
return `[addresses:${filter.addresses}][topics:${filter.topics}][blocks:${filter.fromBlock} - ${filter.toBlock}]`;
}
/**
* Loosely based on the wormhole-dashboard implementation (minus some specially crafted blocks when null result is obtained)
*/
private async getBlock(blockNumberOrTag: bigint | EvmTag): Promise<EvmBlock> {
let response: { result?: EvmBlock; error?: ErrorBlock };
try {
response = await this.httpClient.post<typeof response>(this.rpc.href, {
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [blockNumberOrTag.toString(), false], // this means we'll get a light block (no txs)
id: 1,
});
} catch (e: HttpClientError | any) {
this.handleError(e, "eth_getBlockByNumber");
throw e;
}
const result = response?.result;
if (result && result.hash && result.number && result.timestamp) {
// Convert to our domain compatible type
return {
number: BigInt(result.number),
timestamp: Number(result.timestamp),
hash: result.hash,
};
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${this.rpc}`
);
}
private handleError(e: any, method: string) {
if (e instanceof HttpClientError) {
this.logger.error(
`Got ${e.status} from ${this.rpc.hostname}/${method}. ${e.message}`
);
} else {
this.logger.error(`Got error ${e} from ${this.rpc.hostname}/${method}`);
}
}
}
export type EvmJsonRPCBlockRepositoryCfg = {
rpc: string;
timeout?: number;
chain: string;
};
type ErrorBlock = {
code: number; //6969,
message: string; //'Error: No response received from RPC endpoint in 60s'
};
type Log = {
blockNumber: string;
blockHash: string;
transactionIndex: number;
removed: boolean;
address: string;
data: string;
topics: Array<string>;
transactionHash: string;
logIndex: number;
};


@ -0,0 +1,27 @@
import fs from "fs";
import { MetadataRepository } from "../../domain/repositories";
export class FileMetadataRepo implements MetadataRepository<any> {
private readonly dirPath: string;
constructor(dirPath: string) {
this.dirPath = dirPath;
if (!fs.existsSync(this.dirPath)) {
fs.mkdirSync(this.dirPath, { recursive: true });
}
}
async get(id: string): Promise<any> {
const filePath = `${this.dirPath}/${id}.json`;
return fs.promises
.readFile(filePath, "utf8")
.then(JSON.parse)
.catch((err) => null);
}
async save(id: string, metadata: any): Promise<void> {
const filePath = `${this.dirPath}/${id}.json`;
return fs.promises.writeFile(filePath, JSON.stringify(metadata), "utf8");
}
}


@ -0,0 +1,130 @@
import axios, { AxiosError, AxiosInstance } from "axios";
import { setTimeout } from "timers/promises";
/**
* A simple HTTP client with exponential backoff retries and 429 handling.
*/
export class HttpClient {
private initialDelay: number = 1_000;
private maxDelay: number = 60_000;
private retries: number = 0;
private timeout: number = 5_000;
private axios: AxiosInstance;
constructor(options?: HttpClientOptions) {
options?.initialDelay && (this.initialDelay = options.initialDelay);
options?.maxDelay && (this.maxDelay = options.maxDelay);
options?.retries && (this.retries = options.retries);
options?.timeout && (this.timeout = options.timeout);
this.axios = axios.create();
}
public async post<T>(url: string, body: any, opts?: HttpClientOptions): Promise<T> {
return this.executeWithRetry(url, "POST", body, opts);
}
private async execute<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
let response;
try {
response = await this.axios.request<T>({
url: url,
method: method,
data: body,
timeout: opts?.timeout ?? this.timeout,
signal: AbortSignal.timeout(opts?.timeout ?? this.timeout),
});
} catch (err: AxiosError | any) {
// Connection / timeout error:
if (err instanceof AxiosError) {
throw new HttpClientError(err.message ?? err.code, { status: err?.status ?? 0 }, err);
}
throw new HttpClientError(err.message ?? err.code, undefined, err);
}
if (response.status < 200 || response.status >= 300) {
throw new HttpClientError(undefined, response, response.data);
}
return response.data;
}
private async executeWithRetry<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
const maxRetries = opts?.retries ?? this.retries;
let retries = 0;
const initialDelay = opts?.initialDelay ?? this.initialDelay;
const maxDelay = opts?.maxDelay ?? this.maxDelay;
while (maxRetries >= 0) {
try {
return await this.execute(url, method, body, opts);
} catch (err) {
if (err instanceof HttpClientError) {
if (retries < maxRetries) {
const retryAfter = err.getRetryAfter(maxDelay, err);
if (retryAfter) {
await setTimeout(retryAfter, { ref: false });
} else {
const timeout = Math.min(initialDelay * 2 ** retries, maxDelay); // backoff grows with each retry
await setTimeout(timeout, { ref: false });
}
retries++;
continue;
}
}
throw err;
}
}
throw new Error(`Failed to reach ${url}`);
}
}
export type HttpClientOptions = {
initialDelay?: number;
maxDelay?: number;
retries?: number;
timeout?: number;
};
export class HttpClientError extends Error {
public readonly status?: number;
public readonly data?: any;
public readonly headers?: any;
constructor(message?: string, response?: { status: number; headers?: any }, data?: any) {
super(message ?? `Unexpected status code: ${response?.status}`);
this.status = response?.status;
this.data = data;
this.headers = response?.headers;
Error.captureStackTrace(this, this.constructor);
}
/**
* Parses the Retry-After header and returns the value in milliseconds.
* @param maxDelay
* @param error
* @throws {HttpClientError} if retry-after is bigger than maxDelay.
* @returns the retry-after value in milliseconds.
*/
public getRetryAfter(maxDelay: number, error: HttpClientError): number | undefined {
const retryAfter = this.headers?.get("Retry-After");
if (retryAfter) {
const value = parseInt(retryAfter) * 1000; // header value is in seconds
if (value <= maxDelay) {
return value;
}
throw error;
}
}
}
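
A hedged usage sketch of this client (the RPC URL parameter is a placeholder):

```typescript
// Hedged sketch: POST a JSON-RPC request with retries and exponential backoff.
const client = new HttpClient({ retries: 3, timeout: 5_000, initialDelay: 1_000, maxDelay: 30_000 });

async function getLatestBlockNumber(rpcUrl: string): Promise<string | undefined> {
  const response = await client.post<{ result?: { number: string } }>(rpcUrl, {
    jsonrpc: "2.0",
    method: "eth_getBlockByNumber",
    params: ["latest", false],
    id: 1,
  });
  return response.result?.number; // hex-encoded block number
}
```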


@ -0,0 +1,55 @@
import prometheus from "prom-client";
import { StatRepository } from "../../domain/repositories";
export class PromStatRepository implements StatRepository {
private readonly registry: prometheus.Registry;
private counters: Map<string, prometheus.Counter<string>> = new Map();
private gauges: Map<string, prometheus.Gauge<string>> = new Map();
constructor(registry?: prometheus.Registry) {
this.registry = registry ?? new prometheus.Registry();
}
public report() {
return this.registry.metrics();
}
public count(id: string, labels: Record<string, any>): void {
const counter = this.getCounter(id, labels);
counter.inc(labels);
}
public measure(id: string, value: bigint, labels: Record<string, any>): void {
const gauge = this.getGauge(id, labels);
gauge.set(labels, Number(value));
}
private getCounter(id: string, labels: Record<string, any>): prometheus.Counter {
this.counters.get(id) ??
this.counters.set(
id,
new prometheus.Counter({
name: id,
help: id,
registers: [this.registry],
labelNames: Object.keys(labels),
})
);
return this.counters.get(id) as prometheus.Counter<string>;
}
private getGauge(id: string, labels: Record<string, any>): prometheus.Gauge {
this.gauges.get(id) ??
this.gauges.set(
id,
new prometheus.Gauge({
name: id,
help: id,
registers: [this.registry],
labelNames: Object.keys(labels),
})
);
return this.gauges.get(id) as prometheus.Gauge;
}
}


@ -0,0 +1,161 @@
import { LogFoundEvent } from "../../domain/entities";
import crypto from "node:crypto";
import {
SNSClient,
PublishBatchCommand,
PublishBatchCommandInput,
PublishBatchRequestEntry,
} from "@aws-sdk/client-sns";
import winston from "../log";
const CHUNK_SIZE = 10;
export class SnsEventRepository {
private client: SNSClient;
private cfg: SnsConfig;
private logger: winston.Logger;
constructor(snsClient: SNSClient, cfg: SnsConfig) {
this.client = snsClient;
this.cfg = cfg;
this.logger = winston.child({ module: "SnsEventRepository" });
this.logger.info(`Created for topic ${cfg.topicArn}`);
}
async publish(events: LogFoundEvent<any>[]): Promise<SnsPublishResult> {
if (!events.length) {
this.logger.warn("No events to publish, continuing...");
return {
status: "success",
};
}
const batches: PublishBatchCommandInput[] = [];
const inputs: PublishBatchRequestEntry[] = events
.map(SnsEvent.fromLogFoundEvent)
.map((event) => ({
Id: crypto.randomUUID(),
Subject: this.cfg.subject ?? "blockchain-watcher",
Message: JSON.stringify(event),
MessageGroupId: this.cfg.groupId ?? "blockchain-watcher",
MessageDeduplicationId: event.trackId,
}));
// PublishBatchCommand: only supports max 10 items per batch
for (let i = 0; i < inputs.length; i += CHUNK_SIZE) {
const batch: PublishBatchCommandInput = {
TopicArn: this.cfg.topicArn,
PublishBatchRequestEntries: inputs.slice(i, i + CHUNK_SIZE),
};
batches.push(batch);
}
try {
const promises = [];
const errors = [];
for (const batch of batches) {
const command = new PublishBatchCommand(batch);
promises.push(this.client.send(command));
}
const results = await Promise.allSettled(promises);
for (const result of results) {
if (result.status !== "fulfilled") {
this.logger.error(result.reason);
errors.push(result.reason);
}
}
if (errors.length > 0) {
return {
status: "error",
reasons: errors,
};
}
} catch (error: unknown) {
this.logger.error(error);
return {
status: "error",
};
}
return {
status: "success",
};
}
async asTarget(): Promise<(events: LogFoundEvent<any>[]) => Promise<void>> {
return async (events: LogFoundEvent<any>[]) => {
const result = await this.publish(events);
if (result.status === "error") {
this.logger.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
throw new Error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
}
this.logger.info(`Published ${events.length} events to SNS`);
};
}
}
export class SnsEvent {
trackId: string;
source: string;
event: string;
timestamp: string;
version: string;
data: Record<string, any>;
constructor(
trackId: string,
source: string,
event: string,
timestamp: string,
version: string,
data: Record<string, any>
) {
this.trackId = trackId;
this.source = source;
this.event = event;
this.timestamp = timestamp;
this.version = version;
this.data = data;
}
static fromLogFoundEvent<T>(logFoundEvent: LogFoundEvent<T>): SnsEvent {
return new SnsEvent(
`chain-event-${logFoundEvent.txHash}-${logFoundEvent.blockHeight}`,
"blockchain-watcher",
logFoundEvent.name,
new Date().toISOString(),
"1",
{
chainId: logFoundEvent.chainId,
emitter: logFoundEvent.address,
txHash: logFoundEvent.txHash,
blockHeight: logFoundEvent.blockHeight.toString(),
blockTime: new Date(logFoundEvent.blockTime * 1000).toISOString(),
attributes: logFoundEvent.attributes,
}
);
}
}
export type SnsConfig = {
region: string;
topicArn: string;
subject?: string;
groupId: string;
credentials?: {
accessKeyId: string;
secretAccessKey: string;
url: string;
};
};
export type SnsPublishResult = {
status: "success" | "error";
reason?: string;
reasons?: string[];
};


@ -0,0 +1,120 @@
import {
HandleEvmLogs,
PollEvmLogs,
PollEvmLogsConfig,
PollEvmLogsConfigProps,
RunPollingJob,
} from "../../domain/actions";
import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities";
import {
EvmBlockRepository,
JobRepository,
MetadataRepository,
StatRepository,
} from "../../domain/repositories";
import { FileMetadataRepo, SnsEventRepository } from "./index";
import { evmLogMessagePublishedMapper } from "../mappers/evmLogMessagePublishedMapper";
import log from "../log";
export class StaticJobRepository implements JobRepository {
private fileRepo: FileMetadataRepo;
private dryRun: boolean = false;
private sources: Map<string, (def: JobDefinition) => RunPollingJob> = new Map();
private handlers: Map<string, (cfg: any, target: string, mapper: any) => Promise<Handler>> =
new Map();
private mappers: Map<string, any> = new Map();
private targets: Map<string, () => Promise<(items: any[]) => Promise<void>>> = new Map();
private blockRepoProvider: (chain: string) => EvmBlockRepository;
private metadataRepo: MetadataRepository<any>;
private statsRepo: StatRepository;
private snsRepo: SnsEventRepository;
constructor(
path: string,
dryRun: boolean,
blockRepoProvider: (chain: string) => EvmBlockRepository,
repos: {
metadataRepo: MetadataRepository<any>;
statsRepo: StatRepository;
snsRepo: SnsEventRepository;
}
) {
this.fileRepo = new FileMetadataRepo(path);
this.blockRepoProvider = blockRepoProvider;
this.metadataRepo = repos.metadataRepo;
this.statsRepo = repos.statsRepo;
this.snsRepo = repos.snsRepo;
this.dryRun = dryRun;
this.fill();
}
async getJobDefinitions(): Promise<JobDefinition[]> {
const persisted = await this.fileRepo.get("jobs");
if (!persisted) {
return Promise.resolve([]);
}
return persisted;
}
getSource(jobDef: JobDefinition): RunPollingJob {
const src = this.sources.get(jobDef.source.action);
if (!src) {
throw new Error(`Source ${jobDef.source.action} not found`);
}
return src(jobDef);
}
async getHandlers(jobDef: JobDefinition): Promise<Handler[]> {
const result: Handler[] = [];
for (const handler of jobDef.handlers) {
const maybeHandler = this.handlers.get(handler.action);
if (!maybeHandler) {
throw new Error(`Handler ${handler.action} not found`);
}
const mapper = this.mappers.get(handler.mapper);
if (!mapper) {
throw new Error(`Mapper ${handler.mapper} not found`);
}
result.push((await maybeHandler(handler.config, handler.target, mapper)).bind(maybeHandler));
}
return result;
}
private fill() {
const pollEvmLogs = (jobDef: JobDefinition) =>
new PollEvmLogs(
this.blockRepoProvider(jobDef.source.config.chain),
this.metadataRepo,
this.statsRepo,
new PollEvmLogsConfig({
...(jobDef.source.config as PollEvmLogsConfigProps),
id: jobDef.id,
})
);
this.sources.set("PollEvmLogs", pollEvmLogs);
this.mappers.set("evmLogMessagePublishedMapper", evmLogMessagePublishedMapper);
const snsTarget = () => this.snsRepo.asTarget();
const dummyTarget = async () => async (events: any[]) => {
log.info(`Got ${events.length} events`);
};
this.targets.set("sns", snsTarget);
this.targets.set("dummy", dummyTarget);
const handleEvmLogs = async (config: any, target: string, mapper: any) => {
const instance = new HandleEvmLogs<LogFoundEvent<any>>(
config,
mapper,
await this.targets.get(this.dryRun ? "dummy" : target)!()
);
return instance.handle.bind(instance);
};
this.handlers.set("HandleEvmLogs", handleEvmLogs);
}
}


@ -0,0 +1,14 @@
// Monkey patching BigInt serialization
if (!("toJSON" in BigInt.prototype)) {
Object.defineProperty(BigInt.prototype, "toJSON", {
get() {
return () => String(this);
},
});
}
export * from "./FileMetadataRepo";
export * from "./SnsEventRepository";
export * from "./EvmJsonRPCBlockRepository";
export * from "./PromStatRepository";
export * from "./StaticJobRepository";


@ -0,0 +1,13 @@
import { StatRepository } from "../../domain/repositories";
export class HealthController {
private readonly statsRepo: StatRepository;
constructor(statsRepo: StatRepository) {
this.statsRepo = statsRepo;
}
metrics = async () => {
return this.statsRepo.report();
};
}


@ -0,0 +1,40 @@
import http from "http";
import url from "url";
import { HealthController } from "./HealthController";
import log from "../log";
export class WebServer {
private server: http.Server;
private port: number;
constructor(port: number, healthController: HealthController) {
this.port = port;
this.server = http.createServer(async (req, res) => {
const route = url.parse(req.url ?? "").pathname;
if (route === "/metrics") {
// Return all metrics the Prometheus exposition format
res.setHeader("Content-Type", "text/plain");
res.end(await healthController.metrics());
}
if (route === "/health") {
res.end("OK");
}
res.statusCode = 404;
res.end();
});
this.start();
}
start() {
this.server.listen(this.port, () => {
log.info(`Server started on port ${this.port}`);
});
}
stop() {
this.server.close();
}
}


@ -0,0 +1,48 @@
import { configuration } from "./infrastructure/config";
import { RepositoriesBuilder } from "./infrastructure/RepositoriesBuilder";
import log from "./infrastructure/log";
import { WebServer } from "./infrastructure/rpc/Server";
import { HealthController } from "./infrastructure/rpc/HealthController";
import { StartJobs } from "./domain/actions";
let repos: RepositoriesBuilder;
let server: WebServer;
async function run(): Promise<void> {
log.info(`Starting: dryRunEnabled -> ${configuration.dryRun}`);
repos = new RepositoriesBuilder(configuration);
const startJobs = new StartJobs(repos.getJobsRepository());
await startServer(repos, startJobs);
await startJobs.run();
// Just keep this running until killed
setInterval(() => {
log.info("Still running");
}, 20_000);
log.info("Started");
// Handle shutdown
process.on("SIGINT", handleShutdown);
process.on("SIGTERM", handleShutdown);
}
const startServer = async (repos: RepositoriesBuilder, startJobs: StartJobs) => {
server = new WebServer(configuration.port, new HealthController(repos.getStatsRepository()));
};
const handleShutdown = async () => {
try {
await Promise.allSettled([repos.close(), server.stop()]);
process.exit();
} catch (error: unknown) {
process.exit(1);
}
};
run().catch((e) => {
log.error("Fatal error caused process to exit", e);
});


@ -0,0 +1,98 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { HandleEvmLogs, HandleEvmLogsConfig } from "../../src/domain/actions/HandleEvmLogs";
import { EvmLog, LogFoundEvent } from "../../src/domain/entities";
const ABI =
"event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)";
const mapper = (log: EvmLog, args: ReadonlyArray<any>) => {
return {
name: "send-event",
address: log.address,
chainId: 1,
txHash: "0x0",
blockHeight: 0n,
blockTime: 0,
attributes: {
sequence: args[0].toString(),
deliveryQuote: args[1].toString(),
paymentForExtraReceiverValue: args[2].toString(),
},
};
};
const targetRepo = {
save: async (events: LogFoundEvent<Record<string, string>>[]) => {
Promise.resolve();
},
failingSave: async (events: LogFoundEvent<Record<string, string>>[]) => {
Promise.reject();
},
};
let targetRepoSpy: jest.SpiedFunction<(typeof targetRepo)["save"]>;
let evmLogs: EvmLog[];
let cfg: HandleEvmLogsConfig;
let handleEvmLogs: HandleEvmLogs<LogFoundEvent<Record<string, string>>>;
describe("HandleEvmLogs", () => {
afterEach(async () => {});
it("should be able to map logs", async () => {
const expectedLength = 5;
givenConfig(ABI);
givenEvmLogs(expectedLength, expectedLength);
givenHandleEvmLogs();
const result = await handleEvmLogs.handle(evmLogs);
expect(result).toHaveLength(expectedLength);
expect(result[0].attributes.sequence).toBe("3389");
expect(result[0].attributes.deliveryQuote).toBe("75150000000000000");
expect(result[0].attributes.paymentForExtraReceiverValue).toBe("0");
expect(targetRepoSpy).toBeCalledWith(result);
});
});
const givenHandleEvmLogs = (targetFn: "save" | "failingSave" = "save") => {
targetRepoSpy = jest.spyOn(targetRepo, targetFn);
handleEvmLogs = new HandleEvmLogs(cfg, mapper, targetRepo[targetFn]);
};
const givenConfig = (abi: string) => {
cfg = {
filter: {
addresses: ["0x28D8F1Be96f97C1387e94A53e00eCcFb4E75175a"],
topics: ["0xda8540426b64ece7b164a9dce95448765f0a7263ef3ff85091c9c7361e485364"],
},
abi,
};
};
const givenEvmLogs = (length: number, matchingFilterOnes: number) => {
evmLogs = [];
let matchingCount = 0;
for (let i = 0; i < length; i++) {
let address = "0x392f472048681816e91026cd768c60958b55352add2837adea9ea6249178b8a8";
let topic: string | undefined = undefined;
if (matchingCount < matchingFilterOnes) {
address = cfg.filter.addresses![0].toUpperCase();
topic = cfg.filter.topics![0];
matchingCount++;
}
evmLogs.push({
blockTime: 0,
blockNumber: BigInt(i + 1),
blockHash: "0x1a07d0bd31c84f0dab36eac31a2f3aa801852bf8240ffba19113c62463f694fa",
address: address,
removed: false,
data: "0x000000000000000000000000000000000000000000000000010afc86dedee0000000000000000000000000000000000000000000000000000000000000000000",
transactionHash: "0x2077dbd0c685c264dfa4e8e048ff15b03775043070216644258bf1bd3e419aa8",
transactionIndex: "0x4",
topics: topic
? [topic, "0x0000000000000000000000000000000000000000000000000000000000000d3d"]
: [],
logIndex: 0,
});
}
};


@ -0,0 +1,180 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { setTimeout } from "timers/promises";
import {
PollEvmLogsMetadata,
PollEvmLogs,
PollEvmLogsConfig,
} from "../../src/domain/actions/PollEvmLogs";
import {
EvmBlockRepository,
MetadataRepository,
StatRepository,
} from "../../src/domain/repositories";
import { EvmBlock, EvmLog } from "../../src/domain/entities";
let cfg = PollEvmLogsConfig.fromBlock(0n);
let getBlocksSpy: jest.SpiedFunction<EvmBlockRepository["getBlocks"]>;
let getLogsSpy: jest.SpiedFunction<EvmBlockRepository["getFilteredLogs"]>;
let handlerSpy: jest.SpiedFunction<(logs: EvmLog[]) => Promise<void>>;
let metadataSaveSpy: jest.SpiedFunction<MetadataRepository<PollEvmLogsMetadata>["save"]>;
let metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
let evmBlockRepo: EvmBlockRepository;
let statsRepo: StatRepository;
let handlers = {
working: (logs: EvmLog[]) => Promise.resolve(),
failing: (logs: EvmLog[]) => Promise.reject(),
};
let pollEvmLogs: PollEvmLogs;
describe("PollEvmLogs", () => {
afterEach(async () => {
await pollEvmLogs.stop();
});
it("should be able to read logs from latest block when no fromBlock is configured", async () => {
const currentHeight = 10n;
const blocksAhead = 1n;
givenEvmBlockRepository(currentHeight, blocksAhead);
givenMetadataRepository();
givenStatsRepository();
givenPollEvmLogs();
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => expect(getBlocksSpy).toHaveReturnedTimes(1),
() => expect(getBlocksSpy).toHaveBeenCalledWith(new Set([currentHeight, currentHeight + 1n])),
() =>
expect(getLogsSpy).toBeCalledWith({
addresses: cfg.addresses,
topics: cfg.topics,
fromBlock: currentHeight + blocksAhead,
toBlock: currentHeight + blocksAhead,
})
);
});
it("should be able to read logs from last known block when configured from is before", async () => {
const lastExtractedBlock = 10n;
const blocksAhead = 10n;
givenEvmBlockRepository(lastExtractedBlock, blocksAhead);
givenMetadataRepository({ lastBlock: lastExtractedBlock });
givenStatsRepository();
givenPollEvmLogs(lastExtractedBlock - 10n);
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() =>
expect(getBlocksSpy).toHaveBeenCalledWith(
new Set([lastExtractedBlock, lastExtractedBlock + 1n])
),
() =>
expect(getLogsSpy).toBeCalledWith({
addresses: cfg.addresses,
topics: cfg.topics,
fromBlock: lastExtractedBlock + 1n,
toBlock: lastExtractedBlock + blocksAhead,
})
);
});
it("should pass logs to handlers and persist metadata", async () => {
const currentHeight = 10n;
const blocksAhead = 1n;
givenEvmBlockRepository(currentHeight, blocksAhead);
givenMetadataRepository();
givenStatsRepository();
givenPollEvmLogs(currentHeight);
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => expect(handlerSpy).toHaveBeenCalledWith(expect.any(Array)),
() =>
expect(metadataSaveSpy).toBeCalledWith("watch-evm-logs", {
lastBlock: currentHeight + blocksAhead,
})
);
});
});
const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => {
const logsResponse: EvmLog[] = [];
const blocksResponse: Record<string, EvmBlock> = {};
if (height) {
for (let index = 0n; index <= (blocksAhead ?? 1n); index++) {
logsResponse.push({
blockNumber: height + index,
blockHash: `0x0${index}`,
blockTime: 0,
address: "",
removed: false,
data: "",
transactionHash: "",
transactionIndex: "",
topics: [],
logIndex: 0,
});
blocksResponse[`0x0${index}`] = {
timestamp: 0,
hash: `0x0${index}`,
number: height + index,
};
}
}
evmBlockRepo = {
getBlocks: () => Promise.resolve(blocksResponse),
getBlockHeight: () => Promise.resolve(height ? height + (blocksAhead ?? 10n) : 10n),
getFilteredLogs: () => Promise.resolve(logsResponse),
};
getBlocksSpy = jest.spyOn(evmBlockRepo, "getBlocks");
getLogsSpy = jest.spyOn(evmBlockRepo, "getFilteredLogs");
handlerSpy = jest.spyOn(handlers, "working");
};
const givenMetadataRepository = (data?: PollEvmLogsMetadata) => {
metadataRepo = {
get: () => Promise.resolve(data),
save: () => Promise.resolve(),
};
metadataSaveSpy = jest.spyOn(metadataRepo, "save");
};
const givenStatsRepository = () => {
statsRepo = {
count: () => {},
measure: () => {},
report: () => Promise.resolve(""),
};
};
const givenPollEvmLogs = (from?: bigint) => {
cfg.setFromBlock(from);
pollEvmLogs = new PollEvmLogs(evmBlockRepo, metadataRepo, statsRepo, cfg);
};
const whenPollEvmLogsStarts = async () => {
pollEvmLogs.run([handlers.working]);
};
const thenWaitForAssertion = async (...assertions: (() => void)[]) => {
for (let index = 1; index < 5; index++) {
try {
for (const assertion of assertions) {
assertion();
}
break;
} catch (error) {
if (index === 4) {
throw error;
}
await setTimeout(10, undefined, { ref: false });
}
}
};


@ -0,0 +1,50 @@
import { describe, it, expect } from "@jest/globals";
import { evmLogMessagePublishedMapper } from "../../../src/infrastructure/mappers/evmLogMessagePublishedMapper";
import { HandleEvmLogs } from "../../../src/domain/actions/HandleEvmLogs";
const address = "0x98f3c9e6e3face36baad05fe09d375ef1464288b";
const topic = "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2";
const txHash = "0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7";
const handler = new HandleEvmLogs(
{
filter: { addresses: [address], topics: [topic] },
abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
},
evmLogMessagePublishedMapper,
async () => {}
);
describe("evmLogMessagePublished", () => {
it("should be able to map log to LogMessagePublished", async () => {
const [result] = await handler.handle([
{
blockTime: 1699443287,
blockNumber: 18521386n,
blockHash: "0x894136d03446d47116319d59b5ec3190c05248e16c8728c2848bf7452732341c",
address: "0x98f3c9e6e3face36baad05fe09d375ef1464288b",
removed: false,
data: "0x00000000000000000000000000000000000000000000000000000000000212b20000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000085010000000000000000000000000000000000000000000000000000000045be2810000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb480002f022f6b3e80ec1219065fee8e46eb34c1cfd056a8d52d93df2c7e0165eaf364b00010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
transactionHash: txHash,
transactionIndex: "0x62",
topics: [topic, "0x0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585"],
logIndex: 0,
},
]);
expect(result.name).toBe("log-message-published");
expect(result.chainId).toBe(2);
expect(result.txHash).toBe(
"0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7"
);
expect(result.blockHeight).toBe(18521386n);
expect(result.blockTime).toBe(1699443287);
expect(result.attributes.sequence).toBe(135858);
expect(result.attributes.sender.toLowerCase()).toBe(
"0x3ee18b2214aff97000d974cf647e7c347e8fa585"
);
expect(result.attributes.nonce).toBe(0);
expect(result.attributes.consistencyLevel).toBe(1);
});
});

View File

@ -0,0 +1,184 @@
import { describe, it, expect, afterEach, afterAll } from "@jest/globals";
import { EvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import axios from "axios";
import nock from "nock";
import { EvmLogFilter, EvmTag } from "../../../src/domain/entities";
import { HttpClient } from "../../../src/infrastructure/repositories/HttpClient";
axios.defaults.adapter = "http"; // needed by nock
const rpc = "http://localhost";
const address = "0x98f3c9e6e3face36baad05fe09d375ef1464288b";
const topic = "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2";
const txHash = "0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7";
let repo: EvmJsonRPCBlockRepository;
describe("EvmJsonRPCBlockRepository", () => {
afterAll(() => {
nock.restore();
});
afterEach(() => {
nock.cleanAll();
});
it("should be able to get block height", async () => {
const expectedHeight = 1980809n;
givenARepo();
givenBlockHeightIs(expectedHeight, "latest");
const result = await repo.getBlockHeight("latest");
expect(result).toBe(expectedHeight);
});
it("should be able to get several blocks", async () => {
const blockNumbers = [2n, 3n, 4n];
givenARepo();
givenBlocksArePresent(blockNumbers);
const result = await repo.getBlocks(new Set(blockNumbers));
expect(Object.keys(result)).toHaveLength(blockNumbers.length);
blockNumbers.forEach((blockNumber) => {
expect(result[blockHash(blockNumber)].number).toBe(blockNumber);
});
});
it("should be able to get logs", async () => {
const filter: EvmLogFilter = {
fromBlock: "safe",
toBlock: "latest",
addresses: [address],
topics: [],
};
givenLogsPresent(filter);
const logs = await repo.getFilteredLogs(filter);
expect(logs).toHaveLength(1);
expect(logs[0].blockNumber).toBe(1n);
expect(logs[0].blockHash).toBe(blockHash(1n));
expect(logs[0].address).toBe(address);
});
});
const givenARepo = () => {
  repo = new EvmJsonRPCBlockRepository({ rpc, timeout: 100, chain: "ethereum" }, new HttpClient());
};
const givenBlockHeightIs = (height: bigint, commitment: EvmTag) => {
  nock(rpc)
    .post("/", {
      jsonrpc: "2.0",
      method: "eth_getBlockByNumber",
      params: [commitment, false],
      id: 1,
    })
    .reply(200, {
      jsonrpc: "2.0",
      id: 1,
      result: {
        number: `0x${height.toString(16)}`,
        hash: blockHash(height),
        timestamp: "0x654a892f",
      },
    });
};
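// getBlocks issues one eth_getBlockByNumber call per block in a single batched
// JSON-RPC request, so the mock matches an array body and replies with an array.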
const givenBlocksArePresent = (blockNumbers: bigint[]) => {
  const requests = blockNumbers.map((blockNumber) => ({
    jsonrpc: "2.0",
    method: "eth_getBlockByNumber",
    params: [blockNumber.toString(), false],
    id: blockNumber.toString(),
  }));
  const response = blockNumbers.map((blockNumber) => ({
    jsonrpc: "2.0",
    id: blockNumber.toString(),
    result: {
      number: `0x${blockNumber.toString(16)}`,
      hash: blockHash(blockNumber),
      timestamp: "0x654a892f",
    },
  }));
  nock(rpc).post("/", requests).reply(200, response);
};
const givenLogsPresent = (filter: EvmLogFilter) => {
  const response = {
    jsonrpc: "2.0",
    id: 1,
    result: [
      {
        address: filter.addresses[0],
        topics: [topic],
        data: "0x",
        blockNumber: "0x1",
        blockHash: blockHash(1n),
        transactionHash: txHash,
        transactionIndex: "0x0",
        logIndex: 0,
        removed: false,
      },
    ],
  };
  nock(rpc).post("/").reply(200, response);
};
const blockHash = (blockNumber: bigint) => `0x${blockNumber.toString(16)}`;
/* Examples:
  - blockByNumber:
    {
      "jsonrpc": "2.0",
      "method": "eth_getBlockByNumber",
      "params": ["latest", true],
      "id": 1
    }
    ->
    {
      "jsonrpc": "2.0",
      "id": 1,
      "result": {
        "baseFeePerGas": "0x78d108bbb",
        "difficulty": "0x0",
        "extraData": "0x406275696c64657230783639",
        "gasLimit": "0x1c9c380",
        "gasUsed": "0xfca882",
        "hash": "0xb29ad2fa313f50d293fcf5679c6862ee7f4a3d641f09b227ad0ee3fba10d1cbb",
        "logsBloom": "0x312390b5e798baf83b514972932b522118d98b9888f5880461db7e26f4bece2e8141717140d492028f887928435127083671a3488c3b6c240130a6eeb3692d908417d91c65fbfd396d0ade2b57a263a080c64f59fb4d3c4033415503e833306057524072daeae803a45cc020c1a32f436f30037b49003cd257c965d9214b441922012654b681e5202053a7d58500a64aa040cec9d90a0c9e5e3321d821503d90cfb84961594a72f02e92c7c2559d95c86504c54260c708ea63e5e4a2538f1143096c2422250a0a20b321a8814678d26e6a6d6a872e232a500a402a3a6445b85b3cf92b481e9020c20a969eac4c50ca08667cda68812f8141108908b3d175f649",
        "miner": "0x690b9a9e9aa1c9db991c7721a92d351db4fac990",
        "mixHash": "0xbdeea2aa4f2a026b27bc720d28c73680a35ad3e5017568cddcb066b5c12b1f60",
        "nonce": "0x0000000000000000",
        "number": "0x11a9fa9",
        "parentHash": "0x7f8c4ecd8772eab825ee3c8e713c5088c6a32b41d61dbd1c0833e7d4df337713",
        "receiptsRoot": "0x06e3cd06761468089708b204a092545576c508739f0eff936c96914da2e277e9",
        "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
        "size": "0x25301",
        "stateRoot": "0x3e1333543583ec7e5fb4e337261c43ef94aeb4f77c4e7f657a538fc3c2e5b6de",
        "timestamp": "0x654a892f",
        "totalDifficulty": "0xc70d815d562d3cfa955",
        "transactions": [
          "0x5e59c0bb917e7a5a64f098cd6a370bac4f40ecdf6ca79deaccf25736fe117ef7"
        ],
        "transactionsRoot": "0xbf23b57ac6f6aede4d886a556b7bbee868721542b9a1d912ffa5f4ead0b8ec72",
        "uncles": [],
        "withdrawals": [
          {
            "index": "0x16b0d86",
            "validatorIndex": "0x2a57b",
            "address": "0xdaac5ce35ad7892d5f2dd364954066f4323c9a57",
            "amount": "0x105fdec"
          }
        ]
      }
    }
  - blockByHash:
*/

View File

@ -0,0 +1,59 @@
import { describe, expect, it, beforeEach, afterEach } from "@jest/globals";
import fs from "fs";
import { FileMetadataRepo } from "../../../src/infrastructure/repositories";
describe("FileMetadataRepo", () => {
const dirPath = "./metadata-repo";
const repo = new FileMetadataRepo(dirPath);
beforeEach(() => {
if (!fs.existsSync(dirPath)) {
fs.mkdirSync(dirPath);
}
});
afterEach(() => {
fs.rm(dirPath, () => {});
});
describe("get", () => {
it("should return null if the file does not exist", async () => {
const metadata = await repo.get("non-existent-file");
expect(metadata).toBeNull();
});
it("should return the metadata if the file exists", async () => {
const id = "test-file";
const metadata = { foo: "bar" };
await repo.save(id, metadata);
const retrievedMetadata = await repo.get(id);
expect(retrievedMetadata).toEqual(metadata);
});
});
describe("save", () => {
it("should create a new file with the given metadata", async () => {
const id = "test-file";
const metadata = { foo: "bar" };
await repo.save(id, metadata);
const fileExists = fs.existsSync(`${dirPath}/${id}.json`);
expect(fileExists).toBe(true);
const fileContents = fs.readFileSync(`${dirPath}/${id}.json`, "utf8");
expect(JSON.parse(fileContents)).toEqual(metadata);
});
it("should overwrite an existing file with the given metadata", async () => {
const id = "test-file";
const initialMetadata = { foo: "bar" };
const updatedMetadata = { baz: "qux" };
await repo.save(id, initialMetadata);
await repo.save(id, updatedMetadata);
const fileContents = fs.readFileSync(`${dirPath}/${id}.json`, "utf8");
expect(JSON.parse(fileContents)).toEqual(updatedMetadata);
});
});
});

View File

@ -0,0 +1,50 @@
import { describe, expect, it, jest } from "@jest/globals";
import { SnsEventRepository, SnsConfig } from "../../../src/infrastructure/repositories";
import { SNSClient } from "@aws-sdk/client-sns";
let snsEventRepository: SnsEventRepository;
let snsClient: SNSClient;
let snsConfig: SnsConfig;
describe("SnsEventRepository", () => {
it("should not call sns client when no events given", async () => {
givenSnsEventRepository();
const result = await snsEventRepository.publish([]);
expect(result).toEqual({ status: "success" });
expect(snsClient.send).not.toHaveBeenCalled();
});
it("should publish", async () => {
givenSnsEventRepository();
const result = await snsEventRepository.publish([
{
chainId: 1,
address: "0x123456",
txHash: "0x123",
blockHeight: 123n,
blockTime: 0,
name: "LogMessagePublished",
attributes: {},
},
]);
expect(result).toEqual({ status: "success" });
expect(snsClient.send).toHaveBeenCalledTimes(1);
});
});
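// Stubs SNSClient.send with a jest mock so publish() can be asserted without AWS.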
const givenSnsEventRepository = () => {
  snsConfig = {
    region: "us-east-1",
    topicArn: "arn:aws:sns:us-east-1:123456789012:MyTopic",
    groupId: "groupId",
    subject: "subject",
  };
  snsClient = {
    send: jest.fn().mockReturnThis(),
  } as unknown as SNSClient;
  snsEventRepository = new SnsEventRepository(snsClient, snsConfig);
};

View File

@ -0,0 +1,77 @@
import { beforeEach, describe, expect, it } from "@jest/globals";
import fs from "fs";
import { SnsEventRepository, StaticJobRepository } from "../../../src/infrastructure/repositories";
import {
  EvmBlockRepository,
  MetadataRepository,
  StatRepository,
} from "../../../src/domain/repositories";
const dirPath = "./metadata-repo/jobs";
const blockRepo: EvmBlockRepository = {} as any as EvmBlockRepository;
const metadataRepo = {} as MetadataRepository<any>;
const statsRepo = {} as any as StatRepository;
const snsRepo = {} as any as SnsEventRepository;
let repo: StaticJobRepository;
describe("StaticJobRepository", () => {
beforeEach(() => {
if (fs.existsSync(dirPath)) {
fs.rmSync(dirPath, { recursive: true, force: true });
}
repo = new StaticJobRepository(dirPath, false, () => blockRepo, {
metadataRepo,
statsRepo,
snsRepo,
});
});
it("should return empty when no file available", async () => {
const jobs = await repo.getJobDefinitions();
expect(jobs).toHaveLength(0);
});
it("should read jobs from file", async () => {
givenJobsPresent();
const jobs = await repo.getJobDefinitions();
expect(jobs).toHaveLength(1);
expect(jobs[0].id).toEqual("poll-log-message-published-ethereum");
});
});
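// Writes a jobs.json fixture shaped like the deployed ConfigMap (see the k8s manifest below).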
const givenJobsPresent = () => {
  const jobs = [
    {
      id: "poll-log-message-published-ethereum",
      chain: "ethereum",
      source: {
        action: "PollEvmLogs",
        config: {
          // Kept as a string (as in the ConfigMap): JSON.stringify throws on bigint values.
          fromBlock: "10012499",
          blockBatchSize: 100,
          commitment: "latest",
          interval: 15_000,
          addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
          chain: "ethereum",
          topics: [],
        },
      },
      handlers: [
        {
          action: "HandleEvmLogs",
          target: "sns",
          mapper: "evmLogMessagePublishedMapper",
          config: {
            abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
            filter: {
              addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
              topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
            },
          },
        },
      ],
    },
  ];
  fs.writeFileSync(dirPath + "/jobs.json", JSON.stringify(jobs));
};

View File

@ -0,0 +1,22 @@
{
  "compilerOptions": {
    "outDir": "lib",
    "target": "esnext",
    "module": "CommonJS",
    "moduleResolution": "node",
    "lib": ["es2019"],
    "declaration": true,
    "skipLibCheck": true,
    "allowJs": true,
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "isolatedModules": true,
    "resolveJsonModule": true,
    "downlevelIteration": true,
    "sourceMap": true,
    "esModuleInterop": true
  },
  "include": ["./src"],
  "exclude": ["node_modules", "**/__tests__/*"]
}

View File

@ -0,0 +1,17 @@
NODE_ENV=staging-mainnet
BLOCKCHAIN_ENV=mainnet
NAMESPACE=wormscan
NAME=blockchain-watcher
DRY_RUN_ENABLED=false
REPLICAS=1
IMAGE_NAME=
PORT=3005
LOG_LEVEL=debug
RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=200m
RESOURCES_REQUESTS_MEMORY=96Mi
RESOURCES_REQUESTS_CPU=100m
SNS_TOPIC_ARN=
SNS_REGION=

View File

@ -0,0 +1,17 @@
NODE_ENV=staging
BLOCKCHAIN_ENV=testnet
NAMESPACE=wormscan-testnet
NAME=blockchain-watcher
DRY_RUN_ENABLED=false
REPLICAS=1
IMAGE_NAME=
PORT=3005
LOG_LEVEL=debug
RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=200m
RESOURCES_REQUESTS_MEMORY=96Mi
RESOURCES_REQUESTS_CPU=100m
SNS_TOPIC_ARN=
SNS_REGION=

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: ServiceAccount
metadata:
  name: event-watcher
  namespace: {{ .NAMESPACE }}
  annotations:
    eks.amazonaws.com/role-arn: {{ .AWS_IAM_ROLE }}

View File

@ -0,0 +1,158 @@
---
apiVersion: v1
kind: Service
metadata:
  name: {{ .NAME }}-eth
  namespace: {{ .NAMESPACE }}
  labels:
    app: {{ .NAME }}-eth
spec:
  selector:
    app: {{ .NAME }}-eth
  ports:
    - port: {{ .PORT }}
      targetPort: {{ .PORT }}
      name: {{ .NAME }}-eth
      protocol: TCP
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: blockchain-watcher-eth-pvc
  namespace: {{ .NAMESPACE }}
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Mi
  storageClassName: gp2
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .NAME }}-eth-jobs
  namespace: {{ .NAMESPACE }}
data:
  testnet-jobs.json: |-
    [
      {
        "id": "poll-log-message-published-ethereum",
        "chain": "ethereum",
        "source": {
          "action": "PollEvmLogs",
          "config": {
            "fromBlock": "10012499",
            "blockBatchSize": 100,
            "commitment": "latest",
            "interval": 15000,
            "addresses": ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
            "chain": "ethereum",
            "topics": []
          }
        },
        "handlers": [
          {
            "action": "HandleEvmLogs",
            "target": "sns",
            "mapper": "evmLogMessagePublishedMapper",
            "config": {
              "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
              "filter": {
                "addresses": ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
                "topics": ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"]
              }
            }
          }
        ]
      }
    ]
  mainnet-jobs.json: |-
    [
      {
        "id": "poll-log-message-published-ethereum",
        "chain": "ethereum",
        "source": {
          "action": "PollEvmLogs",
          "config": {
            "blockBatchSize": 100,
            "commitment": "latest",
            "interval": 15000,
            "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"],
            "chain": "ethereum",
            "topics": []
          }
        },
        "handlers": [
          {
            "action": "HandleEvmLogs",
            "target": "sns",
            "mapper": "evmLogMessagePublishedMapper",
            "config": {
              "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
              "filter": {
                "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"],
                "topics": ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"]
              }
            }
          }
        ]
      }
    ]
---
apiVersion: v1
kind: Pod
metadata:
  name: {{ .NAME }}-eth
  namespace: {{ .NAMESPACE }}
  labels:
    app: {{ .NAME }}-eth
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "{{ .PORT }}"
spec:
  restartPolicy: Always
  terminationGracePeriodSeconds: 30
  serviceAccountName: event-watcher
  containers:
    - name: {{ .NAME }}
      image: {{ .IMAGE_NAME }}
      env:
        - name: NODE_ENV
          value: {{ .NODE_ENV }}
        - name: PORT
          value: "{{ .PORT }}"
        - name: LOG_LEVEL
          value: {{ .LOG_LEVEL }}
        - name: BLOCKCHAIN_ENV
          value: {{ .BLOCKCHAIN_ENV }}
        - name: DRY_RUN_ENABLED
          value: "{{ .DRY_RUN_ENABLED }}"
        - name: SNS_TOPIC_ARN
          value: {{ .SNS_TOPIC_ARN }}
        - name: SNS_REGION
          value: {{ .SNS_REGION }}
        - name: JOBS_DIR
          value: /home/node/app/jobs
      resources:
        limits:
          memory: {{ .RESOURCES_LIMITS_MEMORY }}
          cpu: {{ .RESOURCES_LIMITS_CPU }}
        requests:
          memory: {{ .RESOURCES_REQUESTS_MEMORY }}
          cpu: {{ .RESOURCES_REQUESTS_CPU }}
      volumeMounts:
        - name: metadata-volume
          mountPath: /home/node/app/metadata-repo
        - name: jobs-volume
          mountPath: /home/node/app/jobs
  volumes:
    - name: metadata-volume
      persistentVolumeClaim:
        claimName: blockchain-watcher-eth-pvc
    - name: jobs-volume
      configMap:
        name: {{ .NAME }}-eth-jobs
        items:
          - key: {{ .BLOCKCHAIN_ENV }}-jobs.json
            path: jobs.json