From 7f4debdad5f6ac4fc465fc2890982b38479c6c9c Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 5 Nov 2020 17:14:28 +0000 Subject: [PATCH] drops older gossip packets when load shedding (#13364) Gossip drops incoming packets when overloaded: https://github.com/solana-labs/solana/blob/f6a73098a/core/src/cluster_info.rs#L2462-L2475 However newer packets are dropped in favor of the older ones. This is probably not ideal as newer packets are more likely to contain more recent data, so dropping them will keep the validator state lagging. --- core/src/cluster_info.rs | 42 ++++++++++++++++++++-------------------- perf/src/cuda_runtime.rs | 31 +++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 990528b6d..dea195d75 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -66,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, cmp::min, - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt, iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, @@ -227,6 +227,7 @@ impl Drop for ScopedTimer<'_> { struct GossipStats { entrypoint: Counter, entrypoint2: Counter, + gossip_packets_dropped_count: Counter, push_vote_read: Counter, vote_process_push: Counter, get_votes: Counter, @@ -2445,7 +2446,7 @@ impl ClusterInfo { fn process_packets( &self, - requests: Vec, + packets: VecDeque, thread_pool: &ThreadPool, recycler: &PacketsRecycler, response_sender: &PacketSender, @@ -2455,9 +2456,8 @@ impl ClusterInfo { ) { let mut timer = Measure::start("process_gossip_packets_time"); let packets: Vec<_> = thread_pool.install(|| { - requests + packets .into_par_iter() - .flat_map(|request| request.packets.into_par_iter()) .filter_map(|packet| { let protocol: Protocol = limited_deserialize(&packet.data[..packet.meta.size]).ok()?; @@ -2520,24 +2520,19 @@ impl ClusterInfo { thread_pool: &ThreadPool, last_print: &mut Instant, ) -> Result<()> { - let timeout = Duration::new(1, 0); - let mut requests = vec![requests_receiver.recv_timeout(timeout)?]; - let mut num_requests = requests.last().unwrap().packets.len(); - while let Ok(more_reqs) = requests_receiver.try_recv() { - if num_requests >= MAX_GOSSIP_TRAFFIC { - continue; + const RECV_TIMEOUT: Duration = Duration::from_secs(1); + let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); + let mut packets = VecDeque::from(packets); + while let Ok(packet) = requests_receiver.try_recv() { + packets.extend(packet.packets.into_iter()); + let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); + if excess_count > 0 { + packets.drain(0..excess_count); + self.stats + .gossip_packets_dropped_count + .add_relaxed(excess_count as u64); } - num_requests += more_reqs.packets.len(); - requests.push(more_reqs) } - - if num_requests >= MAX_GOSSIP_TRAFFIC { - warn!( - "Too much gossip traffic, ignoring some messages (requests={}, max requests={})", - num_requests, MAX_GOSSIP_TRAFFIC - ); - } - let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); // Using root_bank instead of working_bank here so that an enbaled // feature does not roll back (if the feature happens to get enabled in @@ -2552,7 +2547,7 @@ impl ClusterInfo { .clone() }); self.process_packets( - requests, + packets, thread_pool, recycler, response_sender, @@ -2605,6 +2600,11 @@ impl ClusterInfo { ); datapoint_info!( "cluster_info_stats2", + ( + "gossip_packets_dropped_count", + self.stats.gossip_packets_dropped_count.clear(), + i64 + ), ("retransmit_peers", self.stats.retransmit_peers.clear(), i64), ("repair_peers", self.stats.repair_peers.clear(), i64), ( diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index c9ba77535..f59f96217 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -89,6 +89,18 @@ impl Default for PinnedVec { } } +impl Into> for PinnedVec { + fn into(mut self) -> Vec { + if self.pinned { + unpin(self.x.as_mut_ptr()); + self.pinned = false; + } + self.pinnable = false; + self.recycler = None; + std::mem::take(&mut self.x) + } +} + pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); @@ -109,6 +121,15 @@ impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> { } } +impl IntoIterator for PinnedVec { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + >>::into(self).into_iter() + } +} + impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec { type Item = &'a T; type IntoIter = PinnedIter<'a, T>; @@ -169,14 +190,8 @@ impl IntoParallelIterator for PinnedVec { type Item = T; type Iter = rayon::vec::IntoIter; - fn into_par_iter(mut self) -> Self::Iter { - if self.pinned { - unpin(self.x.as_mut_ptr()); - self.pinned = false; - } - self.pinnable = false; - self.recycler = None; - std::mem::take(&mut self.x).into_par_iter() + fn into_par_iter(self) -> Self::Iter { + >>::into(self).into_par_iter() } }