From 842fa993c8dcd05dc5db2dd428a4b247662d52be Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Thu, 3 Nov 2022 20:45:44 +0800 Subject: [PATCH] Quic fix calculation of staked number of concurrent streams (#28705) * Fix calculation of staked number of concurrent streams --- quic-client/src/lib.rs | 7 +++++- sdk/src/quic.rs | 2 ++ streamer/src/nonblocking/quic.rs | 37 +++++++++++++++--------------- tpu-client/src/connection_cache.rs | 8 +++++-- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index c66904481..61652809c 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -175,6 +175,7 @@ mod tests { super::*, solana_sdk::quic::{ QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, solana_tpu_client::tpu_connection_cache::TpuConnectionCache, }; @@ -208,9 +209,13 @@ mod tests { .unwrap() .pubkey_stake_map .insert(pubkey, 1); + + let delta = + (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + assert_eq!( tpu_config.compute_max_parallel_streams(), - QUIC_MIN_STAKED_CONCURRENT_STREAMS + (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize ); staked_nodes diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index 1cd93be9c..d0012ba71 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -5,6 +5,8 @@ pub const QUIC_PORT_OFFSET: u16 = 6; pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128; +pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000; + pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index cef16f16c..4f22932df 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -21,7 +21,8 @@ use { quic::{ QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, + QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, signature::Keypair, timing, @@ -40,7 +41,6 @@ use { }, }; -const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 100; pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10000; @@ -186,20 +186,27 @@ pub fn compute_max_allowed_uni_streams( peer_stake: u64, total_stake: u64, ) -> usize { + // Treat stake = 0 as unstaked if peer_stake == 0 { - // Treat stake = 0 as unstaked QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS } else { match peer_type { ConnectionPeerType::Staked => { // No checked math for f64 type. So let's explicitly check for 0 here - if total_stake == 0 { + if total_stake == 0 || peer_stake > total_stake { + warn!( + "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", + peer_stake, total_stake, + ); + QUIC_MIN_STAKED_CONCURRENT_STREAMS } else { - (((peer_stake as f64 / total_stake as f64) - * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS as f64) - as usize) - .max(QUIC_MIN_STAKED_CONCURRENT_STREAMS) + let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS + - QUIC_MIN_STAKED_CONCURRENT_STREAMS) + as f64; + + ((peer_stake as f64 / total_stake as f64) * delta) as usize + + QUIC_MIN_STAKED_CONCURRENT_STREAMS } } _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, @@ -1625,21 +1632,15 @@ pub mod test { compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 0), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); + let delta = + (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, 10000), - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (10_f64)) as usize + (delta / (10_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS ); assert_eq!( compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, 10000), - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (100_f64)) as usize - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 10000), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1, 10000), - QUIC_MIN_STAKED_CONCURRENT_STREAMS + (delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS ); assert_eq!( compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 10000), diff --git a/tpu-client/src/connection_cache.rs b/tpu-client/src/connection_cache.rs index 8f4a6b67c..5c5b0b8a0 100644 --- a/tpu-client/src/connection_cache.rs +++ b/tpu-client/src/connection_cache.rs @@ -449,7 +449,7 @@ mod tests { pubkey::Pubkey, quic::{ QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_PORT_OFFSET, + QUIC_PORT_OFFSET, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, }, solana_streamer::streamer::StakedNodes, @@ -556,9 +556,13 @@ mod tests { .unwrap() .pubkey_stake_map .insert(pubkey, 1); + + let delta = + (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + assert_eq!( connection_cache.compute_max_parallel_streams(), - QUIC_MIN_STAKED_CONCURRENT_STREAMS + (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize ); staked_nodes