From f1198fc6d5ac1f9131d2b044a03fad524e5616e2 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 26 Jul 2021 17:13:11 +0000 Subject: [PATCH] filters crds values in parallel when responding to gossip pull-requests (#18877) When responding to gossip pull-requests, filter_crds_values takes a lot of time while holding onto read-lock: https://github.com/solana-labs/solana/blob/f51d64868/gossip/src/crds_gossip_pull.rs#L509-L566 This commit will filter-crds-values in parallel using rayon thread-pools. --- gossip/src/cluster_info.rs | 39 +++++++---- gossip/src/crds_gossip.rs | 9 ++- gossip/src/crds_gossip_pull.rs | 117 ++++++++++++++++++--------------- gossip/src/gossip_service.rs | 14 ++-- gossip/tests/crds_gossip.rs | 5 +- 5 files changed, 108 insertions(+), 76 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 43b265921..607d73ffc 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1635,9 +1635,8 @@ impl ClusterInfo { bank_forks: Option>>, sender: PacketSender, gossip_validators: Option>, - exit: &Arc, + exit: Arc, ) -> JoinHandle<()> { - let exit = exit.clone(); let thread_pool = ThreadPoolBuilder::new() .num_threads(std::cmp::min(get_thread_count(), 8)) .thread_name(|i| format!("ClusterInfo::gossip-{}", i)) @@ -1812,8 +1811,13 @@ impl ClusterInfo { self.stats .pull_requests_count .add_relaxed(requests.len() as u64); - let response = - self.handle_pull_requests(recycler, requests, stakes, require_stake_for_gossip); + let response = self.handle_pull_requests( + thread_pool, + recycler, + requests, + stakes, + require_stake_for_gossip, + ); if !response.is_empty() { self.stats .packets_sent_pull_responses_count @@ -1883,6 +1887,7 @@ impl ClusterInfo { // and tries to send back to them the values it detects are missing. fn handle_pull_requests( &self, + thread_pool: &ThreadPool, recycler: &PacketsRecycler, requests: Vec, stakes: &HashMap, @@ -1914,8 +1919,12 @@ impl ClusterInfo { let self_id = self.id(); let mut pull_responses = { let _st = ScopedTimer::from(&self.stats.generate_pull_responses); - self.gossip - .generate_pull_responses(&caller_and_filters, output_size_limit, now) + self.gossip.generate_pull_responses( + thread_pool, + &caller_and_filters, + output_size_limit, + now, + ) }; if require_stake_for_gossip { for resp in &mut pull_responses { @@ -2516,6 +2525,9 @@ impl ClusterInfo { match self.run_socket_consume(&receiver, &sender, &thread_pool) { Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), + // A send operation can only fail if the receiving end of a + // channel is disconnected. + Err(GossipError::SendError) => break, Err(err) => error!("gossip consume: {}", err), Ok(()) => (), } @@ -2531,19 +2543,18 @@ impl ClusterInfo { requests_receiver: Receiver>, response_sender: PacketSender, should_check_duplicate_instance: bool, - exit: &Arc, + exit: Arc, ) -> JoinHandle<()> { - let exit = exit.clone(); + let mut last_print = Instant::now(); let recycler = PacketsRecycler::default(); + let thread_pool = ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(8)) + .thread_name(|i| format!("sol-gossip-work-{}", i)) + .build() + .unwrap(); Builder::new() .name("solana-listen".to_string()) .spawn(move || { - let thread_pool = ThreadPoolBuilder::new() - .num_threads(std::cmp::min(get_thread_count(), 8)) - .thread_name(|i| format!("sol-gossip-work-{}", i)) - .build() - .unwrap(); - let mut last_print = Instant::now(); while !exit.load(Ordering::Relaxed) { if let Err(err) = self.run_listen( &recycler, diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 17d11d60b..753ca8c26 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -240,11 +240,18 @@ impl CrdsGossip { pub fn generate_pull_responses( &self, + thread_pool: &ThreadPool, filters: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { - CrdsGossipPull::generate_pull_responses(&self.crds, filters, output_size_limit, now) + CrdsGossipPull::generate_pull_responses( + thread_pool, + &self.crds, + filters, + output_size_limit, + now, + ) } pub fn filter_pull_responses( diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 949dbdf81..fb8fcbb3a 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -22,7 +22,6 @@ use { ping_pong::PingCache, weighted_shuffle::WeightedShuffle, }, - itertools::Itertools, lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool}, @@ -39,7 +38,7 @@ use { iter::{repeat, repeat_with}, net::SocketAddr, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicI64, AtomicUsize, Ordering}, Mutex, RwLock, }, time::{Duration, Instant}, @@ -356,12 +355,13 @@ impl CrdsGossipPull { /// Create gossip responses to pull requests pub(crate) fn generate_pull_responses( + thread_pool: &ThreadPool, crds: &RwLock, requests: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { - Self::filter_crds_values(crds, requests, output_size_limit, now) + Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now) } // Checks if responses should be inserted and @@ -508,9 +508,10 @@ impl CrdsGossipPull { /// Filter values that fail the bloom filter up to `max_bytes`. fn filter_crds_values( + thread_pool: &ThreadPool, crds: &RwLock, filters: &[(CrdsValue, CrdsFilter)], - mut output_size_limit: usize, // Limit number of crds values returned. + output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; @@ -518,50 +519,57 @@ impl CrdsGossipPull { //skip filters from callers that are too old let caller_wallclock_window = now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout); - let mut dropped_requests = 0; - let mut total_skipped = 0; + let dropped_requests = AtomicUsize::default(); + let total_skipped = AtomicUsize::default(); + let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX); + let output_size_limit = AtomicI64::new(output_size_limit); let crds = crds.read().unwrap(); - let ret: Vec<_> = filters - .iter() - .map(|(caller, filter)| { - if output_size_limit == 0 { - return None; + let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| { + if output_size_limit.load(Ordering::Relaxed) <= 0 { + return Vec::default(); + } + let caller_wallclock = caller.wallclock(); + if !caller_wallclock_window.contains(&caller_wallclock) { + dropped_requests.fetch_add(1, Ordering::Relaxed); + return Vec::default(); + } + let caller_pubkey = caller.pubkey(); + let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); + let pred = |entry: &&VersionedCrdsValue| { + debug_assert!(filter.test_mask(&entry.value_hash)); + // Skip values that are too new. + if entry.value.wallclock() > caller_wallclock { + total_skipped.fetch_add(1, Ordering::Relaxed); + false + } else { + !filter.filter_contains(&entry.value_hash) + && (entry.value.pubkey() != caller_pubkey + || entry.value.should_force_push(&caller_pubkey)) } - let caller_wallclock = caller.wallclock(); - if !caller_wallclock_window.contains(&caller_wallclock) { - dropped_requests += 1; - return Some(vec![]); - } - let caller_pubkey = caller.pubkey(); - let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); - let pred = |entry: &&VersionedCrdsValue| { - debug_assert!(filter.test_mask(&entry.value_hash)); - // Skip values that are too new. - if entry.value.wallclock() > caller_wallclock { - total_skipped += 1; - false - } else { - !filter.filter_contains(&entry.value_hash) - && (entry.value.pubkey() != caller_pubkey - || entry.value.should_force_push(&caller_pubkey)) - } - }; - let out: Vec<_> = crds - .filter_bitmask(filter.mask, filter.mask_bits) - .filter(pred) - .map(|entry| entry.value.clone()) - .take(output_size_limit) - .collect(); - output_size_limit -= out.len(); - Some(out) - }) - .while_some() - .collect(); + }; + let out: Vec<_> = crds + .filter_bitmask(filter.mask, filter.mask_bits) + .filter(pred) + .map(|entry| entry.value.clone()) + .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize) + .collect(); + output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed); + out + }; + let ret: Vec<_> = thread_pool.install(|| { + filters + .par_iter() + .map(|(caller, filter)| apply_filter(caller, filter)) + .collect() + }); inc_new_counter_info!( "gossip_filter_crds_values-dropped_requests", - dropped_requests + filters.len() - ret.len() + dropped_requests.into_inner() + ); + inc_new_counter_info!( + "gossip_filter_crds_values-dropped_values", + total_skipped.into_inner() ); - inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped); ret } @@ -1213,10 +1221,11 @@ pub(crate) mod tests { let (_, filters) = req.unwrap(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = CrdsGossipPull::generate_pull_responses( + &thread_pool, &dest_crds, &filters, - /*output_size_limit=*/ usize::MAX, - 0, + usize::MAX, // output_size_limit + 0, // now ); assert_eq!(rsp[0].len(), 0); @@ -1233,10 +1242,11 @@ pub(crate) mod tests { //should skip new value since caller is to old let rsp = CrdsGossipPull::generate_pull_responses( + &thread_pool, &dest_crds, &filters, - /*output_size_limit=*/ usize::MAX, - CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + usize::MAX, // output_size_limit + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now ); assert_eq!(rsp[0].len(), 0); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS); @@ -1251,9 +1261,10 @@ pub(crate) mod tests { .collect::>() }); let rsp = CrdsGossipPull::generate_pull_responses( + &thread_pool, &dest_crds, &filters, - /*output_size_limit=*/ usize::MAX, + usize::MAX, // output_size_limit CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, ); assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS); @@ -1304,10 +1315,11 @@ pub(crate) mod tests { let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = CrdsGossipPull::generate_pull_responses( + &thread_pool, &dest_crds, &filters, - /*output_size_limit=*/ usize::MAX, - 0, + usize::MAX, // output_size_limit + 0, // now ); let callers = filters.into_iter().map(|(caller, _)| caller); CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); @@ -1382,10 +1394,11 @@ pub(crate) mod tests { let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = CrdsGossipPull::generate_pull_responses( + &thread_pool, &dest_crds, &filters, - /*output_size_limit=*/ usize::MAX, - 0, + usize::MAX, // output_size_limit + 0, // now ); CrdsGossipPull::process_pull_requests( &dest_crds, diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index b71a1e261..3cb43e865 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -58,27 +58,27 @@ impl GossipService { 1, false, ); - let (response_sender, response_receiver) = channel(); let (consume_sender, listen_receiver) = channel(); + // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 + let _consume_sender = consume_sender.clone(); let t_socket_consume = cluster_info.clone().start_socket_consume_thread( request_receiver, consume_sender, exit.clone(), ); - let t_listen = ClusterInfo::listen( - cluster_info.clone(), + let (response_sender, response_receiver) = channel(); + let t_listen = cluster_info.clone().listen( bank_forks.clone(), listen_receiver, response_sender.clone(), should_check_duplicate_instance, - exit, + exit.clone(), ); - let t_gossip = ClusterInfo::gossip( - cluster_info.clone(), + let t_gossip = cluster_info.clone().gossip( bank_forks, response_sender, gossip_validators, - exit, + exit.clone(), ); // To work around: // https://github.com/rust-lang/rust/issues/54267 diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 03dfc5ffe..5fb894960 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -505,7 +505,7 @@ fn network_run_pull( .collect() }; let transfered: Vec<_> = requests - .into_par_iter() + .into_iter() .map(|(to, filters, caller_info)| { let mut bytes: usize = 0; let mut msgs: usize = 0; @@ -527,8 +527,9 @@ fn network_run_pull( let rsp = node .gossip .generate_pull_responses( + thread_pool, &filters, - /*output_size_limit=*/ usize::MAX, + usize::MAX, // output_size_limit now, ) .into_iter()