drops redundant turbine propagation path (#24351)

Most nodes in the cluster receive the same shred from two different
nodes: their parent, and the first node of their neighborhood:
https://github.com/solana-labs/solana/blob/a8c695ba5/core/src/cluster_nodes.rs#L178-L197

Because of the erasure coding, half of the shreds are already
redundant, so this redundant propagation path only adds extra
overhead.

Additionally, the very first node of the broadcast tree has a 2x fanout
(i.e. 400 nodes), which puts too much load on a single node.

This commit simplifies the broadcast tree by dropping the redundant
propagation path and removing the 2x fanout at the root node.
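
To make the resulting fan-out concrete, here is a minimal sketch of the
index arithmetic behind the new single propagation path. It mirrors the
get_retransmit_peers helper added in this diff, but the function name, the
plain-index return value, and the example values in main are illustrative
only, not code from this commit:

fn retransmit_children(fanout: usize, index: usize, num_nodes: usize) -> Vec<usize> {
    // Position of this node inside its neighborhood, and the first
    // (anchor) node of that neighborhood; the root (index 0) is alone.
    let offset = index.saturating_sub(1) % fanout;
    let anchor = index - offset;
    // The root feeds the contiguous nodes 1..=fanout; every other node
    // strides by `fanout`, so each node in the next layer is reached by
    // exactly one parent instead of two.
    let step = if index == 0 { 1 } else { fanout };
    (anchor * fanout + offset + 1..)
        .step_by(step)
        .take(fanout)
        .take_while(|&i| i < num_nodes)
        .collect()
}

fn main() {
    // fanout = 2 over 7 nodes: the root feeds 1 and 2, node 1 feeds 3
    // and 5, node 2 feeds 4 and 6, and the leaves feed no one.
    assert_eq!(retransmit_children(2, 0, 7), vec![1, 2]);
    assert_eq!(retransmit_children(2, 1, 7), vec![3, 5]);
    assert_eq!(retransmit_children(2, 2, 7), vec![4, 6]);
    assert_eq!(retransmit_children(2, 3, 7), Vec::<usize>::new());
}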
behzad nouri authored 2022-04-19 00:11:29 +00:00, committed by GitHub
parent 1d50832389
commit 039488b562
2 changed files with 152 additions and 3 deletions

core/src/cluster_nodes.rs

@@ -16,6 +16,7 @@ use {
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey,
signature::Keypair,
timing::timestamp,
@@ -25,7 +26,7 @@ use {
any::TypeId,
cmp::Reverse,
collections::HashMap,
iter::repeat_with,
iter::{once, repeat_with},
marker::PhantomData,
net::SocketAddr,
ops::Deref,
@@ -116,7 +117,7 @@ impl ClusterNodes<BroadcastStage> {
pub(crate) fn get_broadcast_addrs(
&self,
shred: &Shred,
_root_bank: &Bank,
root_bank: &Bank,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> {
@@ -146,6 +147,13 @@ impl ClusterNodes<BroadcastStage> {
if nodes.is_empty() {
return Vec::default();
}
if drop_redundant_turbine_path(shred.slot(), root_bank) {
let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes));
let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu);
return addrs
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
}
let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
neighbors[..1]
.iter()
@@ -175,6 +183,10 @@ impl ClusterNodes<RetransmitStage> {
) -> Vec<SocketAddr> {
let (neighbors, children) =
self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
if neighbors.is_empty() {
let peers = children.into_iter().filter_map(Node::contact_info);
return peers.map(|peer| peer.tvu).collect();
}
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forward socket of its neighbors. Otherwise it
@@ -201,7 +213,7 @@ impl ClusterNodes<RetransmitStage> {
&self,
slot_leader: Pubkey,
shred: &Shred,
_root_bank: &Bank,
root_bank: &Bank,
fanout: usize,
) -> (
Vec<&Node>, // neighbors
@@ -224,6 +236,10 @@ impl ClusterNodes<RetransmitStage> {
.iter()
.position(|node| node.pubkey() == self.pubkey)
.unwrap();
if drop_redundant_turbine_path(shred.slot(), root_bank) {
let peers = get_retransmit_peers(fanout, self_index, &nodes);
return (Vec::default(), peers.collect());
}
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
@@ -291,6 +307,37 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}
// root : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
// [fanout * 2 + 1, ..., fanout * 3],
// ...
// [fanout * fanout + 1, ..., fanout * (fanout + 1)]]
// 3rd layer: ...
// ...
// The leader node broadcasts shreds to the root node.
// The root node retransmits the shreds to all nodes in the 1st layer.
// Each other node retransmits shreds to fanout many nodes in the next layer.
// For example the node k in the 1st layer will retransmit to nodes:
// fanout + k, 2*fanout + k, ..., fanout*fanout + k
fn get_retransmit_peers<T: Copy>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> impl Iterator<Item = T> + '_ {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
// First node in the neighborhood.
let anchor = index - offset;
let step = if index == 0 { 1 } else { fanout };
(anchor * fanout + offset + 1..)
.step_by(step)
.take(fanout)
.map(|i| nodes.get(i))
.while_some()
.copied()
}
impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
@@ -415,6 +462,21 @@ pub fn make_test_cluster<R: Rng>(
(nodes, stakes, cluster_info)
}
fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
.feature_set
.activated_slot(&feature_set::drop_redundant_turbine_path::id());
match feature_slot {
None => false,
Some(feature_slot) => {
let epoch_schedule = root_bank.epoch_schedule();
let feature_epoch = epoch_schedule.get_epoch(feature_slot);
let shred_epoch = epoch_schedule.get_epoch(shred_slot);
feature_epoch < shred_epoch
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -477,4 +539,86 @@ mod tests {
}
}
}
#[test]
fn test_get_retransmit_peers() {
// fanout 2
let index = vec![
7, // root
6, 10, // 1st layer
// 2nd layer
5, 19, // 1st neighborhood
0, 14, // 2nd
// 3rd layer
3, 1, // 1st neighborhood
12, 2, // 2nd
11, 4, // 3rd
15, 18, // 4th
// 4th layer
13, 16, // 1st neighborhood
17, 9, // 2nd
8, // 3rd
];
let peers = vec![
vec![6, 10],
vec![5, 0],
vec![19, 14],
vec![3, 12],
vec![1, 2],
vec![11, 15],
vec![4, 18],
vec![13, 17],
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
// fanout 3
let index = vec![
19, // root
14, 15, 28, // 1st layer
// 2nd layer
29, 4, 5, // 1st neighborhood
9, 16, 7, // 2nd
26, 23, 2, // 3rd
// 3rd layer
31, 3, 17, // 1st neighborhood
20, 25, 0, // 2nd
13, 30, 18, // 3rd
35, 21, 22, // 4th
6, 8, 11, // 5th
27, 1, 10, // 6th
12, 24, 34, // 7th
33, 32, // 8th
];
let peers = vec![
vec![14, 15, 28],
vec![29, 9, 26],
vec![4, 16, 23],
vec![5, 7, 2],
vec![31, 20, 13],
vec![3, 25, 30],
vec![17, 0, 18],
vec![35, 6, 27],
vec![21, 8, 1],
vec![22, 11, 10],
vec![12, 33],
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
}
}
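
A note on the gating in drop_redundant_turbine_path above: the new tree is
only used for shreds whose epoch is strictly later than the epoch in which
the feature was activated (feature_epoch < shred_epoch), so every node
switches trees at the same epoch boundary regardless of when it first
observes the activation. A minimal sketch of that rule with a simplified,
flat epoch schedule (the function name, the plain integer arguments, and
the 432,000-slots-per-epoch figure are illustrative, not the
solana-runtime API):

fn use_new_tree(feature_activation_slot: Option<u64>, shred_epoch: u64, slots_per_epoch: u64) -> bool {
    match feature_activation_slot {
        // Feature not activated yet: keep the old tree.
        None => false,
        Some(slot) => {
            // Simplified epoch schedule: a fixed number of slots per
            // epoch, ignoring the real schedule's warm-up epochs.
            let feature_epoch = slot / slots_per_epoch;
            // Only switch for epochs after the activation epoch.
            feature_epoch < shred_epoch
        }
    }
}

fn main() {
    // Activation at slot 1_000_000 lands in epoch 2 (with 432_000 slots
    // per epoch), so shreds from epoch 2 still use the old tree and
    // shreds from epoch 3 onward use the new one.
    assert!(!use_new_tree(Some(1_000_000), 2, 432_000));
    assert!(use_new_tree(Some(1_000_000), 3, 432_000));
    assert!(!use_new_tree(None, 3, 432_000));
}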

sdk/src/feature_set.rs

@@ -343,6 +343,10 @@ pub mod reject_callx_r10 {
solana_sdk::declare_id!("3NKRSwpySNwD3TvP5pHnRmkAQRsdkXWRr1WaQh8p4PWX");
}
pub mod drop_redundant_turbine_path {
solana_sdk::declare_id!("4Di3y24QFLt5QEUPZtbnjyfQKfm6ZMTfa6Dw1psfoMKU");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -423,6 +427,7 @@ lazy_static! {
(add_get_minimum_delegation_instruction_to_stake_program::id(), "add GetMinimumDelegation instruction to stake program"),
(error_on_syscall_bpf_function_hash_collisions::id(), "error on bpf function hash collisions"),
(reject_callx_r10::id(), "Reject bpf callx r10 instructions"),
(drop_redundant_turbine_path::id(), "drop redundant turbine path"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()