From 039488b562e3d0127827342fe3f65348c9caa018 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Tue, 19 Apr 2022 00:11:29 +0000
Subject: [PATCH] drops redundant turbine propagation path (#24351)

Most nodes in the cluster receive the same shred from two different
nodes: their parent, and the first node of their neighborhood:
https://github.com/solana-labs/solana/blob/a8c695ba5/core/src/cluster_nodes.rs#L178-L197
Because of the erasure coding, half of the shreds are already redundant,
so this second propagation path only adds extra overhead. Additionally,
the very first node of the broadcast tree has 2x fanout (i.e. 400 nodes),
which puts too much load on a single node.

This commit simplifies the broadcast tree by dropping the redundant
propagation path and removing the 2x fanout at the root node.
---
 core/src/cluster_nodes.rs | 150 +++++++++++++++++++++++++++++++++++++-
 sdk/src/feature_set.rs    |   5 ++
 2 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index eba231d682..e9436ed120 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -16,6 +16,7 @@ use {
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Epoch, Slot},
+        feature_set,
         pubkey::Pubkey,
         signature::Keypair,
         timing::timestamp,
@@ -25,7 +26,7 @@ use {
         any::TypeId,
         cmp::Reverse,
         collections::HashMap,
-        iter::repeat_with,
+        iter::{once, repeat_with},
         marker::PhantomData,
         net::SocketAddr,
         ops::Deref,
@@ -116,7 +117,7 @@ impl ClusterNodes<BroadcastStage> {
     pub(crate) fn get_broadcast_addrs(
         &self,
         shred: &Shred,
-        _root_bank: &Bank,
+        root_bank: &Bank,
         fanout: usize,
         socket_addr_space: &SocketAddrSpace,
     ) -> Vec<SocketAddr> {
@@ -146,6 +147,13 @@ impl ClusterNodes<BroadcastStage> {
         if nodes.is_empty() {
             return Vec::default();
         }
+        if drop_redundant_turbine_path(shred.slot(), root_bank) {
+            let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes));
+            let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu);
+            return addrs
+                .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
+                .collect();
+        }
         let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
         neighbors[..1]
             .iter()
@@ -175,6 +183,10 @@ impl ClusterNodes<RetransmitStage> {
     ) -> Vec<SocketAddr> {
         let (neighbors, children) =
             self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
+        if neighbors.is_empty() {
+            let peers = children.into_iter().filter_map(Node::contact_info);
+            return peers.map(|peer| peer.tvu).collect();
+        }
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
         // children and also tvu_forward socket of its neighbors. Otherwise it
@@ -201,7 +213,7 @@ impl ClusterNodes<RetransmitStage> {
         &self,
         slot_leader: Pubkey,
         shred: &Shred,
-        _root_bank: &Bank,
+        root_bank: &Bank,
         fanout: usize,
     ) -> (
         Vec<&Node>, // neighbors
@@ -224,6 +236,10 @@
             .iter()
             .position(|node| node.pubkey() == self.pubkey)
             .unwrap();
+        if drop_redundant_turbine_path(shred.slot(), root_bank) {
+            let peers = get_retransmit_peers(fanout, self_index, &nodes);
+            return (Vec::default(), peers.collect());
+        }
         let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
         // Assert that the node itself is included in the set of neighbors, at
         // the right offset.
@@ -291,6 +307,37 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<
+fn get_retransmit_peers<T: Copy>(
+    fanout: usize,
+    index: usize, // Local node's index within the nodes slice.
+    nodes: &[T],
+) -> impl Iterator<Item = T> + '_ {
+    // Node's index within its neighborhood.
+    let offset = index.saturating_sub(1) % fanout;
+    // First node in the neighborhood.
+    let anchor = index - offset;
+    let step = if index == 0 { 1 } else { fanout };
+    (anchor * fanout + offset + 1..)
+        .step_by(step)
+        .take(fanout)
+        .map(|i| nodes.get(i))
+        .while_some()
+        .copied()
+}
+
 impl<T: 'static> ClusterNodesCache<T> {
     pub fn new(
         // Capacity of underlying LRU-cache in terms of number of epochs.
@@ -415,6 +462,21 @@ pub fn make_test_cluster<R: Rng>(
     (nodes, stakes, cluster_info)
 }
 
+fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
+    let feature_slot = root_bank
+        .feature_set
+        .activated_slot(&feature_set::drop_redundant_turbine_path::id());
+    match feature_slot {
+        None => false,
+        Some(feature_slot) => {
+            let epoch_schedule = root_bank.epoch_schedule();
+            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
+            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
+            feature_epoch < shred_epoch
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -477,4 +539,86 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_get_retransmit_peers() {
+        // fanout 2
+        let index = vec![
+            7, // root
+            6, 10, // 1st layer
+            // 2nd layer
+            5, 19, // 1st neighborhood
+            0, 14, // 2nd
+            // 3rd layer
+            3, 1, // 1st neighborhood
+            12, 2, // 2nd
+            11, 4, // 3rd
+            15, 18, // 4th
+            // 4th layer
+            13, 16, // 1st neighborhood
+            17, 9, // 2nd
+            8, // 3rd
+        ];
+        let peers = vec![
+            vec![6, 10],
+            vec![5, 0],
+            vec![19, 14],
+            vec![3, 12],
+            vec![1, 2],
+            vec![11, 15],
+            vec![4, 18],
+            vec![13, 17],
+            vec![16, 9],
+            vec![8],
+        ];
+        for (k, peers) in peers.into_iter().enumerate() {
+            let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
+            assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
+        }
+        for k in 10..=index.len() {
+            let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
+            assert_eq!(retransmit_peers.next(), None);
+        }
+        // fanout 3
+        let index = vec![
+            19, // root
+            14, 15, 28, // 1st layer
+            // 2nd layer
+            29, 4, 5, // 1st neighborhood
+            9, 16, 7, // 2nd
+            26, 23, 2, // 3rd
+            // 3rd layer
+            31, 3, 17, // 1st neighborhood
+            20, 25, 0, // 2nd
+            13, 30, 18, // 3rd
+            35, 21, 22, // 4th
+            6, 8, 11, // 5th
+            27, 1, 10, // 6th
+            12, 24, 34, // 7th
+            33, 32, // 8th
+        ];
+        let peers = vec![
+            vec![14, 15, 28],
+            vec![29, 9, 26],
+            vec![4, 16, 23],
+            vec![5, 7, 2],
+            vec![31, 20, 13],
+            vec![3, 25, 30],
+            vec![17, 0, 18],
+            vec![35, 6, 27],
+            vec![21, 8, 1],
+            vec![22, 11, 10],
+            vec![12, 33],
+            vec![24, 32],
+            vec![34],
+        ];
+        for (k, peers) in peers.into_iter().enumerate() {
+            let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
+            assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
+        }
+        for k in 13..=index.len() {
+            let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
+            assert_eq!(retransmit_peers.next(), None);
+        }
+    }
 }
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index bcf4924cda..3d59ef8e5c 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -343,6 +343,10 @@ pub mod reject_callx_r10 {
     solana_sdk::declare_id!("3NKRSwpySNwD3TvP5pHnRmkAQRsdkXWRr1WaQh8p4PWX");
 }
 
+pub mod drop_redundant_turbine_path {
+    solana_sdk::declare_id!("4Di3y24QFLt5QEUPZtbnjyfQKfm6ZMTfa6Dw1psfoMKU");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -423,6 +427,7 @@ lazy_static! {
         (add_get_minimum_delegation_instruction_to_stake_program::id(), "add GetMinimumDelegation instruction to stake program"),
         (error_on_syscall_bpf_function_hash_collisions::id(), "error on bpf function hash collisions"),
         (reject_callx_r10::id(), "Reject bpf callx r10 instructions"),
+        (drop_redundant_turbine_path::id(), "drop redundant turbine path"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()
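
For readers who want to see the new fan-out arithmetic in isolation, the sketch
below mirrors the index math of get_retransmit_peers and checks it against the
fanout-2 vector from the test above. It is not the patch's code:
retransmit_children is a hypothetical stand-in name, and std's map_while
replaces the itertools while_some adaptor so the snippet compiles with no extra
dependencies.

// Standalone sketch (not part of the patch) of the retransmit fan-out above.
// `retransmit_children` is a hypothetical stand-in for `get_retransmit_peers`;
// std's `map_while` substitutes for itertools' `while_some`.
fn retransmit_children<T: Copy>(fanout: usize, index: usize, nodes: &[T]) -> Vec<T> {
    // Node's index within its neighborhood; the root (index 0) maps to offset 0.
    let offset = index.saturating_sub(1) % fanout;
    // Index of the first node in this node's neighborhood.
    let anchor = index - offset;
    // The root feeds the whole 1st layer at consecutive indices; every other
    // node feeds one slot in each of the next layer's neighborhoods, so its
    // children sit `fanout` indices apart.
    let step = if index == 0 { 1 } else { fanout };
    (anchor * fanout + offset + 1..)
        .step_by(step)
        .take(fanout)
        .map_while(|i| nodes.get(i).copied())
        .collect()
}

fn main() {
    // Shuffled node labels from the fanout-2 case of test_get_retransmit_peers.
    let nodes = [
        7, 6, 10, 5, 19, 0, 14, 3, 1, 12, 2, 11, 4, 15, 18, 13, 16, 17, 9, 8,
    ];
    assert_eq!(retransmit_children(2, 0, &nodes), vec![6, 10]); // root -> entire 1st layer
    assert_eq!(retransmit_children(2, 1, &nodes), vec![5, 0]); // one child per next-layer neighborhood
    assert_eq!(retransmit_children(2, 9, &nodes), vec![8]); // last layer is only partially filled
    assert!(retransmit_children(2, 10, &nodes).is_empty()); // leaves have no children
    println!("retransmit fan-out checks passed");
}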