diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index e7f405f06..67f713676 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -889,10 +889,7 @@ impl ClusterInfo { CrdsData::LowestSlot(0, LowestSlot::new(self_pubkey, min, now)), &self.keypair(), ); - self.local_message_pending_push_queue - .lock() - .unwrap() - .push(entry); + self.push_message(entry); } } @@ -973,7 +970,7 @@ impl ClusterInfo { TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter) } - pub fn push_message(&self, message: CrdsValue) { + fn push_message(&self, message: CrdsValue) { self.local_message_pending_push_queue .lock() .unwrap() @@ -1515,25 +1512,23 @@ impl ClusterInfo { (pings, pulls.collect()) } - fn drain_push_queue(&self) -> Vec { - let mut push_queue = self.local_message_pending_push_queue.lock().unwrap(); - std::mem::take(&mut *push_queue) - } - // Used in tests pub fn flush_push_queue(&self) { - let pending_push_messages = self.drain_push_queue(); - let mut gossip_crds = self.gossip.crds.write().unwrap(); - let now = timestamp(); - for entry in pending_push_messages { - let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage); + let entries: Vec = + std::mem::take(&mut *self.local_message_pending_push_queue.lock().unwrap()); + if !entries.is_empty() { + let mut gossip_crds = self.gossip.crds.write().unwrap(); + let now = timestamp(); + for entry in entries { + let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage); + } } } fn new_push_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let self_id = self.id(); let (mut push_messages, num_entries, num_nodes) = { let _st = ScopedTimer::from(&self.stats.new_push_requests); - self.gossip - .new_push_messages(&self_id, self.drain_push_queue(), timestamp(), stakes) + self.flush_push_queue(); + self.gossip.new_push_messages(&self_id, timestamp(), stakes) }; self.stats .push_fanout_num_entries @@ -3596,12 +3591,11 @@ mod tests { &SocketAddrSpace::Unspecified, ); //check that all types of gossip messages are signed correctly - let (push_messages, _, _) = cluster_info.gossip.new_push_messages( - &cluster_info.id(), - cluster_info.drain_push_queue(), - timestamp(), - &stakes, - ); + cluster_info.flush_push_queue(); + let (push_messages, _, _) = + cluster_info + .gossip + .new_push_messages(&cluster_info.id(), timestamp(), &stakes); // there should be some pushes ready assert!(!push_messages.is_empty()); push_messages diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 015deed1d..977db716e 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -72,7 +72,6 @@ impl CrdsGossip { pub fn new_push_messages( &self, pubkey: &Pubkey, // This node. - pending_push_messages: Vec, now: u64, stakes: &HashMap, ) -> ( @@ -80,12 +79,6 @@ impl CrdsGossip { usize, // number of values usize, // number of push messages ) { - { - let mut crds = self.crds.write().unwrap(); - for entry in pending_push_messages { - let _ = crds.insert(entry, now, GossipRoute::LocalMessage); - } - } self.push.new_push_messages(pubkey, &self.crds, now, stakes) } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 74415ec3c..ff9e36ba2 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -351,9 +351,7 @@ fn network_run_push( node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts); ( node_pubkey, - node.gossip - .new_push_messages(&node_pubkey, vec![], now, &stakes) - .0, + node.gossip.new_push_messages(&node_pubkey, now, &stakes).0, ) }) .collect();