From 827e34366fb653b6bb7bccd82b6467d3d05a3880 Mon Sep 17 00:00:00 2001 From: julian merlo Date: Thu, 15 Feb 2024 11:00:19 -0300 Subject: [PATCH] Divided evm tx in batchs --- .../domain/actions/evm/GetEvmTransactions.ts | 114 +++++++++++------- .../domain/actions/sui/PollSuiTransactions.ts | 35 +++--- .../repositories/common/utils.ts | 4 + .../evm/EvmJsonRPCBlockRepository.ts | 4 - 4 files changed, 91 insertions(+), 66 deletions(-) diff --git a/blockchain-watcher/src/domain/actions/evm/GetEvmTransactions.ts b/blockchain-watcher/src/domain/actions/evm/GetEvmTransactions.ts index 21de2e43..1401e00a 100644 --- a/blockchain-watcher/src/domain/actions/evm/GetEvmTransactions.ts +++ b/blockchain-watcher/src/domain/actions/evm/GetEvmTransactions.ts @@ -30,37 +30,61 @@ export class GetEvmTransactions { this.logger.info( `[${chain}][exec] Processing blocks [fromBlock: ${fromBlock} - toBlock: ${toBlock}]` ); - for (let block = fromBlock; block <= toBlock; block++) { - const evmBlock = await this.blockRepo.getBlock(chain, block, isTransactionsPresent); - const transactions = evmBlock.transactions ?? []; - // Only process transactions to the contract address configured - const transactionsByAddressConfigured = transactions.filter( - (transaction) => - opts.addresses?.includes(String(transaction.to).toLowerCase()) || - opts.addresses?.includes(String(transaction.from).toLowerCase()) - ); + let currentBlock = fromBlock; + const batchSize = 9; - if (transactionsByAddressConfigured.length > 0) { - const hashNumbers = new Set( - transactionsByAddressConfigured.map((transaction) => transaction.hash) - ); - const receiptTransactions = await this.blockRepo.getTransactionReceipt(chain, hashNumbers); + while (currentBlock <= toBlock) { + const batchPromises = []; - const filterTransactions = this.filterTransactions( - opts, - transactionsByAddressConfigured, - receiptTransactions - ); - - await this.populateTransaction( - opts, - evmBlock, - receiptTransactions, - filterTransactions, - populatedTransactions - ); + for (let i = 0; i < batchSize && currentBlock <= toBlock; i++, currentBlock++) { + // Push each getBlock call as a promise into the batchPromises array + batchPromises.push(this.blockRepo.getBlock(chain, currentBlock, isTransactionsPresent)); } + + const results = await Promise.allSettled(batchPromises); + + results.forEach(async (result, index) => { + if (result.status === "fulfilled") { + const evmBlock = result.value; + const transactions = evmBlock.transactions ?? []; + + // Only process transactions to the contract address configured + const transactionsByAddressConfigured = transactions.filter( + (transaction) => + opts.addresses?.includes(String(transaction.to).toLowerCase()) || + opts.addresses?.includes(String(transaction.from).toLowerCase()) + ); + + if (transactionsByAddressConfigured.length > 0) { + const hashNumbers = new Set( + transactionsByAddressConfigured.map((transaction) => transaction.hash) + ); + const receiptTransactions = await this.blockRepo.getTransactionReceipt( + chain, + hashNumbers + ); + + const filterTransactions = this.filterTransactions( + opts, + transactionsByAddressConfigured, + receiptTransactions + ); + + await this.populateTransaction( + opts, + evmBlock, + receiptTransactions, + filterTransactions, + populatedTransactions + ); + } + } else if (result.status === "rejected") { + this.logger.warn( + `[${chain}][exec] Invalid range [fromBlock: ${fromBlock} - toBlock: ${toBlock}]` + ); + } + }); } this.logger.info( @@ -71,24 +95,6 @@ export class GetEvmTransactions { return populatedTransactions; } - private async populateTransaction( - opts: GetEvmOpts, - evmBlock: EvmBlock, - receiptTransactions: Record, - filterTransactions: EvmTransaction[], - populatedTransactions: EvmTransaction[] - ) { - filterTransactions.forEach((transaction) => { - transaction.status = receiptTransactions[transaction.hash].status; - transaction.timestamp = evmBlock.timestamp; - transaction.environment = opts.environment; - transaction.chainId = opts.chainId; - transaction.chain = opts.chain; - transaction.logs = receiptTransactions[transaction.hash].logs; - populatedTransactions.push(transaction); - }); - } - /** * This method filter the transactions in base your logs with the topic and address configured in the job * For example: Redeemed or MintAndWithdraw transactions @@ -108,6 +114,24 @@ export class GetEvmTransactions { }); } + private async populateTransaction( + opts: GetEvmOpts, + evmBlock: EvmBlock, + receiptTransactions: Record, + filterTransactions: EvmTransaction[], + populatedTransactions: EvmTransaction[] + ) { + filterTransactions.forEach((transaction) => { + transaction.status = receiptTransactions[transaction.hash].status; + transaction.timestamp = evmBlock.timestamp; + transaction.environment = opts.environment; + transaction.chainId = opts.chainId; + transaction.chain = opts.chain; + transaction.logs = receiptTransactions[transaction.hash].logs; + populatedTransactions.push(transaction); + }); + } + private populateLog(opts: GetEvmOpts, fromBlock: bigint, toBlock: bigint): string { return `[addresses:${opts.addresses}][topics:${opts.topics}][blocks:${fromBlock} - ${toBlock}]`; } diff --git a/blockchain-watcher/src/domain/actions/sui/PollSuiTransactions.ts b/blockchain-watcher/src/domain/actions/sui/PollSuiTransactions.ts index 953b9707..25cf284f 100644 --- a/blockchain-watcher/src/domain/actions/sui/PollSuiTransactions.ts +++ b/blockchain-watcher/src/domain/actions/sui/PollSuiTransactions.ts @@ -33,6 +33,7 @@ export class PollSuiTransactions extends RunPollingJob { protected async preHook(): Promise { const metadata = await this.metadataRepo.get(this.cfg.id); if (metadata) { + this.lastCheckpoint = metadata.lastCursor?.checkpoint; this.cursor = metadata.lastCursor; } } @@ -48,23 +49,6 @@ export class PollSuiTransactions extends RunPollingJob { return true; } - protected report(): void { - const labels = { - job: this.cfg.id, - chain: "sui", - commitment: "immediate", - }; - this.statsRepo.count("job_execution", labels); - this.statsRepo.measure("polling_cursor", BigInt(this.lastCheckpoint ?? 0), { - ...labels, - type: "max", - }); - this.statsRepo.measure("polling_cursor", BigInt(this.cursor?.checkpoint ?? 0n), { - ...labels, - type: "current", - }); - } - protected async get(): Promise { this.cursor = await this.getCursor(); @@ -119,6 +103,23 @@ export class PollSuiTransactions extends RunPollingJob { await this.metadataRepo.save(this.cfg.id, { lastCursor: this.cursor }); } } + + protected report(): void { + const labels = { + job: this.cfg.id, + chain: "sui", + commitment: "immediate", + }; + this.statsRepo.count("job_execution", labels); + this.statsRepo.measure("polling_cursor", BigInt(this.cursor?.checkpoint ?? 0), { + ...labels, + type: "max", + }); + this.statsRepo.measure("polling_cursor", BigInt(this.lastCheckpoint ?? 0n), { + ...labels, + type: "current", + }); + } } export class PollSuiTransactionsConfig { diff --git a/blockchain-watcher/src/infrastructure/repositories/common/utils.ts b/blockchain-watcher/src/infrastructure/repositories/common/utils.ts index 78073568..0d0c22c0 100644 --- a/blockchain-watcher/src/infrastructure/repositories/common/utils.ts +++ b/blockchain-watcher/src/infrastructure/repositories/common/utils.ts @@ -1,3 +1,7 @@ +/** + * This method divide in batches the object to send, because we have one restriction about how many object send to the endpoint + * the maximum is 10 object per request + */ export function divideIntoBatches(set: Set, batchSize = 10): Set[] { const batches: Set[] = []; let batch: any[] = []; diff --git a/blockchain-watcher/src/infrastructure/repositories/evm/EvmJsonRPCBlockRepository.ts b/blockchain-watcher/src/infrastructure/repositories/evm/EvmJsonRPCBlockRepository.ts index 75ffcf23..39f22484 100644 --- a/blockchain-watcher/src/infrastructure/repositories/evm/EvmJsonRPCBlockRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/evm/EvmJsonRPCBlockRepository.ts @@ -250,10 +250,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository { let results: ResultTransactionReceipt[] = []; let id = 1; - /** - * This method divide in batches the object to send, because we have one restriction about how many object send to the endpoint - * the maximum is 10 object per request - */ const batches = divideIntoBatches(hashNumbers, TX_BATCH_SIZE); let combinedResults: ResultTransactionReceipt[] = [];