Blockchain Watcher: ethereum -> sns implementation (#790)

* evm block repo implementation

* adding sns publisher

* adding external config and initial runner

* fix: start from latest if no fromBlock configured

* feat: add dryRun option

* fix: handler filtering and eth_getLogs filter payload

* local e2e

* actual sns arns for testnet

* smaller docker image

* deployment changes
This commit is contained in:
Matías Martínez 2023-11-10 09:28:37 -03:00 committed by GitHub
parent 1ed4cec999
commit 00e5d6a6a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 3417 additions and 161 deletions

View File

@ -30,5 +30,5 @@ jobs:
run: npm run build
working-directory: ./blockchain-watcher
- name: Run tests
run: npm test
run: npm run test:coverage
working-directory: ./blockchain-watcher

View File

@ -0,0 +1,6 @@
lib
node_modules
README.md
metadata-repo
coverage
docs

View File

@ -1,3 +1,6 @@
node_modules
lib
coverage
coverage
metadata-repo
config/dev.json

View File

@ -1,5 +1,5 @@
# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2
FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d73b044f5f57cfc
FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d73b044f5f57cfc as builder
# npm wants to clone random Git repositories - lovely.
# RUN apk add git python make build-base
@ -7,8 +7,6 @@ FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d7
RUN apt-get update && apt-get -y install \
git python make curl netcat
RUN npm i typescript -g
USER 1000
RUN mkdir -p /home/node/app
@ -30,10 +28,21 @@ RUN if [ -e /certs/cert.pem ]; then git config --global http.sslCAInfo /certs/ce
COPY --chown=node:node . .
RUN npm ci
RUN npm run build
run npm run build
FROM node:19.6.1-slim@sha256:a1ba21bf0c92931d02a8416f0a54daad66cb36a85d2b73af9d73b044f5f57cfc as runner
CMD ["npm", "start"]
COPY --from=builder /home/node/app/config /home/node/app/config
COPY --from=builder /home/node/app/lib /home/node/app/lib
WORKDIR /home/node/app
COPY package.json .
COPY package-lock.json .
RUN npm install --omit=dev
CMD [ "npm", "start" ]

View File

@ -0,0 +1,8 @@
{
"environment": "BLOCKCHAIN_ENV",
"dryRun": "DRY_RUN_ENABLED",
"sns": {
"topicArn": "SNS_TOPIC_ARN",
"region": "SNS_REGION"
}
}

View File

@ -0,0 +1,23 @@
{
"environment": "testnet",
"dryRun": true,
"supportedChains": ["ethereum"],
"sns": {
"topicArn": "arn:aws:sns:us-east-1:000000000000:localstack-topic.fifo",
"region": "us-east-1",
"groupId": "blockchain-watcher",
"subject": "blockchain-watcher"
},
"metadata": {
"dir": "metadata-repo"
},
"platforms": {
"ethereum": {
"name": "ethereum",
"network": "goerli",
"chainId": 2,
"rpcs": ["https://rpc.ankr.com/eth_goerli"],
"timeout": 10000
}
}
}

View File

@ -1,20 +0,0 @@
{
"network": "TESTNET",
"supportedChains": [2],
"rpcs": [
{
"chain": 2,
"rpcs": ["https://rpc.ankr.com/eth_goerli"]
}
],
"handlers": [
{
"name": "deliveryEventHandler",
"config": {
"contractAddress": "0xA3cF45939bD6260bcFe3D66bc73d60f19e49a8BB",
"fromBlock": 0,
"toBlock": "latest"
}
}
]
}

View File

@ -0,0 +1 @@
{}

View File

@ -2,8 +2,11 @@
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
testRegex: "^(?!.*integration.*)(?=.*test\\/).*\\.test\\.ts$",
collectCoverageFrom: ["./src/**"],
collectCoverageFrom: [
"./src/domain",
"./src/infrastructure/mappers",
"./src/infrastructure/repositories",
],
coverageThreshold: {
global: {
lines: 85,

File diff suppressed because it is too large Load Diff

View File

@ -4,20 +4,25 @@
"description": "A process for watching blockchain events and moving them to persistent storage",
"main": "index.js",
"scripts": {
"start": "node lib/index.js",
"start": "node lib/start.js",
"test": "jest",
"test:coverage": "jest --collectCoverage=true",
"build": "tsc",
"dev": "USE_ENV_FILE=true ts-node src/index.ts",
"dev": "USE_ENV_FILE=true ts-node src/start.ts",
"prettier": "prettier --write ."
},
"author": "chase-45",
"license": "ISC",
"dependencies": {
"@aws-sdk/client-sns": "^3.445.0",
"@certusone/wormhole-sdk": "^0.9.21-beta.0",
"@types/config": "^3.3.3",
"axios": "^1.6.0",
"axios-rate-limit": "^1.3.0",
"config": "^3.3.9",
"dotenv": "^16.3.1",
"uuid": "^9.0.1",
"ethers": "^5",
"uuid": "^9.0.1",
"winston": "3.8.2"
},
"devDependencies": {
@ -26,11 +31,14 @@
"@types/uuid": "^9.0.6",
"@types/yargs": "^17.0.23",
"jest": "^29.7.0",
"nock": "^13.3.8",
"prettier": "^2.8.7",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.1",
"tsx": "^3.12.7",
"typescript": "^4.8.4",
"winston": "^3.8.2"
"typescript": "^4.8.4"
},
"engines": {
"node": ">=18.0.0"
}
}

View File

@ -15,7 +15,7 @@ export class HandleEvmLogs<T> {
mapper: (log: EvmLog, args: ReadonlyArray<any>) => T,
target: (parsed: T[]) => Promise<void>
) {
this.cfg = cfg;
this.cfg = this.normalizeCfg(cfg);
this.mapper = mapper;
this.target = target;
}
@ -24,8 +24,8 @@ export class HandleEvmLogs<T> {
const mappedItems = logs
.filter(
(log) =>
this.cfg.filter.addresses.includes(log.address) &&
this.cfg.filter.topics.includes(log.topics[0])
this.cfg.filter.addresses.includes(log.address.toLowerCase()) &&
this.cfg.filter.topics.includes(log.topics[0].toLowerCase())
)
.map((log) => {
const iface = new ethers.utils.Interface([this.cfg.abi]);
@ -37,6 +37,16 @@ export class HandleEvmLogs<T> {
// TODO: return a result specifying failures if any
return mappedItems;
}
private normalizeCfg(cfg: HandleEvmLogsConfig): HandleEvmLogsConfig {
return {
filter: {
addresses: cfg.filter.addresses.map((addr) => addr.toLowerCase()),
topics: cfg.filter.topics.map((topic) => topic.toLowerCase()),
},
abi: cfg.abi,
};
}
}
export type HandleEvmLogsConfig = {

View File

@ -3,6 +3,7 @@ import { EvmBlockRepository, MetadataRepository } from "../repositories";
import { setTimeout } from "timers/promises";
const ID = "watch-evm-logs";
let ref: any;
/**
* PollEvmLogs is an action that watches for new blocks and extracts logs from them.
@ -10,8 +11,8 @@ const ID = "watch-evm-logs";
export class PollEvmLogs {
private readonly blockRepo: EvmBlockRepository;
private readonly metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
private latestBlockHeight: bigint = 0n;
private blockHeightCursor: bigint = 0n;
private latestBlockHeight?: bigint;
private blockHeightCursor?: bigint;
private cfg: PollEvmLogsConfig;
private started: boolean = false;
@ -25,10 +26,10 @@ export class PollEvmLogs {
this.cfg = cfg;
}
public async start(handlers: ((logs: EvmLog[]) => Promise<void>)[]): Promise<void> {
const metadata = await this.metadataRepo.get(ID);
public async start(handlers: ((logs: EvmLog[]) => Promise<any>)[]): Promise<void> {
const metadata = await this.metadataRepo.get(this.cfg.id);
if (metadata) {
this.blockHeightCursor = metadata.lastBlock;
this.blockHeightCursor = BigInt(metadata.lastBlock);
}
this.started = true;
@ -37,14 +38,17 @@ export class PollEvmLogs {
private async watch(handlers: ((logs: EvmLog[]) => Promise<void>)[]): Promise<void> {
while (this.started) {
if (this.cfg.hasFinished(this.blockHeightCursor)) {
console.log(
`PollEvmLogs: (${this.cfg.id}) Finished processing all blocks from ${this.cfg.fromBlock} to ${this.cfg.toBlock}`
);
await this.stop();
break;
}
this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment());
const range = this.getBlockRange(this.latestBlockHeight);
if (this.cfg.hasFinished(range.fromBlock)) {
// TODO: log
await this.stop();
continue;
}
const logs = await this.blockRepo.getFilteredLogs({
fromBlock: range.fromBlock,
@ -63,10 +67,10 @@ export class PollEvmLogs {
// TODO: add error handling.
await Promise.all(handlers.map((handler) => handler(logs)));
await this.metadataRepo.save(ID, { lastBlock: range.toBlock });
await this.metadataRepo.save(this.cfg.id, { lastBlock: range.toBlock });
this.blockHeightCursor = range.toBlock;
await setTimeout(this.cfg.interval ?? 1_000, undefined, { ref: false });
ref = await setTimeout(this.cfg.interval ?? 1_000, undefined);
}
}
@ -79,26 +83,37 @@ export class PollEvmLogs {
fromBlock: bigint;
toBlock: bigint;
} {
let fromBlock = this.blockHeightCursor + 1n;
let fromBlock = this.blockHeightCursor
? this.blockHeightCursor + 1n
: this.cfg.fromBlock ?? latestBlockHeight;
// fromBlock is configured and is greater than current block height, then we allow to skip blocks.
if (this.cfg.fromBlock && this.cfg.fromBlock > this.blockHeightCursor) {
if (
this.blockHeightCursor &&
this.cfg.fromBlock &&
this.cfg.fromBlock > this.blockHeightCursor
) {
fromBlock = this.cfg.fromBlock;
}
if (fromBlock > latestBlockHeight) {
return { fromBlock: latestBlockHeight, toBlock: latestBlockHeight };
return { fromBlock, toBlock: fromBlock };
}
let toBlock = this.cfg.toBlock ?? this.blockHeightCursor + BigInt(this.cfg.getBlockBatchSize());
let toBlock = fromBlock + BigInt(this.cfg.getBlockBatchSize());
// limit toBlock to obtained block height
if (toBlock > fromBlock && toBlock > latestBlockHeight) {
toBlock = latestBlockHeight;
}
// limit toBlock to configured toBlock
if (this.cfg.toBlock && toBlock > this.cfg.toBlock) {
toBlock = this.cfg.toBlock;
}
return { fromBlock, toBlock };
}
public async stop(): Promise<void> {
clearTimeout(ref);
this.started = false;
}
@ -110,30 +125,71 @@ export type PollEvmLogsMetadata = {
lastBlock: bigint;
};
export class PollEvmLogsConfig {
export interface PollEvmLogsConfigProps {
fromBlock?: bigint;
toBlock?: bigint;
blockBatchSize?: number;
commitment?: string;
interval?: number;
addresses: string[] = [];
topics: string[] = [];
addresses: string[];
topics: string[];
id?: string;
}
export class PollEvmLogsConfig {
private props: PollEvmLogsConfigProps;
constructor(props: PollEvmLogsConfigProps = { addresses: [], topics: [] }) {
if (props.fromBlock && props.toBlock && props.fromBlock > props.toBlock) {
throw new Error("fromBlock must be less than or equal to toBlock");
}
this.props = props;
}
public getBlockBatchSize() {
return this.blockBatchSize ?? 100;
return this.props.blockBatchSize ?? 100;
}
public getCommitment() {
return this.commitment ?? "latest";
return this.props.commitment ?? "latest";
}
public hasFinished(currentFromBlock: bigint) {
return this.toBlock && currentFromBlock > this.toBlock;
public hasFinished(currentFromBlock?: bigint) {
return currentFromBlock && this.props.toBlock && currentFromBlock >= this.props.toBlock;
}
public get fromBlock() {
return this.props.fromBlock;
}
public setFromBlock(fromBlock: bigint | undefined) {
this.props.fromBlock = fromBlock;
}
public get toBlock() {
return this.props.toBlock;
}
public get interval() {
return this.props.interval;
}
public get addresses() {
return this.props.addresses;
}
public get topics() {
return this.props.topics;
}
public get id() {
return this.props.id ?? ID;
}
static fromBlock(fromBlock: bigint) {
const cfg = new PollEvmLogsConfig();
cfg.fromBlock = fromBlock;
cfg.props.fromBlock = fromBlock;
return cfg;
}
}

View File

@ -1,11 +1,11 @@
export type EvmBlock = {
number: bigint;
hash: string;
timestamp: bigint; // epoch millis
timestamp: number; // epoch seconds
};
export type EvmLog = {
blockTime: bigint;
blockTime?: number; // epoch seconds
blockNumber: bigint;
blockHash: string;
address: string;
@ -36,7 +36,7 @@ export type LogFoundEvent<T> = {
chainId: number;
txHash: string;
blockHeight: bigint;
blockTime: bigint;
blockTime: number;
attributes: T;
};

View File

@ -0,0 +1,89 @@
import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns";
import { Config } from "./config";
import {
SnsEventRepository,
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
FileMetadataRepo,
} from "./repositories";
import axios, { AxiosInstance } from "axios";
import axiosRateLimit from "axios-rate-limit";
export class RepositoriesBuilder {
private cfg: Config;
private snsClient?: SNSClient;
private axiosInstance?: AxiosInstance;
private repositories = new Map();
constructor(cfg: Config) {
this.cfg = cfg;
this.build();
}
private build() {
this.snsClient = this.createSnsClient();
this.axiosInstance = this.createAxios();
this.repositories.set("sns", new SnsEventRepository(this.snsClient, this.cfg.sns));
this.cfg.metadata?.dir &&
this.repositories.set("metadata", new FileMetadataRepo(this.cfg.metadata.dir));
this.cfg.supportedChains.forEach((chain) => {
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chain,
rpc: this.cfg.platforms[chain].rpcs[0],
timeout: this.cfg.platforms[chain].timeout,
};
this.repositories.set(
`${chain}-evmRepo`,
new EvmJsonRPCBlockRepository(repoCfg, this.axiosInstance!)
);
});
}
public getEvmBlockRepository(chain: string): EvmJsonRPCBlockRepository {
const repo = this.repositories.get(`${chain}-evmRepo`);
if (!repo) throw new Error(`No EvmJsonRPCBlockRepository for chain ${chain}`);
return repo;
}
public getSnsEventRepository(): SnsEventRepository {
const repo = this.repositories.get("sns");
if (!repo) throw new Error(`No SnsEventRepository`);
return repo;
}
public getMetadataRepository(): FileMetadataRepo {
const repo = this.repositories.get("metadata");
if (!repo) throw new Error(`No FileMetadataRepo`);
return repo;
}
public close(): void {
this.snsClient?.destroy();
}
private createSnsClient(): SNSClient {
const snsCfg: SNSClientConfig = { region: this.cfg.sns.region };
if (this.cfg.sns.credentials) {
snsCfg.credentials = {
accessKeyId: this.cfg.sns.credentials.accessKeyId,
secretAccessKey: this.cfg.sns.credentials.secretAccessKey,
};
snsCfg.endpoint = this.cfg.sns.credentials.url;
}
return new SNSClient(snsCfg);
}
private createAxios() {
return axiosRateLimit(axios.create(), {
perMilliseconds: 1000,
maxRequests: 1_000,
}); // TODO: configurable per repo
}
}

View File

@ -0,0 +1,34 @@
import config from "config";
import { SnsConfig } from "./repositories/SnsEventRepository";
export type Config = {
environment: "testnet" | "mainnet";
dryRun: boolean;
sns: SnsConfig;
metadata?: {
dir: string;
};
platforms: Record<string, PlatformConfig>;
supportedChains: string[];
};
export type PlatformConfig = {
name: string;
network: string;
chainId: number;
rpcs: string[];
timeout?: number;
};
// By setting NODE_CONFIG_ENV we can point to a different config directory.
// Default settings can be customized by definining NODE_ENV=staging|production.
export const configuration = {
environment: config.get<string>("environment"),
dryRun: config.get<string>("dryRun") === "true" ? true : false,
sns: config.get<SnsConfig>("sns"),
metadata: {
dir: config.get<string>("metadata.dir"),
},
platforms: config.get<Record<string, PlatformConfig>>("platforms"),
supportedChains: config.get<string[]>("supportedChains"),
} as Config;

View File

@ -5,6 +5,10 @@ export const evmLogMessagePublishedMapper = (
log: EvmLog,
parsedArgs: ReadonlyArray<any>
): LogFoundEvent<LogMessagePublished> => {
if (!log.blockTime) {
throw new Error(`Block time is missing for log ${log.logIndex} in tx ${log.transactionHash}`);
}
return {
name: "log-message-published",
chainId: 2, // TODO: get from config

View File

@ -0,0 +1,197 @@
import { EvmBlock, EvmLogFilter, EvmLog, EvmTag } from "../../domain/entities";
import { EvmBlockRepository } from "../../domain/repositories";
import { AxiosInstance } from "axios";
const headers = {
"Content-Type": "application/json",
};
/**
* EvmJsonRPCBlockRepository is a repository that uses a JSON RPC endpoint to fetch blocks.
* On the reliability side, only knows how to timeout.
*/
export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
private axios: AxiosInstance;
private chain: string;
private rpc: URL;
private timeout: number;
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, axios: AxiosInstance) {
this.chain = cfg.chain;
this.axios = axios;
this.rpc = new URL(cfg.rpc);
this.timeout = cfg.timeout ?? 10_000;
}
async getBlockHeight(finality: EvmTag): Promise<bigint> {
const block: EvmBlock = await this.getBlock(finality);
return block.number;
}
/**
* Get blocks by block number.
* @param blockNumbers
* @returns a record of block hash -> EvmBlock
*/
async getBlocks(blockNumbers: Set<bigint>): Promise<Record<string, EvmBlock>> {
if (!blockNumbers.size) return {};
const reqs: any[] = [];
for (let blockNumber of blockNumbers) {
const blockNumberStr = blockNumber.toString();
reqs.push({
jsonrpc: "2.0",
id: blockNumberStr,
method: "eth_getBlockByNumber",
params: [blockNumberStr, false],
});
}
const results = (await this.axios.post(this.rpc.href, reqs, this.getRequestOptions()))?.data;
if (results && results.length) {
return results
.map(
(
response: undefined | { id: string; result?: EvmBlock; error?: ErrorBlock },
idx: number
) => {
// Karura is getting 6969 errors for some blocks, so we'll just return empty blocks for those instead of throwing an error.
// We take the timestamp from the previous block, which is not ideal but should be fine.
if (
(response && response.result === null) ||
(response?.error && response.error?.code && response.error.code === 6969)
) {
return {
hash: "",
number: BigInt(response.id),
timestamp: Date.now(), // TODO: we might just want to return the timestamp of the previous block or do something at a client level when not found
};
}
if (
response?.result &&
response.result?.hash &&
response.result.number &&
response.result.timestamp
) {
return {
hash: response.result.hash,
number: BigInt(response.result.number),
timestamp: Number(response.result.timestamp),
};
}
console.error(reqs[idx], response, idx); // TODO: use an actual logger
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${
response?.id ?? reqs[idx].id
} on ${this.rpc.hostname}`
);
}
)
.reduce((acc: Record<string, EvmBlock>, block: EvmBlock) => {
acc[block.hash] = block;
return acc;
}, {});
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for numbers ${blockNumbers} on ${this.rpc.hostname}`
);
}
async getFilteredLogs(filter: EvmLogFilter): Promise<EvmLog[]> {
const parsedFilters = {
topics: filter.topics,
address: filter.addresses,
fromBlock: filter.fromBlock.toString(),
toBlock: filter.toBlock.toString(),
};
let response = await this.axios.post<{ result: Log[]; error?: ErrorBlock }>(
this.rpc.href,
{
jsonrpc: "2.0",
method: "eth_getLogs",
params: [parsedFilters],
id: 1,
},
this.getRequestOptions()
);
if (response?.data.error) {
throw new Error(
`Got error ${response?.data.error.message} for ${this.describeFilter(filter)} from ${
this.rpc.hostname
}`
);
}
const logs = response?.data?.result;
console.info(
`Got ${logs?.length} logs for ${this.describeFilter(filter)} from ${this.rpc.hostname}`
);
return logs.map((log) => ({
...log,
blockNumber: BigInt(log.blockNumber),
transactionIndex: log.transactionIndex.toString(),
}));
}
private describeFilter(filter: EvmLogFilter): string {
return `[addresses:${filter.addresses}][topics:${filter.topics}][blocks:${filter.fromBlock} - ${filter.toBlock}]`;
}
/**
* Loosely based on the wormhole-dashboard implementation (minus some specially crafted blocks when null result is obtained)
*/
private async getBlock(blockNumberOrTag: bigint | EvmTag): Promise<EvmBlock> {
let response = await this.axios.post(
this.rpc.href,
{
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [blockNumberOrTag.toString(), false], // this means we'll get a light block (no txs)
id: 1,
},
this.getRequestOptions()
);
const result = response?.data?.result;
if (result && result.hash && result.number && result.timestamp) {
// Convert to our domain compatible type
return {
number: BigInt(result.number),
timestamp: Number(result.timestamp),
hash: result.hash,
};
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${this.rpc}`
);
}
private getRequestOptions() {
return { headers, timeout: this.timeout, signal: AbortSignal.timeout(this.timeout) };
}
}
export type EvmJsonRPCBlockRepositoryCfg = {
rpc: string;
timeout?: number;
chain: string;
};
type ErrorBlock = {
code: number; //6969,
message: string; //'Error: No response received from RPC endpoint in 60s'
};
type Log = {
blockNumber: string;
blockHash: string;
transactionIndex: number;
removed: boolean;
address: string;
data: string;
topics: Array<string>;
transactionHash: string;
logIndex: number;
};

View File

@ -0,0 +1,27 @@
import fs from "fs";
import { MetadataRepository } from "../../domain/repositories";
export class FileMetadataRepo implements MetadataRepository<any> {
private readonly dirPath: string;
constructor(dirPath: string) {
this.dirPath = dirPath;
}
async get(id: string): Promise<any> {
const filePath = `${this.dirPath}/${id}.json`;
if (!fs.existsSync(this.dirPath)) {
fs.mkdirSync(this.dirPath);
}
return fs.promises
.readFile(filePath, "utf8")
.then(JSON.parse)
.catch((err) => null);
}
async save(id: string, metadata: any): Promise<void> {
const filePath = `${this.dirPath}/${id}.json`;
return fs.promises.writeFile(filePath, JSON.stringify(metadata), "utf8");
}
}

View File

@ -0,0 +1,172 @@
import { LogFoundEvent } from "../../domain/entities";
import crypto from "node:crypto";
import {
SNSClient,
PublishBatchCommand,
PublishBatchCommandInput,
PublishBatchRequestEntry,
} from "@aws-sdk/client-sns";
import winston from "winston";
const CHUNK_SIZE = 10;
export class SnsEventRepository {
private client: SNSClient;
private cfg: SnsConfig;
private logger: typeof winston;
constructor(snsClient: SNSClient, cfg: SnsConfig) {
console.log(`SNSRepo: Created for topic ${cfg.topicArn}`);
this.client = snsClient;
this.cfg = cfg;
this.logger = winston;
}
async publish(events: LogFoundEvent<any>[]): Promise<SnsPublishResult> {
if (!events.length) {
console.log("No events to publish");
return {
status: "success",
};
}
const batches: PublishBatchCommandInput[] = [];
const inputs: PublishBatchRequestEntry[] = events.map((event) => ({
Id: crypto.randomUUID(),
Subject: this.cfg.subject ?? "blockchain-watcher",
Message: JSON.stringify(event),
MessageGroupId: this.cfg.groupId ?? "blockchain-watcher",
MessageDeduplicationId: `${event.chainId}-${event.txHash}-${event.blockHeight}-${event.name}`,
}));
// PublishBatchCommand: only supports max 10 items per batch
for (let i = 0; i < inputs.length; i += CHUNK_SIZE) {
const batch: PublishBatchCommandInput = {
TopicArn: this.cfg.topicArn,
PublishBatchRequestEntries: inputs.slice(i, i + CHUNK_SIZE),
};
batches.push(batch);
}
try {
const promises = [];
const errors = [];
for (const batch of batches) {
const command = new PublishBatchCommand(batch);
promises.push(this.client.send(command));
}
const results = await Promise.allSettled(promises);
for (const result of results) {
if (result.status !== "fulfilled") {
this.logger.error(result.reason);
errors.push(result.reason);
}
}
if (errors.length > 0) {
return {
status: "error",
reasons: errors,
};
}
} catch (error: unknown) {
this.logger.error(error);
return {
status: "error",
};
}
return {
status: "success",
};
}
}
export class SnsEvent {
trackId: string;
source: string;
event: string;
timestamp: string;
version: string;
data: Record<string, any>;
constructor(
trackId: string,
source: string,
event: string,
timestamp: string,
version: string,
data: Record<string, any>
) {
this.trackId = trackId;
this.source = source;
this.event = event;
this.timestamp = timestamp;
this.version = version;
this.data = data;
}
static fromLogFoundEvent<T>(logFoundEvent: LogFoundEvent<T>): SnsEvent {
return new SnsEvent(
`chain-event-${logFoundEvent.txHash}-${logFoundEvent.blockHeight}`,
"blockchain-watcher",
logFoundEvent.name,
new Date().toISOString(),
"1",
{
chainId: logFoundEvent.chainId,
emitterAddress: logFoundEvent.name,
txHash: logFoundEvent.txHash,
blockHeight: logFoundEvent.blockHeight.toString(),
blockTime: new Date(logFoundEvent.blockTime * 1000).toISOString(),
attributes: logFoundEvent.attributes,
}
);
}
}
export type SnsConfig = {
region: string;
topicArn: string;
subject?: string;
groupId: string;
credentials?: {
accessKeyId: string;
secretAccessKey: string;
url: string;
};
};
export type SnsPublishResult = {
status: "success" | "error";
reason?: string;
reasons?: string[];
};
/*
{
"trackId": "chain-event-{txId}-{position}",
"source": "blockchain-watcher",
"event": "log-message-published",
"timestamp": string (timestamp in RFC3339 format)
"version": "1",
"data": {
"chainId": number,
"emitterAddress": string,
"txHash": string,
"blockHeight": string,
"blockTime": string (timestamp in RFC3339 format),
"attributes": {
"sender": string,
"sequence": number,
"nonce": number,
"payload": bytes,
"consistencyLevel": number
}
}
}
*/

View File

@ -0,0 +1,12 @@
// Monkey patching BigInt serialization
if (!("toJSON" in BigInt.prototype)) {
Object.defineProperty(BigInt.prototype, "toJSON", {
get() {
return () => String(this);
},
});
}
export * from "./FileMetadataRepo";
export * from "./SnsEventRepository";
export * from "./EvmJsonRPCBlockRepository";

View File

@ -0,0 +1,98 @@
import { HandleEvmLogs } from "./domain/actions/HandleEvmLogs";
import { PollEvmLogs, PollEvmLogsConfig } from "./domain/actions/PollEvmLogs";
import { LogFoundEvent } from "./domain/entities";
import { configuration } from "./infrastructure/config";
import { evmLogMessagePublishedMapper } from "./infrastructure/mappers/evmLogMessagePublishedMapper";
import { RepositoriesBuilder } from "./infrastructure/RepositoriesBuilder";
let repos: RepositoriesBuilder;
async function run(): Promise<void> {
console.log(`Starting: dryRunEnabled -> ${configuration.dryRun}`);
repos = new RepositoriesBuilder(configuration);
/** Job definition is hardcoded, but should be loaded from cfg or a data store soon enough */
const jobs = [
{
id: "poll-log-message-published-ethereum",
chain: "ethereum",
source: {
action: "PollEvmLogs",
config: {
fromBlock: 10012499n,
toBlock: 10012999n,
blockBatchSize: 100,
commitment: "latest",
interval: 15_000,
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
topics: [],
},
},
handlers: [
{
action: "HandleEvmLogs",
target: "sns",
mapper: "evmLogMessagePublishedMapper",
config: {
abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
filter: {
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
},
},
},
],
},
];
const pollEvmLogs = new PollEvmLogs(
repos.getEvmBlockRepository("ethereum"),
repos.getMetadataRepository(),
new PollEvmLogsConfig({ ...jobs[0].source.config, id: jobs[0].id })
);
const snsTarget = async (events: LogFoundEvent<any>[]) => {
const result = await repos.getSnsEventRepository().publish(events);
if (result.status === "error") {
console.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
throw new Error(`Error publishing events to SNS: ${result.reason}`);
}
console.log(`Published ${events.length} events to SNS`);
};
const handleEvmLogs = new HandleEvmLogs<LogFoundEvent<any>>(
jobs[0].handlers[0].config,
evmLogMessagePublishedMapper,
configuration.dryRun ? async (events) => console.log(`Got ${events.length} events`) : snsTarget
);
pollEvmLogs.start([handleEvmLogs.handle.bind(handleEvmLogs)]);
// Just keep this running until killed
setInterval(() => {
console.log("Still running");
}, 20_000);
console.log("Started");
// Handle shutdown
process.on("SIGINT", handleShutdown);
process.on("SIGTERM", handleShutdown);
}
const handleShutdown = async () => {
try {
await Promise.allSettled([
repos.close(),
// call stop() on all the things
]);
process.exit();
} catch (error: unknown) {
process.exit(1);
}
};
run().catch((e) => {
console.error(e);
console.error("Fatal error caused process to exit");
});

View File

@ -10,7 +10,7 @@ const mapper = (log: EvmLog, args: ReadonlyArray<any>) => {
chainId: 1,
txHash: "0x0",
blockHeight: 0n,
blockTime: 0n,
blockTime: 0,
attributes: {
sequence: args[0].toString(),
deliveryQuote: args[1].toString(),
@ -74,13 +74,13 @@ const givenEvmLogs = (length: number, matchingFilterOnes: number) => {
let address = "0x392f472048681816e91026cd768c60958b55352add2837adea9ea6249178b8a8";
let topic: string | undefined = undefined;
if (matchingCount < matchingFilterOnes) {
address = cfg.filter.addresses![0];
address = cfg.filter.addresses![0].toUpperCase();
topic = cfg.filter.topics![0];
matchingCount++;
}
evmLogs.push({
blockTime: 0n,
blockTime: 0,
blockNumber: BigInt(i + 1),
blockHash: "0x1a07d0bd31c84f0dab36eac31a2f3aa801852bf8240ffba19113c62463f694fa",
address: address,

View File

@ -28,22 +28,23 @@ describe("PollEvmLogs", () => {
await pollEvmLogs.stop();
});
it("should be able to read logs from given start", async () => {
it("should be able to read logs from latest block when no fromBlock is configured", async () => {
const currentHeight = 10n;
const blocksAhead = 1n;
givenEvmBlockRepository(currentHeight, blocksAhead);
givenMetadataRepository();
givenPollEvmLogs(currentHeight);
givenPollEvmLogs();
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => expect(getBlocksSpy).toHaveReturnedTimes(1),
() => expect(getBlocksSpy).toHaveBeenCalledWith(new Set([currentHeight, currentHeight + 1n])),
() =>
expect(getLogsSpy).toBeCalledWith({
addresses: cfg.addresses,
topics: cfg.topics,
fromBlock: currentHeight,
fromBlock: currentHeight + blocksAhead,
toBlock: currentHeight + blocksAhead,
})
);
@ -100,7 +101,7 @@ const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => {
logsResponse.push({
blockNumber: height + index,
blockHash: `0x0${index}`,
blockTime: 0n,
blockTime: 0,
address: "",
removed: false,
data: "",
@ -110,7 +111,7 @@ const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => {
logIndex: 0,
});
blocksResponse[`0x0${index}`] = {
timestamp: 0n,
timestamp: 0,
hash: `0x0${index}`,
number: height + index,
};
@ -137,7 +138,7 @@ const givenMetadataRepository = (data?: PollEvmLogsMetadata) => {
};
const givenPollEvmLogs = (from?: bigint) => {
cfg.fromBlock = from ?? cfg.fromBlock;
cfg.setFromBlock(from);
pollEvmLogs = new PollEvmLogs(evmBlockRepo, metadataRepo, cfg);
};
@ -153,10 +154,10 @@ const thenWaitForAssertion = async (...assertions: (() => void)[]) => {
}
break;
} catch (error) {
await setTimeout(10, undefined, { ref: false });
if (index === 4) {
throw error;
}
await setTimeout(10, undefined, { ref: false });
}
}
};

View File

@ -19,7 +19,7 @@ describe("evmLogMessagePublished", () => {
it("should be able to map log to LogMessagePublished", async () => {
const [result] = await handler.handle([
{
blockTime: 1699375895n,
blockTime: 1699443287,
blockNumber: 18521386n,
blockHash: "0x894136d03446d47116319d59b5ec3190c05248e16c8728c2848bf7452732341c",
address: "0x98f3c9e6e3face36baad05fe09d375ef1464288b",
@ -38,7 +38,7 @@ describe("evmLogMessagePublished", () => {
"0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7"
);
expect(result.blockHeight).toBe(18521386n);
expect(result.blockTime).toBe(1699375895n);
expect(result.blockTime).toBe(1699443287);
expect(result.attributes.sequence).toBe(135858);
expect(result.attributes.sender.toLowerCase()).toBe(

View File

@ -0,0 +1,184 @@
import { describe, it, expect, afterEach, afterAll } from "@jest/globals";
import { EvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import axios from "axios";
import nock from "nock";
import { EvmLogFilter, EvmTag } from "../../../src/domain/entities";
axios.defaults.adapter = "http"; // needed by nock
const axiosInstance = axios.create();
const rpc = "http://localhost";
const address = "0x98f3c9e6e3face36baad05fe09d375ef1464288b";
const topic = "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2";
const txHash = "0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7";
let repo: EvmJsonRPCBlockRepository;
describe("EvmJsonRPCBlockRepository", () => {
afterAll(() => {
nock.restore();
});
afterEach(() => {
nock.cleanAll();
});
it("should be able to get block height", async () => {
const expectedHeight = 1980809n;
givenARepo();
givenBlockHeightIs(expectedHeight, "latest");
const result = await repo.getBlockHeight("latest");
expect(result).toBe(expectedHeight);
});
it("should be able to get several blocks", async () => {
const blockNumbers = [2n, 3n, 4n];
givenARepo();
givenBlocksArePresent(blockNumbers);
const result = await repo.getBlocks(new Set(blockNumbers));
expect(Object.keys(result)).toHaveLength(blockNumbers.length);
blockNumbers.forEach((blockNumber) => {
expect(result[blockHash(blockNumber)].number).toBe(blockNumber);
});
});
it("should be able to get logs", async () => {
const filter: EvmLogFilter = {
fromBlock: "safe",
toBlock: "latest",
addresses: [address],
topics: [],
};
givenLogsPresent(filter);
const logs = await repo.getFilteredLogs(filter);
expect(logs).toHaveLength(1);
expect(logs[0].blockNumber).toBe(1n);
expect(logs[0].blockHash).toBe(blockHash(1n));
expect(logs[0].address).toBe(address);
});
});
const givenARepo = () => {
repo = new EvmJsonRPCBlockRepository({ rpc, timeout: 100, chain: "ethereum" }, axiosInstance);
};
const givenBlockHeightIs = (height: bigint, commitment: EvmTag) => {
nock(rpc)
.post("/", {
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [commitment, false],
id: 1,
})
.reply(200, {
jsonrpc: "2.0",
id: 1,
result: {
number: `0x${height.toString(16)}`,
hash: blockHash(height),
timestamp: "0x654a892f",
},
});
};
const givenBlocksArePresent = (blockNumbers: bigint[]) => {
const requests = blockNumbers.map((blockNumber) => ({
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [blockNumber.toString(), false],
id: blockNumber.toString(),
}));
const response = blockNumbers.map((blockNumber) => ({
jsonrpc: "2.0",
id: blockNumber.toString(),
result: {
number: `0x${blockNumber.toString(16)}`,
hash: blockHash(blockNumber),
timestamp: "0x654a892f",
},
}));
nock(rpc).post("/", requests).reply(200, response);
};
const givenLogsPresent = (filter: EvmLogFilter) => {
const response = {
jsonrpc: "2.0",
id: 1,
result: [
{
address: filter.addresses[0],
topics: [topic],
data: "0x",
blockNumber: "0x1",
blockHash: blockHash(1n),
transactionHash: txHash,
transactionIndex: "0x0",
logIndex: 0,
removed: false,
},
],
};
nock(rpc).post("/").reply(200, response);
};
const blockHash = (blockNumber: bigint) => `0x${blockNumber.toString(16)}`;
/* Examples:
- blockByNumber:
{
"jsonrpc":"2.0",
"method":"eth_getBlockByNumber",
"params":[
"latest",
true
],
"id":1
}
->
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"baseFeePerGas": "0x78d108bbb",
"difficulty": "0x0",
"extraData": "0x406275696c64657230783639",
"gasLimit": "0x1c9c380",
"gasUsed": "0xfca882",
"hash": "0xb29ad2fa313f50d293fcf5679c6862ee7f4a3d641f09b227ad0ee3fba10d1cbb",
"logsBloom": "0x312390b5e798baf83b514972932b522118d98b9888f5880461db7e26f4bece2e8141717140d492028f887928435127083671a3488c3b6c240130a6eeb3692d908417d91c65fbfd396d0ade2b57a263a080c64f59fb4d3c4033415503e833306057524072daeae803a45cc020c1a32f436f30037b49003cd257c965d9214b441922012654b681e5202053a7d58500a64aa040cec9d90a0c9e5e3321d821503d90cfb84961594a72f02e92c7c2559d95c86504c54260c708ea63e5e4a2538f1143096c2422250a0a20b321a8814678d26e6a6d6a872e232a500a402a3a6445b85b3cf92b481e9020c20a969eac4c50ca08667cda68812f8141108908b3d175f649",
"miner": "0x690b9a9e9aa1c9db991c7721a92d351db4fac990",
"mixHash": "0xbdeea2aa4f2a026b27bc720d28c73680a35ad3e5017568cddcb066b5c12b1f60",
"nonce": "0x0000000000000000",
"number": "0x11a9fa9",
"parentHash": "0x7f8c4ecd8772eab825ee3c8e713c5088c6a32b41d61dbd1c0833e7d4df337713",
"receiptsRoot": "0x06e3cd06761468089708b204a092545576c508739f0eff936c96914da2e277e9",
"sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size": "0x25301",
"stateRoot": "0x3e1333543583ec7e5fb4e337261c43ef94aeb4f77c4e7f657a538fc3c2e5b6de",
"timestamp": "0x654a892f",
"totalDifficulty": "0xc70d815d562d3cfa955",
"transactions": [
"0x5e59c0bb917e7a5a64f098cd6a370bac4f40ecdf6ca79deaccf25736fe117ef7"
],
"transactionsRoot": "0xbf23b57ac6f6aede4d886a556b7bbee868721542b9a1d912ffa5f4ead0b8ec72",
"uncles": [],
"withdrawals": [
{
"index": "0x16b0d86",
"validatorIndex": "0x2a57b",
"address": "0xdaac5ce35ad7892d5f2dd364954066f4323c9a57",
"amount": "0x105fdec"
}
]
}
}
- blockByHash:
*/

View File

@ -0,0 +1,59 @@
import { describe, expect, it, beforeEach, afterEach } from "@jest/globals";
import fs from "fs";
import { FileMetadataRepo } from "../../../src/infrastructure/repositories";
describe("FileMetadataRepo", () => {
const dirPath = "./metadata-repo";
const repo = new FileMetadataRepo(dirPath);
beforeEach(() => {
if (!fs.existsSync(dirPath)) {
fs.mkdirSync(dirPath);
}
});
afterEach(() => {
fs.rm(dirPath, () => {});
});
describe("get", () => {
it("should return null if the file does not exist", async () => {
const metadata = await repo.get("non-existent-file");
expect(metadata).toBeNull();
});
it("should return the metadata if the file exists", async () => {
const id = "test-file";
const metadata = { foo: "bar" };
await repo.save(id, metadata);
const retrievedMetadata = await repo.get(id);
expect(retrievedMetadata).toEqual(metadata);
});
});
describe("save", () => {
it("should create a new file with the given metadata", async () => {
const id = "test-file";
const metadata = { foo: "bar" };
await repo.save(id, metadata);
const fileExists = fs.existsSync(`${dirPath}/${id}.json`);
expect(fileExists).toBe(true);
const fileContents = fs.readFileSync(`${dirPath}/${id}.json`, "utf8");
expect(JSON.parse(fileContents)).toEqual(metadata);
});
it("should overwrite an existing file with the given metadata", async () => {
const id = "test-file";
const initialMetadata = { foo: "bar" };
const updatedMetadata = { baz: "qux" };
await repo.save(id, initialMetadata);
await repo.save(id, updatedMetadata);
const fileContents = fs.readFileSync(`${dirPath}/${id}.json`, "utf8");
expect(JSON.parse(fileContents)).toEqual(updatedMetadata);
});
});
});

View File

@ -0,0 +1,49 @@
import { describe, expect, it, jest } from "@jest/globals";
import { SnsEventRepository, SnsConfig } from "../../../src/infrastructure/repositories";
import { SNSClient } from "@aws-sdk/client-sns";
let snsEventRepository: SnsEventRepository;
let snsClient: SNSClient;
let snsConfig: SnsConfig;
describe("SnsEventRepository", () => {
it("should not call sns client when no events given", async () => {
givenSnsEventRepository();
const result = await snsEventRepository.publish([]);
expect(result).toEqual({ status: "success" });
expect(snsClient.send).not.toHaveBeenCalled();
});
it("should publish", async () => {
givenSnsEventRepository();
const result = await snsEventRepository.publish([
{
chainId: 1,
txHash: "0x123",
blockHeight: 123n,
blockTime: 0,
name: "LogMessagePublished",
attributes: {},
},
]);
expect(result).toEqual({ status: "success" });
expect(snsClient.send).toHaveBeenCalledTimes(1);
});
});
const givenSnsEventRepository = () => {
snsConfig = {
region: "us-east-1",
topicArn: "arn:aws:sns:us-east-1:123456789012:MyTopic",
groupId: "groupId",
subject: "subject",
};
snsClient = {
send: jest.fn().mockReturnThis(),
} as unknown as SNSClient;
snsEventRepository = new SnsEventRepository(snsClient, snsConfig);
};

View File

@ -0,0 +1,17 @@
NODE_ENV=staging
BLOCKCHAIN_ENV=testnet
NAMESPACE=wormscan-testnet
NAME=blockchain-watcher
DRY_RUN_ENABLED=false
REPLICAS=1
IMAGE_NAME=
PORT=3005
LOG_LEVEL=info
RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=200m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=100m
SNS_TOPIC_ARN=
SNS_REGION=

View File

@ -0,0 +1,27 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: blockchain-watcher-metadata-pvc
namespace: {{ .NAMESPACE }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Mi
storageClassName: gp2
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: blockchain-watcher-metadata-pv
namespace: {{ .NAMESPACE }}
spec:
accessModes:
- ReadWriteOnce
- ReadWriteMany
capacity:
storage: 50Mi
storageClassName: gp2
hostPath:
path: /home/node/app/metadata-repo

View File

@ -0,0 +1,52 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .NAME }}
namespace: {{ .NAMESPACE }}
spec:
replicas: {{ .REPLICAS }}
serviceName: {{ .NAME }}-service
selector:
matchLabels:
app: {{ .NAME }}
template:
metadata:
labels:
app: {{ .NAME }}
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 30
serviceAccountName: event-watcher
containers:
- name: {{ .NAME }}
image: {{ .IMAGE_NAME }}
env:
- name: NODE_ENV
value: {{ .NODE_ENV }}
- name: PORT
value: "{{ .PORT }}"
- name: LOG_LEVEL
value: {{ .LOG_LEVEL }}
- name: BLOCKCHAIN_ENV
value: {{ .BLOCKCHAIN_ENV }}
- name: DRY_RUN_ENABLED
value: "{{ .DRY_RUN_ENABLED }}"
- name: SNS_TOPIC_ARN
value: {{ .SNS_TOPIC_ARN }}
- name: SNS_REGION
value: {{ .SNS_REGION }}
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
cpu: {{ .RESOURCES_LIMITS_CPU }}
requests:
memory: {{ .RESOURCES_REQUESTS_MEMORY }}
cpu: {{ .RESOURCES_REQUESTS_CPU }}
volumeMounts:
- name: metadata-volume
mountPath: /home/node/app/metadata-repo
volumes:
- name: metadata-volume
persistentVolumeClaim:
claimName: blockchain-watcher-metadata-pvc