diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index eba231d682..e9436ed120 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -16,6 +16,7 @@ use { solana_runtime::bank::Bank, solana_sdk::{ clock::{Epoch, Slot}, + feature_set, pubkey::Pubkey, signature::Keypair, timing::timestamp, @@ -25,7 +26,7 @@ use { any::TypeId, cmp::Reverse, collections::HashMap, - iter::repeat_with, + iter::{once, repeat_with}, marker::PhantomData, net::SocketAddr, ops::Deref, @@ -116,7 +117,7 @@ impl ClusterNodes { pub(crate) fn get_broadcast_addrs( &self, shred: &Shred, - _root_bank: &Bank, + root_bank: &Bank, fanout: usize, socket_addr_space: &SocketAddrSpace, ) -> Vec { @@ -146,6 +147,13 @@ impl ClusterNodes { if nodes.is_empty() { return Vec::default(); } + if drop_redundant_turbine_path(shred.slot(), root_bank) { + let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes)); + let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu); + return addrs + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect(); + } let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes); neighbors[..1] .iter() @@ -175,6 +183,10 @@ impl ClusterNodes { ) -> Vec { let (neighbors, children) = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); + if neighbors.is_empty() { + let peers = children.into_iter().filter_map(Node::contact_info); + return peers.map(|peer| peer.tvu).collect(); + } // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its // children and also tvu_forward socket of its neighbors. Otherwise it @@ -201,7 +213,7 @@ impl ClusterNodes { &self, slot_leader: Pubkey, shred: &Shred, - _root_bank: &Bank, + root_bank: &Bank, fanout: usize, ) -> ( Vec<&Node>, // neighbors @@ -224,6 +236,10 @@ impl ClusterNodes { .iter() .position(|node| node.pubkey() == self.pubkey) .unwrap(); + if drop_redundant_turbine_path(shred.slot(), root_bank) { + let peers = get_retransmit_peers(fanout, self_index, &nodes); + return (Vec::default(), peers.collect()); + } let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes); // Assert that the node itself is included in the set of neighbors, at // the right offset. @@ -291,6 +307,37 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec( + fanout: usize, + index: usize, // Local node's index withing the nodes slice. + nodes: &[T], +) -> impl Iterator + '_ { + // Node's index within its neighborhood. + let offset = index.saturating_sub(1) % fanout; + // First node in the neighborhood. + let anchor = index - offset; + let step = if index == 0 { 1 } else { fanout }; + (anchor * fanout + offset + 1..) + .step_by(step) + .take(fanout) + .map(|i| nodes.get(i)) + .while_some() + .copied() +} + impl ClusterNodesCache { pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. @@ -415,6 +462,21 @@ pub fn make_test_cluster( (nodes, stakes, cluster_info) } +fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool { + let feature_slot = root_bank + .feature_set + .activated_slot(&feature_set::drop_redundant_turbine_path::id()); + match feature_slot { + None => false, + Some(feature_slot) => { + let epoch_schedule = root_bank.epoch_schedule(); + let feature_epoch = epoch_schedule.get_epoch(feature_slot); + let shred_epoch = epoch_schedule.get_epoch(shred_slot); + feature_epoch < shred_epoch + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -477,4 +539,86 @@ mod tests { } } } + + #[test] + fn test_get_retransmit_peers() { + // fanout 2 + let index = vec![ + 7, // root + 6, 10, // 1st layer + // 2nd layer + 5, 19, // 1st neighborhood + 0, 14, // 2nd + // 3rd layer + 3, 1, // 1st neighborhood + 12, 2, // 2nd + 11, 4, // 3rd + 15, 18, // 4th + // 4th layer + 13, 16, // 1st neighborhood + 17, 9, // 2nd + 8, // 3rd + ]; + let peers = vec![ + vec![6, 10], + vec![5, 0], + vec![19, 14], + vec![3, 12], + vec![1, 2], + vec![11, 15], + vec![4, 18], + vec![13, 17], + vec![16, 9], + vec![8], + ]; + for (k, peers) in peers.into_iter().enumerate() { + let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); + assert_eq!(retransmit_peers.collect::>(), peers); + } + for k in 10..=index.len() { + let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); + assert_eq!(retransmit_peers.next(), None); + } + // fanout 3 + let index = vec![ + 19, // root + 14, 15, 28, // 1st layer + // 2nd layer + 29, 4, 5, // 1st neighborhood + 9, 16, 7, // 2nd + 26, 23, 2, // 3rd + // 3rd layer + 31, 3, 17, // 1st neighborhood + 20, 25, 0, // 2nd + 13, 30, 18, // 3rd + 35, 21, 22, // 4th + 6, 8, 11, // 5th + 27, 1, 10, // 6th + 12, 24, 34, // 7th + 33, 32, // 8th + ]; + let peers = vec![ + vec![14, 15, 28], + vec![29, 9, 26], + vec![4, 16, 23], + vec![5, 7, 2], + vec![31, 20, 13], + vec![3, 25, 30], + vec![17, 0, 18], + vec![35, 6, 27], + vec![21, 8, 1], + vec![22, 11, 10], + vec![12, 33], + vec![24, 32], + vec![34], + ]; + for (k, peers) in peers.into_iter().enumerate() { + let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); + assert_eq!(retransmit_peers.collect::>(), peers); + } + for k in 13..=index.len() { + let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); + assert_eq!(retransmit_peers.next(), None); + } + } } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index bcf4924cda..3d59ef8e5c 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -343,6 +343,10 @@ pub mod reject_callx_r10 { solana_sdk::declare_id!("3NKRSwpySNwD3TvP5pHnRmkAQRsdkXWRr1WaQh8p4PWX"); } +pub mod drop_redundant_turbine_path { + solana_sdk::declare_id!("4Di3y24QFLt5QEUPZtbnjyfQKfm6ZMTfa6Dw1psfoMKU"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -423,6 +427,7 @@ lazy_static! { (add_get_minimum_delegation_instruction_to_stake_program::id(), "add GetMinimumDelegation instruction to stake program"), (error_on_syscall_bpf_function_hash_collisions::id(), "error on bpf function hash collisions"), (reject_callx_r10::id(), "Reject bpf callx r10 instructions"), + (drop_redundant_turbine_path::id(), "drop redundant turbine path"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()