makes turbine peer computation consistent between broadcast and retransmit (#14910)

get_broadcast_peers uses tvu_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/broadcast_stage.rs#L362-L370
which is potentially inconsistent with retransmit_peers:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1332-L1345
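For illustration, a minimal toy sketch (hypothetical Node type standing in for ContactInfo; both methods also filter on node id and shred version, omitted here) of how the two peer sets could diverge, since the old retransmit_peers required a valid tvu_forwards address on top of the tvu_peers criteria:

    // Toy stand-in for ContactInfo; None models an invalid address.
    use std::net::SocketAddr;

    struct Node {
        tvu: Option<SocketAddr>,
        tvu_forwards: Option<SocketAddr>,
    }

    // tvu_peers-style check: only a valid tvu address is required.
    fn is_broadcast_peer(node: &Node) -> bool {
        node.tvu.is_some()
    }

    // Old retransmit_peers-style check: tvu_forwards must also be valid.
    fn was_retransmit_peer(node: &Node) -> bool {
        node.tvu.is_some() && node.tvu_forwards.is_some()
    }

    fn main() {
        let node = Node {
            tvu: "127.0.0.1:8001".parse().ok(),
            tvu_forwards: None,
        };
        // Such a node fed the broadcast tree but not the retransmit tree.
        assert!(is_broadcast_peer(&node) && !was_retransmit_peer(&node));
    }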

Also, the leader does not include its own contact-info when broadcasting
shreds:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/cluster_info.rs#L1324
but on the retransmit side, the slot leader is removed only _after_ neighbors
and children are computed:
https://github.com/solana-labs/solana/blob/84e52b606/core/src/retransmit_stage.rs#L383-L384
So the turbine broadcast tree differs between the two stages.
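A toy example of why the ordering matters (fanout-sized chunks stand in for the real stake-weighted shuffle and neighborhood computation): dropping the leader after the tree is built leaves a hole in its layer, while dropping it first re-packs the layers.

    // Toy layers: chunk a shuffled order into fanout-sized groups
    // (hypothetical helper; the real code derives neighborhoods and
    // children from a stake-weighted shuffle).
    fn layers(order: &[&str], fanout: usize) -> Vec<Vec<String>> {
        order
            .chunks(fanout)
            .map(|chunk| chunk.iter().map(|s| s.to_string()).collect())
            .collect()
    }

    fn main() {
        let with_leader = ["leader", "a", "b", "c", "d"];
        let without_leader = ["a", "b", "c", "d"];
        // Old retransmit behavior: build layers with the leader included,
        // then drop it afterwards; the hole does not re-pack the layers.
        let mut after = layers(&with_leader, 2);
        after[0].retain(|x| x != "leader"); // [["a"], ["b", "c"], ["d"]]
        // Broadcast behavior (and retransmit after this commit): the
        // leader is excluded before the layers are computed.
        let before = layers(&without_leader, 2); // [["a", "b"], ["c", "d"]]
        assert_ne!(after, before); // the two stages disagreed on the tree
        println!("leader dropped after:  {:?}", after);
        println!("leader dropped before: {:?}", before);
    }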

This commit:
* Removes retransmit_peers. Broadcast and retransmit stages will use tvu_peers
  consistently.
* Retransmit stage removes the slot leader _before_ computing children and
  neighbors (see the sketch after this list).
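A runnable toy of the resulting order of operations (hypothetical names; a stake-descending sort stands in for the seeded weighted shuffle):

    // The slot leader is dropped from the staked peer list *before* the
    // shuffle, so it can never occupy a slot in the turbine tree that
    // neighbors and children are derived from.
    fn retransmit_order(peers: &[(&str, u64)], leader: &str) -> Vec<String> {
        let mut staked: Vec<_> = peers
            .iter()
            .filter(|(id, _)| *id != leader) // leader removed first
            .collect();
        // Stand-in for the seeded weighted shuffle: sort by stake, descending.
        staked.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
        staked.iter().map(|(id, _)| id.to_string()).collect()
    }

    fn main() {
        let peers = [("leader", 90), ("a", 50), ("b", 30), ("c", 10)];
        // Neighbors/children would be computed from this leader-free order.
        assert_eq!(retransmit_order(&peers, "leader"), ["a", "b", "c"]);
    }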
behzad nouri, 2021-03-24 13:34:48 +00:00, committed by GitHub
parent 664ed76523
commit 570fd3f810
7 changed files with 68 additions and 53 deletions

@@ -373,7 +373,7 @@ pub fn get_broadcast_peers(
 /// # Remarks
 pub fn broadcast_shreds(
     s: &UdpSocket,
-    shreds: &Arc<Vec<Shred>>,
+    shreds: &[Shred],
     peers_and_stakes: &[(u64, usize)],
     peers: &[ContactInfo],
     last_datapoint_submit: &Arc<AtomicU64>,

@@ -252,7 +252,6 @@ struct GossipStats {
     get_accounts_hash: Counter,
     all_tvu_peers: Counter,
     tvu_peers: Counter,
-    retransmit_peers: Counter,
    repair_peers: Counter,
    new_push_requests: Counter,
    new_push_requests2: Counter,
@@ -1383,21 +1382,6 @@ impl ClusterInfo {
             .collect()
     }

-    /// all peers that have a valid tvu
-    pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
-        self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers)
-            .crds
-            .get_nodes_contact_info()
-            .filter(|x| {
-                x.id != self.id()
-                    && x.shred_version == self.my_shred_version()
-                    && ContactInfo::is_valid_address(&x.tvu)
-                    && ContactInfo::is_valid_address(&x.tvu_forwards)
-            })
-            .cloned()
-            .collect()
-    }
-
     /// all tvu peers with valid gossip addrs that likely have the slot being requested
     pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
         let mut time = Measure::start("repair_peers");
@@ -1461,9 +1445,9 @@ impl ClusterInfo {
         stakes_and_index: &[(u64, usize)],
         seed: [u8; 32],
     ) -> Vec<(u64, usize)> {
-        let stake_weights = stakes_and_index.iter().map(|(w, _)| *w).collect();
-        let shuffle = weighted_shuffle(stake_weights, seed);
+        let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
+        let shuffle = weighted_shuffle(&stake_weights, seed);
         shuffle.iter().map(|x| stakes_and_index[*x]).collect()
     }
@@ -1473,7 +1457,7 @@ impl ClusterInfo {
         &self,
         stakes: Option<&HashMap<Pubkey, u64>>,
     ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
-        let mut peers = self.retransmit_peers();
+        let mut peers = self.tvu_peers();
         // insert "self" into this list for the layer and neighborhood computation
         peers.push(self.my_contact_info());
         let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
@@ -1520,20 +1504,22 @@ impl ClusterInfo {
     pub fn retransmit_to(
         peers: &[&ContactInfo],
         packet: &mut Packet,
-        slot_leader_pubkey: Option<Pubkey>,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
         trace!("retransmit orders {}", peers.len());
-        let dests: Vec<_> = peers
-            .iter()
-            .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
-            .map(|v| if forwarded { &v.tvu_forwards } else { &v.tvu })
-            .collect();
+        let dests: Vec<_> = if forwarded {
+            peers
+                .iter()
+                .map(|peer| &peer.tvu_forwards)
+                .filter(|addr| ContactInfo::is_valid_address(addr))
+                .collect()
+        } else {
+            peers.iter().map(|peer| &peer.tvu).collect()
+        };
         let mut sent = 0;
         while sent < dests.len() {
-            match multicast(s, &mut packet.data[..packet.meta.size], &dests[sent..]) {
+            match multicast(s, &packet.data[..packet.meta.size], &dests[sent..]) {
                 Ok(n) => sent += n,
                 Err(e) => {
                     inc_new_counter_error!(
@@ -2902,7 +2888,6 @@ impl ClusterInfo {
                 self.stats.gossip_packets_dropped_count.clear(),
                 i64
             ),
-            ("retransmit_peers", self.stats.retransmit_peers.clear(), i64),
             ("repair_peers", self.stats.repair_peers.clear(), i64),
             (
                 "new_push_requests",

@@ -129,7 +129,7 @@ impl CrdsGossipPush {
         let mut seed = [0; 32];
         rand::thread_rng().fill(&mut seed[..]);
         let shuffle = weighted_shuffle(
-            staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
+            &staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
             seed,
         );

@@ -326,7 +326,7 @@ impl CrdsGossipPush {
         let mut seed = [0; 32];
         rng.fill(&mut seed[..]);
         let mut shuffle = weighted_shuffle(
-            options.iter().map(|weighted| weighted.0).collect_vec(),
+            &options.iter().map(|weighted| weighted.0).collect_vec(),
             seed,
         )
         .into_iter();

@@ -289,6 +289,33 @@ fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) ->
     }
 }

+// Drops shred slot leader from retransmit peers.
+// TODO: decide which bank should be used here.
+fn get_retransmit_peers(
+    self_pubkey: Pubkey,
+    shred_slot: Slot,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank: &Bank,
+    stakes_cache: &EpochStakesCache,
+) -> Vec<(u64 /*stakes*/, usize /*index*/)> {
+    match leader_schedule_cache.slot_leader_at(shred_slot, Some(bank)) {
+        None => {
+            error!("unknown leader for shred slot");
+            stakes_cache.stakes_and_index.clone()
+        }
+        Some(pubkey) if pubkey == self_pubkey => {
+            error!("retransmit from slot leader: {}", pubkey);
+            stakes_cache.stakes_and_index.clone()
+        }
+        Some(pubkey) => stakes_cache
+            .stakes_and_index
+            .iter()
+            .filter(|(_, i)| stakes_cache.peers[*i].id != pubkey)
+            .copied()
+            .collect(),
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &RwLock<BankForks>,
@@ -390,10 +417,17 @@ fn retransmit(
         }

         let mut compute_turbine_peers = Measure::start("turbine_start");
+        let stakes_and_index = get_retransmit_peers(
+            my_id,
+            shred_slot,
+            leader_schedule_cache,
+            r_bank.deref(),
+            r_epoch_stakes_cache.deref(),
+        );
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
             &my_id,
             &r_epoch_stakes_cache.peers,
-            &r_epoch_stakes_cache.stakes_and_index,
+            &stakes_and_index,
             packet.meta.seed,
         );
         peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
@@ -432,15 +466,11 @@ fn retransmit(
             .entry(packet.meta.addr().to_string())
             .or_insert(0) += 1;

-        let leader =
-            leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
         let mut retransmit_time = Measure::start("retransmit_to");
         if !packet.meta.forward {
-            ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?;
-            ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?;
-        } else {
-            ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?;
+            ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
         }
+        ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
         retransmit_time.stop();
         retransmit_total += retransmit_time.as_us();
     }

@@ -9,18 +9,18 @@ use std::ops::Div;
 /// Returns a list of indexes shuffled based on the input weights
 /// Note - The sum of all weights must not exceed `u64::MAX`
-pub fn weighted_shuffle<T>(weights: Vec<T>, seed: [u8; 32]) -> Vec<usize>
+pub fn weighted_shuffle<T>(weights: &[T], seed: [u8; 32]) -> Vec<usize>
 where
     T: Copy + PartialOrd + iter::Sum + Div<T, Output = T> + FromPrimitive + ToPrimitive,
 {
-    let total_weight: T = weights.clone().into_iter().sum();
+    let total_weight: T = weights.iter().copied().sum();
     let mut rng = ChaChaRng::from_seed(seed);
     weights
-        .into_iter()
+        .iter()
         .enumerate()
         .map(|(i, v)| {
             // This generates an "inverse" weight but it avoids floating point math
-            let x = (total_weight / v)
+            let x = (total_weight / *v)
                 .to_u64()
                 .expect("values > u64::max are not supported");
             (
@@ -71,7 +71,7 @@ mod tests {
     fn test_weighted_shuffle_iterator() {
         let mut test_set = [0; 6];
         let mut count = 0;
-        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);
         shuffle.into_iter().for_each(|x| {
             assert_eq!(test_set[x], 0);
             test_set[x] = 1;

@@ -86,7 +86,7 @@ mod tests {
         let mut test_weights = vec![0; 100];
         (0..100).for_each(|i| test_weights[i] = (i + 1) as u64);
         let mut count = 0;
-        let shuffle = weighted_shuffle(test_weights, [0xa5; 32]);
+        let shuffle = weighted_shuffle(&test_weights, [0xa5; 32]);
         shuffle.into_iter().for_each(|x| {
             assert_eq!(test_set[x], 0);
             test_set[x] = 1;

@@ -97,9 +97,9 @@ mod tests {
     #[test]
     fn test_weighted_shuffle_compare() {
-        let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);

-        let shuffle1 = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], [0x5a; 32]);
+        let shuffle1 = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]);
         shuffle1
             .into_iter()
             .zip(shuffle.into_iter())

@@ -112,7 +112,7 @@ mod tests {
     fn test_weighted_shuffle_imbalanced() {
         let mut weights = vec![std::u32::MAX as u64; 3];
         weights.push(1);
-        let shuffle = weighted_shuffle(weights.clone(), [0x5a; 32]);
+        let shuffle = weighted_shuffle(&weights, [0x5a; 32]);
         shuffle.into_iter().for_each(|x| {
             if x == weights.len() - 1 {
                 assert_eq!(weights[x], 1);

@@ -199,9 +199,9 @@ pub fn cluster_info_retransmit() {
     assert!(done);
     let mut p = Packet::default();
     p.meta.size = 10;
-    let peers = c1.retransmit_peers();
+    let peers = c1.tvu_peers();
     let retransmit_peers: Vec<_> = peers.iter().collect();
-    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap();
+    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap();
     let res: Vec<_> = [tn1, tn2, tn3]
         .into_par_iter()
         .map(|s| {

@@ -97,7 +97,7 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
 }

 #[cfg(not(target_os = "linux"))]
-pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
+pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result<usize> {
     let count = dests.len();
     for a in dests {
         sock.send_to(packet, a)?;

@@ -107,7 +107,7 @@ pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
 }

 #[cfg(target_os = "linux")]
-pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
+pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result<usize> {
     use libc::{sendmmsg, socklen_t};
     use std::mem;
     use std::os::unix::io::AsRawFd;

@@ -216,11 +216,11 @@ mod tests {
         let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
-        let mut packet = Packet::default();
+        let packet = Packet::default();
         let sent = multicast(
             &sender,
-            &mut packet.data[..packet.meta.size],
+            &packet.data[..packet.meta.size],
             &[&addr, &addr2, &addr3, &addr4],
         )
         .ok();