From 6470544ea22300f8939a3bf59183ab0472380fc5 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 24 Oct 2023 14:59:15 +0000 Subject: [PATCH] adds metrics to turbine QUIC endpoint (#33819) --- turbine/src/quic_endpoint.rs | 231 +++++++++++++++++++++++++++++++++-- 1 file changed, 218 insertions(+), 13 deletions(-) diff --git a/turbine/src/quic_endpoint.rs b/turbine/src/quic_endpoint.rs index e9789357c..326f409ae 100644 --- a/turbine/src/quic_endpoint.rs +++ b/turbine/src/quic_endpoint.rs @@ -21,9 +21,10 @@ use { io::Error as IoError, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, + time::Duration, }, thiserror::Error, tokio::{ @@ -60,12 +61,12 @@ pub type AsyncTryJoinHandle = TryJoin, JoinHandle<()>>; pub enum Error { #[error(transparent)] CertificateError(#[from] RcgenError), + #[error("Channel Send Error")] + ChannelSendError, #[error(transparent)] ConnectError(#[from] ConnectError), #[error(transparent)] ConnectionError(#[from] ConnectionError), - #[error("Channel Send Error")] - ChannelSendError, #[error("Invalid Identity: {0:?}")] InvalidIdentity(SocketAddr), #[error(transparent)] @@ -76,6 +77,12 @@ pub enum Error { TlsError(#[from] rustls::Error), } +macro_rules! add_metric { + ($metric: expr) => {{ + $metric.fetch_add(1, Ordering::Relaxed); + }}; +} + #[allow(clippy::type_complexity)] pub fn new_quic_endpoint( runtime: &tokio::runtime::Handle, @@ -182,8 +189,11 @@ async fn run_server( router: Arc>>>, cache: Arc>>, ) { + let stats = Arc::::default(); + let report_metrics_task = + tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone())); while let Some(connecting) = endpoint.accept().await { - tokio::task::spawn(handle_connecting_error( + tokio::task::spawn(handle_connecting_task( endpoint.clone(), connecting, sender.clone(), @@ -191,8 +201,10 @@ async fn run_server( prune_cache_pending.clone(), router.clone(), cache.clone(), + stats.clone(), )); } + report_metrics_task.abort(); } async fn run_client( @@ -204,13 +216,17 @@ async fn run_client( router: Arc>>>, cache: Arc>>, ) { + let stats = Arc::::default(); + let report_metrics_task = + tokio::task::spawn(report_metrics_task("repair_quic_client", stats.clone())); while let Some((remote_address, bytes)) = receiver.recv().await { - let Some(bytes) = try_route_bytes(&remote_address, bytes, &*router.read().await) else { + let Some(bytes) = try_route_bytes(&remote_address, bytes, &*router.read().await, &stats) + else { continue; }; let receiver = { let mut router = router.write().await; - let Some(bytes) = try_route_bytes(&remote_address, bytes, &router) else { + let Some(bytes) = try_route_bytes(&remote_address, bytes, &router, &stats) else { continue; }; let (sender, receiver) = tokio::sync::mpsc::channel(ROUTER_CHANNEL_BUFFER); @@ -227,24 +243,28 @@ async fn run_client( prune_cache_pending.clone(), router.clone(), cache.clone(), + stats.clone(), )); } close_quic_endpoint(&endpoint); // Drop sender channels to unblock threads waiting on the receiving end. router.write().await.clear(); + report_metrics_task.abort(); } fn try_route_bytes( remote_address: &SocketAddr, bytes: Bytes, router: &HashMap>, + stats: &TurbineQuicStats, ) -> Option { match router.get(remote_address) { None => Some(bytes), Some(sender) => match sender.try_send(bytes) { Ok(()) => None, Err(TrySendError::Full(_)) => { - error!("TrySendError::Full {remote_address}"); + debug!("TrySendError::Full {remote_address}"); + add_metric!(stats.router_try_send_error_full); None } Err(TrySendError::Closed(bytes)) => Some(bytes), @@ -252,7 +272,7 @@ fn try_route_bytes( } } -async fn handle_connecting_error( +async fn handle_connecting_task( endpoint: Endpoint, connecting: Connecting, sender: Sender<(Pubkey, SocketAddr, Bytes)>, @@ -260,6 +280,7 @@ async fn handle_connecting_error( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { if let Err(err) = handle_connecting( endpoint, @@ -269,10 +290,12 @@ async fn handle_connecting_error( prune_cache_pending, router, cache, + stats.clone(), ) .await { - error!("handle_connecting: {err:?}"); + debug!("handle_connecting: {err:?}"); + record_error(&err, &stats); } } @@ -284,6 +307,7 @@ async fn handle_connecting( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) -> Result<(), Error> { let connection = connecting.await?; let remote_address = connection.remote_address(); @@ -304,6 +328,7 @@ async fn handle_connecting( prune_cache_pending, router, cache, + stats, ) .await; Ok(()) @@ -321,6 +346,7 @@ async fn handle_connection( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { cache_connection( remote_pubkey, @@ -338,15 +364,18 @@ async fn handle_connection( remote_pubkey, connection.clone(), sender, + stats.clone(), )); match futures::future::try_join(send_datagram_task, read_datagram_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), Ok(out) => { if let (Err(ref err), _) = out { - error!("send_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); + debug!("send_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); } if let (_, Err(ref err)) = out { - error!("read_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); + debug!("read_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); } } } @@ -364,6 +393,7 @@ async fn read_datagram_task( remote_pubkey: Pubkey, connection: Connection, sender: Sender<(Pubkey, SocketAddr, Bytes)>, + stats: Arc, ) -> Result<(), Error> { // Assert that send won't block. debug_assert_eq!(sender.capacity(), None); @@ -379,7 +409,8 @@ async fn read_datagram_task( if let Some(err) = connection.close_reason() { return Err(Error::from(err)); } - error!("connection.read_datagram: {remote_pubkey}, {remote_address}, {err:?}"); + debug!("connection.read_datagram: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(&Error::from(err), &stats); } }; } @@ -404,6 +435,7 @@ async fn make_connection_task( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) { if let Err(err) = make_connection( endpoint, @@ -414,10 +446,12 @@ async fn make_connection_task( prune_cache_pending, router, cache, + stats.clone(), ) .await { - error!("make_connection: {remote_address}, {err:?}"); + debug!("make_connection: {remote_address}, {err:?}"); + record_error(&err, &stats); } } @@ -430,6 +464,7 @@ async fn make_connection( prune_cache_pending: Arc, router: Arc>>>, cache: Arc>>, + stats: Arc, ) -> Result<(), Error> { let connection = endpoint .connect(remote_address, CONNECT_SERVER_NAME)? @@ -445,6 +480,7 @@ async fn make_connection( prune_cache_pending, router, cache, + stats, ) .await; Ok(()) @@ -560,6 +596,175 @@ impl From> for Error { } } +#[derive(Default)] +struct TurbineQuicStats { + connect_error_invalid_remote_address: AtomicU64, + connect_error_other: AtomicU64, + connect_error_too_many_connections: AtomicU64, + connection_error_application_closed: AtomicU64, + connection_error_connection_closed: AtomicU64, + connection_error_locally_closed: AtomicU64, + connection_error_reset: AtomicU64, + connection_error_timed_out: AtomicU64, + connection_error_transport_error: AtomicU64, + connection_error_version_mismatch: AtomicU64, + invalid_identity: AtomicU64, + router_try_send_error_full: AtomicU64, + send_datagram_error_connection_lost: AtomicU64, + send_datagram_error_too_large: AtomicU64, + send_datagram_error_unsupported_by_peer: AtomicU64, +} + +async fn report_metrics_task(name: &'static str, stats: Arc) { + loop { + tokio::time::sleep(Duration::from_secs(2)).await; + report_metrics(name, &stats); + } +} + +fn record_error(err: &Error, stats: &TurbineQuicStats) { + match err { + Error::CertificateError(_) => (), + Error::ChannelSendError => (), + Error::ConnectError(ConnectError::EndpointStopping) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::TooManyConnections) => { + add_metric!(stats.connect_error_too_many_connections) + } + Error::ConnectError(ConnectError::InvalidDnsName(_)) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => { + add_metric!(stats.connect_error_invalid_remote_address) + } + Error::ConnectError(ConnectError::NoDefaultClientConfig) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectError(ConnectError::UnsupportedVersion) => { + add_metric!(stats.connect_error_other) + } + Error::ConnectionError(ConnectionError::VersionMismatch) => { + add_metric!(stats.connection_error_version_mismatch) + } + Error::ConnectionError(ConnectionError::TransportError(_)) => { + add_metric!(stats.connection_error_transport_error) + } + Error::ConnectionError(ConnectionError::ConnectionClosed(_)) => { + add_metric!(stats.connection_error_connection_closed) + } + Error::ConnectionError(ConnectionError::ApplicationClosed(_)) => { + add_metric!(stats.connection_error_application_closed) + } + Error::ConnectionError(ConnectionError::Reset) => add_metric!(stats.connection_error_reset), + Error::ConnectionError(ConnectionError::TimedOut) => { + add_metric!(stats.connection_error_timed_out) + } + Error::ConnectionError(ConnectionError::LocallyClosed) => { + add_metric!(stats.connection_error_locally_closed) + } + Error::InvalidIdentity(_) => add_metric!(stats.invalid_identity), + Error::IoError(_) => (), + Error::SendDatagramError(SendDatagramError::UnsupportedByPeer) => { + add_metric!(stats.send_datagram_error_unsupported_by_peer) + } + Error::SendDatagramError(SendDatagramError::Disabled) => (), + Error::SendDatagramError(SendDatagramError::TooLarge) => { + add_metric!(stats.send_datagram_error_too_large) + } + Error::SendDatagramError(SendDatagramError::ConnectionLost(_)) => { + add_metric!(stats.send_datagram_error_connection_lost) + } + Error::TlsError(_) => (), + } +} + +fn report_metrics(name: &'static str, stats: &TurbineQuicStats) { + macro_rules! reset_metric { + ($metric: expr) => { + $metric.swap(0, Ordering::Relaxed) + }; + } + datapoint_info!( + name, + ( + "connect_error_invalid_remote_address", + reset_metric!(stats.connect_error_invalid_remote_address), + i64 + ), + ( + "connect_error_other", + reset_metric!(stats.connect_error_other), + i64 + ), + ( + "connect_error_too_many_connections", + reset_metric!(stats.connect_error_too_many_connections), + i64 + ), + ( + "connection_error_application_closed", + reset_metric!(stats.connection_error_application_closed), + i64 + ), + ( + "connection_error_connection_closed", + reset_metric!(stats.connection_error_connection_closed), + i64 + ), + ( + "connection_error_locally_closed", + reset_metric!(stats.connection_error_locally_closed), + i64 + ), + ( + "connection_error_reset", + reset_metric!(stats.connection_error_reset), + i64 + ), + ( + "connection_error_timed_out", + reset_metric!(stats.connection_error_timed_out), + i64 + ), + ( + "connection_error_transport_error", + reset_metric!(stats.connection_error_transport_error), + i64 + ), + ( + "connection_error_version_mismatch", + reset_metric!(stats.connection_error_version_mismatch), + i64 + ), + ( + "invalid_identity", + reset_metric!(stats.invalid_identity), + i64 + ), + ( + "router_try_send_error_full", + reset_metric!(stats.router_try_send_error_full), + i64 + ), + ( + "send_datagram_error_connection_lost", + reset_metric!(stats.send_datagram_error_connection_lost), + i64 + ), + ( + "send_datagram_error_too_large", + reset_metric!(stats.send_datagram_error_too_large), + i64 + ), + ( + "send_datagram_error_unsupported_by_peer", + reset_metric!(stats.send_datagram_error_unsupported_by_peer), + i64 + ), + ); +} + #[cfg(test)] mod tests { use {