encapsulates purged values bookkeeping into crds module (#17265)

For all code paths (gossip push, pull, purge, etc) that remove or
override a crds value, it is necessary to record hash of values purged
from crds table, in order to exclude them from subsequent pull-requests;
otherwise the next pull request will likely return outdated values,
wasting bandwidth:
https://github.com/solana-labs/solana/blob/ed51cde37/core/src/crds_gossip_pull.rs#L486-L491

Currently this is done all over the place in multiple modules, and this
has caused bugs in the past where purged values were not recorded.

This commit encapsulated this bookkeeping into crds module, so that any
code path which removes or overrides a crds value, also records the hash
of purged value in-place.
This commit is contained in:
behzad nouri 2021-05-24 13:47:21 +00:00 committed by GitHub
parent 060332c704
commit 9d112cf41f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 175 additions and 230 deletions

View File

@ -29,13 +29,8 @@ fn bench_hash_as_u64(bencher: &mut Bencher) {
fn bench_build_crds_filters(bencher: &mut Bencher) {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut rng = thread_rng();
let mut crds_gossip_pull = CrdsGossipPull::default();
let crds_gossip_pull = CrdsGossipPull::default();
let mut crds = Crds::default();
for _ in 0..50_000 {
crds_gossip_pull
.purged_values
.push_back((solana_sdk::hash::new_rand(&mut rng), rng.gen()));
}
let mut num_inserts = 0;
for _ in 0..90_000 {
if crds

View File

@ -19,7 +19,7 @@ fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.remove(&label).unwrap()
crds.get(&label).cloned().unwrap()
}
fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) {

View File

@ -1745,18 +1745,15 @@ impl ClusterInfo {
.chain(std::iter::once(self.id))
.collect();
let mut gossip = self.gossip.write().unwrap();
match gossip.crds.trim(cap, &keep, stakes) {
match gossip.crds.trim(cap, &keep, stakes, timestamp()) {
Err(err) => {
self.stats.trim_crds_table_failed.add_relaxed(1);
error!("crds table trim failed: {:?}", err);
}
Ok(purged_values) => {
let now = timestamp();
Ok(num_purged) => {
self.stats
.trim_crds_table_purged_values_count
.add_relaxed(purged_values.len() as u64);
let purged_values = purged_values.into_iter().map(|v| (v.value_hash, now));
gossip.pull.purged_values.extend(purged_values);
.add_relaxed(num_purged as u64);
}
}
}
@ -2380,8 +2377,7 @@ impl ClusterInfo {
self.stats
.skip_push_message_shred_version
.add_relaxed(num_crds_values - num_filtered_crds_values);
// Origins' pubkeys of updated crds values.
// TODO: Should this also include origins of new crds values?
// Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = {
let mut gossip =
self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
@ -2391,7 +2387,6 @@ impl ClusterInfo {
.flat_map(|(from, crds_values)| {
gossip.process_push_message(&from, crds_values, now)
})
.map(|v| v.value.pubkey())
.collect()
};
// Generate prune messages.

View File

@ -128,7 +128,7 @@ pub(crate) fn submit_gossip_stats(
(
gossip.crds.len(),
gossip.crds.num_nodes(),
gossip.pull.purged_values.len(),
gossip.crds.num_purged(),
gossip.pull.failed_inserts.len(),
)
};

View File

@ -35,7 +35,7 @@ use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::{
cmp::Ordering,
collections::{hash_map, BTreeMap, HashMap},
collections::{hash_map, BTreeMap, HashMap, VecDeque},
ops::{Bound, Index, IndexMut},
};
@ -56,13 +56,13 @@ pub struct Crds {
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Hash of recently purged values.
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
}
#[derive(PartialEq, Debug)]
pub enum CrdsError {
// Hash of the crds value which failed to insert should be recorded in
// failed_inserts to be excluded from the next pull-request.
InsertFailed(Hash),
InsertFailed,
UnknownStakes,
}
@ -116,6 +116,7 @@ impl Default for Crds {
epoch_slots: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
}
}
}
@ -156,14 +157,10 @@ impl Crds {
}
}
pub fn insert(
&mut self,
value: CrdsValue,
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp);
let value = VersionedCrdsValue::new(value, self.cursor, now);
match self.table.entry(label) {
Entry::Vacant(entry) => {
let entry_index = entry.index();
@ -184,7 +181,7 @@ impl Crds {
self.records.entry(pubkey).or_default().insert(entry_index);
self.cursor.consume(value.ordinal);
entry.insert(value);
Ok(None)
Ok(())
}
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
let entry_index = entry.index();
@ -207,15 +204,20 @@ impl Crds {
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), pubkey);
self.cursor.consume(value.ordinal);
Ok(Some(entry.insert(value)))
self.purged.push_back((entry.get().value_hash, now));
entry.insert(value);
Ok(())
}
_ => {
Entry::Occupied(entry) => {
trace!(
"INSERT FAILED data: {} new.wallclock: {}",
value.value.label(),
value.value.wallclock(),
);
Err(CrdsError::InsertFailed(value.value_hash))
if entry.get().value_hash != value.value_hash {
self.purged.push_back((value.value_hash, now));
}
Err(CrdsError::InsertFailed)
}
}
}
@ -324,6 +326,24 @@ impl Crds {
self.table.par_values()
}
pub(crate) fn num_purged(&self) -> usize {
self.purged.len()
}
pub(crate) fn purged(&self) -> impl IndexedParallelIterator<Item = Hash> + '_ {
self.purged.par_iter().map(|(hash, _)| *hash)
}
/// Drops purged value hashes with timestamp less than the given one.
pub(crate) fn trim_purged(&mut self, timestamp: u64) {
let count = self
.purged
.iter()
.take_while(|(_, ts)| *ts < timestamp)
.count();
self.purged.drain(..count);
}
/// Returns all crds values which the first 'mask_bits'
/// of their hash value is equal to 'mask'.
pub fn filter_bitmask(
@ -402,8 +422,12 @@ impl Crds {
})
}
pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?;
pub fn remove(&mut self, key: &CrdsValueLabel, now: u64) {
let (index, _ /*label*/, value) = match self.table.swap_remove_full(key) {
Some(entry) => entry,
None => return,
};
self.purged.push_back((value.value_hash, now));
self.shards.remove(index, &value);
match value.value.data {
CrdsData::ContactInfo(_) => {
@ -457,7 +481,6 @@ impl Crds {
records.swap_remove(&size);
records.insert(index);
}
Some(value)
}
/// Returns true if the number of unique pubkeys in the table exceeds the
@ -478,12 +501,13 @@ impl Crds {
// e.g. trusted validators, self pubkey, ...
keep: &[Pubkey],
stakes: &HashMap<Pubkey, u64>,
) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
now: u64,
) -> Result</*num purged:*/ usize, CrdsError> {
if self.should_trim(cap) {
let size = self.records.len().saturating_sub(cap);
self.drop(size, keep, stakes)
self.drop(size, keep, stakes, now)
} else {
Ok(Vec::default())
Ok(0)
}
}
@ -493,7 +517,8 @@ impl Crds {
size: usize,
keep: &[Pubkey],
stakes: &HashMap<Pubkey, u64>,
) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
now: u64,
) -> Result</*num purged:*/ usize, CrdsError> {
if stakes.is_empty() {
return Err(CrdsError::UnknownStakes);
}
@ -513,7 +538,10 @@ impl Crds {
.flat_map(|k| &self.records[&k])
.map(|k| self.table.get_index(*k).unwrap().0.clone())
.collect();
Ok(keys.iter().map(|k| self.remove(k).unwrap()).collect())
for key in &keys {
self.remove(key, now);
}
Ok(keys.len())
}
}
@ -534,7 +562,7 @@ mod test {
fn test_insert() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0).ok(), Some(None));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(crds.table.len(), 1);
assert!(crds.table.contains_key(&val.label()));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
@ -543,12 +571,9 @@ mod test {
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let value_hash = hash(&serialize(&val).unwrap());
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(
crds.insert(val.clone(), 1),
Err(CrdsError::InsertFailed(value_hash))
);
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert!(crds.purged.is_empty());
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
#[test]
@ -558,15 +583,14 @@ mod test {
&Pubkey::default(),
0,
)));
assert_matches!(crds.insert(original.clone(), 0), Ok(_));
let value_hash = hash(&serialize(&original).unwrap());
assert_matches!(crds.insert(original, 0), Ok(()));
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
)));
assert_eq!(
crds.insert(val.clone(), 1).unwrap().unwrap().value,
original
);
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1));
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
}
#[test]
@ -576,12 +600,13 @@ mod test {
&Pubkey::default(),
0,
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(crds.table[&val.label()].ordinal, 0);
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let value_hash = hash(&serialize(&val2).unwrap());
assert_eq!(val2.label().pubkey(), val.label().pubkey());
assert_matches!(crds.insert(val2.clone(), 0), Ok(Some(_)));
assert_eq!(crds.insert(val2.clone(), 0), Ok(()));
crds.update_record_timestamp(&val.label().pubkey(), 2);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
@ -596,7 +621,8 @@ mod test {
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_matches!(crds.insert(val3, 3), Ok(Some(_)));
assert_eq!(crds.insert(val3, 3), Ok(()));
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].ordinal, 2);
}
@ -613,22 +639,20 @@ mod test {
let pubkey = Pubkey::new_unique();
let node = NodeInstance::new(&mut rng, pubkey, now);
let node = make_crds_value(node);
assert_eq!(crds.insert(node, now), Ok(None));
assert_eq!(crds.insert(node, now), Ok(()));
// A node-instance with a different key should insert fine even with
// older timestamps.
let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1);
let other = make_crds_value(other);
assert_eq!(crds.insert(other, now), Ok(None));
assert_eq!(crds.insert(other, now), Ok(()));
// A node-instance with older timestamp should fail to insert, even if
// the wallclock is more recent.
let other = NodeInstance::new(&mut rng, pubkey, now - 1);
let other = other.with_wallclock(now + 1);
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
assert_eq!(
crds.insert(other, now),
Err(CrdsError::InsertFailed(value_hash))
);
assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed));
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now));
// A node instance with the same timestamp should insert only if the
// random token is larger.
let mut num_overrides = 0;
@ -637,8 +661,10 @@ mod test {
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
match crds.insert(other, now) {
Ok(Some(_)) => num_overrides += 1,
Err(CrdsError::InsertFailed(x)) => assert_eq!(x, value_hash),
Ok(()) => num_overrides += 1,
Err(CrdsError::InsertFailed) => {
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now))
}
_ => panic!(),
}
}
@ -650,7 +676,7 @@ mod test {
let other = other.with_wallclock(now - 1);
let other = make_crds_value(other);
match crds.insert(other, now) {
Ok(Some(_)) => (),
Ok(()) => (),
_ => panic!(),
}
}
@ -661,7 +687,7 @@ mod test {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
let mut set = HashMap::new();
set.insert(Pubkey::default(), 0);
assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
@ -684,7 +710,7 @@ mod test {
let mut timeouts = HashMap::new();
let val = CrdsValue::new_rand(&mut rng, None);
timeouts.insert(Pubkey::default(), 3);
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.insert(val.pubkey(), 1);
assert_eq!(
@ -714,7 +740,7 @@ mod test {
crds.find_old_labels(&thread_pool, 2, &set),
vec![val.label()]
);
crds.remove(&val.label());
crds.remove(&val.label(), /*now=*/ 0);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
}
#[test]
@ -722,7 +748,7 @@ mod test {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
let mut set = HashMap::new();
//now < timestamp
set.insert(Pubkey::default(), 0);
@ -760,27 +786,19 @@ mod test {
let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect();
let mut rng = thread_rng();
let mut num_inserts = 0;
let mut num_overrides = 0;
for _ in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
match crds.insert(value, local_timestamp) {
Ok(None) => {
num_inserts += 1;
check_crds_shards(&crds);
}
Ok(Some(_)) => {
num_inserts += 1;
num_overrides += 1;
check_crds_shards(&crds);
}
Err(_) => (),
if let Ok(()) = crds.insert(value, local_timestamp) {
num_inserts += 1;
check_crds_shards(&crds);
}
}
assert_eq!(num_inserts, crds.cursor.0 as usize);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.num_purged() > 500);
assert_eq!(crds.num_purged() + crds.table.len(), 4096);
assert!(crds.table.len() > 200);
assert!(num_inserts > crds.table.len());
check_crds_shards(&crds);
@ -788,7 +806,7 @@ mod test {
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
crds.remove(&key, /*now=*/ 0);
check_crds_shards(&crds);
}
}
@ -922,20 +940,12 @@ mod test {
let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect();
let mut crds = Crds::default();
let mut num_inserts = 0;
let mut num_overrides = 0;
for k in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
match crds.insert(value, local_timestamp) {
Ok(None) => {
num_inserts += 1;
}
Ok(Some(_)) => {
num_inserts += 1;
num_overrides += 1;
}
Err(_) => (),
if let Ok(()) = crds.insert(value, local_timestamp) {
num_inserts += 1;
}
if k % 16 == 0 {
check_crds_value_indices(&mut rng, &crds);
@ -943,8 +953,9 @@ mod test {
}
assert_eq!(num_inserts, crds.cursor.0 as usize);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.num_purged() > 500);
assert!(crds.table.len() > 200);
assert_eq!(crds.num_purged() + crds.table.len(), 4096);
assert!(num_inserts > crds.table.len());
let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds);
assert!(num_nodes * 3 < crds.table.len());
@ -959,7 +970,7 @@ mod test {
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
crds.remove(&key, /*now=*/ 0);
if crds.table.len() % 16 == 0 {
check_crds_value_indices(&mut rng, &crds);
}
@ -998,7 +1009,7 @@ mod test {
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
crds.remove(&key, /*now=*/ 0);
if crds.table.len() % 64 == 0 {
check_crds_records(&crds);
}
@ -1007,6 +1018,7 @@ mod test {
}
#[test]
#[allow(clippy::needless_collect)]
fn test_drop() {
fn num_unique_pubkeys<'a, I>(values: I) -> usize
where
@ -1035,7 +1047,15 @@ mod test {
let num_pubkeys = num_unique_pubkeys(crds.table.values());
assert!(!crds.should_trim(num_pubkeys));
assert!(crds.should_trim(num_pubkeys * 5 / 6));
let purged = crds.drop(16, &[], &stakes).unwrap();
let values: Vec<_> = crds.table.values().cloned().collect();
crds.drop(16, &[], &stakes, /*now=*/ 0).unwrap();
let purged: Vec<_> = {
let purged: HashSet<_> = crds.purged.iter().map(|(hash, _)| hash).copied().collect();
values
.into_iter()
.filter(|v| purged.contains(&v.value_hash))
.collect()
};
assert_eq!(purged.len() + crds.table.len(), num_values);
assert_eq!(num_unique_pubkeys(&purged), 16);
assert_eq!(num_unique_pubkeys(crds.table.values()), num_pubkeys - 16);
@ -1072,7 +1092,7 @@ mod test {
crds.find_old_labels(&thread_pool, 2, &set),
vec![val.label()]
);
crds.remove(&val.label());
crds.remove(&val.label(), /*now=*/ 0);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
}

View File

@ -6,7 +6,7 @@
use crate::{
cluster_info::Ping,
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds::Crds,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
@ -59,21 +59,21 @@ impl CrdsGossip {
}
/// process a push message to the network
/// Returns origins' pubkeys of upserted values.
pub fn process_push_message(
&mut self,
from: &Pubkey,
values: Vec<CrdsValue>,
now: u64,
) -> Vec<VersionedCrdsValue> {
) -> Vec<Pubkey> {
values
.into_iter()
.filter_map(|val| {
let old = self
.push
.flat_map(|val| {
let origin = val.pubkey();
self.push
.process_push_message(&mut self.crds, from, val, now)
.ok()?;
self.pull.record_old_hash(old.as_ref()?.value_hash, now);
old
Some(origin)
})
.collect()
}
@ -325,10 +325,8 @@ impl CrdsGossip {
.pull
.purge_active(thread_pool, &mut self.crds, now, &timeouts);
}
if now > 5 * self.pull.crds_timeout {
let min = now - 5 * self.pull.crds_timeout;
self.pull.purge_purged(min);
}
self.crds
.trim_purged(now.saturating_sub(5 * self.pull.crds_timeout));
self.pull.purge_failed_inserts(now);
rv
}

View File

@ -12,7 +12,7 @@
use crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, CrdsError},
crds::Crds,
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -182,8 +182,6 @@ pub struct ProcessPullStats {
pub struct CrdsGossipPull {
/// timestamp of last request
pub(crate) pull_request_time: LruCache<Pubkey, u64>,
/// hash and insert time
pub purged_values: VecDeque<(Hash, u64)>,
// Hash value and record time (ms) of the pull responses which failed to be
// inserted in crds table; Preserved to stop the sender to send back the
// same outdated payload again by adding them to the filter for the next
@ -197,7 +195,6 @@ pub struct CrdsGossipPull {
impl Default for CrdsGossipPull {
fn default() -> Self {
Self {
purged_values: VecDeque::new(),
pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
failed_inserts: VecDeque::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
@ -321,21 +318,14 @@ impl CrdsGossipPull {
self.pull_request_time.put(from, now);
}
/// Store an old hash in the purged values set
pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) {
self.purged_values.push_back((hash, timestamp))
}
/// process a pull request
pub fn process_pull_requests<I>(&mut self, crds: &mut Crds, callers: I, now: u64)
where
I: IntoIterator<Item = CrdsValue>,
{
for caller in callers {
let key = caller.label().pubkey();
if let Ok(Some(val)) = crds.insert(caller, now) {
self.purged_values.push_back((val.value_hash, now));
}
let key = caller.pubkey();
let _ = crds.insert(caller, now);
crds.update_record_timestamp(&key, now);
}
}
@ -409,32 +399,20 @@ impl CrdsGossipPull {
from: &Pubkey,
responses: Vec<CrdsValue>,
responses_expired_timeout: Vec<CrdsValue>,
mut failed_inserts: Vec<Hash>,
failed_inserts: Vec<Hash>,
now: u64,
stats: &mut ProcessPullStats,
) {
let mut owners = HashSet::new();
for response in responses_expired_timeout {
match crds.insert(response, now) {
Ok(None) => (),
Ok(Some(old)) => self.purged_values.push_back((old.value_hash, now)),
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
Err(CrdsError::UnknownStakes) => (),
}
let _ = crds.insert(response, now);
}
for response in responses {
let owner = response.pubkey();
match crds.insert(response, now) {
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
Err(CrdsError::UnknownStakes) => (),
Ok(old) => {
stats.success += 1;
self.num_pulls += 1;
owners.insert(owner);
if let Some(val) = old {
self.purged_values.push_back((val.value_hash, now))
}
}
if let Ok(()) = crds.insert(response, now) {
stats.success += 1;
self.num_pulls += 1;
owners.insert(owner);
}
}
owners.insert(*from);
@ -472,19 +450,14 @@ impl CrdsGossipPull {
const MIN_NUM_BLOOM_ITEMS: usize = 512;
#[cfg(not(debug_assertions))]
const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
let num_items = crds.len() + self.purged_values.len() + self.failed_inserts.len();
let num_items = crds.len() + crds.num_purged() + self.failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(num_items, bloom_size);
thread_pool.install(|| {
crds.par_values()
.with_min_len(PAR_MIN_LENGTH)
.map(|v| v.value_hash)
.chain(
self.purged_values
.par_iter()
.with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v),
)
.chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
.chain(
self.failed_inserts
.par_iter()
@ -576,7 +549,6 @@ impl CrdsGossipPull {
}
/// Purge values from the crds that are older then `active_timeout`
/// The value_hash of an active item is put into self.purged_values queue
pub fn purge_active(
&mut self,
thread_pool: &ThreadPool,
@ -584,25 +556,11 @@ impl CrdsGossipPull {
now: u64,
timeouts: &HashMap<Pubkey, u64>,
) -> usize {
let num_purged_values = self.purged_values.len();
self.purged_values.extend(
crds.find_old_labels(thread_pool, now, timeouts)
.into_iter()
.filter_map(|label| {
let val = crds.remove(&label)?;
Some((val.value_hash, now))
}),
);
self.purged_values.len() - num_purged_values
}
/// Purge values from the `self.purged_values` queue that are older then purge_timeout
pub fn purge_purged(&mut self, min_ts: u64) {
let cnt = self
.purged_values
.iter()
.take_while(|v| v.1 < min_ts)
.count();
self.purged_values.drain(..cnt);
let labels = crds.find_old_labels(thread_pool, now, timeouts);
for label in &labels {
crds.remove(label, now);
}
labels.len()
}
/// For legacy tests
@ -642,7 +600,6 @@ impl CrdsGossipPull {
}
Self {
pull_request_time,
purged_values: self.purged_values.clone(),
failed_inserts: self.failed_inserts.clone(),
..*self
}
@ -655,7 +612,7 @@ pub(crate) mod tests {
use crate::contact_info::ContactInfo;
use crate::crds_value::{CrdsData, Vote};
use itertools::Itertools;
use rand::thread_rng;
use rand::{seq::SliceRandom, thread_rng};
use rayon::ThreadPoolBuilder;
use solana_perf::test_tx::test_tx;
use solana_sdk::{
@ -906,37 +863,23 @@ pub(crate) mod tests {
fn test_build_crds_filter() {
let mut rng = thread_rng();
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds_gossip_pull = CrdsGossipPull::default();
let crds_gossip_pull = CrdsGossipPull::default();
let mut crds = Crds::default();
for _ in 0..10_000 {
crds_gossip_pull
.purged_values
.push_back((solana_sdk::hash::new_rand(&mut rng), rng.gen()));
}
let keypairs: Vec<_> = repeat_with(Keypair::new).take(10_000).collect();
let mut num_inserts = 0;
for _ in 0..20_000 {
if crds
.insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
.is_ok()
{
for _ in 0..40_000 {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
if crds.insert(value, rng.gen()).is_ok() {
num_inserts += 1;
}
}
assert_eq!(num_inserts, 20_000);
assert!(num_inserts > 30_000, "num inserts: {}", num_inserts);
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32));
let hash_values: Vec<_> = crds
.values()
.map(|v| v.value_hash)
.chain(
crds_gossip_pull
.purged_values
.iter()
.map(|(value_hash, _)| value_hash)
.cloned(),
)
.collect();
assert_eq!(hash_values.len(), 10_000 + 20_000);
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
assert_eq!(hash_values.len(), 40_000);
let mut false_positives = 0;
for hash_value in hash_values {
let mut num_hits = 0;
@ -951,7 +894,7 @@ pub(crate) mod tests {
}
assert_eq!(num_hits, 1);
}
assert!(false_positives < 50_000, "fp: {}", false_positives);
assert!(false_positives < 150_000, "fp: {}", false_positives);
}
#[test]
@ -1407,7 +1350,7 @@ pub(crate) mod tests {
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
assert_eq!(node_crds.lookup_versioned(&old.label()), None);
assert_eq!(node.purged_values.len(), 1);
assert_eq!(node_crds.num_purged(), 1);
for _ in 0..30 {
// there is a chance of a false positive with bloom filters
// assert that purged value is still in the set
@ -1417,8 +1360,8 @@ pub(crate) mod tests {
}
// purge the value
node.purge_purged(node.crds_timeout + 1);
assert_eq!(node.purged_values.len(), 0);
node_crds.trim_purged(node.crds_timeout + 1);
assert_eq!(node_crds.num_purged(), 0);
}
#[test]
#[allow(clippy::float_cmp)]

View File

@ -11,7 +11,7 @@
use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, Cursor, VersionedCrdsValue},
crds::{Crds, Cursor},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -169,7 +169,7 @@ impl CrdsGossipPush {
from: &Pubkey,
value: CrdsValue,
now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
) -> Result<(), CrdsGossipError> {
self.num_total += 1;
if !self.wallclock_window(now).contains(&value.wallclock()) {
return Err(CrdsGossipError::PushMessageTimeout);
@ -457,7 +457,7 @@ mod test {
// push a new message
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Ok(None)
Ok(())
);
assert_eq!(crds.lookup(&label), Some(&value));
@ -478,7 +478,7 @@ mod test {
// push a new message
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Ok(None)
Ok(())
);
// push an old version
@ -522,19 +522,16 @@ mod test {
// push a new message
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value_old.clone(), 0),
Ok(None)
push.process_push_message(&mut crds, &Pubkey::default(), value_old, 0),
Ok(())
);
// push an old version
ci.wallclock = 1;
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0)
.unwrap()
.unwrap()
.value,
value_old
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Ok(())
);
}
#[test]
@ -555,7 +552,7 @@ mod test {
0,
)));
assert_eq!(crds.insert(value1.clone(), now), Ok(None));
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
@ -564,7 +561,7 @@ mod test {
0,
)));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() {
@ -577,7 +574,7 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
));
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
}
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert_eq!(push.active_set.len(), push.num_active);
@ -735,7 +732,7 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), now), Ok(None));
assert_eq!(crds.insert(peer.clone(), now), Ok(()));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -746,7 +743,7 @@ mod test {
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0),
Ok(None)
Ok(())
);
assert_eq!(push.active_set.len(), 1);
assert_eq!(push.new_push_messages(&crds, 0), expected);
@ -765,11 +762,11 @@ mod test {
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
})
.collect();
assert_eq!(crds.insert(peers[0].clone(), now), Ok(None));
assert_eq!(crds.insert(peers[1].clone(), now), Ok(None));
assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now),
Ok(None)
Ok(())
);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
@ -792,7 +789,7 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer.clone(), 0), Ok(()));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -802,7 +799,7 @@ mod test {
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
Ok(None)
Ok(())
);
push.process_prune_msg(
&self_id,
@ -819,7 +816,7 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer, 0), Ok(None));
assert_eq!(crds.insert(peer, 0), Ok(()));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
@ -828,7 +825,7 @@ mod test {
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1),
Ok(None)
Ok(())
);
assert_eq!(push.new_push_messages(&crds, 0), expected);
}
@ -844,7 +841,7 @@ mod test {
// push a new message
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Ok(None)
Ok(())
);
assert_eq!(crds.lookup(&label), Some(&value));

View File

@ -140,7 +140,7 @@ mod test {
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.remove(&label).unwrap()
crds.get(&label).cloned().unwrap()
}
// Returns true if the first mask_bits most significant bits of hash is the

View File

@ -351,17 +351,14 @@ fn network_run_push(
for (to, msgs) in push_messages {
bytes += serialized_size(&msgs).unwrap() as usize;
num_msgs += 1;
let updated = network
let origins: HashSet<_> = network
.get(&to)
.map(|node| {
node.lock()
.unwrap()
.process_push_message(&from, msgs.clone(), now)
})
.unwrap();
let origins: HashSet<_> =
updated.into_iter().map(|u| u.value.pubkey()).collect();
.unwrap()
.lock()
.unwrap()
.process_push_message(&from, msgs.clone(), now)
.into_iter()
.collect();
let prunes_map = network
.get(&to)
.map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))