removes turbine peers shuffle patch feature

This commit is contained in:
behzad nouri 2022-03-30 11:43:08 -04:00
parent 855801cc95
commit d0b850cdd9
3 changed files with 26 additions and 227 deletions

View File

@ -9,7 +9,12 @@ use {
retransmit_stage::RetransmitStage, retransmit_stage::RetransmitStage,
}, },
solana_gossip::contact_info::ContactInfo, solana_gossip::contact_info::ContactInfo,
solana_sdk::{clock::Slot, hash::hashv, pubkey::Pubkey, signature::Signature}, solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::Shred,
},
solana_runtime::bank::Bank,
solana_sdk::pubkey::Pubkey,
test::Bencher, test::Bencher,
}; };
@ -26,87 +31,48 @@ fn make_cluster_nodes<R: Rng>(
fn get_retransmit_peers_deterministic( fn get_retransmit_peers_deterministic(
cluster_nodes: &ClusterNodes<RetransmitStage>, cluster_nodes: &ClusterNodes<RetransmitStage>,
slot: &Slot, shred: &mut Shred,
slot_leader: &Pubkey, slot_leader: &Pubkey,
root_bank: &Bank,
num_simulated_shreds: usize, num_simulated_shreds: usize,
) { ) {
for i in 0..num_simulated_shreds { for i in 0..num_simulated_shreds {
// see Shred::seed shred.common_header.index = i as u32;
let shred_seed = hashv(&[ let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
&slot.to_le_bytes(),
&(i as u32).to_le_bytes(),
&slot_leader.to_bytes(),
])
.to_bytes();
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_deterministic(
shred_seed,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
*slot_leader, *slot_leader,
); shred,
} root_bank,
}
fn get_retransmit_peers_compat(
cluster_nodes: &ClusterNodes<RetransmitStage>,
slot_leader: &Pubkey,
signatures: &[Signature],
) {
for signature in signatures.iter() {
// see Shred::seed
let signature = signature.as_ref();
let offset = signature.len().checked_sub(32).unwrap();
let shred_seed = signature[offset..].try_into().unwrap();
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_compat(
shred_seed,
solana_gossip::cluster_info::DATA_PLANE_FANOUT, solana_gossip::cluster_info::DATA_PLANE_FANOUT,
*slot_leader,
); );
} }
} }
fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) { fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_benches(&genesis_config);
let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio); let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>(); let slot = rand::random::<u64>();
let mut shred = Shred::new_empty_data_shred();
shred.common_header.slot = slot;
b.iter(|| { b.iter(|| {
get_retransmit_peers_deterministic( get_retransmit_peers_deterministic(
&cluster_nodes, &cluster_nodes,
&slot, &mut shred,
&slot_leader, &slot_leader,
&bank,
NUM_SIMULATED_SHREDS, NUM_SIMULATED_SHREDS,
) )
}); });
} }
fn get_retransmit_peers_compat_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
let mut rng = rand::thread_rng();
let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let signatures: Vec<_> = std::iter::repeat_with(Signature::new_unique)
.take(NUM_SIMULATED_SHREDS)
.collect();
b.iter(|| get_retransmit_peers_compat(&cluster_nodes, &slot_leader, &signatures));
}
#[bench] #[bench]
fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_2(b: &mut Bencher) { fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_2(b: &mut Bencher) {
get_retransmit_peers_deterministic_wrapper(b, Some((1, 2))); get_retransmit_peers_deterministic_wrapper(b, Some((1, 2)));
} }
#[bench]
fn bench_get_retransmit_peers_compat_unstaked_ratio_1_2(b: &mut Bencher) {
get_retransmit_peers_compat_wrapper(b, Some((1, 2)));
}
#[bench] #[bench]
fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_32(b: &mut Bencher) { fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_32(b: &mut Bencher) {
get_retransmit_peers_deterministic_wrapper(b, Some((1, 32))); get_retransmit_peers_deterministic_wrapper(b, Some((1, 32)));
} }
#[bench]
fn bench_get_retransmit_peers_compat_unstaked_ratio_1_32(b: &mut Bencher) {
get_retransmit_peers_compat_wrapper(b, Some((1, 32)));
}

View File

@ -10,13 +10,12 @@ use {
crds::GossipRoute, crds::GossipRoute,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_value::{CrdsData, CrdsValue}, crds_value::{CrdsData, CrdsValue},
weighted_shuffle::{weighted_best, weighted_shuffle, WeightedShuffle}, weighted_shuffle::WeightedShuffle,
}, },
solana_ledger::shred::Shred, solana_ledger::shred::Shred,
solana_runtime::bank::Bank, solana_runtime::bank::Bank,
solana_sdk::{ solana_sdk::{
clock::{Epoch, Slot}, clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey, pubkey::Pubkey,
signature::Keypair, signature::Keypair,
timing::timestamp, timing::timestamp,
@ -56,10 +55,6 @@ pub struct ClusterNodes<T> {
// Reverse index from nodes pubkey to their index in self.nodes. // Reverse index from nodes pubkey to their index in self.nodes.
index: HashMap<Pubkey, /*index:*/ usize>, index: HashMap<Pubkey, /*index:*/ usize>,
weighted_shuffle: WeightedShuffle</*stake:*/ u64>, weighted_shuffle: WeightedShuffle</*stake:*/ u64>,
// Weights and indices for sampling peers. weighted_{shuffle,best} expect
// weights >= 1. For backward compatibility we use max(1, stake) for
// weights and exclude nodes with no contact-info.
compat_index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
} }
@ -92,14 +87,15 @@ impl Node {
impl<T> ClusterNodes<T> { impl<T> ClusterNodes<T> {
pub(crate) fn num_peers(&self) -> usize { pub(crate) fn num_peers(&self) -> usize {
self.compat_index.len() self.nodes.len().saturating_sub(1)
} }
// A peer is considered live if they generated their contact info recently. // A peer is considered live if they generated their contact info recently.
pub(crate) fn num_peers_live(&self, now: u64) -> usize { pub(crate) fn num_peers_live(&self, now: u64) -> usize {
self.compat_index self.nodes
.iter() .iter()
.filter_map(|(_, index)| self.nodes[*index].contact_info()) .filter(|node| node.pubkey() != self.pubkey)
.filter_map(|node| node.contact_info())
.filter(|node| { .filter(|node| {
let elapsed = if node.wallclock < now { let elapsed = if node.wallclock < now {
now - node.wallclock now - node.wallclock
@ -120,20 +116,12 @@ impl ClusterNodes<BroadcastStage> {
pub(crate) fn get_broadcast_addrs( pub(crate) fn get_broadcast_addrs(
&self, &self,
shred: &Shred, shred: &Shred,
root_bank: &Bank, _root_bank: &Bank,
fanout: usize, fanout: usize,
socket_addr_space: &SocketAddrSpace, socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> { ) -> Vec<SocketAddr> {
const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60); const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
let shred_seed = shred.seed(self.pubkey); let shred_seed = shred.seed(self.pubkey);
if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
if let Some(node) = self.get_broadcast_peer(shred_seed) {
if socket_addr_space.check(&node.tvu) {
return vec![node.tvu];
}
}
return Vec::default();
}
let mut rng = ChaChaRng::from_seed(shred_seed); let mut rng = ChaChaRng::from_seed(shred_seed);
let index = match self.weighted_shuffle.first(&mut rng) { let index = match self.weighted_shuffle.first(&mut rng) {
None => return Vec::default(), None => return Vec::default(),
@ -175,20 +163,6 @@ impl ClusterNodes<BroadcastStage> {
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect() .collect()
} }
/// Returns the root of turbine broadcast tree, which the leader sends the
/// shred to.
fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
if self.compat_index.is_empty() {
None
} else {
let index = weighted_best(&self.compat_index, shred_seed);
match &self.nodes[index].node {
NodeId::ContactInfo(node) => Some(node),
NodeId::Pubkey(_) => panic!("this should not happen!"),
}
}
}
} }
impl ClusterNodes<RetransmitStage> { impl ClusterNodes<RetransmitStage> {
@ -223,32 +197,17 @@ impl ClusterNodes<RetransmitStage> {
.collect() .collect()
} }
fn get_retransmit_peers( pub fn get_retransmit_peers(
&self, &self,
slot_leader: Pubkey, slot_leader: Pubkey,
shred: &Shred, shred: &Shred,
root_bank: &Bank, _root_bank: &Bank,
fanout: usize, fanout: usize,
) -> ( ) -> (
Vec<&Node>, // neighbors Vec<&Node>, // neighbors
Vec<&Node>, // children Vec<&Node>, // children
) { ) {
let shred_seed = shred.seed(slot_leader); let shred_seed = shred.seed(slot_leader);
if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
}
self.get_retransmit_peers_deterministic(shred_seed, fanout, slot_leader)
}
pub fn get_retransmit_peers_deterministic(
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Pubkey,
) -> (
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
let mut weighted_shuffle = self.weighted_shuffle.clone(); let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes. // Exclude slot leader from list of nodes.
if slot_leader == self.pubkey { if slot_leader == self.pubkey {
@ -271,46 +230,6 @@ impl ClusterNodes<RetransmitStage> {
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
(neighbors, children) (neighbors, children)
} }
pub fn get_retransmit_peers_compat(
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Pubkey,
) -> (
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
// Exclude leader from list of nodes.
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
self.compat_index.iter().copied().unzip()
} else {
self.compat_index
.iter()
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
.copied()
.unzip()
};
let index: Vec<_> = {
let shuffle = weighted_shuffle(weights.into_iter(), shred_seed);
shuffle.into_iter().map(|i| index[i]).collect()
};
let self_index = index
.iter()
.position(|i| self.nodes[*i].pubkey() == self.pubkey)
.unwrap();
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &index);
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
debug_assert_eq!(
self.nodes[neighbors[self_index % fanout]].pubkey(),
self.pubkey
);
let neighbors = neighbors.into_iter().map(|i| &self.nodes[i]).collect();
let children = children.into_iter().map(|i| &self.nodes[i]).collect();
(neighbors, children)
}
} }
pub fn new_cluster_nodes<T: 'static>( pub fn new_cluster_nodes<T: 'static>(
@ -330,26 +249,11 @@ pub fn new_cluster_nodes<T: 'static>(
if broadcast { if broadcast {
weighted_shuffle.remove_index(index[&self_pubkey]); weighted_shuffle.remove_index(index[&self_pubkey]);
} }
// For backward compatibility:
// * nodes which do not have contact-info are excluded.
// * stakes are floored at 1.
// The sorting key here should be equivalent to
// solana_gossip::deprecated::sorted_stakes_with_index.
// Leader itself is excluded when sampling broadcast peers.
let compat_index = nodes
.iter()
.enumerate()
.filter(|(_, node)| node.contact_info().is_some())
.filter(|(_, node)| !broadcast || node.pubkey() != self_pubkey)
.sorted_by_key(|(_, node)| Reverse((node.stake.max(1), node.pubkey())))
.map(|(index, node)| (node.stake.max(1), index))
.collect();
ClusterNodes { ClusterNodes {
pubkey: self_pubkey, pubkey: self_pubkey,
nodes, nodes,
index, index,
weighted_shuffle, weighted_shuffle,
compat_index,
_phantom: PhantomData::default(), _phantom: PhantomData::default(),
} }
} }
@ -387,21 +291,6 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect() .collect()
} }
fn enable_turbine_peers_shuffle_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
.feature_set
.activated_slot(&feature_set::turbine_peers_shuffle::id());
match feature_slot {
None => false,
Some(feature_slot) => {
let epoch_schedule = root_bank.epoch_schedule();
let feature_epoch = epoch_schedule.get_epoch(feature_slot);
let shred_epoch = epoch_schedule.get_epoch(shred_slot);
feature_epoch < shred_epoch
}
}
}
impl<T> ClusterNodesCache<T> { impl<T> ClusterNodesCache<T> {
pub fn new( pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs. // Capacity of underlying LRU-cache in terms of number of epochs.
@ -563,7 +452,6 @@ mod tests {
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes); let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
// All nodes with contact-info should be in the index. // All nodes with contact-info should be in the index.
assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
// Staked nodes with no contact-info should be included. // Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len()); assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info. // Assert that all nodes keep their contact-info.
@ -586,22 +474,6 @@ mod tests {
let (peers, stakes_and_index) = let (peers, stakes_and_index) =
sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes)); sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
assert_eq!(stakes_and_index.len(), peers.len()); assert_eq!(stakes_and_index.len(), peers.len());
assert_eq!(cluster_nodes.compat_index.len(), peers.len());
for (i, node) in cluster_nodes
.compat_index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()
{
let (stake, index) = stakes_and_index[i];
// Wallclock may be update by ClusterInfo::push_self.
if node.pubkey() == this_node.id {
assert_eq!(this_node.id, peers[index].id)
} else {
assert_eq!(node.contact_info().unwrap(), &peers[index]);
}
assert_eq!(node.stake.max(1), stake);
}
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
// Remove slot leader from peers indices. // Remove slot leader from peers indices.
let stakes_and_index: Vec<_> = stakes_and_index let stakes_and_index: Vec<_> = stakes_and_index
@ -618,21 +490,6 @@ mod tests {
.map(|(_, index)| index) .map(|(_, index)| index)
.collect(); .collect();
assert_eq!(this_node.id, peers[shuffled_index[self_index]].id); assert_eq!(this_node.id, peers[shuffled_index[self_index]].id);
for fanout in 1..200 {
let (neighbors_indices, children_indices) =
compute_retransmit_peers(fanout, self_index, &shuffled_index);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
assert_eq!(children.len(), children_indices.len());
for (node, index) in children.into_iter().zip(children_indices) {
assert_eq!(*node.contact_info().unwrap(), peers[index]);
}
assert_eq!(neighbors.len(), neighbors_indices.len());
assert_eq!(neighbors[0].pubkey(), peers[neighbors_indices[0]].id);
for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) {
assert_eq!(*node.contact_info().unwrap(), peers[index]);
}
}
} }
#[test] #[test]
@ -644,7 +501,6 @@ mod tests {
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes); let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
// All nodes with contact-info should be in the index. // All nodes with contact-info should be in the index.
// Excluding this node itself. // Excluding this node itself.
assert_eq!(cluster_nodes.compat_index.len() + 1, nodes.len());
// Staked nodes with no contact-info should be included. // Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len()); assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info. // Assert that all nodes keep their contact-info.
@ -666,23 +522,5 @@ mod tests {
} }
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes)); let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
assert_eq!(peers_and_stakes.len(), peers.len()); assert_eq!(peers_and_stakes.len(), peers.len());
assert_eq!(cluster_nodes.compat_index.len(), peers.len());
for (i, node) in cluster_nodes
.compat_index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()
{
let (stake, index) = peers_and_stakes[i];
assert_eq!(node.contact_info().unwrap(), &peers[index]);
assert_eq!(node.stake.max(1), stake);
}
for _ in 0..100 {
let mut shred_seed = [0u8; 32];
rng.fill(&mut shred_seed[..]);
let index = weighted_best(&peers_and_stakes, shred_seed);
let peer = cluster_nodes.get_broadcast_peer(shred_seed).unwrap();
assert_eq!(*peer, peers[index]);
}
} }
} }

View File

@ -211,10 +211,6 @@ pub mod send_to_tpu_vote_port {
solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo");
} }
pub mod turbine_peers_shuffle {
solana_sdk::declare_id!("4VvpgRD6UsHvkXwpuQhtR5NG1G4esMaExeWuSEpsYRUa");
}
pub mod requestable_heap_size { pub mod requestable_heap_size {
solana_sdk::declare_id!("CCu4boMmfLuqcmfTLPHQiUo22ZdUsXjgzPAURYaWt1Bw"); solana_sdk::declare_id!("CCu4boMmfLuqcmfTLPHQiUo22ZdUsXjgzPAURYaWt1Bw");
} }
@ -386,7 +382,6 @@ lazy_static! {
(optimize_epoch_boundary_updates::id(), "optimize epoch boundary updates"), (optimize_epoch_boundary_updates::id(), "optimize epoch boundary updates"),
(remove_native_loader::id(), "remove support for the native loader"), (remove_native_loader::id(), "remove support for the native loader"),
(send_to_tpu_vote_port::id(), "send votes to the tpu vote port"), (send_to_tpu_vote_port::id(), "send votes to the tpu vote port"),
(turbine_peers_shuffle::id(), "turbine peers shuffle patch"),
(requestable_heap_size::id(), "Requestable heap frame size"), (requestable_heap_size::id(), "Requestable heap frame size"),
(disable_fee_calculator::id(), "deprecate fee calculator"), (disable_fee_calculator::id(), "deprecate fee calculator"),
(add_compute_budget_program::id(), "Add compute_budget_program"), (add_compute_budget_program::id(), "Add compute_budget_program"),