From 590b75140fb7331e8f29aaf88a82ff341629be33 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sat, 21 Jan 2023 22:28:48 +0000 Subject: [PATCH] removes legacy retransmit tests (#29817) Retransmit code has moved to core/src/cluster_nodes.rs and has been significantly revised. gossip/tests/cluster_info.rs is testing the old code which is no longer relevant. --- bloom/src/bloom.rs | 13 -- gossip/src/cluster_info.rs | 27 --- gossip/src/crds.rs | 18 -- gossip/src/crds_gossip.rs | 10 -- gossip/src/crds_gossip_pull.rs | 10 -- gossip/src/crds_gossip_push.rs | 16 -- gossip/src/ping_pong.rs | 21 --- gossip/src/push_active_set.rs | 13 -- gossip/src/received_cache.rs | 4 +- gossip/tests/cluster_info.rs | 314 --------------------------------- 10 files changed, 2 insertions(+), 444 deletions(-) delete mode 100644 gossip/tests/cluster_info.rs diff --git a/bloom/src/bloom.rs b/bloom/src/bloom.rs index 7d1f984033..2a6b6be53a 100644 --- a/bloom/src/bloom.rs +++ b/bloom/src/bloom.rs @@ -199,19 +199,6 @@ impl AtomicBloom { bit.store(0u64, Ordering::Relaxed); }); } - - // Only for tests and simulations. - pub fn mock_clone(&self) -> Self { - Self { - keys: self.keys.clone(), - bits: self - .bits - .iter() - .map(|v| AtomicU64::new(v.load(Ordering::Relaxed))) - .collect(), - ..*self - } - } } impl From> for Bloom { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 47a532f198..55b1a2b2fa 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -432,33 +432,6 @@ impl ClusterInfo { me } - // Should only be used by tests and simulations - pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { - let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); - my_contact_info.id = *new_id; - ClusterInfo { - gossip: self.gossip.mock_clone(), - keypair: RwLock::new(self.keypair.read().unwrap().clone()), - entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()), - outbound_budget: self.outbound_budget.clone_non_atomic(), - my_contact_info: RwLock::new(my_contact_info), - ping_cache: Mutex::new(self.ping_cache.lock().unwrap().mock_clone()), - stats: GossipStats::default(), - socket: UdpSocket::bind("0.0.0.0:0").unwrap(), - local_message_pending_push_queue: Mutex::new( - self.local_message_pending_push_queue - .lock() - .unwrap() - .clone(), - ), - contact_debug_interval: self.contact_debug_interval, - instance: RwLock::new(NodeInstance::new(&mut thread_rng(), *new_id, timestamp())), - contact_info_path: PathBuf::default(), - contact_save_interval: 0, // disabled - ..*self - } - } - pub fn set_contact_debug_interval(&mut self, new: u64) { self.contact_debug_interval = new; } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 820a7df8ff..b7521ed120 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -642,24 +642,6 @@ impl Crds { pub(crate) fn take_stats(&self) -> CrdsStats { std::mem::take(&mut self.stats.lock().unwrap()) } - - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - Self { - table: self.table.clone(), - cursor: self.cursor, - shards: self.shards.clone(), - nodes: self.nodes.clone(), - votes: self.votes.clone(), - epoch_slots: self.epoch_slots.clone(), - duplicate_shreds: self.duplicate_shreds.clone(), - records: self.records.clone(), - entries: self.entries.clone(), - purged: self.purged.clone(), - shred_versions: self.shred_versions.clone(), - stats: Mutex::::default(), - } - } } impl Default for CrdsDataStats { diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index a77d785c81..521439a450 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -320,16 +320,6 @@ impl CrdsGossip { self.pull.purge_failed_inserts(now); rv } - - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - let crds = self.crds.read().unwrap().mock_clone(); - Self { - crds: RwLock::new(crds), - push: self.push.mock_clone(), - pull: self.pull.mock_clone(), - } - } } // Returns active and valid cluster nodes to gossip with. diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 634b94c2eb..cb1302272c 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -569,16 +569,6 @@ impl CrdsGossipPull { stats.success, ) } - - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - let failed_inserts = self.failed_inserts.read().unwrap().clone(); - Self { - failed_inserts: RwLock::new(failed_inserts), - num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)), - ..*self - } - } } #[cfg(test)] diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 807cba5ef8..c4e9c22bc0 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -274,22 +274,6 @@ impl CrdsGossipPush { let mut active_set = self.active_set.write().unwrap(); active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes) } - - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - let active_set = self.active_set.read().unwrap().mock_clone(); - let received_cache = self.received_cache.lock().unwrap().mock_clone(); - let crds_cursor = *self.crds_cursor.lock().unwrap(); - Self { - active_set: RwLock::new(active_set), - received_cache: Mutex::new(received_cache), - crds_cursor: Mutex::new(crds_cursor), - num_total: AtomicUsize::new(self.num_total.load(Ordering::Relaxed)), - num_old: AtomicUsize::new(self.num_old.load(Ordering::Relaxed)), - num_pushes: AtomicUsize::new(self.num_pushes.load(Ordering::Relaxed)), - ..*self - } - } } #[cfg(test)] diff --git a/gossip/src/ping_pong.rs b/gossip/src/ping_pong.rs index 6c7399281e..30ecc0d31b 100644 --- a/gossip/src/ping_pong.rs +++ b/gossip/src/ping_pong.rs @@ -242,27 +242,6 @@ impl PingCache { (check, ping) } - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - let mut clone = Self { - ttl: self.ttl, - rate_limit_delay: self.rate_limit_delay, - pings: LruCache::new(self.pings.cap()), - pongs: LruCache::new(self.pongs.cap()), - pending_cache: LruCache::new(self.pending_cache.cap()), - }; - for (k, v) in self.pongs.iter().rev() { - clone.pings.put(*k, *v); - } - for (k, v) in self.pongs.iter().rev() { - clone.pongs.put(*k, *v); - } - for (k, v) in self.pending_cache.iter().rev() { - clone.pending_cache.put(*k, *v); - } - clone - } - /// Only for tests and simulations. pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) { self.pongs.put((node, socket), now); diff --git a/gossip/src/push_active_set.rs b/gossip/src/push_active_set.rs index 8ae9798512..9196aae6d8 100644 --- a/gossip/src/push_active_set.rs +++ b/gossip/src/push_active_set.rs @@ -101,19 +101,6 @@ impl PushActiveSet { fn get_entry(&self, stake: Option<&u64>) -> &PushActiveSetEntry { &self.0[get_stake_bucket(stake)] } - - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { - Self(std::array::from_fn(|k| { - PushActiveSetEntry( - self.0[k] - .0 - .iter() - .map(|(&node, filter)| (node, filter.mock_clone())) - .collect(), - ) - })) - } } impl PushActiveSetEntry { diff --git a/gossip/src/received_cache.rs b/gossip/src/received_cache.rs index a77a758e2a..d1e7623dfd 100644 --- a/gossip/src/received_cache.rs +++ b/gossip/src/received_cache.rs @@ -55,8 +55,8 @@ impl ReceivedCache { .flatten() } - // Only for tests and simulations. - pub(crate) fn mock_clone(&self) -> Self { + #[cfg(test)] + fn mock_clone(&self) -> Self { let mut cache = LruCache::new(self.0.cap()); for (&origin, entry) in self.0.iter().rev() { cache.put(origin, entry.clone()); diff --git a/gossip/tests/cluster_info.rs b/gossip/tests/cluster_info.rs deleted file mode 100644 index 4b1d0956e2..0000000000 --- a/gossip/tests/cluster_info.rs +++ /dev/null @@ -1,314 +0,0 @@ -#![allow(clippy::integer_arithmetic)] -use { - crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}, - itertools::Itertools, - rand::SeedableRng, - rand_chacha::ChaChaRng, - rayon::{iter::ParallelIterator, prelude::*}, - serial_test::serial, - solana_gossip::{ - cluster_info::{compute_retransmit_peers, ClusterInfo}, - legacy_contact_info::LegacyContactInfo as ContactInfo, - weighted_shuffle::WeightedShuffle, - }, - solana_sdk::{pubkey::Pubkey, signer::keypair::Keypair}, - solana_streamer::socket::SocketAddrSpace, - std::{ - collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, - time::Instant, - }, -}; - -type Nodes = HashMap, Receiver<(i32, bool)>)>; - -fn num_threads() -> usize { - num_cpus::get() -} - -/// Search for the a node with the given balance -fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) { - batches.par_iter_mut().for_each(|batch| { - if batch.contains_key(id) { - let _ = batch.get_mut(id).unwrap().1.insert(shred); - } - }); -} - -fn sorted_retransmit_peers_and_stakes( - cluster_info: &ClusterInfo, - stakes: Option<&HashMap>, -) -> (Vec, Vec<(u64, usize)>) { - let mut peers = cluster_info.tvu_peers(); - // insert "self" into this list for the layer and neighborhood computation - peers.push(cluster_info.my_contact_info()); - let stakes_and_index = sorted_stakes_with_index(&peers, stakes); - (peers, stakes_and_index) -} - -fn sorted_stakes_with_index( - peers: &[ContactInfo], - stakes: Option<&HashMap>, -) -> Vec<(u64, usize)> { - let stakes_and_index: Vec<_> = peers - .iter() - .enumerate() - .map(|(i, c)| { - // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is - // assumed to be missing entry. So let's make sure stake weights are atleast 1 - let stake = 1.max( - stakes - .as_ref() - .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)), - ); - (stake, i) - }) - .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { - if r_stake == l_stake { - peers[*r_info].id.cmp(&peers[*l_info].id) - } else { - r_stake.cmp(l_stake) - } - }) - .collect(); - - stakes_and_index -} - -fn shuffle_peers_and_index( - id: &Pubkey, - peers: &[ContactInfo], - stakes_and_index: &[(u64, usize)], - seed: [u8; 32], -) -> (usize, Vec<(u64, usize)>) { - let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed); - let self_index = shuffled_stakes_and_index - .iter() - .enumerate() - .find_map(|(i, (_stake, index))| { - if peers[*index].id == *id { - Some(i) - } else { - None - } - }) - .unwrap(); - (self_index, shuffled_stakes_and_index) -} - -fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> { - let mut rng = ChaChaRng::from_seed(seed); - let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect(); - let shuffle = WeightedShuffle::new("stake_weighted_shuffle", &stake_weights); - shuffle - .shuffle(&mut rng) - .map(|i| stakes_and_index[i]) - .collect() -} - -fn retransmit( - mut shuffled_nodes: Vec, - senders: &HashMap>, - cluster: &ClusterInfo, - fanout: usize, - shred: i32, - retransmit: bool, -) -> i32 { - let mut seed = [0; 32]; - let mut my_index = 0; - let mut index = 0; - shuffled_nodes.retain(|c| { - if c.id == cluster.id() { - my_index = index; - false - } else { - index += 1; - true - } - }); - seed[0..4].copy_from_slice(&shred.to_le_bytes()); - let shuffled_indices: Vec<_> = (0..shuffled_nodes.len()).collect(); - let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &shuffled_indices); - children.into_iter().for_each(|i| { - let s = senders.get(&shuffled_nodes[i].id).unwrap(); - let _ = s.send((shred, retransmit)); - }); - - if retransmit { - neighbors.into_iter().for_each(|i| { - let s = senders.get(&shuffled_nodes[i].id).unwrap(); - let _ = s.send((shred, false)); - }); - } - - shred -} - -#[allow(clippy::type_complexity)] -fn run_simulation(stakes: &[u64], fanout: usize) { - let num_threads = num_threads(); - // set timeout to 5 minutes - let timeout = 60 * 5; - - // describe the leader - let leader_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - let cluster_info = ClusterInfo::new( - leader_info.clone(), - Arc::new(Keypair::new()), - SocketAddrSpace::Unspecified, - ); - - // setup staked nodes - let mut staked_nodes = HashMap::new(); - - // setup accounts for all nodes (leader has 0 bal) - let (s, r) = unbounded(); - let senders: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); - senders.lock().unwrap().insert(leader_info.id, s); - let mut batches: Vec = Vec::with_capacity(num_threads); - (0..num_threads).for_each(|_| batches.push(HashMap::new())); - batches - .get_mut(0) - .unwrap() - .insert(leader_info.id, (false, HashSet::new(), r)); - let range: Vec<_> = (1..=stakes.len()).collect(); - let chunk_size = (stakes.len() + num_threads - 1) / num_threads; - range.chunks(chunk_size).for_each(|chunk| { - chunk.iter().for_each(|i| { - //distribute neighbors across threads to maximize parallel compute - let batch_ix = *i % batches.len(); - let node = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - staked_nodes.insert(node.id, stakes[*i - 1]); - cluster_info.insert_info(node.clone()); - let (s, r) = unbounded(); - batches - .get_mut(batch_ix) - .unwrap() - .insert(node.id, (false, HashSet::new(), r)); - senders.lock().unwrap().insert(node.id, s); - }) - }); - let c_info = cluster_info.clone_with_id(&cluster_info.id()); - - let shreds_len = 100; - let shuffled_peers: Vec> = (0..shreds_len as i32) - .map(|i| { - let mut seed = [0; 32]; - seed[0..4].copy_from_slice(&i.to_le_bytes()); - // TODO: Ideally these should use the new methods in - // solana_core::cluster_nodes, however that would add build - // dependency on solana_core which is not desired. - let (peers, stakes_and_index) = - sorted_retransmit_peers_and_stakes(&cluster_info, Some(&staked_nodes)); - let (_, shuffled_stakes_and_indexes) = - shuffle_peers_and_index(&cluster_info.id(), &peers, &stakes_and_index, seed); - shuffled_stakes_and_indexes - .into_iter() - .map(|(_, i)| peers[i].clone()) - .collect() - }) - .collect(); - - // create some "shreds". - (0..shreds_len).for_each(|i| { - let broadcast_table = &shuffled_peers[i]; - find_insert_shred(&broadcast_table[0].id, i as i32, &mut batches); - }); - - assert!(!batches.is_empty()); - - // start turbine simulation - let now = Instant::now(); - batches.par_iter_mut().for_each(|batch| { - let mut remaining = batch.len(); - let senders: HashMap<_, _> = senders.lock().unwrap().clone(); - while remaining > 0 { - for (id, (layer1_done, recv, r)) in batch.iter_mut() { - assert!( - now.elapsed().as_secs() < timeout, - "Timed out with {remaining:?} remaining nodes" - ); - let cluster = c_info.clone_with_id(id); - if !*layer1_done { - recv.iter().for_each(|i| { - retransmit( - shuffled_peers[*i as usize].clone(), - &senders, - &cluster, - fanout, - *i, - true, - ); - }); - *layer1_done = true; - } - - //send and recv - if recv.len() < shreds_len { - loop { - match r.try_recv() { - Ok((data, retx)) => { - if recv.insert(data) { - let _ = retransmit( - shuffled_peers[data as usize].clone(), - &senders, - &cluster, - fanout, - data, - retx, - ); - } - if recv.len() == shreds_len { - remaining -= 1; - break; - } - } - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => break, - }; - } - } - } - } - }); -} - -// Recommended to not run these tests in parallel (they are resource heavy and want all the compute) - -//todo add tests with network failures - -// Run with a single layer -#[test] -#[serial] -fn test_retransmit_small() { - let stakes: Vec<_> = (0..200).collect(); - run_simulation(&stakes, 200); -} - -// Make sure at least 2 layers are used -#[test] -#[serial] -fn test_retransmit_medium() { - let num_nodes = 2000; - let stakes: Vec<_> = (0..num_nodes).collect(); - run_simulation(&stakes, 200); -} - -// Make sure at least 2 layers are used but with equal stakes -#[test] -#[serial] -fn test_retransmit_medium_equal_stakes() { - let num_nodes = 2000; - let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect(); - run_simulation(&stakes, 200); -} - -// Scale down the network and make sure many layers are used -#[test] -#[serial] -fn test_retransmit_large() { - let num_nodes = 4000; - let stakes: Vec<_> = (0..num_nodes).collect(); - run_simulation(&stakes, 2); -}