experiments different turbine fanouts for propagating shreds (#29393)
The commit allocates 2% of slots to running experiments with different turbine fanouts based on the slot number. The experiment is feature gated with an additional feature to disable the experiment.
This commit is contained in:
parent
f2ba16ee87
commit
456d06785d
|
@ -5,7 +5,7 @@ use {
|
|||
rand::{seq::SliceRandom, Rng, SeedableRng},
|
||||
rand_chacha::ChaChaRng,
|
||||
solana_gossip::{
|
||||
cluster_info::{compute_retransmit_peers, ClusterInfo},
|
||||
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
|
||||
contact_info::ContactInfo,
|
||||
crds::GossipRoute,
|
||||
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
|
||||
|
@ -35,6 +35,8 @@ use {
|
|||
},
|
||||
};
|
||||
|
||||
pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum NodeId {
|
||||
// TVU node obtained through gossip (staked or not).
|
||||
|
@ -233,8 +235,10 @@ impl ClusterNodes<RetransmitStage> {
|
|||
0
|
||||
} else if self_index <= fanout {
|
||||
1
|
||||
} else {
|
||||
} else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
|
||||
2
|
||||
} else {
|
||||
3 // If changed, update MAX_NUM_TURBINE_HOPS.
|
||||
};
|
||||
let peers = get_retransmit_peers(fanout, self_index, &nodes);
|
||||
return RetransmitPeers {
|
||||
|
@ -249,8 +253,10 @@ impl ClusterNodes<RetransmitStage> {
|
|||
0
|
||||
} else if self_index < fanout {
|
||||
1
|
||||
} else {
|
||||
} else if self_index < fanout.saturating_add(1).saturating_mul(fanout) {
|
||||
2
|
||||
} else {
|
||||
3 // If changed, update MAX_NUM_TURBINE_HOPS.
|
||||
};
|
||||
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
|
||||
// Assert that the node itself is included in the set of neighbors, at
|
||||
|
@ -480,11 +486,47 @@ pub fn make_test_cluster<R: Rng>(
|
|||
(nodes, stakes, cluster_info)
|
||||
}
|
||||
|
||||
pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize {
|
||||
if enable_turbine_fanout_experiments(shred_slot, root_bank) {
|
||||
// Allocate ~2% of slots to turbine fanout experiments.
|
||||
match shred_slot % 359 {
|
||||
11 => 64,
|
||||
61 => 768,
|
||||
111 => 128,
|
||||
161 => 640,
|
||||
211 => 256,
|
||||
261 => 512,
|
||||
311 => 384,
|
||||
_ => DATA_PLANE_FANOUT,
|
||||
}
|
||||
} else {
|
||||
DATA_PLANE_FANOUT
|
||||
}
|
||||
}
|
||||
|
||||
fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
|
||||
let feature_slot = root_bank
|
||||
.feature_set
|
||||
.activated_slot(&feature_set::drop_redundant_turbine_path::id());
|
||||
match feature_slot {
|
||||
check_feature_activation(
|
||||
&feature_set::drop_redundant_turbine_path::id(),
|
||||
shred_slot,
|
||||
root_bank,
|
||||
)
|
||||
}
|
||||
|
||||
fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool {
|
||||
check_feature_activation(
|
||||
&feature_set::enable_turbine_fanout_experiments::id(),
|
||||
shred_slot,
|
||||
root_bank,
|
||||
) && !check_feature_activation(
|
||||
&feature_set::disable_turbine_fanout_experiments::id(),
|
||||
shred_slot,
|
||||
root_bank,
|
||||
)
|
||||
}
|
||||
|
||||
// Returns true if the feature is effective for the shred slot.
|
||||
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
|
||||
match root_bank.feature_set.activated_slot(feature) {
|
||||
None => false,
|
||||
Some(feature_slot) => {
|
||||
let epoch_schedule = root_bank.epoch_schedule();
|
||||
|
|
|
@ -3,17 +3,14 @@
|
|||
|
||||
use {
|
||||
crate::{
|
||||
cluster_nodes::{ClusterNodes, ClusterNodesCache},
|
||||
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS},
|
||||
packet_hasher::PacketHasher,
|
||||
},
|
||||
crossbeam_channel::{Receiver, RecvTimeoutError},
|
||||
itertools::{izip, Itertools},
|
||||
lru::LruCache,
|
||||
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
|
||||
solana_gossip::{
|
||||
cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
|
||||
contact_info::ContactInfo,
|
||||
},
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_ledger::{
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
shred::{self, ShredId},
|
||||
|
@ -56,8 +53,8 @@ struct RetransmitSlotStats {
|
|||
outset: u64, // 1st shred retransmit timestamp.
|
||||
// Number of shreds sent and received at different
|
||||
// distances from the turbine broadcast root.
|
||||
num_shreds_received: [usize; 3],
|
||||
num_shreds_sent: [usize; 3],
|
||||
num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS],
|
||||
num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS],
|
||||
}
|
||||
|
||||
struct RetransmitStats {
|
||||
|
@ -300,8 +297,9 @@ fn retransmit_shred(
|
|||
stats: &RetransmitStats,
|
||||
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
|
||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
|
||||
let (root_distance, addrs) =
|
||||
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT);
|
||||
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout);
|
||||
let addrs: Vec<_> = addrs
|
||||
.into_iter()
|
||||
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
|
||||
|
@ -441,7 +439,7 @@ impl AddAssign for RetransmitSlotStats {
|
|||
} else {
|
||||
self.outset.min(outset)
|
||||
};
|
||||
for k in 0..3 {
|
||||
for k in 0..MAX_NUM_TURBINE_HOPS {
|
||||
self.num_shreds_received[k] += num_shreds_received[k];
|
||||
self.num_shreds_sent[k] += num_shreds_sent[k];
|
||||
}
|
||||
|
@ -555,9 +553,15 @@ impl RetransmitSlotStats {
|
|||
self.num_shreds_received[2],
|
||||
i64
|
||||
),
|
||||
(
|
||||
"num_shreds_received_3rd_layer",
|
||||
self.num_shreds_received[3],
|
||||
i64
|
||||
),
|
||||
("num_shreds_sent_root", self.num_shreds_sent[0], i64),
|
||||
("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64),
|
||||
("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64),
|
||||
("num_shreds_sent_3rd_layer", self.num_shreds_sent[3], i64),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -558,6 +558,14 @@ pub mod commission_updates_only_allowed_in_first_half_of_epoch {
|
|||
solana_sdk::declare_id!("noRuG2kzACwgaY7TVmLRnUNPLKNVQE1fb7X55YWBehp");
|
||||
}
|
||||
|
||||
pub mod enable_turbine_fanout_experiments {
|
||||
solana_sdk::declare_id!("D31EFnLgdiysi84Woo3of4JMu7VmasUS3Z7j9HYXCeLY");
|
||||
}
|
||||
|
||||
pub mod disable_turbine_fanout_experiments {
|
||||
solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa");
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
/// Map of feature identifiers to user-visible description
|
||||
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
|
||||
|
@ -692,6 +700,8 @@ lazy_static! {
|
|||
(enable_alt_bn128_syscall::id(), "add alt_bn128 syscalls #27961"),
|
||||
(enable_program_redeployment_cooldown::id(), "enable program redeployment cooldown #29135"),
|
||||
(commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"),
|
||||
(enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"),
|
||||
(disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"),
|
||||
/*************** ADD NEW FEATURES HERE ***************/
|
||||
]
|
||||
.iter()
|
||||
|
|
Loading…
Reference in New Issue