This commit is contained in:
matias martinez 2023-11-28 15:45:23 -03:00
parent cf18d36afd
commit 8e3941c3c2
9 changed files with 0 additions and 807 deletions

View File

@ -1,112 +0,0 @@
import {
} from "./infrastructure/watchers/environment";
import AbstractWatcher from "./infrastructure/watchers/AbstractWatcher";
async function run() {
initializeEnvironment(process.env.WATCHER_CONFIG_PATH || "../config/local.json");
const ENVIRONMENT = await getEnvironment();
//TODO instantiate the persistence module(s)
//TODO either hand the persistence module to the watcher, or pull necessary config from the persistence module here
//TODO the event watchers currently instantiate themselves, which isn't ideal. Refactor for next version
const handlers = createHandlers(ENVIRONMENT);
const watchers = createWatchers(ENVIRONMENT, handlers);
await runAllProcesses(watchers);
async function runAllProcesses(allWatchers: AbstractWatcher[]) {
//These are all the raw processes that will run, wrapped to contain their process ID and a top level error handler
let allProcesses = new Map<number, () => Promise<number>>();
let processIdCounter = 0;
//These are all the processes, keyed by their process ID, that we know are not currently running.
const unstartedProcesses = new Set<number>();
//Go through all the watchers, wrap their processes, and add them to the unstarted processes set
for (const watcher of allWatchers) {
wrapProcessWithTracker(processIdCounter, watcher.startWebsocketProcessor)
wrapProcessWithTracker(processIdCounter, watcher.startQueryProcessor)
wrapProcessWithTracker(processIdCounter, watcher.startGapProcessor)
//If a process ends, reenqueue it into the unstarted processes set
const reenqueueCallback = (processId: number) => {
//Every 5 seconds, try to start any unstarted processes
while (true) {
for (const processId of unstartedProcesses) {
const process = allProcesses.get(processId);
if (process) {
//TODO the process ID is a good key but is difficult to track to meaningful information
console.log(`Starting process ${processId}`);
.then((processId) => {
.catch((e) => {
} else {
//should never happen
console.error(`Process ${processId} not found`);
await new Promise((resolve) => setTimeout(resolve, 5000));
function wrapProcessWithTracker(
processId: number,
process: () => Promise<void>
): () => Promise<number> {
return () => {
return process()
.then(() => {
console.log(`Process ${processId} exited via promise resolution`);
return processId;
.catch((e) => {
console.error(`Process ${processId} exited via promise rejection`);
return processId;
//run should never stop, unless an unexpected fatal error occurs
.then(() => {
console.log("run() finished");
.catch((e) => {
console.error("Fatal error caused process to exit");

View File

@ -1,37 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractHandler from "./handlers/AbstractHandler";
export default abstract class AbstractWatcher {
//store class fields from constructor
public watcherName: string;
public environment: Network;
public events: AbstractHandler<any>[];
public chain: ChainId;
public rpc: string;
public logger: any;
//TODO add persistence module(s) as class fields
//or, alternatively, pull necessary config from the persistence module here
//TODO resumeBlock is needed for the query processor
watcherName: string,
environment: Network,
events: AbstractHandler<any>[],
chain: ChainId,
rpc: string,
logger: any
) {
this.watcherName = watcherName;
this.environment = environment; = events;
this.chain = chain;
this.rpc = rpc;
this.logger = logger;
abstract startWebsocketProcessor(): Promise<void>;
abstract startQueryProcessor(): Promise<void>;
abstract startGapProcessor(): Promise<void>;

View File

@ -1,26 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractWatcher from "./AbstractWatcher";
import AbstractHandler from "./handlers/AbstractHandler";
export default class EvmWatcher extends AbstractWatcher {
watcherName: string,
environment: Network,
events: AbstractHandler<any>[],
chain: ChainId,
rpc: string,
logger: any
) {
super(watcherName, environment, events, chain, rpc, logger);
async startWebsocketProcessor(): Promise<void> {
throw new Error("Method not implemented.");
async startQueryProcessor(): Promise<void> {
throw new Error("Method not implemented.");
async startGapProcessor(): Promise<void> {
throw new Error("Method not implemented.");

View File

@ -1,170 +0,0 @@
import { ChainId, ChainName, Network, toChainName } from "@certusone/wormhole-sdk";
import AbstractWatcher from "./AbstractWatcher";
import winston from "winston";
import EvmWatcher from "./EvmWatcher";
import AbstractHandler from "./handlers/AbstractHandler";
const MAINNET_RPCS: { [key in ChainName]?: string } = {
ethereum: process.env.ETH_RPC || "",
bsc: process.env.BSC_RPC || "",
polygon: "",
avalanche: "",
oasis: "",
algorand: "",
fantom: "",
karura: "",
acala: "",
klaytn: "",
celo: "",
moonbeam: "",
arbitrum: "",
optimism: "",
aptos: "",
near: "",
xpla: "",
terra2: "",
terra: "",
injective: "",
solana: process.env.SOLANA_RPC ?? "",
sui: "",
const TESTNET_RPCS: { [key in ChainName]?: string } = {
bsc: "",
polygon: "",
avalanche: "",
celo: "",
moonbeam: "",
const DEVNET_RPCS: { [key in ChainName]?: string } = {
ethereum: "http://localhost:8545",
bsc: "http://localhost:8546",
let hasLoadedDotEnv = false;
function loadDotEnv() {
if (readEnvironmentVariable("USE_ENV_FILE") === "true" && !hasLoadedDotEnv) {
//use the dotenv library to load in the .env file
hasLoadedDotEnv = true;
const readEnvironmentVariable = (name: string): string | null => {
const value = process.env[name];
if (!value) {
return null;
return value;
export type HandlerConfig = {
name: string;
config: any;
export type ConfigFile = {
network: Network;
supportedChains: ChainId[];
rpcs: { chain: ChainId; rpc: string }[];
handlers: HandlerConfig[];
export type Environment = {
network: Network;
configurationPath: any;
configuration: ConfigFile;
supportedChains: ChainId[];
rpcs: Map<ChainId, string>;
logger: winston.Logger;
let environment: Environment | null = null;
export function getEnvironment(): Environment {
if (environment) {
return environment;
} else {
throw new Error("Environment not set");
export async function initializeEnvironment(configurationPath: string) {
const configuration = require(configurationPath);
const json: ConfigFile = JSON.parse(JSON.stringify(configuration));
const network =;
if (network !== "MAINNET" && network !== "TESTNET" && network !== "DEVNET") {
throw new Error("Invalid network provided in the configuration file");
const supportedChains = json.supportedChains;
if (!supportedChains || supportedChains.length === 0) {
throw new Error("No supported chains provided in the configuration file");
const configRpcs = json.rpcs;
const rpcs = new Map<ChainId, string>();
for (const chain of supportedChains) {
configRpcs.forEach((item: any) => {
//double equals for string/int equality
if (item.chain == chain) {
if (!item.rpc) {
throw new Error(`No RPC provided for chain ${chain}`);
rpcs.set(chain, item.rpc);
environment = {
logger: winston.child({}),
//TODO this
export function createHandlers(env: Environment): AbstractHandler<any>[] {
const handlerArray: AbstractHandler<any>[] = [];
for (const handler of env.configuration.handlers) {
const handlerInstance = new (require(`./handlers/${}`).default)(
return handlerArray;
//TODO this process probably needs persistence
export function createWatchers(
env: Environment,
handlers: AbstractHandler<any>[]
): AbstractWatcher[] {
const watchers: AbstractWatcher[] = [];
for (const chain of env.supportedChains) {
const rpc = env.rpcs.get(chain);
if (!rpc) {
throw new Error(`No RPC provided for chain ${chain}`);
const watcher = new EvmWatcher(
toChainName(chain) + " Watcher",,
return watchers;

View File

@ -1,95 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import { v4 as uuidv4 } from "uuid";
import { Environment } from "../environment";
const { createHash } = require("crypto");
export type SyntheticEvent<T> = {
eventName: string;
eventVersion: number;
eventChain: ChainId;
observationTimestamp: number;
uuid: string; //UUID for the event, good for deduping
dataHash: string; //sha256 hash of the event data, good for deduping
data: T;
export default abstract class AbstractHandler<T> {
public name: string;
public environment: Environment;
public config: any;
constructor(name: string, environment: Environment, config: any) { = name;
this.environment = environment;
this.config = config;
//These top level functions must always be implemented
public abstract shouldSupportChain(network: Network, chainId: ChainId): boolean;
//These functions must be implemented if an EVM chain is supported.
//Event to be listened for in ABI format. Example:
//"event Delivery(address indexed recipientContract, uint16 indexed sourceChain, uint64 indexed sequence, bytes32 deliveryVaaHash, uint8 status, uint256 gasUsed, uint8 refundStatus, bytes additionalStatusInfo, bytes overridesInfo)",
public abstract getEventAbiEvm(): string[] | null;
//Event to be listened for in signature format. Example:
public abstract getEventSignatureEvm(): string | null;
//This function will be called when a subscribed event is received from the ethers provider.
//TODO pretty sure the ...args is always an ethers.Event object
public abstract handleEventEvm(chainId: ChainId, ...args: any): Promise<SyntheticEvent<T>>;
public abstract getContractAddressEvm(network: Network, chainId: ChainId): string;
//*** Non-abstract functions
//Wrapper function to hand into EVM rpc provider.
//The wrapper is necessary otherwise we can't figure out which chain ID the event came from.
public getEventListener(handler: AbstractHandler<T>, chainId: ChainId) {
return (...args) => {
// @ts-ignore
return handler
.handleEventEvm(chainId, ...args)
.then((records) => {
if (records) {
//TODO persist records. Unsure how exactly this happens atm.
.catch((e) => {
console.error("Unexpected error processing the following event: ", chainId, ...args);
public getName(): string {
public generateUuid(): string {
return uuidv4();
public getEnvironment(): Environment {
return this.environment;
public getConfig(): any {
return this.config;
protected wrapEvent(chainId: ChainId, version: number, data: T): SyntheticEvent<T> {
return {
eventVersion: version,
eventChain: chainId,
uuid: this.generateUuid(),
dataHash: createHash("sha256").update(JSON.stringify(data)).digest("hex"),
data: data,

View File

@ -1,100 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractHandler, { SyntheticEvent } from "./AbstractHandler";
import { Environment } from "../environment";
import { ethers } from "ethers";
type LogMessagePublishedConfig = {
chains: {
chainId: ChainId;
coreContract: string;
//VAA structure is the same on all chains.
//therefore, as long as the content of the VAA is readable on-chain, we should be able to create this object for all ecosystems
type LogMessagePublished = {
timestamp: number;
nonce: number;
emitterChain: ChainId;
emitterAddress: string;
sequence: number;
consistencyLevel: number;
payload: string;
hash: string;
export default class LogMessagePublishedHandler extends AbstractHandler<LogMessagePublished> {
constructor(env: Environment, config: any) {
super("LogMessagePublished", env, config);
public shouldSupportChain(network: Network, chainId: ChainId): boolean {
const found = this.config.chains.find((c: any) => c.chainId === chainId);
return found !== undefined;
public getEventAbiEvm(): string[] | null {
return [
"event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel);",
public getEventSignatureEvm(): string | null {
return "LogMessagePublished(address,uint64,uint32,bytes,uint8)";
public async handleEventEvm(
chainId: ChainId,
event: ethers.Event
): Promise<SyntheticEvent<LogMessagePublished>> {
const abi = this.getEventAbiEvm() as string[];
const iface = new ethers.utils.Interface(abi);
const parsedLog = iface.parseLog(event);
const timestamp = (await event.getBlock()).timestamp; //TODO see if there's a way we can do this without pulling the block header
const nonce = parsedLog.args[2].toNumber();
const emitterChain = chainId;
const emitterAddress = parsedLog.args[0].toString("hex"); //TODO unsure if this is correct
const sequence = parsedLog.args[1].toNumber();
const consistencyLevel = parsedLog.args[4].toNumber();
const payload = parsedLog.args[3].toString("hex"); //TODO unsure if this is correct
//Encoding from Wormhole ts-sdk
// timestamp: body.readUInt32BE(0),
// nonce: body.readUInt32BE(4),
// emitterChain: body.readUInt16BE(8),
// emitterAddress: body.subarray(10, 42),
// sequence: body.readBigUInt64BE(42),
// consistencyLevel: body[50],
// payload: body.subarray(51),
const body = ethers.utils.defaultAbiCoder.encode(
["uint32", "uint32", "uint16", "bytes32", "uint64", "uint8", "bytes"],
[timestamp, nonce, chainId, emitterAddress, sequence, consistencyLevel, payload]
const hash = this.keccak256(body).toString("hex");
const parsedEvent = {
return Promise.resolve(this.wrapEvent(chainId, CURRENT_VERSION, parsedEvent));
public getContractAddressEvm(network: Network, chainId: ChainId): string {
const found = this.config.chains.find((c: any) => c.chainId === chainId);
if (found === undefined) {
throw new Error("Chain not supported");
return found.coreContract;
//TODO move to utils
private keccak256(data: ethers.BytesLike): Buffer {
return Buffer.from(ethers.utils.arrayify(ethers.utils.keccak256(data)));

View File

@ -1,105 +0,0 @@
//TODO refactor this to use the new event handler system
export {};
// import { TypedEvent } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts/common";
// import { ethers } from "ethers";
// import { getEnvironment } from "../environment";
// import { CHAIN_ID_TO_NAME, ChainId, Network } from "@certusone/wormhole-sdk";
// import AbstractHandler from "./AbstractHandler";
// //TODOD consider additional fields:
// // - timestamp
// // - block number
// // - call data
// // - transaction cost
// // - full transaction receipt
// export type WormholeRelayerDeliveryEventRecord = {
// environment: string;
// chainId: ChainId;
// txHash: string;
// recipientContract: string;
// sourceChain: number;
// sequence: string;
// deliveryVaaHash: string;
// status: number;
// gasUsed: string;
// refundStatus: number;
// additionalStatusInfo: string;
// overridesInfo: string;
// };
// //TODO implement this such that it pushes the event to a database
// async function persistRecord(record: WormholeRelayerDeliveryEventRecord) {
// console.log(JSON.stringify(record));
// }
// function getEventAbi(): string[] {
// return [
// "event Delivery(address indexed recipientContract, uint16 indexed sourceChain, uint64 indexed sequence, bytes32 deliveryVaaHash, uint8 status, uint256 gasUsed, uint8 refundStatus, bytes additionalStatusInfo, bytes overridesInfo)",
// ];
// }
// function getEventSignatureEvm(): string {
// return "Delivery(address,uint16,uint64,bytes32,uint8,uint256,uint8,bytes,bytes)";
// }
// async function handleEventEvm(
// chainId: ChainId,
// eventObj: ethers.Event
// ): Promise<SyntheticEvent<WormholeRelayerDeliveryEventRecord>> {
// console.log(
// `Received Delivery event for Wormhole Relayer Contract, txHash: ${eventObj.transactionHash}`
// );
// const environment = await getEnvironment();
// const txHash = eventObj.transactionHash;
// var abi = getEventAbi();
// var iface = new ethers.utils.Interface(abi);
// var parsedLog = iface.parseLog(eventObj);
// const recipientContract = parsedLog.args[0];
// const sourceChain = parsedLog.args[1];
// const sequence = parsedLog.args[2].toString();
// const deliveryVaaHash = parsedLog.args[3];
// const status = parsedLog.args[4];
// const gasUsed = parsedLog.args[5].toString();
// const refundStatus = parsedLog.args[6];
// const additionalStatusInfo = parsedLog.args[7];
// const overridesInfo = parsedLog.args[8];
// return AbstractHandler.prototype.wrapEvent(chainId, 1, {
// environment:,
// chainId,
// txHash,
// recipientContract,
// sourceChain,
// sequence,
// deliveryVaaHash,
// status,
// gasUsed,
// refundStatus,
// additionalStatusInfo,
// overridesInfo,
// });
// }
// function getContractAddressEvm(network: Network, chainId: ChainId): string {
// return ""; //TODO //getWormholeRelayerAddressWrapped(CHAIN_ID_TO_NAME[chainId], network);
// }
// function shouldSupportChain(network: Network, chainId: ChainId): boolean {
// return true; //TODO currently the supported chains are determined by the relayer contract, so this is trivially true.
// //It might not be true in the future.
// }
// const WormholeRelayerEventHandler: AbstractHandler =
// {
// name: "Wormhole Relayer Delivery Event Handler",
// getEventSignatureEvm,
// getEventAbiEvm: getEventAbi,
// handleEventEvm,
// persistRecord,
// getContractAddressEvm,
// shouldSupportChain,
// getEventListener: AbstractHandler.prototype.getEventListener, //TODO not any of this
// };
// export default WormholeRelayerEventHandler;

View File

@ -1,80 +0,0 @@
//TODO refactor this to use the new event handler system
export {};
// import { TypedEvent } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts/common";
// import { ethers } from "ethers";
// import { getEnvironment } from "../environment";
// import { CHAIN_ID_TO_NAME, ChainId, Network } from "@certusone/wormhole-sdk";
// import AbstractHandler from "./AbstractHandler";
// //TODOD consider additional fields:
// // - timestamp
// // - entire transaction receipt
// // - deduplication info
// export type WormholeRelayerSendEventRecord = {
// environment: string;
// chainId: ChainId;
// txHash: string;
// sequence: string;
// deliveryQuote: string;
// paymentForExtraReceiverValue: string;
// };
// //TODO implement this such that it pushes the event to a database
// async function persistRecord(record: WormholeRelayerSendEventRecord) {
// console.log(JSON.stringify(record));
// }
// function getEventAbiEvm(): string[] {
// return [
// "event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)",
// ];
// }
// async function handleEventEvm(
// chainId: ChainId,
// eventObj: ethers.Event
// ): Promise<WormholeRelayerSendEventRecord | null> {
// console.log(
// `Received Send event for Wormhole Relayer Contract, txHash: ${eventObj.transactionHash}`
// );
// const abi = getEventAbiEvm();
// var iface = new ethers.utils.Interface(abi);
// var parsedLog = iface.parseLog(eventObj);
// return {
// //TODO env type broke
// environment: await getEnvironment().network,
// chainId: chainId,
// txHash: eventObj.transactionHash,
// sequence: parsedLog.args[0].toString(),
// deliveryQuote: parsedLog.args[1].toString(),
// paymentForExtraReceiverValue: parsedLog.args[2].toString(),
// };
// }
// function getContractAddressEvm(network: Network, chainId: ChainId): string {
// return ""; //TODO //getWormholeRelayerAddressWrapped(CHAIN_ID_TO_NAME[chainId], network);
// }
// function shouldSupportChain(network: Network, chainId: ChainId): boolean {
// return true; //TODO currently the supported chains are determined by the relayer contract, so this is trivially true.
// //It might not be true in the future.
// }
// function getEventSignatureEvm(): string {
// return "SendEvent(uint64,uint256,uint256)";
// }
// const WormholeRelayerSendEventHandler: AbstractHandler<WormholeRelayerSendEventRecord> =
// {
// name: "Wormhole Relayer Send Event Handler",
// getEventSignatureEvm,
// getEventAbiEvm,
// handleEventEvm,
// persistRecord,
// getContractAddressEvm,
// shouldSupportChain,
// getEventListener: AbstractHandler.prototype.getEventListener, //TODO not any of this
// };
// export default WormholeRelayerSendEventHandler;

View File

@ -1,82 +0,0 @@
import { ethers } from "ethers";
const WebSocketProviderClass = (): new () => ethers.providers.WebSocketProvider =>
class {} as never;
export class WebSocketProvider extends WebSocketProviderClass() {
private provider?: ethers.providers.WebSocketProvider;
private events: ethers.providers.WebSocketProvider["_events"] = [];
private requests: ethers.providers.WebSocketProvider["_requests"] = {};
private handler = {
get(target: WebSocketProvider, prop: string, receiver: unknown) {
const value = target.provider && Reflect.get(target.provider, prop, receiver);
return value instanceof Function ? value.bind(target.provider) : value;
constructor(private providerUrl: string) {
return new Proxy(this, this.handler);
private create() {
if (this.provider) { = [, ...this.provider._events];
this.requests = { ...this.requests, ...this.provider._requests };
const provider = new ethers.providers.WebSocketProvider(
let pingInterval: NodeJS.Timer | undefined;
let pongTimeout: NodeJS.Timeout | undefined;
provider._websocket.on("open", () => {
pingInterval = setInterval(() => {;
pongTimeout = setTimeout(() => {
let event;
while ((event = {
for (const key in this.requests) {
provider._requests[key] = this.requests[key];
delete this.requests[key];
provider._websocket.on("pong", () => {
if (pongTimeout) clearTimeout(pongTimeout);
provider._websocket.on("close", (code: number) => {
provider._wsReady = false;
if (pingInterval) clearInterval(pingInterval);
if (pongTimeout) clearTimeout(pongTimeout);
if (code !== 1000) {
setTimeout(() => this.create(), WEBSOCKET_RECONNECT_DELAY);
this.provider = provider;