Load jobs dinamically

This commit is contained in:
matias martinez 2023-11-14 14:03:11 -03:00
parent 6c74e94a66
commit 609da135cd
17 changed files with 333 additions and 82 deletions

View File

@ -13,6 +13,9 @@
"metadata": {
"dir": "metadata-repo"
},
"jobs": {
"dir": "metadata-repo/jobs"
},
"platforms": {
"ethereum": {
"name": "ethereum",

View File

@ -111,7 +111,7 @@ export class PollEvmLogs extends RunPollingJob {
fromBlock = this.cfg.fromBlock;
}
let toBlock = fromBlock + BigInt(this.cfg.getBlockBatchSize());
let toBlock = BigInt(fromBlock) + BigInt(this.cfg.getBlockBatchSize());
// limit toBlock to obtained block height
if (toBlock > fromBlock && toBlock > latestBlockHeight) {
toBlock = latestBlockHeight;
@ -180,7 +180,7 @@ export class PollEvmLogsConfig {
}
public get fromBlock() {
return this.props.fromBlock;
return this.props.fromBlock ? BigInt(this.props.fromBlock) : undefined;
}
public setFromBlock(fromBlock: bigint | undefined) {

View File

@ -1,5 +1,6 @@
import { setTimeout } from "timers/promises";
import winston from "winston";
import { Handler } from "../entities";
export abstract class RunPollingJob {
private interval: number;
@ -15,7 +16,7 @@ export abstract class RunPollingJob {
this.running = true;
}
public async run(handlers: ((items: any[]) => Promise<any>)[]): Promise<void> {
public async run(handlers: Handler[]): Promise<void> {
this.logger.info("Starting polling job");
await this.preHook();
while (this.running) {

View File

@ -0,0 +1,41 @@
import winston from "winston";
import { JobDefinition } from "../entities";
import { JobRepository } from "../repositories";
export class StartJobs {
private readonly logger = winston.child({ module: "StartJobs" });
private readonly repo: JobRepository;
private runnables: Map<string, () => Promise<void>> = new Map();
constructor(repo: JobRepository) {
this.repo = repo;
}
public async runSingle(job: JobDefinition): Promise<JobDefinition> {
if (this.runnables.has(job.id)) {
throw new Error(`Job ${job.id} already exists. Ids must be unique`);
}
const handlers = await this.repo.getHandlers(job);
if (handlers.length === 0) {
this.logger.error(`No handlers for job ${job.id}`);
throw new Error("No handlers for job");
}
const source = this.repo.getSource(job);
this.runnables.set(job.id, () => source.run(handlers));
this.runnables.get(job.id)!();
return job;
}
public async run(): Promise<JobDefinition[]> {
const jobs = await this.repo.getJobDefinitions();
for (const job of jobs) {
await this.runSingle(job);
}
return jobs;
}
}

View File

@ -1,2 +1,4 @@
export * from "./HandleEvmLogs";
export * from "./PollEvmLogs";
export * from "./RunPollingJob";
export * from "./StartJobs";

View File

@ -1,3 +1,3 @@
export * from "./evm";
export * from "./jobs";
export * from "./events";
export * from "./jobs";

View File

@ -1,4 +1,28 @@
export interface Source {
get(): Promise<any[]>;
hasNext(): Promise<boolean>;
export class JobDefinition {
id: string;
chain: string;
source: {
action: string;
config: Record<string, any>;
};
handlers: {
action: string;
target: string;
mapper: string;
config: Record<string, any>;
}[];
constructor(
id: string,
chain: string,
source: { action: string; config: Record<string, any> },
handlers: { action: string; target: string; mapper: string; config: Record<string, any> }[]
) {
this.id = id;
this.chain = chain;
this.source = source;
this.handlers = handlers;
}
}
export type Handler = (items: any[]) => Promise<any>;

View File

@ -1,4 +1,5 @@
import { EvmBlock, EvmLog, EvmLogFilter } from "./entities";
import { RunPollingJob } from "./actions/RunPollingJob";
import { EvmBlock, EvmLog, EvmLogFilter, Handler, JobDefinition } from "./entities";
export interface EvmBlockRepository {
getBlockHeight(finality: string): Promise<bigint>;
@ -16,3 +17,9 @@ export interface StatRepository {
measure(id: string, value: bigint, labels: Record<string, any>): void;
report: () => Promise<string>;
}
export interface JobRepository {
getJobDefinitions(): Promise<JobDefinition[]>;
getSource(jobDef: JobDefinition): RunPollingJob;
getHandlers(jobDef: JobDefinition): Promise<Handler[]>;
}

View File

@ -6,9 +6,11 @@ import {
EvmJsonRPCBlockRepositoryCfg,
FileMetadataRepo,
PromStatRepository,
StaticJobRepository,
} from "./repositories";
import { HttpClient } from "./repositories/HttpClient";
import { JobRepository } from "../domain/repositories";
export class RepositoriesBuilder {
private cfg: Config;
@ -38,6 +40,20 @@ export class RepositoriesBuilder {
};
this.repositories.set(`${chain}-evmRepo`, new EvmJsonRPCBlockRepository(repoCfg, httpClient));
});
this.repositories.set(
"jobs",
new StaticJobRepository(
this.cfg.jobs.dir,
this.cfg.dryRun,
(chain: string) => this.getEvmBlockRepository(chain),
{
metadataRepo: this.getMetadataRepository(),
statsRepo: this.getStatsRepository(),
snsRepo: this.getSnsEventRepository(),
}
)
);
}
public getEvmBlockRepository(chain: string): EvmJsonRPCBlockRepository {
@ -68,6 +84,13 @@ export class RepositoriesBuilder {
return repo;
}
public getJobsRepository(): JobRepository {
const repo = this.repositories.get("jobs");
if (!repo) throw new Error(`No JobRepository`);
return repo;
}
public close(): void {
this.snsClient?.destroy();
}

View File

@ -10,6 +10,9 @@ export type Config = {
metadata?: {
dir: string;
};
jobs: {
dir: string;
};
platforms: Record<string, PlatformConfig>;
supportedChains: string[];
};
@ -36,6 +39,9 @@ export const configuration = {
metadata: {
dir: config.get<string>("metadata.dir"),
},
jobs: {
dir: config.get<string>("jobs.dir"),
},
platforms: config.get<Record<string, PlatformConfig>>("platforms"),
supportedChains: config.get<string[]>("supportedChains"),
} as Config;

View File

@ -6,13 +6,13 @@ export class FileMetadataRepo implements MetadataRepository<any> {
constructor(dirPath: string) {
this.dirPath = dirPath;
if (!fs.existsSync(this.dirPath)) {
fs.mkdirSync(this.dirPath, { recursive: true });
}
}
async get(id: string): Promise<any> {
const filePath = `${this.dirPath}/${id}.json`;
if (!fs.existsSync(this.dirPath)) {
fs.mkdirSync(this.dirPath);
}
return fs.promises
.readFile(filePath, "utf8")

View File

@ -86,6 +86,17 @@ export class SnsEventRepository {
status: "success",
};
}
async asTarget(): Promise<(events: LogFoundEvent<any>[]) => Promise<void>> {
return async (events: LogFoundEvent<any>[]) => {
const result = await this.publish(events);
if (result.status === "error") {
this.logger.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
throw new Error(`Error publishing events to SNS: ${result.reason}`);
}
this.logger.info(`Published ${events.length} events to SNS`);
};
}
}
export class SnsEvent {

View File

@ -0,0 +1,120 @@
import {
HandleEvmLogs,
PollEvmLogs,
PollEvmLogsConfig,
PollEvmLogsConfigProps,
RunPollingJob,
} from "../../domain/actions";
import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities";
import {
EvmBlockRepository,
JobRepository,
MetadataRepository,
StatRepository,
} from "../../domain/repositories";
import { FileMetadataRepo, SnsEventRepository } from "./index";
import { evmLogMessagePublishedMapper } from "../mappers/evmLogMessagePublishedMapper";
import log from "../log";
export class StaticJobRepository implements JobRepository {
private fileRepo: FileMetadataRepo;
private dryRun: boolean = false;
private sources: Map<string, (def: JobDefinition) => RunPollingJob> = new Map();
private handlers: Map<string, (cfg: any, target: string, mapper: any) => Promise<Handler>> =
new Map();
private mappers: Map<string, any> = new Map();
private targets: Map<string, () => Promise<(items: any[]) => Promise<void>>> = new Map();
private blockRepoProvider: (chain: string) => EvmBlockRepository;
private metadataRepo: MetadataRepository<any>;
private statsRepo: StatRepository;
private snsRepo: SnsEventRepository;
constructor(
path: string,
dryRun: boolean,
blockRepoProvider: (chain: string) => EvmBlockRepository,
repos: {
metadataRepo: MetadataRepository<any>;
statsRepo: StatRepository;
snsRepo: SnsEventRepository;
}
) {
this.fileRepo = new FileMetadataRepo(path);
this.blockRepoProvider = blockRepoProvider;
this.metadataRepo = repos.metadataRepo;
this.statsRepo = repos.statsRepo;
this.snsRepo = repos.snsRepo;
this.dryRun = dryRun;
this.fill();
}
async getJobDefinitions(): Promise<JobDefinition[]> {
const persisted = await this.fileRepo.get("jobs");
if (!persisted) {
return Promise.resolve([]);
}
return persisted;
}
getSource(jobDef: JobDefinition): RunPollingJob {
const src = this.sources.get(jobDef.source.action);
if (!src) {
throw new Error(`Source ${jobDef.source.action} not found`);
}
return src(jobDef);
}
async getHandlers(jobDef: JobDefinition): Promise<Handler[]> {
const result: Handler[] = [];
for (const handler of jobDef.handlers) {
const maybeHandler = this.handlers.get(handler.action);
if (!maybeHandler) {
throw new Error(`Handler ${handler.action} not found`);
}
const mapper = this.mappers.get(handler.mapper);
if (!mapper) {
throw new Error(`Handler ${handler.action} not found`);
}
result.push((await maybeHandler(handler.config, handler.target, mapper)).bind(maybeHandler));
}
return result;
}
private fill() {
const pollEvmLogs = (jobDef: JobDefinition) =>
new PollEvmLogs(
this.blockRepoProvider(jobDef.source.config.chain),
this.metadataRepo,
this.statsRepo,
new PollEvmLogsConfig({
...(jobDef.source.config as PollEvmLogsConfigProps),
id: jobDef.id,
})
);
this.sources.set("PollEvmLogs", pollEvmLogs);
this.mappers.set("evmLogMessagePublishedMapper", evmLogMessagePublishedMapper);
const snsTarget = () => this.snsRepo.asTarget();
const dummyTarget = async () => async (events: any[]) => {
log.info(`Got ${events.length} events`);
};
this.targets.set("sns", snsTarget);
this.targets.set("dummy", dummyTarget);
const handleEvmLogs = async (config: any, target: string, mapper: any) => {
const instance = new HandleEvmLogs<LogFoundEvent<any>>(
config,
mapper,
await this.targets.get(this.dryRun ? "dummy" : target)!()
);
return instance.handle.bind(instance);
};
this.handlers.set("HandleEvmLogs", handleEvmLogs);
}
}

View File

@ -11,3 +11,4 @@ export * from "./FileMetadataRepo";
export * from "./SnsEventRepository";
export * from "./EvmJsonRPCBlockRepository";
export * from "./PromStatRepository";
export * from "./StaticJobRepository";

View File

@ -30,7 +30,7 @@ export class WebServer {
start() {
this.server.listen(this.port, () => {
log.info(`Server started on port 8080`);
log.info(`Server started on port ${this.port}`);
});
}

View File

@ -1,11 +1,9 @@
import { PollEvmLogs, PollEvmLogsConfig, HandleEvmLogs } from "./domain/actions";
import { LogFoundEvent } from "./domain/entities";
import { configuration } from "./infrastructure/config";
import { evmLogMessagePublishedMapper } from "./infrastructure/mappers/evmLogMessagePublishedMapper";
import { RepositoriesBuilder } from "./infrastructure/RepositoriesBuilder";
import log from "./infrastructure/log";
import { WebServer } from "./infrastructure/rpc/Server";
import { HealthController } from "./infrastructure/rpc/HealthController";
import { StartJobs } from "./domain/actions";
let repos: RepositoriesBuilder;
let server: WebServer;
@ -14,9 +12,10 @@ async function run(): Promise<void> {
log.info(`Starting: dryRunEnabled -> ${configuration.dryRun}`);
repos = new RepositoriesBuilder(configuration);
const startJobs = new StartJobs(repos.getJobsRepository());
await startServer(repos);
await startJobs(repos);
await startServer(repos, startJobs);
await startJobs.run();
// Just keep this running until killed
setInterval(() => {
@ -30,74 +29,10 @@ async function run(): Promise<void> {
process.on("SIGTERM", handleShutdown);
}
const startServer = async (repos: RepositoriesBuilder) => {
const startServer = async (repos: RepositoriesBuilder, startJobs: StartJobs) => {
server = new WebServer(configuration.port, new HealthController(repos.getStatsRepository()));
};
const startJobs = async (repos: RepositoriesBuilder) => {
/** Job definition is hardcoded, but should be loaded from cfg or a data store soon enough */
const jobs = [
{
id: "poll-log-message-published-ethereum",
chain: "ethereum",
source: {
action: "PollEvmLogs",
config: {
fromBlock: 10012499n,
blockBatchSize: 100,
commitment: "latest",
interval: 15_000,
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
chain: "ethereum",
topics: [],
},
},
handlers: [
{
action: "HandleEvmLogs",
target: "sns",
mapper: "evmLogMessagePublishedMapper",
config: {
abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
filter: {
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
},
},
},
],
},
];
const pollEvmLogs = new PollEvmLogs(
repos.getEvmBlockRepository("ethereum"),
repos.getMetadataRepository(),
repos.getStatsRepository(),
new PollEvmLogsConfig({ ...jobs[0].source.config, id: jobs[0].id })
);
const snsTarget = async (events: LogFoundEvent<any>[]) => {
const result = await repos.getSnsEventRepository().publish(events);
if (result.status === "error") {
log.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
throw new Error(`Error publishing events to SNS: ${result.reason}`);
}
log.info(`Published ${events.length} events to SNS`);
};
const handleEvmLogs = new HandleEvmLogs<LogFoundEvent<any>>(
jobs[0].handlers[0].config,
evmLogMessagePublishedMapper,
configuration.dryRun
? async (events) => {
log.info(`Got ${events.length} events`);
}
: snsTarget
);
pollEvmLogs.run([handleEvmLogs.handle.bind(handleEvmLogs)]);
};
const handleShutdown = async () => {
try {
await Promise.allSettled([repos.close(), server.stop()]);

View File

@ -0,0 +1,77 @@
import { beforeEach, describe, expect, it } from "@jest/globals";
import fs from "fs";
import { SnsEventRepository, StaticJobRepository } from "../../../src/infrastructure/repositories";
import {
EvmBlockRepository,
MetadataRepository,
StatRepository,
} from "../../../src/domain/repositories";
const dirPath = "./metadata-repo/jobs";
const blockRepo: EvmBlockRepository = {} as any as EvmBlockRepository;
const metadataRepo = {} as MetadataRepository<any>;
const statsRepo = {} as any as StatRepository;
const snsRepo = {} as any as SnsEventRepository;
let repo: StaticJobRepository;
describe("StaticJobRepository", () => {
beforeEach(() => {
if (fs.existsSync(dirPath)) {
fs.rmSync(dirPath, { recursive: true, force: true });
}
repo = new StaticJobRepository(dirPath, false, () => blockRepo, {
metadataRepo,
statsRepo,
snsRepo,
});
});
it("should return empty when no file available", async () => {
const jobs = await repo.getJobDefinitions();
expect(jobs).toHaveLength(0);
});
it("should read jobs from file", async () => {
givenJobsPresent();
const jobs = await repo.getJobDefinitions();
expect(jobs).toHaveLength(1);
expect(jobs[0].id).toEqual("poll-log-message-published-ethereum");
});
});
const givenJobsPresent = () => {
const jobs = [
{
id: "poll-log-message-published-ethereum",
chain: "ethereum",
source: {
action: "PollEvmLogs",
config: {
fromBlock: 10012499n,
blockBatchSize: 100,
commitment: "latest",
interval: 15_000,
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
chain: "ethereum",
topics: [],
},
},
handlers: [
{
action: "HandleEvmLogs",
target: "sns",
mapper: "evmLogMessagePublishedMapper",
config: {
abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
filter: {
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
},
},
},
],
},
];
fs.writeFileSync(dirPath + "/jobs.json", JSON.stringify(jobs));
};