From 1ac2a8cfa58721fa2ef1221beb64af323ac416df Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 28 Apr 2021 11:56:13 +0000 Subject: [PATCH] removes delayed crds inserts when upserting gossip table (#16806) It is crucial that VersionedCrdsValue::insert_timestamp does not go backward in time: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79 Otherwise methods such as get_votes and get_epoch_slots_since will break, which will break their downstream flow, including vote-listener and optimistic confirmation: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215 https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298 For that, Crds::new_versioned is intended to be called "atomically" with Crds::insert_verioned (as the comment already says so): https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129 However, currently this is violated in the code. For example, filter_pull_responses creates VersionedCrdsValues (with the current timestamp), then acquires an exclusive lock on gossip, then process_pull_responses writes those values to the crds table: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392 Depending on the workload and lock contention, the insert_timestamps may well be in the past when these values finally are inserted into gossip. To avoid such scenarios, this commit: * removes Crds::new_versioned and Crd::insert_versioned. * makes VersionedCrdsValue constructor private, only invoked in Crds::insert, so that insert_timestamp is populated right before insert. This will improve insert_timestamp monotonicity as long as Crds::insert is not called with a stalled timestamp. Following commits may further improve this by calling timestamp() inside Crds::insert, and/or switching to std::time::Instant which guarantees monotonicity. --- core/benches/crds_shards.rs | 24 +++-- core/src/crds.rs | 192 ++++++++++++++++------------------- core/src/crds_gossip.rs | 10 +- core/src/crds_gossip_pull.rs | 57 ++++++----- core/src/crds_gossip_push.rs | 34 +++---- core/src/crds_shards.rs | 20 ++-- 6 files changed, 167 insertions(+), 170 deletions(-) diff --git a/core/benches/crds_shards.rs b/core/benches/crds_shards.rs index a020e7da3..22e94603e 100644 --- a/core/benches/crds_shards.rs +++ b/core/benches/crds_shards.rs @@ -3,30 +3,34 @@ extern crate test; use rand::{thread_rng, Rng}; -use solana_core::contact_info::ContactInfo; -use solana_core::crds::VersionedCrdsValue; -use solana_core::crds_shards::CrdsShards; -use solana_core::crds_value::{CrdsData, CrdsValue}; -use solana_sdk::pubkey; +use solana_core::{ + crds::{Crds, VersionedCrdsValue}, + crds_shards::CrdsShards, + crds_value::CrdsValue, +}; use solana_sdk::timing::timestamp; +use std::iter::repeat_with; use test::Bencher; const CRDS_SHARDS_BITS: u32 = 8; -fn new_test_crds_value() -> VersionedCrdsValue { - let data = CrdsData::ContactInfo(ContactInfo::new_localhost(&pubkey::new_rand(), timestamp())); - VersionedCrdsValue::new(timestamp(), CrdsValue::new_unsigned(data)) +fn new_test_crds_value(rng: &mut R) -> VersionedCrdsValue { + let value = CrdsValue::new_rand(rng, None); + let label = value.label(); + let mut crds = Crds::default(); + crds.insert(value, timestamp()).unwrap(); + crds.remove(&label).unwrap() } fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) { - let values: Vec = std::iter::repeat_with(new_test_crds_value) + let mut rng = thread_rng(); + let values: Vec<_> = repeat_with(|| new_test_crds_value(&mut rng)) .take(num_values) .collect(); let mut shards = CrdsShards::new(CRDS_SHARDS_BITS); for (index, value) in values.iter().enumerate() { assert!(shards.insert(index, value)); } - let mut rng = thread_rng(); bencher.iter(|| { let mask = rng.gen(); let _hits = shards.find(mask, mask_bits).count(); diff --git a/core/src/crds.rs b/core/src/crds.rs index f0537e1d6..03c234286 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -33,11 +33,11 @@ use indexmap::set::IndexSet; use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Keypair; -use solana_sdk::timing::timestamp; -use std::cmp; -use std::collections::{hash_map, BTreeSet, HashMap}; -use std::ops::{Bound, Index, IndexMut}; +use std::{ + cmp::Ordering, + collections::{hash_map, BTreeSet, HashMap}, + ops::{Bound, Index, IndexMut}, +}; const CRDS_SHARDS_BITS: u32 = 8; // Limit number of crds values associated with each unique pubkey. This @@ -60,7 +60,9 @@ pub struct Crds { #[derive(PartialEq, Debug)] pub enum CrdsError { - InsertFailed, + // Hash of the crds value which failed to insert should be recorded in + // failed_inserts to be excluded from the next pull-request. + InsertFailed(Hash), UnknownStakes, } @@ -71,26 +73,15 @@ pub enum CrdsError { pub struct VersionedCrdsValue { pub value: CrdsValue, /// local time when inserted - pub insert_timestamp: u64, + pub(crate) insert_timestamp: u64, /// local time when updated pub(crate) local_timestamp: u64, /// value hash - pub value_hash: Hash, + pub(crate) value_hash: Hash, } -impl PartialOrd for VersionedCrdsValue { - fn partial_cmp(&self, other: &VersionedCrdsValue) -> Option { - if self.value.label() != other.value.label() { - None - } else if self.value.wallclock() == other.value.wallclock() { - Some(self.value_hash.cmp(&other.value_hash)) - } else { - Some(self.value.wallclock().cmp(&other.value.wallclock())) - } - } -} impl VersionedCrdsValue { - pub fn new(local_timestamp: u64, value: CrdsValue) -> Self { + fn new(local_timestamp: u64, value: CrdsValue) -> Self { let value_hash = hash(&serialize(&value).unwrap()); VersionedCrdsValue { value, @@ -99,13 +90,6 @@ impl VersionedCrdsValue { value_hash, } } - - /// New random VersionedCrdsValue for tests and simulations. - pub fn new_rand(rng: &mut R, keypair: Option<&Keypair>) -> Self { - let delay = 10 * 60 * 1000; // 10 minutes - let now = timestamp() - delay + rng.gen_range(0, 2 * delay); - Self::new(now, CrdsValue::new_rand(rng, keypair)) - } } impl Default for Crds { @@ -122,34 +106,46 @@ impl Default for Crds { } } -impl Crds { - /// must be called atomically with `insert_versioned` - pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue { - VersionedCrdsValue::new(local_timestamp, value) +// Returns true if the first value updates the 2nd one. +// Both values should have the same key/label. +fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { + assert_eq!(value.label(), other.value.label(), "labels mismatch!"); + match value.wallclock().cmp(&other.value.wallclock()) { + Ordering::Less => false, + Ordering::Greater => true, + // Ties should be broken in a deterministic way across the cluster. + // For backward compatibility this is done by comparing hash of + // serialized values. + Ordering::Equal => { + let value_hash = hash(&serialize(&value).unwrap()); + other.value_hash < value_hash + } } - pub fn would_insert( - &self, +} + +impl Crds { + /// Returns true if the given value updates an existing one in the table. + /// The value is outdated and fails to insert, if it already exists in the + /// table with a more recent wallclock. + pub(crate) fn upserts(&self, value: &CrdsValue) -> bool { + match self.table.get(&value.label()) { + Some(other) => overrides(value, other), + None => true, + } + } + + pub fn insert( + &mut self, value: CrdsValue, local_timestamp: u64, - ) -> (bool, VersionedCrdsValue) { - let new_value = self.new_versioned(local_timestamp, value); - let label = new_value.value.label(); - // New value is outdated and fails to insert, if it already exists in - // the table with a more recent wallclock. - let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current); - (!outdated, new_value) - } - /// insert the new value, returns the old value if insert succeeds - pub fn insert_versioned( - &mut self, - new_value: VersionedCrdsValue, ) -> Result, CrdsError> { - let label = new_value.value.label(); + let label = value.label(); + let value = VersionedCrdsValue::new(local_timestamp, value); match self.table.entry(label) { Entry::Vacant(entry) => { let entry_index = entry.index(); - self.shards.insert(entry_index, &new_value); - match new_value.value.data { + self.shards.insert(entry_index, &value); + match value.value.data { CrdsData::ContactInfo(_) => { self.nodes.insert(entry_index); } @@ -158,52 +154,45 @@ impl Crds { } CrdsData::EpochSlots(_, _) => { self.epoch_slots - .insert((new_value.insert_timestamp, entry_index)); + .insert((value.insert_timestamp, entry_index)); } _ => (), }; self.records - .entry(new_value.value.pubkey()) + .entry(value.value.pubkey()) .or_default() .insert(entry_index); - entry.insert(new_value); + entry.insert(value); self.num_inserts += 1; Ok(None) } - Entry::Occupied(mut entry) if *entry.get() < new_value => { + Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { let entry_index = entry.index(); self.shards.remove(entry_index, entry.get()); - self.shards.insert(entry_index, &new_value); - if let CrdsData::EpochSlots(_, _) = new_value.value.data { + self.shards.insert(entry_index, &value); + if let CrdsData::EpochSlots(_, _) = value.value.data { self.epoch_slots .remove(&(entry.get().insert_timestamp, entry_index)); self.epoch_slots - .insert((new_value.insert_timestamp, entry_index)); + .insert((value.insert_timestamp, entry_index)); } self.num_inserts += 1; // As long as the pubkey does not change, self.records // does not need to be updated. - debug_assert_eq!(entry.get().value.pubkey(), new_value.value.pubkey()); - Ok(Some(entry.insert(new_value))) + debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey()); + Ok(Some(entry.insert(value))) } _ => { trace!( "INSERT FAILED data: {} new.wallclock: {}", - new_value.value.label(), - new_value.value.wallclock(), + value.value.label(), + value.value.wallclock(), ); - Err(CrdsError::InsertFailed) + Err(CrdsError::InsertFailed(value.value_hash)) } } } - pub fn insert( - &mut self, - value: CrdsValue, - local_timestamp: u64, - ) -> Result, CrdsError> { - let new_value = self.new_versioned(local_timestamp, value); - self.insert_versioned(new_value) - } + pub fn lookup(&self, label: &CrdsValueLabel) -> Option<&CrdsValue> { self.table.get(label).map(|x| &x.value) } @@ -504,10 +493,13 @@ impl Crds { #[cfg(test)] mod test { use super::*; - use crate::{contact_info::ContactInfo, crds_value::NodeInstance}; + use crate::{ + contact_info::ContactInfo, + crds_value::{new_rand_timestamp, NodeInstance}, + }; use rand::{thread_rng, Rng}; use rayon::ThreadPoolBuilder; - use solana_sdk::signature::Signer; + use solana_sdk::signature::{Keypair, Signer}; use std::{collections::HashSet, iter::repeat_with}; #[test] @@ -523,8 +515,12 @@ mod test { fn test_update_old() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); + let value_hash = hash(&serialize(&val).unwrap()); assert_eq!(crds.insert(val.clone(), 0), Ok(None)); - assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(val.clone(), 1), + Err(CrdsError::InsertFailed(value_hash)) + ); assert_eq!(crds.table[&val.label()].local_timestamp, 0); } #[test] @@ -718,8 +714,9 @@ mod test { let mut num_overrides = 0; for _ in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; - let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); - match crds.insert_versioned(value) { + let value = CrdsValue::new_rand(&mut rng, Some(keypair)); + let local_timestamp = new_rand_timestamp(&mut rng); + match crds.insert(value, local_timestamp) { Ok(None) => { num_inserts += 1; check_crds_shards(&crds); @@ -811,8 +808,9 @@ mod test { let mut num_overrides = 0; for k in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; - let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); - match crds.insert_versioned(value) { + let value = CrdsValue::new_rand(&mut rng, Some(keypair)); + let local_timestamp = new_rand_timestamp(&mut rng); + match crds.insert(value, local_timestamp) { Ok(None) => { num_inserts += 1; } @@ -870,8 +868,9 @@ mod test { let mut crds = Crds::default(); for k in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; - let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); - let _ = crds.insert_versioned(value); + let value = CrdsValue::new_rand(&mut rng, Some(keypair)); + let local_timestamp = new_rand_timestamp(&mut rng); + let _ = crds.insert(value, local_timestamp); if k % 64 == 0 { check_crds_records(&crds); } @@ -911,8 +910,9 @@ mod test { let mut crds = Crds::default(); for _ in 0..2048 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; - let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); - let _ = crds.insert_versioned(value); + let value = CrdsValue::new_rand(&mut rng, Some(keypair)); + let local_timestamp = new_rand_timestamp(&mut rng); + let _ = crds.insert(value, local_timestamp); } let num_values = crds.table.len(); let num_pubkeys = num_unique_pubkeys(crds.table.values()); @@ -967,8 +967,8 @@ mod test { let v2 = VersionedCrdsValue::new(1, val); assert_eq!(v1, v2); assert!(!(v1 != v2)); - assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Equal)); - assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Equal)); + assert!(!overrides(&v1.value, &v2)); + assert!(!overrides(&v2.value, &v1)); } #[test] #[allow(clippy::neg_cmp_op_on_partial_ord)] @@ -991,18 +991,12 @@ mod test { assert_ne!(v1.value_hash, v2.value_hash); assert!(v1 != v2); assert!(!(v1 == v2)); - if v1 > v2 { - assert!(v1 > v2); - assert!(v2 < v1); - assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater)); - assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less)); - } else if v2 > v1 { - assert!(v1 < v2); - assert!(v2 > v1); - assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Less)); - assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Greater)); + if v1.value_hash > v2.value_hash { + assert!(overrides(&v1.value, &v2)); + assert!(!overrides(&v2.value, &v1)); } else { - panic!("bad PartialOrd implementation?"); + assert!(overrides(&v2.value, &v1)); + assert!(!overrides(&v1.value, &v2)); } } #[test] @@ -1023,14 +1017,13 @@ mod test { ))), ); assert_eq!(v1.value.label(), v2.value.label()); - assert!(v1 > v2); - assert!(!(v1 < v2)); + assert!(overrides(&v1.value, &v2)); + assert!(!overrides(&v2.value, &v1)); assert!(v1 != v2); assert!(!(v1 == v2)); - assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater)); - assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less)); } #[test] + #[should_panic(expected = "labels mismatch!")] #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_label_order() { let v1 = VersionedCrdsValue::new( @@ -1049,11 +1042,6 @@ mod test { ); assert_ne!(v1, v2); assert!(!(v1 == v2)); - assert!(!(v1 < v2)); - assert!(!(v1 > v2)); - assert!(!(v2 < v1)); - assert!(!(v2 > v1)); - assert_eq!(v1.partial_cmp(&v2), None); - assert_eq!(v2.partial_cmp(&v1), None); + assert!(!overrides(&v2.value, &v1)); } } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 757dfc85c..d4bea5e97 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -265,7 +265,11 @@ impl CrdsGossip { response: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, - ) -> (Vec, Vec, Vec) { + ) -> ( + Vec, // valid responses. + Vec, // responses with expired timestamps. + Vec, // hash of outdated values. + ) { self.pull .filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats) } @@ -274,8 +278,8 @@ impl CrdsGossip { pub fn process_pull_responses( &mut self, from: &Pubkey, - responses: Vec, - responses_expired_timeout: Vec, + responses: Vec, + responses_expired_timeout: Vec, failed_inserts: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index fcb588778..c9bc033d9 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -12,7 +12,7 @@ use crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, CrdsError}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, crds_value::{CrdsValue, CrdsValueLabel}, @@ -23,8 +23,10 @@ use rand::distributions::{Distribution, WeightedIndex}; use rand::Rng; use rayon::{prelude::*, ThreadPool}; use solana_runtime::bloom::{AtomicBloom, Bloom}; -use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; +use solana_sdk::{ + hash::{hash, Hash}, + pubkey::Pubkey, +}; use std::cmp; use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; @@ -330,16 +332,16 @@ impl CrdsGossipPull { responses: Vec, now: u64, stats: &mut ProcessPullStats, - ) -> (Vec, Vec, Vec) { - let mut versioned = vec![]; - let mut versioned_expired_timestamp = vec![]; + ) -> (Vec, Vec, Vec) { + let mut active_values = vec![]; + let mut expired_values = vec![]; let mut failed_inserts = vec![]; - let mut maybe_push = |response, values: &mut Vec| { - let (push, value) = crds.would_insert(response, now); - if push { - values.push(value); + let mut maybe_push = |response, values: &mut Vec| { + if crds.upserts(&response) { + values.push(response); } else { - failed_inserts.push(value.value_hash) + let response = bincode::serialize(&response).unwrap(); + failed_inserts.push(hash(&response)); } }; let default_timeout = timeouts @@ -354,17 +356,17 @@ impl CrdsGossipPull { // owner exists in the table. If it doesn't, that implies that this // value can be discarded if now <= response.wallclock().saturating_add(timeout) { - maybe_push(response, &mut versioned); + maybe_push(response, &mut active_values); } else if crds.get_contact_info(owner).is_some() { // Silently insert this old value without bumping record // timestamps - maybe_push(response, &mut versioned_expired_timestamp); + maybe_push(response, &mut expired_values); } else { stats.timeout_count += 1; stats.failed_timeout += 1; } } - (versioned, versioned_expired_timestamp, failed_inserts) + (active_values, expired_values, failed_inserts) } /// process a vec of pull responses @@ -372,33 +374,34 @@ impl CrdsGossipPull { &mut self, crds: &mut Crds, from: &Pubkey, - responses: Vec, - responses_expired_timeout: Vec, + responses: Vec, + responses_expired_timeout: Vec, mut failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, ) -> Vec<(CrdsValueLabel, Hash, u64)> { let mut success = vec![]; let mut owners = HashSet::new(); - for r in responses_expired_timeout { - let value_hash = r.value_hash; - match crds.insert_versioned(r) { + for response in responses_expired_timeout { + match crds.insert(response, now) { Ok(None) => (), Ok(Some(old)) => self.purged_values.push_back((old.value_hash, now)), - Err(_) => failed_inserts.push(value_hash), + Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), + Err(CrdsError::UnknownStakes) => (), } } - for r in responses { - let label = r.value.label(); - let wc = r.value.wallclock(); - let hash = r.value_hash; - match crds.insert_versioned(r) { - Err(_) => failed_inserts.push(hash), + for response in responses { + let label = response.label(); + let wallclock = response.wallclock(); + match crds.insert(response, now) { + Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), + Err(CrdsError::UnknownStakes) => (), Ok(old) => { stats.success += 1; self.num_pulls += 1; owners.insert(label.pubkey()); - success.push((label, hash, wc)); + let value_hash = crds.get(&label).unwrap().value_hash; + success.push((label, value_hash, wallclock)); if let Some(val) = old { self.purged_values.push_back((val.value_hash, now)) } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index a6cc6f7da..3ce4ae39b 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -172,29 +172,29 @@ impl CrdsGossipPush { now: u64, ) -> Result, CrdsGossipError> { self.num_total += 1; - if now > value.wallclock().checked_add(self.msg_timeout).unwrap_or(0) { - return Err(CrdsGossipError::PushMessageTimeout); - } - if now + self.msg_timeout < value.wallclock() { + let range = now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout); + if !range.contains(&value.wallclock()) { return Err(CrdsGossipError::PushMessageTimeout); } let label = value.label(); let origin = label.pubkey(); - let new_value = crds.new_versioned(now, value); - let value_hash = new_value.value_hash; - let received_set = self - .received_cache + self.received_cache .entry(origin) - .or_insert_with(HashMap::new); - received_set.entry(*from).or_insert((false, 0)).1 = now; - - let old = crds.insert_versioned(new_value); - if old.is_err() { - self.num_old += 1; - return Err(CrdsGossipError::PushMessageOldVersion); + .or_default() + .entry(*from) + .and_modify(|(_pruned, timestamp)| *timestamp = now) + .or_insert((/*pruned:*/ false, now)); + match crds.insert(value, now) { + Err(_) => { + self.num_old += 1; + Err(CrdsGossipError::PushMessageOldVersion) + } + Ok(old) => { + let value_hash = crds.get(&label).unwrap().value_hash; + self.push_messages.insert(label, value_hash); + Ok(old) + } } - self.push_messages.insert(label, value_hash); - Ok(old.unwrap()) } /// push pull responses diff --git a/core/src/crds_shards.rs b/core/src/crds_shards.rs index 8b0fc5df8..006d0e299 100644 --- a/core/src/crds_shards.rs +++ b/core/src/crds_shards.rs @@ -130,19 +130,17 @@ where #[cfg(test)] mod test { use super::*; - use crate::contact_info::ContactInfo; - use crate::crds_value::{CrdsData, CrdsValue}; + use crate::{crds::Crds, crds_value::CrdsValue}; use rand::{thread_rng, Rng}; use solana_sdk::timing::timestamp; - use std::collections::HashSet; - use std::ops::Index; + use std::{collections::HashSet, iter::repeat_with, ops::Index}; - fn new_test_crds_value() -> VersionedCrdsValue { - let data = CrdsData::ContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - timestamp(), - )); - VersionedCrdsValue::new(timestamp(), CrdsValue::new_unsigned(data)) + fn new_test_crds_value(rng: &mut R) -> VersionedCrdsValue { + let value = CrdsValue::new_rand(rng, None); + let label = value.label(); + let mut crds = Crds::default(); + crds.insert(value, timestamp()).unwrap(); + crds.remove(&label).unwrap() } // Returns true if the first mask_bits most significant bits of hash is the @@ -176,7 +174,7 @@ mod test { fn test_crds_shards_round_trip() { let mut rng = thread_rng(); // Generate some random hash and crds value labels. - let mut values: Vec<_> = std::iter::repeat_with(new_test_crds_value) + let mut values: Vec<_> = repeat_with(|| new_test_crds_value(&mut rng)) .take(4096) .collect(); // Insert everything into the crds shards.