diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 88b3d21360..243e960808 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -204,6 +204,7 @@ mod tests { { let message = make_accounts_hashes_message(&validator1, vec![(0, hash1)]).unwrap(); cluster_info.push_message(message); + cluster_info.flush_push_queue(); } slot_to_hash.insert(0, hash2); trusted_validators.insert(validator1.pubkey()); @@ -254,6 +255,7 @@ mod tests { 100, ); } + cluster_info.flush_push_queue(); let cluster_hashes = cluster_info .get_accounts_hash_for_node(&keypair.pubkey(), |c| c.clone()) .unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 38d442e08b..a77cfff7b4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -257,6 +257,7 @@ pub struct ClusterInfo { id: Pubkey, stats: GossipStats, socket: UdpSocket, + local_message_pending_push_queue: RwLock>, } impl Default for ClusterInfo { @@ -414,6 +415,7 @@ impl ClusterInfo { id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + local_message_pending_push_queue: RwLock::new(vec![]), }; { let mut gossip = me.gossip.write().unwrap(); @@ -440,6 +442,12 @@ impl ClusterInfo { id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + local_message_pending_push_queue: RwLock::new( + self.local_message_pending_push_queue + .read() + .unwrap() + .clone(), + ), } } @@ -462,9 +470,14 @@ impl ClusterInfo { self.my_contact_info.write().unwrap().wallclock = now; let entry = CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); - let mut w_gossip = self.gossip.write().unwrap(); - w_gossip.refresh_push_active_set(stakes, gossip_validators); - w_gossip.process_push_message(&self.id(), vec![entry], now); + self.gossip + .write() + .unwrap() + .refresh_push_active_set(stakes, gossip_validators); + self.local_message_pending_push_queue + .write() + .unwrap() + .push((entry, now)); } // TODO kill insert_info, only used by tests @@ -633,10 +646,10 @@ impl ClusterInfo { CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)), &self.keypair, ); - self.gossip + self.local_message_pending_push_queue .write() .unwrap() - .process_push_message(&self.id(), vec![entry], now); + .push((entry, now)); } } @@ -689,8 +702,10 @@ impl ClusterInfo { let n = slots.fill(&update[num..], now); if n > 0 { let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair); - self.time_gossip_write_lock("epcoh_slots_push", &self.stats.epoch_slots_push) - .process_push_message(&self.id(), vec![entry], now); + self.local_message_pending_push_queue + .write() + .unwrap() + .push((entry, now)); } num += n; if num < update.len() { @@ -718,9 +733,10 @@ impl ClusterInfo { pub fn push_message(&self, message: CrdsValue) { let now = message.wallclock(); - let id = message.pubkey(); - self.time_gossip_write_lock("process_push_message", &self.stats.push_message) - .process_push_message(&id, vec![message], now); + self.local_message_pending_push_queue + .write() + .unwrap() + .push((message, now)); } pub fn push_accounts_hashes(&self, accounts_hashes: Vec<(Slot, Hash)>) { @@ -761,8 +777,10 @@ impl ClusterInfo { CrdsValue::compute_vote_index(tower_index, current_votes) }; let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair); - self.time_gossip_write_lock("push_vote_process_push", &self.stats.vote_process_push) - .process_push_message(&self.id(), vec![entry], now); + self.local_message_pending_push_queue + .write() + .unwrap() + .push((entry, now)); } pub fn send_vote(&self, vote: &Transaction) -> Result<()> { @@ -1425,11 +1443,21 @@ impl ClusterInfo { }) .collect() } + fn drain_push_queue(&self) -> Vec<(CrdsValue, u64)> { + let mut push_queue = self.local_message_pending_push_queue.write().unwrap(); + std::mem::take(&mut *push_queue) + } + #[cfg(test)] + pub fn flush_push_queue(&self) { + let pending_push_messages = self.drain_push_queue(); + let mut gossip = self.gossip.write().unwrap(); + gossip.process_push_messages(pending_push_messages); + } fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> { let self_id = self.id(); let (_, push_messages) = self .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests) - .new_push_messages(timestamp()); + .new_push_messages(self.drain_push_queue(), timestamp()); let messages: Vec<_> = push_messages .into_iter() .filter_map(|(peer, messages)| { @@ -2909,7 +2937,7 @@ mod tests { .gossip .write() .unwrap() - .new_push_messages(timestamp()); + .new_push_messages(cluster_info.drain_push_queue(), timestamp()); // there should be some pushes ready assert_eq!(push_messages.is_empty(), false); push_messages @@ -3092,6 +3120,7 @@ mod tests { let tx = test_tx(); let index = 1; cluster_info.push_vote(index, tx.clone()); + cluster_info.flush_push_queue(); // -1 to make sure that the clock is strictly lower then when insert occurred let (labels, votes, max_ts) = cluster_info.get_votes(now - 1); @@ -3121,6 +3150,7 @@ mod tests { assert!(slots.is_empty()); assert!(since.is_none()); cluster_info.push_epoch_slots(&[0]); + cluster_info.flush_push_queue(); let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX)); assert!(slots.is_empty()); @@ -3449,7 +3479,9 @@ mod tests { range.push(last + rand::thread_rng().gen_range(1, 32)); } cluster_info.push_epoch_slots(&range[..16000]); + cluster_info.flush_push_queue(); cluster_info.push_epoch_slots(&range[16000..]); + cluster_info.flush_push_queue(); let (slots, since) = cluster_info.get_epoch_slots_since(None); let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect(); assert_eq!(slots, range); diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index 27daa9d3bf..0cd0d02636 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -181,6 +181,7 @@ mod test { let node_info = Node::new_localhost_with_pubkey(&Pubkey::default()); let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info); ClusterSlotsService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info); + cluster_info.flush_push_queue(); let lowest = cluster_info .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| { lowest_slot.clone() diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index c95a1393d3..9996dd50f8 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -88,7 +88,20 @@ impl CrdsGossip { prune_map } - pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap>) { + pub fn process_push_messages(&mut self, pending_push_messages: Vec<(CrdsValue, u64)>) { + for (push_message, timestamp) in pending_push_messages { + let _ = + self.push + .process_push_message(&mut self.crds, &self.id, push_message, timestamp); + } + } + + pub fn new_push_messages( + &mut self, + pending_push_messages: Vec<(CrdsValue, u64)>, + now: u64, + ) -> (Pubkey, HashMap>) { + self.process_push_messages(pending_push_messages); let push_messages = self.push.new_push_messages(&self.crds, now); (self.id, push_messages) } diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index dcdb732345..f3b7e03841 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -566,6 +566,7 @@ mod tests { // No account hashes for any trusted validators == "behind" cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]); + cluster_info.flush_push_queue(); assert_eq!(rm.health_check(), "behind"); override_health_check.store(true, Ordering::Relaxed); assert_eq!(rm.health_check(), "ok"); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 33d3a535d3..9abf00e6ff 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -293,9 +293,10 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, let requests: Vec<_> = network_values .par_iter() .map(|node| { - let timeouts = node.lock().unwrap().make_timeouts_test(); - node.lock().unwrap().purge(now, &timeouts); - node.lock().unwrap().new_push_messages(now) + let mut node_lock = node.lock().unwrap(); + let timeouts = node_lock.make_timeouts_test(); + node_lock.purge(now, &timeouts); + node_lock.new_push_messages(vec![], now) }) .collect(); let transfered: Vec<_> = requests