From cea7d9d0d7490dbcb611d9f5d25201573f023e96 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Mon, 27 Mar 2023 17:07:55 +0200 Subject: [PATCH] Minor changes to the batching timeout --- src/workers/tx_sender.rs | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 43b00373..00db0a70 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -188,11 +188,6 @@ impl TxSender { let tasks_counter = tasks_counter.clone(); let mut timeout_interval = tx_send_interval.as_millis() as u64; - let mut try_get_permit = || { - permit = semaphore.clone().try_acquire_owned().ok(); - permit.is_some() - }; - while txs.len() <= tx_batch_size { let instance = tokio::time::Instant::now(); match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv()) @@ -203,27 +198,24 @@ impl TxSender { TXS_IN_CHANNEL.dec(); sigs_and_slots.push((sig, slot)); txs.push(tx); + // update the timeout inteval + timeout_interval = timeout_interval + .saturating_sub(instance.elapsed().as_millis() as u64).max(1); } None => { bail!("Channel Disconnected"); } }, Err(_) => { - if try_get_permit() { + permit = semaphore.clone().try_acquire_owned().ok(); + if permit.is_some() { break; + } else { + // already timed out, but could not get a permit + timeout_interval = 1; } } } - timeout_interval = - timeout_interval.saturating_sub(instance.elapsed().as_millis() as u64); - if timeout_interval < 100 { - if try_get_permit() { - break; - } else { - // could not get a permit continue batching with a small timeout - timeout_interval = 100; - } - } } assert_eq!(sigs_and_slots.len(), txs.len());