diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs
index 6a7bac0853..4f25d42126 100644
--- a/client/src/nonblocking/quic_client.rs
+++ b/client/src/nonblocking/quic_client.rs
@@ -437,6 +437,21 @@ impl QuicClient {
         Ok(())
     }
 
+    fn compute_chunk_length(num_buffers_to_chunk: usize, num_chunks: usize) -> usize {
+        // This function is equivalent to a checked div_ceil().
+        // Also, if num_chunks == 0 || num_buffers_to_chunk == 0, return 1
+        num_buffers_to_chunk
+            .checked_div(num_chunks)
+            .map_or(1, |value| {
+                if num_buffers_to_chunk.checked_rem(num_chunks).unwrap_or(0) != 0 {
+                    value.saturating_add(1)
+                } else {
+                    value
+                }
+            })
+            .max(1)
+    }
+
     pub async fn send_batch<T>(
         &self,
         buffers: &[T],
@@ -468,9 +483,9 @@ impl QuicClient {
         // by just getting a reference to the NewConnection once
         let connection_ref: &NewConnection = &connection;
 
-        let chunks = buffers[1..buffers.len()]
-            .iter()
-            .chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
+        let chunk_len =
+            Self::compute_chunk_length(buffers.len() - 1, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
+        let chunks = buffers[1..buffers.len()].iter().chunks(chunk_len);
 
         let futures: Vec<_> = chunks
             .into_iter()
@@ -572,3 +587,29 @@ impl TpuConnection for QuicTpuConnection {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::nonblocking::quic_client::QuicClient;
+
+    #[test]
+    fn test_transaction_batch_chunking() {
+        assert_eq!(QuicClient::compute_chunk_length(0, 0), 1);
+        assert_eq!(QuicClient::compute_chunk_length(10, 0), 1);
+        assert_eq!(QuicClient::compute_chunk_length(0, 10), 1);
+        assert_eq!(QuicClient::compute_chunk_length(usize::MAX, usize::MAX), 1);
+        assert_eq!(QuicClient::compute_chunk_length(10, usize::MAX), 1);
+        assert_eq!(QuicClient::compute_chunk_length(usize::MAX, 10), (usize::MAX / 10) + 1);
+        assert_eq!(QuicClient::compute_chunk_length(10, 1), 10);
+        assert_eq!(QuicClient::compute_chunk_length(10, 2), 5);
+        assert_eq!(QuicClient::compute_chunk_length(10, 3), 4);
+        assert_eq!(QuicClient::compute_chunk_length(10, 4), 3);
+        assert_eq!(QuicClient::compute_chunk_length(10, 5), 2);
+        assert_eq!(QuicClient::compute_chunk_length(10, 6), 2);
+        assert_eq!(QuicClient::compute_chunk_length(10, 7), 2);
+        assert_eq!(QuicClient::compute_chunk_length(10, 8), 2);
+        assert_eq!(QuicClient::compute_chunk_length(10, 9), 2);
+        assert_eq!(QuicClient::compute_chunk_length(10, 10), 1);
+        assert_eq!(QuicClient::compute_chunk_length(10, 11), 1);
+    }
+}
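Reviewer note (not part of the patch): the chunk-length math is the subtle part of the client change, so here is a standalone sketch that exercises the new helper against a naive ceiling division. Only `compute_chunk_length` is copied from the diff; the `main` harness and the `reference` formula are hypothetical review aids.

```rust
// Review sketch: `compute_chunk_length` is copied verbatim from the patch;
// the harness below checks it against plain ceil(n / d) floored at 1.
fn compute_chunk_length(num_buffers_to_chunk: usize, num_chunks: usize) -> usize {
    num_buffers_to_chunk
        .checked_div(num_chunks)
        .map_or(1, |value| {
            if num_buffers_to_chunk.checked_rem(num_chunks).unwrap_or(0) != 0 {
                value.saturating_add(1)
            } else {
                value
            }
        })
        .max(1)
}

fn main() {
    for n in 0..1_000usize {
        for d in 1..100usize {
            // For any nonzero divisor the helper is div_ceil(n, d) clamped to >= 1.
            let reference = ((n + d - 1) / d).max(1);
            assert_eq!(compute_chunk_length(n, d), reference);
        }
    }
    // Degenerate divisor: checked_div returns None and the helper falls back to 1.
    assert_eq!(compute_chunk_length(10, 0), 1);
    println!("compute_chunk_length behaves like div_ceil().max(1)");
}
```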
diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs
index f20b3bca21..e00c4d7d70 100644
--- a/sdk/src/quic.rs
+++ b/sdk/src/quic.rs
@@ -3,6 +3,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
 // that seems to maximize TPS on GCE (higher values don't seem to
 // give significant improvement or seem to impact stability)
 pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
+pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
 
 pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
 pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index a1da1d3577..d103b6fba8 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -17,7 +17,10 @@ use {
     solana_sdk::{
         packet::{Packet, PACKET_DATA_SIZE},
         pubkey::Pubkey,
-        quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
+        quic::{
+            QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS,
+        },
         signature::Keypair,
         timing,
     },
@@ -156,6 +159,34 @@ fn get_connection_stake(
     })
 }
 
+fn compute_max_allowed_uni_streams(
+    peer_type: ConnectionPeerType,
+    peer_stake: u64,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+) -> usize {
+    if peer_stake == 0 {
+        // Treat stake = 0 as unstaked
+        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+    } else {
+        match peer_type {
+            ConnectionPeerType::Staked => {
+                let staked_nodes = staked_nodes.read().unwrap();
+
+                // No checked math for f64 type. So let's explicitly check for 0 here
+                if staked_nodes.total_stake == 0 {
+                    QUIC_MIN_STAKED_CONCURRENT_STREAMS
+                } else {
+                    (((peer_stake as f64 / staked_nodes.total_stake as f64)
+                        * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS as f64)
+                        as usize)
+                        .max(QUIC_MIN_STAKED_CONCURRENT_STREAMS)
+                }
+            }
+            _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
+        }
+    }
+}
+
 async fn setup_connection(
     connecting: Connecting,
     unstaked_connection_table: Arc<RwLock<ConnectionTable>>,
@@ -233,19 +264,19 @@ async fn setup_connection(
     if let Some((mut connection_table_l, stake)) = table_and_stake {
         let table_type = connection_table_l.peer_type;
 
-        let max_uni_streams = match table_type {
-            ConnectionPeerType::Unstaked => {
-                VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
-            }
-            ConnectionPeerType::Staked => {
-                let staked_nodes = staked_nodes.read().unwrap();
-                VarInt::from_u64(
-                    ((stake as f64 / staked_nodes.total_stake as f64)
-                        * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
-                        as u64,
-                )
-            }
-        };
+        let max_uni_streams = VarInt::from_u64(compute_max_allowed_uni_streams(
+            table_type,
+            stake,
+            staked_nodes.clone(),
+        ) as u64);
+
+        debug!(
+            "Peer type: {:?}, stake {}, total stake {}, max streams {}",
+            table_type,
+            stake,
+            staked_nodes.read().unwrap().total_stake,
+            max_uni_streams.unwrap().into_inner()
+        );
 
         if let Ok(max_uni_streams) = max_uni_streams {
             connection.set_max_concurrent_uni_streams(max_uni_streams);
@@ -526,7 +557,7 @@ impl Drop for ConnectionEntry {
     }
 }
 
-#[derive(Copy, Clone)]
+#[derive(Copy, Clone, Debug)]
 enum ConnectionPeerType {
     Unstaked,
     Staked,
@@ -684,6 +715,7 @@ pub mod test {
     use {
         super::*,
         crate::{
+            nonblocking::quic::compute_max_allowed_uni_streams,
             quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
             tls_certificates::new_self_signed_tls_certificate_chain,
         },
@@ -1371,4 +1403,62 @@ pub mod test {
         }
         assert_eq!(table.total_size, 0);
     }
+
+    #[test]
+    fn test_max_allowed_uni_streams() {
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes.clone()),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, staked_nodes.clone()),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS
+        );
+        staked_nodes.write().unwrap().total_stake = 10000;
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, staked_nodes.clone()),
+            (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (10_f64)) as usize
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, staked_nodes.clone()),
+            (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (100_f64)) as usize
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1, staked_nodes.clone()),
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(
+                ConnectionPeerType::Unstaked,
+                1000,
+                staked_nodes.clone()
+            ),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, staked_nodes.clone()),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+    }
 }
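For reference, the stake-weighting arithmetic introduced in `compute_max_allowed_uni_streams` can be sketched in isolation as below. This is a hypothetical illustration, not the patch: `StakedNodes` and its `RwLock` are collapsed into plain integers, `stake_weighted_streams` is a stand-in name, and `TOTAL_STAKED_STREAMS` uses an illustrative value, since the real `QUIC_TOTAL_STAKED_CONCURRENT_STREAMS` constant lives in the streamer crate and is not shown in this diff.

```rust
// Illustrative sketch of the stake-weighted stream budget (not the patch itself).
const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128; // mirrors sdk/src/quic.rs
const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128; // mirrors sdk/src/quic.rs
const TOTAL_STAKED_STREAMS: f64 = 100_000.0; // illustrative value only

fn stake_weighted_streams(peer_stake: u64, total_stake: u64) -> usize {
    if peer_stake == 0 {
        // Stake of 0 is treated as unstaked.
        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
    } else if total_stake == 0 {
        // f64 has no checked division, so guard the zero denominator explicitly.
        QUIC_MIN_STAKED_CONCURRENT_STREAMS
    } else {
        // A peer gets a share of the staked budget proportional to its stake,
        // floored at the minimum so small validators are not starved.
        (((peer_stake as f64 / total_stake as f64) * TOTAL_STAKED_STREAMS) as usize)
            .max(QUIC_MIN_STAKED_CONCURRENT_STREAMS)
    }
}

fn main() {
    // 10% of total stake -> 10% of the staked stream budget.
    assert_eq!(stake_weighted_streams(1_000, 10_000), 10_000);
    // A tiny staked peer is floored at the minimum.
    assert_eq!(
        stake_weighted_streams(1, 10_000),
        QUIC_MIN_STAKED_CONCURRENT_STREAMS
    );
    println!("stake-weighting sketch ok");
}
```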