dedups turbine retransmit peers by tvu socket addresses (#28944)

There is no need to send duplicate shreds when several nodes share the same
tvu socket address, e.g. because they sit behind the same relayer.
This commit is contained in:
behzad nouri 2022-11-28 19:23:02 +00:00 committed by GitHub
parent 7e87998091
commit 7d99cddb9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 23 deletions

View File

@ -49,7 +49,7 @@ fn get_retransmit_peers_deterministic(
0, 0,
0, 0,
); );
let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers( let _retransmit_peers = cluster_nodes.get_retransmit_peers(
slot_leader, slot_leader,
&shred.id(), &shred.id(),
root_bank, root_bank,

View File

@ -68,6 +68,16 @@ pub struct ClusterNodesCache<T> {
ttl: Duration, // Time to live. ttl: Duration, // Time to live.
} }
/// Value returned by `ClusterNodes::<RetransmitStage>::get_retransmit_peers`.
///
/// Bundles the node's distance from the root, its neighbors and children, and
/// the two address maps that callers use to skip peers sharing a socket
/// address with an earlier node in the shuffle.
pub struct RetransmitPeers<'a> {
root_distance: usize, // distance from the root node
// Left empty (`Vec::default()`) when `drop_redundant_turbine_path` is active.
neighbors: Vec<&'a Node>,
children: Vec<&'a Node>,
// Maps from tvu/tvu_forwards addresses to the first node
// in the shuffle with the same address.
// Callers keep a peer only if the entry for its address holds that peer's
// own id, so each distinct address is sent to at most once.
addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
// Not populated when `drop_redundant_turbine_path` is active.
frwds: HashMap<SocketAddr, Pubkey>, // tvu_forwards addresses
}
impl Node { impl Node {
#[inline] #[inline]
fn pubkey(&self) -> Pubkey { fn pubkey(&self) -> Pubkey {
@ -139,33 +149,48 @@ impl ClusterNodes<RetransmitStage> {
root_bank: &Bank, root_bank: &Bank,
fanout: usize, fanout: usize,
) -> (/*root_distance:*/ usize, Vec<SocketAddr>) { ) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
let (root_distance, neighbors, children) = let RetransmitPeers {
self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); root_distance,
neighbors,
children,
addrs,
frwds,
} = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
if neighbors.is_empty() { if neighbors.is_empty() {
let peers = children.into_iter().filter_map(Node::contact_info); let peers = children
let addrs = peers.map(|peer| peer.tvu).collect(); .into_iter()
return (root_distance, addrs); .filter_map(Node::contact_info)
.filter(|node| addrs.get(&node.tvu) == Some(&node.id))
.map(|node| node.tvu)
.collect();
return (root_distance, peers);
} }
// If the node is on the critical path (i.e. the first node in each // If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its // neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forward socket of its neighbors. Otherwise it // children and also tvu_forward socket of its neighbors. Otherwise it
// should only forward to tvu_forwards socket of its children. // should only forward to tvu_forwards socket of its children.
if neighbors[0].pubkey() != self.pubkey { if neighbors[0].pubkey() != self.pubkey {
let addrs = children let peers = children
.iter() .into_iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards)); .filter_map(Node::contact_info)
return (root_distance, addrs.collect()); .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id))
.map(|node| node.tvu_forwards);
return (root_distance, peers.collect());
} }
// First neighbor is this node itself, so skip it. // First neighbor is this node itself, so skip it.
let addrs = neighbors[1..] let peers = neighbors[1..]
.iter() .iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards)) .filter_map(|node| node.contact_info())
.filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id))
.map(|node| node.tvu_forwards)
.chain( .chain(
children children
.iter() .into_iter()
.filter_map(|node| Some(node.contact_info()?.tvu)), .filter_map(Node::contact_info)
.filter(|node| addrs.get(&node.tvu) == Some(&node.id))
.map(|node| node.tvu),
); );
(root_distance, addrs.collect()) (root_distance, peers.collect())
} }
pub fn get_retransmit_peers( pub fn get_retransmit_peers(
@ -174,11 +199,7 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId, shred: &ShredId,
root_bank: &Bank, root_bank: &Bank,
fanout: usize, fanout: usize,
) -> ( ) -> RetransmitPeers {
usize, // distance from the root node
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
let shred_seed = shred.seed(slot_leader); let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone(); let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes. // Exclude slot leader from list of nodes.
@ -187,16 +208,27 @@ impl ClusterNodes<RetransmitStage> {
} else if let Some(index) = self.index.get(slot_leader) { } else if let Some(index) = self.index.get(slot_leader) {
weighted_shuffle.remove_index(*index); weighted_shuffle.remove_index(*index);
}; };
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed); let mut rng = ChaChaRng::from_seed(shred_seed);
let drop_redundant_turbine_path = drop_redundant_turbine_path(shred.slot(), root_bank);
let nodes: Vec<_> = weighted_shuffle let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng) .shuffle(&mut rng)
.map(|index| &self.nodes[index]) .map(|index| &self.nodes[index])
.inspect(|node| {
if let Some(node) = node.contact_info() {
addrs.entry(node.tvu).or_insert(node.id);
if !drop_redundant_turbine_path {
frwds.entry(node.tvu_forwards).or_insert(node.id);
}
}
})
.collect(); .collect();
let self_index = nodes let self_index = nodes
.iter() .iter()
.position(|node| node.pubkey() == self.pubkey) .position(|node| node.pubkey() == self.pubkey)
.unwrap(); .unwrap();
if drop_redundant_turbine_path(shred.slot(), root_bank) { if drop_redundant_turbine_path {
let root_distance = if self_index == 0 { let root_distance = if self_index == 0 {
0 0
} else if self_index <= fanout { } else if self_index <= fanout {
@ -205,7 +237,13 @@ impl ClusterNodes<RetransmitStage> {
2 2
}; };
let peers = get_retransmit_peers(fanout, self_index, &nodes); let peers = get_retransmit_peers(fanout, self_index, &nodes);
return (root_distance, Vec::default(), peers.collect()); return RetransmitPeers {
root_distance,
neighbors: Vec::default(),
children: peers.collect(),
addrs,
frwds,
};
} }
let root_distance = if self_index == 0 { let root_distance = if self_index == 0 {
0 0
@ -218,7 +256,13 @@ impl ClusterNodes<RetransmitStage> {
// Assert that the node itself is included in the set of neighbors, at // Assert that the node itself is included in the set of neighbors, at
// the right offset. // the right offset.
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
(root_distance, neighbors, children) RetransmitPeers {
root_distance,
neighbors,
children,
addrs,
frwds,
}
} }
} }