From 90f8cf09206e3e560c8d78ffabac105561ae6bcf Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 13 Jul 2021 14:04:25 +0000 Subject: [PATCH] makes CrdsGossipPush thread-safe (#18581) --- gossip/src/cluster_info.rs | 4 +- gossip/src/crds_gossip.rs | 27 +-- gossip/src/crds_gossip_push.rs | 366 +++++++++++++++++++++------------ gossip/tests/gossip.rs | 14 +- 4 files changed, 248 insertions(+), 163 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a4126b7e7..352c098e6 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3294,7 +3294,7 @@ mod tests { let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); cluster_info.insert_info(spy); { - let mut gossip = cluster_info.gossip.write().unwrap(); + let gossip = cluster_info.gossip.read().unwrap(); gossip.refresh_push_active_set( &cluster_info.id(), cluster_info.my_shred_version(), @@ -3414,7 +3414,7 @@ mod tests { .mock_pong(peer.id, peer.gossip, Instant::now()); cluster_info.insert_info(peer); { - let mut gossip = cluster_info.gossip.write().unwrap(); + let gossip = cluster_info.gossip.read().unwrap(); gossip.refresh_push_active_set( &cluster_info.id(), cluster_info.my_shred_version(), diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index d6de3c512..95bdc91a8 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -15,7 +15,6 @@ use { duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, ping_pong::PingCache, }, - itertools::Itertools, rayon::ThreadPool, solana_ledger::shred::Shred, solana_sdk::{ @@ -48,21 +47,16 @@ impl CrdsGossip { values: Vec, now: u64, ) -> HashSet { - values + self.push + .process_push_message(&mut self.crds, from, values, now) .into_iter() - .filter_map(|val| { - let origin = val.pubkey(); - self.push - .process_push_message(&mut self.crds, from, val, now) - .ok()?; - Some(origin) - }) + .filter_map(Result::ok) .collect() } /// remove redundant paths in the network pub fn prune_received_cache( - &mut self, + &self, self_pubkey: &Pubkey, origins: I, // Unique pubkeys of crds values' owners. stakes: &HashMap, @@ -70,15 +64,8 @@ impl CrdsGossip { where I: IntoIterator, { - origins - .into_iter() - .flat_map(|origin| { - self.push - .prune_received_cache(self_pubkey, &origin, stakes) - .into_iter() - .zip(std::iter::repeat(origin)) - }) - .into_group_map() + self.push + .prune_received_cache_many(self_pubkey, origins, stakes) } pub fn new_push_messages( @@ -181,7 +168,7 @@ impl CrdsGossip { /// refresh the push active set /// * ratio - number of actives to rotate pub fn refresh_push_active_set( - &mut self, + &self, self_pubkey: &Pubkey, self_shred_version: u16, stakes: &HashMap, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 2c744411f..35daf5958 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -20,6 +20,7 @@ use { }, bincode::serialized_size, indexmap::map::IndexMap, + itertools::Itertools, lru::LruCache, rand::{seq::SliceRandom, Rng}, solana_runtime::bloom::{AtomicBloom, Bloom}, @@ -27,7 +28,12 @@ use { std::{ cmp, collections::{HashMap, HashSet}, - ops::RangeBounds, + iter::repeat, + ops::{DerefMut, RangeBounds}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, RwLock, + }, }, }; @@ -45,27 +51,30 @@ const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; pub struct CrdsGossipPush { /// max bytes per message - pub max_bytes: usize, + max_bytes: usize, /// active set of validators for push - active_set: IndexMap>, + active_set: RwLock>>, /// Cursor into the crds table for values to push. - crds_cursor: Cursor, + crds_cursor: Mutex, /// Cache that tracks which validators a message was received from /// bool indicates it has been pruned. /// This cache represents a lagging view of which validators /// currently have this node in their `active_set` - received_cache: HashMap< - Pubkey, // origin/owner - HashMap, + #[allow(clippy::type_complexity)] + received_cache: Mutex< + HashMap< + Pubkey, // origin/owner + HashMap, + >, >, - last_pushed_to: LruCache, - pub num_active: usize, - pub push_fanout: usize, - pub msg_timeout: u64, + last_pushed_to: RwLock>, + num_active: usize, + push_fanout: usize, + pub(crate) msg_timeout: u64, pub prune_timeout: u64, - pub num_total: usize, - pub num_old: usize, - pub num_pushes: usize, + pub num_total: AtomicUsize, + pub num_old: AtomicUsize, + pub num_pushes: AtomicUsize, } impl Default for CrdsGossipPush { @@ -73,23 +82,23 @@ impl Default for CrdsGossipPush { Self { // Allow upto 64 Crds Values per PUSH max_bytes: PACKET_DATA_SIZE * 64, - active_set: IndexMap::new(), - crds_cursor: Cursor::default(), - received_cache: HashMap::new(), - last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), + active_set: RwLock::default(), + crds_cursor: Mutex::default(), + received_cache: Mutex::default(), + last_pushed_to: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)), num_active: CRDS_GOSSIP_NUM_ACTIVE, push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, - num_total: 0, - num_old: 0, - num_pushes: 0, + num_total: AtomicUsize::default(), + num_old: AtomicUsize::default(), + num_pushes: AtomicUsize::default(), } } } impl CrdsGossipPush { pub fn num_pending(&self, crds: &Crds) -> usize { - let mut cursor = self.crds_cursor; + let mut cursor: Cursor = *self.crds_cursor.lock().unwrap(); crds.get_entries(&mut cursor).count() } @@ -98,15 +107,42 @@ impl CrdsGossipPush { ((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1) } - pub(crate) fn prune_received_cache( - &mut self, + pub(crate) fn prune_received_cache_many( + &self, + self_pubkey: &Pubkey, + origins: I, // Unique pubkeys of crds values' owners. + stakes: &HashMap, + ) -> HashMap> + where + I: IntoIterator, + { + let mut received_cache = self.received_cache.lock().unwrap(); + origins + .into_iter() + .flat_map(|origin| { + let peers = Self::prune_received_cache( + self_pubkey, + &origin, + stakes, + received_cache.deref_mut(), + ); + peers.into_iter().zip(repeat(origin)) + }) + .into_group_map() + } + + fn prune_received_cache( self_pubkey: &Pubkey, origin: &Pubkey, stakes: &HashMap, + received_cache: &mut HashMap< + Pubkey, // origin/owner + HashMap, + >, ) -> Vec { let origin_stake = stakes.get(origin).unwrap_or(&0); let self_stake = stakes.get(self_pubkey).unwrap_or(&0); - let peers = match self.received_cache.get_mut(origin) { + let peers = match received_cache.get_mut(origin) { None => return Vec::default(), Some(peers) => peers, }; @@ -164,28 +200,48 @@ impl CrdsGossipPush { } /// process a push message to the network + /// Returns origins' pubkeys of upserted values. pub(crate) fn process_push_message( - &mut self, + &self, crds: &mut Crds, from: &Pubkey, - value: CrdsValue, + values: Vec, now: u64, - ) -> Result<(), CrdsGossipError> { - self.num_total += 1; - if !self.wallclock_window(now).contains(&value.wallclock()) { - return Err(CrdsGossipError::PushMessageTimeout); - } - let origin = value.pubkey(); - self.received_cache - .entry(origin) - .or_default() - .entry(*from) - .and_modify(|(_pruned, timestamp)| *timestamp = now) - .or_insert((/*pruned:*/ false, now)); - crds.insert(value, now).map_err(|_| { - self.num_old += 1; - CrdsGossipError::PushMessageOldVersion - }) + ) -> Vec> { + self.num_total.fetch_add(values.len(), Ordering::Relaxed); + let values: Vec<_> = { + let wallclock_window = self.wallclock_window(now); + let mut received_cache = self.received_cache.lock().unwrap(); + values + .into_iter() + .map(|value| { + if !wallclock_window.contains(&value.wallclock()) { + return Err(CrdsGossipError::PushMessageTimeout); + } + let origin = value.pubkey(); + let peers = received_cache.entry(origin).or_default(); + peers + .entry(*from) + .and_modify(|(_pruned, timestamp)| *timestamp = now) + .or_insert((/*pruned:*/ false, now)); + Ok(value) + }) + .collect() + }; + values + .into_iter() + .map(|value| { + let value = value?; + let origin = value.pubkey(); + match crds.insert(value, now) { + Ok(()) => Ok(origin), + Err(_) => { + self.num_old.fetch_add(1, Ordering::Relaxed); + Err(CrdsGossipError::PushMessageOldVersion) + } + } + }) + .collect() } /// New push message to broadcast to peers. @@ -193,8 +249,10 @@ impl CrdsGossipPush { /// peers. /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. - pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - let push_fanout = self.push_fanout.min(self.active_set.len()); + pub fn new_push_messages(&self, crds: &Crds, now: u64) -> HashMap> { + let active_set = self.active_set.read().unwrap(); + let active_set_len = active_set.len(); + let push_fanout = self.push_fanout.min(active_set_len); if push_fanout == 0 { return HashMap::default(); } @@ -203,8 +261,9 @@ impl CrdsGossipPush { let mut total_bytes: usize = 0; let mut push_messages: HashMap> = HashMap::new(); let wallclock_window = self.wallclock_window(now); + let mut crds_cursor = self.crds_cursor.lock().unwrap(); let entries = crds - .get_entries(&mut self.crds_cursor) + .get_entries(crds_cursor.deref_mut()) .map(|entry| &entry.value) .filter(|value| wallclock_window.contains(&value.wallclock())); for value in entries { @@ -219,8 +278,8 @@ impl CrdsGossipPush { // learns the MST for that origin. let offset = origin.as_ref()[0] as usize; for i in offset..offset + push_fanout { - let index = i % self.active_set.len(); - let (peer, filter) = self.active_set.get_index(index).unwrap(); + let index = i % active_set_len; + let (peer, filter) = active_set.get_index(index).unwrap(); if !filter.contains(&origin) || value.should_force_push(peer) { trace!("new_push_messages insert {} {:?}", *peer, value); push_messages.entry(*peer).or_default().push(value.clone()); @@ -228,17 +287,20 @@ impl CrdsGossipPush { } } } - self.num_pushes += num_pushes; - trace!("new_push_messages {} {}", num_values, self.active_set.len()); + drop(crds_cursor); + drop(active_set); + self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed); + trace!("new_push_messages {} {}", num_values, active_set_len); + let mut last_pushed_to = self.last_pushed_to.write().unwrap(); for target_pubkey in push_messages.keys().copied() { - self.last_pushed_to.put(target_pubkey, now); + last_pushed_to.put(target_pubkey, now); } push_messages } /// add the `from` to the peer's filter of nodes pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) { - if let Some(filter) = self.active_set.get(peer) { + if let Some(filter) = self.active_set.read().unwrap().get(peer) { for origin in origins { if origin != self_pubkey { filter.add(origin); @@ -255,7 +317,7 @@ impl CrdsGossipPush { /// refresh the push active set /// * ratio - active_set.len()/ratio is the number of actives to rotate pub(crate) fn refresh_push_active_set( - &mut self, + &self, crds: &Crds, stakes: &HashMap, gossip_validators: Option<&HashSet>, @@ -271,7 +333,8 @@ impl CrdsGossipPush { #[cfg(not(debug_assertions))] const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY; let mut rng = rand::thread_rng(); - let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); + let mut active_set = self.active_set.write().unwrap(); + let need = Self::compute_need(self.num_active, active_set.len(), ratio); let mut new_items = HashMap::new(); let (weights, peers): (Vec<_>, Vec<_>) = self .push_options(crds, self_id, self_shred_version, stakes, gossip_validators) @@ -286,7 +349,7 @@ impl CrdsGossipPush { if new_items.len() >= need { break; } - if self.active_set.contains_key(&peer) || new_items.contains_key(&peer) { + if active_set.contains_key(&peer) || new_items.contains_key(&peer) { continue; } let bloom = AtomicBloom::from(Bloom::random( @@ -297,14 +360,14 @@ impl CrdsGossipPush { bloom.add(&peer); new_items.insert(peer, bloom); } - let mut keys: Vec = self.active_set.keys().cloned().collect(); + let mut keys: Vec = active_set.keys().cloned().collect(); keys.shuffle(&mut rng); let num = keys.len() / ratio; for k in &keys[..num] { - self.active_set.swap_remove(k); + active_set.swap_remove(k); } for (k, v) in new_items { - self.active_set.insert(k, v); + active_set.insert(k, v); } } @@ -320,6 +383,7 @@ impl CrdsGossipPush { let mut rng = rand::thread_rng(); let max_weight = u16::MAX as f32 - 1.0; let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); + let last_pushed_to = self.last_pushed_to.read().unwrap(); crds.get_nodes() .filter_map(|value| { let info = value.value.contact_info().unwrap(); @@ -343,11 +407,7 @@ impl CrdsGossipPush { }) }) .map(|info| { - let last_pushed_to = self - .last_pushed_to - .peek(&info.id) - .copied() - .unwrap_or_default(); + let last_pushed_to = last_pushed_to.peek(&info.id).copied().unwrap_or_default(); let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32; let stake = get_stake(&info.id, stakes); let weight = get_weight(max_weight, since, stake); @@ -359,8 +419,8 @@ impl CrdsGossipPush { } /// purge received push message cache - pub(crate) fn purge_old_received_cache(&mut self, min_time: u64) { - self.received_cache.retain(|_, v| { + pub(crate) fn purge_old_received_cache(&self, min_time: u64) { + self.received_cache.lock().unwrap().retain(|_, v| { v.retain(|_, (_, t)| *t > min_time); !v.is_empty() }); @@ -368,19 +428,31 @@ impl CrdsGossipPush { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { - let active_set = self - .active_set - .iter() - .map(|(k, v)| (*k, v.mock_clone())) - .collect(); - let mut last_pushed_to = LruCache::new(self.last_pushed_to.cap()); - for (k, v) in self.last_pushed_to.iter().rev() { - last_pushed_to.put(*k, *v); - } + let active_set = { + let active_set = self.active_set.read().unwrap(); + active_set + .iter() + .map(|(k, v)| (*k, v.mock_clone())) + .collect() + }; + let last_pushed_to = { + let last_pushed_to = self.last_pushed_to.read().unwrap(); + let mut clone = LruCache::new(last_pushed_to.cap()); + for (k, v) in last_pushed_to.iter().rev() { + clone.put(*k, *v); + } + clone + }; + let received_cache = self.received_cache.lock().unwrap().clone(); + let crds_cursor = *self.crds_cursor.lock().unwrap(); Self { - active_set, - received_cache: self.received_cache.clone(), - last_pushed_to, + active_set: RwLock::new(active_set), + received_cache: Mutex::new(received_cache), + last_pushed_to: RwLock::new(last_pushed_to), + crds_cursor: Mutex::new(crds_cursor), + num_total: AtomicUsize::new(self.num_total.load(Ordering::Relaxed)), + num_old: AtomicUsize::new(self.num_old.load(Ordering::Relaxed)), + num_pushes: AtomicUsize::new(self.num_pushes.load(Ordering::Relaxed)), ..*self } } @@ -396,7 +468,7 @@ mod test { #[test] fn test_prune() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut stakes = HashMap::new(); let self_id = solana_sdk::pubkey::new_rand(); @@ -410,12 +482,20 @@ mod test { let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand()); let mut low_staked_set = HashSet::new(); low_staked_peers.for_each(|p| { - let _ = push.process_push_message(&mut crds, &p, value.clone(), 0); + push.process_push_message(&mut crds, &p, vec![value.clone()], 0); low_staked_set.insert(p); stakes.insert(p, 1); }); - let pruned = push.prune_received_cache(&self_id, &origin, &stakes); + let pruned = { + let mut received_cache = push.received_cache.lock().unwrap(); + CrdsGossipPush::prune_received_cache( + &self_id, + &origin, + &stakes, + received_cache.deref_mut(), + ) + }; assert!( pruned.is_empty(), "should not prune if min threshold has not been reached" @@ -424,9 +504,17 @@ mod test { let high_staked_peer = solana_sdk::pubkey::new_rand(); let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10; stakes.insert(high_staked_peer, high_stake); - let _ = push.process_push_message(&mut crds, &high_staked_peer, value, 0); + push.process_push_message(&mut crds, &high_staked_peer, vec![value], 0); - let pruned = push.prune_received_cache(&self_id, &origin, &stakes); + let pruned = { + let mut received_cache = push.received_cache.lock().unwrap(); + CrdsGossipPush::prune_received_cache( + &self_id, + &origin, + &stakes, + received_cache.deref_mut(), + ) + }; assert!( pruned.len() < low_staked_set.len() + 1, "should not prune all peers" @@ -442,7 +530,7 @@ mod test { #[test] fn test_process_push_one() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -450,43 +538,43 @@ mod test { let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + [Ok(label.pubkey())], ); assert_eq!(crds.get(&label).unwrap().value, value); // push it again - assert_matches!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Err(CrdsGossipError::PushMessageOldVersion) + assert_eq!( + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Err(CrdsGossipError::PushMessageOldVersion)], ); } #[test] fn test_process_push_old_version() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ci.wallclock = 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Ok(ci.id)], ); // push an old version ci.wallclock = 0; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Err(CrdsGossipError::PushMessageOldVersion) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Err(CrdsGossipError::PushMessageOldVersion)], ); } #[test] fn test_process_push_timeout() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let timeout = push.msg_timeout; let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); @@ -494,38 +582,39 @@ mod test { ci.wallclock = timeout + 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Err(CrdsGossipError::PushMessageTimeout) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Err(CrdsGossipError::PushMessageTimeout)], ); // push a version to far in the past ci.wallclock = 0; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1), - Err(CrdsGossipError::PushMessageTimeout) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], timeout + 1), + [Err(CrdsGossipError::PushMessageTimeout)] ); } #[test] fn test_process_push_update() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + let origin = ci.id; ci.wallclock = 0; let value_old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value_old, 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value_old], 0), + [Ok(origin)], ); // push an old version ci.wallclock = 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Ok(origin)], ); } #[test] @@ -540,7 +629,7 @@ mod test { solana_logger::setup(); let now = timestamp(); let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -549,21 +638,26 @@ mod test { assert_eq!(crds.insert(value1.clone(), now), Ok(())); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); - assert!(push.active_set.get(&value1.label().pubkey()).is_some()); + let active_set = push.active_set.read().unwrap(); + assert!(active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); - assert!(push.active_set.get(&value2.label().pubkey()).is_none()); + assert!(active_set.get(&value2.label().pubkey()).is_none()); + drop(active_set); assert_eq!(crds.insert(value2.clone(), now), Ok(())); for _ in 0..30 { push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); - if push.active_set.get(&value2.label().pubkey()).is_some() { + let active_set = push.active_set.read().unwrap(); + if active_set.get(&value2.label().pubkey()).is_some() { break; } } - assert!(push.active_set.get(&value2.label().pubkey()).is_some()); - + { + let active_set = push.active_set.read().unwrap(); + assert!(active_set.get(&value2.label().pubkey()).is_some()); + } for _ in 0..push.num_active { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), @@ -571,14 +665,14 @@ mod test { assert_eq!(crds.insert(value2.clone(), now), Ok(())); } push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); - assert_eq!(push.active_set.len(), push.num_active); + assert_eq!(push.active_set.read().unwrap().len(), push.num_active); } #[test] fn test_active_set_refresh_with_bank() { solana_logger::setup(); let time = timestamp() - 1024; //make sure there's at least a 1 second delay let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut stakes = HashMap::new(); for i in 1..=100 { let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -588,7 +682,7 @@ mod test { let id = peer.label().pubkey(); crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); - push.last_pushed_to.put(id, time); + push.last_pushed_to.write().unwrap().put(id, time); } let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); assert!(!options.is_empty()); @@ -721,7 +815,7 @@ mod test { fn test_new_push_messages() { let now = timestamp(); let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -735,11 +829,12 @@ mod test { ))); let mut expected = HashMap::new(); expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); + let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 0), + [Ok(origin)] ); - assert_eq!(push.active_set.len(), 1); + assert_eq!(push.active_set.read().unwrap().len(), 1); assert_eq!(push.new_push_messages(&crds, 0), expected); } #[test] @@ -747,7 +842,7 @@ mod test { let now = timestamp(); let mut rng = rand::thread_rng(); let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let peers: Vec<_> = vec![0, 0, now] .into_iter() .map(|wallclock| { @@ -756,11 +851,12 @@ mod test { CrdsValue::new_unsigned(CrdsData::ContactInfo(peer)) }) .collect(); + let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect(); assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![peers[2].clone()], now), + [Ok(origin[2])], ); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); @@ -771,14 +867,14 @@ mod test { ] .into_iter() .collect(); - assert_eq!(push.active_set.len(), 3); + assert_eq!(push.active_set.read().unwrap().len(), 3); assert_eq!(push.new_push_messages(&crds, now), expected); } #[test] fn test_process_prune() { let mut crds = Crds::default(); let self_id = solana_sdk::pubkey::new_rand(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -791,9 +887,10 @@ mod test { 0, ))); let expected = HashMap::new(); + let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg.clone()], 0), + [Ok(origin)], ); push.process_prune_msg( &self_id, @@ -805,7 +902,7 @@ mod test { #[test] fn test_purge_old_pending_push_messages() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, @@ -817,9 +914,10 @@ mod test { ci.wallclock = 1; let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); let expected = HashMap::new(); + let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 1), + [Ok(origin)], ); assert_eq!(push.new_push_messages(&crds, 0), expected); } @@ -827,22 +925,22 @@ mod test { #[test] fn test_purge_old_received_cache() { let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ci.wallclock = 0; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Ok(()) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + [Ok(label.pubkey())] ); assert_eq!(crds.get(&label).unwrap().value, value); // push it again - assert_matches!( - push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Err(CrdsGossipError::PushMessageOldVersion) + assert_eq!( + push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + [Err(CrdsGossipError::PushMessageOldVersion)], ); // purge the old pushed @@ -850,8 +948,8 @@ mod test { // push it again assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Err(CrdsGossipError::PushMessageOldVersion) + push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + [Err(CrdsGossipError::PushMessageOldVersion)], ); } } diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 9d6c08229..8107b6baf 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -322,9 +322,9 @@ pub fn cluster_info_scale() { .filter(|v| v.message.account_keys == tx.message.account_keys) .count(); let gossip = node.gossip.read().unwrap(); - num_old += gossip.push.num_old; - num_push_total += gossip.push.num_total; - num_pushes += gossip.push.num_pushes; + num_old += gossip.push.num_old.load(Ordering::Relaxed); + num_push_total += gossip.push.num_total.load(Ordering::Relaxed); + num_pushes += gossip.push.num_pushes.load(Ordering::Relaxed); num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed); if has_tx == 0 { not_done += 1; @@ -348,10 +348,10 @@ pub fn cluster_info_scale() { ); sleep(Duration::from_millis(200)); for (node, _, _) in nodes.iter() { - let mut gossip = node.gossip.write().unwrap(); - gossip.push.num_old = 0; - gossip.push.num_total = 0; - gossip.push.num_pushes = 0; + let gossip = node.gossip.read().unwrap(); + gossip.push.num_old.store(0, Ordering::Relaxed); + gossip.push.num_total.store(0, Ordering::Relaxed); + gossip.push.num_pushes.store(0, Ordering::Relaxed); gossip.pull.num_pulls.store(0, Ordering::Relaxed); } }