From d1cf4ced3da6027a5e1bae71fc0658e5b7138a31 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 16 Dec 2022 14:26:04 -0800 Subject: [PATCH] quic test timeout fix (#29260) Allow longer chunk receive timeout without impacting testing the stream exit condition for unit tests. The exit is periodically checked, we will break only when the total allowed chunk receive timed out. The start time is reset when a new chunk is received. --- streamer/src/nonblocking/quic.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index fe94448464..c4a1f25046 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -566,9 +566,17 @@ async fn handle_connection( let last_update = last_update.clone(); tokio::spawn(async move { let mut maybe_batch = None; + // The min is to guard against a value too small which can wake up unnecessarily + // frequently and wasting CPU cycles. The max guard against waiting for too long + // which delay exit and cause some test failures when the timeout value is large. + // Within this value, the heuristic is to wake up 10 times to check for exit + // for the set timeout if there are no data. + let exit_check_interval = + (wait_for_chunk_timeout_ms / 10).clamp(10, 1000); + let mut start = Instant::now(); while !stream_exit.load(Ordering::Relaxed) { if let Ok(chunk) = tokio::time::timeout( - Duration::from_millis(wait_for_chunk_timeout_ms), + Duration::from_millis(exit_check_interval), stream.read_chunk(PACKET_DATA_SIZE, false), ) .await @@ -585,12 +593,16 @@ async fn handle_connection( last_update.store(timing::timestamp(), Ordering::Relaxed); break; } + start = Instant::now(); } else { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; + let elapse = Instant::now() - start; + if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; + } } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); @@ -1020,7 +1032,7 @@ pub mod test { MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, stats.clone(), - 1000, + 2000, ) .unwrap(); (t, exit, receiver, server_address, stats)