From a69470fd45ab51ff39e69f58889b898b2dabde49 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Tue, 9 Aug 2022 10:02:47 -0700 Subject: [PATCH] Set receive_window per quic connection (#26936) This change sets the receive_window for non-staked node to 1 * PACKET_DATA_SIZE, and maps the staked nodes's connection's receive_window between 1.2 * PACKET_DATA_SIZE to 10 * PACKET_DATA_SIZE based on the stakes. The changes is based on Quinn library change to support per connection receive_window tweak at the server side. quinn-rs/quinn#1393 --- Cargo.lock | 63 ++++++++++- core/src/staked_nodes_updater_service.rs | 17 ++- programs/bpf/Cargo.lock | 63 ++++++++++- sdk/src/quic.rs | 12 ++ streamer/Cargo.toml | 4 +- streamer/src/nonblocking/quic.rs | 134 ++++++++++++++++++++--- streamer/src/streamer.rs | 2 + 7 files changed, 263 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 718d5a786..e24d5532f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3562,8 +3562,26 @@ dependencies = [ "futures-channel", "futures-util", "fxhash", - "quinn-proto", - "quinn-udp", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quinn-udp 0.1.0", + "rustls 0.20.6", + "thiserror", + "tokio", + "tracing", + "webpki 0.22.0", +] + +[[package]] +name = "quinn" +version = "0.8.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "fxhash", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", + "quinn-udp 0.1.3", "rustls 0.20.6", "thiserror", "tokio", @@ -3591,6 +3609,25 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "quinn-proto" +version = "0.8.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "bytes", + "fxhash", + "rand 0.8.5", + "ring", + "rustls 0.20.6", + "rustls-native-certs", + "rustls-pemfile 0.2.1", + "slab", + "thiserror", + "tinyvec", + "tracing", + "webpki 0.22.0", +] + [[package]] name = "quinn-udp" version = "0.1.0" @@ -3600,7 +3637,20 @@ dependencies = [ "futures-util", "libc", "mio", - "quinn-proto", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "socket2", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.1.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "futures-util", + "libc", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", "socket2", "tokio", "tracing", @@ -4939,8 +4989,8 @@ dependencies = [ "jsonrpc-http-server", "lazy_static", "log", - "quinn", - "quinn-proto", + "quinn 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", @@ -6291,7 +6341,8 @@ dependencies = [ "pem", "percentage", "pkcs8", - "quinn", + "quinn 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", "rand 0.7.3", "rcgen", "rustls 0.20.6", diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index a5bd35185..06ca95522 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -36,11 +36,15 @@ impl StakedNodesUpdaterService { let mut new_ip_to_stake = HashMap::new(); let mut new_id_to_stake = HashMap::new(); let mut total_stake = 0; + let mut max_stake: u64 = 0; + let mut min_stake: u64 = u64::MAX; if Self::try_refresh_stake_maps( &mut last_stakes, &mut new_ip_to_stake, &mut new_id_to_stake, &mut total_stake, + &mut max_stake, + &mut min_stake, &bank_forks, &cluster_info, ) { @@ -61,16 +65,21 @@ impl StakedNodesUpdaterService { ip_to_stake: &mut HashMap, id_to_stake: &mut HashMap, total_stake: &mut u64, + max_stake: &mut u64, + min_stake: &mut u64, bank_forks: &RwLock, cluster_info: &ClusterInfo, ) -> bool { if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { let root_bank = bank_forks.read().unwrap().root_bank(); let staked_nodes = root_bank.staked_nodes(); - *total_stake = staked_nodes - .iter() - .map(|(_pubkey, stake)| stake) - .sum::(); + + for stake in staked_nodes.values() { + *total_stake += stake; + *max_stake = *stake.max(max_stake); + *min_stake = *stake.min(min_stake); + } + *id_to_stake = cluster_info .tvu_peers() .into_iter() diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index ddef2cff2..46ed7af00 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3273,8 +3273,26 @@ dependencies = [ "futures-channel", "futures-util", "fxhash", - "quinn-proto", - "quinn-udp", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quinn-udp 0.1.1", + "rustls 0.20.6", + "thiserror", + "tokio", + "tracing", + "webpki 0.22.0", +] + +[[package]] +name = "quinn" +version = "0.8.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "fxhash", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", + "quinn-udp 0.1.3", "rustls 0.20.6", "thiserror", "tokio", @@ -3302,6 +3320,25 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "quinn-proto" +version = "0.8.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "bytes", + "fxhash", + "rand 0.8.5", + "ring", + "rustls 0.20.6", + "rustls-native-certs", + "rustls-pemfile 0.2.1", + "slab", + "thiserror", + "tinyvec", + "tracing", + "webpki 0.22.0", +] + [[package]] name = "quinn-udp" version = "0.1.1" @@ -3311,7 +3348,20 @@ dependencies = [ "futures-util", "libc", "mio", - "quinn-proto", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "socket2", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.1.3" +source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd" +dependencies = [ + "futures-util", + "libc", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", "socket2", "tokio", "tracing", @@ -4650,8 +4700,8 @@ dependencies = [ "jsonrpc-core", "lazy_static", "log", - "quinn", - "quinn-proto", + "quinn 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", @@ -5630,7 +5680,8 @@ dependencies = [ "pem", "percentage", "pkcs8", - "quinn", + "quinn 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", + "quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)", "rand 0.7.3", "rcgen", "rustls 0.20.6", diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index e00c4d7d7..1cd93be9c 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -12,3 +12,15 @@ pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; // applications. Different applications vary, but most seem to // be in the 30-60 second range pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000; + +/// The receive window for QUIC connection from unstaked nodes is +/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`] +pub const QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO: u64 = 1; + +/// The receive window for QUIC connection from minimum staked nodes is +/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`] +pub const QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO: u64 = 2; + +/// The receive window for QUIC connection from maximum staked nodes is +/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`] +pub const QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 10; diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 7aabcdd69..6c5eef4fb 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -21,7 +21,9 @@ nix = "0.24.2" pem = "1.0.2" percentage = "0.1.0" pkcs8 = { version = "0.8.0", features = ["alloc"] } -quinn = "0.8.3" +quinn = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"} +quinn-proto = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"} + rand = "0.7.0" rcgen = "0.9.2" rustls = { version = "0.20.6", features = ["dangerous_configuration"] } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b0aefbef8..b1124ab3e 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -12,14 +12,16 @@ use { Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt, }, + quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, pubkey::Pubkey, quic::{ - QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, signature::Keypair, timing, @@ -142,7 +144,7 @@ fn prune_unstaked_connection_table( fn get_connection_stake( connection: &Connection, staked_nodes: Arc>, -) -> Option<(Pubkey, u64, u64)> { +) -> Option<(Pubkey, u64, u64, u64, u64)> { connection .peer_identity() .and_then(|der_cert_any| der_cert_any.downcast::>().ok()) @@ -152,10 +154,12 @@ fn get_connection_stake( let staked_nodes = staked_nodes.read().unwrap(); let total_stake = staked_nodes.total_stake; + let max_stake = staked_nodes.max_stake; + let min_stake = staked_nodes.min_stake; staked_nodes .pubkey_stake_map .get(&pubkey) - .map(|stake| (pubkey, *stake, total_stake)) + .map(|stake| (pubkey, *stake, total_stake, max_stake, min_stake)) }) }) } @@ -198,6 +202,8 @@ struct NewConnectionHandlerParams { total_stake: u64, max_connections_per_peer: usize, stats: Arc, + max_stake: u64, + min_stake: u64, } impl NewConnectionHandlerParams { @@ -213,6 +219,8 @@ impl NewConnectionHandlerParams { total_stake: 0, max_connections_per_peer, stats, + max_stake: 0, + min_stake: 0, } } } @@ -236,16 +244,29 @@ fn handle_and_cache_new_connection( ) as u64) { connection.set_max_concurrent_uni_streams(max_uni_streams); + let receive_window = compute_recieve_window( + params.max_stake, + params.min_stake, + connection_table_l.peer_type, + params.stake, + ); + + if let Ok(receive_window) = receive_window { + connection.set_receive_window(receive_window); + } + + let remote_addr = connection.remote_address(); + debug!( - "Peer type: {:?}, stake {}, total stake {}, max streams {}", + "Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}", connection_table_l.peer_type, params.stake, params.total_stake, - max_uni_streams.into_inner() + max_uni_streams.into_inner(), + receive_window, + remote_addr, ); - let remote_addr = connection.remote_address(); - if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), @@ -305,6 +326,50 @@ fn prune_unstaked_connections_and_add_new_connection( } } +/// Calculate the ratio for per connection receive window from a staked peer +fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 { + // Testing shows the maximum througput from a connection is achieved at receive_window = + // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the + // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to + // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r' + // for stake 's' is, + // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find + // a and b. + + if stake > max_stake { + return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; + } + + let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; + let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; + if max_stake > min_stake { + let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; + let b = max_ratio as f64 - ((max_stake as f64) * a); + let ratio = (a * stake as f64) + b; + ratio.round() as u64 + } else { + QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + } +} + +fn compute_recieve_window( + max_stake: u64, + min_stake: u64, + peer_type: ConnectionPeerType, + peer_stake: u64, +) -> Result { + match peer_type { + ConnectionPeerType::Unstaked => { + VarInt::from_u64((PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) as u64) + } + ConnectionPeerType::Staked => { + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); + VarInt::from_u64((PACKET_DATA_SIZE as u64 * ratio) as u64) + } + } +} + async fn setup_connection( connecting: Connecting, unstaked_connection_table: Arc>, @@ -333,13 +398,17 @@ async fn setup_connection( max_connections_per_peer, stats.clone(), ), - |(pubkey, stake, total_stake)| NewConnectionHandlerParams { - packet_sender, - remote_pubkey: Some(pubkey), - stake, - total_stake, - max_connections_per_peer, - stats: stats.clone(), + |(pubkey, stake, total_stake, max_stake, min_stake)| { + NewConnectionHandlerParams { + packet_sender, + remote_pubkey: Some(pubkey), + stake, + total_stake, + max_connections_per_peer, + stats: stats.clone(), + max_stake, + min_stake, + } }, ); @@ -1475,6 +1544,7 @@ pub mod test { } #[test] + fn test_max_allowed_uni_streams() { assert_eq!( compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0), @@ -1525,4 +1595,38 @@ pub mod test { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } + + #[test] + fn test_cacluate_receive_window_ratio_for_staked_node() { + let mut max_stake = 10000; + let mut min_stake = 0; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake); + assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO); + + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); + let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; + assert_eq!(ratio, max_ratio); + + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2); + let average_ratio = + (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2; + assert_eq!(ratio, average_ratio); + + max_stake = 10000; + min_stake = 10000; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); + assert_eq!(ratio, max_ratio); + + max_stake = 0; + min_stake = 0; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); + assert_eq!(ratio, max_ratio); + + max_stake = 1000; + min_stake = 10; + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10); + assert_eq!(ratio, max_ratio); + } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4d8ee2d1c..3492f60c8 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -28,6 +28,8 @@ use { #[derive(Default)] pub struct StakedNodes { pub total_stake: u64, + pub max_stake: u64, + pub min_stake: u64, pub ip_stake_map: HashMap, pub pubkey_stake_map: HashMap, }