removes feature gate code dropping redundant turbine path (#32075)

behzad nouri 2023-06-16 19:53:05 +00:00 committed by GitHub
parent 174ceba82b
commit 987e8eeeaf
4 changed files with 16 additions and 280 deletions
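With the feature gate gone, retransmit always uses the single fanout tree: every shred travels exactly one path from the root, and the neighborhood logic that duplicated traffic onto tvu_forwards sockets is deleted outright. As a rough illustration of how a node's children fall out of its shuffle index in such a tree (a sketch under assumed names, not the crate's actual get_retransmit_peers):

// Sketch: child indices in a plain fanout tree where the root holds
// shuffle index 0 and layer 1 holds indices 1..=fanout. Assumes fanout >= 1.
fn fanout_tree_children(fanout: usize, index: usize, num_nodes: usize) -> Vec<usize> {
    let offset = index.saturating_sub(1) % fanout; // position within its neighborhood
    let anchor = index - offset; // first index of that neighborhood
    // The root feeds indices 1..=fanout; any other node feeds one slot in
    // each of the fanout neighborhoods seeded by its own neighborhood.
    let step = if index == 0 { 1 } else { fanout };
    (anchor * fanout + offset + 1..)
        .step_by(step)
        .take(fanout)
        .take_while(|&child| child < num_nodes)
        .collect()
}

With fanout = 3: index 0 feeds [1, 2, 3], index 1 feeds [4, 7, 10], and index 4 feeds [13, 16, 19]; every index is fed by exactly one parent.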

View File

@@ -9,11 +9,7 @@ use {
         retransmit_stage::RetransmitStage,
     },
     solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
-    solana_ledger::{
-        genesis_utils::{create_genesis_config, GenesisConfigInfo},
-        shred::{Shred, ShredFlags},
-    },
-    solana_runtime::bank::Bank,
+    solana_ledger::shred::{Shred, ShredFlags},
     solana_sdk::{clock::Slot, pubkey::Pubkey},
     test::Bencher,
 };
@@ -33,7 +29,6 @@ fn get_retransmit_peers_deterministic(
     cluster_nodes: &ClusterNodes<RetransmitStage>,
     slot: Slot,
     slot_leader: &Pubkey,
-    root_bank: &Bank,
     num_simulated_shreds: usize,
 ) {
     let parent_offset = u16::from(slot != 0);
@@ -52,7 +47,6 @@ fn get_retransmit_peers_deterministic(
         let _retransmit_peers = cluster_nodes.get_retransmit_peers(
             slot_leader,
             &shred.id(),
-            root_bank,
             solana_gossip::cluster_info::DATA_PLANE_FANOUT,
         );
     }
@@ -60,19 +54,11 @@ fn get_retransmit_peers_deterministic(
 
 fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
     let mut rng = rand::thread_rng();
-    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
-    let bank = Bank::new_for_benches(&genesis_config);
     let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
     let slot_leader = *nodes[1..].choose(&mut rng).unwrap().pubkey();
     let slot = rand::random::<u64>();
     b.iter(|| {
-        get_retransmit_peers_deterministic(
-            &cluster_nodes,
-            slot,
-            &slot_leader,
-            &bank,
-            NUM_SIMULATED_SHREDS,
-        )
+        get_retransmit_peers_deterministic(&cluster_nodes, slot, &slot_leader, NUM_SIMULATED_SHREDS)
     });
 }

View File

@@ -5,7 +5,7 @@ use {
     rand::{seq::SliceRandom, Rng, SeedableRng},
     rand_chacha::ChaChaRng,
     solana_gossip::{
-        cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
+        cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
         contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol},
         crds::GossipRoute,
         crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
@@ -80,12 +80,10 @@ pub struct ClusterNodesCache<T> {
 
 pub struct RetransmitPeers<'a> {
     root_distance: usize, // distance from the root node
-    neighbors: Vec<&'a Node>,
     children: Vec<&'a Node>,
-    // Maps from tvu/tvu_forwards addresses to the first node
+    // Maps tvu addresses to the first node
     // in the shuffle with the same address.
     addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
-    frwds: HashMap<SocketAddr, Pubkey>, // tvu_forwards addresses
 }
 
 impl Node {
@@ -166,54 +164,20 @@ impl ClusterNodes<RetransmitStage> {
         &self,
         slot_leader: &Pubkey,
         shred: &ShredId,
-        root_bank: &Bank,
         fanout: usize,
     ) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> {
         let RetransmitPeers {
             root_distance,
-            neighbors,
             children,
             addrs,
-            frwds,
-        } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?;
+        } = self.get_retransmit_peers(slot_leader, shred, fanout)?;
         let protocol = get_broadcast_protocol(shred);
-        if neighbors.is_empty() {
-            let peers = children.into_iter().filter_map(|node| {
-                node.contact_info()?
-                    .tvu(protocol)
-                    .ok()
-                    .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-            });
-            return Ok((root_distance, peers.collect()));
-        }
-        // If the node is on the critical path (i.e. the first node in each
-        // neighborhood), it should send the packet to tvu socket of its
-        // children and also tvu_forward socket of its neighbors. Otherwise it
-        // should only forward to tvu_forwards socket of its children.
-        if neighbors[0].pubkey() != self.pubkey {
-            let peers = children.into_iter().filter_map(|node| {
-                node.contact_info()?
-                    .tvu_forwards()
-                    .ok()
-                    .filter(|addr| frwds.get(addr) == Some(&node.pubkey()))
-            });
-            return Ok((root_distance, peers.collect()));
-        }
-        // First neighbor is this node itself, so skip it.
-        let peers = neighbors[1..]
-            .iter()
-            .filter_map(|node| {
-                node.contact_info()?
-                    .tvu_forwards()
-                    .ok()
-                    .filter(|addr| frwds.get(addr) == Some(&node.pubkey()))
-            })
-            .chain(children.into_iter().filter_map(|node| {
-                node.contact_info()?
-                    .tvu(protocol)
-                    .ok()
-                    .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-            }));
+        let peers = children.into_iter().filter_map(|node| {
+            node.contact_info()?
+                .tvu(protocol)
+                .ok()
+                .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
+        });
         Ok((root_distance, peers.collect()))
     }
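A note on the filter retained above: addrs maps each tvu address to the first node in the shuffle that advertises it, so when several nodes share an address only that earliest occurrence receives the shred. The same dedup pattern in isolation (hypothetical Peer type, not the crate's):

use std::collections::HashMap;

#[derive(Clone)]
struct Peer {
    pubkey: u64,
    tvu_addr: &'static str,
}

// Keep each address once, attributed to the first peer in shuffle order.
fn dedup_by_addr(shuffled: &[Peer]) -> Vec<Peer> {
    let mut first_owner = HashMap::new();
    for peer in shuffled {
        first_owner.entry(peer.tvu_addr).or_insert(peer.pubkey);
    }
    shuffled
        .iter()
        .filter(|peer| first_owner.get(peer.tvu_addr) == Some(&peer.pubkey))
        .cloned()
        .collect()
}

In the diff itself the map is built over the entire shuffle inside get_retransmit_peers (next hunk), while the filter runs only over the selected children.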
@@ -221,7 +185,6 @@ impl ClusterNodes<RetransmitStage> {
         &self,
         slot_leader: &Pubkey,
         shred: &ShredId,
-        root_bank: &Bank,
         fanout: usize,
     ) -> Result<RetransmitPeers, Error> {
         let shred_seed = shred.seed(slot_leader);
@@ -237,9 +200,7 @@ impl ClusterNodes<RetransmitStage> {
             weighted_shuffle.remove_index(*index);
         }
         let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
-        let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
         let mut rng = ChaChaRng::from_seed(shred_seed);
-        let drop_redundant_turbine_path = drop_redundant_turbine_path(shred.slot(), root_bank);
         let protocol = get_broadcast_protocol(shred);
         let nodes: Vec<_> = weighted_shuffle
             .shuffle(&mut rng)
@@ -249,11 +210,6 @@ impl ClusterNodes<RetransmitStage> {
                     if let Ok(addr) = node.tvu(protocol) {
                         addrs.entry(addr).or_insert(*node.pubkey());
                     }
-                    if !drop_redundant_turbine_path {
-                        if let Ok(addr) = node.tvu_forwards() {
-                            frwds.entry(addr).or_insert(*node.pubkey());
-                        }
-                    }
                 }
             })
             .collect();
@@ -261,44 +217,20 @@ impl ClusterNodes<RetransmitStage> {
             .iter()
             .position(|node| node.pubkey() == self.pubkey)
             .unwrap();
-        if drop_redundant_turbine_path {
-            let root_distance = if self_index == 0 {
-                0
-            } else if self_index <= fanout {
-                1
-            } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
-                2
-            } else {
-                3 // If changed, update MAX_NUM_TURBINE_HOPS.
-            };
-            let peers = get_retransmit_peers(fanout, self_index, &nodes);
-            return Ok(RetransmitPeers {
-                root_distance,
-                neighbors: Vec::default(),
-                children: peers.collect(),
-                addrs,
-                frwds,
-            });
-        }
         let root_distance = if self_index == 0 {
             0
-        } else if self_index < fanout {
+        } else if self_index <= fanout {
             1
-        } else if self_index < fanout.saturating_add(1).saturating_mul(fanout) {
+        } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
             2
         } else {
             3 // If changed, update MAX_NUM_TURBINE_HOPS.
         };
-        let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
-        // Assert that the node itself is included in the set of neighbors, at
-        // the right offset.
-        debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
+        let peers = get_retransmit_peers(fanout, self_index, &nodes);
         Ok(RetransmitPeers {
             root_distance,
-            neighbors,
-            children,
+            children: peers.collect(),
             addrs,
-            frwds,
         })
     }
 }
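Note the comparison flip from < to <= in the surviving root_distance arithmetic: the boundary indices (exactly fanout, exactly (fanout + 1) * fanout) now land in the nearer layer, matching the formerly feature-gated branch being kept. A local copy of the computation for a quick check (mirrors, not imports, the code above; DATA_PLANE_FANOUT is 200):

fn root_distance(self_index: usize, fanout: usize) -> usize {
    if self_index == 0 {
        0
    } else if self_index <= fanout {
        1
    } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
        2
    } else {
        3 // If changed, update MAX_NUM_TURBINE_HOPS.
    }
}

fn main() {
    // With fanout = 200: indices 1..=200 are layer 1, indices
    // 201..=40_200 (= 201 * 200) are layer 2, anything beyond is layer 3.
    assert_eq!(root_distance(0, 200), 0);
    assert_eq!(root_distance(200, 200), 1);
    assert_eq!(root_distance(40_200, 200), 2);
    assert_eq!(root_distance(40_201, 200), 3);
}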
@@ -549,14 +481,6 @@ pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize
     }
 }
 
-fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
-    check_feature_activation(
-        &feature_set::drop_redundant_turbine_path::id(),
-        shred_slot,
-        root_bank,
-    )
-}
-
 fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool {
     check_feature_activation(
         &feature_set::enable_turbine_fanout_experiments::id(),
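check_feature_activation itself is untouched by this commit and not shown in the diff; it is what made the deleted gate slot-aware. Turbine behavior has to flip for every validator on the same shreds, so activation is judged against the shred's slot via the root bank rather than at validator restart. A simplified sketch of that gating idea (assumed shape and types; the real helper consults the bank's epoch schedule):

// A feature takes effect only for shreds from epochs strictly after the
// epoch in which it was activated, so all nodes switch in lockstep.
fn is_feature_effective(
    activated_slot: Option<u64>, // from the root bank's feature set
    slots_per_epoch: u64,        // stand-in for the real epoch schedule
    shred_slot: u64,
) -> bool {
    match activated_slot {
        None => false,
        Some(feature_slot) => feature_slot / slots_per_epoch < shred_slot / slots_per_epoch,
    }
}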

View File

@@ -320,7 +320,7 @@ fn retransmit_shred(
     let mut compute_turbine_peers = Measure::start("turbine_start");
     let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
     let (root_distance, addrs) =
-        cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout)?;
+        cluster_nodes.get_retransmit_addrs(slot_leader, key, data_plane_fanout)?;
     let addrs: Vec<_> = addrs
         .into_iter()
         .filter(|addr| socket_addr_space.check(addr))

View File

@@ -2793,40 +2793,6 @@ fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>, stats: &GossipStat
     Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
 }
 
-/// Turbine logic
-/// 1 - For the current node find out if it is in layer 1
-/// 1.1 - If yes, then broadcast to all layer 1 nodes
-///      1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
-/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
-///      1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic
-/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake
-pub fn compute_retransmit_peers<T: Copy>(
-    fanout: usize,
-    index: usize, // Local node's index within the nodes slice.
-    nodes: &[T],
-) -> (Vec<T> /*neighbors*/, Vec<T> /*children*/) {
-    // 1st layer: fanout nodes starting at 0
-    // 2nd layer: fanout**2 nodes starting at fanout
-    // 3rd layer: fanout**3 nodes starting at fanout + fanout**2
-    // ...
-    // Each layer is divided into neighborhoods of fanout nodes each.
-    let offset = index % fanout; // Node's index within its neighborhood.
-    let anchor = index - offset; // First node in the neighborhood.
-    let neighbors = (anchor..)
-        .take(fanout)
-        .map(|i| nodes.get(i).copied())
-        .while_some()
-        .collect();
-    let children = ((anchor + 1) * fanout + offset..)
-        .step_by(fanout)
-        .take(fanout)
-        .map(|i| nodes.get(i).copied())
-        .while_some()
-        .collect();
-    (neighbors, children)
-}
-
 #[derive(Debug)]
 pub struct Sockets {
     pub gossip: UdpSocket,
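For reference, a worked instance of the formulas just deleted, with fanout = 3 and index = 4: offset = 4 % 3 = 1, anchor = 4 - 1 = 3, so the neighbors are positions 3..6 and the children sit at positions (3 + 1) * 3 + 1 = 13, then 16 and 19 of the shuffled slice. Recomputed locally (the function itself is gone):

fn main() {
    let (fanout, index) = (3usize, 4usize);
    let offset = index % fanout; // 1: position inside its neighborhood
    let anchor = index - offset; // 3: first position of the neighborhood
    let neighbors: Vec<_> = (anchor..anchor + fanout).collect();
    let children: Vec<_> = ((anchor + 1) * fanout + offset..)
        .step_by(fanout)
        .take(fanout)
        .collect();
    assert_eq!(neighbors, vec![3, 4, 5]);
    assert_eq!(children, vec![13, 16, 19]);
}

Mapped through the index vec in test_compute_retransmit_peers_small below, positions 3..6 hold [29, 4, 5] and positions 13, 16, 19 hold [3, 25, 30] — exactly the neighbors and children the removed test asserts for index 4.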
@@ -3227,8 +3193,6 @@ mod tests {
             duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
         },
         itertools::izip,
-        rand::{seq::SliceRandom, SeedableRng},
-        rand_chacha::ChaChaRng,
         regex::Regex,
         solana_ledger::shred::Shredder,
         solana_net_utils::MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
@ -4648,144 +4612,6 @@ RPC Enabled Nodes: 1"#;
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
}
#[test]
fn test_compute_retransmit_peers_small() {
const FANOUT: usize = 3;
let index = vec![
14, 15, 28, // 1st layer
// 2nd layer
29, 4, 5, // 1st neighborhood
9, 16, 7, // 2nd neighborhood
26, 23, 2, // 3rd neighborhood
// 3rd layer
31, 3, 17, // 1st neighborhood
20, 25, 0, // 2nd neighborhood
13, 30, 18, // 3rd neighborhood
19, 21, 22, // 4th neighborhood
6, 8, 11, // 5th neighborhood
27, 1, 10, // 6th neighborhood
12, 24, 34, // 7th neighborhood
33, 32, // 8th neighborhood
];
// 1st layer
assert_eq!(
compute_retransmit_peers(FANOUT, 0, &index),
(vec![14, 15, 28], vec![29, 9, 26])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 1, &index),
(vec![14, 15, 28], vec![4, 16, 23])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 2, &index),
(vec![14, 15, 28], vec![5, 7, 2])
);
// 2nd layer, 1st neighborhood
assert_eq!(
compute_retransmit_peers(FANOUT, 3, &index),
(vec![29, 4, 5], vec![31, 20, 13])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 4, &index),
(vec![29, 4, 5], vec![3, 25, 30])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 5, &index),
(vec![29, 4, 5], vec![17, 0, 18])
);
// 2nd layer, 2nd neighborhood
assert_eq!(
compute_retransmit_peers(FANOUT, 6, &index),
(vec![9, 16, 7], vec![19, 6, 27])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 7, &index),
(vec![9, 16, 7], vec![21, 8, 1])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 8, &index),
(vec![9, 16, 7], vec![22, 11, 10])
);
// 2nd layer, 3rd neighborhood
assert_eq!(
compute_retransmit_peers(FANOUT, 9, &index),
(vec![26, 23, 2], vec![12, 33])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 10, &index),
(vec![26, 23, 2], vec![24, 32])
);
assert_eq!(
compute_retransmit_peers(FANOUT, 11, &index),
(vec![26, 23, 2], vec![34])
);
// 3rd layer
let num_nodes = index.len();
for k in (12..num_nodes).step_by(3) {
let end = num_nodes.min(k + 3);
let neighbors = index[k..end].to_vec();
for i in k..end {
assert_eq!(
compute_retransmit_peers(FANOUT, i, &index),
(neighbors.clone(), vec![])
);
}
}
}
#[test]
fn test_compute_retransmit_peers_with_fanout_five() {
const FANOUT: usize = 5;
const NUM_NODES: usize = 2048;
const SEED: [u8; 32] = [0x55; 32];
let mut rng = ChaChaRng::from_seed(SEED);
let mut index: Vec<_> = (0..NUM_NODES).collect();
index.shuffle(&mut rng);
let (neighbors, children) = compute_retransmit_peers(FANOUT, 17, &index);
assert_eq!(neighbors, vec![1410, 1293, 1810, 552, 512]);
assert_eq!(children, vec![511, 1989, 283, 1606, 1154]);
}
#[test]
fn test_compute_retransmit_peers_large() {
const FANOUT: usize = 7;
const NUM_NODES: usize = 512;
let mut rng = rand::thread_rng();
let mut index: Vec<_> = (0..NUM_NODES).collect();
index.shuffle(&mut rng);
let pos: HashMap<usize, usize> = index
.iter()
.enumerate()
.map(|(i, node)| (*node, i))
.collect();
let mut seen = vec![0; NUM_NODES];
for i in 0..NUM_NODES {
let node = index[i];
let (neighbors, children) = compute_retransmit_peers(FANOUT, i, &index);
assert!(neighbors.len() <= FANOUT);
assert!(children.len() <= FANOUT);
// If x is neighbor of y then y is also neighbor of x.
for other in &neighbors {
let j = pos[other];
let (other_neighbors, _) = compute_retransmit_peers(FANOUT, j, &index);
assert!(other_neighbors.contains(&node));
}
for i in children {
seen[i] += 1;
}
}
// Except for the first layer, each node
// is child of exactly one other node.
let (seed, _) = compute_retransmit_peers(FANOUT, 0, &index);
for (i, k) in seen.into_iter().enumerate() {
if seed.contains(&i) {
assert_eq!(k, 0);
} else {
assert_eq!(k, 1);
}
}
}
#[test]
#[ignore] // TODO: debug why this is flaky on buildkite!
fn test_pull_request_time_pruning() {