From d5dbfb67fd198321165ac00df9c8866424eed697 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 21 Jun 2022 18:56:47 -0700 Subject: [PATCH] QUIC stream timeouts if no data is received (#26116) --- streamer/src/nonblocking/quic.rs | 113 +++++++++++++++++++++++-------- streamer/src/quic.rs | 6 ++ 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0e23ca46a9..ea830d2f14 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -32,6 +32,7 @@ use { }; const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; +const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 1000; #[allow(clippy::too_many_arguments)] pub fn spawn_server( @@ -225,35 +226,55 @@ async fn handle_connection( stats.total_connections.load(Ordering::Relaxed), ); while !stream_exit.load(Ordering::Relaxed) { - match uni_streams.next().await { - Some(stream_result) => match stream_result { - Ok(mut stream) => { - stats.total_streams.fetch_add(1, Ordering::Relaxed); - stats.total_new_streams.fetch_add(1, Ordering::Relaxed); - let mut maybe_batch = None; - while !stream_exit.load(Ordering::Relaxed) { - if handle_chunk( - &stream.read_chunk(PACKET_DATA_SIZE, false).await, - &mut maybe_batch, - &remote_addr, - &packet_sender, - stats.clone(), - stake, - ) { - last_update.store(timing::timestamp(), Ordering::Relaxed); - break; + if let Ok(stream) = tokio::time::timeout( + Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), + uni_streams.next(), + ) + .await + { + match stream { + Some(stream_result) => match stream_result { + Ok(mut stream) => { + stats.total_streams.fetch_add(1, Ordering::Relaxed); + stats.total_new_streams.fetch_add(1, Ordering::Relaxed); + let mut maybe_batch = None; + while !stream_exit.load(Ordering::Relaxed) { + if let Ok(chunk) = tokio::time::timeout( + Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), + stream.read_chunk(PACKET_DATA_SIZE, false), + ) + .await + { + if handle_chunk( + &chunk, + &mut maybe_batch, + &remote_addr, + &packet_sender, + stats.clone(), + stake, + ) { + last_update.store(timing::timestamp(), Ordering::Relaxed); + break; + } + } else { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; + } } } - } - Err(e) => { - debug!("stream error: {:?}", e); + Err(e) => { + debug!("stream error: {:?}", e); + stats.total_streams.fetch_sub(1, Ordering::Relaxed); + break; + } + }, + None => { stats.total_streams.fetch_sub(1, Ordering::Relaxed); break; } - }, - None => { - stats.total_streams.fetch_sub(1, Ordering::Relaxed); - break; } } } @@ -503,6 +524,7 @@ pub mod test { Arc, crossbeam_channel::Receiver, SocketAddr, + Arc, ) { let s = UdpSocket::bind("127.0.0.1:0").unwrap(); let exit = Arc::new(AtomicBool::new(false)); @@ -522,10 +544,10 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats, + stats.clone(), ) .unwrap(); - (t, exit, receiver, server_address) + (t, exit, receiver, server_address, stats) } pub async fn make_client_endpoint(addr: &SocketAddr) -> NewConnection { @@ -668,7 +690,7 @@ pub mod test { #[tokio::test] async fn test_quic_server_exit() { - let (t, exit, _receiver, _server_address) = setup_quic_server(); + let (t, exit, _receiver, _server_address, _stats) = setup_quic_server(); exit.store(true, Ordering::Relaxed); t.await.unwrap(); } @@ -676,16 +698,47 @@ pub mod test { #[tokio::test] async fn test_quic_timeout() { solana_logger::setup(); - let (t, exit, receiver, server_address) = setup_quic_server(); + let (t, exit, receiver, server_address, _stats) = setup_quic_server(); check_timeout(receiver, server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); } + #[tokio::test] + async fn test_quic_stream_timeout() { + solana_logger::setup(); + let (t, exit, _receiver, server_address, stats) = setup_quic_server(); + + let conn1 = make_client_endpoint(&server_address).await; + assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0); + assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0); + + // Send one byte to start the stream + let mut s1 = conn1.connection.open_uni().await.unwrap(); + s1.write_all(&[0u8]).await.unwrap_or_default(); + + // Wait long enough for the stream to timeout in receiving chunks + sleep(Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS * 2)).await; + + // Test that the stream was created, but timed out in read + assert_eq!(stats.total_streams.load(Ordering::Relaxed), 1); + assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 1); + + for _ in 0..PACKET_DATA_SIZE { + // Ignoring any errors here. s1.finish() will test the error condition + s1.write_all(&[0u8]).await.unwrap_or_default(); + } + // Test that more writes are not successful to the stream + s1.finish().await.unwrap_err(); + + exit.store(true, Ordering::Relaxed); + t.await.unwrap(); + } + #[tokio::test] async fn test_quic_server_block_multiple_connections() { solana_logger::setup(); - let (t, exit, _receiver, server_address) = setup_quic_server(); + let (t, exit, _receiver, server_address, _stats) = setup_quic_server(); check_block_multiple_connections(server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); @@ -694,7 +747,7 @@ pub mod test { #[tokio::test] async fn test_quic_server_multiple_writes() { solana_logger::setup(); - let (t, exit, receiver, server_address) = setup_quic_server(); + let (t, exit, receiver, server_address, _stats) = setup_quic_server(); check_multiple_writes(receiver, server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index ef3788d6dd..7f8d74c742 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -155,6 +155,7 @@ pub struct StreamStats { pub(crate) total_packet_batches_sent: AtomicUsize, pub(crate) total_packet_batches_none: AtomicUsize, pub(crate) total_stream_read_errors: AtomicUsize, + pub(crate) total_stream_read_timeouts: AtomicUsize, pub(crate) num_evictions: AtomicUsize, pub(crate) connection_add_failed: AtomicUsize, pub(crate) connection_add_failed_unstaked_node: AtomicUsize, @@ -246,6 +247,11 @@ impl StreamStats { self.total_stream_read_errors.swap(0, Ordering::Relaxed), i64 ), + ( + "stream_read_timeouts", + self.total_stream_read_timeouts.swap(0, Ordering::Relaxed), + i64 + ), ); } }