diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index fe94448464..c4a1f25046 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -566,9 +566,17 @@ async fn handle_connection( let last_update = last_update.clone(); tokio::spawn(async move { let mut maybe_batch = None; + // The min is to guard against a value too small which can wake up unnecessarily + // frequently and wasting CPU cycles. The max guard against waiting for too long + // which delay exit and cause some test failures when the timeout value is large. + // Within this value, the heuristic is to wake up 10 times to check for exit + // for the set timeout if there are no data. + let exit_check_interval = + (wait_for_chunk_timeout_ms / 10).clamp(10, 1000); + let mut start = Instant::now(); while !stream_exit.load(Ordering::Relaxed) { if let Ok(chunk) = tokio::time::timeout( - Duration::from_millis(wait_for_chunk_timeout_ms), + Duration::from_millis(exit_check_interval), stream.read_chunk(PACKET_DATA_SIZE, false), ) .await @@ -585,12 +593,16 @@ async fn handle_connection( last_update.store(timing::timestamp(), Ordering::Relaxed); break; } + start = Instant::now(); } else { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; + let elapse = Instant::now() - start; + if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; + } } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); @@ -1020,7 +1032,7 @@ pub mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats.clone(), - 1000, + 2000, ) .unwrap(); (t, exit, receiver, server_address, stats)