From 2cb071d9fbde687dccb6a516ca7fd9cfa59fb01b Mon Sep 17 00:00:00 2001 From: matias martinez Date: Wed, 13 Dec 2023 12:28:10 -0300 Subject: [PATCH] pr updates --- .../src/domain/actions/jobs/StartJobs.ts | 29 +++++++++---------- .../repositories/RepositoriesBuilder.ts | 2 +- .../repositories/jobs/StaticJobRepository.ts | 2 +- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/blockchain-watcher/src/domain/actions/jobs/StartJobs.ts b/blockchain-watcher/src/domain/actions/jobs/StartJobs.ts index cab994ff..011ffb5b 100644 --- a/blockchain-watcher/src/domain/actions/jobs/StartJobs.ts +++ b/blockchain-watcher/src/domain/actions/jobs/StartJobs.ts @@ -27,9 +27,6 @@ export class StartJobs { for (const job of jobs) { if (!this.hasCapacity()) { - this.logger.info( - `[run] Max concurrent jobs reached (${this.options.maxConcurrentJobs}), stopping` - ); break; } @@ -56,7 +53,7 @@ export class StartJobs { } private getCurrentExecutions(): JobExecution[] { - return Array.from(this.runnables.values()).map((r) => r.exec); + return Array.from(this.runnables.values()).map((runner) => runner.exec); } private hasCapacity(): boolean { @@ -69,6 +66,18 @@ export class StartJobs { return available; } + private async tryJobExecution(job: JobDefinition): Promise { + const handlers = await this.jobRepository.getHandlers(job); + if (handlers.length === 0) { + this.logger.error(`[run] No handlers for job ${job.id}`); + throw new Error("No handlers for job"); + } + + const runnable = this.jobRepository.getRunnableJob(job); + + return this.trackExecution(job, handlers, runnable); + } + private async trackExecution( job: JobDefinition, handlers: Handler[], @@ -93,16 +102,4 @@ export class StartJobs { return jobExec; } - - private async tryJobExecution(job: JobDefinition): Promise { - const handlers = await this.jobRepository.getHandlers(job); - if (handlers.length === 0) { - this.logger.error(`[run] No handlers for job ${job.id}`); - throw new Error("No handlers for job"); - } - - const runnable = this.jobRepository.getRunnableJob(job); - - return this.trackExecution(job, handlers, runnable); - } } diff --git a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts index f14ef18a..ea711213 100644 --- a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts +++ b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts @@ -16,7 +16,7 @@ import { PostgresJobExecutionRepository, InMemoryJobExecutionRepository, ArbitrumEvmJsonRPCBlockRepository, -} from "./"; +} from "."; import { HttpClient } from "../rpc/http/HttpClient"; import { JobExecutionRepository, diff --git a/blockchain-watcher/src/infrastructure/repositories/jobs/StaticJobRepository.ts b/blockchain-watcher/src/infrastructure/repositories/jobs/StaticJobRepository.ts index 2bcd98b8..5f07a165 100644 --- a/blockchain-watcher/src/infrastructure/repositories/jobs/StaticJobRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/jobs/StaticJobRepository.ts @@ -16,7 +16,6 @@ import { SolanaSlotRepository, StatRepository, } from "../../../domain/repositories"; -import log from "../../log"; import { FileMetadataRepository, SnsEventRepository } from ".."; import { solanaLogMessagePublishedMapper, @@ -25,6 +24,7 @@ import { evmStandardRelayDelivered, evmTransferRedeemedMapper, } from "../../mappers"; +import log from "../../log"; export class StaticJobRepository implements JobRepository { private fileRepo: FileMetadataRepository;