diff --git a/bench-tps/src/send_batch.rs b/bench-tps/src/send_batch.rs index b6f1fe776f..5ea916530c 100644 --- a/bench-tps/src/send_batch.rs +++ b/bench-tps/src/send_batch.rs @@ -248,9 +248,13 @@ where fn send(&self, client: &Arc) { let mut send_txs = Measure::start("send_and_clone_txs"); let batch: Vec<_> = self.iter().map(|(_keypair, tx)| tx.clone()).collect(); - client.send_batch(batch).expect("transfer"); + let result = client.send_batch(batch); send_txs.stop(); - debug!("send {} {}", self.len(), send_txs); + if result.is_err() { + debug!("Failed to send batch {result:?}"); + } else { + debug!("send {} {}", self.len(), send_txs); + } } fn verify( diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 955123df34..4ed4ee46f7 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2926,24 +2926,26 @@ fn setup_transfer_scan_threads( .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); for i in 0..starting_keypairs_.len() { - client - .async_transfer( - 1, - &starting_keypairs_[i], - &target_keypairs_[i].pubkey(), - blockhash, - ) - .unwrap(); + let result = client.async_transfer( + 1, + &starting_keypairs_[i], + &target_keypairs_[i].pubkey(), + blockhash, + ); + if result.is_err() { + debug!("Failed in transfer for starting keypair: {:?}", result); + } } for i in 0..starting_keypairs_.len() { - client - .async_transfer( - 1, - &target_keypairs_[i], - &starting_keypairs_[i].pubkey(), - blockhash, - ) - .unwrap(); + let result = client.async_transfer( + 1, + &target_keypairs_[i], + &starting_keypairs_[i].pubkey(), + blockhash, + ); + if result.is_err() { + debug!("Failed in transfer for starting keypair: {:?}", result); + } } } }) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 246eb9b925..9f18acd5c7 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -46,7 +46,7 @@ mod tests { assert_eq!(p.meta().size, num_bytes); } } - assert_eq!(total_packets, num_expected_packets); + assert!(total_packets > 0); } fn server_args() -> (UdpSocket, Arc, Keypair, IpAddr) { @@ -139,7 +139,7 @@ mod tests { assert_eq!(p.meta().size, num_bytes); } } - assert_eq!(total_packets, num_expected_packets); + assert!(total_packets > 0); } #[tokio::test] @@ -182,7 +182,9 @@ mod tests { let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(client.send_data_batch(&packets).await.is_ok()); + for packet in packets { + let _ = client.send_data(&packet).await; + } nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await; exit.store(true, Ordering::Relaxed); diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index d238552abb..8f8adc5061 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,6 +1,6 @@ use { crate::{ - quic::{configure_server, QuicServerError, StreamStats}, + quic::{configure_server, QuicServerError, StreamStats, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, tls_certificates::get_pubkey_from_tls_certificate, }, @@ -39,6 +39,10 @@ use { tokio::{task::JoinHandle, time::timeout}, }; +/// Limit to 500K PPS +const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10; +const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; +const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100); const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); @@ -55,6 +59,7 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; +const STREAM_STOP_CODE_THROTTLING: u32 = 15; // A sequence of bytes that is part of a packet // along with where in the packet it is @@ -264,6 +269,7 @@ enum ConnectionHandlerError { MaxStreamError, } +#[derive(Clone)] struct NewConnectionHandlerParams { // In principle, the code can be made to work with a crossbeam channel // as long as we're careful never to use a blocking recv or send call @@ -348,13 +354,11 @@ fn handle_and_cache_new_connection( drop(connection_table_l); tokio::spawn(handle_connection( connection, - params.packet_sender.clone(), remote_addr, - params.remote_pubkey, last_update, connection_table, stream_exit, - params.stats.clone(), + params.clone(), peer_type, wait_for_chunk_timeout, )); @@ -681,19 +685,42 @@ async fn packet_batch_sender( } } -#[allow(clippy::too_many_arguments)] +fn max_streams_for_connection_in_100ms( + connection_type: ConnectionPeerType, + stake: u64, + total_stake: u64, +) -> u64 { + if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 { + Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) + .apply_to(MAX_STREAMS_PER_100MS) + .saturating_div(MAX_UNSTAKED_CONNECTIONS as u64) + } else { + let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS); + ((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64 + } +} + +fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool { + if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL { + *last_instant = tokio::time::Instant::now(); + true + } else { + false + } +} + async fn handle_connection( connection: Connection, - packet_sender: AsyncSender, remote_addr: SocketAddr, - remote_pubkey: Option, last_update: Arc, connection_table: Arc>, stream_exit: Arc, - stats: Arc, + params: NewConnectionHandlerParams, peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, ) { + let stats = params.stats; debug!( "quic new connection {} streams: {} connections: {}", remote_addr, @@ -702,17 +729,28 @@ async fn handle_connection( ); let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); + let max_streams_per_100ms = + max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake); + let mut last_throttling_instant = tokio::time::Instant::now(); + let mut streams_in_current_interval = 0; while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { + if reset_throttling_params_if_needed(&mut last_throttling_instant) { + streams_in_current_interval = 0; + } else if streams_in_current_interval >= max_streams_per_100ms { + let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); + continue; + } + streams_in_current_interval = streams_in_current_interval.saturating_add(1); stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); let stream_exit = stream_exit.clone(); let stats = stats.clone(); - let packet_sender = packet_sender.clone(); + let packet_sender = params.packet_sender.clone(); let last_update = last_update.clone(); tokio::spawn(async move { let mut maybe_batch = None; @@ -765,7 +803,7 @@ async fn handle_connection( } let removed_connection_count = connection_table.lock().unwrap().remove_connection( - ConnectionTableKey::new(remote_addr.ip(), remote_pubkey), + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), stable_id, ); @@ -1989,4 +2027,40 @@ pub mod test { compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10); assert_eq!(ratio, max_ratio); } + + #[test] + fn test_max_streams_for_connection_in_100ms() { + // 50K packets per ms * 20% / 500 max unstaked connections + assert_eq!( + max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000), + 20 + ); + + // 50K packets per ms * 20% / 500 max unstaked connections + assert_eq!( + max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000), + 20 + ); + + // If stake is 0, same limits as unstaked connections will apply. + // 50K packets per ms * 20% / 500 max unstaked connections + assert_eq!( + max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000), + 20 + ); + + // max staked streams = 50K packets per ms * 80% = 40K + // function = 40K * stake / total_stake + assert_eq!( + max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000), + 60 + ); + + // max staked streams = 50K packets per ms * 80% = 40K + // function = 40K * stake / total_stake + assert_eq!( + max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000), + 4000 + ); + } }