From 4772a931095d3654eb90767f33e1e0ab30aa7c62 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Mon, 11 Jul 2022 14:10:12 +0800 Subject: [PATCH] Quic handshake timeout (#26306) * Implement timeout for the quic client and server handshake --- client/src/nonblocking/quic_client.rs | 76 +++++++--- sdk/src/quic.rs | 5 + streamer/src/nonblocking/quic.rs | 198 ++++++++++++++------------ streamer/src/quic.rs | 6 + 4 files changed, 168 insertions(+), 117 deletions(-) diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 2476c36841..0456f873c3 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -12,12 +12,16 @@ use { itertools::Itertools, log::*, quinn::{ - ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError, + ClientConfig, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, NewConnection, + VarInt, WriteError, }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, solana_sdk::{ - quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, + quic::{ + QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + }, transport::Result as TransportResult, }, std::{ @@ -26,7 +30,7 @@ use { thread, time::Duration, }, - tokio::sync::RwLock, + tokio::{sync::RwLock, time::timeout}, }; struct SkipServerVerification; @@ -142,21 +146,29 @@ impl QuicNewConnection { .connect(addr, "connect") .expect("QuicNewConnection::make_connection endpoint.connect"); stats.total_connections.fetch_add(1, Ordering::Relaxed); - let connecting_result = connecting.await; - if connecting_result.is_err() { - stats.connection_errors.fetch_add(1, Ordering::Relaxed); + if let Ok(connecting_result) = timeout( + Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), + connecting, + ) + .await + { + if connecting_result.is_err() { + stats.connection_errors.fetch_add(1, Ordering::Relaxed); + } + make_connection_measure.stop(); + stats + .make_connection_ms + .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed); + + let connection = connecting_result?; + + Ok(Self { + endpoint, + connection: Arc::new(connection), + }) + } else { + Err(ConnectionError::TimedOut.into()) } - make_connection_measure.stop(); - stats - .make_connection_ms - .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed); - - let connection = connecting_result?; - - Ok(Self { - endpoint, - connection: Arc::new(connection), - }) } fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { @@ -179,17 +191,35 @@ impl QuicNewConnection { stats.total_connections.fetch_add(1, Ordering::Relaxed); let connection = match connecting.into_0rtt() { Ok((connection, zero_rtt)) => { - if zero_rtt.await { - stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed); + if let Ok(zero_rtt) = timeout( + Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), + zero_rtt, + ) + .await + { + if zero_rtt { + stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed); + } else { + stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed); + } + connection } else { - stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed); + return Err(ConnectionError::TimedOut.into()); } - connection } Err(connecting) => { stats.connection_errors.fetch_add(1, Ordering::Relaxed); - let connecting = connecting.await; - connecting? + + if let Ok(connecting_result) = timeout( + Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), + connecting, + ) + .await + { + connecting_result? + } else { + return Err(ConnectionError::TimedOut.into()); + } } }; self.connection = Arc::new(connection); diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index 48467bf520..f20b3bca21 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -6,3 +6,8 @@ pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; + +// Based on commonly-used handshake timeouts for various TCP +// applications. Different applications vary, but most seem to +// be in the 30-60 second range +pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 133f26ca63..d495070cdb 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -15,7 +15,7 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, - quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, timing, }, @@ -133,7 +133,7 @@ fn prune_unstaked_connection_table( } async fn setup_connection( - connection: Connecting, + connecting: Connecting, unstaked_connection_table: Arc>, staked_connection_table: Arc>, packet_sender: Sender, @@ -143,119 +143,129 @@ async fn setup_connection( max_unstaked_connections: usize, stats: Arc, ) { - if let Ok(new_connection) = connection.await { - stats.total_connections.fetch_add(1, Ordering::Relaxed); - stats.total_new_connections.fetch_add(1, Ordering::Relaxed); - let NewConnection { - connection, - uni_streams, - .. - } = new_connection; + if let Ok(connecting_result) = timeout( + Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS), + connecting, + ) + .await + { + if let Ok(new_connection) = connecting_result { + stats.total_connections.fetch_add(1, Ordering::Relaxed); + stats.total_new_connections.fetch_add(1, Ordering::Relaxed); + let NewConnection { + connection, + uni_streams, + .. + } = new_connection; - let remote_addr = connection.remote_address(); + let remote_addr = connection.remote_address(); - let table_and_stake = { - let staked_nodes = staked_nodes.read().unwrap(); - if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { - let stake = *stake; - drop(staked_nodes); + let table_and_stake = { + let staked_nodes = staked_nodes.read().unwrap(); + if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { + let stake = *stake; + drop(staked_nodes); - let mut connection_table_l = staked_connection_table.lock().unwrap(); - if connection_table_l.total_size >= max_staked_connections { - let num_pruned = connection_table_l.prune_random(stake); - if num_pruned == 0 { - if max_unstaked_connections > 0 { - // If we couldn't prune a connection in the staked connection table, let's - // put this connection in the unstaked connection table. If needed, prune a - // connection from the unstaked connection table. - connection_table_l = unstaked_connection_table.lock().unwrap(); - prune_unstaked_connection_table( - &mut connection_table_l, - max_unstaked_connections, - stats.clone(), - ); - Some((connection_table_l, stake)) + let mut connection_table_l = staked_connection_table.lock().unwrap(); + if connection_table_l.total_size >= max_staked_connections { + let num_pruned = connection_table_l.prune_random(stake); + if num_pruned == 0 { + if max_unstaked_connections > 0 { + // If we couldn't prune a connection in the staked connection table, let's + // put this connection in the unstaked connection table. If needed, prune a + // connection from the unstaked connection table. + connection_table_l = unstaked_connection_table.lock().unwrap(); + prune_unstaked_connection_table( + &mut connection_table_l, + max_unstaked_connections, + stats.clone(), + ); + Some((connection_table_l, stake)) + } else { + stats + .connection_add_failed_on_pruning + .fetch_add(1, Ordering::Relaxed); + None + } } else { - stats - .connection_add_failed_on_pruning - .fetch_add(1, Ordering::Relaxed); - None + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + Some((connection_table_l, stake)) } } else { - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); Some((connection_table_l, stake)) } + } else if max_unstaked_connections > 0 { + drop(staked_nodes); + let mut connection_table_l = unstaked_connection_table.lock().unwrap(); + prune_unstaked_connection_table( + &mut connection_table_l, + max_unstaked_connections, + stats.clone(), + ); + Some((connection_table_l, 0)) } else { - Some((connection_table_l, stake)) - } - } else if max_unstaked_connections > 0 { - drop(staked_nodes); - let mut connection_table_l = unstaked_connection_table.lock().unwrap(); - prune_unstaked_connection_table( - &mut connection_table_l, - max_unstaked_connections, - stats.clone(), - ); - Some((connection_table_l, 0)) - } else { - None - } - }; - - if let Some((mut connection_table_l, stake)) = table_and_stake { - let table_type = connection_table_l.peer_type; - let max_uni_streams = match table_type { - ConnectionPeerType::Unstaked => { - VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64) - } - ConnectionPeerType::Staked => { - let staked_nodes = staked_nodes.read().unwrap(); - VarInt::from_u64( - ((stake as f64 / staked_nodes.total_stake as f64) - * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS) as u64, - ) + None } }; - if let Ok(max_uni_streams) = max_uni_streams { - connection.set_max_concurrent_uni_streams(max_uni_streams); + if let Some((mut connection_table_l, stake)) = table_and_stake { + let table_type = connection_table_l.peer_type; + let max_uni_streams = match table_type { + ConnectionPeerType::Unstaked => { + VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64) + } + ConnectionPeerType::Staked => { + let staked_nodes = staked_nodes.read().unwrap(); + VarInt::from_u64( + ((stake as f64 / staked_nodes.total_stake as f64) + * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS) + as u64, + ) + } + }; - if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( - &remote_addr, - Some(connection), - stake, - timing::timestamp(), - max_connections_per_ip, - ) { - drop(connection_table_l); - let stats = stats.clone(); - let connection_table = match table_type { - ConnectionPeerType::Unstaked => unstaked_connection_table.clone(), - ConnectionPeerType::Staked => staked_connection_table.clone(), - }; - tokio::spawn(handle_connection( - uni_streams, - packet_sender, - remote_addr, - last_update, - connection_table, - stream_exit, - stats, + if let Ok(max_uni_streams) = max_uni_streams { + connection.set_max_concurrent_uni_streams(max_uni_streams); + + if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( + &remote_addr, + Some(connection), stake, - )); + timing::timestamp(), + max_connections_per_ip, + ) { + drop(connection_table_l); + let stats = stats.clone(); + let connection_table = match table_type { + ConnectionPeerType::Unstaked => unstaked_connection_table.clone(), + ConnectionPeerType::Staked => staked_connection_table.clone(), + }; + tokio::spawn(handle_connection( + uni_streams, + packet_sender, + remote_addr, + last_update, + connection_table, + stream_exit, + stats, + stake, + )); + } else { + stats.connection_add_failed.fetch_add(1, Ordering::Relaxed); + } } else { - stats.connection_add_failed.fetch_add(1, Ordering::Relaxed); + stats + .connection_add_failed_invalid_stream_count + .fetch_add(1, Ordering::Relaxed); } } else { + connection.close(0u32.into(), &[0u8]); stats - .connection_add_failed_invalid_stream_count + .connection_add_failed_unstaked_node .fetch_add(1, Ordering::Relaxed); } } else { - connection.close(0u32.into(), &[0u8]); - stats - .connection_add_failed_unstaked_node - .fetch_add(1, Ordering::Relaxed); + stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); } } else { stats diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 2b49f05c5b..1e20e10aa0 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -162,6 +162,7 @@ pub struct StreamStats { pub(crate) connection_add_failed_unstaked_node: AtomicUsize, pub(crate) connection_add_failed_on_pruning: AtomicUsize, pub(crate) connection_setup_timeout: AtomicUsize, + pub(crate) connection_setup_error: AtomicUsize, pub(crate) connection_removed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize, } @@ -233,6 +234,11 @@ impl StreamStats { self.connection_setup_timeout.swap(0, Ordering::Relaxed), i64 ), + ( + "connection_setup_error", + self.connection_setup_error.swap(0, Ordering::Relaxed), + i64 + ), ( "invalid_chunk", self.total_invalid_chunks.swap(0, Ordering::Relaxed),