From 0c0384ec32e6077e2c7c4bf187d03ccd0c9461f0 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Thu, 14 Oct 2021 15:09:36 +0000
Subject: [PATCH] revises turbine peers shuffling order (#20480)

Turbine randomly shuffles cluster nodes on a broadcast tree for each
shred. This requires knowing the stakes and nodes' contact-infos (from
gossip). However, gossip is subject to partitioning and propagation
delays. Additionally, unstaked nodes may join and leave the cluster at
any moment, changing the cluster view from one node to another.

This commit:
* Always arranges the unstaked nodes at the bottom of the turbine
  broadcast tree.
* Staked nodes are always included regardless of whether their
  contact-info is available in gossip or not.
* Uses the unbiased WeightedShuffle construct for shuffling nodes.
---
 core/src/broadcast_stage.rs | 18 +-
 .../broadcast_duplicates_run.rs | 13 +-
 core/src/cluster_nodes.rs | 236 ++++++++++++++++--
 core/src/retransmit_stage.rs | 45 ++--
 gossip/src/cluster_info.rs | 16 +-
 gossip/src/weighted_shuffle.rs | 86 ++++++-
 sdk/src/feature_set.rs | 5 +
 7 files changed, 355 insertions(+), 64 deletions(-)

diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 84ffae41fb..9db03d8f0a 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -17,7 +17,7 @@ use {
         Sender as CrossbeamSender,
     },
     itertools::Itertools,
-    solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError},
+    solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT},
     solana_ledger::{blockstore::Blockstore, shred::Shred},
     solana_measure::measure::Measure,
     solana_metrics::{inc_new_counter_error, inc_new_counter_info},
@@ -33,6 +33,7 @@ use {
     },
     std::{
         collections::HashMap,
+        iter::repeat,
         net::UdpSocket,
         sync::{
             atomic::{AtomicBool, Ordering},
@@ -412,8 +413,6 @@ pub fn broadcast_shreds(
 ) -> Result<()> {
     let mut result = Ok(());
     let mut shred_select = Measure::start("shred_select");
-    // Only the leader broadcasts shreds.
-    let leader = cluster_info.id();
     let (root_bank, working_bank) = {
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.root_bank(), bank_forks.working_bank())
     };
@@ -427,12 +426,13 @@
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
             update_peer_stats(&cluster_nodes, last_datapoint_submit);
             let root_bank = root_bank.clone();
-            shreds.filter_map(move |shred| {
-                let seed = shred.seed(leader, &root_bank);
-                let node = cluster_nodes.get_broadcast_peer(seed)?;
-                socket_addr_space
-                    .check(&node.tvu)
-                    .then(|| (&shred.payload, node.tvu))
+            shreds.flat_map(move |shred| {
+                repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
+                    shred,
+                    &root_bank,
+                    DATA_PLANE_FANOUT,
+                    socket_addr_space,
+                ))
             })
         })
         .collect();
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index c7b51cf9c2..5964cdb606 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -3,6 +3,7 @@ use {
     crate::cluster_nodes::ClusterNodesCache,
     itertools::Itertools,
     solana_entry::entry::Entry,
+    solana_gossip::cluster_info::DATA_PLANE_FANOUT,
     solana_ledger::shred::Shredder,
     solana_sdk::{
         hash::Hash,
@@ -246,6 +247,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             (bank_forks.root_bank(), bank_forks.working_bank())
         };
         let self_pubkey = cluster_info.id();
+        let nodes: Vec<_> = cluster_info
+            .all_peers()
+            .into_iter()
+            .map(|(node, _)| node)
+            .collect();
         // Create cluster partition.
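The flat_map/repeat/zip pattern in broadcast_shreds above pairs one shred payload with every address returned by get_broadcast_addrs, so a single shred can be queued for several peers in one pass. A minimal standalone sketch of that pairing, with payloads reduced to plain byte vectors and an arbitrary address-lookup closure standing in for the cluster_nodes call (names here are illustrative, not part of the patch):

use std::{iter::repeat, net::SocketAddr};

// Pair each payload with every destination address computed for it; the
// infinite `repeat` is clamped by `zip` to the number of addresses.
fn fan_out<F>(payloads: &[Vec<u8>], addrs_for_payload: F) -> Vec<(&[u8], SocketAddr)>
where
    F: Fn(&[u8]) -> Vec<SocketAddr>,
{
    payloads
        .iter()
        .flat_map(|payload| {
            let payload = payload.as_slice();
            repeat(payload).zip(addrs_for_payload(payload))
        })
        .collect()
}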
let cluster_partition: HashSet = { @@ -273,8 +279,11 @@ impl BroadcastRun for BroadcastDuplicatesRun { let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let seed = shred.seed(self_pubkey, &root_bank); - let node = cluster_nodes.get_broadcast_peer(seed)?; + let addr = cluster_nodes + .get_broadcast_addrs(shred, &root_bank, DATA_PLANE_FANOUT, socket_addr_space) + .first() + .copied()?; + let node = nodes.iter().find(|node| node.tvu == addr)?; if !socket_addr_space.check(&node.tvu) { return None; } diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index ec982db518..ff0ce1124c 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -2,22 +2,31 @@ use { crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, itertools::Itertools, lru::LruCache, + rand::{Rng, SeedableRng}, + rand_chacha::ChaChaRng, solana_gossip::{ cluster_info::{compute_retransmit_peers, ClusterInfo}, contact_info::ContactInfo, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, - weighted_shuffle::{weighted_best, weighted_shuffle}, + weighted_shuffle::{ + weighted_best, weighted_sample_single, weighted_shuffle, WeightedShuffle, + }, }, + solana_ledger::shred::Shred, solana_runtime::bank::Bank, solana_sdk::{ clock::{Epoch, Slot}, + feature_set, pubkey::Pubkey, + timing::timestamp, }, + solana_streamer::socket::SocketAddrSpace, std::{ any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData, + net::SocketAddr, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -41,6 +50,9 @@ pub struct ClusterNodes { // All staked nodes + other known tvu-peers + the node itself; // sorted by (stake, pubkey) in descending order. nodes: Vec, + // Cumulative stakes (excluding the node itself), used for sampling + // broadcast peers. + cumulative_weights: Vec, // Weights and indices for sampling peers. weighted_{shuffle,best} expect // weights >= 1. For backward compatibility we use max(1, stake) for // weights and exclude nodes with no contact-info. @@ -102,9 +114,68 @@ impl ClusterNodes { new_cluster_nodes(cluster_info, stakes) } + pub(crate) fn get_broadcast_addrs( + &self, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + socket_addr_space: &SocketAddrSpace, + ) -> Vec { + const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60); + let shred_seed = shred.seed(self.pubkey, root_bank); + if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) { + if let Some(node) = self.get_broadcast_peer(shred_seed) { + if socket_addr_space.check(&node.tvu) { + return vec![node.tvu]; + } + } + return Vec::default(); + } + let mut rng = ChaChaRng::from_seed(shred_seed); + let index = match weighted_sample_single(&mut rng, &self.cumulative_weights) { + None => return Vec::default(), + Some(index) => index, + }; + if let Some(node) = self.nodes[index].contact_info() { + let now = timestamp(); + let age = Duration::from_millis(now.saturating_sub(node.wallclock)); + if age < MAX_CONTACT_INFO_AGE + && ContactInfo::is_valid_address(&node.tvu, socket_addr_space) + { + return vec![node.tvu]; + } + } + let nodes: Vec<_> = self + .nodes + .iter() + .filter(|node| node.pubkey() != self.pubkey) + .collect(); + if nodes.is_empty() { + return Vec::default(); + } + let mut rng = ChaChaRng::from_seed(shred_seed); + let nodes = shuffle_nodes(&mut rng, &nodes); + let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes); + neighbors[..1] + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)) + .chain( + neighbors[1..] 
+ .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)), + ) + .chain( + children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)), + ) + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect() + } + /// Returns the root of turbine broadcast tree, which the leader sends the /// shred to. - pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> { + fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> { if self.index.is_empty() { None } else { @@ -118,14 +189,82 @@ impl ClusterNodes { } impl ClusterNodes { - pub(crate) fn get_retransmit_peers( + pub(crate) fn get_retransmit_addrs( + &self, + slot_leader: Pubkey, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + ) -> Vec { + let (neighbors, children) = + self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), it should send the packet to tvu socket of its + // children and also tvu_forward socket of its neighbors. Otherwise it + // should only forward to tvu_forwards socket of its children. + if neighbors[0].pubkey() != self.pubkey { + return children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)) + .collect(); + } + // First neighbor is this node itself, so skip it. + neighbors[1..] + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)) + .chain( + children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)), + ) + .collect() + } + + fn get_retransmit_peers( + &self, + slot_leader: Pubkey, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + ) -> ( + Vec<&Node>, // neighbors + Vec<&Node>, // children + ) { + let shred_seed = shred.seed(slot_leader, root_bank); + if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) { + return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader); + } + // Exclude slot leader from list of nodes. + let nodes: Vec<_> = if slot_leader == self.pubkey { + error!("retransmit from slot leader: {}", slot_leader); + self.nodes.iter().collect() + } else { + self.nodes + .iter() + .filter(|node| node.pubkey() != slot_leader) + .collect() + }; + let mut rng = ChaChaRng::from_seed(shred_seed); + let nodes = shuffle_nodes(&mut rng, &nodes); + let self_index = nodes + .iter() + .position(|node| node.pubkey() == self.pubkey) + .unwrap(); + let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes); + // Assert that the node itself is included in the set of neighbors, at + // the right offset. + debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); + (neighbors, children) + } + + fn get_retransmit_peers_compat( &self, shred_seed: [u8; 32], fanout: usize, slot_leader: Pubkey, ) -> ( - Vec<&ContactInfo>, // neighbors - Vec<&ContactInfo>, // children + Vec<&Node>, // neighbors + Vec<&Node>, // children ) { // Exclude leader from list of nodes. 
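get_retransmit_peers above finds this node's position in the shuffled list and hands it to compute_retransmit_peers (shown later in gossip/src/cluster_info.rs). The neighborhood and children of a position follow from simple index arithmetic over the fanout; a standalone sketch over plain indices, assuming the same layout:

// Indices of a node's neighborhood and children in the turbine tree.
// Layer k holds fanout^k nodes; each neighborhood has `fanout` nodes.
fn retransmit_index_peers(fanout: usize, index: usize, num_nodes: usize) -> (Vec<usize>, Vec<usize>) {
    assert!(fanout > 0);
    let offset = index % fanout; // position within the neighborhood
    let anchor = index - offset; // first node of the neighborhood
    let neighbors: Vec<usize> = (anchor..)
        .take(fanout)
        .take_while(|&i| i < num_nodes)
        .collect();
    let children: Vec<usize> = ((anchor + 1) * fanout + offset..)
        .step_by(fanout)
        .take(fanout)
        .take_while(|&i| i < num_nodes)
        .collect();
    (neighbors, children)
}

// For example, with fanout = 2 and index = 3: offset = 1, anchor = 2,
// neighbors are [2, 3] and children are [7, 9] (given a large enough cluster).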
let (weights, index): (Vec, Vec) = if slot_leader == self.pubkey { @@ -153,16 +292,36 @@ impl ClusterNodes { self.nodes[neighbors[self_index % fanout]].pubkey(), self.pubkey ); - let get_contact_infos = |index: Vec| -> Vec<&ContactInfo> { - index - .into_iter() - .map(|i| self.nodes[i].contact_info().unwrap()) - .collect() - }; - (get_contact_infos(neighbors), get_contact_infos(children)) + let neighbors = neighbors.into_iter().map(|i| &self.nodes[i]).collect(); + let children = children.into_iter().map(|i| &self.nodes[i]).collect(); + (neighbors, children) } } +fn build_cumulative_weights(self_pubkey: Pubkey, nodes: &[Node]) -> Vec { + let cumulative_stakes: Vec<_> = nodes + .iter() + .scan(0, |acc, node| { + if node.pubkey() != self_pubkey { + *acc += node.stake; + } + Some(*acc) + }) + .collect(); + if cumulative_stakes.last() != Some(&0) { + return cumulative_stakes; + } + nodes + .iter() + .scan(0, |acc, node| { + if node.pubkey() != self_pubkey { + *acc += 1; + } + Some(*acc) + }) + .collect() +} + fn new_cluster_nodes( cluster_info: &ClusterInfo, stakes: &HashMap, @@ -170,6 +329,11 @@ fn new_cluster_nodes( let self_pubkey = cluster_info.id(); let nodes = get_nodes(cluster_info, stakes); let broadcast = TypeId::of::() == TypeId::of::(); + let cumulative_weights = if broadcast { + build_cumulative_weights(self_pubkey, &nodes) + } else { + Vec::default() + }; // For backward compatibility: // * nodes which do not have contact-info are excluded. // * stakes are floored at 1. @@ -187,6 +351,7 @@ fn new_cluster_nodes( ClusterNodes { pubkey: self_pubkey, nodes, + cumulative_weights, index, _phantom: PhantomData::default(), } @@ -225,6 +390,44 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec bool { + let feature_slot = root_bank + .feature_set + .activated_slot(&feature_set::turbine_peers_shuffle::id()); + match feature_slot { + None => false, + Some(feature_slot) => { + let epoch_schedule = root_bank.epoch_schedule(); + let feature_epoch = epoch_schedule.get_epoch(feature_slot); + let shred_epoch = epoch_schedule.get_epoch(shred_slot); + feature_epoch < shred_epoch + } + } +} + +// Shuffles nodes w.r.t their stakes. +// Unstaked nodes will always appear at the very end. +fn shuffle_nodes<'a, R: Rng>(rng: &mut R, nodes: &[&'a Node]) -> Vec<&'a Node> { + // Nodes are sorted by (stake, pubkey) in descending order. + let stakes: Vec = nodes + .iter() + .map(|node| node.stake) + .take_while(|stake| *stake > 0) + .collect(); + let num_staked = stakes.len(); + let mut out: Vec<_> = WeightedShuffle::new(rng, &stakes) + .unwrap() + .map(|i| nodes[i]) + .collect(); + let weights = vec![1; nodes.len() - num_staked]; + out.extend( + WeightedShuffle::new(rng, &weights) + .unwrap() + .map(|i| nodes[i + num_staked]), + ); + out +} + impl ClusterNodesCache { pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. 
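shuffle_nodes above relies on the nodes slice being pre-sorted by (stake, pubkey) in descending order: it weighted-shuffles the staked prefix, then appends the unstaked tail in a uniform shuffle, so an unstaked node can never land above a staked one. A rough standalone equivalent over bare stake values, using a simple draw-without-replacement loop in place of the WeightedShuffle iterator (rand 0.7-style two-argument gen_range, as used elsewhere in this patch):

use rand::Rng;

// Shuffle indices of `stakes` (sorted descending, unstaked == 0 at the end):
// staked indices come first, ordered by a stake-weighted draw; unstaked
// indices follow in a uniform random order.
fn stake_ordered_shuffle<R: Rng>(rng: &mut R, stakes: &[u64]) -> Vec<usize> {
    let num_staked = stakes.iter().take_while(|&&stake| stake > 0).count();
    // Weighted draw without replacement over the staked prefix.
    let mut pool: Vec<usize> = (0..num_staked).collect();
    let mut out = Vec::with_capacity(stakes.len());
    while !pool.is_empty() {
        let total: u64 = pool.iter().map(|&i| stakes[i]).sum();
        let mut pick = rng.gen_range(0, total);
        let k = pool
            .iter()
            .position(|&i| {
                if pick < stakes[i] {
                    true
                } else {
                    pick -= stakes[i];
                    false
                }
            })
            .unwrap();
        out.push(pool.swap_remove(k));
    }
    // Unstaked tail: uniform (Fisher-Yates) shuffle appended after all staked nodes.
    let mut unstaked: Vec<usize> = (num_staked..stakes.len()).collect();
    for i in (1..unstaked.len()).rev() {
        unstaked.swap(i, rng.gen_range(0, i + 1));
    }
    out.extend(unstaked);
    out
}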
@@ -306,6 +509,7 @@ impl Default for ClusterNodes { Self { pubkey: Pubkey::default(), nodes: Vec::default(), + cumulative_weights: Vec::default(), index: Vec::default(), _phantom: PhantomData::default(), } @@ -458,15 +662,15 @@ mod tests { let (neighbors_indices, children_indices) = compute_retransmit_peers(fanout, self_index, &shuffled_index); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader); + cluster_nodes.get_retransmit_peers_compat(shred_seed, fanout, slot_leader); assert_eq!(children.len(), children_indices.len()); for (node, index) in children.into_iter().zip(children_indices) { - assert_eq!(*node, peers[index]); + assert_eq!(*node.contact_info().unwrap(), peers[index]); } assert_eq!(neighbors.len(), neighbors_indices.len()); - assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id); + assert_eq!(neighbors[0].pubkey(), peers[neighbors_indices[0]].id); for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) { - assert_eq!(*node, peers[index]); + assert_eq!(*node.contact_info().unwrap(), peers[index]); } } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 3770f532d8..de0f714fc4 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -17,7 +17,10 @@ use { lru::LruCache, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_client::rpc_response::SlotUpdate, - solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, + solana_gossip::{ + cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, + contact_info::ContactInfo, + }, solana_ledger::{ shred::Shred, {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, @@ -28,6 +31,7 @@ use { solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}, + solana_streamer::sendmmsg::{multi_target_send, SendPktsError}, std::{ collections::{BTreeSet, HashSet}, net::UdpSocket, @@ -215,7 +219,6 @@ fn retransmit( epoch_cache_update.stop(); stats.epoch_cache_update += epoch_cache_update.as_us(); - let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); let retransmit_shred = |shred: Shred, socket: &UdpSocket| { if should_skip_retransmit(&shred, shreds_received) { @@ -253,37 +256,29 @@ fn retransmit( }; let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); - let shred_seed = shred.seed(slot_leader, &root_bank); - let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); - let anchor_node = neighbors[0].id == my_id; + let addrs: Vec<_> = cluster_nodes + .get_retransmit_addrs(slot_leader, &shred, &root_bank, DATA_PLANE_FANOUT) + .into_iter() + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect(); compute_turbine_peers.stop(); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed); let mut retransmit_time = Measure::start("retransmit_to"); - // If the node is on the critical path (i.e. the first node in each - // neighborhood), it should send the packet to tvu socket of its - // children and also tvu_forward socket of its neighbors. Otherwise it - // should only forward to tvu_forward socket of its children. - if anchor_node { - // First neighbor is this node itself, so skip it. 
- ClusterInfo::retransmit_to( - &neighbors[1..], - &shred.payload, - socket, - true, // forward socket - socket_addr_space, + if let Err(SendPktsError::IoError(ioerr, num_failed)) = + multi_target_send(socket, &shred.payload, &addrs) + { + inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1); + inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); + error!( + "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", + ioerr, + num_failed, + addrs.len(), ); } - ClusterInfo::retransmit_to( - &children, - &shred.payload, - socket, - !anchor_node, // send to forward socket! - socket_addr_space, - ); retransmit_time.stop(); stats .retransmit_total diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 54bc76e26c..3eefd7ef2a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2671,27 +2671,27 @@ fn get_epoch_duration(bank_forks: Option<&RwLock>) -> Duration { /// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake -pub fn compute_retransmit_peers( +pub fn compute_retransmit_peers( fanout: usize, - node: usize, - index: &[usize], -) -> (Vec /*neighbors*/, Vec /*children*/) { + index: usize, // Local node's index withing the nodes slice. + nodes: &[T], +) -> (Vec /*neighbors*/, Vec /*children*/) { // 1st layer: fanout nodes starting at 0 // 2nd layer: fanout**2 nodes starting at fanout // 3rd layer: fanout**3 nodes starting at fanout + fanout**2 // ... // Each layer is divided into neighborhoods of fanout nodes each. - let offset = node % fanout; // Node's index within its neighborhood. - let anchor = node - offset; // First node in the neighborhood. + let offset = index % fanout; // Node's index within its neighborhood. + let anchor = index - offset; // First node in the neighborhood. let neighbors = (anchor..) .take(fanout) - .map(|i| index.get(i).copied()) + .map(|i| nodes.get(i).copied()) .while_some() .collect(); let children = ((anchor + 1) * fanout + offset..) .step_by(fanout) .take(fanout) - .map(|i| index.get(i).copied()) + .map(|i| nodes.get(i).copied()) .while_some() .collect(); (neighbors, children) diff --git a/gossip/src/weighted_shuffle.rs b/gossip/src/weighted_shuffle.rs index 25e0658bec..0482396221 100644 --- a/gossip/src/weighted_shuffle.rs +++ b/gossip/src/weighted_shuffle.rs @@ -135,6 +135,35 @@ where } } +// Equivalent to WeightedShuffle(rng, weights).unwrap().next(). +pub fn weighted_sample_single(rng: &mut R, cumulative_weights: &[T]) -> Option +where + T: Copy + Default + PartialOrd + SampleUniform, +{ + let zero = ::default(); + let high = cumulative_weights.last().copied().unwrap_or_default(); + #[allow(clippy::neg_cmp_op_on_partial_ord)] + if !(high > zero) { + return None; + } + let sample = ::Sampler::sample_single(zero, high, rng); + let mut lo = 0usize; + let mut hi = cumulative_weights.len() - 1; + while lo + 1 < hi { + let k = lo + (hi - lo) / 2; + if cumulative_weights[k] <= sample { + lo = k; + } else { + hi = k; + } + } + if cumulative_weights[lo] > sample { + Some(lo) + } else { + Some(hi) + } +} + /// Returns a list of indexes shuffled based on the input weights /// Note - The sum of all weights must not exceed `u64::MAX` pub fn weighted_shuffle(weights: F, seed: [u8; 32]) -> Vec @@ -288,20 +317,40 @@ mod tests { assert_eq!(best_index, 2); } + // Asserts that empty weights will return empty shuffle. 
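weighted_sample_single above expects cumulative (prefix-sum) weights and returns the first index whose cumulative weight exceeds a uniform draw in [0, total), which is why zero-weight entries are never selected. A small standalone sketch of building that input and doing the same lookup with a linear scan (the binary search above is the faster version of this; names here are illustrative):

use rand::Rng;

// Build the prefix sums that weighted_sample_single expects.
fn cumulative(weights: &[u64]) -> Vec<u64> {
    weights
        .iter()
        .scan(0u64, |acc, &w| {
            *acc += w;
            Some(*acc)
        })
        .collect()
}

// Linear-scan equivalent of the binary search in weighted_sample_single:
// index i is chosen with probability weights[i] / total, and zero-weight
// entries are never chosen.
fn sample_index<R: Rng>(rng: &mut R, cumulative_weights: &[u64]) -> Option<usize> {
    let total = *cumulative_weights.last()?;
    if total == 0 {
        return None;
    }
    let sample = rng.gen_range(0, total); // rand 0.7-style, as in this file's tests
    cumulative_weights.iter().position(|&cw| cw > sample)
}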
+ #[test] + fn test_weighted_shuffle_empty_weights() { + let weights = Vec::::new(); + let mut rng = rand::thread_rng(); + let shuffle = WeightedShuffle::new(&mut rng, &weights); + assert!(shuffle.unwrap().next().is_none()); + assert_eq!(weighted_sample_single(&mut rng, &weights), None); + } + + // Asserts that zero weights will return empty shuffle. + #[test] + fn test_weighted_shuffle_zero_weights() { + let weights = vec![0u64; 5]; + let mut rng = rand::thread_rng(); + let shuffle = WeightedShuffle::new(&mut rng, &weights); + assert!(shuffle.unwrap().next().is_none()); + assert_eq!(weighted_sample_single(&mut rng, &weights), None); + } + // Asserts that each index is selected proportional to its weight. #[test] fn test_weighted_shuffle_sanity() { let seed: Vec<_> = (1..).step_by(3).take(32).collect(); let seed: [u8; 32] = seed.try_into().unwrap(); let mut rng = ChaChaRng::from_seed(seed); - let weights = [1, 1000, 10, 100]; - let mut counts = [0; 4]; + let weights = [1, 0, 1000, 0, 0, 10, 100, 0]; + let mut counts = [0; 8]; for _ in 0..100000 { let mut shuffle = WeightedShuffle::new(&mut rng, &weights).unwrap(); counts[shuffle.next().unwrap()] += 1; let _ = shuffle.count(); // consume the rest. } - assert_eq!(counts, [101, 90113, 891, 8895]); + assert_eq!(counts, [101, 0, 90113, 0, 0, 891, 8895, 0]); } #[test] @@ -309,6 +358,13 @@ mod tests { let weights = [ 78, 70, 38, 27, 21, 0, 82, 42, 21, 77, 77, 17, 4, 50, 96, 83, 33, 16, 72, ]; + let cumulative_weights: Vec<_> = weights + .iter() + .scan(0, |acc, w| { + *acc += w; + Some(*acc) + }) + .collect(); let seed = [48u8; 32]; let mut rng = ChaChaRng::from_seed(seed); let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); @@ -316,6 +372,11 @@ mod tests { shuffle, [2, 11, 16, 0, 13, 14, 15, 10, 1, 9, 7, 6, 12, 18, 4, 17, 3, 8] ); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(2), + ); let seed = [37u8; 32]; let mut rng = ChaChaRng::from_seed(seed); let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); @@ -323,12 +384,24 @@ mod tests { shuffle, [17, 3, 14, 13, 6, 10, 15, 16, 9, 2, 4, 1, 0, 7, 8, 18, 11, 12] ); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(17), + ); } #[test] fn test_weighted_shuffle_match_slow() { let mut rng = rand::thread_rng(); let weights: Vec = repeat_with(|| rng.gen_range(0, 1000)).take(997).collect(); + let cumulative_weights: Vec<_> = weights + .iter() + .scan(0, |acc, w| { + *acc += w; + Some(*acc) + }) + .collect(); for _ in 0..10 { let mut seed = [0u8; 32]; rng.fill(&mut seed[..]); @@ -336,7 +409,12 @@ mod tests { let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); let mut rng = ChaChaRng::from_seed(seed); let shuffle_slow = weighted_shuffle_slow(&mut rng, weights.clone()); - assert_eq!(shuffle, shuffle_slow,); + assert_eq!(shuffle, shuffle_slow); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(shuffle[0]), + ); } } } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index fcf581660d..98204a9223 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -241,6 +241,10 @@ pub mod send_to_tpu_vote_port { solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); } +pub mod turbine_peers_shuffle { + solana_sdk::declare_id!("4VvpgRD6UsHvkXwpuQhtR5NG1G4esMaExeWuSEpsYRUa"); +} + 
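This feature id is what enable_turbine_peers_shuffle_patch (earlier in this patch) checks: the new shuffle only applies to shreds whose epoch is strictly greater than the epoch in which the feature was activated, so all nodes switch together at an epoch boundary. A simplified sketch of that gating with a flat epoch length, whereas the real code goes through Bank::epoch_schedule() and FeatureSet::activated_slot:

// Simplified epoch gating: with a flat epoch schedule of `slots_per_epoch`,
// the new shuffle applies only to shreds from epochs after the activation epoch.
fn shuffle_patch_enabled(
    feature_activation_slot: Option<u64>,
    shred_slot: u64,
    slots_per_epoch: u64,
) -> bool {
    match feature_activation_slot {
        None => false, // feature not activated yet
        Some(activation_slot) => {
            let feature_epoch = activation_slot / slots_per_epoch;
            let shred_epoch = shred_slot / slots_per_epoch;
            feature_epoch < shred_epoch
        }
    }
}

// E.g. with 432_000 slots per epoch, a feature activated at slot 1_000_000
// (epoch 2) takes effect for shreds at slot 1_296_000 (epoch 3) and later.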
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -296,6 +300,7 @@ lazy_static! {
         (optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"),
         (remove_native_loader::id(), "Remove support for the native loader"),
         (send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"),
+        (turbine_peers_shuffle::id(), "turbine peers shuffle patch"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()