From abfaf06e874281e3823edced433b69c8b9058f34 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 27 Sep 2022 13:43:35 +0000 Subject: [PATCH] counts gossip packets received before excess packets are dropped (#28086) Currently, gossip packets are counted after excess packets are dropped. This makes it difficult to debug gossip traffic spikes if the majority of the packets are dropped. This commit instead counts gossip packets received before excess packets are dropped --- gossip/src/cluster_info.rs | 59 ++++++++++++++++++++++-------- gossip/src/cluster_info_metrics.rs | 18 +++++++++ 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 68ba35f660..4118df7a52 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -279,6 +279,7 @@ pub(crate) enum Protocol { PruneMessage(Pubkey, PruneData), PingMessage(Ping), PongMessage(Pong), + // Update count_packets_received if new variants are added here. } impl Protocol { @@ -2410,18 +2411,6 @@ impl ClusterInfo { Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)), } } - self.stats - .packets_received_pull_requests_count - .add_relaxed(pull_requests.len() as u64); - self.stats - .packets_received_pull_responses_count - .add_relaxed(pull_responses.len() as u64); - self.stats - .packets_received_push_messages_count - .add_relaxed(push_messages.len() as u64); - self.stats - .packets_received_prune_messages_count - .add_relaxed(prune_messages.len() as u64); if self.require_stake_for_gossip(stakes) { for (_, data) in &mut pull_responses { retain_staked(data, stakes); @@ -2467,9 +2456,26 @@ impl ClusterInfo { thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.into(); + fn count_packets_received(packets: &PacketBatch, counts: &mut [u64; 7]) { + for packet in packets { + let k = match packet + .data(..4) + 
.and_then(|data| <[u8; 4]>::try_from(data).ok()) + .map(u32::from_le_bytes) + { + Some(k @ 0..=6) => k as usize, + None | Some(_) => 6, + }; + counts[k] += 1; + } + } + let packets = receiver.recv_timeout(RECV_TIMEOUT)?; + let mut counts = [0u64; 7]; + count_packets_received(&packets, &mut counts); + let packets = Vec::from(packets); let mut packets = VecDeque::from(packets); for packet_batch in receiver.try_iter() { + count_packets_received(&packet_batch, &mut counts); packets.extend(packet_batch.iter().cloned()); let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); if excess_count > 0 { @@ -2479,9 +2485,6 @@ impl ClusterInfo { .add_relaxed(excess_count as u64); } } - self.stats - .packets_received_count - .add_relaxed(packets.len() as u64); let verify_packet = |packet: Packet| { let protocol: Protocol = packet.deserialize_slice(..).ok()?; protocol.sanitize().ok()?; @@ -2492,6 +2495,30 @@ impl ClusterInfo { let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); thread_pool.install(|| packets.into_par_iter().filter_map(verify_packet).collect()) }; + self.stats + .packets_received_count + .add_relaxed(counts.iter().sum::<u64>()); + self.stats + .packets_received_pull_requests_count + .add_relaxed(counts[0]); + self.stats + .packets_received_pull_responses_count + .add_relaxed(counts[1]); + self.stats + .packets_received_push_messages_count + .add_relaxed(counts[2]); + self.stats + .packets_received_prune_messages_count + .add_relaxed(counts[3]); + self.stats + .packets_received_ping_messages_count + .add_relaxed(counts[4]); + self.stats + .packets_received_pong_messages_count + .add_relaxed(counts[5]); + self.stats + .packets_received_unknown_count + .add_relaxed(counts[6]); self.stats .packets_received_verified_count .add_relaxed(packets.len() as u64); diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 81e63a0163..6d5a3586ce 100644 --- a/gossip/src/cluster_info_metrics.rs +++ 
b/gossip/src/cluster_info_metrics.rs @@ -131,10 +131,13 @@ pub struct GossipStats { pub(crate) new_push_requests: Counter, pub(crate) new_push_requests_num: Counter, pub(crate) packets_received_count: Counter, + pub(crate) packets_received_ping_messages_count: Counter, + pub(crate) packets_received_pong_messages_count: Counter, pub(crate) packets_received_prune_messages_count: Counter, pub(crate) packets_received_pull_requests_count: Counter, pub(crate) packets_received_pull_responses_count: Counter, pub(crate) packets_received_push_messages_count: Counter, + pub(crate) packets_received_unknown_count: Counter, pub(crate) packets_received_verified_count: Counter, pub(crate) packets_sent_gossip_requests_count: Counter, pub(crate) packets_sent_prune_messages_count: Counter, @@ -490,6 +493,16 @@ pub(crate) fn submit_gossip_stats( stats.packets_received_count.clear(), i64 ), + ( + "packets_received_ping_messages_count", + stats.packets_received_ping_messages_count.clear(), + i64 + ), + ( + "packets_received_pong_messages_count", + stats.packets_received_pong_messages_count.clear(), + i64 + ), ( "packets_received_prune_messages_count", stats.packets_received_prune_messages_count.clear(), @@ -510,6 +523,11 @@ pub(crate) fn submit_gossip_stats( stats.packets_received_push_messages_count.clear(), i64 ), + ( + "packets_received_unknown_count", + stats.packets_received_unknown_count.clear(), + i64 + ), ( "packets_received_verified_count", stats.packets_received_verified_count.clear(),