diff --git a/core/Cargo.toml b/core/Cargo.toml index e1fbb2e09f..6fec41d111 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -94,6 +94,9 @@ name = "banking_stage" [[bench]] name = "blockstore" +[[bench]] +name = "crds" + [[bench]] name = "crds_gossip_pull" diff --git a/core/benches/crds.rs b/core/benches/crds.rs new file mode 100644 index 0000000000..c415febfee --- /dev/null +++ b/core/benches/crds.rs @@ -0,0 +1,31 @@ +#![feature(test)] + +extern crate test; + +use rand::{thread_rng, Rng}; +use rayon::ThreadPoolBuilder; +use solana_core::crds::Crds; +use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; +use solana_core::crds_value::CrdsValue; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use test::Bencher; + +#[bench] +fn bench_find_old_labels(bencher: &mut Bencher) { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut rng = thread_rng(); + let mut crds = Crds::default(); + let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000; + std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng), rng.gen_range(0, now))) + .take(50_000) + .for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok())); + let mut timeouts = HashMap::new(); + timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + bencher.iter(|| { + let out = crds.find_old_labels(&thread_pool, now, &timeouts); + assert!(out.len() > 10); + assert!(out.len() < 250); + out + }); +} diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4db2cebee3..7d542e982f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1664,6 +1664,7 @@ impl ClusterInfo { fn handle_purge( self: &Arc, + thread_pool: &ThreadPool, bank_forks: &Option>>, stakes: &HashMap, ) { @@ -1681,7 +1682,7 @@ impl ClusterInfo { let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout); let num_purged = self .time_gossip_write_lock("purge", &self.stats.purge) - .purge(timestamp(), &timeouts); + .purge(thread_pool, timestamp(), &timeouts); inc_new_counter_info!("cluster_info-purge-count", num_purged); } @@ -1742,7 +1743,7 @@ impl ClusterInfo { return; } - self.handle_purge(&bank_forks, &stakes); + self.handle_purge(&thread_pool, &bank_forks, &stakes); self.handle_adopt_shred_version(&mut adopt_shred_version); diff --git a/core/src/crds.rs b/core/src/crds.rs index 3182011e6f..98973d66d0 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -28,6 +28,7 @@ use crate::crds_shards::CrdsShards; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use bincode::serialize; use indexmap::map::{Entry, IndexMap}; +use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; use std::cmp; @@ -176,37 +177,40 @@ impl Crds { /// * timeouts - Pubkey specific timeouts with Pubkey::default() as the default timeout. pub fn find_old_labels( &self, + thread_pool: &ThreadPool, now: u64, timeouts: &HashMap, ) -> Vec { let default_timeout = *timeouts .get(&Pubkey::default()) .expect("must have default timeout"); - self.table - .iter() - .filter_map(|(k, v)| { - let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout); - if v.local_timestamp.saturating_add(*timeout) <= now { - Some(k) - } else { - None - } - }) - .cloned() - .collect() + thread_pool.install(|| { + self.table + .par_iter() + .with_min_len(1024) + .filter_map(|(k, v)| { + let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout); + if v.local_timestamp.saturating_add(*timeout) <= now { + Some(k.clone()) + } else { + None + } + }) + .collect() + }) } - pub fn remove(&mut self, key: &CrdsValueLabel) { - if let Some((index, _, value)) = self.table.swap_remove_full(key) { - assert!(self.shards.remove(index, &value)); - // The previously last element in the table is now moved to the - // 'index' position. Shards need to be updated accordingly. - if index < self.table.len() { - let value = self.table.index(index); - assert!(self.shards.remove(self.table.len(), value)); - assert!(self.shards.insert(index, value)); - } + pub fn remove(&mut self, key: &CrdsValueLabel) -> Option { + let (index, _, value) = self.table.swap_remove_full(key)?; + assert!(self.shards.remove(index, &value)); + // The previously last element in the table is now moved to the + // 'index' position. Shards need to be updated accordingly. + if index < self.table.len() { + let value = self.table.index(index); + assert!(self.shards.remove(self.table.len(), value)); + assert!(self.shards.insert(index, value)); } + Some(value) } } @@ -216,6 +220,7 @@ mod test { use crate::contact_info::ContactInfo; use crate::crds_value::CrdsData; use rand::{thread_rng, Rng}; + use rayon::ThreadPoolBuilder; #[test] fn test_insert() { @@ -288,48 +293,67 @@ mod test { } #[test] fn test_find_old_records_default() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_eq!(crds.insert(val.clone(), 1), Ok(None)); let mut set = HashMap::new(); set.insert(Pubkey::default(), 0); - assert!(crds.find_old_labels(0, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); set.insert(Pubkey::default(), 1); - assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &set), + vec![val.label()] + ); set.insert(Pubkey::default(), 2); - assert_eq!(crds.find_old_labels(4, &set), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 4, &set), + vec![val.label()] + ); } #[test] fn test_find_old_records_with_override() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut rng = thread_rng(); let mut crds = Crds::default(); let mut timeouts = HashMap::new(); let val = CrdsValue::new_rand(&mut rng); timeouts.insert(Pubkey::default(), 3); assert_eq!(crds.insert(val.clone(), 0), Ok(None)); - assert!(crds.find_old_labels(2, &timeouts).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.insert(val.pubkey(), 1); - assert_eq!(crds.find_old_labels(2, &timeouts), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &timeouts), + vec![val.label()] + ); timeouts.insert(val.pubkey(), u64::MAX); - assert!(crds.find_old_labels(2, &timeouts).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.insert(Pubkey::default(), 1); - assert!(crds.find_old_labels(2, &timeouts).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.remove(&val.pubkey()); - assert_eq!(crds.find_old_labels(2, &timeouts), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &timeouts), + vec![val.label()] + ); } #[test] fn test_remove_default() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_matches!(crds.insert(val.clone(), 1), Ok(_)); let mut set = HashMap::new(); set.insert(Pubkey::default(), 1); - assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &set), + vec![val.label()] + ); crds.remove(&val.label()); - assert!(crds.find_old_labels(2, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); } #[test] fn test_find_old_records_staked() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_eq!(crds.insert(val.clone(), 1), Ok(None)); @@ -337,20 +361,26 @@ mod test { //now < timestamp set.insert(Pubkey::default(), 0); set.insert(val.pubkey(), 0); - assert!(crds.find_old_labels(0, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); //pubkey shouldn't expire since its timeout is MAX set.insert(val.pubkey(), std::u64::MAX); - assert!(crds.find_old_labels(2, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); //default has max timeout, but pubkey should still expire set.insert(Pubkey::default(), std::u64::MAX); set.insert(val.pubkey(), 1); - assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &set), + vec![val.label()] + ); set.insert(val.pubkey(), 2); - assert!(crds.find_old_labels(2, &set).is_empty()); - assert_eq!(crds.find_old_labels(3, &set), vec![val.label()]); + assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); + assert_eq!( + crds.find_old_labels(&thread_pool, 3, &set), + vec![val.label()] + ); } #[test] @@ -396,6 +426,7 @@ mod test { #[test] fn test_remove_staked() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_matches!(crds.insert(val.clone(), 1), Ok(_)); @@ -404,9 +435,12 @@ mod test { //default has max timeout, but pubkey should still expire set.insert(Pubkey::default(), std::u64::MAX); set.insert(val.pubkey(), 1); - assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + assert_eq!( + crds.find_old_labels(&thread_pool, 2, &set), + vec![val.label()] + ); crds.remove(&val.label()); - assert!(crds.find_old_labels(2, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); } #[test] diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 9996dd50f8..ad95478f7e 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -232,7 +232,12 @@ impl CrdsGossip { self.pull.make_timeouts(&self.id, stakes, epoch_ms) } - pub fn purge(&mut self, now: u64, timeouts: &HashMap) -> usize { + pub fn purge( + &mut self, + thread_pool: &ThreadPool, + now: u64, + timeouts: &HashMap, + ) -> usize { let mut rv = 0; if now > self.push.msg_timeout { let min = now - self.push.msg_timeout; @@ -247,7 +252,9 @@ impl CrdsGossip { let min = self.pull.crds_timeout; assert_eq!(timeouts[&self.id], std::u64::MAX); assert_eq!(timeouts[&Pubkey::default()], min); - rv = self.pull.purge_active(&mut self.crds, now, &timeouts); + rv = self + .pull + .purge_active(thread_pool, &mut self.crds, now, &timeouts); } if now > 5 * self.pull.crds_timeout { let min = now - 5 * self.pull.crds_timeout; diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index a62d1a61c4..d895a8b729 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -537,24 +537,21 @@ impl CrdsGossipPull { /// The value_hash of an active item is put into self.purged_values queue pub fn purge_active( &mut self, + thread_pool: &ThreadPool, crds: &mut Crds, now: u64, timeouts: &HashMap, ) -> usize { - let old = crds.find_old_labels(now, timeouts); - let mut purged: VecDeque<_> = old - .iter() - .filter_map(|label| { - let rv = crds - .lookup_versioned(label) - .map(|val| (val.value_hash, val.local_timestamp)); - crds.remove(label); - rv - }) - .collect(); - let ret = purged.len(); - self.purged_values.append(&mut purged); - ret + let num_purged_values = self.purged_values.len(); + self.purged_values.extend( + crds.find_old_labels(thread_pool, now, timeouts) + .into_iter() + .filter_map(|label| { + let val = crds.remove(&label)?; + Some((val.value_hash, val.local_timestamp)) + }), + ); + self.purged_values.len() - num_purged_values } /// Purge values from the `self.purged_values` queue that are older then purge_timeout pub fn purge_purged(&mut self, min_ts: u64) { @@ -1229,7 +1226,7 @@ mod test { // purge let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1); - node.purge_active(&mut node_crds, 2, &timeouts); + node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts); //verify self is still valid after purge assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 529768396b..2540e041bb 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -254,7 +254,7 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver ); }); // push for a bit - let (queue_size, bytes_tx) = network_run_push(network, start, end); + let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end); total_bytes += bytes_tx; trace!( "network_simulator_push_{}: queue_size: {} bytes: {}", @@ -278,7 +278,12 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver } } -fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, usize) { +fn network_run_push( + thread_pool: &ThreadPool, + network: &mut Network, + start: usize, + end: usize, +) -> (usize, usize) { let mut bytes: usize = 0; let mut num_msgs: usize = 0; let mut total: usize = 0; @@ -295,7 +300,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, .map(|node| { let mut node_lock = node.lock().unwrap(); let timeouts = node_lock.make_timeouts_test(); - node_lock.purge(now, &timeouts); + node_lock.purge(thread_pool, now, &timeouts); node_lock.new_push_messages(vec![], now) }) .collect();