diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs index 2e35919388..e9f74bc9b9 100644 --- a/core/benches/cluster_nodes.rs +++ b/core/benches/cluster_nodes.rs @@ -49,7 +49,7 @@ fn get_retransmit_peers_deterministic( 0, 0, ); - let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers( + let _retransmit_peers = cluster_nodes.get_retransmit_peers( slot_leader, &shred.id(), root_bank, diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 1e1283e81c..270c839905 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -68,6 +68,16 @@ pub struct ClusterNodesCache { ttl: Duration, // Time to live. } +pub struct RetransmitPeers<'a> { + root_distance: usize, // distance from the root node + neighbors: Vec<&'a Node>, + children: Vec<&'a Node>, + // Maps from tvu/tvu_forwards addresses to the first node + // in the shuffle with the same address. + addrs: HashMap, // tvu addresses + frwds: HashMap, // tvu_forwards addresses +} + impl Node { #[inline] fn pubkey(&self) -> Pubkey { @@ -139,33 +149,48 @@ impl ClusterNodes { root_bank: &Bank, fanout: usize, ) -> (/*root_distance:*/ usize, Vec) { - let (root_distance, neighbors, children) = - self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); + let RetransmitPeers { + root_distance, + neighbors, + children, + addrs, + frwds, + } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); if neighbors.is_empty() { - let peers = children.into_iter().filter_map(Node::contact_info); - let addrs = peers.map(|peer| peer.tvu).collect(); - return (root_distance, addrs); + let peers = children + .into_iter() + .filter_map(Node::contact_info) + .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) + .map(|node| node.tvu) + .collect(); + return (root_distance, peers); } // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its // children and also tvu_forward socket of its neighbors. Otherwise it // should only forward to tvu_forwards socket of its children. if neighbors[0].pubkey() != self.pubkey { - let addrs = children - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu_forwards)); - return (root_distance, addrs.collect()); + let peers = children + .into_iter() + .filter_map(Node::contact_info) + .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id)) + .map(|node| node.tvu_forwards); + return (root_distance, peers.collect()); } // First neighbor is this node itself, so skip it. - let addrs = neighbors[1..] + let peers = neighbors[1..] .iter() - .filter_map(|node| Some(node.contact_info()?.tvu_forwards)) + .filter_map(|node| node.contact_info()) + .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id)) + .map(|node| node.tvu_forwards) .chain( children - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu)), + .into_iter() + .filter_map(Node::contact_info) + .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) + .map(|node| node.tvu), ); - (root_distance, addrs.collect()) + (root_distance, peers.collect()) } pub fn get_retransmit_peers( @@ -174,11 +199,7 @@ impl ClusterNodes { shred: &ShredId, root_bank: &Bank, fanout: usize, - ) -> ( - usize, // distance from the root node - Vec<&Node>, // neighbors - Vec<&Node>, // children - ) { + ) -> RetransmitPeers { let shred_seed = shred.seed(slot_leader); let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. @@ -187,16 +208,27 @@ impl ClusterNodes { } else if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); }; + let mut addrs = HashMap::::with_capacity(self.nodes.len()); + let mut frwds = HashMap::::with_capacity(self.nodes.len()); let mut rng = ChaChaRng::from_seed(shred_seed); + let drop_redundant_turbine_path = drop_redundant_turbine_path(shred.slot(), root_bank); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) .map(|index| &self.nodes[index]) + .inspect(|node| { + if let Some(node) = node.contact_info() { + addrs.entry(node.tvu).or_insert(node.id); + if !drop_redundant_turbine_path { + frwds.entry(node.tvu_forwards).or_insert(node.id); + } + } + }) .collect(); let self_index = nodes .iter() .position(|node| node.pubkey() == self.pubkey) .unwrap(); - if drop_redundant_turbine_path(shred.slot(), root_bank) { + if drop_redundant_turbine_path { let root_distance = if self_index == 0 { 0 } else if self_index <= fanout { @@ -205,7 +237,13 @@ impl ClusterNodes { 2 }; let peers = get_retransmit_peers(fanout, self_index, &nodes); - return (root_distance, Vec::default(), peers.collect()); + return RetransmitPeers { + root_distance, + neighbors: Vec::default(), + children: peers.collect(), + addrs, + frwds, + }; } let root_distance = if self_index == 0 { 0 @@ -218,7 +256,13 @@ impl ClusterNodes { // Assert that the node itself is included in the set of neighbors, at // the right offset. debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); - (root_distance, neighbors, children) + RetransmitPeers { + root_distance, + neighbors, + children, + addrs, + frwds, + } } }