From f1ebc5b5c36b6480dd1a5951a86d3ebbbccd85f8 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 25 May 2023 16:54:24 +0000 Subject: [PATCH] separates out quic streamer connection stats from different servers (#31797) --- client/src/connection_cache.rs | 22 ++++-------------- core/src/tpu.rs | 7 +++--- quic-client/tests/quic_client.rs | 38 ++++++++++---------------------- streamer/src/nonblocking/quic.rs | 32 +++++++++++++-------------- streamer/src/quic.rs | 19 +++++++--------- 5 files changed, 43 insertions(+), 75 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 4cdcfcd97b..44673c06f4 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -217,8 +217,7 @@ mod tests { crossbeam_channel::unbounded, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, - streamer::StakedNodes, + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes, }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -229,37 +228,25 @@ mod tests { }, }; - fn server_args() -> ( - UdpSocket, - Arc, - Keypair, - IpAddr, - Arc, - ) { + fn server_args() -> (UdpSocket, Arc, Keypair, IpAddr) { ( UdpSocket::bind("127.0.0.1:0").unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), "127.0.0.1".parse().unwrap(), - Arc::new(StreamStats::default()), ) } #[test] fn test_connection_with_specified_client_endpoint() { // Start a response receiver: - let ( - response_recv_socket, - response_recv_exit, - keypair2, - response_recv_ip, - response_recv_stats, - ) = server_args(); + let (response_recv_socket, response_recv_exit, keypair2, response_recv_ip) = server_args(); let (sender2, _receiver2) = unbounded(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server( + "quic_streamer_test", response_recv_socket, &keypair2, response_recv_ip, @@ -269,7 +256,6 @@ mod tests { staked_nodes, 10, 10, - response_recv_stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 23a890dc04..4d6beef5a3 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -37,7 +37,7 @@ use { solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, std::{ @@ -145,8 +145,8 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let stats = Arc::new(StreamStats::default()); let (_, tpu_quic_t) = spawn_server( + "quic_streamer_tpu", transactions_quic_sockets, keypair, cluster_info @@ -160,13 +160,13 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats.clone(), DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) .unwrap(); let (_, tpu_forwards_quic_t) = spawn_server( + "quic_streamer_tpu_forwards", transactions_forwards_quic_sockets, keypair, cluster_info @@ -180,7 +180,6 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 09dbbdfbf9..7608e2b7b2 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -10,8 +10,8 @@ mod tests { }, solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, - streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes, + tls_certificates::new_self_signed_tls_certificate, }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -49,19 +49,12 @@ mod tests { assert_eq!(total_packets, num_expected_packets); } - fn server_args() -> ( - UdpSocket, - Arc, - Keypair, - IpAddr, - Arc, - ) { + fn server_args() -> (UdpSocket, Arc, Keypair, IpAddr) { ( UdpSocket::bind("127.0.0.1:0").unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), "127.0.0.1".parse().unwrap(), - Arc::new(StreamStats::default()), ) } @@ -74,8 +67,9 @@ mod tests { solana_logger::setup(); let (sender, receiver) = unbounded(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let (s, exit, keypair, ip, stats) = server_args(); + let (s, exit, keypair, ip) = server_args(); let (_, t) = solana_streamer::quic::spawn_server( + "quic_streamer_test", s.try_clone().unwrap(), &keypair, ip, @@ -85,7 +79,6 @@ mod tests { staked_nodes, 10, 10, - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -154,8 +147,9 @@ mod tests { solana_logger::setup(); let (sender, receiver) = unbounded(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let (s, exit, keypair, ip, stats) = server_args(); - let (_, t) = solana_streamer::nonblocking::quic::spawn_server( + let (s, exit, keypair, ip) = server_args(); + let (_, _, t) = solana_streamer::nonblocking::quic::spawn_server( + "quic_streamer_test", s.try_clone().unwrap(), &keypair, ip, @@ -165,7 +159,6 @@ mod tests { staked_nodes, 10, 10, - stats, Duration::from_secs(1), // wait_for_chunk_timeout DEFAULT_TPU_COALESCE, ) @@ -210,9 +203,9 @@ mod tests { // Request Receiver let (sender, receiver) = unbounded(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let (request_recv_socket, request_recv_exit, keypair, request_recv_ip, request_recv_stats) = - server_args(); + let (request_recv_socket, request_recv_exit, keypair, request_recv_ip) = server_args(); let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server( + "quic_streamer_test", request_recv_socket.try_clone().unwrap(), &keypair, request_recv_ip, @@ -222,7 +215,6 @@ mod tests { staked_nodes.clone(), 10, 10, - request_recv_stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -230,19 +222,14 @@ mod tests { drop(request_recv_endpoint); // Response Receiver: - let ( - response_recv_socket, - response_recv_exit, - keypair2, - response_recv_ip, - response_recv_stats, - ) = server_args(); + let (response_recv_socket, response_recv_exit, keypair2, response_recv_ip) = server_args(); let (sender2, receiver2) = unbounded(); let addr = response_recv_socket.local_addr().unwrap().ip(); let port = response_recv_socket.local_addr().unwrap().port(); let server_addr = SocketAddr::new(addr, port); let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server( + "quic_streamer_test", response_recv_socket, &keypair2, response_recv_ip, @@ -252,7 +239,6 @@ mod tests { staked_nodes, 10, 10, - response_recv_stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 18c8a31b53..fa5385a7f4 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -85,6 +85,7 @@ struct PacketAccumulator { #[allow(clippy::too_many_arguments)] pub fn spawn_server( + name: &'static str, sock: UdpSocket, keypair: &Keypair, gossip_host: IpAddr, @@ -94,16 +95,17 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, - stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, -) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> { - info!("Start quic server on {:?}", sock); +) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { + info!("Start {name} quic server on {sock:?}"); let (config, _cert) = configure_server(keypair, gossip_host)?; let endpoint = Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime) .map_err(QuicServerError::EndpointFailed)?; + let stats = Arc::::default(); let handle = tokio::spawn(run_server( + name, endpoint.clone(), packet_sender, exit, @@ -111,15 +113,16 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, - stats, + stats.clone(), wait_for_chunk_timeout, coalesce, )); - Ok((endpoint, handle)) + Ok((endpoint, stats, handle)) } #[allow(clippy::too_many_arguments)] -pub async fn run_server( +async fn run_server( + name: &'static str, incoming: Endpoint, packet_sender: Sender, exit: Arc, @@ -152,7 +155,7 @@ pub async fn run_server( let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; if last_datapoint.elapsed().as_secs() >= 5 { - stats.report(); + stats.report(name); last_datapoint = Instant::now(); } @@ -1166,8 +1169,8 @@ pub mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); - let stats = Arc::new(StreamStats::default()); - let (_, t) = spawn_server( + let (_, stats, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -1177,7 +1180,6 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats.clone(), Duration::from_secs(2), DEFAULT_TPU_COALESCE, ) @@ -1597,8 +1599,8 @@ pub mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let stats = Arc::new(StreamStats::default()); - let (_, t) = spawn_server( + let (_, _, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -1608,7 +1610,6 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -1629,8 +1630,8 @@ pub mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let stats = Arc::new(StreamStats::default()); - let (_, t) = spawn_server( + let (_, stats, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -1640,7 +1641,6 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats.clone(), DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 85616f13f5..3b326f182e 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -158,9 +158,9 @@ pub struct StreamStats { } impl StreamStats { - pub fn report(&self) { + pub fn report(&self, name: &'static str) { datapoint_info!( - "quic-connections", + name, ( "active_connections", self.total_connections.load(Ordering::Relaxed), @@ -391,6 +391,7 @@ impl StreamStats { #[allow(clippy::too_many_arguments)] pub fn spawn_server( + name: &'static str, sock: UdpSocket, keypair: &Keypair, gossip_host: IpAddr, @@ -400,14 +401,14 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, - stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> { let runtime = rt(); - let (endpoint, task) = { + let (endpoint, _stats, task) = { let _guard = runtime.enter(); crate::nonblocking::quic::spawn_server( + name, sock, keypair, gossip_host, @@ -417,7 +418,6 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, - stats, wait_for_chunk_timeout, coalesce, ) @@ -456,8 +456,8 @@ mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let stats = Arc::new(StreamStats::default()); let (_, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -467,7 +467,6 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -513,8 +512,8 @@ mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let stats = Arc::new(StreamStats::default()); let (_, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -524,7 +523,6 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -557,8 +555,8 @@ mod test { let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let stats = Arc::new(StreamStats::default()); let (_, t) = spawn_server( + "quic_streamer_test", s, &keypair, ip, @@ -568,7 +566,6 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes - stats, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, )