removes redundant ClusterInfo::drain_push_queue (#33753)

This commit is contained in:
behzad nouri 2023-10-18 18:53:58 +00:00 committed by GitHub
parent 2465abce5c
commit afd044e296
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 33 deletions

View File

@ -889,10 +889,7 @@ impl ClusterInfo {
CrdsData::LowestSlot(0, LowestSlot::new(self_pubkey, min, now)), CrdsData::LowestSlot(0, LowestSlot::new(self_pubkey, min, now)),
&self.keypair(), &self.keypair(),
); );
self.local_message_pending_push_queue self.push_message(entry);
.lock()
.unwrap()
.push(entry);
} }
} }
@ -973,7 +970,7 @@ impl ClusterInfo {
TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter) TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter)
} }
pub fn push_message(&self, message: CrdsValue) { fn push_message(&self, message: CrdsValue) {
self.local_message_pending_push_queue self.local_message_pending_push_queue
.lock() .lock()
.unwrap() .unwrap()
@ -1515,25 +1512,23 @@ impl ClusterInfo {
(pings, pulls.collect()) (pings, pulls.collect())
} }
fn drain_push_queue(&self) -> Vec<CrdsValue> {
let mut push_queue = self.local_message_pending_push_queue.lock().unwrap();
std::mem::take(&mut *push_queue)
}
// Used in tests
pub fn flush_push_queue(&self) { pub fn flush_push_queue(&self) {
let pending_push_messages = self.drain_push_queue(); let entries: Vec<CrdsValue> =
let mut gossip_crds = self.gossip.crds.write().unwrap(); std::mem::take(&mut *self.local_message_pending_push_queue.lock().unwrap());
let now = timestamp(); if !entries.is_empty() {
for entry in pending_push_messages { let mut gossip_crds = self.gossip.crds.write().unwrap();
let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage); let now = timestamp();
for entry in entries {
let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage);
}
} }
} }
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> { fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id(); let self_id = self.id();
let (mut push_messages, num_entries, num_nodes) = { let (mut push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests); let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.gossip self.flush_push_queue();
.new_push_messages(&self_id, self.drain_push_queue(), timestamp(), stakes) self.gossip.new_push_messages(&self_id, timestamp(), stakes)
}; };
self.stats self.stats
.push_fanout_num_entries .push_fanout_num_entries
@ -3596,12 +3591,11 @@ mod tests {
&SocketAddrSpace::Unspecified, &SocketAddrSpace::Unspecified,
); );
//check that all types of gossip messages are signed correctly //check that all types of gossip messages are signed correctly
let (push_messages, _, _) = cluster_info.gossip.new_push_messages( cluster_info.flush_push_queue();
&cluster_info.id(), let (push_messages, _, _) =
cluster_info.drain_push_queue(), cluster_info
timestamp(), .gossip
&stakes, .new_push_messages(&cluster_info.id(), timestamp(), &stakes);
);
// there should be some pushes ready // there should be some pushes ready
assert!(!push_messages.is_empty()); assert!(!push_messages.is_empty());
push_messages push_messages

View File

@ -72,7 +72,6 @@ impl CrdsGossip {
pub fn new_push_messages( pub fn new_push_messages(
&self, &self,
pubkey: &Pubkey, // This node. pubkey: &Pubkey, // This node.
pending_push_messages: Vec<CrdsValue>,
now: u64, now: u64,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> ( ) -> (
@ -80,12 +79,6 @@ impl CrdsGossip {
usize, // number of values usize, // number of values
usize, // number of push messages usize, // number of push messages
) { ) {
{
let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {
let _ = crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
self.push.new_push_messages(pubkey, &self.crds, now, stakes) self.push.new_push_messages(pubkey, &self.crds, now, stakes)
} }

View File

@ -351,9 +351,7 @@ fn network_run_push(
node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts); node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
( (
node_pubkey, node_pubkey,
node.gossip node.gossip.new_push_messages(&node_pubkey, now, &stakes).0,
.new_push_messages(&node_pubkey, vec![], now, &stakes)
.0,
) )
}) })
.collect(); .collect();