Blockchain watcher: adding domain and infra layers (#786)

* reorg domain-infra

* watch evm blocks action

* adding evm log parser

* wider prettier

* renaming watch action

* adding doc

* persist latest metadata

* gh action for blockchain-watcher

* adding log-message-published mapper

* deps: remove peers and nodemon

* adding handler for LogMessagePublished

* added parser for log message published

---------

Co-authored-by: chase-45 <chasemoran45@gmail.com>
This commit is contained in:
Matías Martínez 2023-11-07 15:25:06 -03:00 committed by GitHub
parent bd15f29631
commit 1ed4cec999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 9017 additions and 675 deletions

View File

@ -0,0 +1,34 @@
# CI for the blockchain-watcher package: format check, typecheck, and tests.
# Runs on every push to main and on all pull requests.
name: Run tests
on:
  push:
    branches: ["main"]
  pull_request:
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v3
      # Format check only (dry run): fails the job if any file under
      # ./blockchain-watcher is not prettier-formatted.
      # NOTE(review): @master is a mutable ref; consider pinning this
      # third-party action to a tag or commit SHA for reproducible CI.
      - name: Prettify code
        uses: creyD/prettier_action@master
        with:
          dry: True
          prettier_options: --write ./blockchain-watcher
          prettier_version: 2.8.7
      - uses: actions/setup-node@v3
        with:
          node-version: 18
          cache: "npm"
          cache-dependency-path: |
            ./blockchain-watcher/package-lock.json
      - name: npm ci
        run: npm ci
        working-directory: ./blockchain-watcher
      # "typecheck" is a plain tsc build; it fails on type errors.
      - name: typecheck
        run: npm run build
        working-directory: ./blockchain-watcher
      - name: Run tests
        run: npm test
        working-directory: ./blockchain-watcher

View File

@ -1,3 +1,3 @@
node_modules
lib
coverage

View File

@ -0,0 +1,3 @@
{
"printWidth": 100
}

View File

@ -0,0 +1,110 @@
asyncapi: "2.6.0"
info:
title: Blockchain Watcher API
version: "0.0.1"
description: |
    Platform service that extracts, transforms, and loads data from different blockchain platforms.
servers:
staging-testnet:
url: arn:aws:sns:us-east-2:581679387567:notification-chain-events-dev-testnet.fifo
protocol: sns
defaultContentType: application/json
channels:
LogMessagePublished:
description: Wormhole core contract emitted event
subscribe:
message:
$ref: "#/components/messages/logMessagePublished"
TransferRedeemed:
description: Token bridge emitted event
subscribe:
message:
$ref: "#/components/messages/transferRedeemed"
components:
messages:
logMessagePublished:
name: LogMessagePublished
title: LogMessagePublished
contentType: application/json
payload:
$ref: "#/components/schemas/logMessagePublished"
transferRedeemed:
name: TransferRedeemed
title: TransferRedeemed
contentType: application/json
payload:
$ref: "#/components/schemas/transferRedeemed"
schemas:
base:
type: object
properties:
trackId:
type: string
source:
type: string
event:
type: string
version:
type: number
timestamp:
$ref: "#/components/schemas/sentAt"
chainEventBase:
type: object
properties:
chainId:
type: number
emitterAddress:
type: string
txHash:
type: string
blockHeight:
type: number
blockTime:
$ref: "#/components/schemas/sentAt"
logMessagePublished:
allOf:
- $ref: "#/components/schemas/base"
type: object
properties:
data:
allOf:
- $ref: "#/components/schemas/chainEventBase"
type: object
properties:
attributes:
type: object
properties:
sender:
type: string
sequence:
type: number
nonce:
type: number
payload:
type: string
consistencyLevel:
type: number
transferRedeemed:
allOf:
- $ref: "#/components/schemas/base"
type: object
properties:
data:
type: object
allOf:
- $ref: "#/components/schemas/chainEventBase"
properties:
attributes:
type: object
properties:
emitterChainId:
type: number
emitterAddress:
type: string
sequence:
type: number
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.

View File

@ -0,0 +1,12 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
  preset: "ts-jest",
  testEnvironment: "node",
  // Unit tests only: match *.test.ts files under a test/ path, excluding any
  // path containing "integration" (integration tests run separately).
  testRegex: "^(?!.*integration.*)(?=.*test\\/).*\\.test\\.ts$",
  collectCoverageFrom: ["./src/**"],
  coverageThreshold: {
    global: {
      // Coverage runs fail below 85% line coverage across src/.
      lines: 85,
    },
  },
};

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,14 @@
{
"name": "@wormhole-foundation/event-watcher",
"name": "@wormhole-foundation/blockchain-watcher",
"version": "0.0.0",
"description": "A process for watching blockchain events and moving them to persistent storage",
"main": "index.js",
"scripts": {
"start": "node lib/index.js",
"test": "jest",
"test:coverage": "jest --collectCoverage=true",
"build": "tsc",
"dev": "USE_ENV_FILE=true ts-node src/index2.ts",
"dev": "USE_ENV_FILE=true ts-node src/index.ts",
"prettier": "prettier --write ."
},
"author": "chase-45",
@ -14,18 +16,18 @@
"dependencies": {
"@certusone/wormhole-sdk": "^0.9.21-beta.0",
"dotenv": "^16.3.1",
"uuid": "^9.0.1"
},
"peerDependencies": {
"uuid": "^9.0.1",
"ethers": "^5",
"winston": "3.8.2"
},
"devDependencies": {
"@jest/globals": "^29.7.0",
"@types/koa-router": "^7.4.4",
"@types/uuid": "^9.0.6",
"@types/yargs": "^17.0.23",
"nodemon": "^2.0.20",
"jest": "^29.7.0",
"prettier": "^2.8.7",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.1",
"tsx": "^3.12.7",
"typescript": "^4.8.4",

View File

@ -0,0 +1,45 @@
import { ethers } from "ethers";
import { EvmLog, EvmTopicFilter } from "../entities";
/**
* Handling means mapping and forward to a given target.
* As of today, only one type of event can be handled per each instance.
*/
export class HandleEvmLogs<T> {
cfg: HandleEvmLogsConfig;
mapper: (log: EvmLog, parsedArgs: ReadonlyArray<any>) => T;
target: (parsed: T[]) => Promise<void>;
constructor(
cfg: HandleEvmLogsConfig,
mapper: (log: EvmLog, args: ReadonlyArray<any>) => T,
target: (parsed: T[]) => Promise<void>
) {
this.cfg = cfg;
this.mapper = mapper;
this.target = target;
}
public async handle(logs: EvmLog[]): Promise<T[]> {
const mappedItems = logs
.filter(
(log) =>
this.cfg.filter.addresses.includes(log.address) &&
this.cfg.filter.topics.includes(log.topics[0])
)
.map((log) => {
const iface = new ethers.utils.Interface([this.cfg.abi]);
const parsedLog = iface.parseLog(log);
return this.mapper(log, parsedLog.args);
});
await this.target(mappedItems);
// TODO: return a result specifying failures if any
return mappedItems;
}
}
export type HandleEvmLogsConfig = {
filter: EvmTopicFilter;
abi: string;
};

View File

@ -0,0 +1,139 @@
import { EvmLog } from "../entities";
import { EvmBlockRepository, MetadataRepository } from "../repositories";
import { setTimeout } from "timers/promises";
const ID = "watch-evm-logs";
/**
* PollEvmLogs is an action that watches for new blocks and extracts logs from them.
*/
export class PollEvmLogs {
  private readonly blockRepo: EvmBlockRepository;
  private readonly metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
  // Latest chain height as reported by the node (refreshed every iteration).
  private latestBlockHeight: bigint = 0n;
  // Last block fully processed and persisted; polling resumes after it.
  private blockHeightCursor: bigint = 0n;
  private cfg: PollEvmLogsConfig;
  private started: boolean = false;
  constructor(
    blockRepo: EvmBlockRepository,
    metadataRepo: MetadataRepository<PollEvmLogsMetadata>,
    cfg: PollEvmLogsConfig
  ) {
    this.blockRepo = blockRepo;
    this.metadataRepo = metadataRepo;
    this.cfg = cfg;
  }
  /**
   * Restores the cursor from persisted metadata (if any) and launches the
   * polling loop. watch() is intentionally NOT awaited: it runs until stop()
   * is called, while start() returns as soon as the loop is kicked off.
   * @param handlers - each handler receives every batch of fetched logs
   */
  public async start(handlers: ((logs: EvmLog[]) => Promise<void>)[]): Promise<void> {
    const metadata = await this.metadataRepo.get(ID);
    if (metadata) {
      this.blockHeightCursor = metadata.lastBlock;
    }
    this.started = true;
    this.watch(handlers);
  }
  // Poll loop: compute the next block range, fetch its logs, enrich them with
  // block timestamps, fan out to handlers, then persist the cursor and sleep.
  private async watch(handlers: ((logs: EvmLog[]) => Promise<void>)[]): Promise<void> {
    while (this.started) {
      this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment());
      const range = this.getBlockRange(this.latestBlockHeight);
      if (this.cfg.hasFinished(range.fromBlock)) {
        // TODO: log
        await this.stop();
        continue;
      }
      const logs = await this.blockRepo.getFilteredLogs({
        fromBlock: range.fromBlock,
        toBlock: range.toBlock,
        addresses: this.cfg.addresses, // Works when sending multiple addresses, but not multiple topics.
        topics: [], // this.cfg.topics => will be applied by handlers
      });
      const blockNumbers = new Set(logs.map((log) => log.blockNumber));
      const blocks = await this.blockRepo.getBlocks(blockNumbers);
      logs.forEach((log) => {
        // NOTE(review): assumes getBlocks returns an entry keyed by every
        // blockHash seen in the logs; a missing entry throws on .timestamp.
        const block = blocks[log.blockHash];
        log.blockTime = block.timestamp;
      });
      // TODO: add error handling.
      await Promise.all(handlers.map((handler) => handler(logs)));
      // Cursor is persisted only after all handlers succeed, so a crash
      // re-processes the last range (at-least-once delivery).
      await this.metadataRepo.save(ID, { lastBlock: range.toBlock });
      this.blockHeightCursor = range.toBlock;
      await setTimeout(this.cfg.interval ?? 1_000, undefined, { ref: false });
    }
  }
  /**
   * Get the block range to extract.
   * @param latestBlockHeight - the latest known height of the chain
   * @returns an always valid range, in the sense from is always <= to
   */
  private getBlockRange(latestBlockHeight: bigint): {
    fromBlock: bigint;
    toBlock: bigint;
  } {
    let fromBlock = this.blockHeightCursor + 1n;
    // fromBlock is configured and is greater than current block height, then we allow to skip blocks.
    if (this.cfg.fromBlock && this.cfg.fromBlock > this.blockHeightCursor) {
      fromBlock = this.cfg.fromBlock;
    }
    if (fromBlock > latestBlockHeight) {
      return { fromBlock: latestBlockHeight, toBlock: latestBlockHeight };
    }
    // NOTE(review): toBlock is derived from the cursor, not from fromBlock —
    // when cfg.fromBlock skips ahead of the cursor, toBlock can come out
    // below fromBlock, contradicting the "from <= to" doc above. Confirm
    // whether that combination can occur with real configs.
    let toBlock = this.cfg.toBlock ?? this.blockHeightCursor + BigInt(this.cfg.getBlockBatchSize());
    // limit toBlock to obtained block height
    if (toBlock > fromBlock && toBlock > latestBlockHeight) {
      toBlock = latestBlockHeight;
    }
    return { fromBlock, toBlock };
  }
  /** Signals the watch loop to exit after finishing its current iteration. */
  public async stop(): Promise<void> {
    this.started = false;
  }
  // TODO: schedule getting latest block height in chain or use the value from poll to keep metrics updated
  // this.latestBlockHeight = await this.blockRepo.getBlockHeight(this.commitment);
}
// Cursor persisted between runs: the last block fully processed.
export type PollEvmLogsMetadata = {
  lastBlock: bigint;
};
/**
 * Configuration for PollEvmLogs.
 * - fromBlock/toBlock optionally bound the scan (toBlock means "stop after").
 * - blockBatchSize caps blocks scanned per iteration (default 100).
 * - commitment is the finality tag used when querying chain height (default "latest").
 * - interval is the pause between poll iterations, in milliseconds.
 */
export class PollEvmLogsConfig {
  fromBlock?: bigint;
  toBlock?: bigint;
  blockBatchSize?: number;
  commitment?: string;
  interval?: number;
  addresses: string[] = [];
  topics: string[] = [];
  public getBlockBatchSize(): number {
    return this.blockBatchSize ?? 100;
  }
  public getCommitment(): string {
    return this.commitment ?? "latest";
  }
  /**
   * True once the scan has moved past the configured toBlock.
   * Fixed to always return a boolean: the previous `this.toBlock && ...`
   * returned `undefined` when toBlock was unset, and returned the falsy `0n`
   * when toBlock === 0n even if the cursor had already passed it.
   */
  public hasFinished(currentFromBlock: bigint): boolean {
    return this.toBlock !== undefined && currentFromBlock > this.toBlock;
  }
  /** Convenience factory: a config that starts scanning at `fromBlock`. */
  static fromBlock(fromBlock: bigint) {
    const cfg = new PollEvmLogsConfig();
    cfg.fromBlock = fromBlock;
    return cfg;
  }
}

View File

@ -0,0 +1,49 @@
// A mined EVM block, reduced to the fields the watcher needs.
export type EvmBlock = {
  number: bigint;
  hash: string;
  timestamp: bigint; // epoch millis
};
// A single EVM log entry (subset of the JSON-RPC log object).
export type EvmLog = {
  blockTime: bigint; // filled in by the poller after fetching the containing block
  blockNumber: bigint;
  blockHash: string;
  address: string; // emitting contract address
  removed: boolean;
  data: string; // hex-encoded, non-indexed event data
  transactionHash: string;
  // NOTE(review): a hex string here (e.g. "0x4"), unlike the numeric
  // logIndex below — confirm the inconsistency is intended.
  transactionIndex: string;
  topics: string[]; // topics[0] is matched against configured event topics
  logIndex: number;
};
// Named finality tags accepted by EVM JSON-RPC endpoints.
export type EvmTag = "finalized" | "latest" | "safe";
// Address/topic filter applied in-process by log handlers.
export type EvmTopicFilter = {
  addresses: string[];
  topics: string[];
};
// Filter for a getLogs-style query over a block range.
export type EvmLogFilter = {
  fromBlock: bigint | EvmTag;
  toBlock: bigint | EvmTag;
  addresses: string[];
  topics: string[];
};
// Envelope for an event extracted from a log, with chain context attached.
export type LogFoundEvent<T> = {
  name: string;
  chainId: number;
  txHash: string;
  blockHeight: bigint;
  blockTime: bigint;
  attributes: T;
};
// Attributes of the Wormhole core contract's LogMessagePublished event.
export type LogMessagePublished = {
  sequence: number;
  sender: string;
  nonce: number;
  payload: string;
  consistencyLevel: number;
};

View File

@ -0,0 +1,12 @@
import { EvmBlock, EvmLog, EvmLogFilter } from "./entities";
// Read access to an EVM chain's height, blocks, and logs.
export interface EvmBlockRepository {
  // Chain height at the given finality/commitment tag (e.g. "latest").
  getBlockHeight(finality: string): Promise<bigint>;
  // Blocks for the given block numbers, keyed by block hash
  // (callers index the result with log.blockHash).
  getBlocks(blockNumbers: Set<bigint>): Promise<Record<string, EvmBlock>>;
  getFilteredLogs(filter: EvmLogFilter): Promise<EvmLog[]>;
}
// Persistence for per-action state (e.g. the last processed block cursor).
export interface MetadataRepository<Metadata> {
  get(id: string): Promise<Metadata | undefined>;
  save(id: string, metadata: Metadata): Promise<void>;
}

View File

@ -1,143 +1,112 @@
export {};
// import { CHAIN_ID_TO_NAME, ChainId } from "@certusone/wormhole-sdk";
// import {
// getEnvironment,
// getRpcs,
// getSupportedChains,
// getWormholeRelayerAddressWrapped,
// } from "./environment";
// import { WormholeRelayer__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts";
// import { WebSocketProvider } from "./utils/websocket";
// import deliveryEventHandler from "./handlers/deliveryEventHandler";
// import sendEventHandler from "./handlers/sendEventHandler";
// import { EventHandler, getEventListener } from "./handlers/EventHandler";
// import { Contract, ContractFactory, utils } from "ethers";
import {
createHandlers,
createWatchers,
getEnvironment,
initializeEnvironment,
} from "./infrastructure/environment";
import AbstractWatcher from "./infrastructure/watchers/AbstractWatcher";
// const ALL_EVENTS: EventHandler<any>[] = [
// deliveryEventHandler,
// sendEventHandler,
// ];
async function run() {
initializeEnvironment(process.env.WATCHER_CONFIG_PATH || "../config/local.json");
const ENVIRONMENT = await getEnvironment();
// async function queryEvents(chainId: ChainId, rpc: string) {
// console.log(`Querying events for chain ${chainId}`);
// const ENVIRONMENT = await getEnvironment();
//TODO instantiate the persistence module(s)
// for (const event of ALL_EVENTS) {
// if (!event.shouldSupportChain(ENVIRONMENT, chainId)) {
// continue;
// }
//TODO either hand the persistence module to the watcher, or pull necessary config from the persistence module here
// const contractAddress = event.getContractAddressEvm(ENVIRONMENT, chainId);
// const abi = event.getEventAbiEvm();
// const eventSignature = event.getEventSignatureEvm();
// const listener = getEventListener(event, chainId);
//TODO the event watchers currently instantiate themselves, which isn't ideal. Refactor for next version
const handlers = createHandlers(ENVIRONMENT);
const watchers = createWatchers(ENVIRONMENT, handlers);
// if (!abi || !eventSignature) {
// continue;
// }
await runAllProcesses(watchers);
}
// const provider = new WebSocketProvider(rpc);
// const contract = new Contract(contractAddress, abi, provider);
// const filter = contract.filters[eventSignature]();
// const logs = await contract.queryFilter(filter, -2048, "latest");
async function runAllProcesses(allWatchers: AbstractWatcher[]) {
//These are all the raw processes that will run, wrapped to contain their process ID and a top level error handler
let allProcesses = new Map<number, () => Promise<number>>();
let processIdCounter = 0;
// for (const log of logs) {
// await listener(log);
// }
// }
//These are all the processes, keyed by their process ID, that we know are not currently running.
const unstartedProcesses = new Set<number>();
// console.log(`Queried events for chain ${chainId}`);
// }
//Go through all the watchers, wrap their processes, and add them to the unstarted processes set
for (const watcher of allWatchers) {
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startWebsocketProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
// async function subscribeToEvents(chainId: ChainId, rpc: string) {
// console.log(`Subscribing to events for chain ${chainId}`);
// const ENVIRONMENT = await getEnvironment();
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startQueryProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
// for (const event of ALL_EVENTS) {
// if (event.shouldSupportChain(ENVIRONMENT, chainId)) {
// const contractAddress = event.getContractAddressEvm(ENVIRONMENT, chainId);
// const eventSignature = event.getEventSignatureEvm();
// if (!eventSignature) {
// continue;
// }
// const listener = getEventListener(event, chainId);
// const provider = new WebSocketProvider(rpc);
// try {
// provider.off(
// {
// address: contractAddress,
// topics: [utils.id(eventSignature)],
// },
// listener
// );
// } catch (e) {
// //ignore, we just want to make sure we don't have multiple listeners
// }
// provider.on(
// {
// address: contractAddress,
// topics: [utils.id(eventSignature)],
// },
// listener
// );
// }
// }
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startGapProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
}
// console.log(`Subscribed to all events for chain ${chainId}`);
// }
//If a process ends, reenqueue it into the unstarted processes set
const reenqueueCallback = (processId: number) => {
unstartedProcesses.add(processId);
};
// async function listenerLoop(sleepMs: number) {
// console.log("Starting event watcher");
// const SUPPORTED_CHAINS = await getSupportedChains();
//Every 5 seconds, try to start any unstarted processes
while (true) {
for (const processId of unstartedProcesses) {
const process = allProcesses.get(processId);
if (process) {
//TODO the process ID is a good key but is difficult to track to meaningful information
console.log(`Starting process ${processId}`);
unstartedProcesses.delete(processId);
process()
.then((processId) => {
reenqueueCallback(processId);
})
.catch((e) => {
reenqueueCallback(processId);
});
} else {
//should never happen
console.error(`Process ${processId} not found`);
}
}
// let run = true;
// while (run) {
// // resubscribe to contract events every 5 minutes
// for (const chainId of SUPPORTED_CHAINS) {
// try {
// const rpc = (await getRpcs()).get(chainId);
// if (!rpc) {
// console.log(`RPC not found for chain ${chainId}`);
// //hard exit
// process.exit(1);
// }
// await subscribeToEvents(chainId, rpc);
// } catch (e: any) {
// console.log(e);
// run = false;
// }
// }
// console.log(`Initialized connections, sleeping for ${sleepMs}ms`);
// await sleep(sleepMs);
// }
// }
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
// export async function queryLoop(periodMs: number) {
// console.log("Starting query loop");
// const supportedChains = await getSupportedChains();
// const rpcs = await getRpcs();
// let run = true;
// while (run) {
// for (const chainId of supportedChains) {
// try {
// const rpc = rpcs.get(chainId);
// if (!rpc) {
// throw new Error("RPC not found");
// }
// await queryEvents(chainId, rpc);
// } catch (e) {
// console.error(`Error subscribing to events for chain ${chainId}`);
// console.error(e);
// }
// }
// await sleep(periodMs);
// }
// }
function wrapProcessWithTracker(
processId: number,
process: () => Promise<void>
): () => Promise<number> {
return () => {
return process()
.then(() => {
console.log(`Process ${processId} exited via promise resolution`);
return processId;
})
.catch((e) => {
console.error(`Process ${processId} exited via promise rejection`);
console.error(e);
return processId;
});
};
}
// async function sleep(timeout: number) {
// return new Promise((resolve) => setTimeout(resolve, timeout));
// }
// // start the process
// listenerLoop(300000);
// //queryLoop(300000);
//run should never stop, unless an unexpected fatal error occurs
run()
.then(() => {
console.log("run() finished");
})
.catch((e) => {
console.error(e);
console.error("Fatal error caused process to exit");
});

View File

@ -1,114 +0,0 @@
import {
createHandlers,
createWatchers,
getEnvironment,
initializeEnvironment,
} from "./environment";
import AbstractWatcher from "./watchers/AbstractWatcher";
// Entry point: loads configuration from WATCHER_CONFIG_PATH (or the local
// default), builds handlers and watchers from it, then supervises every
// watcher process forever via runAllProcesses.
async function run() {
  initializeEnvironment(
    process.env.WATCHER_CONFIG_PATH || "../config/local.json"
  );
  const ENVIRONMENT = await getEnvironment();
  //TODO instantiate the persistence module(s)
  //TODO either hand the persistence module to the watcher, or pull necessary config from the persistence module here
  //TODO the event watchers currently instantiate themselves, which isn't ideal. Refactor for next version
  const handlers = createHandlers(ENVIRONMENT);
  const watchers = createWatchers(ENVIRONMENT, handlers);
  await runAllProcesses(watchers);
}
// Supervisor: wraps each watcher's three processors (websocket, query, gap)
// with an ID-tracking wrapper, then loops forever restarting any process
// that has exited. Never returns.
async function runAllProcesses(allWatchers: AbstractWatcher[]) {
  //These are all the raw processes that will run, wrapped to contain their process ID and a top level error handler
  let allProcesses = new Map<number, () => Promise<number>>();
  let processIdCounter = 0;
  //These are all the processes, keyed by their process ID, that we know are not currently running.
  const unstartedProcesses = new Set<number>();
  //Go through all the watchers, wrap their processes, and add them to the unstarted processes set
  for (const watcher of allWatchers) {
    // NOTE(review): the processor methods are passed unbound — if they use
    // `this`, it will be undefined when invoked; confirm they are bound
    // (e.g. arrow properties) or `this`-free.
    allProcesses.set(
      processIdCounter,
      wrapProcessWithTracker(processIdCounter, watcher.startWebsocketProcessor)
    );
    unstartedProcesses.add(processIdCounter);
    processIdCounter++;
    allProcesses.set(
      processIdCounter,
      wrapProcessWithTracker(processIdCounter, watcher.startQueryProcessor)
    );
    unstartedProcesses.add(processIdCounter);
    processIdCounter++;
    allProcesses.set(
      processIdCounter,
      wrapProcessWithTracker(processIdCounter, watcher.startGapProcessor)
    );
    unstartedProcesses.add(processIdCounter);
    processIdCounter++;
  }
  //If a process ends, reenqueue it into the unstarted processes set
  const reenqueueCallback = (processId: number) => {
    unstartedProcesses.add(processId);
  };
  //Every 5 seconds, try to start any unstarted processes
  while (true) {
    for (const processId of unstartedProcesses) {
      const process = allProcesses.get(processId);
      if (process) {
        //TODO the process ID is a good key but is difficult to track to meaningful information
        console.log(`Starting process ${processId}`);
        unstartedProcesses.delete(processId);
        // Fire-and-forget: the wrapper always settles with the process ID,
        // so both branches simply requeue it for a later restart.
        process()
          .then((processId) => {
            reenqueueCallback(processId);
          })
          .catch((e) => {
            reenqueueCallback(processId);
          });
      } else {
        //should never happen
        console.error(`Process ${processId} not found`);
      }
    }
    await new Promise((resolve) => setTimeout(resolve, 5000));
  }
}
// Wraps a long-running process so the returned function always resolves with
// the process's own ID — whether the process finished normally or threw —
// letting the supervisor requeue it for restart either way.
function wrapProcessWithTracker(
  processId: number,
  process: () => Promise<void>
): () => Promise<number> {
  return async () => {
    try {
      await process();
      console.log(`Process ${processId} exited via promise resolution`);
    } catch (e) {
      console.error(`Process ${processId} exited via promise rejection`);
      console.error(e);
    }
    return processId;
  };
}
//run should never stop, unless an unexpected fatal error occurs
run()
  .then(() => {
    // Reaching here means the supervisor loop returned — unexpected.
    console.log("run() finished");
  })
  .catch((e) => {
    console.error(e);
    console.error("Fatal error caused process to exit");
  });

View File

@ -1,9 +1,4 @@
import {
ChainId,
ChainName,
Network,
toChainName,
} from "@certusone/wormhole-sdk";
import { ChainId, ChainName, Network, toChainName } from "@certusone/wormhole-sdk";
import AbstractWatcher from "./watchers/AbstractWatcher";
import { rootLogger } from "./utils/log";
import winston from "winston";
@ -65,19 +60,19 @@ const readEnvironmentVariable = (name: string): string | null => {
return value;
};
type HandlerConfig = {
export type HandlerConfig = {
name: string;
config: any;
};
type ConfigFile = {
export type ConfigFile = {
network: Network;
supportedChains: ChainId[];
rpcs: { chain: ChainId; rpc: string }[];
handlers: HandlerConfig[];
};
type Environment = {
export type Environment = {
network: Network;
configurationPath: any;
configuration: ConfigFile;

View File

@ -1,8 +1,9 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import { v4 as uuidv4 } from "uuid";
import { Environment } from "../environment";
const { createHash } = require("crypto");
type SyntheticEvent<T> = {
export type SyntheticEvent<T> = {
eventName: string;
eventVersion: number;
eventChain: ChainId;
@ -14,16 +15,17 @@ type SyntheticEvent<T> = {
export default abstract class AbstractHandler<T> {
public name: string;
public environment: Environment;
public config: any;
constructor(name: string) {
constructor(name: string, environment: Environment, config: any) {
this.name = name;
this.environment = environment;
this.config = config;
}
//These top level functions must always be implemented
public abstract shouldSupportChain(
network: Network,
chainId: ChainId
): boolean;
public abstract shouldSupportChain(network: Network, chainId: ChainId): boolean;
//These functions must be implemented if an EVM chain is supported.
@ -37,14 +39,8 @@ export default abstract class AbstractHandler<T> {
//This function will be called when a subscribed event is received from the ethers provider.
//TODO pretty sure the ...args is always an ethers.Event object
public abstract handleEventEvm(
chainId: ChainId,
...args: any
): Promise<SyntheticEvent<T>[]>;
public abstract getContractAddressEvm(
network: Network,
chainId: ChainId
): string;
public abstract handleEventEvm(chainId: ChainId, ...args: any): Promise<SyntheticEvent<T>>;
public abstract getContractAddressEvm(network: Network, chainId: ChainId): string;
//*** Non-abstract functions
@ -63,11 +59,7 @@ export default abstract class AbstractHandler<T> {
}
})
.catch((e) => {
console.error(
"Unexpected error processing the following event: ",
chainId,
...args
);
console.error("Unexpected error processing the following event: ", chainId, ...args);
console.error(e);
});
};
@ -81,11 +73,15 @@ export default abstract class AbstractHandler<T> {
return uuidv4();
}
private wrapEvent(
chainId: ChainId,
version: number,
data: T
): SyntheticEvent<T> {
public getEnvironment(): Environment {
return this.environment;
}
public getConfig(): any {
return this.config;
}
protected wrapEvent(chainId: ChainId, version: number, data: T): SyntheticEvent<T> {
return {
eventName: this.name,
eventVersion: version,

View File

@ -0,0 +1,100 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractHandler, { SyntheticEvent } from "./AbstractHandler";
import { Environment } from "../environment";
import { ethers } from "ethers";
const CURRENT_VERSION = 1;
type LogMessagePublishedConfig = {
chains: {
chainId: ChainId;
coreContract: string;
}[];
};
//VAA structure is the same on all chains.
//therefore, as long as the content of the VAA is readable on-chain, we should be able to create this object for all ecosystems
type LogMessagePublished = {
timestamp: number;
nonce: number;
emitterChain: ChainId;
emitterAddress: string;
sequence: number;
consistencyLevel: number;
payload: string;
hash: string;
};
export default class LogMessagePublishedHandler extends AbstractHandler<LogMessagePublished> {
  constructor(env: Environment, config: any) {
    super("LogMessagePublished", env, config);
  }
  /** Supported iff the chain appears in the configured chain list. */
  public shouldSupportChain(network: Network, chainId: ChainId): boolean {
    const found = this.config.chains.find((c: any) => c.chainId === chainId);
    return found !== undefined;
  }
  public getEventAbiEvm(): string[] | null {
    return [
      "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel);",
    ];
  }
  public getEventSignatureEvm(): string | null {
    return "LogMessagePublished(address,uint64,uint32,bytes,uint8)";
  }
  /**
   * Decodes a LogMessagePublished event and wraps it in a SyntheticEvent,
   * reconstructing the VAA body hash the same way the Wormhole ts-sdk
   * parses it (packed big-endian layout).
   */
  public async handleEventEvm(
    chainId: ChainId,
    event: ethers.Event
  ): Promise<SyntheticEvent<LogMessagePublished>> {
    const abi = this.getEventAbiEvm() as string[];
    const iface = new ethers.utils.Interface(abi);
    const parsedLog = iface.parseLog(event);
    const timestamp = (await event.getBlock()).timestamp; //TODO see if there's a way we can do this without pulling the block header
    const nonce = parsedLog.args[2].toNumber();
    const emitterChain = chainId;
    // VAA emitter addresses are 32 bytes: left-pad the 20-byte EVM sender.
    // (The previous `.toString("hex")` was a no-op on the address string, and
    // defaultAbiCoder would reject the 20-byte value as an invalid bytes32.)
    const emitterAddress = ethers.utils.hexZeroPad(parsedLog.args[0], 32);
    // NOTE(review): a uint64 sequence above 2^53-1 makes toNumber() throw —
    // confirm that is acceptable here.
    const sequence = parsedLog.args[1].toNumber();
    const consistencyLevel = parsedLog.args[4].toNumber();
    const payload = parsedLog.args[3]; // bytes arg decodes to a 0x-prefixed hex string
    // VAA body layout per the Wormhole ts-sdk (packed, big-endian):
    //   timestamp: body.readUInt32BE(0)
    //   nonce: body.readUInt32BE(4)
    //   emitterChain: body.readUInt16BE(8)
    //   emitterAddress: body.subarray(10, 42)
    //   sequence: body.readBigUInt64BE(42)
    //   consistencyLevel: body[50]
    //   payload: body.subarray(51)
    // solidityPack produces exactly this packed layout; defaultAbiCoder.encode
    // pads every field to 32 bytes and would yield a different (wrong) hash.
    const body = ethers.utils.solidityPack(
      ["uint32", "uint32", "uint16", "bytes32", "uint64", "uint8", "bytes"],
      [timestamp, nonce, chainId, emitterAddress, sequence, consistencyLevel, payload]
    );
    const hash = this.keccak256(body).toString("hex");
    const parsedEvent = {
      timestamp,
      nonce,
      emitterChain,
      emitterAddress,
      sequence,
      consistencyLevel,
      payload,
      hash,
    };
    return Promise.resolve(this.wrapEvent(chainId, CURRENT_VERSION, parsedEvent));
  }
  /** Core contract address for the chain; throws if the chain is not configured. */
  public getContractAddressEvm(network: Network, chainId: ChainId): string {
    const found = this.config.chains.find((c: any) => c.chainId === chainId);
    if (found === undefined) {
      throw new Error("Chain not supported");
    }
    return found.coreContract;
  }
  // Returns a Buffer so callers can hex-encode via .toString("hex").
  //TODO move to utils
  private keccak256(data: ethers.BytesLike): Buffer {
    return Buffer.from(ethers.utils.arrayify(ethers.utils.keccak256(data)));
  }
}

View File

@ -0,0 +1,22 @@
import { BigNumber } from "ethers";
import { EvmLog, LogFoundEvent, LogMessagePublished } from "../../domain/entities";
/**
 * Maps a decoded LogMessagePublished EVM log into a LogFoundEvent.
 * parsedArgs follows the event signature order:
 * [sender, sequence, nonce, payload, consistencyLevel].
 */
export const evmLogMessagePublishedMapper = (
  log: EvmLog,
  parsedArgs: ReadonlyArray<any>
): LogFoundEvent<LogMessagePublished> => {
  const [sender, sequence, nonce, payload, consistencyLevel] = parsedArgs;
  return {
    name: "log-message-published",
    chainId: 2, // TODO: get from config
    txHash: log.transactionHash,
    blockHeight: log.blockNumber,
    blockTime: log.blockTime,
    attributes: {
      sender, // also available as log.topics[1]
      sequence: (sequence as BigNumber).toNumber(),
      payload,
      nonce,
      consistencyLevel,
    },
  };
};

View File

@ -4,8 +4,8 @@ const WEBSOCKET_PING_INTERVAL = 10000;
const WEBSOCKET_PONG_TIMEOUT = 5000;
const WEBSOCKET_RECONNECT_DELAY = 100;
const WebSocketProviderClass =
(): new () => ethers.providers.WebSocketProvider => class {} as never;
const WebSocketProviderClass = (): new () => ethers.providers.WebSocketProvider =>
class {} as never;
export class WebSocketProvider extends WebSocketProviderClass() {
private provider?: ethers.providers.WebSocketProvider;
@ -14,8 +14,7 @@ export class WebSocketProvider extends WebSocketProviderClass() {
private handler = {
get(target: WebSocketProvider, prop: string, receiver: unknown) {
const value =
target.provider && Reflect.get(target.provider, prop, receiver);
const value = target.provider && Reflect.get(target.provider, prop, receiver);
return value instanceof Function ? value.bind(target.provider) : value;
},

View File

@ -0,0 +1,97 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { HandleEvmLogs, HandleEvmLogsConfig } from "../../src/domain/actions/HandleEvmLogs";
import { EvmLog, LogFoundEvent } from "../../src/domain/entities";
const ABI =
"event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)";
// Fixture mapper for decoded SendEvent logs; stringifies the numeric args so
// assertions can compare plain strings. Note: the log itself is unused.
const mapper = (log: EvmLog, args: ReadonlyArray<any>) => {
  const [sequence, deliveryQuote, paymentForExtraReceiverValue] = args;
  return {
    name: "send-event",
    chainId: 1,
    txHash: "0x0",
    blockHeight: 0n,
    blockTime: 0n,
    attributes: {
      sequence: sequence.toString(),
      deliveryQuote: deliveryQuote.toString(),
      paymentForExtraReceiverValue: paymentForExtraReceiverValue.toString(),
    },
  };
};
// Test double for the target repository.
// save resolves; failingSave rejects. The original failingSave created a
// *floating* Promise.reject() and then resolved successfully, so the failure
// path was never actually exercised (and it raised an unhandled-rejection
// warning at runtime).
const targetRepo = {
  save: async (events: LogFoundEvent<Record<string, string>>[]) => {},
  failingSave: async (events: LogFoundEvent<Record<string, string>>[]): Promise<void> => {
    return Promise.reject(new Error("failingSave: simulated failure"));
  },
};
let targetRepoSpy: jest.SpiedFunction<(typeof targetRepo)["save"]>;
let evmLogs: EvmLog[];
let cfg: HandleEvmLogsConfig;
let handleEvmLogs: HandleEvmLogs<LogFoundEvent<Record<string, string>>>;
describe("HandleEvmLogs", () => {
  afterEach(async () => {});
  it("should be able to map logs", async () => {
    // Given 5 logs, all matching the configured address/topic filter,
    // handle() should map every one and forward the whole batch to target.
    const expectedLength = 5;
    givenConfig(ABI);
    givenEvmLogs(expectedLength, expectedLength);
    givenHandleEvmLogs();
    const result = await handleEvmLogs.handle(evmLogs);
    expect(result).toHaveLength(expectedLength);
    // Expected values come from decoding the fixture `data` with the SendEvent ABI.
    expect(result[0].attributes.sequence).toBe("3389");
    expect(result[0].attributes.deliveryQuote).toBe("75150000000000000");
    expect(result[0].attributes.paymentForExtraReceiverValue).toBe("0");
    expect(targetRepoSpy).toBeCalledWith(result);
  });
});
// Builds the handler under test with the chosen target ("save" by default),
// spying on it so calls can be asserted afterwards.
const givenHandleEvmLogs = (targetFn: "save" | "failingSave" = "save") => {
  targetRepoSpy = jest.spyOn(targetRepo, targetFn);
  handleEvmLogs = new HandleEvmLogs(cfg, mapper, targetRepo[targetFn]);
};
// Filter config fixture: a single contract address and a single event topic
// hash, paired with the ABI under test.
const givenConfig = (abi: string) => {
  cfg = {
    filter: {
      addresses: ["0x28D8F1Be96f97C1387e94A53e00eCcFb4E75175a"],
      topics: ["0xda8540426b64ece7b164a9dce95448765f0a7263ef3ff85091c9c7361e485364"],
    },
    abi,
  };
};
// Builds `length` log fixtures; the first `matchingFilterOnes` match cfg's
// address/topic filter, the rest use a non-matching address and no topics.
const givenEvmLogs = (length: number, matchingFilterOnes: number) => {
  evmLogs = [];
  let matchingCount = 0;
  for (let i = 0; i < length; i++) {
    // Default address intentionally absent from cfg.filter.addresses.
    let address = "0x392f472048681816e91026cd768c60958b55352add2837adea9ea6249178b8a8";
    let topic: string | undefined = undefined;
    if (matchingCount < matchingFilterOnes) {
      address = cfg.filter.addresses![0];
      topic = cfg.filter.topics![0];
      matchingCount++;
    }
    evmLogs.push({
      blockTime: 0n,
      blockNumber: BigInt(i + 1),
      blockHash: "0x1a07d0bd31c84f0dab36eac31a2f3aa801852bf8240ffba19113c62463f694fa",
      address: address,
      removed: false,
      data: "0x000000000000000000000000000000000000000000000000010afc86dedee0000000000000000000000000000000000000000000000000000000000000000000",
      transactionHash: "0x2077dbd0c685c264dfa4e8e048ff15b03775043070216644258bf1bd3e419aa8",
      transactionIndex: "0x4",
      topics: topic
        ? [topic, "0x0000000000000000000000000000000000000000000000000000000000000d3d"]
        : [],
      logIndex: 0,
    });
  }
};

View File

@ -0,0 +1,162 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { setTimeout } from "timers/promises";
import {
PollEvmLogsMetadata,
PollEvmLogs,
PollEvmLogsConfig,
} from "../../src/domain/actions/PollEvmLogs";
import { EvmBlockRepository, MetadataRepository } from "../../src/domain/repositories";
import { EvmBlock, EvmLog } from "../../src/domain/entities";
// Shared fixtures/spies for the PollEvmLogs scenarios below; rebuilt by the given* helpers.
let cfg = PollEvmLogsConfig.fromBlock(0n); // re-used across tests; fromBlock overridden per test
let getBlocksSpy: jest.SpiedFunction<EvmBlockRepository["getBlocks"]>;
let getLogsSpy: jest.SpiedFunction<EvmBlockRepository["getFilteredLogs"]>;
let handlerSpy: jest.SpiedFunction<(logs: EvmLog[]) => Promise<void>>;
let metadataSaveSpy: jest.SpiedFunction<MetadataRepository<PollEvmLogsMetadata>["save"]>;
let metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
let evmBlockRepo: EvmBlockRepository;
// Stub log handlers passed to PollEvmLogs.start.
let handlers = {
  // Resolves immediately, simulating a handler that consumes the logs successfully.
  working: (logs: EvmLog[]) => Promise.resolve(),
  // Rejects with an Error (previously `Promise.reject()` rejected with `undefined`,
  // which loses the failure reason and trips "non-Error rejection" warnings).
  failing: (logs: EvmLog[]) => Promise.reject(new Error("handler failed")),
};
let pollEvmLogs: PollEvmLogs;
describe("PollEvmLogs", () => {
  // Stop the poller after each test so no interval leaks across tests.
  afterEach(async () => {
    await pollEvmLogs.stop();
  });
  it("should be able to read logs from given start", async () => {
    const currentHeight = 10n;
    const blocksAhead = 1n;
    givenEvmBlockRepository(currentHeight, blocksAhead);
    givenMetadataRepository();
    givenPollEvmLogs(currentHeight);
    await whenPollEvmLogsStarts();
    await thenWaitForAssertion(
      () => expect(getBlocksSpy).toHaveBeenCalledWith(new Set([currentHeight, currentHeight + 1n])),
      () =>
        expect(getLogsSpy).toBeCalledWith({
          addresses: cfg.addresses,
          topics: cfg.topics,
          fromBlock: currentHeight,
          toBlock: currentHeight + blocksAhead,
        })
    );
  });
  it("should be able to read logs from last known block when configured from is before", async () => {
    const lastExtractedBlock = 10n;
    const blocksAhead = 10n;
    givenEvmBlockRepository(lastExtractedBlock, blocksAhead);
    givenMetadataRepository({ lastBlock: lastExtractedBlock });
    givenPollEvmLogs(lastExtractedBlock - 10n);
    await whenPollEvmLogsStarts();
    await thenWaitForAssertion(
      // NOTE(review): the extra "() =>" below wraps the expect in a function that
      // thenWaitForAssertion never invokes, so this assertion is a no-op. Simply
      // unwrapping it would assert Set([10n, 11n]) while the fixture repo serves
      // blocks 10n..20n (blocksAhead = 10n) — confirm the intended expected set
      // against PollEvmLogs before fixing.
      () => () =>
        expect(getBlocksSpy).toHaveBeenCalledWith(
          new Set([lastExtractedBlock, lastExtractedBlock + 1n])
        ),
      () =>
        expect(getLogsSpy).toBeCalledWith({
          addresses: cfg.addresses,
          topics: cfg.topics,
          // One past the persisted lastBlock, not the (earlier) configured fromBlock.
          fromBlock: lastExtractedBlock + 1n,
          toBlock: lastExtractedBlock + blocksAhead,
        })
    );
  });
  it("should pass logs to handlers and persist metadata", async () => {
    const currentHeight = 10n;
    const blocksAhead = 1n;
    givenEvmBlockRepository(currentHeight, blocksAhead);
    givenMetadataRepository();
    givenPollEvmLogs(currentHeight);
    await whenPollEvmLogsStarts();
    await thenWaitForAssertion(
      () => expect(handlerSpy).toHaveBeenCalledWith(expect.any(Array)),
      () =>
        expect(metadataSaveSpy).toBeCalledWith("watch-evm-logs", {
          lastBlock: currentHeight + blocksAhead,
        })
    );
  });
});
// Builds an in-memory EvmBlockRepository pre-loaded with one log and one block
// per height in [height, height + blocksAhead], and installs the spies the
// assertions use. getBlockHeight reports `height + blocksAhead` so the poller
// sees blocks available ahead of its cursor.
// FIX: checks were `if (height)` / `height ? …`, which treat a legitimate 0n
// (bigint zero is falsy) as "no height"; use an explicit undefined check.
const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => {
  const logsResponse: EvmLog[] = [];
  const blocksResponse: Record<string, EvmBlock> = {};
  if (height !== undefined) {
    for (let index = 0n; index <= (blocksAhead ?? 1n); index++) {
      logsResponse.push({
        blockNumber: height + index,
        blockHash: `0x0${index}`,
        blockTime: 0n,
        address: "",
        removed: false,
        data: "",
        transactionHash: "",
        transactionIndex: "",
        topics: [],
        logIndex: 0,
      });
      blocksResponse[`0x0${index}`] = {
        timestamp: 0n,
        hash: `0x0${index}`,
        number: height + index,
      };
    }
  }
  evmBlockRepo = {
    getBlocks: () => Promise.resolve(blocksResponse),
    getBlockHeight: () =>
      Promise.resolve(height !== undefined ? height + (blocksAhead ?? 10n) : 10n),
    getFilteredLogs: () => Promise.resolve(logsResponse),
  };
  getBlocksSpy = jest.spyOn(evmBlockRepo, "getBlocks");
  getLogsSpy = jest.spyOn(evmBlockRepo, "getFilteredLogs");
  handlerSpy = jest.spyOn(handlers, "working");
};
// Builds an in-memory metadata repo, optionally pre-seeded with `data`,
// and spies on save so metadata persistence can be asserted.
const givenMetadataRepository = (data?: PollEvmLogsMetadata) => {
  metadataRepo = {
    get: async () => data,
    save: async () => {},
  };
  metadataSaveSpy = jest.spyOn(metadataRepo, "save");
};
// Builds the PollEvmLogs under test; optionally overrides cfg.fromBlock first.
const givenPollEvmLogs = (from?: bigint) => {
  if (from !== undefined) {
    cfg.fromBlock = from;
  }
  pollEvmLogs = new PollEvmLogs(evmBlockRepo, metadataRepo, cfg);
};
// Kicks off polling with the always-succeeding handler.
const whenPollEvmLogsStarts = async (): Promise<void> => {
  return pollEvmLogs.start([handlers.working]);
};
// Retries the given assertions up to 4 times, pausing 10ms between attempts,
// to give the async polling loop time to do its work. Rethrows the last
// assertion error once the attempts are exhausted.
const thenWaitForAssertion = async (...assertions: (() => void)[]) => {
  const maxAttempts = 4;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      assertions.forEach((assertion) => assertion());
      return;
    } catch (error) {
      // Unref'd timer so a hung test cannot keep the process alive.
      await setTimeout(10, undefined, { ref: false });
      if (attempt === maxAttempts) {
        throw error;
      }
    }
  }
};

View File

@ -0,0 +1,50 @@
import { describe, it, expect } from "@jest/globals";
import { evmLogMessagePublishedMapper } from "../../../src/infrastructure/mappers/evmLogMessagePublishedMapper";
import { HandleEvmLogs } from "../../../src/domain/actions/HandleEvmLogs";
// Emitter address and LogMessagePublished topic used to build the fixture log
// (presumably the Ethereum Wormhole core contract — the test asserts chainId 2;
// confirm against the deployment registry).
const address = "0x98f3c9e6e3face36baad05fe09d375ef1464288b";
const topic = "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2";
const txHash = "0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7";
// Handler wired with the real LogMessagePublished mapper and a no-op target,
// filtering on the single address/topic above.
const handler = new HandleEvmLogs(
  {
    filter: { addresses: [address], topics: [topic] },
    abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
  },
  evmLogMessagePublishedMapper,
  async () => {}
);
describe("evmLogMessagePublished", () => {
it("should be able to map log to LogMessagePublished", async () => {
const [result] = await handler.handle([
{
blockTime: 1699375895n,
blockNumber: 18521386n,
blockHash: "0x894136d03446d47116319d59b5ec3190c05248e16c8728c2848bf7452732341c",
address: "0x98f3c9e6e3face36baad05fe09d375ef1464288b",
removed: false,
data: "0x00000000000000000000000000000000000000000000000000000000000212b20000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000085010000000000000000000000000000000000000000000000000000000045be2810000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb480002f022f6b3e80ec1219065fee8e46eb34c1cfd056a8d52d93df2c7e0165eaf364b00010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
transactionHash: txHash,
transactionIndex: "0x62",
topics: [topic, "0x0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585"],
logIndex: 0,
},
]);
expect(result.name).toBe("log-message-published");
expect(result.chainId).toBe(2);
expect(result.txHash).toBe(
"0xcbdefc83080a8f60cbde7785eb2978548fd5c1f7d0ea2c024cce537845d339c7"
);
expect(result.blockHeight).toBe(18521386n);
expect(result.blockTime).toBe(1699375895n);
expect(result.attributes.sequence).toBe(135858);
expect(result.attributes.sender.toLowerCase()).toBe(
"0x3ee18b2214aff97000d974cf647e7c347e8fa585"
);
expect(result.attributes.nonce).toBe(0);
expect(result.attributes.consistencyLevel).toBe(1);
});
});