diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8751225154..38d442e08b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1933,16 +1933,20 @@ impl ClusterInfo { let filtered_len = crds_values.len(); let mut pull_stats = ProcessPullStats::default(); - let (filtered_pulls, filtered_pulls_expired_timeout) = self + let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self .time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); - if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() { + if !filtered_pulls.is_empty() + || !filtered_pulls_expired_timeout.is_empty() + || !failed_inserts.is_empty() + { self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response) .process_pull_responses( from, filtered_pulls, filtered_pulls_expired_timeout, + failed_inserts, timestamp(), &mut pull_stats, ); @@ -2156,9 +2160,13 @@ impl ClusterInfo { fn print_reset_stats(&self, last_print: &mut Instant) { if last_print.elapsed().as_millis() > 2000 { - let (table_size, purged_values_size) = { + let (table_size, purged_values_size, failed_inserts_size) = { let r_gossip = self.gossip.read().unwrap(); - (r_gossip.crds.table.len(), r_gossip.pull.purged_values.len()) + ( + r_gossip.crds.table.len(), + r_gossip.pull.purged_values.len(), + r_gossip.pull.failed_inserts.len(), + ) }; datapoint_info!( "cluster_info_stats", @@ -2185,6 +2193,7 @@ impl ClusterInfo { ), ("table_size", table_size as i64, i64), ("purged_values_size", purged_values_size as i64, i64), + ("failed_inserts_size", failed_inserts_size as i64, i64), ); datapoint_info!( "cluster_info_stats2", diff --git a/core/src/crds.rs b/core/src/crds.rs index 352f5b370b..cf48568d9a 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -105,19 +105,13 @@ impl Crds { &self, value: CrdsValue, local_timestamp: u64, - ) -> Option { + ) -> (bool, VersionedCrdsValue) { let new_value = self.new_versioned(local_timestamp, value); let label = new_value.value.label(); - let would_insert = self - .table - .get(&label) - .map(|current| new_value > *current) - .unwrap_or(true); - if would_insert { - Some(new_value) - } else { - None - } + // New value is outdated and fails to insert, if it already exists in + // the table with a more recent wallclock. + let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current); + (!outdated, new_value) } /// insert the new value, returns the old value if insert succeeds pub fn insert_versioned( diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index b4a32df057..c95a1393d3 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -11,7 +11,7 @@ use crate::{ crds_value::{CrdsValue, CrdsValueLabel}, }; use rayon::ThreadPool; -use solana_sdk::pubkey::Pubkey; +use solana_sdk::{hash::Hash, pubkey::Pubkey}; use std::collections::{HashMap, HashSet}; ///The min size for bloom filters @@ -180,7 +180,7 @@ impl CrdsGossip { response: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, - ) -> (Vec, Vec) { + ) -> (Vec, Vec, Vec) { self.pull .filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats) } @@ -191,6 +191,7 @@ impl CrdsGossip { from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, + failed_inserts: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, ) { @@ -199,6 +200,7 @@ impl CrdsGossip { from, responses, responses_expired_timeout, + failed_inserts, now, process_pull_stats, ); @@ -238,6 +240,7 @@ impl CrdsGossip { let min = now - 5 * self.pull.crds_timeout; self.pull.purge_purged(min); } + self.pull.purge_failed_inserts(now); rv } } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index b05dd9d63c..305f4e2538 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -29,6 +29,8 @@ use std::ops::Index; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000; +// Retention period of hashes of received outdated values. +const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; pub const FALSE_RATE: f64 = 0.1f64; pub const KEYS: f64 = 8f64; @@ -172,6 +174,11 @@ pub struct CrdsGossipPull { pub pull_request_time: HashMap, /// hash and insert time pub purged_values: VecDeque<(Hash, u64)>, + // Hash value and record time (ms) of the pull responses which failed to be + // inserted in crds table; Preserved to stop the sender to send back the + // same outdated payload again by adding them to the filter for the next + // pull request. + pub failed_inserts: VecDeque<(Hash, u64)>, pub crds_timeout: u64, pub msg_timeout: u64, pub num_pulls: usize, @@ -182,6 +189,7 @@ impl Default for CrdsGossipPull { Self { purged_values: VecDeque::new(), pull_request_time: HashMap::new(), + failed_inserts: VecDeque::new(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, num_pulls: 0, @@ -294,9 +302,10 @@ impl CrdsGossipPull { // Checks if responses should be inserted and // returns those responses converted to VersionedCrdsValue - // Separated in two vecs as: + // Separated in three vecs as: // .0 => responses that update the owner timestamp // .1 => responses that do not update the owner timestamp + // .2 => hash value of outdated values which will fail to insert. pub fn filter_pull_responses( &self, crds: &Crds, @@ -304,9 +313,18 @@ impl CrdsGossipPull { responses: Vec, now: u64, stats: &mut ProcessPullStats, - ) -> (Vec, Vec) { + ) -> (Vec, Vec, Vec) { let mut versioned = vec![]; let mut versioned_expired_timestamp = vec![]; + let mut failed_inserts = vec![]; + let mut maybe_push = |response, values: &mut Vec| { + let (push, value) = crds.would_insert(response, now); + if push { + values.push(value); + } else { + failed_inserts.push(value.value_hash) + } + }; for r in responses { let owner = r.label().pubkey(); // Check if the crds value is older than the msg_timeout @@ -337,24 +355,17 @@ impl CrdsGossipPull { if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() { stats.timeout_count += 1; stats.failed_timeout += 1; - continue; } else { // Silently insert this old value without bumping record timestamps - match crds.would_insert(r, now) { - Some(resp) => versioned_expired_timestamp.push(resp), - None => stats.failed_insert += 1, - } - continue; + maybe_push(r, &mut versioned_expired_timestamp); } + continue; } } } - match crds.would_insert(r, now) { - Some(resp) => versioned.push(resp), - None => stats.failed_insert += 1, - } + maybe_push(r, &mut versioned); } - (versioned, versioned_expired_timestamp) + (versioned, versioned_expired_timestamp, failed_inserts) } /// process a vec of pull responses @@ -364,41 +375,59 @@ impl CrdsGossipPull { from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, + mut failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, ) -> Vec<(CrdsValueLabel, Hash, u64)> { let mut success = vec![]; let mut owners = HashSet::new(); for r in responses_expired_timeout { - stats.failed_insert += crds.insert_versioned(r).is_err() as usize; + let value_hash = r.value_hash; + if crds.insert_versioned(r).is_err() { + failed_inserts.push(value_hash); + } } for r in responses { - let owner = r.value.label().pubkey(); let label = r.value.label(); let wc = r.value.wallclock(); let hash = r.value_hash; - let old = crds.insert_versioned(r); - if old.is_err() { - stats.failed_insert += 1; - } else { - stats.success += 1; - self.num_pulls += 1; - success.push((label, hash, wc)); + match crds.insert_versioned(r) { + Err(_) => failed_inserts.push(hash), + Ok(old) => { + stats.success += 1; + self.num_pulls += 1; + owners.insert(label.pubkey()); + success.push((label, hash, wc)); + if let Some(val) = old { + self.purged_values + .push_back((val.value_hash, val.local_timestamp)) + } + } } - old.ok().map(|opt| { - owners.insert(owner); - opt.map(|val| { - self.purged_values - .push_back((val.value_hash, val.local_timestamp)) - }) - }); } owners.insert(*from); for owner in owners { crds.update_record_timestamp(&owner, now); } + stats.failed_insert += failed_inserts.len(); + self.purge_failed_inserts(now); + self.failed_inserts + .extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); success } + + pub fn purge_failed_inserts(&mut self, now: u64) { + if FAILED_INSERTS_RETENTION_MS < now { + let cutoff = now - FAILED_INSERTS_RETENTION_MS; + let outdated = self + .failed_inserts + .iter() + .take_while(|(_, ts)| *ts < cutoff) + .count(); + self.failed_inserts.drain(..outdated); + } + } + // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter pub fn build_crds_filters( @@ -410,23 +439,28 @@ impl CrdsGossipPull { const PAR_MIN_LENGTH: usize = 512; let num = cmp::max( CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, - crds.table.values().count() + self.purged_values.len(), + crds.table.len() + self.purged_values.len() + self.failed_inserts.len(), ); let filters = CrdsFilterSet::new(num, bloom_size); - thread_pool.join( - || { - crds.table - .par_values() - .with_min_len(PAR_MIN_LENGTH) - .for_each(|v| filters.add(v.value_hash)) - }, - || { - self.purged_values - .par_iter() - .with_min_len(PAR_MIN_LENGTH) - .for_each(|(v, _)| filters.add(*v)) - }, - ); + thread_pool.install(|| { + crds.table + .par_values() + .with_min_len(PAR_MIN_LENGTH) + .map(|v| v.value_hash) + .chain( + self.purged_values + .par_iter() + .with_min_len(PAR_MIN_LENGTH) + .map(|(v, _)| *v), + ) + .chain( + self.failed_inserts + .par_iter() + .with_min_len(PAR_MIN_LENGTH) + .map(|(v, _)| *v), + ) + .for_each(|v| filters.add(v)); + }); filters.into() } @@ -544,13 +578,14 @@ impl CrdsGossipPull { now: u64, ) -> (usize, usize, usize) { let mut stats = ProcessPullStats::default(); - let (versioned, versioned_expired_timeout) = + let (versioned, versioned_expired_timeout, failed_inserts) = self.filter_pull_responses(crds, timeouts, response, now, &mut stats); self.process_pull_responses( crds, from, versioned, versioned_expired_timeout, + failed_inserts, now, &mut stats, ); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 51349454e1..69aeace1c9 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -459,9 +459,16 @@ fn network_run_pull( let mut node = node.lock().unwrap(); node.mark_pull_request_creation_time(&from, now); let mut stats = ProcessPullStats::default(); - let (vers, vers_expired_timeout) = + let (vers, vers_expired_timeout, failed_inserts) = node.filter_pull_responses(&timeouts, rsp, now, &mut stats); - node.process_pull_responses(&from, vers, vers_expired_timeout, now, &mut stats); + node.process_pull_responses( + &from, + vers, + vers_expired_timeout, + failed_inserts, + now, + &mut stats, + ); overhead += stats.failed_insert; overhead += stats.failed_timeout; }