diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 990528b6d..dea195d75 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -66,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, cmp::min, - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt, iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, @@ -227,6 +227,7 @@ impl Drop for ScopedTimer<'_> { struct GossipStats { entrypoint: Counter, entrypoint2: Counter, + gossip_packets_dropped_count: Counter, push_vote_read: Counter, vote_process_push: Counter, get_votes: Counter, @@ -2445,7 +2446,7 @@ impl ClusterInfo { fn process_packets( &self, - requests: Vec, + packets: VecDeque, thread_pool: &ThreadPool, recycler: &PacketsRecycler, response_sender: &PacketSender, @@ -2455,9 +2456,8 @@ impl ClusterInfo { ) { let mut timer = Measure::start("process_gossip_packets_time"); let packets: Vec<_> = thread_pool.install(|| { - requests + packets .into_par_iter() - .flat_map(|request| request.packets.into_par_iter()) .filter_map(|packet| { let protocol: Protocol = limited_deserialize(&packet.data[..packet.meta.size]).ok()?; @@ -2520,24 +2520,19 @@ impl ClusterInfo { thread_pool: &ThreadPool, last_print: &mut Instant, ) -> Result<()> { - let timeout = Duration::new(1, 0); - let mut requests = vec![requests_receiver.recv_timeout(timeout)?]; - let mut num_requests = requests.last().unwrap().packets.len(); - while let Ok(more_reqs) = requests_receiver.try_recv() { - if num_requests >= MAX_GOSSIP_TRAFFIC { - continue; + const RECV_TIMEOUT: Duration = Duration::from_secs(1); + let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); + let mut packets = VecDeque::from(packets); + while let Ok(packet) = requests_receiver.try_recv() { + packets.extend(packet.packets.into_iter()); + let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); + if excess_count > 0 { + packets.drain(0..excess_count); + self.stats + .gossip_packets_dropped_count + .add_relaxed(excess_count as u64); } - num_requests += more_reqs.packets.len(); - requests.push(more_reqs) } - - if num_requests >= MAX_GOSSIP_TRAFFIC { - warn!( - "Too much gossip traffic, ignoring some messages (requests={}, max requests={})", - num_requests, MAX_GOSSIP_TRAFFIC - ); - } - let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); // Using root_bank instead of working_bank here so that an enbaled // feature does not roll back (if the feature happens to get enabled in @@ -2552,7 +2547,7 @@ impl ClusterInfo { .clone() }); self.process_packets( - requests, + packets, thread_pool, recycler, response_sender, @@ -2605,6 +2600,11 @@ impl ClusterInfo { ); datapoint_info!( "cluster_info_stats2", + ( + "gossip_packets_dropped_count", + self.stats.gossip_packets_dropped_count.clear(), + i64 + ), ("retransmit_peers", self.stats.retransmit_peers.clear(), i64), ("repair_peers", self.stats.repair_peers.clear(), i64), ( diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index c9ba77535..f59f96217 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -89,6 +89,18 @@ impl Default for PinnedVec { } } +impl Into> for PinnedVec { + fn into(mut self) -> Vec { + if self.pinned { + unpin(self.x.as_mut_ptr()); + self.pinned = false; + } + self.pinnable = false; + self.recycler = None; + std::mem::take(&mut self.x) + } +} + pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); @@ -109,6 +121,15 @@ impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> { } } +impl IntoIterator for PinnedVec { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + >>::into(self).into_iter() + } +} + impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec { type Item = &'a T; type IntoIter = PinnedIter<'a, T>; @@ -169,14 +190,8 @@ impl IntoParallelIterator for PinnedVec { type Item = T; type Iter = rayon::vec::IntoIter; - fn into_par_iter(mut self) -> Self::Iter { - if self.pinned { - unpin(self.x.as_mut_ptr()); - self.pinned = false; - } - self.pinnable = false; - self.recycler = None; - std::mem::take(&mut self.x).into_par_iter() + fn into_par_iter(self) -> Self::Iter { + >>::into(self).into_par_iter() } }