unique run per job support

This commit is contained in:
matias martinez 2023-12-07 15:33:50 -03:00
parent 43295e3ee4
commit d12e805d30
16 changed files with 344 additions and 63 deletions

View File

@ -30,5 +30,7 @@ jobs:
run: npm run build
working-directory: ./blockchain-watcher
- name: Run tests
env:
LOG_LEVEL: off
run: npm run test:coverage
working-directory: ./blockchain-watcher

View File

@ -0,0 +1 @@
{}

View File

@ -78,7 +78,7 @@
"coverageDirectory": "./coverage",
"coverageThreshold": {
"global": {
"lines": 70.85
"lines": 72.01
}
}
}

View File

@ -1,9 +1,9 @@
import { setTimeout } from "timers/promises";
import winston from "winston";
import { Handler } from "../entities";
import { Handler, Runnable } from "../entities";
import { StatRepository } from "../repositories";
export abstract class RunPollingJob {
export abstract class RunPollingJob implements Runnable {
private interval: number;
private id: string;
private statRepo?: StatRepository;

View File

@ -1,41 +0,0 @@
import winston from "winston";
import { JobDefinition } from "../entities";
import { JobRepository } from "../repositories";
export class StartJobs {
private readonly logger = winston.child({ module: "StartJobs" });
private readonly repo: JobRepository;
private runnables: Map<string, () => Promise<void>> = new Map();
constructor(repo: JobRepository) {
this.repo = repo;
}
public async runSingle(job: JobDefinition): Promise<JobDefinition> {
if (this.runnables.has(job.id)) {
throw new Error(`Job ${job.id} already exists. Ids must be unique`);
}
const handlers = await this.repo.getHandlers(job);
if (handlers.length === 0) {
this.logger.error(`[runSingle] No handlers for job ${job.id}`);
throw new Error("No handlers for job");
}
const source = this.repo.getSource(job);
this.runnables.set(job.id, () => source.run(handlers));
this.runnables.get(job.id)!();
return job;
}
public async run(): Promise<JobDefinition[]> {
const jobs = await this.repo.getJobDefinitions();
for (const job of jobs) {
await this.runSingle(job);
}
return jobs;
}
}

View File

@ -4,4 +4,4 @@ export * from "./evm/PollEvmLogs";
export * from "./solana/GetSolanaTransactions";
export * from "./solana/PollSolanaTransactions";
export * from "./RunPollingJob";
export * from "./StartJobs";
export * from "./jobs/StartJobs";

View File

@ -0,0 +1,75 @@
import winston from "winston";
import { Handler, JobDefinition, JobExecution, Runnable } from "../../entities";
import { JobExecutionRepository, JobRepository } from "../../repositories";
export class StartJobs {
private readonly logger = winston.child({ module: "StartJobs" });
private readonly jobRepository: JobRepository;
private readonly jobExecutionRepository: JobExecutionRepository;
private runnables: Map<string, () => Promise<void>> = new Map();
constructor(repo: JobRepository, jobExecutionRepository: JobExecutionRepository) {
this.jobRepository = repo;
this.jobExecutionRepository = jobExecutionRepository;
}
public async run(): Promise<JobExecution[]> {
const jobs = await this.jobRepository.getJobs(); // TODO: probably should limit by a config number to not fill each pod
const running: JobExecution[] = [];
for (const job of jobs) {
try {
if (job.paused) {
if (this.runnables.has(job.id)) {
await this.runnables.get(job.id)?.();
this.runnables.delete(job.id);
}
this.logger.info(`[run] Job ${job.id} is paused, skipping`);
continue;
}
const maybeJobexecution = await this.tryJobExecution(job);
running.push(maybeJobexecution);
} catch (error) {
this.logger.warn(`[run] Error starting job ${job.id}: ${error}`);
}
}
return running;
}
private async trackExecution(
job: JobDefinition,
handlers: Handler[],
runnable: Runnable
): Promise<JobExecution> {
const jobExec = await this.jobExecutionRepository.start(job);
const innerFn = () => {
runnable
.run(handlers)
.then(() => this.jobExecutionRepository.stop(jobExec))
.catch(async (error) => {
this.logger.error(`[trackExecution] Error running job ${jobExec.job.id}: ${error}`);
if (!(error instanceof Error)) {
error = new Error(error);
}
await this.jobExecutionRepository.stop(jobExec, error);
});
return runnable.stop;
};
this.runnables.set(job.id, innerFn());
return jobExec;
}
private async tryJobExecution(job: JobDefinition): Promise<JobExecution> {
const handlers = await this.jobRepository.getHandlers(job);
if (handlers.length === 0) {
this.logger.error(`[run] No handlers for job ${job.id}`);
throw new Error("No handlers for job");
}
const runnable = this.jobRepository.getRunnableJob(job);
return this.trackExecution(job, handlers, runnable);
}
}

View File

@ -1,6 +1,8 @@
export class JobDefinition {
id: string;
name?: string;
chain: string;
paused?: boolean = false;
source: {
action: string;
config: Record<string, any>;
@ -16,9 +18,11 @@ export class JobDefinition {
id: string,
chain: string,
source: { action: string; config: Record<string, any> },
handlers: { action: string; target: string; mapper: string; config: Record<string, any> }[]
handlers: { action: string; target: string; mapper: string; config: Record<string, any> }[],
name?: string
) {
this.id = id;
this.name = name ?? id;
this.chain = chain;
this.source = source;
this.handlers = handlers;
@ -26,3 +30,17 @@ export class JobDefinition {
}
export type Handler = (items: any[]) => Promise<any>;
export interface Runnable {
run(handlers: Handler[]): Promise<void>;
stop(): Promise<void>;
}
export type JobExecution = {
id: string;
job: JobDefinition;
status: string;
error?: Error;
startedAt: Date;
finishedAt?: Date;
};

View File

@ -1,5 +1,14 @@
import { RunPollingJob } from "./actions/RunPollingJob";
import { EvmBlock, EvmLog, EvmLogFilter, Handler, JobDefinition, solana } from "./entities";
import {
EvmBlock,
EvmLog,
EvmLogFilter,
Handler,
JobDefinition,
JobExecution,
Runnable,
solana,
} from "./entities";
import { ConfirmedSignatureInfo } from "./entities/solana";
import { Fallible, SolanaFailure } from "./errors";
@ -34,7 +43,12 @@ export interface StatRepository {
}
export interface JobRepository {
getJobDefinitions(): Promise<JobDefinition[]>;
getSource(jobDef: JobDefinition): RunPollingJob;
getJobs(): Promise<JobDefinition[]>;
getRunnableJob(jobDef: JobDefinition): Runnable;
getHandlers(jobDef: JobDefinition): Promise<Handler[]>;
}
export interface JobExecutionRepository {
start(job: JobDefinition): Promise<JobExecution>;
stop(jobExec: JobExecution, error?: Error): Promise<JobExecution>;
}

View File

@ -13,7 +13,8 @@ import {
BscEvmJsonRPCBlockRepository,
} from ".";
import { HttpClient } from "../rpc/http/HttpClient";
import { JobRepository } from "../../domain/repositories";
import { JobExecutionRepository, JobRepository } from "../../domain/repositories";
import { InMemoryJobExecutionRepository } from "./jobs/InMemoryJobExecutionRepository";
const SOLANA_CHAIN = "solana";
const EVM_CHAIN = "evm";
@ -72,6 +73,7 @@ export class RepositoriesBuilder {
}
});
this.repositories.set("job-executions", new InMemoryJobExecutionRepository()); // TODO: make this configurable, as in choose implementation
this.repositories.set(
"jobs",
new StaticJobRepository(
@ -110,6 +112,10 @@ export class RepositoriesBuilder {
return this.getRepo("jobs");
}
public getJobExecutionRepository(): JobExecutionRepository {
return this.getRepo("job-executions");
}
public getSolanaSlotRepository(): Web3SolanaSlotRepository {
return this.getRepo("solana-slotRepo");
}

View File

@ -12,6 +12,6 @@ export * from "./SnsEventRepository";
export * from "./evm/EvmJsonRPCBlockRepository";
export * from "./evm/BscEvmJsonRPCBlockRepository";
export * from "./PromStatRepository";
export * from "./StaticJobRepository";
export * from "./jobs/StaticJobRepository";
export * from "./solana/Web3SolanaSlotRepository";
export * from "./solana/RateLimitedSolanaSlotRepository";

View File

@ -0,0 +1,29 @@
import { JobDefinition, JobExecution } from "../../../domain/entities";
import { JobExecutionRepository } from "../../../domain/repositories";
export class InMemoryJobExecutionRepository implements JobExecutionRepository {
private executions: Map<string, JobExecution> = new Map();
async start(job: JobDefinition): Promise<JobExecution> {
if (this.executions.has(job.id)) {
throw new Error(`Job ${job.id} already running`);
}
const execution = { id: job.id, job, status: "running", startedAt: new Date() };
this.executions.set(job.id, execution);
return execution;
}
async stop(jobExec: JobExecution, error?: Error): Promise<JobExecution> {
const execution = this.executions.get(jobExec.job.id);
if (!execution) {
throw new Error(`No execution for job ${jobExec.job.id}`);
}
execution.status = "stopped";
execution.error = error;
execution.finishedAt = new Date();
return execution;
}
}

View File

@ -6,19 +6,19 @@ import {
PollSolanaTransactions,
PollSolanaTransactionsConfig,
RunPollingJob,
} from "../../domain/actions";
import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities";
} from "../../../domain/actions";
import { JobDefinition, Handler, LogFoundEvent } from "../../../domain/entities";
import {
EvmBlockRepository,
JobRepository,
MetadataRepository,
SolanaSlotRepository,
StatRepository,
} from "../../domain/repositories";
import { FileMetadataRepository, SnsEventRepository } from "./index";
import { HandleSolanaTransactions } from "../../domain/actions/solana/HandleSolanaTransactions";
import { solanaLogMessagePublishedMapper, evmLogMessagePublishedMapper } from "../mappers";
import log from "../log";
} from "../../../domain/repositories";
import { FileMetadataRepository, SnsEventRepository } from "../index";
import { HandleSolanaTransactions } from "../../../domain/actions/solana/HandleSolanaTransactions";
import { solanaLogMessagePublishedMapper, evmLogMessagePublishedMapper } from "../../mappers";
import log from "../../log";
export class StaticJobRepository implements JobRepository {
private fileRepo: FileMetadataRepository;
@ -55,7 +55,7 @@ export class StaticJobRepository implements JobRepository {
this.fill();
}
async getJobDefinitions(): Promise<JobDefinition[]> {
async getJobs(): Promise<JobDefinition[]> {
const persisted = await this.fileRepo.get("jobs");
if (!persisted) {
return Promise.resolve([]);
@ -64,7 +64,7 @@ export class StaticJobRepository implements JobRepository {
return persisted;
}
getSource(jobDef: JobDefinition): RunPollingJob {
getRunnableJob(jobDef: JobDefinition): RunPollingJob {
const src = this.sources.get(jobDef.source.action);
if (!src) {
throw new Error(`Source ${jobDef.source.action} not found`);

View File

@ -12,7 +12,7 @@ async function run(): Promise<void> {
log.info(`Starting: dryRunEnabled -> ${configuration.dryRun}`);
repos = new RepositoriesBuilder(configuration);
const startJobs = new StartJobs(repos.getJobsRepository());
const startJobs = new StartJobs(repos.getJobsRepository(), repos.getJobExecutionRepository());
await startServer(repos);
await startJobs.run();

View File

@ -0,0 +1,177 @@
import { describe, it, expect, beforeEach } from "@jest/globals";
import { StartJobs } from "../../../../src/domain/actions/jobs/StartJobs";
import { JobDefinition, JobExecution } from "../../../../src/domain/entities";
import { JobExecutionRepository, JobRepository } from "../../../../src/domain/repositories";
let startJobs: StartJobs | undefined;
let jobRepository: JobRepository;
let jobExecutionRepository: JobExecutionRepository;
describe("StartJobs", () => {
beforeEach(() => {
startJobs = undefined;
});
describe("run", () => {
it("should run a single job and return the job execution", async () => {
const job = createJobDefinitionExample();
givenJobsPresent([job]);
givenNoJobsExecutionPresent([job]);
const jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toBeDefined();
expect(jobExecutions).toHaveLength(1);
});
it("should run a job iff no other execution is present", async () => {
const job = createJobDefinitionExample();
givenJobsPresent([job]);
givenNoJobsExecutionPresent([job]);
const jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toBeDefined();
expect(jobExecutions).toHaveLength(1);
givenJobExecutionsPresent([job]);
givenStartJobsAction();
const nextJobExecutions = await whenStartJobsIsCalled();
expect(nextJobExecutions).toBeDefined();
expect(nextJobExecutions).toHaveLength(0);
});
it("should run jobs with no current execs and ignore the ones running or paused", async () => {
const jobs = ["job-1", "job-2", "job-3", "job-4"].map(createJobDefinitionExample);
const [firstJob, secondJob, thirdJob] = jobs;
thirdJob.paused = true;
givenJobsPresent(jobs);
givenNoJobsExecutionPresent([firstJob, secondJob]);
const jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toHaveLength(2);
});
it("should stop paused jobs if running", async () => {
const job = createJobDefinitionExample();
givenJobsPresent([job]);
givenNoJobsExecutionPresent([job]);
givenStartJobsAction();
let jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toHaveLength(1);
job.paused = true;
givenJobsPresent([job]);
givenJobExecutionsPresent([job]);
jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toHaveLength(0);
});
it("should stop failing jobs", async () => {
const job = createJobDefinitionExample();
givenJobsPresent([job], false);
givenNoJobsExecutionPresent([job]);
givenStartJobsAction();
let jobExecutions = await whenStartJobsIsCalled();
expect(jobExecutions).toHaveLength(1);
jobExecutions = await whenStartJobsIsCalled();
// Should be present again, as it has
expect(jobExecutions).toHaveLength(1);
});
});
});
const givenJobsPresent = (jobs: JobDefinition[], runWorks: boolean = true) => {
jobRepository = {
getJobs: () => Promise.resolve(jobs),
getRunnableJob: () => {
const runnable = {
run: () => (runWorks ? Promise.resolve() : Promise.reject(new Error("Error running job"))),
stop: () => Promise.resolve(),
};
return runnable;
},
getHandlers: () => Promise.resolve([() => Promise.resolve()]),
};
};
const createJobDefinitionExample = (id: string = "job-1") => {
return {
id,
name: "Test Job" + id,
chain: "ethereum",
source: {
action: "test",
config: {},
},
handlers: [
{
action: "test",
target: "dummy",
mapper: "test",
config: {},
},
],
} as JobDefinition;
};
const givenJobExecutionsPresent = (jobs: JobDefinition[]) => {
jobExecutionRepository = {
start: (job: JobDefinition) => {
if (jobs.includes(job)) {
return Promise.reject(new Error("Job already running"));
}
return Promise.resolve({ id: job.id, job, status: "running", startedAt: new Date() });
},
stop: (jobExec: JobExecution) => {
if (jobs.includes(jobExec.job)) {
return Promise.reject(new Error("Job not running"));
}
return Promise.resolve({
...jobExec,
status: "stopped",
startedAt: new Date(),
finishedAt: new Date(),
});
},
};
};
const givenNoJobsExecutionPresent = (jobs: JobDefinition[]) => {
jobExecutionRepository = {
start: (job: JobDefinition) => {
if (jobs.includes(job)) {
return Promise.resolve({ id: job.id, job, status: "running", startedAt: new Date() });
}
return Promise.reject(new Error("Job already running"));
},
stop: (jobExec: JobExecution, error?: Error) => {
if (jobs.includes(jobExec.job)) {
return Promise.resolve({
...jobExec,
status: "stopped",
startedAt: new Date(),
finishedAt: new Date(),
});
}
return Promise.reject(new Error("Job not running"));
},
};
};
const givenStartJobsAction = () => {
startJobs = new StartJobs(jobRepository, jobExecutionRepository);
};
const whenStartJobsIsCalled = () => {
if (!startJobs) {
givenStartJobsAction();
}
return startJobs?.run();
};

View File

@ -31,13 +31,13 @@ describe("StaticJobRepository", () => {
});
it("should return empty when no file available", async () => {
const jobs = await repo.getJobDefinitions();
const jobs = await repo.getJobs();
expect(jobs).toHaveLength(0);
});
it("should read jobs from file", async () => {
givenJobsPresent();
const jobs = await repo.getJobDefinitions();
const jobs = await repo.getJobs();
expect(jobs).toHaveLength(1);
expect(jobs[0].id).toEqual("poll-log-message-published-ethereum");
});