diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index 27c7a7c6b2..e7e3e5f973 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -30,6 +30,7 @@ use {
     },
     solana_sdk::{pubkey::Pubkey, signature::Keypair},
     solana_streamer::{
+        nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
         quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
         streamer::StakedNodes,
     },
@@ -169,6 +170,7 @@ impl Tpu {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             stats.clone(),
+            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
         )
         .unwrap();
 
@@ -183,6 +185,7 @@ impl Tpu {
             MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
             0, // Prevent unstaked nodes from forwarding transactions
             stats,
+            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
         )
         .unwrap();
 
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index 5eaae0eefb..cef16f16c5 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -42,6 +42,7 @@ use {
 
 const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
 
 const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 100;
+pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10000;
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@@ -69,6 +70,7 @@ pub fn spawn_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
+    wait_for_chunk_timeout_ms: u64,
 ) -> Result<JoinHandle<()>, QuicServerError> {
     let (config, _cert) = configure_server(keypair, gossip_host)?;
 
@@ -86,6 +88,7 @@ pub fn spawn_server(
         max_staked_connections,
         max_unstaked_connections,
         stats,
+        wait_for_chunk_timeout_ms,
     ));
     Ok(handle)
 }
@@ -99,6 +102,7 @@ pub async fn run_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
+    wait_for_chunk_timeout_ms: u64,
 ) {
     debug!("spawn quic server");
     let mut last_datapoint = Instant::now();
@@ -132,6 +136,7 @@ pub async fn run_server(
                 max_staked_connections,
                 max_unstaked_connections,
                 stats.clone(),
+                wait_for_chunk_timeout_ms,
             ));
             sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await;
         }
@@ -242,6 +247,7 @@ fn handle_and_cache_new_connection(
     mut connection_table_l: MutexGuard<ConnectionTable>,
     connection_table: Arc<Mutex<ConnectionTable>>,
     params: &NewConnectionHandlerParams,
+    wait_for_chunk_timeout_ms: u64,
 ) -> Result<(), ConnectionHandlerError> {
     let NewConnection {
         connection,
@@ -300,6 +306,7 @@ fn handle_and_cache_new_connection(
             params.stats.clone(),
             params.stake,
             peer_type,
+            wait_for_chunk_timeout_ms,
         ));
         Ok(())
     } else {
@@ -328,6 +335,7 @@ fn prune_unstaked_connections_and_add_new_connection(
     connection_table: Arc<Mutex<ConnectionTable>>,
     max_connections: usize,
     params: &NewConnectionHandlerParams,
+    wait_for_chunk_timeout_ms: u64,
 ) -> Result<(), ConnectionHandlerError> {
     let stats = params.stats.clone();
     if max_connections > 0 {
@@ -337,6 +345,7 @@ fn prune_unstaked_connections_and_add_new_connection(
             connection_table_l,
             connection_table,
             params,
+            wait_for_chunk_timeout_ms,
         )
     } else {
         new_connection.connection.close(
@@ -391,6 +400,7 @@ fn compute_recieve_window(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn setup_connection(
     connecting: Connecting,
     unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
@@ -401,6 +411,7 @@ async fn setup_connection(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
+    wait_for_chunk_timeout_ms: u64,
 ) {
     if let Ok(connecting_result) = timeout(
         Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
@@ -445,6 +456,7 @@ async fn setup_connection(
                         connection_table_l,
                         staked_connection_table.clone(),
                         &params,
+                        wait_for_chunk_timeout_ms,
                     ) {
                         stats
                             .connection_added_from_staked_peer
@@ -460,6 +472,7 @@ async fn setup_connection(
                         unstaked_connection_table.clone(),
                         max_unstaked_connections,
                         &params,
+                        wait_for_chunk_timeout_ms,
                     ) {
                         stats
                             .connection_added_from_staked_peer
@@ -479,6 +492,7 @@ async fn setup_connection(
                     unstaked_connection_table.clone(),
                     max_unstaked_connections,
                     &params,
+                    wait_for_chunk_timeout_ms,
                 ) {
                     stats
                         .connection_added_from_unstaked_peer
@@ -510,6 +524,7 @@ async fn handle_connection(
     stats: Arc<StreamStats>,
     stake: u64,
     peer_type: ConnectionPeerType,
+    wait_for_chunk_timeout_ms: u64,
 ) {
     debug!(
         "quic new connection {} streams: {} connections: {}",
@@ -538,7 +553,7 @@ async fn handle_connection(
                 let mut maybe_batch = None;
                 while !stream_exit.load(Ordering::Relaxed) {
                     if let Ok(chunk) = tokio::time::timeout(
-                        Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
+                        Duration::from_millis(wait_for_chunk_timeout_ms),
                         stream.read_chunk(PACKET_DATA_SIZE, false),
                     )
                     .await
@@ -560,6 +575,7 @@ async fn handle_connection(
                         stats
                             .total_stream_read_timeouts
                             .fetch_add(1, Ordering::Relaxed);
+                        break;
                     }
                 }
                 stats.total_streams.fetch_sub(1, Ordering::Relaxed);
@@ -989,6 +1005,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             stats.clone(),
+            1,
         )
         .unwrap();
         (t, exit, receiver, server_address, stats)
@@ -1118,7 +1135,7 @@ pub mod test {
                 total_packets += packets.len();
                 all_packets.push(packets)
             }
-            if total_packets > num_expected_packets {
+            if total_packets >= num_expected_packets {
                 break;
             }
         }
@@ -1173,19 +1190,17 @@ pub mod test {
         s1.write_all(&[0u8]).await.unwrap_or_default();
 
         // Wait long enough for the stream to timeout in receiving chunks
-        let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(2000);
+        let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(3000);
         sleep(Duration::from_millis(sleep_time)).await;
 
         // Test that the stream was created, but timed out in read
-        assert_eq!(stats.total_streams.load(Ordering::Relaxed), 1);
+        assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
         assert_ne!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);
 
-        // Test that more writes are still successful to the stream (i.e. the stream was writable
-        // even after the timeouts)
-        for _ in 0..PACKET_DATA_SIZE {
-            s1.write_all(&[0u8]).await.unwrap();
-        }
-        s1.finish().await.unwrap();
+        // Test that more writes to the stream will fail (i.e. the stream is no longer writable
+        // after the timeouts)
+        assert!(s1.write_all(&[0u8]).await.is_err());
+        assert!(s1.finish().await.is_err());
 
         exit.store(true, Ordering::Relaxed);
         t.await.unwrap();
@@ -1302,6 +1317,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             0, // Do not allow any connection from unstaked clients/nodes
             stats,
+            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
         )
         .unwrap();
 
@@ -1332,6 +1348,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             stats.clone(),
+            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
         )
         .unwrap();
 
diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs
index 1089537c74..22590cfd85 100644
--- a/streamer/src/quic.rs
+++ b/streamer/src/quic.rs
@@ -307,6 +307,7 @@ pub fn spawn_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     stats: Arc<StreamStats>,
+    wait_for_chunk_timeout_ms: u64,
 ) -> Result<thread::JoinHandle<()>, QuicServerError> {
     let runtime = rt();
     let task = {
@@ -322,6 +323,7 @@ pub fn spawn_server(
             max_staked_connections,
             max_unstaked_connections,
             stats,
+            wait_for_chunk_timeout_ms,
         )
     }?;
     let handle = thread::Builder::new()
@@ -367,6 +369,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             stats,
+            100,
         )
         .unwrap();
         (t, exit, receiver, server_address)
@@ -422,6 +425,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             stats,
+            100,
         )
         .unwrap();
 
@@ -464,6 +468,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             0, // Do not allow any connection from unstaked clients/nodes
             stats,
+            100,
         )
         .unwrap();
 
diff --git a/tpu-client/tests/quic_client.rs b/tpu-client/tests/quic_client.rs
index 31b8967dd5..5f99eb81cd 100644
--- a/tpu-client/tests/quic_client.rs
+++ b/tpu-client/tests/quic_client.rs
@@ -78,6 +78,7 @@ mod tests {
             10,
             10,
             stats,
+            1000,
         )
         .unwrap();
 
@@ -123,6 +124,7 @@ mod tests {
             10,
             10,
             stats,
+            1000,
         )
         .unwrap();
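
The behavioral core of this diff is the per-stream read loop in `handle_connection`: the chunk-read timeout becomes a caller-supplied `wait_for_chunk_timeout_ms` (defaulting to `DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS`, 10000 ms, for the TPU; tests pass tighter values), and on timeout the loop now `break`s, tearing the stream down instead of polling forever. Below is a minimal standalone sketch of that pattern, not the PR's code: it assumes only tokio, `drain_stream` is a hypothetical stand-in for the loop body, and the 1232-byte buffer stands in for `PACKET_DATA_SIZE`.

```rust
use std::time::Duration;
use tokio::{io::AsyncReadExt, time::timeout};

/// Read chunks until EOF, an error, or `wait_for_chunk_timeout_ms` of
/// silence; on timeout, break out rather than keep the stream alive.
async fn drain_stream<R: AsyncReadExt + Unpin>(
    mut stream: R,
    wait_for_chunk_timeout_ms: u64,
) -> usize {
    let mut total = 0;
    let mut buf = [0u8; 1232]; // stands in for PACKET_DATA_SIZE
    loop {
        match timeout(
            Duration::from_millis(wait_for_chunk_timeout_ms),
            stream.read(&mut buf),
        )
        .await
        {
            Ok(Ok(0)) => break,      // peer finished the stream cleanly
            Ok(Ok(n)) => total += n, // chunk arrived within the deadline
            Ok(Err(_)) => break,     // read error: give up on the stream
            Err(_) => break,         // timed out waiting for the next chunk;
                                     // before this change the loop kept waiting
        }
    }
    total
}

#[tokio::main]
async fn main() {
    // Simulate a peer that sends one chunk and then goes silent
    // without finishing the stream.
    let (mut tx, rx) = tokio::io::duplex(1024);
    tokio::spawn(async move {
        use tokio::io::AsyncWriteExt;
        tx.write_all(b"hello").await.unwrap();
        tokio::time::sleep(Duration::from_secs(5)).await; // hold tx open
    });
    // With a 100 ms chunk timeout the reader gets 5 bytes, then bails out.
    println!("read {} bytes", drain_stream(rx, 100).await);
}
```

The updated test reflects the same design choice: after a read timeout the server-side stream is gone (`total_streams` drops to 0) and further client writes fail, where previously the stream stayed writable indefinitely.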