diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 0e72efd8a..e7f405f06 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -80,7 +80,7 @@ use { solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, std::{ borrow::Cow, - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, fs::{self, File}, io::BufReader, @@ -2101,77 +2101,27 @@ impl ClusterInfo { fn handle_batch_pull_responses( &self, - responses: Vec<(Pubkey, Vec)>, - thread_pool: &ThreadPool, + responses: Vec, stakes: &HashMap, epoch_duration: Duration, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time); - if responses.is_empty() { - return; - } - fn extend(hash_map: &mut HashMap>, (key, mut value): (K, Vec)) - where - K: Eq + std::hash::Hash, - { - match hash_map.entry(key) { - Entry::Occupied(mut entry) => { - let entry_value = entry.get_mut(); - if entry_value.len() < value.len() { - std::mem::swap(entry_value, &mut value); - } - entry_value.extend(value); - } - Entry::Vacant(entry) => { - entry.insert(value); - } - } - } - fn merge( - mut hash_map: HashMap>, - other: HashMap>, - ) -> HashMap> - where - K: Eq + std::hash::Hash, - { - if hash_map.len() < other.len() { - return merge(other, hash_map); - } - for kv in other { - extend(&mut hash_map, kv); - } - hash_map - } - let responses = thread_pool.install(|| { - responses - .into_par_iter() - .with_min_len(1024) - .fold(HashMap::new, |mut hash_map, kv| { - extend(&mut hash_map, kv); - hash_map - }) - .reduce(HashMap::new, merge) - }); if !responses.is_empty() { let self_pubkey = self.id(); let timeouts = self .gossip .make_timeouts(self_pubkey, stakes, epoch_duration); - for (from, data) in responses { - self.handle_pull_response(&from, data, &timeouts); - } + self.handle_pull_response(responses, &timeouts); } } // Returns (failed, timeout, success) fn handle_pull_response( &self, - from: &Pubkey, crds_values: Vec, timeouts: &CrdsTimeouts, ) -> (usize, usize, usize) { let len = crds_values.len(); - trace!("PullResponse me: {} from: {} len={}", self.id(), from, len); let mut pull_stats = ProcessPullStats::default(); let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = { let _st = ScopedTimer::from(&self.stats.filter_pull_response); @@ -2446,9 +2396,9 @@ impl ClusterInfo { Protocol::PullRequest(filter, caller) => { pull_requests.push((from_addr, filter, caller)) } - Protocol::PullResponse(from, data) => { + Protocol::PullResponse(_, mut data) => { check_duplicate_instance(&data)?; - pull_responses.push((from, data)); + pull_responses.append(&mut data); } Protocol::PushMessage(from, data) => { check_duplicate_instance(&data)?; @@ -2460,13 +2410,10 @@ impl ClusterInfo { } } if self.require_stake_for_gossip(stakes) { - for (_, data) in &mut pull_responses { - retain_staked(data, stakes); - } + retain_staked(&mut pull_responses, stakes); for (_, data) in &mut push_messages { retain_staked(data, stakes); } - pull_responses.retain(|(_, data)| !data.is_empty()); push_messages.retain(|(_, data)| !data.is_empty()); } self.handle_batch_ping_messages(ping_messages, recycler, response_sender); @@ -2478,7 +2425,7 @@ impl ClusterInfo { stakes, response_sender, ); - self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration); + self.handle_batch_pull_responses(pull_responses, stakes, epoch_duration); self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes); self.handle_batch_pong_messages(pong_messages, Instant::now()); self.handle_batch_pull_requests( @@ -3212,18 +3159,11 @@ mod tests { ); assert_eq!( (0, 0, 1), - ClusterInfo::handle_pull_response( - &cluster_info, - &entrypoint_pubkey, - data.clone(), - &timeouts - ) + cluster_info.handle_pull_response(data.clone(), &timeouts) ); - - let entrypoint_pubkey2 = solana_sdk::pubkey::new_rand(); assert_eq!( (1, 0, 0), - ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts) + cluster_info.handle_pull_response(data, &timeouts) ); } @@ -3981,12 +3921,7 @@ mod tests { &stakes, Duration::from_millis(cluster_info.gossip.pull.crds_timeout), ); - ClusterInfo::handle_pull_response( - &cluster_info, - &entrypoint_pubkey, - vec![entrypoint_crdsvalue], - &timeouts, - ); + cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts); let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); assert_eq!(pings.len(), 1); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); @@ -4495,7 +4430,7 @@ mod tests { ); assert_eq!( (0, 0, NO_ENTRIES), - cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts) + cluster_info.handle_pull_response(data, &timeouts) ); }