From 570fd3f81054d49a7c59a731a58583a80f0909b9 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 24 Mar 2021 13:34:48 +0000
Subject: [PATCH] makes turbine peer computation consistent between broadcast
 and retransmit (#14910)

get_broadcast_peers is using tvu_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/broadcast_stage.rs#L362-L370
which is potentially inconsistent with retransmit_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1332-L1345

Also, the leader does not include its own contact-info when broadcasting
shreds:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1324
but on the retransmit side, slot leader is removed only _after_ neighbors
and children are computed:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/retransmit_stage.rs#L383-L384
So the turbine broadcast tree is different between the two stages.

This commit:
* Removes retransmit_peers. Broadcast and retransmit stages will use
  tvu_peers consistently.
* Retransmit stage removes slot leader _before_ computing children and
  neighbors.
---
 core/src/broadcast_stage.rs  |  2 +-
 core/src/cluster_info.rs     | 41 +++++++++++----------------
 core/src/crds_gossip_push.rs |  4 ++--
 core/src/retransmit_stage.rs | 44 ++++++++++++++++++++++++++++------
 core/src/weighted_shuffle.rs | 18 +++++++--------
 core/tests/gossip.rs         |  4 ++--
 streamer/src/sendmmsg.rs     |  8 +++----
 7 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 6cfc635f1b..275ae7f47e 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -373,7 +373,7 @@ pub fn get_broadcast_peers(
 /// # Remarks
 pub fn broadcast_shreds(
     s: &UdpSocket,
-    shreds: &Arc<Vec<Shred>>,
+    shreds: &[Shred],
     peers_and_stakes: &[(u64, usize)],
     peers: &[ContactInfo],
     last_datapoint_submit: &Arc<AtomicU64>,
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index b1cc79fbc4..41122637b4 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -252,7 +252,6 @@ struct GossipStats {
     get_accounts_hash: Counter,
     all_tvu_peers: Counter,
     tvu_peers: Counter,
-    retransmit_peers: Counter,
     repair_peers: Counter,
     new_push_requests: Counter,
     new_push_requests2: Counter,
@@ -1383,21 +1382,6 @@ impl ClusterInfo {
             .collect()
     }

-    /// all peers that have a valid tvu
-    pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
-        self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers)
-            .crds
-            .get_nodes_contact_info()
-            .filter(|x| {
-                x.id != self.id()
-                    && x.shred_version == self.my_shred_version()
-                    && ContactInfo::is_valid_address(&x.tvu)
-                    && ContactInfo::is_valid_address(&x.tvu_forwards)
-            })
-            .cloned()
-            .collect()
-    }
-
     /// all tvu peers with valid gossip addrs that likely have the slot being requested
     pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
         let mut time = Measure::start("repair_peers");
@@ -1461,9 +1445,9 @@
         stakes_and_index: &[(u64, usize)],
         seed: [u8; 32],
     ) -> Vec<(u64, usize)> {
-        let stake_weights = stakes_and_index.iter().map(|(w, _)| *w).collect();
+        let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();

-        let shuffle = weighted_shuffle(stake_weights, seed);
+        let shuffle = weighted_shuffle(&stake_weights, seed);

         shuffle.iter().map(|x| stakes_and_index[*x]).collect()
     }
@@ -1473,7 +1457,7 @@
         &self,
         stakes: Option<&HashMap<Pubkey, u64>>,
     ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
-        let mut peers = self.retransmit_peers();
+        let mut peers = self.tvu_peers();
         // insert "self" into this list for the layer and neighborhood computation
         peers.push(self.my_contact_info());
         let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
@@ -1520,20 +1504,22 @@ pub fn retransmit_to(
         peers: &[&ContactInfo],
         packet: &mut Packet,
-        slot_leader_pubkey: Option<Pubkey>,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
         trace!("retransmit orders {}", peers.len());

-        let dests: Vec<_> = peers
-            .iter()
-            .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
-            .map(|v| if forwarded { &v.tvu_forwards } else { &v.tvu })
-            .collect();
-
+        let dests: Vec<_> = if forwarded {
+            peers
+                .iter()
+                .map(|peer| &peer.tvu_forwards)
+                .filter(|addr| ContactInfo::is_valid_address(addr))
+                .collect()
+        } else {
+            peers.iter().map(|peer| &peer.tvu).collect()
+        };
         let mut sent = 0;
         while sent < dests.len() {
-            match multicast(s, &mut packet.data[..packet.meta.size], &dests[sent..]) {
+            match multicast(s, &packet.data[..packet.meta.size], &dests[sent..]) {
                 Ok(n) => sent += n,
                 Err(e) => {
                     inc_new_counter_error!(
@@ -2902,7 +2888,6 @@
                     self.stats.gossip_packets_dropped_count.clear(),
                     i64
                 ),
-                ("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
                 ("repair_peers", self.stats.repair_peers.clear(), i64),
                 (
                     "new_push_requests",
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index 64a09875b3..ac6575145b 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -129,7 +129,7 @@ impl CrdsGossipPush {
         let mut seed = [0; 32];
         rand::thread_rng().fill(&mut seed[..]);
         let shuffle = weighted_shuffle(
-            staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
+            &staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
             seed,
         );

@@ -326,7 +326,7 @@
         let mut seed = [0; 32];
         rng.fill(&mut seed[..]);
         let mut shuffle = weighted_shuffle(
-            options.iter().map(|weighted| weighted.0).collect_vec(),
+            &options.iter().map(|weighted| weighted.0).collect_vec(),
             seed,
         )
         .into_iter();
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 3ac3fd4388..55eba71c92 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -289,6 +289,33 @@ fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) ->
     }
 }

+// Drops shred slot leader from retransmit peers.
+// TODO: decide which bank should be used here.
+fn get_retransmit_peers(
+    self_pubkey: Pubkey,
+    shred_slot: Slot,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank: &Bank,
+    stakes_cache: &EpochStakesCache,
+) -> Vec<(u64 /*stakes*/, usize /*index*/)> {
+    match leader_schedule_cache.slot_leader_at(shred_slot, Some(bank)) {
+        None => {
+            error!("unknown leader for shred slot");
+            stakes_cache.stakes_and_index.clone()
+        }
+        Some(pubkey) if pubkey == self_pubkey => {
+            error!("retransmit from slot leader: {}", pubkey);
+            stakes_cache.stakes_and_index.clone()
+        }
+        Some(pubkey) => stakes_cache
+            .stakes_and_index
+            .iter()
+            .filter(|(_, i)| stakes_cache.peers[*i].id != pubkey)
+            .copied()
+            .collect(),
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &RwLock<BankForks>,
@@ -390,10 +417,17 @@
             }

             let mut compute_turbine_peers = Measure::start("turbine_start");
+            let stakes_and_index = get_retransmit_peers(
+                my_id,
+                shred_slot,
+                leader_schedule_cache,
+                r_bank.deref(),
+                r_epoch_stakes_cache.deref(),
+            );
             let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
                 &my_id,
                 &r_epoch_stakes_cache.peers,
-                &r_epoch_stakes_cache.stakes_and_index,
+                &stakes_and_index,
                 packet.meta.seed,
             );
             peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
@@ -432,15 +466,11 @@
                 .entry(packet.meta.addr().to_string())
                 .or_insert(0) += 1;
-            let leader =
-                leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));

             let mut retransmit_time = Measure::start("retransmit_to");
             if !packet.meta.forward {
-                ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?;
-                ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?;
-            } else {
-                ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?;
+                ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
             }
+            ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
             retransmit_time.stop();
             retransmit_total += retransmit_time.as_us();
         }
diff --git a/core/src/weighted_shuffle.rs b/core/src/weighted_shuffle.rs
index 29afc6c64e..b7c3306011 100644
--- a/core/src/weighted_shuffle.rs
+++ b/core/src/weighted_shuffle.rs
@@ -9,18 +9,18 @@ use std::ops::Div;

 /// Returns a list of indexes shuffled based on the input weights
 /// Note - The sum of all weights must not exceed `u64::MAX`
-pub fn weighted_shuffle<T>(weights: Vec<T>, seed: [u8; 32]) -> Vec<usize>
+pub fn weighted_shuffle<T>(weights: &[T], seed: [u8; 32]) -> Vec<usize>
 where
     T: Copy + PartialOrd + iter::Sum + Div<T, Output = T> + FromPrimitive + ToPrimitive,
 {
-    let total_weight: T = weights.clone().into_iter().sum();
+    let total_weight: T = weights.iter().copied().sum();
     let mut rng = ChaChaRng::from_seed(seed);
     weights
-        .into_iter()
+        .iter()
         .enumerate()
         .map(|(i, v)| {
             // This generates an "inverse" weight but it avoids floating point math
-            let x = (total_weight / v)
+            let x = (total_weight / *v)
                 .to_u64()
                 .expect("values > u64::max are not supported");
             (
@@ -71,7 +71,7 @@ mod tests {
     fn test_weighted_shuffle_iterator() {
         let mut test_set = [0; 6];
         let mut count = 0;
-        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);
         shuffle.into_iter().for_each(|x| {
             assert_eq!(test_set[x], 0);
             test_set[x] = 1;
@@ -86,7 +86,7 @@
         let mut test_weights = vec![0; 100];
         (0..100).for_each(|i| test_weights[i] = (i + 1) as u64);
         let mut count = 0;
-        let shuffle = weighted_shuffle(test_weights, [0xa5; 32]);
+        let shuffle = weighted_shuffle(&test_weights, [0xa5; 32]);
         shuffle.into_iter().for_each(|x| {
             assert_eq!(test_set[x], 0);
             test_set[x] = 1;
@@ -97,9 +97,9 @@
     #[test]
     fn test_weighted_shuffle_compare() {
-        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);

-        let shuffle1 = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle1 = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);
         shuffle1
             .into_iter()
             .zip(shuffle.into_iter())
             .for_each(|(x, y)| assert_eq!(x, y));
@@ -112,7 +112,7 @@ #[test]
     fn test_weighted_shuffle_imbalanced() {
         let mut weights = vec![std::u32::MAX as u64; 3];
         weights.push(1);
-        let shuffle = weighted_shuffle(weights.clone(), [0x5a; 32]);
+        let shuffle = weighted_shuffle(&weights, [0x5a; 32]);
         shuffle.into_iter().for_each(|x| {
             if x == weights.len() - 1 {
                 assert_eq!(weights[x], 1);
diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs
index d06d1a61c1..b272b908bd 100644
--- a/core/tests/gossip.rs
+++ b/core/tests/gossip.rs
@@ -199,9 +199,9 @@ pub fn cluster_info_retransmit() {
     assert!(done);
     let mut p = Packet::default();
     p.meta.size = 10;
-    let peers = c1.retransmit_peers();
+    let peers = c1.tvu_peers();
     let retransmit_peers: Vec<_> = peers.iter().collect();
-    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap();
+    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap();
     let res: Vec<_> = [tn1, tn2, tn3]
         .into_par_iter()
         .map(|s| {
diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs
index 39d320ea50..8802117829 100644
--- a/streamer/src/sendmmsg.rs
+++ b/streamer/src/sendmmsg.rs
@@ -97,7 +97,7 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::R
 }

 #[cfg(not(target_os = "linux"))]
-pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
+pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result<usize> {
     let count = dests.len();
     for a in dests {
         sock.send_to(packet, a)?;
@@ -107,7 +107,7 @@
 }

 #[cfg(target_os = "linux")]
-pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
+pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result<usize> {
     use libc::{sendmmsg, socklen_t};
     use std::mem;
     use std::os::unix::io::AsRawFd;
@@ -216,11 +216,11 @@
         let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");

-        let mut packet = Packet::default();
+        let packet = Packet::default();

         let sent = multicast(
             &sender,
-            &mut packet.data[..packet.meta.size],
+            &packet.data[..packet.meta.size],
             &[&addr, &addr2, &addr3, &addr4],
         )
         .ok();
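
The tree-consistency argument in the commit message can be illustrated with a
small stand-alone sketch. Everything in it is a simplified stand-in rather
than the real API: `Node` stands in for ContactInfo, integer ids stand in for
Pubkey, and `seeded_order` is a deterministic placeholder for the
ChaCha-seeded weighted_shuffle. What it demonstrates is only the shape of the
computation both stages now share: start from tvu_peers, insert self, and
drop the slot leader _before_ the shuffle that defines neighbors and
children.

    // Hypothetical stand-ins, not the Solana API.
    #[derive(Clone, Debug, PartialEq)]
    struct Node {
        id: u64,    // stands in for Pubkey
        stake: u64,
    }

    // Deterministic placeholder for weighted_shuffle(): order by descending
    // stake (ties broken by id), then rotate by the shred seed. The real code
    // seeds a ChaCha RNG from the shred; all that matters here is that the
    // permutation is a pure function of (peer set, seed).
    fn seeded_order(mut nodes: Vec<Node>, seed: u64) -> Vec<Node> {
        nodes.sort_by(|a, b| (b.stake, b.id).cmp(&(a.stake, a.id)));
        if !nodes.is_empty() {
            let k = (seed as usize) % nodes.len();
            nodes.rotate_left(k);
        }
        nodes
    }

    // What broadcast and retransmit now both compute: same source list
    // (tvu_peers), self inserted, slot leader removed before shuffling.
    fn turbine_tree_order(tvu_peers: &[Node], me: &Node, leader: u64, seed: u64) -> Vec<Node> {
        let mut peers = tvu_peers.to_vec();
        peers.push(me.clone());
        peers.retain(|node| node.id != leader);
        seeded_order(peers, seed)
    }

    fn main() {
        // Four nodes; each node's tvu_peers view excludes itself.
        let a = Node { id: 1, stake: 50 };
        let b = Node { id: 2, stake: 10 };
        let c = Node { id: 3, stake: 2 };
        let d = Node { id: 4, stake: 7 };
        let (leader, shred_seed) = (1, 42);
        // Two different validators reconstruct the same peer set once each
        // re-inserts itself, so the seeded ordering, and the turbine tree
        // built from it, agrees across the cluster.
        let view_b = turbine_tree_order(&[a.clone(), c.clone(), d.clone()], &b, leader, shred_seed);
        let view_d = turbine_tree_order(&[a.clone(), b.clone(), c.clone()], &d, leader, shred_seed);
        assert_eq!(view_b, view_d);
        println!("{:?}", view_b);
    }

Before this patch the broadcast side effectively used this shape while the
retransmit side shuffled with the leader still present and only dropped it
afterwards, so the two permutations, and the neighborhoods derived from them,
could disagree.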
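
A second sketch, with the same caveat that the types and `is_valid_address`
below are hypothetical approximations of ContactInfo, covers the reworked
retransmit_to destination selection. The slot-leader parameter is gone
because the leader is now filtered out when the peer list is built, and only
the tvu_forwards path applies an address-validity filter: tvu_peers already
requires a valid tvu address, but it no longer guarantees a valid
tvu_forwards the way the removed retransmit_peers did.

    use std::net::SocketAddr;

    // Stand-in for ContactInfo: only the two retransmit sockets.
    struct Peer {
        tvu: SocketAddr,
        tvu_forwards: SocketAddr,
    }

    // Rough approximation of ContactInfo::is_valid_address; the real check
    // also rejects other unusable addresses.
    fn is_valid_address(addr: &SocketAddr) -> bool {
        !addr.ip().is_unspecified() && addr.port() != 0
    }

    // Mirrors the new retransmit_to body: no leader filtering here anymore,
    // and only the forwards path needs validation.
    fn dests<'a>(peers: &'a [Peer], forwarded: bool) -> Vec<&'a SocketAddr> {
        if forwarded {
            peers
                .iter()
                .map(|peer| &peer.tvu_forwards)
                .filter(|addr| is_valid_address(addr))
                .collect()
        } else {
            peers.iter().map(|peer| &peer.tvu).collect()
        }
    }

    fn main() {
        let peers = vec![
            Peer {
                tvu: "127.0.0.1:8001".parse().unwrap(),
                tvu_forwards: "127.0.0.1:8002".parse().unwrap(),
            },
            Peer {
                tvu: "127.0.0.1:8003".parse().unwrap(),
                tvu_forwards: "0.0.0.0:0".parse().unwrap(), // invalid forwards
            },
        ];
        assert_eq!(dests(&peers, false).len(), 2); // tvu path: no filtering
        assert_eq!(dests(&peers, true).len(), 1); // invalid forwards dropped
    }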