From 658951e680ba321b5638c555b69f552411df21ad Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 3 Mar 2021 19:07:16 +0000 Subject: [PATCH] sends only the latest vote of each validator to the banking stage (#15629) --- core/src/cluster_info_vote_listener.rs | 15 +++--- core/src/verified_vote_packets.rs | 66 +++++++++++++++++--------- 2 files changed, 52 insertions(+), 29 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 21fbfc26ba..3b26d6368f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -44,8 +44,8 @@ use std::{ }; // Map from a vote account to the authorized voter for an epoch -pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; -pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; +pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; +pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; @@ -332,10 +332,11 @@ impl ClusterInfoVoteListener { } } + #[allow(clippy::type_complexity)] fn verify_votes( votes: Vec, labels: Vec, - ) -> (Vec, Vec<(CrdsValueLabel, Packets)>) { + ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { let msgs = packet::to_packets_chunked(&votes, 1); let r = sigverify::ed25519_verify_cpu(&msgs); @@ -353,8 +354,10 @@ impl ClusterInfoVoteListener { msgs, ) .filter_map(|(label, vote, verify_result, packet)| { + let slot = vote_transaction::parse_vote_transaction(&vote) + .and_then(|(_, vote, _)| vote.slots.last().copied())?; if *verify_result != 0 { - Some((vote, (label, packet))) + Some((vote, (label, slot, packet))) } else { None } @@ -1602,8 +1605,8 @@ mod tests { assert!(packets.is_empty()); } - fn verify_packets_len(packets: &[(CrdsValueLabel, Packets)], ref_value: usize) { - let num_packets: usize = packets.iter().map(|p| p.1.packets.len()).sum(); + fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) { + let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum(); assert_eq!(num_packets, ref_value); } diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 2d186afc3d..f8841ca51a 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -3,10 +3,14 @@ use crate::{ result::Result, }; use solana_perf::packet::Packets; -use std::{collections::HashMap, time::Duration}; +use solana_sdk::clock::Slot; +use std::{ + collections::{hash_map::Entry, HashMap}, + time::Duration, +}; #[derive(Default)] -pub struct VerifiedVotePackets(HashMap); +pub struct VerifiedVotePackets(HashMap); impl VerifiedVotePackets { pub fn receive_and_process_vote_packets( @@ -17,32 +21,45 @@ impl VerifiedVotePackets { let timer = Duration::from_millis(200); let vote_packets = vote_packets_receiver.recv_timeout(timer)?; *last_update_version += 1; - for (label, packet) in vote_packets { - self.0.insert(label, (*last_update_version, packet)); + for (label, slot, packet) in vote_packets { + self.0.insert(label, (*last_update_version, slot, packet)); } while let Ok(vote_packets) = vote_packets_receiver.try_recv() { - for (label, packet) in vote_packets { - self.0.insert(label, (*last_update_version, packet)); + for (label, slot, packet) in vote_packets { + self.0.insert(label, (*last_update_version, slot, packet)); } } Ok(()) } #[cfg(test)] - fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Packets)> { + fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> { self.0.get(key) } pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) { let mut new_update_version = last_update_version; - let packets = self - .0 - .values() - .filter(|(v, _)| *v > last_update_version) - .flat_map(|(v, packets)| { - new_update_version = std::cmp::max(*v, new_update_version); - packets.packets.clone() - }) + let mut votes = HashMap::new(); + for (label, (version, slot, packets)) in &self.0 { + new_update_version = std::cmp::max(*version, new_update_version); + if *version <= last_update_version { + continue; + } + match votes.entry(label.pubkey()) { + Entry::Vacant(entry) => { + entry.insert((slot, packets)); + } + Entry::Occupied(mut entry) => { + let (entry_slot, _) = entry.get(); + if *entry_slot < slot { + *entry.get_mut() = (slot, packets); + } + } + } + } + let packets = votes + .into_iter() + .flat_map(|(_, (_, packets))| packets.packets.clone()) .collect(); (new_update_version, Packets::new(packets)) } @@ -74,10 +91,10 @@ mod tests { verified_vote_packets .0 - .insert(label1, (2, none_empty_packets)); + .insert(label1, (2, 42, none_empty_packets)); verified_vote_packets .0 - .insert(label2, (1, Packets::default())); + .insert(label2, (1, 23, Packets::default())); // Both updates have timestamps greater than 0, so both should be returned let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0); @@ -104,8 +121,10 @@ mod tests { let label1 = CrdsValueLabel::Vote(0, pubkey); let label2 = CrdsValueLabel::Vote(1, pubkey); let mut update_version = 0; - s.send(vec![(label1.clone(), Packets::default())]).unwrap(); - s.send(vec![(label2.clone(), Packets::default())]).unwrap(); + s.send(vec![(label1.clone(), 17, Packets::default())]) + .unwrap(); + s.send(vec![(label2.clone(), 23, Packets::default())]) + .unwrap(); let data = Packet { meta: Meta { @@ -116,7 +135,7 @@ mod tests { }; let later_packets = Packets::new(vec![data, Packet::default()]); - s.send(vec![(label1.clone(), later_packets)]).unwrap(); + s.send(vec![(label1.clone(), 42, later_packets)]).unwrap(); let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets .receive_and_process_vote_packets(&r, &mut update_version) @@ -134,7 +153,7 @@ mod tests { verified_vote_packets .get_vote_packets(&label1) .unwrap() - .1 + .2 .packets .len() > 1 @@ -143,14 +162,15 @@ mod tests { verified_vote_packets .get_vote_packets(&label2) .unwrap() - .1 + .2 .packets .len(), 0 ); // Test timestamp for next batch overwrites the original - s.send(vec![(label2.clone(), Packets::default())]).unwrap(); + s.send(vec![(label2.clone(), 51, Packets::default())]) + .unwrap(); verified_vote_packets .receive_and_process_vote_packets(&r, &mut update_version) .unwrap();