diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 7dbb790bce..b1066eebc4 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -2023,8 +2023,9 @@ impl ClusterInfo {
         feature_set: Option<&FeatureSet>,
     ) -> Packets {
         let mut time = Measure::start("handle_pull_requests");
+        let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
         self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
-            .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp());
+            .process_pull_requests(callers.cloned(), timestamp());
         self.update_data_budget(stakes.len());
         let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
         let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
diff --git a/core/src/crds.rs b/core/src/crds.rs
index 7c9c2a1516..53d390d066 100644
--- a/core/src/crds.rs
+++ b/core/src/crds.rs
@@ -240,8 +240,8 @@ impl Crds {
 
     /// Update the timestamp's of all the labels that are associated with Pubkey
     pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
-        for label in &CrdsValue::record_labels(pubkey) {
-            self.update_label_timestamp(label, now);
+        for label in CrdsValue::record_labels(*pubkey) {
+            self.update_label_timestamp(&label, now);
         }
     }
 
diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs
index 14caa3c019..e27abcd17d 100644
--- a/core/src/crds_value.rs
+++ b/core/src/crds_value.rs
@@ -15,7 +15,7 @@ use solana_sdk::{
 };
 use std::{
     borrow::{Borrow, Cow},
-    collections::{BTreeSet, HashSet},
+    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
     fmt,
 };
 
@@ -498,18 +498,20 @@ impl CrdsValue {
     }
 
     /// Return all the possible labels for a record identified by Pubkey.
-    pub fn record_labels(key: &Pubkey) -> Vec<CrdsValueLabel> {
-        let mut labels = vec![
-            CrdsValueLabel::ContactInfo(*key),
-            CrdsValueLabel::LowestSlot(*key),
-            CrdsValueLabel::SnapshotHashes(*key),
-            CrdsValueLabel::AccountsHashes(*key),
-            CrdsValueLabel::LegacyVersion(*key),
-            CrdsValueLabel::Version(*key),
+    pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
+        const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
+            CrdsValueLabel::ContactInfo,
+            CrdsValueLabel::LowestSlot,
+            CrdsValueLabel::SnapshotHashes,
+            CrdsValueLabel::AccountsHashes,
+            CrdsValueLabel::LegacyVersion,
+            CrdsValueLabel::Version,
         ];
-        labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
-        labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key)));
-        labels
+        CRDS_VALUE_LABEL_STUBS
+            .iter()
+            .map(move |f| (f)(key))
+            .chain((0..MAX_VOTES).map(move |ix| CrdsValueLabel::Vote(ix, key)))
+            .chain((0..MAX_EPOCH_SLOTS).map(move |ix| CrdsValueLabel::EpochSlots(ix, key)))
     }
 
     /// Returns the size (in bytes) of a CrdsValue
@@ -545,6 +547,30 @@ impl CrdsValue {
     }
 }
 
+/// Filters out an iterator of crds values, returning
+/// the unique ones with the most recent wallclock.
+pub(crate) fn filter_current<'a, I>(values: I) -> impl Iterator<Item = &'a CrdsValue>
+where
+    I: IntoIterator<Item = &'a CrdsValue>,
+{
+    let mut out = HashMap::new();
+    for value in values {
+        match out.entry(value.label()) {
+            Entry::Vacant(entry) => {
+                entry.insert((value, value.wallclock()));
+            }
+            Entry::Occupied(mut entry) => {
+                let value_wallclock = value.wallclock();
+                let (_, entry_wallclock) = entry.get();
+                if *entry_wallclock < value_wallclock {
+                    entry.insert((value, value_wallclock));
+                }
+            }
+        }
+    }
+    out.into_iter().map(|(_, (v, _))| v)
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -553,13 +579,15 @@ mod test {
     use solana_perf::test_tx::test_tx;
     use solana_sdk::signature::{Keypair, Signer};
     use solana_sdk::timing::timestamp;
+    use std::cmp::Ordering;
+    use std::iter::repeat_with;
 
     #[test]
     fn test_labels() {
         let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
         // this method should cover all the possible labels
-        for v in &CrdsValue::record_labels(&Pubkey::default()) {
-            match v {
+        for v in CrdsValue::record_labels(Pubkey::default()) {
+            match &v {
                 CrdsValueLabel::ContactInfo(_) => hits[0] = true,
                 CrdsValueLabel::LowestSlot(_) => hits[1] = true,
                 CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
@@ -743,4 +771,39 @@ mod test {
         assert!(!value.verify());
         serialize_deserialize_value(value, correct_keypair);
     }
+
+    #[test]
+    fn test_filter_current() {
+        let mut rng = rand::thread_rng();
+        let keys: Vec<_> = repeat_with(Keypair::new).take(16).collect();
+        let values: Vec<_> = repeat_with(|| {
+            let index = rng.gen_range(0, keys.len());
+            CrdsValue::new_rand(&mut rng, Some(&keys[index]))
+        })
+        .take(256)
+        .collect();
+        let mut currents = HashMap::new();
+        for value in filter_current(&values) {
+            // Assert that filtered values have unique labels.
+            assert!(currents.insert(value.label(), value).is_none());
+        }
+        // Assert that currents are the most recent version of each value.
+        let mut count = 0;
+        for value in &values {
+            let current_value = currents.get(&value.label()).unwrap();
+            match value.wallclock().cmp(&current_value.wallclock()) {
+                Ordering::Less => (),
+                Ordering::Equal => {
+                    assert_eq!(value, *current_value);
+                    count += 1;
+                }
+                Ordering::Greater => panic!("this should not happen!"),
+            }
+        }
+        assert_eq!(count, currents.len());
+        // Currently CrdsData::new_rand is only implemented for 5 different
+        // kinds and excludes Vote and EpochSlots, and so the unique labels
+        // cannot be more than 5 times number of keys.
+        assert!(currents.len() <= keys.len() * 5);
+    }
 }