diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs new file mode 100644 index 0000000000..6e21b932dd --- /dev/null +++ b/core/benches/cluster_info.rs @@ -0,0 +1,39 @@ +#![feature(test)] + +extern crate test; + +use rand::{thread_rng, Rng}; +use solana_core::cluster_info::{ClusterInfo, Node}; +use solana_core::contact_info::ContactInfo; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::timing::timestamp; +use std::collections::HashMap; +use std::net::UdpSocket; +use test::Bencher; + +#[bench] +fn broadcast_shreds_bench(bencher: &mut Bencher) { + solana_logger::setup(); + let leader_pubkey = Pubkey::new_rand(); + let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey); + let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone()); + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + + const SHRED_SIZE: usize = 1024; + const NUM_SHREDS: usize = 32; + let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS]; + let seeds = vec![[0u8; 32]; NUM_SHREDS]; + let mut stakes = HashMap::new(); + const NUM_PEERS: usize = 200; + for _ in 0..NUM_PEERS { + let id = Pubkey::new_rand(); + let contact_info = ContactInfo::new_localhost(&id, timestamp()); + cluster_info.insert_info(contact_info); + stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); + } + bencher.iter(move || { + cluster_info + .broadcast_shreds(&socket, &shreds, &seeds, Some(&stakes)) + .unwrap(); + }); +} diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8b6272dcb8..cc767dd02d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -24,7 +24,7 @@ use crate::repair_service::RepairType; use crate::result::Result; use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; -use crate::weighted_shuffle::weighted_shuffle; +use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; use bincode::{deserialize, serialize, serialized_size}; use core::cmp; use itertools::Itertools; @@ -515,6 +515,35 @@ impl ClusterInfo { out } + fn peers_and_stakes( + peers: &[ContactInfo], + stakes: Option<&HashMap>, + ) -> Vec<(u64, usize)> { + let mut stakes_and_index: Vec<_> = peers + .iter() + .enumerate() + .map(|(i, c)| { + let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)); + (stake, i) + }) + .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { + if r_stake == l_stake { + peers[*r_info].id.cmp(&peers[*l_info].id) + } else { + r_stake.cmp(&l_stake) + } + }) + .collect(); + + // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is + // assumed to be missing entry. So let's make sure stake weights are atleast 1 + stakes_and_index + .iter_mut() + .for_each(|(stake, _)| *stake = cmp::max(1, *stake)); + + stakes_and_index + } + /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list pub fn shuffle_peers_and_index( &self, @@ -538,19 +567,6 @@ impl ClusterInfo { (index, peers) } - pub fn sorted_tvu_peers( - &self, - stakes: Option<&HashMap>, - rng: ChaChaRng, - ) -> Vec { - let peers = self.tvu_peers(); - let peers_with_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng); - peers_with_stakes - .iter() - .map(|(_, peer)| (*peer).clone()) - .collect() - } - /// compute broadcast table pub fn tpu_peers(&self) -> Vec { let me = self.my_data().id; @@ -694,6 +710,16 @@ impl ClusterInfo { .collect() } + fn sorted_tvu_peers_and_stakes( + &self, + stakes: Option<&HashMap>, + ) -> (Vec, Vec<(u64, usize)>) { + let mut peers = self.tvu_peers(); + peers.dedup(); + let peers_and_stakes = ClusterInfo::peers_and_stakes(&peers, stakes); + (peers, peers_and_stakes) + } + /// broadcast messages from the leader to layer 1 nodes /// # Remarks pub fn broadcast_shreds( @@ -704,24 +730,23 @@ impl ClusterInfo { stakes: Option<&HashMap>, ) -> Result<()> { let mut last_err = Ok(()); - let mut broadcast_table_len = 0; + let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); + let broadcast_len = peers_and_stakes.len(); + if broadcast_len == 0 { + datapoint_info!("cluster_info-num_nodes", ("count", 1, i64)); + return Ok(()); + } shreds.iter().zip(seeds).for_each(|(shred, seed)| { - let broadcast_table = self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(*seed)); - broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len()); + let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed)); - if !broadcast_table.is_empty() { - if let Err(e) = s.send_to(shred, &broadcast_table[0].tvu) { - trace!("{}: broadcast result {:?}", self.id(), e); - last_err = Err(e); - } + if let Err(e) = s.send_to(shred, &peers[broadcast_index].tvu) { + trace!("{}: broadcast result {:?}", self.id(), e); + last_err = Err(e); } }); last_err?; - datapoint_info!( - "cluster_info-num_nodes", - ("count", broadcast_table_len + 1, i64) - ); + datapoint_info!("cluster_info-num_nodes", ("count", broadcast_len + 1, i64)); Ok(()) } @@ -2384,4 +2409,41 @@ mod tests { let protocol = Protocol::PullRequest(filter, value.clone()); assert!(serialized_size(&protocol).unwrap() <= PACKET_DATA_SIZE as u64); } + + #[test] + fn test_tvu_peers_and_stakes() { + let d = ContactInfo::new_localhost(&Pubkey::new(&[0; 32]), timestamp()); + let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); + let mut stakes = HashMap::new(); + + // no stake + let id = Pubkey::new(&[1u8; 32]); + let contact_info = ContactInfo::new_localhost(&id, timestamp()); + cluster_info.insert_info(contact_info); + + // normal + let id2 = Pubkey::new(&[2u8; 32]); + let mut contact_info = ContactInfo::new_localhost(&id2, timestamp()); + cluster_info.insert_info(contact_info.clone()); + stakes.insert(id2, 10); + + // duplicate + contact_info.wallclock = timestamp() + 1; + cluster_info.insert_info(contact_info); + + // no tvu + let id3 = Pubkey::new(&[3u8; 32]); + let mut contact_info = ContactInfo::new_localhost(&id3, timestamp()); + contact_info.tvu = "0.0.0.0:0".parse().unwrap(); + cluster_info.insert_info(contact_info); + stakes.insert(id3, 10); + + let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(&stakes)); + assert_eq!(peers.len(), 2); + assert_eq!(peers[0].id, id); + assert_eq!(peers[1].id, id2); + assert_eq!(peers_and_stakes.len(), 2); + assert_eq!(peers_and_stakes[0].0, 10); + assert_eq!(peers_and_stakes[1].0, 1); + } } diff --git a/core/src/weighted_shuffle.rs b/core/src/weighted_shuffle.rs index d0d5a249cd..d56cb49ef6 100644 --- a/core/src/weighted_shuffle.rs +++ b/core/src/weighted_shuffle.rs @@ -33,6 +33,31 @@ where .collect() } +/// Returns the highest index after computing a weighted shuffle. +/// Saves doing any sorting for O(n) max calculation. +pub fn weighted_best(weights_and_indicies: &[(u64, usize)], rng: ChaChaRng) -> usize { + let mut rng = rng; + if weights_and_indicies.is_empty() { + return 0; + } + let total_weight: u64 = weights_and_indicies.iter().map(|x| x.0).sum(); + let mut best_weight = 0; + let mut best_index = 0; + for v in weights_and_indicies { + let x = (total_weight / v.0) + .to_u64() + .expect("values > u64::max are not supported"); + // capture the u64 into u128s to prevent overflow + let weight = (&mut rng).gen_range(1, u128::from(std::u16::MAX)) * u128::from(x); + if weight > best_weight { + best_weight = weight; + best_index = v.1; + } + } + + best_index +} + #[cfg(test)] mod tests { use super::*; @@ -92,4 +117,12 @@ mod tests { } }); } + + #[test] + fn test_weighted_best() { + let mut weights = vec![(std::u32::MAX as u64, 0); 3]; + weights.push((1, 5)); + let best = weighted_best(&weights, ChaChaRng::from_seed([0x5b; 32])); + assert_eq!(best, 5); + } }