diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 43b00373..00db0a70 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -188,11 +188,6 @@ impl TxSender { let tasks_counter = tasks_counter.clone(); let mut timeout_interval = tx_send_interval.as_millis() as u64; - let mut try_get_permit = || { - permit = semaphore.clone().try_acquire_owned().ok(); - permit.is_some() - }; - while txs.len() <= tx_batch_size { let instance = tokio::time::Instant::now(); match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv()) @@ -203,27 +198,24 @@ impl TxSender { TXS_IN_CHANNEL.dec(); sigs_and_slots.push((sig, slot)); txs.push(tx); + // update the timeout inteval + timeout_interval = timeout_interval + .saturating_sub(instance.elapsed().as_millis() as u64).max(1); } None => { bail!("Channel Disconnected"); } }, Err(_) => { - if try_get_permit() { + permit = semaphore.clone().try_acquire_owned().ok(); + if permit.is_some() { break; + } else { + // already timed out, but could not get a permit + timeout_interval = 1; } } } - timeout_interval = - timeout_interval.saturating_sub(instance.elapsed().as_millis() as u64); - if timeout_interval < 100 { - if try_get_permit() { - break; - } else { - // could not get a permit continue batching with a small timeout - timeout_interval = 100; - } - } } assert_eq!(sigs_and_slots.len(), txs.len());