added first cut of new process manager logic

This commit is contained in:
chase-45 2023-10-27 10:46:13 -04:00
parent f5864d582b
commit 9740308077
2 changed files with 145 additions and 6 deletions

View File

@ -0,0 +1,136 @@
import {
CHAIN_ID_ETH,
CHAIN_ID_TO_NAME,
ChainId,
} from "@certusone/wormhole-sdk";
import {
getEnvironment,
getRpcs,
getSupportedChains,
getWormholeRelayerAddressWrapped,
} from "./environment";
import { WormholeRelayer__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts";
import { WebSocketProvider } from "./utils/websocket";
import deliveryEventHandler from "./handlers/deliveryEventHandler";
import sendEventHandler from "./handlers/sendEventHandler";
import { EventHandler, getEventListener } from "./handlers/EventHandler";
import { Contract, ContractFactory, utils } from "ethers";
import AbstractWatcher from "./watchers/AbstractWatcher";
import EvmWatcher from "./watchers/EvmWatcher";
async function run() {
const ENVIRONMENT = await getEnvironment();
//TODO instantiate the persistence module(s)
//TODO either hand the persistence module to the watcher, or pull necessary config from the persistence module here
//TODO the event watchers currently instantiate themselves, which isn't ideal. Refactor for next version
//TODO move event configuration to be environment controlled
const ALL_EVENTS: EventHandler<any>[] = [
deliveryEventHandler,
sendEventHandler,
];
//TODO move to being environment controlled
const ALL_WATCHERS: AbstractWatcher[] = [
new EvmWatcher(
"Ethereum Watcher",
ENVIRONMENT,
ALL_EVENTS,
CHAIN_ID_ETH,
[(await getRpcs()).get(CHAIN_ID_ETH) || ""], //lol do not this
console //TODO add winston logger
),
];
await runAllProcesses(ALL_WATCHERS);
}
async function runAllProcesses(allWatchers: AbstractWatcher[]) {
//These are all the raw processes that will run, wrapped to contain their process ID and a top level error handler
let allProcesses = new Map<number, () => Promise<number>>();
let processIdCounter = 0;
//These are all the processes, keyed by their process ID, that we know are not currently running.
const unstartedProcesses = new Set<number>();
//Go through all the watchers, wrap their processes, and add them to the unstarted processes set
for (const watcher of allWatchers) {
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startWebsocketProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startQueryProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startGapProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
}
//If a process ends, reenqueue it into the unstarted processes set
const reenqueueCallback = (processId: number) => {
unstartedProcesses.add(processId);
};
//Every 5 seconds, try to start any unstarted processes
while (true) {
for (const processId of unstartedProcesses) {
const process = allProcesses.get(processId);
if (process) {
unstartedProcesses.delete(processId);
process()
.then((processId) => {
reenqueueCallback(processId);
})
.catch((e) => {
reenqueueCallback(processId);
});
} else {
//should never happen
console.error(`Process ${processId} not found`);
}
}
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
function wrapProcessWithTracker(
processId: number,
process: () => Promise<void>
): () => Promise<number> {
return () => {
return process()
.then(() => {
console.log(`Process ${processId} exited via promise resolution`);
return processId;
})
.catch((e) => {
console.error(`Process ${processId} exited via promise rejection`);
console.error(e);
return processId;
});
};
}
//run should never stop, unless an unexpected fatal error occurs
run()
.then(() => {
console.log("run() finished");
})
.catch((e) => {
console.error(e);
console.error("Fatal error caused process to exit");
});

View File

@ -3,13 +3,16 @@ import { EventHandler } from "../handlers/EventHandler";
export default abstract class AbstractWatcher {
//store class fields from constructor
watcherName: string;
environment: Network;
events: EventHandler<any>[];
chain: ChainId;
rpcs: string[];
logger: any;
public watcherName: string;
public environment: Network;
public events: EventHandler<any>[];
public chain: ChainId;
public rpcs: string[];
public logger: any;
//TODO add persistence module(s) as class fields
//or, alternatively, pull necessary config from the persistence module here
//TODO resumeBlock is needed for the query processor
constructor(
watcherName: string,
environment: Network,