errors out when retransmit loopbacks to the slot leader (#29789)

When broadcasting shreds, turbine excludes the slot leader from the
random shuffle, so shreds should never loop back to the leader.
If shreds reaching the retransmit stage are from the node's own leader
slots, they should not be retransmitted to any nodes.
behzad nouri 2023-01-20 17:20:51 +00:00 committed by GitHub
parent 5f7fea100a
commit 64c13b74d8
2 changed files with 56 additions and 23 deletions
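To illustrate the change described in the commit message, here is a minimal, self-contained sketch of the new behavior: the peer-selection code returns a dedicated Loopback error instead of merely logging when the slot leader is this node. All types and names below (Pubkey, ShredId, ClusterNodes, the usize peer indices) are simplified stand-ins for this sketch, not the actual solana-core definitions.

use std::collections::HashMap;

// Simplified stand-ins for the real solana-core types; assumptions for this sketch only.
type Pubkey = [u8; 32];
type ShredId = (u64 /*slot*/, u32 /*index*/);

#[derive(Debug)]
enum Error {
    // A shred from this node's own leader slot reached its retransmit stage.
    Loopback { leader: Pubkey, shred: ShredId },
}

struct ClusterNodes {
    pubkey: Pubkey,                // this node's identity
    index: HashMap<Pubkey, usize>, // node pubkey -> position in the weighted shuffle
}

impl ClusterNodes {
    fn get_retransmit_peers(
        &self,
        slot_leader: &Pubkey,
        shred: &ShredId,
    ) -> Result<Vec<usize>, Error> {
        // New behavior: return an error instead of only logging, so the caller
        // can skip retransmitting shreds from its own leader slots.
        if slot_leader == &self.pubkey {
            return Err(Error::Loopback {
                leader: *slot_leader,
                shred: *shred,
            });
        }
        // Exclude the slot leader from the shuffle so shreds never loop back to it.
        let peers = self
            .index
            .iter()
            .filter(|(pubkey, _)| *pubkey != slot_leader)
            .map(|(_, index)| *index)
            .collect();
        Ok(peers)
    }
}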

core/src/cluster_nodes.rs

@@ -33,10 +33,17 @@ use {
sync::{Arc, Mutex},
time::{Duration, Instant},
},
thiserror::Error,
};
pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
#[derive(Debug, Error)]
pub enum Error {
#[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
Loopback { leader: Pubkey, shred: ShredId },
}
#[allow(clippy::large_enum_variant)]
enum NodeId {
// TVU node obtained through gossip (staked or not).
@@ -150,14 +157,14 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> {
let RetransmitPeers {
root_distance,
neighbors,
children,
addrs,
frwds,
} = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
} = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?;
if neighbors.is_empty() {
let peers = children
.into_iter()
@@ -165,7 +172,7 @@ impl ClusterNodes<RetransmitStage> {
.filter(|node| addrs.get(&node.tvu) == Some(&node.id))
.map(|node| node.tvu)
.collect();
return (root_distance, peers);
return Ok((root_distance, peers));
}
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
@@ -177,7 +184,7 @@ impl ClusterNodes<RetransmitStage> {
.filter_map(Node::contact_info)
.filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id))
.map(|node| node.tvu_forwards);
return (root_distance, peers.collect());
return Ok((root_distance, peers.collect()));
}
// First neighbor is this node itself, so skip it.
let peers = neighbors[1..]
@@ -192,7 +199,7 @@ impl ClusterNodes<RetransmitStage> {
.filter(|node| addrs.get(&node.tvu) == Some(&node.id))
.map(|node| node.tvu),
);
(root_distance, peers.collect())
Ok((root_distance, peers.collect()))
}
pub fn get_retransmit_peers(
@@ -201,15 +208,19 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
) -> RetransmitPeers {
) -> Result<RetransmitPeers, Error> {
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
} else if let Some(index) = self.index.get(slot_leader) {
return Err(Error::Loopback {
leader: *slot_leader,
shred: *shred,
});
}
if let Some(index) = self.index.get(slot_leader) {
weighted_shuffle.remove_index(*index);
};
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
@@ -241,13 +252,13 @@ impl ClusterNodes<RetransmitStage> {
3 // If changed, update MAX_NUM_TURBINE_HOPS.
};
let peers = get_retransmit_peers(fanout, self_index, &nodes);
return RetransmitPeers {
return Ok(RetransmitPeers {
root_distance,
neighbors: Vec::default(),
children: peers.collect(),
addrs,
frwds,
};
});
}
let root_distance = if self_index == 0 {
0
@@ -262,13 +273,13 @@ impl ClusterNodes<RetransmitStage> {
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
RetransmitPeers {
Ok(RetransmitPeers {
root_distance,
neighbors,
children,
addrs,
frwds,
}
})
}
}
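On the retransmit side (next file), the caller records the error in metrics and drops the offending shred from the batch instead of retransmitting it. A hedged sketch of that filter_map / map_err pattern, again with simplified stand-ins (Shred, retransmit_shred, and the eprintln! in place of the real stats counter are assumptions, not the actual crate API):

#[derive(Debug)]
struct Shred {
    slot: u64,
}

#[derive(Debug)]
enum Error {
    Loopback, // shred originated in this node's own leader slot
}

// Stub for the per-shred retransmit; the real code computes turbine peers and sends packets.
fn retransmit_shred(_shred: &Shred) -> Result<usize, Error> {
    Ok(200) // pretend 200 peers were addressed
}

// Record each error, then drop that shred from the batch; filter_map keeps only
// the shreds that were actually retransmitted.
fn retransmit_batch(shreds: Vec<Shred>) -> Vec<(u64, /*num_nodes:*/ usize)> {
    shreds
        .into_iter()
        .filter_map(|shred| {
            let num_nodes = retransmit_shred(&shred)
                .map_err(|err| eprintln!("retransmit_shred: {err:?}")) // stand-in for num_loopback_errs accounting
                .ok()?;
            Some((shred.slot, num_nodes))
        })
        .collect()
}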

core/src/retransmit_stage.rs

@@ -3,7 +3,7 @@
use {
crate::{
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
packet_hasher::PacketHasher,
},
crossbeam_channel::{Receiver, RecvTimeoutError},
@@ -63,6 +63,7 @@ struct RetransmitStats {
since: Instant,
num_nodes: AtomicUsize,
num_addrs_failed: AtomicUsize,
num_loopback_errs: AtomicUsize,
num_shreds: usize,
num_shreds_skipped: usize,
num_small_batches: usize,
@@ -100,6 +101,7 @@ impl RetransmitStats {
("num_small_batches", self.num_small_batches, i64),
("num_nodes", *self.num_nodes.get_mut(), i64),
("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
("num_shreds", self.num_shreds, i64),
("num_shreds_skipped", self.num_shreds_skipped, i64),
("retransmit_total", *self.retransmit_total.get_mut(), i64),
@@ -118,6 +120,15 @@ impl RetransmitStats {
let old = std::mem::replace(self, Self::new(Instant::now()));
self.slot_stats = old.slot_stats;
}
fn record_error(&self, err: &Error) {
match err {
Error::Loopback { .. } => {
error!("retransmit_shred: {err}");
self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
}
};
}
}
// Map of shred (slot, index, type) => list of hash values seen for that key.
@@ -245,7 +256,7 @@ fn retransmit(
shreds
.into_iter()
.enumerate()
.map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
.filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
let (root_distance, num_nodes) = retransmit_shred(
&key,
&shred,
@@ -255,15 +266,20 @@ fn retransmit(
socket_addr_space,
&sockets[index % sockets.len()],
stats,
);
(key.slot(), root_distance, num_nodes)
)
.map_err(|err| {
stats.record_error(&err);
err
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
})
.fold(HashMap::new(), record)
} else {
thread_pool.install(|| {
shreds
.into_par_iter()
.map(|((key, shred), slot_leader, cluster_nodes)| {
.filter_map(|((key, shred), slot_leader, cluster_nodes)| {
let index = thread_pool.current_thread_index().unwrap();
let (root_distance, num_nodes) = retransmit_shred(
&key,
@@ -274,8 +290,13 @@ fn retransmit(
socket_addr_space,
&sockets[index % sockets.len()],
stats,
);
(key.slot(), root_distance, num_nodes)
)
.map_err(|err| {
stats.record_error(&err);
err
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
})
.fold(HashMap::new, record)
.reduce(HashMap::new, RetransmitSlotStats::merge)
@@ -297,11 +318,11 @@ fn retransmit_shred(
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
stats: &RetransmitStats,
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
let mut compute_turbine_peers = Measure::start("turbine_start");
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout);
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout)?;
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
@@ -332,7 +353,7 @@ fn retransmit_shred(
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
(root_distance, num_nodes)
Ok((root_distance, num_nodes))
}
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
@@ -456,6 +477,7 @@ impl RetransmitStats {
since: now,
num_nodes: AtomicUsize::default(),
num_addrs_failed: AtomicUsize::default(),
num_loopback_errs: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: 0usize,
total_batches: 0usize,