diff --git a/Cargo.lock b/Cargo.lock
index 7b13216d1a..f028f00408 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2199,6 +2199,7 @@ dependencies = [
  "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "nix 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ca62ae852b..e7670d120b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -38,6 +38,7 @@ libc = "0.2.55"
 log = "0.4.2"
 memmap = { version = "0.7.0", optional = true }
 nix = "0.14.0"
+num-traits = "0.2"
 rand = "0.6.5"
 rand_chacha = "0.1.1"
 rayon = "1.0.0"
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 15d3ef0a5b..bf599ee3f8 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -9,6 +9,8 @@ use crate::poh_recorder::WorkingBankEntries;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
+use rand::SeedableRng;
+use rand_chacha::ChaChaRng;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 use solana_metrics::{
@@ -85,10 +87,12 @@ impl Broadcast {
         }
 
         let bank_epoch = bank.get_stakers_epoch(bank.slot());
-        let mut broadcast_table = cluster_info
-            .read()
-            .unwrap()
-            .sorted_tvu_peers(staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref());
+        let mut seed = [0; 32];
+        seed[0..8].copy_from_slice(&bank.slot().to_le_bytes());
+        let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(
+            staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref(),
+            ChaChaRng::from_seed(seed),
+        );
         inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table.len() + 1);
 
         // Layer 1, leader nodes are limited to the fanout size.
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index a24b1f3712..9bd3b41ad4 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -24,9 +24,12 @@ use crate::repair_service::RepairType;
 use crate::result::Result;
 use crate::staking_utils;
 use crate::streamer::{BlobReceiver, BlobSender};
+use crate::weighted_shuffle::weighted_shuffle;
 use bincode::{deserialize, serialize};
 use core::cmp;
+use itertools::Itertools;
 use rand::{thread_rng, Rng};
+use rand_chacha::ChaChaRng;
 use rayon::prelude::*;
 use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
 use solana_netutil::{
@@ -489,38 +492,48 @@ impl ClusterInfo {
             && !ContactInfo::is_valid_address(&contact_info.tpu)
     }
 
-    fn sort_by_stake(
+    fn stake_weighted_shuffle(
         peers: &[ContactInfo],
         stakes: Option<&HashMap<Pubkey, u64>>,
+        rng: ChaChaRng,
     ) -> Vec<(u64, ContactInfo)> {
-        let mut peers_with_stakes: Vec<_> = peers
+        let (stake_weights, peers_with_stakes): (Vec<_>, Vec<_>) = peers
             .iter()
             .map(|c| {
-                (
-                    stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)),
-                    c.clone(),
-                )
+                let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0));
+                // For the stake-weighted shuffle a valid weight is at least 1. Weight 0 is
+                // assumed to be a missing entry, so make sure stake weights are at least 1.
+                (cmp::max(1, stake), (stake, c.clone()))
             })
+            .sorted_by(|(_, (l_stake, l_info)), (_, (r_stake, r_info))| {
+                if r_stake == l_stake {
+                    r_info.id.cmp(&l_info.id)
+                } else {
+                    r_stake.cmp(&l_stake)
+                }
+            })
+            .unzip();
+
+        let shuffle = weighted_shuffle(stake_weights, rng);
+
+        let mut out: Vec<(u64, ContactInfo)> = shuffle
+            .iter()
+            .map(|x| peers_with_stakes[*x].clone())
             .collect();
-        peers_with_stakes.sort_unstable_by(|(l_stake, l_info), (r_stake, r_info)| {
-            if r_stake == l_stake {
-                r_info.id.cmp(&l_info.id)
-            } else {
-                r_stake.cmp(&l_stake)
-            }
-        });
-        peers_with_stakes.dedup();
-        peers_with_stakes
+
+        out.dedup();
+        out
     }
 
     /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
-    fn sorted_peers_and_index(
+    fn shuffle_peers_and_index(
         &self,
         stakes: Option<&HashMap<Pubkey, u64>>,
+        rng: ChaChaRng,
     ) -> (usize, Vec<ContactInfo>) {
         let mut peers = self.retransmit_peers();
         peers.push(self.lookup(&self.id()).unwrap().clone());
-        let contacts_and_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
+        let contacts_and_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
         let mut index = 0;
         let peers: Vec<_> = contacts_and_stakes
             .into_iter()
@@ -537,9 +550,13 @@ impl ClusterInfo {
         (index, peers)
     }
 
-    pub fn sorted_tvu_peers(&self, stakes: Option<&HashMap<Pubkey, u64>>) -> Vec<ContactInfo> {
+    pub fn sorted_tvu_peers(
+        &self,
+        stakes: Option<&HashMap<Pubkey, u64>>,
+        rng: ChaChaRng,
+    ) -> Vec<ContactInfo> {
         let peers = self.tvu_peers();
-        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
+        let peers_with_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
         peers_with_stakes
             .iter()
             .map(|(_, peer)| (*peer).clone())
@@ -1498,8 +1515,12 @@ pub fn compute_retransmit_peers(
     stakes: Option<&HashMap<Pubkey, u64>>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     fanout: usize,
+    rng: ChaChaRng,
 ) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
-    let (my_index, peers) = cluster_info.read().unwrap().sorted_peers_and_index(stakes);
+    let (my_index, peers) = cluster_info
+        .read()
+        .unwrap()
+        .shuffle_peers_and_index(stakes, rng);
 
     //calc num_layers and num_neighborhoods using the total number of nodes
     let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index bcc00ca7a2..1de735371e 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -14,11 +14,14 @@ use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE};
 use crate::crds_gossip_error::CrdsGossipError;
 use crate::crds_value::{CrdsValue, CrdsValueLabel};
 use crate::packet::BLOB_DATA_SIZE;
+use crate::weighted_shuffle::weighted_shuffle;
 use bincode::serialized_size;
 use indexmap::map::IndexMap;
+use itertools::Itertools;
 use rand;
-use rand::distributions::{Distribution, WeightedIndex};
 use rand::seq::SliceRandom;
+use rand::{thread_rng, RngCore, SeedableRng};
+use rand_chacha::ChaChaRng;
 use solana_runtime::bloom::Bloom;
 use solana_sdk::hash::Hash;
 use solana_sdk::pubkey::Pubkey;
@@ -169,29 +172,36 @@ impl CrdsGossipPush {
         let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
         let mut new_items = HashMap::new();
 
-        let mut options: Vec<_> = self.push_options(crds, &self_id, stakes);
+        let options: Vec<_> = self.push_options(crds, &self_id, stakes);
         if options.is_empty() {
             return;
         }
+
+        let mut seed = [0; 32];
+        seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
+        let mut shuffle = weighted_shuffle(
+            options.iter().map(|weighted| weighted.0).collect_vec(),
+            ChaChaRng::from_seed(seed),
+        )
+        .into_iter();
+
         while new_items.len() < need {
-            let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0));
-            if index.is_err() {
-                break;
+            match shuffle.next() {
+                Some(index) => {
+                    let item = options[index].1;
+                    if self.active_set.get(&item.id).is_some() {
+                        continue;
+                    }
+                    if new_items.get(&item.id).is_some() {
+                        continue;
+                    }
+                    let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
+                    let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
+                    bloom.add(&item.id);
+                    new_items.insert(item.id, bloom);
+                }
+                _ => break,
             }
-            let index = index.unwrap();
-            let index = index.sample(&mut rand::thread_rng());
-            let item = options[index].1;
-            options.remove(index);
-            if self.active_set.get(&item.id).is_some() {
-                continue;
-            }
-            if new_items.get(&item.id).is_some() {
-                continue;
-            }
-            let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
-            let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
-            bloom.add(&item.id);
-            new_items.insert(item.id, bloom);
         }
         let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
         keys.shuffle(&mut rand::thread_rng());
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 0b052b0419..5350b4adb0 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -68,6 +68,7 @@ pub mod test_tx;
 pub mod tpu;
 pub mod tvu;
 pub mod validator;
+pub mod weighted_shuffle;
 pub mod window_service;
 
 #[macro_use]
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index d557788332..6a8b24442e 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,6 +3,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT};
+use crate::contact_info::ContactInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::repair_service::RepairStrategy;
 use crate::result::{Error, Result};
@@ -10,6 +11,9 @@ use crate::service::Service;
 use crate::staking_utils;
 use crate::streamer::BlobReceiver;
 use crate::window_service::{should_retransmit_and_persist, WindowService};
+use hashbrown::HashMap;
+use rand::SeedableRng;
+use rand_chacha::ChaChaRng;
 use solana_metrics::{datapoint_info, inc_new_counter_error};
 use solana_runtime::epoch_schedule::EpochSchedule;
 use std::net::UdpSocket;
@@ -26,6 +30,8 @@ fn retransmit(
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     r: &BlobReceiver,
     sock: &UdpSocket,
+    avalanche_topology_cache: &mut HashMap<u64, (Vec<ContactInfo>, Vec<ContactInfo>)>,
+    cache_history: &mut Vec<u64>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let mut blobs = r.recv_timeout(timer)?;
@@ -37,12 +43,19 @@ fn retransmit(
 
     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
-    let (neighbors, children) = compute_retransmit_peers(
-        staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
-        cluster_info,
-        DATA_PLANE_FANOUT,
-    );
     for blob in &blobs {
+        let slot = blob.read().unwrap().slot();
+        let mut seed = [0; 32];
+        seed[0..8].copy_from_slice(&slot.to_le_bytes());
+        let (neighbors, children) = avalanche_topology_cache.entry(slot).or_insert_with(|| {
+            cache_history.push(slot);
+            compute_retransmit_peers(
+                staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
+                cluster_info,
+                DATA_PLANE_FANOUT,
+                ChaChaRng::from_seed(seed),
+            )
+        });
         let leader = leader_schedule_cache
             .slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref()));
         if blob.read().unwrap().meta.forward {
@@ -52,6 +65,10 @@ fn retransmit(
             ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, true)?;
         }
     }
+
+    while cache_history.len() > 5 {
+        avalanche_topology_cache.remove(&cache_history.pop().unwrap());
+    }
     Ok(())
 }
@@ -76,6 +93,8 @@ fn retransmitter(
         .name("solana-retransmitter".to_string())
         .spawn(move || {
             trace!("retransmitter started");
+            let mut avalanche_topology_cache = HashMap::new();
+            let mut cache_history = vec![];
             loop {
                 if let Err(e) = retransmit(
                     &bank_forks,
@@ -83,6 +102,8 @@ fn retransmitter(
                     &cluster_info,
                     &r,
                     &sock,
+                    &mut avalanche_topology_cache,
+                    &mut cache_history,
                 ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
diff --git a/core/src/weighted_shuffle.rs b/core/src/weighted_shuffle.rs
new file mode 100644
index 0000000000..0de0795276
--- /dev/null
+++ b/core/src/weighted_shuffle.rs
@@ -0,0 +1,76 @@
+//! The `weighted_shuffle` module provides an iterator over shuffled weights.
+
+use itertools::Itertools;
+use num_traits::{FromPrimitive, ToPrimitive};
+use rand::Rng;
+use rand_chacha::ChaChaRng;
+use std::iter;
+use std::ops::Div;
+
+pub fn weighted_shuffle<T>(weights: Vec<T>, rng: ChaChaRng) -> Vec<usize>
+where
+    T: Copy + PartialOrd + iter::Sum + Div<Output = T> + FromPrimitive + ToPrimitive,
+{
+    let mut rng = rng;
+    let total_weight: T = weights.clone().into_iter().sum();
+    weights
+        .into_iter()
+        .enumerate()
+        .map(|(i, v)| {
+            let x = (total_weight / v).to_u32().unwrap();
+            (
+                i,
+                (&mut rng).gen_range(1, u64::from(std::u16::MAX)) * u64::from(x),
+            )
+        })
+        .sorted_by(|(_, l_val), (_, r_val)| l_val.cmp(r_val))
+        .map(|x| x.0)
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rand::SeedableRng;
+
+    #[test]
+    fn test_weighted_shuffle_iterator() {
+        let mut test_set = [0; 6];
+        let mut count = 0;
+        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
+        shuffle.into_iter().for_each(|x| {
+            assert_eq!(test_set[x], 0);
+            test_set[x] = 1;
+            count += 1;
+        });
+        assert_eq!(count, 6);
+    }
+
+    #[test]
+    fn test_weighted_shuffle_iterator_large() {
+        let mut test_set = [0; 100];
+        let mut test_weights = vec![0; 100];
+        (0..100).for_each(|i| test_weights[i] = (i + 1) as u64);
+        let mut count = 0;
+        let shuffle = weighted_shuffle(test_weights, ChaChaRng::from_seed([0xa5; 32]));
+        shuffle.into_iter().for_each(|x| {
+            assert_eq!(test_set[x], 0);
+            test_set[x] = 1;
+            count += 1;
+        });
+        assert_eq!(count, 100);
+    }
+
+    #[test]
+    fn test_weighted_shuffle_compare() {
+        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
+
+        let shuffle1 = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
+        shuffle1
+            .into_iter()
+            .zip(shuffle.into_iter())
+            .for_each(|(x, y)| {
+                assert_eq!(x, y);
+            });
+    }
+}
diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs
index c9d8e06ae6..5e83685c68 100644
--- a/core/tests/cluster_info.rs
+++ b/core/tests/cluster_info.rs
@@ -1,3 +1,5 @@
+use rand::SeedableRng;
+use rand_chacha::ChaChaRng;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::prelude::*;
 use solana::cluster_info::{compute_retransmit_peers, ClusterInfo};
@@ -72,7 +74,8 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
     let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
 
     // pretend to broadcast from leader - cluster_info::create_broadcast_orders
-    let mut broadcast_table = cluster_info.sorted_tvu_peers(Some(&staked_nodes));
+    let mut broadcast_table =
+        cluster_info.sorted_tvu_peers(Some(&staked_nodes), ChaChaRng::from_seed([0x5a; 32]));
     broadcast_table.truncate(fanout);
     let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
 
@@ -109,6 +112,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
                 Some(&staked_nodes),
                 &Arc::new(RwLock::new(cluster.clone())),
                 fanout,
+                ChaChaRng::from_seed([0x5a; 32]),
             );
             let vec_children: Vec<_> = children
                 .iter()
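
Not part of the patch: a minimal standalone sketch of how the new `weighted_shuffle` helper is meant to be driven, mirroring the slot-derived seeding used in `broadcast_stage.rs` and `retransmit_stage.rs` above. The `shuffle_for_slot` helper name and the example weights are illustrative only; the snippet assumes the core crate is importable as `solana` (as in `core/tests/cluster_info.rs`) together with `rand 0.6` and `rand_chacha 0.1`.

```rust
// Illustrative sketch only (not part of this change). Shows why the stages seed
// ChaChaRng from the slot number: any node that sees the same slot and the same
// stake weights derives the same shuffle, so the resulting broadcast/retransmit
// ordering agrees across the cluster without extra coordination.
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use solana::weighted_shuffle::weighted_shuffle;

// Hypothetical helper; mirrors the seed construction in broadcast_stage.rs and
// retransmit_stage.rs (slot little-endian bytes in the first 8 seed bytes).
fn shuffle_for_slot(slot: u64, stake_weights: Vec<u64>) -> Vec<usize> {
    let mut seed = [0u8; 32];
    seed[0..8].copy_from_slice(&slot.to_le_bytes());
    // Returns shuffled indices into `stake_weights`; each index is ranked by
    // gen_range(1, u16::MAX) * (total_weight / weight), so heavier weights tend
    // to land near the front while still being randomized per slot.
    weighted_shuffle(stake_weights, ChaChaRng::from_seed(seed))
}

fn main() {
    let weights = vec![50, 10, 2, 1, 1, 1];
    let a = shuffle_for_slot(42, weights.clone());
    let b = shuffle_for_slot(42, weights.clone());
    // Same slot and same weights => identical ordering on every node.
    assert_eq!(a, b);
    // A different slot reseeds the RNG and typically yields a different order.
    let c = shuffle_for_slot(43, weights);
    println!("slot 42: {:?}\nslot 43: {:?}", a, c);
}
```

Because the seed is a pure function of the slot, the stake-weighted ordering changes from slot to slot but is reproducible by every validator, which is what lets `retransmit_stage.rs` cache one `(neighbors, children)` topology per slot.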