From 04787be8b154d788a5423c38b857562073d69571 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 7 Jul 2021 00:35:25 +0000 Subject: [PATCH] encapsulates turbine peers computations of broadcast & retransmit stages (#18238) Broadcast stage and retransmit stage should arrange nodes on turbine broadcast tree in exactly same order. Additionally any changes to this ordering (e.g. updating how unstaked nodes are handled) requires feature gating to keep the cluster in sync. Current implementation is scattered out over several public methods and exposes too much of implementation details (e.g. usize indices into peers vector) which makes code changes and checking for feature activations more difficult. This commit encapsulates turbine peer computations into a new struct, and only exposes two public methods, get_broadcast_peer and get_retransmit_peers, for call-sites. --- core/benches/cluster_info.rs | 17 +- core/src/broadcast_stage.rs | 67 +-- .../fail_entry_verification_broadcast_run.rs | 10 +- .../broadcast_stage/standard_broadcast_run.rs | 27 +- core/src/cluster_nodes.rs | 441 ++++++++++++++++++ core/src/lib.rs | 1 + core/src/retransmit_stage.rs | 110 +---- gossip/src/cluster_info.rs | 91 ---- gossip/src/contact_info.rs | 2 +- gossip/src/deprecated.rs | 80 +++- gossip/src/lib.rs | 2 +- gossip/tests/cluster_info.rs | 14 +- 12 files changed, 589 insertions(+), 273 deletions(-) create mode 100644 core/src/cluster_nodes.rs diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 54750a2497..9f526eb59b 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -3,10 +3,14 @@ extern crate test; use rand::{thread_rng, Rng}; -use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats; -use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers}; -use solana_gossip::cluster_info::{ClusterInfo, Node}; -use solana_gossip::contact_info::ContactInfo; +use solana_core::{ + broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage}, + cluster_nodes::ClusterNodes, +}; +use solana_gossip::{ + cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, +}; use solana_ledger::shred::Shred; use solana_sdk::pubkey; use solana_sdk::timing::timestamp; @@ -36,7 +40,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } let cluster_info = Arc::new(cluster_info); - let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes)); + let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); let shreds = Arc::new(shreds); let last_datapoint = Arc::new(AtomicU64::new(0)); bencher.iter(move || { @@ -44,8 +48,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { broadcast_shreds( &socket, &shreds, - &peers_and_stakes, - &peers, + &cluster_nodes, &last_datapoint, &mut TransmitShredsStats::default(), ) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index a6e1792b64..d55d48484d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -6,17 +6,15 @@ use self::{ fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, standard_broadcast_run::StandardBroadcastRun, }; -use crate::result::{Error, Result}; +use crate::{ + cluster_nodes::ClusterNodes, + result::{Error, Result}, +}; use crossbeam_channel::{ Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError, Sender as CrossbeamSender, }; -use solana_gossip::{ - cluster_info::{self, ClusterInfo, 
ClusterInfoError}, - contact_info::ContactInfo, - crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, - weighted_shuffle::weighted_best, -}; +use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}; use solana_ledger::{blockstore::Blockstore, shred::Shred}; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; @@ -390,26 +388,16 @@ fn update_peer_stats( } } -pub fn get_broadcast_peers( - cluster_info: &ClusterInfo, - stakes: Option<&HashMap>, -) -> (Vec, Vec<(u64, usize)>) { - let mut peers = cluster_info.tvu_peers(); - let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes); - (peers, peers_and_stakes) -} - /// broadcast messages from the leader to layer 1 nodes /// # Remarks pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], - peers_and_stakes: &[(u64, usize)], - peers: &[ContactInfo], + cluster_nodes: &ClusterNodes, last_datapoint_submit: &Arc, transmit_stats: &mut TransmitShredsStats, ) -> Result<()> { - let broadcast_len = peers_and_stakes.len(); + let broadcast_len = cluster_nodes.num_peers(); if broadcast_len == 0 { update_peer_stats(1, 1, last_datapoint_submit); return Ok(()); @@ -417,10 +405,9 @@ pub fn broadcast_shreds( let mut shred_select = Measure::start("shred_select"); let packets: Vec<_> = shreds .iter() - .map(|shred| { - let broadcast_index = weighted_best(peers_and_stakes, shred.seed()); - - (&shred.payload, &peers[broadcast_index].tvu) + .filter_map(|shred| { + let node = cluster_nodes.get_broadcast_peer(shred.seed())?; + Some((&shred.payload, &node.tvu)) }) .collect(); shred_select.stop(); @@ -439,7 +426,7 @@ pub fn broadcast_shreds( send_mmsg_time.stop(); transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); - let num_live_peers = num_live_peers(peers); + let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64; update_peer_stats( num_live_peers, broadcast_len as i64 + 1, @@ -448,25 +435,6 @@ pub fn broadcast_shreds( Ok(()) } -fn distance(a: u64, b: u64) -> u64 { - if a > b { - a - b - } else { - b - a - } -} - -fn num_live_peers(peers: &[ContactInfo]) -> i64 { - let mut num_live_peers = 1i64; - peers.iter().for_each(|p| { - // A peer is considered live if they generated their contact info recently - if distance(timestamp(), p.wallclock) <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS { - num_live_peers += 1; - } - }); - num_live_peers -} - #[cfg(test)] pub mod test { use super::*; @@ -550,19 +518,6 @@ pub mod test { assert_eq!(num_expected_coding_shreds, coding_index); } - #[test] - fn test_num_live_peers() { - let mut ci = ContactInfo { - wallclock: std::u64::MAX, - ..ContactInfo::default() - }; - assert_eq!(num_live_peers(&[ci.clone()]), 1); - ci.wallclock = timestamp() - 1; - assert_eq!(num_live_peers(&[ci.clone()]), 2); - ci.wallclock = timestamp() - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS - 1; - assert_eq!(num_live_peers(&[ci]), 1); - } - #[test] fn test_duplicate_retransmit_signal() { // Setup diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 6175db6209..901dc3759a 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,4 +1,5 @@ use super::*; +use crate::cluster_nodes::ClusterNodes; use solana_ledger::shred::Shredder; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; @@ -133,13 +134,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) -> Result<()> 
{ let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; // Broadcast data - let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref()); - + let cluster_nodes = ClusterNodes::::new( + cluster_info, + stakes.as_deref().unwrap_or(&HashMap::default()), + ); broadcast_shreds( sock, &shreds, - &peers_and_stakes, - &peers, + &cluster_nodes, &Arc::new(AtomicU64::new(0)), &mut TransmitShredsStats::default(), )?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 003076c20f..76cb09ed03 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -4,7 +4,7 @@ use super::{ broadcast_utils::{self, ReceiveResults}, *, }; -use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; +use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes}; use solana_ledger::{ entry::Entry, shred::{ @@ -26,16 +26,10 @@ pub struct StandardBroadcastRun { shred_version: u16, last_datapoint_submit: Arc, num_batches: usize, - broadcast_peer_cache: Arc>, + cluster_nodes: Arc>>, last_peer_update: Arc, } -#[derive(Default)] -struct BroadcastPeerCache { - peers: Vec, - peers_and_stakes: Vec<(u64, usize)>, -} - impl StandardBroadcastRun { pub(super) fn new(shred_version: u16) -> Self { Self { @@ -48,7 +42,7 @@ impl StandardBroadcastRun { shred_version, last_datapoint_submit: Arc::default(), num_batches: 0, - broadcast_peer_cache: Arc::default(), + cluster_nodes: Arc::default(), last_peer_update: Arc::default(), } } @@ -353,13 +347,13 @@ impl StandardBroadcastRun { .compare_and_swap(now, last, Ordering::Relaxed) == last { - let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap(); - let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); - w_broadcast_peer_cache.peers = peers; - w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes; + *self.cluster_nodes.write().unwrap() = ClusterNodes::::new( + cluster_info, + stakes.unwrap_or(&HashMap::default()), + ); } get_peers_time.stop(); - let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap(); + let cluster_nodes = self.cluster_nodes.read().unwrap(); let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds @@ -367,12 +361,11 @@ impl StandardBroadcastRun { broadcast_shreds( sock, &shreds, - &r_broadcast_peer_cache.peers_and_stakes, - &r_broadcast_peer_cache.peers, + &cluster_nodes, &self.last_datapoint_submit, &mut transmit_stats, )?; - drop(r_broadcast_peer_cache); + drop(cluster_nodes); transmit_time.stop(); transmit_stats.transmit_elapsed = transmit_time.as_us(); diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs new file mode 100644 index 0000000000..c19f841b86 --- /dev/null +++ b/core/src/cluster_nodes.rs @@ -0,0 +1,441 @@ +use { + crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, + itertools::Itertools, + solana_gossip::{ + cluster_info::{compute_retransmit_peers, ClusterInfo}, + contact_info::ContactInfo, + crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + weighted_shuffle::{weighted_best, weighted_shuffle}, + }, + solana_sdk::pubkey::Pubkey, + std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData}, +}; + +enum NodeId { + // TVU node obtained through gossip (staked or not). + ContactInfo(ContactInfo), + // Staked node with no contact-info in gossip table. 
+ Pubkey(Pubkey), +} + +struct Node { + node: NodeId, + stake: u64, +} + +pub struct ClusterNodes { + pubkey: Pubkey, // The local node itself. + // All staked nodes + other known tvu-peers + the node itself; + // sorted by (stake, pubkey) in descending order. + nodes: Vec, + // Weights and indices for sampling peers. weighted_{shuffle,best} expect + // weights >= 1. For backward compatibility we use max(1, stake) for + // weights and exclude nodes with no contact-info. + index: Vec<(/*weight:*/ u64, /*index:*/ usize)>, + _phantom: PhantomData, +} + +impl Node { + #[inline] + fn pubkey(&self) -> Pubkey { + match &self.node { + NodeId::Pubkey(pubkey) => *pubkey, + NodeId::ContactInfo(node) => node.id, + } + } + + #[inline] + fn contact_info(&self) -> Option<&ContactInfo> { + match &self.node { + NodeId::Pubkey(_) => None, + NodeId::ContactInfo(node) => Some(node), + } + } +} + +impl ClusterNodes { + pub fn num_peers(&self) -> usize { + self.index.len() + } + + // A peer is considered live if they generated their contact info recently. + pub fn num_peers_live(&self, now: u64) -> usize { + self.index + .iter() + .filter_map(|(_, index)| self.nodes[*index].contact_info()) + .filter(|node| { + let elapsed = if node.wallclock < now { + now - node.wallclock + } else { + node.wallclock - now + }; + elapsed < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + }) + .count() + } +} + +impl ClusterNodes { + pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap) -> Self { + new_cluster_nodes(cluster_info, stakes) + } + + /// Returns the root of turbine broadcast tree, which the leader sends the + /// shred to. + pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> { + if self.index.is_empty() { + None + } else { + let index = weighted_best(&self.index, shred_seed); + match &self.nodes[index].node { + NodeId::ContactInfo(node) => Some(node), + NodeId::Pubkey(_) => panic!("this should not happen!"), + } + } + } +} + +impl ClusterNodes { + pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap) -> Self { + new_cluster_nodes(cluster_info, stakes) + } + + pub fn get_retransmit_peers( + &self, + shred_seed: [u8; 32], + fanout: usize, + slot_leader: Option, + ) -> ( + Vec<&ContactInfo>, // neighbors + Vec<&ContactInfo>, // children + ) { + // Exclude leader from list of nodes. + let index = self.index.iter().copied(); + let (weights, index): (Vec, Vec) = match slot_leader { + None => { + error!("unknown leader for shred slot"); + index.unzip() + } + Some(slot_leader) if slot_leader == self.pubkey => { + error!("retransmit from slot leader: {}", slot_leader); + index.unzip() + } + Some(slot_leader) => index + .filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader) + .unzip(), + }; + let index: Vec<_> = { + let shuffle = weighted_shuffle(&weights, shred_seed); + shuffle.into_iter().map(|i| index[i]).collect() + }; + let self_index = index + .iter() + .position(|i| self.nodes[*i].pubkey() == self.pubkey) + .unwrap(); + let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &index); + // Assert that the node itself is included in the set of neighbors, at + // the right offset. 
+ debug_assert_eq!( + self.nodes[neighbors[self_index % fanout]].pubkey(), + self.pubkey + ); + let get_contact_infos = |index: Vec| -> Vec<&ContactInfo> { + index + .into_iter() + .map(|i| self.nodes[i].contact_info().unwrap()) + .collect() + }; + (get_contact_infos(neighbors), get_contact_infos(children)) + } +} + +fn new_cluster_nodes( + cluster_info: &ClusterInfo, + stakes: &HashMap, +) -> ClusterNodes { + let self_pubkey = cluster_info.id(); + let nodes = get_nodes(cluster_info, stakes); + let broadcast = TypeId::of::() == TypeId::of::(); + // For backward compatibility: + // * nodes which do not have contact-info are excluded. + // * stakes are floored at 1. + // The sorting key here should be equivalent to + // solana_gossip::deprecated::sorted_stakes_with_index. + // Leader itself is excluded when sampling broadcast peers. + let index = nodes + .iter() + .enumerate() + .filter(|(_, node)| node.contact_info().is_some()) + .filter(|(_, node)| !broadcast || node.pubkey() != self_pubkey) + .sorted_by_key(|(_, node)| Reverse((node.stake.max(1), node.pubkey()))) + .map(|(index, node)| (node.stake.max(1), index)) + .collect(); + ClusterNodes { + pubkey: self_pubkey, + nodes, + index, + _phantom: PhantomData::default(), + } +} + +// All staked nodes + other known tvu-peers + the node itself; +// sorted by (stake, pubkey) in descending order. +fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec { + let self_pubkey = cluster_info.id(); + // The local node itself. + std::iter::once({ + let stake = stakes.get(&self_pubkey).copied().unwrap_or_default(); + let node = NodeId::from(cluster_info.my_contact_info()); + Node { node, stake } + }) + // All known tvu-peers from gossip. + .chain(cluster_info.tvu_peers().into_iter().map(|node| { + let stake = stakes.get(&node.id).copied().unwrap_or_default(); + let node = NodeId::from(node); + Node { node, stake } + })) + // All staked nodes. + .chain( + stakes + .iter() + .filter(|(_, stake)| **stake > 0) + .map(|(&pubkey, &stake)| Node { + node: NodeId::from(pubkey), + stake, + }), + ) + .sorted_by_key(|node| Reverse((node.stake, node.pubkey()))) + // Since sorted_by_key is stable, in case of duplicates, this + // will keep nodes with contact-info. + .dedup_by(|a, b| a.pubkey() == b.pubkey()) + .collect() +} + +impl From for NodeId { + fn from(node: ContactInfo) -> Self { + NodeId::ContactInfo(node) + } +} + +impl From for NodeId { + fn from(pubkey: Pubkey) -> Self { + NodeId::Pubkey(pubkey) + } +} + +impl Default for ClusterNodes { + fn default() -> Self { + Self { + pubkey: Pubkey::default(), + nodes: Vec::default(), + index: Vec::default(), + _phantom: PhantomData::default(), + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + rand::{seq::SliceRandom, Rng}, + solana_gossip::{ + crds_value::{CrdsData, CrdsValue}, + deprecated::{ + shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, + sorted_stakes_with_index, + }, + }, + solana_sdk::timing::timestamp, + std::iter::repeat_with, + }; + + // Legacy methods copied for testing backward compatibility. 
+ + fn get_broadcast_peers( + cluster_info: &ClusterInfo, + stakes: Option<&HashMap>, + ) -> (Vec, Vec<(u64, usize)>) { + let mut peers = cluster_info.tvu_peers(); + let peers_and_stakes = stake_weight_peers(&mut peers, stakes); + (peers, peers_and_stakes) + } + + fn stake_weight_peers( + peers: &mut Vec, + stakes: Option<&HashMap>, + ) -> Vec<(u64, usize)> { + peers.dedup(); + sorted_stakes_with_index(peers, stakes) + } + + fn make_cluster( + rng: &mut R, + ) -> ( + Vec, + HashMap, // stakes + ClusterInfo, + ) { + let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) + .take(1000) + .collect(); + nodes.shuffle(rng); + let this_node = nodes[0].clone(); + let mut stakes: HashMap = nodes + .iter() + .filter_map(|node| { + if rng.gen_ratio(1, 7) { + None // No stake for some of the nodes. + } else { + Some((node.id, rng.gen_range(0, 20))) + } + }) + .collect(); + // Add some staked nodes with no contact-info. + stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100)); + let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node); + { + let now = timestamp(); + let mut gossip = cluster_info.gossip.write().unwrap(); + // First node is pushed to crds table by ClusterInfo constructor. + for node in nodes.iter().skip(1) { + let node = CrdsData::ContactInfo(node.clone()); + let node = CrdsValue::new_unsigned(node); + assert_eq!(gossip.crds.insert(node, now), Ok(())); + } + } + (nodes, stakes, cluster_info) + } + + #[test] + fn test_cluster_nodes_retransmit() { + let mut rng = rand::thread_rng(); + let (nodes, stakes, cluster_info) = make_cluster(&mut rng); + let this_node = cluster_info.my_contact_info(); + // ClusterInfo::tvu_peers excludes the node itself. + assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); + let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); + // All nodes with contact-info should be in the index. + assert_eq!(cluster_nodes.index.len(), nodes.len()); + // Staked nodes with no contact-info should be included. + assert!(cluster_nodes.nodes.len() > nodes.len()); + // Assert that all nodes keep their contact-info. + // and, all staked nodes are also included. + { + let cluster_nodes: HashMap<_, _> = cluster_nodes + .nodes + .iter() + .map(|node| (node.pubkey(), node)) + .collect(); + for node in &nodes { + assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id); + } + for (pubkey, stake) in &stakes { + if *stake > 0 { + assert_eq!(cluster_nodes[pubkey].stake, *stake); + } + } + } + let (peers, stakes_and_index) = + sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes)); + assert_eq!(stakes_and_index.len(), peers.len()); + assert_eq!(cluster_nodes.index.len(), peers.len()); + for (i, node) in cluster_nodes + .index + .iter() + .map(|(_, i)| &cluster_nodes.nodes[*i]) + .enumerate() + { + let (stake, index) = stakes_and_index[i]; + // Wallclock may be update by ClusterInfo::push_self. + if node.pubkey() == this_node.id { + assert_eq!(this_node.id, peers[index].id) + } else { + assert_eq!(node.contact_info().unwrap(), &peers[index]); + } + assert_eq!(node.stake.max(1), stake); + } + let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; + // Remove slot leader from peers indices. 
+ let stakes_and_index: Vec<_> = stakes_and_index + .into_iter() + .filter(|(_stake, index)| peers[*index].id != slot_leader) + .collect(); + assert_eq!(peers.len(), stakes_and_index.len() + 1); + let mut shred_seed = [0u8; 32]; + rng.fill(&mut shred_seed[..]); + let (self_index, shuffled_peers_and_stakes) = + shuffle_peers_and_index(&this_node.id, &peers, &stakes_and_index, shred_seed); + let shuffled_index: Vec<_> = shuffled_peers_and_stakes + .into_iter() + .map(|(_, index)| index) + .collect(); + assert_eq!(this_node.id, peers[shuffled_index[self_index]].id); + for fanout in 1..200 { + let (neighbors_indices, children_indices) = + compute_retransmit_peers(fanout, self_index, &shuffled_index); + let (neighbors, children) = + cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader)); + assert_eq!(children.len(), children_indices.len()); + for (node, index) in children.into_iter().zip(children_indices) { + assert_eq!(*node, peers[index]); + } + assert_eq!(neighbors.len(), neighbors_indices.len()); + assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id); + for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) { + assert_eq!(*node, peers[index]); + } + } + } + + #[test] + fn test_cluster_nodes_broadcast() { + let mut rng = rand::thread_rng(); + let (nodes, stakes, cluster_info) = make_cluster(&mut rng); + // ClusterInfo::tvu_peers excludes the node itself. + assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); + let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); + // All nodes with contact-info should be in the index. + // Excluding this node itself. + assert_eq!(cluster_nodes.index.len() + 1, nodes.len()); + // Staked nodes with no contact-info should be included. + assert!(cluster_nodes.nodes.len() > nodes.len()); + // Assert that all nodes keep their contact-info. + // and, all staked nodes are also included. 
+ { + let cluster_nodes: HashMap<_, _> = cluster_nodes + .nodes + .iter() + .map(|node| (node.pubkey(), node)) + .collect(); + for node in &nodes { + assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id); + } + for (pubkey, stake) in &stakes { + if *stake > 0 { + assert_eq!(cluster_nodes[pubkey].stake, *stake); + } + } + } + let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes)); + assert_eq!(peers_and_stakes.len(), peers.len()); + assert_eq!(cluster_nodes.index.len(), peers.len()); + for (i, node) in cluster_nodes + .index + .iter() + .map(|(_, i)| &cluster_nodes.nodes[*i]) + .enumerate() + { + let (stake, index) = peers_and_stakes[i]; + assert_eq!(node.contact_info().unwrap(), &peers[index]); + assert_eq!(node.stake.max(1), stake); + } + for _ in 0..100 { + let mut shred_seed = [0u8; 32]; + rng.fill(&mut shred_seed[..]); + let index = weighted_best(&peers_and_stakes, shred_seed); + let peer = cluster_nodes.get_broadcast_peer(shred_seed).unwrap(); + assert_eq!(*peer, peers[index]); + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 8c93ba8c35..e191d4b23b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -12,6 +12,7 @@ pub mod banking_stage; pub mod broadcast_stage; pub mod cache_block_meta_service; pub mod cluster_info_vote_listener; +pub mod cluster_nodes; pub mod cluster_slot_state_verifier; pub mod cluster_slots; pub mod cluster_slots_service; diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 143bb9f596..14e10191fc 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,6 +3,7 @@ use crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, + cluster_nodes::ClusterNodes, cluster_slots::ClusterSlots, cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver}, completed_data_sets_service::CompletedDataSetsSender, @@ -13,10 +14,7 @@ use crate::{ use crossbeam_channel::{Receiver, Sender}; use lru::LruCache; use solana_client::rpc_response::SlotUpdate; -use solana_gossip::{ - cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, - contact_info::ContactInfo, -}; +use solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}; use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}; use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}; use solana_measure::measure::Measure; @@ -27,7 +25,6 @@ use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; use solana_streamer::streamer::PacketReceiver; use std::{ - cmp, collections::hash_set::HashSet, collections::{BTreeMap, BTreeSet, HashMap}, net::UdpSocket, @@ -211,12 +208,6 @@ fn update_retransmit_stats( } } -#[derive(Default)] -struct EpochStakesCache { - peers: Vec, - stakes_and_index: Vec<(u64, usize)>, -} - use crate::packet_hasher::PacketHasher; // Map of shred (slot, index, is_data) => list of hash values seen for that key. pub type ShredFilter = LruCache<(Slot, u32, bool), Vec>; @@ -277,33 +268,6 @@ fn check_if_first_shred_received( } } -// Drops shred slot leader from retransmit peers. -// TODO: decide which bank should be used here. 
-fn get_retransmit_peers( - self_pubkey: Pubkey, - shred_slot: Slot, - leader_schedule_cache: &LeaderScheduleCache, - bank: &Bank, - stakes_cache: &EpochStakesCache, -) -> Vec<(u64 /*stakes*/, usize /*index*/)> { - match leader_schedule_cache.slot_leader_at(shred_slot, Some(bank)) { - None => { - error!("unknown leader for shred slot"); - stakes_cache.stakes_and_index.clone() - } - Some(pubkey) if pubkey == self_pubkey => { - error!("retransmit from slot leader: {}", pubkey); - stakes_cache.stakes_and_index.clone() - } - Some(pubkey) => stakes_cache - .stakes_and_index - .iter() - .filter(|(_, i)| stakes_cache.peers[*i].id != pubkey) - .copied() - .collect(), - } -} - #[allow(clippy::too_many_arguments)] fn retransmit( bank_forks: &RwLock, @@ -313,7 +277,7 @@ fn retransmit( sock: &UdpSocket, id: u32, stats: &RetransmitStats, - epoch_stakes_cache: &RwLock, + cluster_nodes: &RwLock>, last_peer_update: &AtomicU64, shreds_received: &Mutex, max_slots: &MaxSlots, @@ -351,20 +315,17 @@ fn retransmit( && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last { let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch); - let (peers, stakes_and_index) = - cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref()); - { - let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); - epoch_stakes_cache.peers = peers; - epoch_stakes_cache.stakes_and_index = stakes_and_index; - } + *cluster_nodes.write().unwrap() = ClusterNodes::::new( + cluster_info, + &epoch_staked_nodes.unwrap_or_default(), + ); { let mut sr = shreds_received.lock().unwrap(); sr.0.clear(); sr.1.reset(); } } - let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); + let cluster_nodes = cluster_nodes.read().unwrap(); let mut peers_len = 0; epoch_cache_update.stop(); @@ -405,52 +366,19 @@ fn retransmit( } let mut compute_turbine_peers = Measure::start("turbine_start"); - let stakes_and_index = get_retransmit_peers( - my_id, - shred_slot, - leader_schedule_cache, - r_bank.deref(), - r_epoch_stakes_cache.deref(), - ); - let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( - &my_id, - &r_epoch_stakes_cache.peers, - &stakes_and_index, - packet.meta.seed, - ); + let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(r_bank.deref())); + let (neighbors, children) = + cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader); // If the node is on the critical path (i.e. the first node in each // neighborhood), then we expect that the packet arrives at tvu socket // as opposed to tvu-forwards. If this is not the case, then the // turbine broadcast/retransmit tree is mismatched across nodes. - let anchor_node = my_index % DATA_PLANE_FANOUT == 0; + let anchor_node = neighbors[0].id == my_id; if packet.meta.forward == anchor_node { // TODO: Consider forwarding the packet to the root node here. 
retransmit_tree_mismatch += 1; } - peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); - // split off the indexes, we don't need the stakes anymore - let indexes: Vec<_> = shuffled_stakes_and_index - .into_iter() - .map(|(_, index)| index) - .collect(); - debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id); - - let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes); - let neighbors: Vec<_> = neighbors - .into_iter() - .filter_map(|index| { - let peer = &r_epoch_stakes_cache.peers[index]; - if peer.id == my_id { - None - } else { - Some(peer) - } - }) - .collect(); - let children: Vec<_> = children - .into_iter() - .map(|index| &r_epoch_stakes_cache.peers[index]) - .collect(); + peers_len = peers_len.max(cluster_nodes.num_peers()); compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); @@ -465,7 +393,13 @@ fn retransmit( // children and also tvu_forward socket of its neighbors. Otherwise it // should only forward to tvu_forward socket of its children. if anchor_node { - ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true); + // First neighbor is this node itself, so skip it. + ClusterInfo::retransmit_to( + &neighbors[1..], + packet, + sock, + /*forward socket=*/ true, + ); } ClusterInfo::retransmit_to( &children, @@ -535,7 +469,7 @@ pub fn retransmitter( let r = r.clone(); let cluster_info = cluster_info.clone(); let stats = stats.clone(); - let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default())); + let cluster_nodes = Arc::default(); let last_peer_update = Arc::new(AtomicU64::new(0)); let shreds_received = shreds_received.clone(); let max_slots = max_slots.clone(); @@ -555,7 +489,7 @@ pub fn retransmitter( &sockets[s], s as u32, &stats, - &epoch_stakes_cache, + &cluster_nodes, &last_peer_update, &shreds_received, &max_slots, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 6e68c53c35..a748eaf69f 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1325,80 +1325,6 @@ impl ClusterInfo { || !ContactInfo::is_valid_address(&contact_info.tvu) } - fn sorted_stakes_with_index( - peers: &[ContactInfo], - stakes: Option<&HashMap>, - ) -> Vec<(u64, usize)> { - let stakes_and_index: Vec<_> = peers - .iter() - .enumerate() - .map(|(i, c)| { - // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is - // assumed to be missing entry. 
So let's make sure stake weights are atleast 1 - let stake = 1.max( - stakes - .as_ref() - .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)), - ); - (stake, i) - }) - .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { - if r_stake == l_stake { - peers[*r_info].id.cmp(&peers[*l_info].id) - } else { - r_stake.cmp(l_stake) - } - }) - .collect(); - - stakes_and_index - } - - fn stake_weighted_shuffle( - stakes_and_index: &[(u64, usize)], - seed: [u8; 32], - ) -> Vec<(u64, usize)> { - let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect(); - - let shuffle = weighted_shuffle(&stake_weights, seed); - - shuffle.iter().map(|x| stakes_and_index[*x]).collect() - } - - // Return sorted_retransmit_peers(including self) and their stakes - pub fn sorted_retransmit_peers_and_stakes( - &self, - stakes: Option<&HashMap>, - ) -> (Vec, Vec<(u64, usize)>) { - let mut peers = self.tvu_peers(); - // insert "self" into this list for the layer and neighborhood computation - peers.push(self.my_contact_info()); - let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes); - (peers, stakes_and_index) - } - - /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list - pub fn shuffle_peers_and_index( - id: &Pubkey, - peers: &[ContactInfo], - stakes_and_index: &[(u64, usize)], - seed: [u8; 32], - ) -> (usize, Vec<(u64, usize)>) { - let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed); - let self_index = shuffled_stakes_and_index - .iter() - .enumerate() - .find_map(|(i, (_stake, index))| { - if peers[*index].id == *id { - Some(i) - } else { - None - } - }) - .unwrap(); - (self_index, shuffled_stakes_and_index) - } - /// compute broadcast table pub fn tpu_peers(&self) -> Vec { let self_pubkey = self.id(); @@ -3071,14 +2997,6 @@ pub fn push_messages_to_peer( Ok(()) } -pub fn stake_weight_peers( - peers: &mut Vec, - stakes: Option<&HashMap>, -) -> Vec<(u64, usize)> { - peers.dedup(); - ClusterInfo::sorted_stakes_with_index(peers, stakes) -} - // Filters out values from nodes with different shred-version. fn filter_on_shred_version( mut msg: Protocol, @@ -4061,15 +3979,6 @@ mod tests { assert_ne!(contact_info.shred_version, d.shred_version); cluster_info.insert_info(contact_info); stakes.insert(id4, 10); - - let mut peers = cluster_info.tvu_peers(); - let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes)); - assert_eq!(peers.len(), 2); - assert_eq!(peers[0].id, id); - assert_eq!(peers[1].id, id2); - assert_eq!(peers_and_stakes.len(), 2); - assert_eq!(peers_and_stakes[0].0, 10); - assert_eq!(peers_and_stakes[1].0, 1); } #[test] diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 4485e8e1cc..0143c0b3a6 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -105,7 +105,7 @@ impl ContactInfo { } /// New random ContactInfo for tests and simulations. 
- pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { let delay = 10 * 60 * 1000; // 10 minutes let now = timestamp() - delay + rng.gen_range(0, 2 * delay); let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); diff --git a/gossip/src/deprecated.rs b/gossip/src/deprecated.rs index 57a7a8315c..120c69ffc1 100644 --- a/gossip/src/deprecated.rs +++ b/gossip/src/deprecated.rs @@ -1,4 +1,11 @@ -use solana_sdk::clock::Slot; +use { + crate::{ + cluster_info::ClusterInfo, contact_info::ContactInfo, weighted_shuffle::weighted_shuffle, + }, + itertools::Itertools, + solana_sdk::{clock::Slot, pubkey::Pubkey}, + std::collections::HashMap, +}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample, AbiEnumVisitor)] enum CompressionType { @@ -19,3 +26,74 @@ pub(crate) struct EpochIncompleteSlots { compression: CompressionType, compressed_list: Vec, } + +// Legacy methods copied for testing backward compatibility. + +pub fn sorted_retransmit_peers_and_stakes( + cluster_info: &ClusterInfo, + stakes: Option<&HashMap>, +) -> (Vec, Vec<(u64, usize)>) { + let mut peers = cluster_info.tvu_peers(); + // insert "self" into this list for the layer and neighborhood computation + peers.push(cluster_info.my_contact_info()); + let stakes_and_index = sorted_stakes_with_index(&peers, stakes); + (peers, stakes_and_index) +} + +pub fn sorted_stakes_with_index( + peers: &[ContactInfo], + stakes: Option<&HashMap>, +) -> Vec<(u64, usize)> { + let stakes_and_index: Vec<_> = peers + .iter() + .enumerate() + .map(|(i, c)| { + // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is + // assumed to be missing entry. So let's make sure stake weights are atleast 1 + let stake = 1.max( + stakes + .as_ref() + .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)), + ); + (stake, i) + }) + .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { + if r_stake == l_stake { + peers[*r_info].id.cmp(&peers[*l_info].id) + } else { + r_stake.cmp(l_stake) + } + }) + .collect(); + + stakes_and_index +} + +pub fn shuffle_peers_and_index( + id: &Pubkey, + peers: &[ContactInfo], + stakes_and_index: &[(u64, usize)], + seed: [u8; 32], +) -> (usize, Vec<(u64, usize)>) { + let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed); + let self_index = shuffled_stakes_and_index + .iter() + .enumerate() + .find_map(|(i, (_stake, index))| { + if peers[*index].id == *id { + Some(i) + } else { + None + } + }) + .unwrap(); + (self_index, shuffled_stakes_and_index) +} + +fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> { + let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect(); + + let shuffle = weighted_shuffle(&stake_weights, seed); + + shuffle.iter().map(|x| stakes_and_index[*x]).collect() +} diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 7562674a0c..e5b2d7ccce 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -13,7 +13,7 @@ pub mod crds_gossip_push; pub mod crds_shards; pub mod crds_value; pub mod data_budget; -mod deprecated; +pub mod deprecated; pub mod duplicate_shred; pub mod epoch_slots; pub mod gossip_error; diff --git a/gossip/tests/cluster_info.rs b/gossip/tests/cluster_info.rs index 921a842c32..8d31a142f0 100644 --- a/gossip/tests/cluster_info.rs +++ b/gossip/tests/cluster_info.rs @@ -5,6 +5,7 @@ use { solana_gossip::{ cluster_info::{compute_retransmit_peers, ClusterInfo}, contact_info::ContactInfo, + 
deprecated::{shuffle_peers_and_index, sorted_retransmit_peers_and_stakes}, }, solana_sdk::pubkey::Pubkey, std::{ @@ -118,14 +119,13 @@ fn run_simulation(stakes: &[u64], fanout: usize) { .map(|i| { let mut seed = [0; 32]; seed[0..4].copy_from_slice(&i.to_le_bytes()); + // TODO: Ideally these should use the new methods in + // solana_core::cluster_nodes, however that would add build + // dependency on solana_core which is not desired. let (peers, stakes_and_index) = - cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes)); - let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index( - &cluster_info.id(), - &peers, - &stakes_and_index, - seed, - ); + sorted_retransmit_peers_and_stakes(&cluster_info, Some(&staked_nodes)); + let (_, shuffled_stakes_and_indexes) = + shuffle_peers_and_index(&cluster_info.id(), &peers, &stakes_and_index, seed); shuffled_stakes_and_indexes .into_iter() .map(|(_, i)| peers[i].clone())
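
For context on the API this patch introduces, the sketch below shows how a call-site might drive the two public entry points the commit message describes: get_broadcast_peer (leader side) and get_retransmit_peers (retransmit side). This is a minimal illustration and not part of the patch; the wrapper names make_cluster_nodes, broadcast_target, and retransmit_targets are hypothetical, and the crate paths are assumed to match those visible in the diff (solana_core::cluster_nodes, solana_core::broadcast_stage, solana_core::retransmit_stage, solana_gossip, solana_ledger).

use {
    solana_core::{
        broadcast_stage::BroadcastStage, cluster_nodes::ClusterNodes,
        retransmit_stage::RetransmitStage,
    },
    solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
    solana_ledger::shred::Shred,
    solana_sdk::pubkey::Pubkey,
    std::collections::HashMap,
};

// Hypothetical helper: build the per-stage turbine views from gossip plus epoch stakes.
fn make_cluster_nodes(
    cluster_info: &ClusterInfo,
    stakes: &HashMap<Pubkey, u64>,
) -> (ClusterNodes<BroadcastStage>, ClusterNodes<RetransmitStage>) {
    (
        ClusterNodes::<BroadcastStage>::new(cluster_info, stakes),
        ClusterNodes::<RetransmitStage>::new(cluster_info, stakes),
    )
}

// Leader side: the root of the turbine broadcast tree which the leader sends
// the shred to. Returns None when no peers with contact-info are known.
fn broadcast_target<'a>(
    cluster_nodes: &'a ClusterNodes<BroadcastStage>,
    shred: &Shred,
) -> Option<&'a ContactInfo> {
    cluster_nodes.get_broadcast_peer(shred.seed())
}

// Retransmit side: (neighbors, children) for this node's position in the tree.
// Per the retransmit code in the patch, the first neighbor is the node itself,
// so retransmitters skip it when forwarding.
fn retransmit_targets<'a>(
    cluster_nodes: &'a ClusterNodes<RetransmitStage>,
    shred_seed: [u8; 32],
    fanout: usize,
    slot_leader: Option<Pubkey>,
) -> (Vec<&'a ContactInfo>, Vec<&'a ContactInfo>) {
    cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader)
}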