diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 7ce4d0647..633d74668 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -33,10 +33,17 @@ use { sync::{Arc, Mutex}, time::{Duration, Instant}, }, + thiserror::Error, }; pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4; +#[derive(Debug, Error)] +pub enum Error { + #[error("Loopback from slot leader: {leader}, shred: {shred:?}")] + Loopback { leader: Pubkey, shred: ShredId }, +} + #[allow(clippy::large_enum_variant)] enum NodeId { // TVU node obtained through gossip (staked or not). @@ -150,14 +157,14 @@ impl ClusterNodes<RetransmitStage> { shred: &ShredId, root_bank: &Bank, fanout: usize, - ) -> (/*root_distance:*/ usize, Vec<SocketAddr>) { + ) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> { let RetransmitPeers { root_distance, neighbors, children, addrs, frwds, - } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); + } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?; if neighbors.is_empty() { let peers = children .into_iter() .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) .map(|node| node.tvu) .collect(); - return (root_distance, peers); + return Ok((root_distance, peers)); } // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its @@ -177,7 +184,7 @@ impl ClusterNodes<RetransmitStage> { .filter_map(Node::contact_info) .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id)) .map(|node| node.tvu_forwards); - return (root_distance, peers.collect()); + return Ok((root_distance, peers.collect())); } // First neighbor is this node itself, so skip it. let peers = neighbors[1..] 
@@ -192,7 +199,7 @@ impl ClusterNodes<RetransmitStage> { .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) .map(|node| node.tvu), ); - (root_distance, peers.collect()) + Ok((root_distance, peers.collect())) } pub fn get_retransmit_peers( @@ -201,15 +208,19 @@ impl ClusterNodes<RetransmitStage> { shred: &ShredId, root_bank: &Bank, fanout: usize, - ) -> RetransmitPeers { + ) -> Result<RetransmitPeers, Error> { let shred_seed = shred.seed(slot_leader); let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. if slot_leader == &self.pubkey { - error!("retransmit from slot leader: {}", slot_leader); - } else if let Some(index) = self.index.get(slot_leader) { + return Err(Error::Loopback { + leader: *slot_leader, + shred: *shred, + }); + } + if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); - }; + } let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len()); let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len()); let mut rng = ChaChaRng::from_seed(shred_seed); @@ -241,13 +252,13 @@ impl ClusterNodes<RetransmitStage> { 3 // If changed, update MAX_NUM_TURBINE_HOPS. }; let peers = get_retransmit_peers(fanout, self_index, &nodes); - return RetransmitPeers { + return Ok(RetransmitPeers { root_distance, neighbors: Vec::default(), children: peers.collect(), addrs, frwds, - }; + }); } let root_distance = if self_index == 0 { 0 @@ -262,13 +273,13 @@ impl ClusterNodes<RetransmitStage> { // Assert that the node itself is included in the set of neighbors, at // the right offset. 
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); - RetransmitPeers { + Ok(RetransmitPeers { root_distance, neighbors, children, addrs, frwds, - } + }) } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8c2568aa9..14bd70dee 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,7 +3,7 @@ use { crate::{ - cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS}, + cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS}, packet_hasher::PacketHasher, }, crossbeam_channel::{Receiver, RecvTimeoutError}, @@ -63,6 +63,7 @@ struct RetransmitStats { since: Instant, num_nodes: AtomicUsize, num_addrs_failed: AtomicUsize, + num_loopback_errs: AtomicUsize, num_shreds: usize, num_shreds_skipped: usize, num_small_batches: usize, @@ -100,6 +101,7 @@ impl RetransmitStats { ("num_small_batches", self.num_small_batches, i64), ("num_nodes", *self.num_nodes.get_mut(), i64), ("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64), + ("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64), ("num_shreds", self.num_shreds, i64), ("num_shreds_skipped", self.num_shreds_skipped, i64), ("retransmit_total", *self.retransmit_total.get_mut(), i64), @@ -118,6 +120,15 @@ impl RetransmitStats { let old = std::mem::replace(self, Self::new(Instant::now())); self.slot_stats = old.slot_stats; } + + fn record_error(&self, err: &Error) { + match err { + Error::Loopback { .. } => { + error!("retransmit_shred: {err}"); + self.num_loopback_errs.fetch_add(1, Ordering::Relaxed) + } + }; + } } // Map of shred (slot, index, type) => list of hash values seen for that key. 
@@ -245,7 +256,7 @@ fn retransmit( shreds .into_iter() .enumerate() - .map(|(index, ((key, shred), slot_leader, cluster_nodes))| { + .filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| { let (root_distance, num_nodes) = retransmit_shred( &key, &shred, @@ -255,15 +266,20 @@ fn retransmit( socket_addr_space, &sockets[index % sockets.len()], stats, - ); - (key.slot(), root_distance, num_nodes) + ) + .map_err(|err| { + stats.record_error(&err); + err + }) + .ok()?; + Some((key.slot(), root_distance, num_nodes)) }) .fold(HashMap::new(), record) } else { thread_pool.install(|| { shreds .into_par_iter() - .map(|((key, shred), slot_leader, cluster_nodes)| { + .filter_map(|((key, shred), slot_leader, cluster_nodes)| { let index = thread_pool.current_thread_index().unwrap(); let (root_distance, num_nodes) = retransmit_shred( &key, @@ -274,8 +290,13 @@ fn retransmit( socket_addr_space, &sockets[index % sockets.len()], stats, - ); - (key.slot(), root_distance, num_nodes) + ) + .map_err(|err| { + stats.record_error(&err); + err + }) + .ok()?; + Some((key.slot(), root_distance, num_nodes)) }) .fold(HashMap::new, record) .reduce(HashMap::new, RetransmitSlotStats::merge) @@ -297,11 +318,11 @@ fn retransmit_shred( socket_addr_space: &SocketAddrSpace, socket: &UdpSocket, stats: &RetransmitStats, -) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) { +) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> { let mut compute_turbine_peers = Measure::start("turbine_start"); let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank); let (root_distance, addrs) = - cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout); + cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout)?; let addrs: Vec<_> = addrs .into_iter() .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) @@ -332,7 +353,7 @@ fn retransmit_shred( stats .retransmit_total 
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed); - (root_distance, num_nodes) + Ok((root_distance, num_nodes)) } /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes. @@ -456,6 +477,7 @@ impl RetransmitStats { since: now, num_nodes: AtomicUsize::default(), num_addrs_failed: AtomicUsize::default(), + num_loopback_errs: AtomicUsize::default(), num_shreds: 0usize, num_shreds_skipped: 0usize, total_batches: 0usize,