From 2cc48a650b20b4b5215bcbd87d9f2f63ae0f606d Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 27 Jun 2022 12:03:40 -0700 Subject: [PATCH] Spawn for each stream (#26086) --- streamer/src/nonblocking/quic.rs | 56 ++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index cdceb20e40..17c4424931 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -237,33 +237,39 @@ async fn handle_connection( Ok(mut stream) => { stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); - let mut maybe_batch = None; - while !stream_exit.load(Ordering::Relaxed) { - if let Ok(chunk) = tokio::time::timeout( - Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), - stream.read_chunk(PACKET_DATA_SIZE, false), - ) - .await - { - if handle_chunk( - &chunk, - &mut maybe_batch, - &remote_addr, - &packet_sender, - stats.clone(), - stake, - ) { - last_update.store(timing::timestamp(), Ordering::Relaxed); - break; + let stream_exit = stream_exit.clone(); + let stats = stats.clone(); + let packet_sender = packet_sender.clone(); + let last_update = last_update.clone(); + tokio::spawn(async move { + let mut maybe_batch = None; + while !stream_exit.load(Ordering::Relaxed) { + if let Ok(chunk) = tokio::time::timeout( + Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), + stream.read_chunk(PACKET_DATA_SIZE, false), + ) + .await + { + if handle_chunk( + &chunk, + &mut maybe_batch, + &remote_addr, + &packet_sender, + stats.clone(), + stake, + ) { + last_update.store(timing::timestamp(), Ordering::Relaxed); + break; + } + } else { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); } - } else { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); } - } - stats.total_streams.fetch_sub(1, Ordering::Relaxed); + stats.total_streams.fetch_sub(1, Ordering::Relaxed); + }); } Err(e) => { debug!("stream error: {:?}", e);