bench get_retransmit_peers (#23292)

This commit is contained in:
Jeff Biseda 2022-03-01 19:10:29 -08:00 committed by GitHub
parent 2a17a661e6
commit c69e3b73ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 185 additions and 63 deletions

View File

@ -0,0 +1,112 @@
#![feature(test)]
extern crate test;
use {
rand::{seq::SliceRandom, Rng},
solana_core::{
cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
retransmit_stage::RetransmitStage,
},
solana_gossip::contact_info::ContactInfo,
solana_sdk::{clock::Slot, hash::hashv, pubkey::Pubkey, signature::Signature},
test::Bencher,
};
const NUM_SIMULATED_SHREDS: usize = 4;
fn make_cluster_nodes<R: Rng>(
rng: &mut R,
unstaked_ratio: Option<(u32, u32)>,
) -> (Vec<ContactInfo>, ClusterNodes<RetransmitStage>) {
let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio);
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
(nodes, cluster_nodes)
}
fn get_retransmit_peers_deterministic(
cluster_nodes: &ClusterNodes<RetransmitStage>,
slot: &Slot,
slot_leader: &Pubkey,
num_simulated_shreds: usize,
) {
for i in 0..num_simulated_shreds {
// see Shred::seed
let shred_seed = hashv(&[
&slot.to_le_bytes(),
&(i as u32).to_le_bytes(),
&slot_leader.to_bytes(),
])
.to_bytes();
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_deterministic(
shred_seed,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
*slot_leader,
);
}
}
fn get_retransmit_peers_compat(
cluster_nodes: &ClusterNodes<RetransmitStage>,
slot_leader: &Pubkey,
signatures: &[Signature],
) {
for signature in signatures.iter() {
// see Shred::seed
let signature = signature.as_ref();
let offset = signature.len().checked_sub(32).unwrap();
let shred_seed = signature[offset..].try_into().unwrap();
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_compat(
shred_seed,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
*slot_leader,
);
}
}
fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
let mut rng = rand::thread_rng();
let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>();
b.iter(|| {
get_retransmit_peers_deterministic(
&cluster_nodes,
&slot,
&slot_leader,
NUM_SIMULATED_SHREDS,
)
});
}
fn get_retransmit_peers_compat_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) {
let mut rng = rand::thread_rng();
let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio);
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let signatures: Vec<_> = std::iter::repeat_with(Signature::new_unique)
.take(NUM_SIMULATED_SHREDS)
.collect();
b.iter(|| get_retransmit_peers_compat(&cluster_nodes, &slot_leader, &signatures));
}
#[bench]
fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_2(b: &mut Bencher) {
get_retransmit_peers_deterministic_wrapper(b, Some((1, 2)));
}
#[bench]
fn bench_get_retransmit_peers_compat_unstaked_ratio_1_2(b: &mut Bencher) {
get_retransmit_peers_compat_wrapper(b, Some((1, 2)));
}
#[bench]
fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_32(b: &mut Bencher) {
get_retransmit_peers_deterministic_wrapper(b, Some((1, 32)));
}
#[bench]
fn bench_get_retransmit_peers_compat_unstaked_ratio_1_32(b: &mut Bencher) {
get_retransmit_peers_compat_wrapper(b, Some((1, 32)));
}

View File

@ -2,12 +2,14 @@ use {
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
itertools::Itertools,
lru::LruCache,
rand::SeedableRng,
rand::{seq::SliceRandom, Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
contact_info::ContactInfo,
crds::GossipRoute,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_value::{CrdsData, CrdsValue},
weighted_shuffle::{weighted_best, weighted_shuffle, WeightedShuffle},
},
solana_ledger::shred::Shred,
@ -16,6 +18,7 @@ use {
clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey,
signature::Keypair,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
@ -23,6 +26,7 @@ use {
any::TypeId,
cmp::Reverse,
collections::HashMap,
iter::repeat_with,
marker::PhantomData,
net::SocketAddr,
ops::Deref,
@ -39,7 +43,7 @@ enum NodeId {
Pubkey(Pubkey),
}
struct Node {
pub struct Node {
node: NodeId,
stake: u64,
}
@ -233,6 +237,18 @@ impl ClusterNodes<RetransmitStage> {
if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) {
return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader);
}
self.get_retransmit_peers_deterministic(shred_seed, fanout, slot_leader)
}
pub fn get_retransmit_peers_deterministic(
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Pubkey,
) -> (
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == self.pubkey {
@ -256,7 +272,7 @@ impl ClusterNodes<RetransmitStage> {
(neighbors, children)
}
fn get_retransmit_peers_compat(
pub fn get_retransmit_peers_compat(
&self,
shred_seed: [u8; 32],
fanout: usize,
@ -297,7 +313,7 @@ impl ClusterNodes<RetransmitStage> {
}
}
fn new_cluster_nodes<T: 'static>(
pub fn new_cluster_nodes<T: 'static>(
cluster_info: &ClusterInfo,
stakes: &HashMap<Pubkey, u64>,
) -> ClusterNodes<T> {
@ -462,22 +478,61 @@ impl From<Pubkey> for NodeId {
}
}
pub fn make_test_cluster<R: Rng>(
rng: &mut R,
num_nodes: usize,
unstaked_ratio: Option<(u32, u32)>,
) -> (
Vec<ContactInfo>,
HashMap<Pubkey, u64>, // stakes
ClusterInfo,
) {
let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
.take(num_nodes)
.collect();
nodes.shuffle(rng);
let this_node = nodes[0].clone();
let mut stakes: HashMap<Pubkey, u64> = nodes
.iter()
.filter_map(|node| {
if rng.gen_ratio(unstaked_numerator, unstaked_denominator) {
None // No stake for some of the nodes.
} else {
Some((node.id, rng.gen_range(0, 20)))
}
})
.collect();
// Add some staked nodes with no contact-info.
stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100));
let cluster_info = ClusterInfo::new(
this_node,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
{
let now = timestamp();
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
// First node is pushed to crds table by ClusterInfo constructor.
for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(
gossip_crds.insert(node, now, GossipRoute::LocalMessage),
Ok(())
);
}
}
(nodes, stakes, cluster_info)
}
#[cfg(test)]
mod tests {
use {
super::*,
rand::{seq::SliceRandom, Rng},
solana_gossip::{
crds::GossipRoute,
crds_value::{CrdsData, CrdsValue},
deprecated::{
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes,
sorted_stakes_with_index,
},
solana_gossip::deprecated::{
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, sorted_stakes_with_index,
},
solana_sdk::{signature::Keypair, timing::timestamp},
solana_streamer::socket::SocketAddrSpace,
std::{iter::repeat_with, sync::Arc},
};
// Legacy methods copied for testing backward compatibility.
@ -499,55 +554,10 @@ mod tests {
sorted_stakes_with_index(peers, stakes)
}
fn make_cluster<R: Rng>(
rng: &mut R,
) -> (
Vec<ContactInfo>,
HashMap<Pubkey, u64>, // stakes
ClusterInfo,
) {
let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
.take(1000)
.collect();
nodes.shuffle(rng);
let this_node = nodes[0].clone();
let mut stakes: HashMap<Pubkey, u64> = nodes
.iter()
.filter_map(|node| {
if rng.gen_ratio(1, 7) {
None // No stake for some of the nodes.
} else {
Some((node.id, rng.gen_range(0, 20)))
}
})
.collect();
// Add some staked nodes with no contact-info.
stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100));
let cluster_info = ClusterInfo::new(
this_node,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
{
let now = timestamp();
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
// First node is pushed to crds table by ClusterInfo constructor.
for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(
gossip_crds.insert(node, now, GossipRoute::LocalMessage),
Ok(())
);
}
}
(nodes, stakes, cluster_info)
}
#[test]
fn test_cluster_nodes_retransmit() {
let mut rng = rand::thread_rng();
let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
let this_node = cluster_info.my_contact_info();
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
@ -628,7 +638,7 @@ mod tests {
#[test]
fn test_cluster_nodes_broadcast() {
let mut rng = rand::thread_rng();
let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);

View File

@ -416,7 +416,7 @@ pub fn retransmitter(
.unwrap()
}
pub(crate) struct RetransmitStage {
pub struct RetransmitStage {
retransmit_thread_handle: JoinHandle<()>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,