From 64c13b74d8616380a101bfc99ac3695b9fc9fac8 Mon Sep 17 00:00:00 2001
From: behzad nouri <behzadnouri@gmail.com>
Date: Fri, 20 Jan 2023 17:20:51 +0000
Subject: [PATCH] errors out when retransmit loops back to the slot leader
 (#29789)

When broadcasting shreds, turbine excludes the slot leader from the
random shuffle, so shreds should never loop back to the leader. If
shreds arriving at the retransmit stage are from the node's own leader
slots, they should not be retransmitted to any nodes.
---
 core/src/cluster_nodes.rs    | 37 ++++++++++++++++++++-----------
 core/src/retransmit_stage.rs | 42 +++++++++++++++++++++++++++---------
 2 files changed, 56 insertions(+), 23 deletions(-)

diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 7ce4d0647..633d74668 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -33,10 +33,17 @@ use {
         sync::{Arc, Mutex},
         time::{Duration, Instant},
     },
+    thiserror::Error,
 };
 
 pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
 
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
+    Loopback { leader: Pubkey, shred: ShredId },
+}
+
 #[allow(clippy::large_enum_variant)]
 enum NodeId {
     // TVU node obtained through gossip (staked or not).
@@ -150,14 +157,14 @@ impl ClusterNodes<RetransmitStage> {
         shred: &ShredId,
         root_bank: &Bank,
         fanout: usize,
-    ) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
+    ) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> {
         let RetransmitPeers {
             root_distance,
             neighbors,
             children,
             addrs,
             frwds,
-        } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
+        } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?;
         if neighbors.is_empty() {
             let peers = children
                 .into_iter()
                 .filter(|node| addrs.get(&node.tvu) == Some(&node.id))
                 .map(|node| node.tvu)
                 .collect();
-            return (root_distance, peers);
+            return Ok((root_distance, peers));
         }
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
@@ -177,7 +184,7 @@ impl ClusterNodes<RetransmitStage> {
                 .filter_map(Node::contact_info)
                 .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id))
                 .map(|node| node.tvu_forwards);
-            return (root_distance, peers.collect());
+            return Ok((root_distance, peers.collect()));
         }
         // First neighbor is this node itself, so skip it.
         let peers = neighbors[1..]
             .iter()
@@ -192,7 +199,7 @@ impl ClusterNodes<RetransmitStage> {
                     .filter(|node| addrs.get(&node.tvu) == Some(&node.id))
                     .map(|node| node.tvu),
             );
-        (root_distance, peers.collect())
+        Ok((root_distance, peers.collect()))
     }
 
     pub fn get_retransmit_peers(
@@ -201,15 +208,19 @@ impl ClusterNodes<RetransmitStage> {
         shred: &ShredId,
         root_bank: &Bank,
         fanout: usize,
-    ) -> RetransmitPeers {
+    ) -> Result<RetransmitPeers, Error> {
        let shred_seed = shred.seed(slot_leader);
        let mut weighted_shuffle = self.weighted_shuffle.clone();
        // Exclude slot leader from list of nodes.
        if slot_leader == &self.pubkey {
-            error!("retransmit from slot leader: {}", slot_leader);
-        } else if let Some(index) = self.index.get(slot_leader) {
+            return Err(Error::Loopback {
+                leader: *slot_leader,
+                shred: *shred,
+            });
+        }
+        if let Some(index) = self.index.get(slot_leader) {
             weighted_shuffle.remove_index(*index);
-        };
+        }
        let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
        let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
        let mut rng = ChaChaRng::from_seed(shred_seed);
@@ -241,13 +252,13 @@ impl ClusterNodes<RetransmitStage> {
                 3 // If changed, update MAX_NUM_TURBINE_HOPS.
             };
             let peers = get_retransmit_peers(fanout, self_index, &nodes);
-            return RetransmitPeers {
+            return Ok(RetransmitPeers {
                 root_distance,
                 neighbors: Vec::default(),
                 children: peers.collect(),
                 addrs,
                 frwds,
-            };
+            });
         }
         let root_distance = if self_index == 0 {
             0
@@ -262,13 +273,13 @@ impl ClusterNodes<RetransmitStage> {
         // Assert that the node itself is included in the set of neighbors, at
         // the right offset.
         debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
-        RetransmitPeers {
+        Ok(RetransmitPeers {
             root_distance,
             neighbors,
             children,
             addrs,
             frwds,
-        }
+        })
     }
 }
 
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 8c2568aa9..14bd70dee 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,7 +3,7 @@
 use {
     crate::{
-        cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS},
+        cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
         packet_hasher::PacketHasher,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError},
@@ -63,6 +63,7 @@ struct RetransmitStats {
     since: Instant,
     num_nodes: AtomicUsize,
     num_addrs_failed: AtomicUsize,
+    num_loopback_errs: AtomicUsize,
     num_shreds: usize,
     num_shreds_skipped: usize,
     num_small_batches: usize,
@@ -100,6 +101,7 @@ impl RetransmitStats {
             ("num_small_batches", self.num_small_batches, i64),
             ("num_nodes", *self.num_nodes.get_mut(), i64),
             ("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
+            ("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
             ("num_shreds", self.num_shreds, i64),
             ("num_shreds_skipped", self.num_shreds_skipped, i64),
             ("retransmit_total", *self.retransmit_total.get_mut(), i64),
@@ -118,6 +120,15 @@ impl RetransmitStats {
         let old = std::mem::replace(self, Self::new(Instant::now()));
         self.slot_stats = old.slot_stats;
     }
+
+    fn record_error(&self, err: &Error) {
+        match err {
+            Error::Loopback { .. } => {
+                error!("retransmit_shred: {err}");
+                self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
+            }
+        };
+    }
 }
 
 // Map of shred (slot, index, type) => list of hash values seen for that key.
@@ -245,7 +256,7 @@ fn retransmit(
         shreds
             .into_iter()
             .enumerate()
-            .map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
+            .filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
                 let (root_distance, num_nodes) = retransmit_shred(
                     &key,
                     &shred,
@@ -255,15 +266,20 @@ fn retransmit(
                     socket_addr_space,
                     &sockets[index % sockets.len()],
                     stats,
-                );
-                (key.slot(), root_distance, num_nodes)
+                )
+                .map_err(|err| {
+                    stats.record_error(&err);
+                    err
+                })
+                .ok()?;
+                Some((key.slot(), root_distance, num_nodes))
             })
             .fold(HashMap::new(), record)
     } else {
         thread_pool.install(|| {
             shreds
                 .into_par_iter()
-                .map(|((key, shred), slot_leader, cluster_nodes)| {
+                .filter_map(|((key, shred), slot_leader, cluster_nodes)| {
                     let index = thread_pool.current_thread_index().unwrap();
                     let (root_distance, num_nodes) = retransmit_shred(
                         &key,
@@ -274,8 +290,13 @@ fn retransmit(
                         socket_addr_space,
                         &sockets[index % sockets.len()],
                         stats,
-                    );
-                    (key.slot(), root_distance, num_nodes)
+                    )
+                    .map_err(|err| {
+                        stats.record_error(&err);
+                        err
+                    })
+                    .ok()?;
+                    Some((key.slot(), root_distance, num_nodes))
                 })
                 .fold(HashMap::new, record)
                 .reduce(HashMap::new, RetransmitSlotStats::merge)
@@ -297,11 +318,11 @@ fn retransmit_shred(
     socket_addr_space: &SocketAddrSpace,
     socket: &UdpSocket,
     stats: &RetransmitStats,
-) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
+) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
     let mut compute_turbine_peers = Measure::start("turbine_start");
     let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
     let (root_distance, addrs) =
-        cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout);
+        cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout)?;
     let addrs: Vec<_> = addrs
         .into_iter()
         .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
@@ -332,7 +353,7 @@ fn retransmit_shred(
     stats
         .retransmit_total
         .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
-    (root_distance, num_nodes)
+    Ok((root_distance, num_nodes))
 }
 
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
@@ -456,6 +477,7 @@ impl RetransmitStats {
             since: now,
             num_nodes: AtomicUsize::default(),
             num_addrs_failed: AtomicUsize::default(),
+            num_loopback_errs: AtomicUsize::default(),
             num_shreds: 0usize,
             num_shreds_skipped: 0usize,
             total_batches: 0usize,
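
Note (editorial, not part of the patch): both the serial and parallel
branches of `retransmit` above apply the same pattern: record the error
in stats, switch `.map(..)` to `.filter_map(..)`, and end the fallible
call with `.ok()?` so a loopback shred is dropped instead of sent. Below
is a minimal, self-contained Rust sketch of that pattern. The `u64` ids,
dummy return values, and the plain `num_loopback_errs` counter are
hypothetical stand-ins for `ShredId`, `Pubkey`, and `RetransmitStats`,
not the real types.

    // Simplified stand-in for the patch's cluster_nodes::Error.
    #[derive(Debug)]
    enum Error {
        Loopback { leader: u64, shred: u64 },
    }

    // Stand-in for retransmit_shred: fails when the shred's slot leader
    // is this node itself, i.e. the loopback case the patch errors on.
    fn retransmit_shred(self_id: u64, leader: u64, shred: u64) -> Result<(usize, usize), Error> {
        if leader == self_id {
            return Err(Error::Loopback { leader, shred });
        }
        Ok((1, 200)) // dummy (root_distance, num_nodes)
    }

    fn main() {
        let self_id = 7;
        // (shred id, slot leader) pairs; shred 11 would loop back to us.
        let shreds = [(10u64, 3u64), (11, 7), (12, 5)];
        let mut num_loopback_errs = 0usize;

        // Same shape as the patch: record the error, then drop the shred
        // from the stream with `.ok()?` inside `filter_map`.
        let out: Vec<(u64, usize, usize)> = shreds
            .iter()
            .filter_map(|&(shred, leader)| {
                let (root_distance, num_nodes) = retransmit_shred(self_id, leader, shred)
                    .map_err(|err| {
                        eprintln!("retransmit_shred: {err:?}");
                        num_loopback_errs += 1;
                        err
                    })
                    .ok()?;
                Some((shred, root_distance, num_nodes))
            })
            .collect();

        assert_eq!(out.len(), 2); // the loopback shred was skipped, not sent
        assert_eq!(num_loopback_errs, 1);
    }

Returning an error and counting it, rather than silently skipping the
shred, is what makes the loopback condition visible in the
num_loopback_errs metric reported by RetransmitStats.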