diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 78e15fd3ad..93a44fb7fe 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1687,18 +1687,12 @@ impl ClusterInfo {
         Ok(())
     }

-    fn process_entrypoints(&self, entrypoints_processed: &mut bool) {
-        if *entrypoints_processed {
-            return;
-        }
-
+    fn process_entrypoints(&self) -> bool {
         let mut entrypoints = self.entrypoints.write().unwrap();
         if entrypoints.is_empty() {
             // No entrypoint specified. Nothing more to process
-            *entrypoints_processed = true;
-            return;
+            return true;
         }
-
         for entrypoint in entrypoints.iter_mut() {
             if entrypoint.id == Pubkey::default() {
                 // If a pull from the entrypoint was successful it should exist in the CRDS table
@@ -1727,11 +1721,10 @@ impl ClusterInfo {
                     .set_shred_version(entrypoint.shred_version);
             }
         }
-
-        *entrypoints_processed = self.my_shred_version() != 0
+        self.my_shred_version() != 0
             && entrypoints
                 .iter()
-                .all(|entrypoint| entrypoint.id != Pubkey::default());
+                .all(|entrypoint| entrypoint.id != Pubkey::default())
     }

     fn handle_purge(
@@ -1867,8 +1860,7 @@ impl ClusterInfo {
                 }

                 self.handle_purge(&thread_pool, &bank_forks, &stakes);
-
-                self.process_entrypoints(&mut entrypoints_processed);
+                entrypoints_processed = entrypoints_processed || self.process_entrypoints();

                 //TODO: possibly tune this parameter
                 //we saw a deadlock passing an self.read().unwrap().timeout into sleep
@@ -3851,21 +3843,17 @@ mod tests {
         cluster_info.set_entrypoint(entrypoint.clone());
         let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert!(pings.is_empty());
-        assert_eq!(1, pulls.len() as u64);
-        match pulls.get(0) {
-            Some((addr, msg)) => {
-                assert_eq!(*addr, entrypoint.gossip);
-                match msg {
-                    Protocol::PullRequest(_, value) => {
-                        assert!(value.verify());
-                        assert_eq!(value.pubkey(), cluster_info.id())
-                    }
-                    _ => panic!("wrong protocol"),
+        assert_eq!(pulls.len(), 64);
+        for (addr, msg) in pulls {
+            assert_eq!(addr, entrypoint.gossip);
+            match msg {
+                Protocol::PullRequest(_, value) => {
+                    assert!(value.verify());
+                    assert_eq!(value.pubkey(), cluster_info.id())
                 }
+                _ => panic!("wrong protocol"),
             }
-            None => panic!("entrypoint should be a pull destination"),
         }
-
         // now add this message back to the table and make sure after the next pull, the entrypoint is unset
         let entrypoint_crdsvalue =
             CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
@@ -3879,7 +3867,7 @@ mod tests {
         );
         let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(pings.len(), 1);
-        assert_eq!(1, pulls.len() as u64);
+        assert_eq!(pulls.len(), 64);
         assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
     }

@@ -4068,24 +4056,30 @@ mod tests {
         // fresh timestamp). There should only be one pull request to `other_node`
         let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
-        assert_eq!(1, pulls.len() as u64);
-        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
+        assert_eq!(64, pulls.len());
+        assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip));

         // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
         // now be two pull requests
         cluster_info.entrypoints.write().unwrap()[0].wallclock = 0;
         let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
-        assert_eq!(2, pulls.len() as u64);
-        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
-        assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
+        assert_eq!(pulls.len(), 64 * 2);
+        assert!(pulls
+            .iter()
+            .take(64)
+            .all(|(addr, _)| *addr == other_node.gossip));
+        assert!(pulls
+            .iter()
+            .skip(64)
+            .all(|(addr, _)| *addr == entrypoint.gossip));

         // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
         // only be one pull request to `other_node`
         let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
-        assert_eq!(1, pulls.len() as u64);
-        assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
+        assert_eq!(pulls.len(), 64);
+        assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip));
     }

     #[test]
@@ -4249,8 +4243,7 @@ mod tests {
             .any(|entrypoint| *entrypoint == gossiped_entrypoint1_info));

         // Adopt the entrypoint's gossiped contact info and verify
-        let mut entrypoints_processed = false;
-        ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
+        let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
         assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
         assert!(cluster_info
             .entrypoints
@@ -4278,8 +4271,7 @@ mod tests {

         // Adopt the entrypoint's gossiped contact info and verify
         error!("Adopt the entrypoint's gossiped contact info and verify");
-        let mut entrypoints_processed = false;
-        ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
+        let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
         assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
         assert!(cluster_info
             .entrypoints
@@ -4322,8 +4314,7 @@ mod tests {
         cluster_info.insert_info(gossiped_entrypoint_info.clone());

         // Adopt the entrypoint's gossiped contact info and verify
-        let mut entrypoints_processed = false;
-        ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
+        let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
         assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 1);
         assert_eq!(
             cluster_info.entrypoints.read().unwrap()[0],
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 98a63e13dd..57564c7bd3 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -29,9 +29,6 @@ use std::{
     sync::Mutex,
 };

-///The min size for bloom filters
-pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
-
 pub struct CrdsGossip {
     pub crds: Crds,
     pub id: Pubkey,
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 64d4dbc9c7..aa252f1a8b 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -13,7 +13,7 @@ use crate::{
     cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
     contact_info::ContactInfo,
     crds::{Crds, CrdsError},
-    crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
+    crds_gossip::{get_stake, get_weight},
     crds_gossip_error::CrdsGossipError,
     crds_value::CrdsValue,
     ping_pong::PingCache,
@@ -30,7 +30,6 @@ use solana_sdk::{
     signature::{Keypair, Signer},
 };
 use std::{
-    cmp,
     collections::{HashMap, HashSet, VecDeque},
     convert::TryInto,
     net::SocketAddr,
@@ -468,11 +467,10 @@ impl CrdsGossipPull {
         bloom_size: usize,
     ) -> Vec<CrdsFilter> {
         const PAR_MIN_LENGTH: usize = 512;
-        let num = cmp::max(
-            CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
-            crds.len() + self.purged_values.len() + self.failed_inserts.len(),
-        );
-        let filters = CrdsFilterSet::new(num, bloom_size);
+        const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
+        let num_items = crds.len() + self.purged_values.len() + self.failed_inserts.len();
+        let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
+        let filters = CrdsFilterSet::new(num_items, bloom_size);
         thread_pool.install(|| {
             crds.par_values()
                 .with_min_len(PAR_MIN_LENGTH)
@@ -915,7 +913,7 @@ mod test {
         }
         assert_eq!(num_inserts, 20_000);
         let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
-        assert_eq!(filters.len(), 32);
+        assert_eq!(filters.len(), 64);
         let hash_values: Vec<_> = crds
             .values()
             .map(|v| v.value_hash)
@@ -1170,24 +1168,29 @@ mod test {
             CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
         );
         assert_eq!(rsp[0].len(), 0);
-
-        assert_eq!(filters.len(), 1);
-        filters.push(filters[0].clone());
-        //should return new value since caller is new
-        filters[1].0 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
-            &solana_sdk::pubkey::new_rand(),
-            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
-        )));
-
+        assert_eq!(filters.len(), 64);
+        filters.extend({
+            // Should return new value since caller is new.
+            let now = CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1;
+            let caller = ContactInfo::new_localhost(&Pubkey::new_unique(), now);
+            let caller = CrdsValue::new_unsigned(CrdsData::ContactInfo(caller));
+            filters
+                .iter()
+                .map(|(_, filter)| (caller.clone(), filter.clone()))
+                .collect::<Vec<_>>()
+        });
         let rsp = dest.generate_pull_responses(
             &dest_crds,
             &filters,
             /*output_size_limit=*/ usize::MAX,
             CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
         );
-        assert_eq!(rsp.len(), 2);
-        assert_eq!(rsp[0].len(), 0);
-        assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
+        assert_eq!(rsp.len(), 128);
+        // There should be only one non-empty response in the 2nd half.
+        // Orders are also preserved.
+        assert!(rsp.iter().take(64).all(|r| r.is_empty()));
+        assert_eq!(rsp.iter().filter(|r| r.is_empty()).count(), 127);
+        assert_eq!(rsp.iter().find(|r| !r.is_empty()).unwrap().len(), 1);
     }

     #[test]
@@ -1312,7 +1315,7 @@ mod test {
             );
             let (_, filters) = req.unwrap();
             let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
-            let mut rsp = dest.generate_pull_responses(
+            let rsp = dest.generate_pull_responses(
                 &dest_crds,
                 &filters,
                 /*output_size_limit=*/ usize::MAX,
@@ -1332,13 +1335,13 @@ mod test {
             if rsp.is_empty() {
                 continue;
             }
-            assert_eq!(rsp.len(), 1);
+            assert_eq!(rsp.len(), 64);
             let failed = node
                 .process_pull_response(
                     &mut node_crds,
                     &node_pubkey,
                     &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
-                    rsp.pop().unwrap(),
+                    rsp.into_iter().flatten().collect(),
                     1,
                 )
                 .0;
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index f0b3b95e85..1c9bf18e25 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -12,14 +12,13 @@ use crate::{
     cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
     contact_info::ContactInfo,
     crds::{Crds, Cursor, VersionedCrdsValue},
-    crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
+    crds_gossip::{get_stake, get_weight},
     crds_gossip_error::CrdsGossipError,
     crds_value::CrdsValue,
     weighted_shuffle::weighted_shuffle,
 };
 use bincode::serialized_size;
 use indexmap::map::IndexMap;
-use itertools::Itertools;
 use lru::LruCache;
 use rand::{seq::SliceRandom, Rng};
 use solana_runtime::bloom::{AtomicBloom, Bloom};
@@ -264,46 +263,44 @@ impl CrdsGossipPush {
         network_size: usize,
         ratio: usize,
     ) {
+        const BLOOM_FALSE_RATE: f64 = 0.1;
+        const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
         let mut rng = rand::thread_rng();
         let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
         let mut new_items = HashMap::new();
-
-        let options: Vec<_> = self.push_options(
-            crds,
-            &self_id,
-            self_shred_version,
-            stakes,
-            gossip_validators,
-        );
-        if options.is_empty() {
+        let (weights, peers): (Vec<_>, Vec<_>) = self
+            .push_options(
+                crds,
+                &self_id,
+                self_shred_version,
+                stakes,
+                gossip_validators,
+            )
+            .into_iter()
+            .unzip();
+        if peers.is_empty() {
             return;
         }
-
-        let mut seed = [0; 32];
-        rng.fill(&mut seed[..]);
-        let mut shuffle = weighted_shuffle(
-            &options.iter().map(|weighted| weighted.0).collect_vec(),
-            seed,
-        )
-        .into_iter();
-
-        while new_items.len() < need {
-            match shuffle.next() {
-                Some(index) => {
-                    let item = options[index].1;
-                    if self.active_set.get(&item.id).is_some() {
-                        continue;
-                    }
-                    if new_items.get(&item.id).is_some() {
-                        continue;
-                    }
-                    let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size);
-                    let bloom: AtomicBloom<_> = Bloom::random(size, 0.1, 1024 * 8 * 4).into();
-                    bloom.add(&item.id);
-                    new_items.insert(item.id, bloom);
-                }
-                _ => break,
+        let num_bloom_items = CRDS_UNIQUE_PUBKEY_CAPACITY.max(network_size);
+        let shuffle = {
+            let mut seed = [0; 32];
+            rng.fill(&mut seed[..]);
+            weighted_shuffle(&weights, seed).into_iter()
+        };
+        for peer in shuffle.map(|i| peers[i].id) {
+            if new_items.len() >= need {
+                break;
             }
+            if self.active_set.contains_key(&peer) || new_items.contains_key(&peer) {
+                continue;
+            }
+            let bloom = AtomicBloom::from(Bloom::random(
+                num_bloom_items,
+                BLOOM_FALSE_RATE,
+                BLOOM_MAX_BITS,
+            ));
+            bloom.add(&peer);
+            new_items.insert(peer, bloom);
         }
         let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
         keys.shuffle(&mut rng);
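
Note on the updated test expectations: the pull path builds a set of bloom filters that together partition the CRDS hash space into a power-of-two number of slices, each filter sized to fit `bloom_size`, and the tests above indicate one pull-request message per filter for the selected peer. Raising the floor from `CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS = 500` to `MIN_NUM_BLOOM_ITEMS = 65_536` is what moves the asserted counts from 1 to 64 pulls per peer and from 32 to 64 filters in the 20_000-insert test. The standalone Rust sketch below illustrates that arithmetic only and is not code from this crate; `num_filters` and the assumed per-filter capacity of roughly 1_200 items are hypothetical values chosen to be consistent with the asserted numbers.

// Sketch: how many keyspace partitions (filters) are needed so that each
// bloom filter stays within an assumed per-filter item capacity.
fn num_filters(num_items: usize, assumed_max_items_per_filter: usize) -> usize {
    // mask_bits = ceil(log2(num_items / max_items)), clamped at zero.
    let ratio = num_items as f64 / assumed_max_items_per_filter as f64;
    let mask_bits = ratio.log2().ceil().max(0.0) as u32;
    1usize << mask_bits
}

fn main() {
    const MIN_NUM_BLOOM_ITEMS: usize = 65_536; // new floor introduced by this patch
    const ASSUMED_MAX_ITEMS_PER_FILTER: usize = 1_200; // hypothetical capacity of one bloom_size-bounded filter
    // The 65_536 floor alone forces 2^6 = 64 partitions, hence 64 pull requests per peer in the tests.
    assert_eq!(num_filters(MIN_NUM_BLOOM_ITEMS, ASSUMED_MAX_ITEMS_PER_FILTER), 64);
    // The old 500-item floor collapsed to a single filter in small test setups.
    assert_eq!(num_filters(500, ASSUMED_MAX_ITEMS_PER_FILTER), 1);
    // With ~20_000 CRDS entries the old code produced 32 filters, matching the old assertion.
    assert_eq!(num_filters(20_000, ASSUMED_MAX_ITEMS_PER_FILTER), 32);
    println!("filter-count arithmetic consistent with the updated tests");
}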