reverts wide fanout in broadcast when the root node is down (#26359)

A change included in
https://github.com/solana-labs/solana/pull/20480
was that when the root node in the turbine broadcast tree is down, the
leader broadcasts the shred to all nodes in the first layer instead.
The intention was to mitigate the impact of dead nodes on shred
propagation, because if the root node is down, the entire cluster
misses that shred.
On the other hand, if x% of stake is down, this results in a ratio of
roughly 200 * x% + 1 packets per shred at the broadcast stage, because
for each shred whose root node is down the leader sends one packet per
first-layer node (~200 of them) instead of a single packet. That extra
egress can contribute to line-rate saturation and packet drops.
To avoid this bandwidth saturation issue, this commit reverts that
logic so that the leader always broadcasts each shred only to its root
node. As before, we rely on erasure codes to recover shreds lost due to
staked nodes being offline.
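
As a back-of-the-envelope illustration of that ratio (a sketch only, not
code from this commit; the helper name is made up and the first-layer size
is taken to be DATA_PLANE_FANOUT = 200, as implied by the 200 above):

// Expected packets sent per shred at the broadcast stage under the old
// wide-fanout fallback: a shred with a live root goes to 1 node, while a
// shred whose root is offline (probability ~ offline stake fraction x)
// goes to all ~200 first-layer nodes.
fn packets_per_shred_with_fallback(x: f64) -> f64 {
    const DATA_PLANE_FANOUT: f64 = 200.0;
    (1.0 - x) * 1.0 + x * DATA_PLANE_FANOUT // ≈ 200 * x + 1 for small x
}

fn main() {
    // With 5% of stake offline the leader sends ~11x more packets per shred.
    println!("{:.2}", packets_per_shred_with_fallback(0.05)); // prints 10.95
}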
behzad nouri 2022-08-16 19:40:06 +00:00 committed by GitHub
parent 67d1628602
commit 3b87aa9227
3 changed files with 16 additions and 84 deletions


@@ -14,7 +14,10 @@ use {
},
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::Itertools,
solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT},
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
},
solana_ledger::{blockstore::Blockstore, shred::Shred},
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
@@ -32,7 +35,6 @@ use {
},
std::{
collections::{HashMap, HashSet},
iter::repeat,
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
@@ -390,8 +392,8 @@ fn update_peer_stats(
}
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// Broadcasts shreds from the leader (i.e. this node) to the root of the
/// turbine retransmit tree for each shred.
pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
@@ -416,14 +418,10 @@ pub fn broadcast_shreds(
let cluster_nodes =
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
))
let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
.then(|| (shred.payload(), node.tvu))
})
})
.collect();


@@ -3,7 +3,7 @@ use {
crate::cluster_nodes::ClusterNodesCache,
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_sdk::{
hash::Hash,
@@ -270,12 +270,6 @@ impl BroadcastRun for BroadcastDuplicatesRun {
(bank_forks.root_bank(), bank_forks.working_bank())
};
let self_pubkey = cluster_info.id();
let nodes: Vec<_> = cluster_info
.all_peers()
.into_iter()
.map(|(node, _)| node)
.collect();
// Create cluster partition.
let cluster_partition: HashSet<Pubkey> = {
let mut cumilative_stake = 0;
@@ -302,17 +296,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let addr = cluster_nodes
.get_broadcast_addrs(
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
)
.first()
.copied()?;
let node = nodes.iter().find(|node| node.tvu == addr)?;
if !socket_addr_space.check(&node.tvu) {
let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
if !ContactInfo::is_valid_address(&node.tvu, socket_addr_space) {
return None;
}
if self


@@ -26,7 +26,7 @@ use {
any::TypeId,
cmp::Reverse,
collections::HashMap,
iter::{once, repeat_with},
iter::repeat_with,
marker::PhantomData,
net::SocketAddr,
ops::Deref,
@@ -114,62 +114,11 @@ impl ClusterNodes<BroadcastStage> {
new_cluster_nodes(cluster_info, stakes)
}
pub(crate) fn get_broadcast_addrs(
&self,
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> {
const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let index = match self.weighted_shuffle.first(&mut rng) {
None => return Vec::default(),
Some(index) => index,
};
if let Some(node) = self.nodes[index].contact_info() {
let now = timestamp();
let age = Duration::from_millis(now.saturating_sub(node.wallclock));
if age < MAX_CONTACT_INFO_AGE
&& ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
{
return vec![node.tvu];
}
}
let mut rng = ChaChaRng::from_seed(shred_seed);
let nodes: Vec<&Node> = self
.weighted_shuffle
.clone()
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.collect();
if nodes.is_empty() {
return Vec::default();
}
if drop_redundant_turbine_path(shred.slot(), root_bank) {
let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes));
let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu);
return addrs
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
}
let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
neighbors[..1]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu))
.chain(
neighbors[1..]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards)),
)
.chain(
children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu)),
)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
}
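
Pieced together from the added lines above, the replacement for
get_broadcast_addrs reduces to roughly the following (a readability sketch
reconstructed from this diff, not a verbatim copy of the file; the
surrounding types, fields, and imports are the existing ones in the
ClusterNodes module):

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
    // Seed a deterministic RNG from the shred id and this node's pubkey,
    // then take the first node of the stake-weighted shuffle as the root.
    let shred_seed = shred.seed(&self.pubkey);
    let mut rng = ChaChaRng::from_seed(shred_seed);
    let index = self.weighted_shuffle.first(&mut rng)?;
    // Address validity is now checked by the callers (see the
    // broadcast_shreds and BroadcastDuplicatesRun hunks above); the old
    // MAX_CONTACT_INFO_AGE check and the wide-fanout fallback are dropped.
    self.nodes[index].contact_info()
}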