drops older gossip packets when load shedding (#13364)

Gossip drops incoming packets when overloaded:
https://github.com/solana-labs/solana/blob/f6a73098a/core/src/cluster_info.rs#L2462-L2475
However, the newer packets are dropped in favor of the older ones.
This is probably not ideal as newer packets are more likely to contain
more recent data, so dropping them will keep the validator state
lagging.
This commit is contained in:
behzad nouri 2020-11-05 17:14:28 +00:00 committed by GitHub
parent 8f0796436a
commit 7f4debdad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 29 deletions

View File

@ -66,7 +66,7 @@ use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
borrow::Cow,
cmp::min,
collections::{hash_map::Entry, HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt,
iter::FromIterator,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
@ -227,6 +227,7 @@ impl Drop for ScopedTimer<'_> {
struct GossipStats {
entrypoint: Counter,
entrypoint2: Counter,
gossip_packets_dropped_count: Counter,
push_vote_read: Counter,
vote_process_push: Counter,
get_votes: Counter,
@ -2445,7 +2446,7 @@ impl ClusterInfo {
fn process_packets(
&self,
requests: Vec<Packets>,
packets: VecDeque<Packet>,
thread_pool: &ThreadPool,
recycler: &PacketsRecycler,
response_sender: &PacketSender,
@ -2455,9 +2456,8 @@ impl ClusterInfo {
) {
let mut timer = Measure::start("process_gossip_packets_time");
let packets: Vec<_> = thread_pool.install(|| {
requests
packets
.into_par_iter()
.flat_map(|request| request.packets.into_par_iter())
.filter_map(|packet| {
let protocol: Protocol =
limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
@ -2520,24 +2520,19 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
last_print: &mut Instant,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let mut requests = vec![requests_receiver.recv_timeout(timeout)?];
let mut num_requests = requests.last().unwrap().packets.len();
while let Ok(more_reqs) = requests_receiver.try_recv() {
if num_requests >= MAX_GOSSIP_TRAFFIC {
continue;
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
let mut packets = VecDeque::from(packets);
while let Ok(packet) = requests_receiver.try_recv() {
packets.extend(packet.packets.into_iter());
let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
if excess_count > 0 {
packets.drain(0..excess_count);
self.stats
.gossip_packets_dropped_count
.add_relaxed(excess_count as u64);
}
num_requests += more_reqs.packets.len();
requests.push(more_reqs)
}
if num_requests >= MAX_GOSSIP_TRAFFIC {
warn!(
"Too much gossip traffic, ignoring some messages (requests={}, max requests={})",
num_requests, MAX_GOSSIP_TRAFFIC
);
}
let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
// Using root_bank instead of working_bank here so that an enbaled
// feature does not roll back (if the feature happens to get enabled in
@ -2552,7 +2547,7 @@ impl ClusterInfo {
.clone()
});
self.process_packets(
requests,
packets,
thread_pool,
recycler,
response_sender,
@ -2605,6 +2600,11 @@ impl ClusterInfo {
);
datapoint_info!(
"cluster_info_stats2",
(
"gossip_packets_dropped_count",
self.stats.gossip_packets_dropped_count.clear(),
i64
),
("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
("repair_peers", self.stats.repair_peers.clear(), i64),
(

View File

@ -89,6 +89,18 @@ impl<T: Clone + Default + Sized> Default for PinnedVec<T> {
}
}
/// Converts a `PinnedVec<T>` into a plain `Vec<T>`, releasing the pin on the
/// underlying buffer and detaching it from its recycler first.
///
/// Implemented as `From` (rather than a hand-written `Into`) so the standard
/// blanket impl provides `Into<Vec<T>> for PinnedVec<T>` for free; this is the
/// idiomatic direction (clippy: `from_over_into`) and keeps every existing
/// `.into()` call site working unchanged.
impl<T: Clone + Default + Sized> From<PinnedVec<T>> for Vec<T> {
    fn from(mut pinned_vec: PinnedVec<T>) -> Self {
        if pinned_vec.pinned {
            // Release the pin on the buffer before giving up ownership of it.
            // NOTE(review): assumes `unpin` pairs with an earlier pin of this
            // allocation — confirm against PinnedVec's pinning code.
            unpin(pinned_vec.x.as_mut_ptr());
            pinned_vec.pinned = false;
        }
        // Clear recycler state so the moved-out buffer is not recycled or
        // re-pinned on drop of the (now empty) PinnedVec.
        pinned_vec.pinnable = false;
        pinned_vec.recycler = None;
        // Move the inner Vec out, leaving a default (empty) one behind.
        std::mem::take(&mut pinned_vec.x)
    }
}
pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>);
pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>);
@ -109,6 +121,15 @@ impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> {
}
}
/// Consumes the `PinnedVec`, yielding its elements by value.
///
/// The buffer is first converted into a plain `Vec<T>` (which unpins it and
/// detaches it from any recycler) and then iterated.
impl<T: Clone + Default + Sized> IntoIterator for PinnedVec<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;
    fn into_iter(self) -> Self::IntoIter {
        let items: Vec<T> = self.into();
        items.into_iter()
    }
}
impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec<T> {
type Item = &'a T;
type IntoIter = PinnedIter<'a, T>;
@ -169,14 +190,8 @@ impl<T: Clone + Default + Send + Sized> IntoParallelIterator for PinnedVec<T> {
type Item = T;
type Iter = rayon::vec::IntoIter<T>;
fn into_par_iter(mut self) -> Self::Iter {
if self.pinned {
unpin(self.x.as_mut_ptr());
self.pinned = false;
}
self.pinnable = false;
self.recycler = None;
std::mem::take(&mut self.x).into_par_iter()
fn into_par_iter(self) -> Self::Iter {
<Self as Into<Vec<T>>>::into(self).into_par_iter()
}
}