From f2aa4f0741bedb3c28589b478bddbe8e034cf36e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 15 Apr 2024 15:58:10 -0700 Subject: [PATCH] Parameterize max streams per ms (#707) Make the PPS limit a parameter instead of a hard-coded constant --- client/src/connection_cache.rs | 4 +++- core/src/tpu.rs | 4 +++- quic-client/tests/quic_client.rs | 10 ++++++-- streamer/src/nonblocking/quic.rs | 18 +++++++++++--- streamer/src/nonblocking/stream_throttle.rs | 26 ++++++++++++++------- streamer/src/quic.rs | 9 ++++++- 6 files changed, 55 insertions(+), 16 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a94bc7cd3..74e1c8d34 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -227,7 +227,8 @@ mod tests { crossbeam_channel::unbounded, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, streamer::StakedNodes, }, std::{ @@ -270,6 +271,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 640caf645..b594e04e5 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ use { solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, @@ -163,6 +163,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, +
DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) @@ -183,6 +184,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 0237fc21d..34683b5fe 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -10,8 +10,10 @@ mod tests { }, solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, - streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, + streamer::StakedNodes, + tls_certificates::new_dummy_x509_certificate, }, std::{ net::{SocketAddr, UdpSocket}, @@ -82,6 +84,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -161,6 +164,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(1), // wait_for_chunk_timeout DEFAULT_TPU_COALESCE, ) @@ -223,6 +227,7 @@ mod tests { staked_nodes.clone(), 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -251,6 +256,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f55776873..e79b4f034 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,8 +1,8 @@ use { crate::{ nonblocking::stream_throttle::{ - ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS, - STREAM_STOP_CODE_THROTTLING, 
STREAM_THROTTLING_INTERVAL_MS, + ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + STREAM_THROTTLING_INTERVAL_MS, }, quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -76,6 +76,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; +/// Limit to 250K PPS +pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; + // A sequence of bytes that is part of a packet // along with where in the packet it is struct PacketChunk { @@ -124,6 +127,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { @@ -147,6 +151,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, coalesce, @@ -164,6 +169,7 @@ async fn run_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, @@ -176,6 +182,7 @@ async fn run_server( let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( stats.clone(), max_unstaked_connections, + max_streams_per_ms, )); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); @@ -206,6 +213,7 @@ async fn run_server( staked_nodes.clone(), max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, stream_load_ema.clone(), @@ -484,6 +492,7 @@ async fn setup_connection( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, @@ -505,7 +514,7 @@ async fn setup_connection( // 
The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. let min_stake_ratio = - 1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64; + 1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64; let stake_ratio = stake as f64 / total_stake as f64; let peer_type = if stake_ratio < min_stake_ratio { // If it is a staked connection with ultra low stake ratio, treat it as unstaked. @@ -1327,6 +1336,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(2), DEFAULT_TPU_COALESCE, ) @@ -1762,6 +1772,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -1791,6 +1802,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index e3da2be90..0497c6993 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -11,8 +11,6 @@ use { }, }; -/// Limit to 250K PPS -pub const MAX_STREAMS_PER_MS: u64 = 250; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; @@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA { } impl StakedStreamLoadEMA { - pub(crate) fn new(stats: Arc, max_unstaked_connections: usize) -> Self { + pub(crate) fn new( + stats: Arc, + max_unstaked_connections: usize, + max_streams_per_ms: u64, + ) -> Self { let allow_unstaked_streams = max_unstaked_connections > 0; let max_staked_load_in_ema_window = if 
allow_unstaked_streams { - (MAX_STREAMS_PER_MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) + (max_streams_per_ms + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms)) * EMA_WINDOW_MS } else { - MAX_STREAMS_PER_MS * EMA_WINDOW_MS + max_streams_per_ms * EMA_WINDOW_MS }; let max_num_unstaked_connections = @@ -56,7 +58,7 @@ impl StakedStreamLoadEMA { let max_unstaked_load_in_throttling_window = if allow_unstaked_streams { Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) .saturating_div(max_num_unstaked_connections) } else { 0 @@ -228,7 +230,9 @@ pub mod test { use { super::*, crate::{ - nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + nonblocking::{ + quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + }, quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, }, std::{ @@ -242,6 +246,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( @@ -258,6 +263,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. @@ -349,6 +355,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), 0, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. 
@@ -436,6 +443,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -464,6 +472,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -483,6 +492,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 9b68ab1ee..ad6bace1b 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -508,6 +508,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { @@ -524,6 +525,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, wait_for_chunk_timeout, coalesce, ) @@ -550,7 +552,9 @@ pub fn spawn_server( mod test { use { super::*, - crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + crate::nonblocking::quic::{ + test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + }, crossbeam_channel::unbounded, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, @@ -583,6 +587,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -642,6 +647,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -688,6 +694,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any 
connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, )