From e575650d478569ccb0d5cb6e3d0f115b905589a6 Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 5 Apr 2023 16:40:31 +0200 Subject: [PATCH] Add connection error metrics (#31049) --- streamer/src/nonblocking/quic.rs | 196 +++++++++++++++++++------------ streamer/src/quic.rs | 41 +++++++ 2 files changed, 161 insertions(+), 76 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 52d2b1c360..892ff7205a 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -461,90 +461,96 @@ async fn setup_connection( wait_for_chunk_timeout: Duration, ) { const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2; + let from = connecting.remote_address(); if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await { - if let Ok(new_connection) = connecting_result { - stats.total_new_connections.fetch_add(1, Ordering::Relaxed); + match connecting_result { + Ok(new_connection) => { + stats.total_new_connections.fetch_add(1, Ordering::Relaxed); - let params = get_connection_stake(&new_connection, &staked_nodes).map_or( - NewConnectionHandlerParams::new_unstaked( - packet_sender.clone(), - max_connections_per_peer, - stats.clone(), - ), - |(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams { - packet_sender, - remote_pubkey: Some(pubkey), - stake, - total_stake, - max_connections_per_peer, - stats: stats.clone(), - max_stake, - min_stake, - }, - ); + let params = get_connection_stake(&new_connection, &staked_nodes).map_or( + NewConnectionHandlerParams::new_unstaked( + packet_sender.clone(), + max_connections_per_peer, + stats.clone(), + ), + |(pubkey, stake, total_stake, max_stake, min_stake)| { + NewConnectionHandlerParams { + packet_sender, + remote_pubkey: Some(pubkey), + stake, + total_stake, + max_connections_per_peer, + stats: stats.clone(), + max_stake, + min_stake, + } + }, + ); - if params.stake > 0 { - let mut connection_table_l = staked_connection_table.lock().unwrap(); - if connection_table_l.total_size >= max_staked_connections { - let num_pruned = - connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake); - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - } - - if connection_table_l.total_size < max_staked_connections { - if let Ok(()) = handle_and_cache_new_connection( - new_connection, - connection_table_l, - staked_connection_table.clone(), - ¶ms, - wait_for_chunk_timeout, - ) { - stats - .connection_added_from_staked_peer - .fetch_add(1, Ordering::Relaxed); + if params.stake > 0 { + let mut connection_table_l = staked_connection_table.lock().unwrap(); + if connection_table_l.total_size >= max_staked_connections { + let num_pruned = + connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); } - } else { - // If we couldn't prune a connection in the staked connection table, let's - // put this connection in the unstaked connection table. If needed, prune a - // connection from the unstaked connection table. - if let Ok(()) = prune_unstaked_connections_and_add_new_connection( - new_connection, - unstaked_connection_table.lock().unwrap(), - unstaked_connection_table.clone(), - max_unstaked_connections, - ¶ms, - wait_for_chunk_timeout, - ) { - stats - .connection_added_from_staked_peer - .fetch_add(1, Ordering::Relaxed); + + if connection_table_l.total_size < max_staked_connections { + if let Ok(()) = handle_and_cache_new_connection( + new_connection, + connection_table_l, + staked_connection_table.clone(), + ¶ms, + wait_for_chunk_timeout, + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } } else { - stats - .connection_add_failed_on_pruning - .fetch_add(1, Ordering::Relaxed); - stats - .connection_add_failed_staked_node - .fetch_add(1, Ordering::Relaxed); + // If we couldn't prune a connection in the staked connection table, let's + // put this connection in the unstaked connection table. If needed, prune a + // connection from the unstaked connection table. + if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.lock().unwrap(), + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + wait_for_chunk_timeout, + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .connection_add_failed_on_pruning + .fetch_add(1, Ordering::Relaxed); + stats + .connection_add_failed_staked_node + .fetch_add(1, Ordering::Relaxed); + } } + } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.lock().unwrap(), + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + wait_for_chunk_timeout, + ) { + stats + .connection_added_from_unstaked_peer + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .connection_add_failed_unstaked_node + .fetch_add(1, Ordering::Relaxed); } - } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection( - new_connection, - unstaked_connection_table.lock().unwrap(), - unstaked_connection_table.clone(), - max_unstaked_connections, - ¶ms, - wait_for_chunk_timeout, - ) { - stats - .connection_added_from_unstaked_peer - .fetch_add(1, Ordering::Relaxed); - } else { - stats - .connection_add_failed_unstaked_node - .fetch_add(1, Ordering::Relaxed); } - } else { - stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); + Err(e) => { + handle_connection_error(e, &stats, from); + } } } else { stats @@ -553,6 +559,44 @@ async fn setup_connection( } } +fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) { + debug!("error: {:?} from: {:?}", e, from); + stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); + match e { + quinn::ConnectionError::TimedOut => { + stats + .connection_setup_error_timed_out + .fetch_add(1, Ordering::Relaxed); + } + quinn::ConnectionError::ConnectionClosed(_) => { + stats + .connection_setup_error_closed + .fetch_add(1, Ordering::Relaxed); + } + quinn::ConnectionError::TransportError(_) => { + stats + .connection_setup_error_transport + .fetch_add(1, Ordering::Relaxed); + } + quinn::ConnectionError::ApplicationClosed(_) => { + stats + .connection_setup_error_app_closed + .fetch_add(1, Ordering::Relaxed); + } + quinn::ConnectionError::Reset => { + stats + .connection_setup_error_reset + .fetch_add(1, Ordering::Relaxed); + } + quinn::ConnectionError::LocallyClosed => { + stats + .connection_setup_error_locally_closed + .fetch_add(1, Ordering::Relaxed); + } + _ => {} + } +} + async fn packet_batch_sender( packet_sender: Sender, packet_receiver: AsyncReceiver, diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 7ee9cf490f..9bf15b3f6f 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -148,6 +148,12 @@ pub struct StreamStats { pub(crate) connection_add_failed_on_pruning: AtomicUsize, pub(crate) connection_setup_timeout: AtomicUsize, pub(crate) connection_setup_error: AtomicUsize, + pub(crate) connection_setup_error_closed: AtomicUsize, + pub(crate) connection_setup_error_timed_out: AtomicUsize, + pub(crate) connection_setup_error_transport: AtomicUsize, + pub(crate) connection_setup_error_app_closed: AtomicUsize, + pub(crate) connection_setup_error_reset: AtomicUsize, + pub(crate) connection_setup_error_locally_closed: AtomicUsize, pub(crate) connection_removed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize, } @@ -242,6 +248,41 @@ impl StreamStats { self.connection_setup_error.swap(0, Ordering::Relaxed), i64 ), + ( + "connection_setup_error_timed_out", + self.connection_setup_error_timed_out + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_setup_error_closed", + self.connection_setup_error_closed + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_setup_error_transport", + self.connection_setup_error_transport + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_setup_error_app_closed", + self.connection_setup_error_app_closed + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_setup_error_reset", + self.connection_setup_error_reset.swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_setup_error_locally_closed", + self.connection_setup_error_locally_closed + .swap(0, Ordering::Relaxed), + i64 + ), ( "invalid_chunk", self.total_invalid_chunks.swap(0, Ordering::Relaxed),