From b857528a03e6b104a6868d465619e2627d96fdd1 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Thu, 19 May 2022 15:29:27 -0400 Subject: [PATCH] Implement updating the connection cache stats in the client (#25308) --- client/src/connection_cache.rs | 51 +++++----------------- client/src/quic_client.rs | 79 +++++++++++++++++++++++++--------- client/src/tpu_connection.rs | 3 +- client/src/udp_client.rs | 7 ++- client/tests/quic_client.rs | 4 +- 5 files changed, 79 insertions(+), 65 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 1685433db5..d26fae696c 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -30,7 +30,7 @@ pub enum Connection { } #[derive(Default)] -struct ConnectionCacheStats { +pub struct ConnectionCacheStats { cache_hits: AtomicU64, cache_misses: AtomicU64, cache_evictions: AtomicU64, @@ -46,7 +46,7 @@ struct ConnectionCacheStats { // Need to track these separately per-connection // because we need to track the base stat value from quinn - total_client_stats: ClientStats, + pub total_client_stats: ClientStats, } const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000; @@ -287,9 +287,15 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0), None => { let connection = if map.use_quic { - Connection::Quic(Arc::new(QuicTpuConnection::new(*addr))) + Connection::Quic(Arc::new(QuicTpuConnection::new( + *addr, + map.stats.clone(), + ))) } else { - Connection::Udp(Arc::new(UdpTpuConnection::new(*addr))) + Connection::Udp(Arc::new(UdpTpuConnection::new( + *addr, + map.stats.clone(), + ))) }; // evict a connection if the cache is reaching upper bounds @@ -345,47 +351,10 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc) eviction_timing_ms, } = get_or_add_connection(addr); - let other_stats = if let Connection::Quic(conn) = &connection { - conn.stats().map(|s| (conn.base_stats(), s)) - } else { - None - }; - if report_stats { connection_cache_stats.report(); } - if let Some((connection_stats, new_stats)) = other_stats { - connection_cache_stats - .total_client_stats - .congestion_events - .update_stat( - &connection_stats.congestion_events, - new_stats.path.congestion_events, - ); - - connection_cache_stats - .total_client_stats - .tx_streams_blocked_uni - .update_stat( - &connection_stats.tx_streams_blocked_uni, - new_stats.frame_tx.streams_blocked_uni, - ); - - connection_cache_stats - .total_client_stats - .tx_data_blocked - .update_stat( - &connection_stats.tx_data_blocked, - new_stats.frame_tx.data_blocked, - ); - - connection_cache_stats - .total_client_stats - .tx_acks - .update_stat(&connection_stats.tx_acks, new_stats.frame_tx.acks); - } - if cache_hit { connection_cache_stats .cache_hits diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 8a51adf3e6..645394631f 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -4,6 +4,7 @@ use { crate::{ client_error::ClientErrorKind, + connection_cache::ConnectionCacheStats, tpu_connection::{ClientStats, TpuConnection}, }, async_mutex::Mutex, @@ -14,7 +15,6 @@ use { quinn::{ ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError, }, - quinn_proto::ConnectionStats, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, solana_sdk::{ @@ -154,24 +154,24 @@ struct QuicClient { pub struct QuicTpuConnection { client: Arc, + connection_stats: Arc, } impl QuicTpuConnection { - pub fn stats(&self) -> Option { - self.client.stats() - } - pub fn base_stats(&self) -> Arc { self.client.stats.clone() } } impl TpuConnection for QuicTpuConnection { - fn new(tpu_addr: SocketAddr) -> Self { + fn new(tpu_addr: SocketAddr, connection_stats: Arc) -> Self { let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET); let client = Arc::new(QuicClient::new(tpu_addr)); - Self { client } + Self { + client, + connection_stats, + } } fn tpu_addr(&self) -> &SocketAddr { @@ -187,7 +187,9 @@ impl TpuConnection for QuicTpuConnection { T: AsRef<[u8]>, { let _guard = RUNTIME.enter(); - let send_buffer = self.client.send_buffer(wire_transaction, stats); + let send_buffer = + self.client + .send_buffer(wire_transaction, stats, self.connection_stats.clone()); RUNTIME.block_on(send_buffer)?; Ok(()) } @@ -201,7 +203,9 @@ impl TpuConnection for QuicTpuConnection { T: AsRef<[u8]>, { let _guard = RUNTIME.enter(); - let send_batch = self.client.send_batch(buffers, stats); + let send_batch = self + .client + .send_batch(buffers, stats, self.connection_stats.clone()); RUNTIME.block_on(send_batch)?; Ok(()) } @@ -213,9 +217,10 @@ impl TpuConnection for QuicTpuConnection { ) -> TransportResult<()> { let _guard = RUNTIME.enter(); let client = self.client.clone(); + let connection_stats = self.connection_stats.clone(); //drop and detach the task let _ = RUNTIME.spawn(async move { - let send_buffer = client.send_buffer(wire_transaction, &stats); + let send_buffer = client.send_buffer(wire_transaction, &stats, connection_stats); if let Err(e) = send_buffer.await { warn!("Failed to send transaction async to {:?}", e); datapoint_warn!("send-wire-async", ("failure", 1, i64),); @@ -231,9 +236,10 @@ impl TpuConnection for QuicTpuConnection { ) -> TransportResult<()> { let _guard = RUNTIME.enter(); let client = self.client.clone(); + let connection_stats = self.connection_stats.clone(); //drop and detach the task let _ = RUNTIME.spawn(async move { - let send_batch = client.send_batch(&buffers, &stats); + let send_batch = client.send_batch(&buffers, &stats, connection_stats); if let Err(e) = send_batch.await { warn!("Failed to send transaction batch async to {:?}", e); datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),); @@ -252,12 +258,6 @@ impl QuicClient { } } - pub fn stats(&self) -> Option { - let conn_guard = self.connection.lock(); - let x = RUNTIME.block_on(conn_guard); - x.as_ref().map(|c| c.connection.connection.stats()) - } - async fn _send_buffer_using_conn( data: &[u8], connection: &NewConnection, @@ -282,6 +282,7 @@ impl QuicClient { &self, data: &[u8], stats: &ClientStats, + connection_stats: Arc, ) -> Result, WriteError> { let connection = { let mut conn_guard = self.connection.lock().await; @@ -299,6 +300,35 @@ impl QuicClient { } } }; + + let new_stats = connection.connection.stats(); + + connection_stats + .total_client_stats + .congestion_events + .update_stat( + &self.stats.congestion_events, + new_stats.path.congestion_events, + ); + + connection_stats + .total_client_stats + .tx_streams_blocked_uni + .update_stat( + &self.stats.tx_streams_blocked_uni, + new_stats.frame_tx.streams_blocked_uni, + ); + + connection_stats + .total_client_stats + .tx_data_blocked + .update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked); + + connection_stats + .total_client_stats + .tx_acks + .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks); + match Self::_send_buffer_using_conn(data, &connection).await { Ok(()) => Ok(connection), _ => { @@ -313,11 +343,17 @@ impl QuicClient { } } - pub async fn send_buffer(&self, data: T, stats: &ClientStats) -> Result<(), ClientErrorKind> + pub async fn send_buffer( + &self, + data: T, + stats: &ClientStats, + connection_stats: Arc, + ) -> Result<(), ClientErrorKind> where T: AsRef<[u8]>, { - self._send_buffer(data.as_ref(), stats).await?; + self._send_buffer(data.as_ref(), stats, connection_stats) + .await?; Ok(()) } @@ -325,6 +361,7 @@ impl QuicClient { &self, buffers: &[T], stats: &ClientStats, + connection_stats: Arc, ) -> Result<(), ClientErrorKind> where T: AsRef<[u8]>, @@ -343,7 +380,9 @@ impl QuicClient { if buffers.is_empty() { return Ok(()); } - let connection = self._send_buffer(buffers[0].as_ref(), stats).await?; + let connection = self + ._send_buffer(buffers[0].as_ref(), stats, connection_stats) + .await?; // Used to avoid dereferencing the Arc multiple times below // by just getting a reference to the NewConnection once diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 7fbdd5be2f..8c8faf7263 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,4 +1,5 @@ use { + crate::connection_cache::ConnectionCacheStats, rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_metrics::MovingStat, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, @@ -25,7 +26,7 @@ pub struct ClientStats { } pub trait TpuConnection { - fn new(tpu_addr: SocketAddr) -> Self; + fn new(tpu_addr: SocketAddr, connection_stats: Arc) -> Self; fn tpu_addr(&self) -> &SocketAddr; diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index 6f091ce05f..c54ccdcf55 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -2,7 +2,10 @@ //! an interface for sending transactions use { - crate::tpu_connection::{ClientStats, TpuConnection}, + crate::{ + connection_cache::ConnectionCacheStats, + tpu_connection::{ClientStats, TpuConnection}, + }, core::iter::repeat, solana_net_utils::VALIDATOR_PORT_RANGE, solana_sdk::transport::Result as TransportResult, @@ -19,7 +22,7 @@ pub struct UdpTpuConnection { } impl TpuConnection for UdpTpuConnection { - fn new(tpu_addr: SocketAddr) -> Self { + fn new(tpu_addr: SocketAddr, _connection_stats: Arc) -> Self { let (_, client_socket) = solana_net_utils::bind_in_range( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), VALIDATOR_PORT_RANGE, diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index d043f7a256..8d3681e2b4 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -3,6 +3,7 @@ mod tests { use { crossbeam_channel::unbounded, solana_client::{ + connection_cache::ConnectionCacheStats, quic_client::QuicTpuConnection, tpu_connection::{ClientStats, TpuConnection}, }, @@ -44,7 +45,8 @@ mod tests { let addr = s.local_addr().unwrap().ip(); let port = s.local_addr().unwrap().port() - QUIC_PORT_OFFSET; let tpu_addr = SocketAddr::new(addr, port); - let client = QuicTpuConnection::new(tpu_addr); + let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); + let client = QuicTpuConnection::new(tpu_addr, connection_cache_stats); // Send a full size packet with single byte writes. let num_bytes = PACKET_DATA_SIZE;