Add connection error metrics (#31049)

This commit is contained in:
sakridge 2023-04-05 16:40:31 +02:00 committed by GitHub
parent 684901879d
commit e575650d47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 161 additions and 76 deletions

View File

@ -461,90 +461,96 @@ async fn setup_connection(
wait_for_chunk_timeout: Duration, wait_for_chunk_timeout: Duration,
) { ) {
const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2; const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
let from = connecting.remote_address();
if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await { if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await {
if let Ok(new_connection) = connecting_result { match connecting_result {
stats.total_new_connections.fetch_add(1, Ordering::Relaxed); Ok(new_connection) => {
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);
let params = get_connection_stake(&new_connection, &staked_nodes).map_or( let params = get_connection_stake(&new_connection, &staked_nodes).map_or(
NewConnectionHandlerParams::new_unstaked( NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(), packet_sender.clone(),
max_connections_per_peer, max_connections_per_peer,
stats.clone(), stats.clone(),
), ),
|(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams { |(pubkey, stake, total_stake, max_stake, min_stake)| {
packet_sender, NewConnectionHandlerParams {
remote_pubkey: Some(pubkey), packet_sender,
stake, remote_pubkey: Some(pubkey),
total_stake, stake,
max_connections_per_peer, total_stake,
stats: stats.clone(), max_connections_per_peer,
max_stake, stats: stats.clone(),
min_stake, max_stake,
}, min_stake,
); }
},
);
if params.stake > 0 { if params.stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap(); let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections { if connection_table_l.total_size >= max_staked_connections {
let num_pruned = let num_pruned =
connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake); connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
}
if connection_table_l.total_size < max_staked_connections {
if let Ok(()) = handle_and_cache_new_connection(
new_connection,
connection_table_l,
staked_connection_table.clone(),
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
} }
} else {
// If we couldn't prune a connection in the staked connection table, let's if connection_table_l.total_size < max_staked_connections {
// put this connection in the unstaked connection table. If needed, prune a if let Ok(()) = handle_and_cache_new_connection(
// connection from the unstaked connection table. new_connection,
if let Ok(()) = prune_unstaked_connections_and_add_new_connection( connection_table_l,
new_connection, staked_connection_table.clone(),
unstaked_connection_table.lock().unwrap(), &params,
unstaked_connection_table.clone(), wait_for_chunk_timeout,
max_unstaked_connections, ) {
&params, stats
wait_for_chunk_timeout, .connection_added_from_staked_peer
) { .fetch_add(1, Ordering::Relaxed);
stats }
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
} else { } else {
stats // If we couldn't prune a connection in the staked connection table, let's
.connection_add_failed_on_pruning // put this connection in the unstaked connection table. If needed, prune a
.fetch_add(1, Ordering::Relaxed); // connection from the unstaked connection table.
stats if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
.connection_add_failed_staked_node new_connection,
.fetch_add(1, Ordering::Relaxed); unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
stats
.connection_add_failed_staked_node
.fetch_add(1, Ordering::Relaxed);
}
} }
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
} }
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
} }
} else { Err(e) => {
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); handle_connection_error(e, &stats, from);
}
} }
} else { } else {
stats stats
@ -553,6 +559,44 @@ async fn setup_connection(
} }
} }
fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) {
debug!("error: {:?} from: {:?}", e, from);
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
match e {
quinn::ConnectionError::TimedOut => {
stats
.connection_setup_error_timed_out
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::ConnectionClosed(_) => {
stats
.connection_setup_error_closed
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::TransportError(_) => {
stats
.connection_setup_error_transport
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::ApplicationClosed(_) => {
stats
.connection_setup_error_app_closed
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::Reset => {
stats
.connection_setup_error_reset
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::LocallyClosed => {
stats
.connection_setup_error_locally_closed
.fetch_add(1, Ordering::Relaxed);
}
_ => {}
}
}
async fn packet_batch_sender( async fn packet_batch_sender(
packet_sender: Sender<PacketBatch>, packet_sender: Sender<PacketBatch>,
packet_receiver: AsyncReceiver<PacketAccumulator>, packet_receiver: AsyncReceiver<PacketAccumulator>,

View File

@ -148,6 +148,12 @@ pub struct StreamStats {
pub(crate) connection_add_failed_on_pruning: AtomicUsize, pub(crate) connection_add_failed_on_pruning: AtomicUsize,
pub(crate) connection_setup_timeout: AtomicUsize, pub(crate) connection_setup_timeout: AtomicUsize,
pub(crate) connection_setup_error: AtomicUsize, pub(crate) connection_setup_error: AtomicUsize,
pub(crate) connection_setup_error_closed: AtomicUsize,
pub(crate) connection_setup_error_timed_out: AtomicUsize,
pub(crate) connection_setup_error_transport: AtomicUsize,
pub(crate) connection_setup_error_app_closed: AtomicUsize,
pub(crate) connection_setup_error_reset: AtomicUsize,
pub(crate) connection_setup_error_locally_closed: AtomicUsize,
pub(crate) connection_removed: AtomicUsize, pub(crate) connection_removed: AtomicUsize,
pub(crate) connection_remove_failed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize,
} }
@ -242,6 +248,41 @@ impl StreamStats {
self.connection_setup_error.swap(0, Ordering::Relaxed), self.connection_setup_error.swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"connection_setup_error_timed_out",
self.connection_setup_error_timed_out
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_closed",
self.connection_setup_error_closed
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_transport",
self.connection_setup_error_transport
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_app_closed",
self.connection_setup_error_app_closed
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_reset",
self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_locally_closed",
self.connection_setup_error_locally_closed
.swap(0, Ordering::Relaxed),
i64
),
( (
"invalid_chunk", "invalid_chunk",
self.total_invalid_chunks.swap(0, Ordering::Relaxed), self.total_invalid_chunks.swap(0, Ordering::Relaxed),