broadcast_shreds opt (#6175)

* Don't clone/copy/sort ContactInfo vec
This commit is contained in:
sakridge 2019-10-01 09:38:29 -07:00 committed by GitHub
parent f09183765c
commit ae7700296d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 26 deletions

View File

@ -0,0 +1,39 @@
#![feature(test)]
extern crate test;
use rand::{thread_rng, Rng};
use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashMap;
use std::net::UdpSocket;
use test::Bencher;
#[bench]
fn broadcast_shreds_bench(bencher: &mut Bencher) {
solana_logger::setup();
let leader_pubkey = Pubkey::new_rand();
let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
const SHRED_SIZE: usize = 1024;
const NUM_SHREDS: usize = 32;
let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS];
let seeds = vec![[0u8; 32]; NUM_SHREDS];
let mut stakes = HashMap::new();
const NUM_PEERS: usize = 200;
for _ in 0..NUM_PEERS {
let id = Pubkey::new_rand();
let contact_info = ContactInfo::new_localhost(&id, timestamp());
cluster_info.insert_info(contact_info);
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
}
bencher.iter(move || {
cluster_info
.broadcast_shreds(&socket, &shreds, &seeds, Some(&stakes))
.unwrap();
});
}

View File

@ -24,7 +24,7 @@ use crate::repair_service::RepairType;
use crate::result::Result;
use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::weighted_shuffle;
use crate::weighted_shuffle::{weighted_best, weighted_shuffle};
use bincode::{deserialize, serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
@ -515,6 +515,35 @@ impl ClusterInfo {
out
}
fn peers_and_stakes<S: std::hash::BuildHasher>(
peers: &[ContactInfo],
stakes: Option<&HashMap<Pubkey, u64, S>>,
) -> Vec<(u64, usize)> {
let mut stakes_and_index: Vec<_> = peers
.iter()
.enumerate()
.map(|(i, c)| {
let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0));
(stake, i)
})
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
if r_stake == l_stake {
peers[*r_info].id.cmp(&peers[*l_info].id)
} else {
r_stake.cmp(&l_stake)
}
})
.collect();
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
// assumed to be missing entry. So let's make sure stake weights are atleast 1
stakes_and_index
.iter_mut()
.for_each(|(stake, _)| *stake = cmp::max(1, *stake));
stakes_and_index
}
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
pub fn shuffle_peers_and_index<S: std::hash::BuildHasher>(
&self,
@ -538,19 +567,6 @@ impl ClusterInfo {
(index, peers)
}
pub fn sorted_tvu_peers(
&self,
stakes: Option<&HashMap<Pubkey, u64>>,
rng: ChaChaRng,
) -> Vec<ContactInfo> {
let peers = self.tvu_peers();
let peers_with_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
peers_with_stakes
.iter()
.map(|(_, peer)| (*peer).clone())
.collect()
}
/// compute broadcast table
pub fn tpu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id;
@ -694,6 +710,16 @@ impl ClusterInfo {
.collect()
}
fn sorted_tvu_peers_and_stakes(
&self,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = self.tvu_peers();
peers.dedup();
let peers_and_stakes = ClusterInfo::peers_and_stakes(&peers, stakes);
(peers, peers_and_stakes)
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
@ -704,24 +730,23 @@ impl ClusterInfo {
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Result<()> {
let mut last_err = Ok(());
let mut broadcast_table_len = 0;
let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 {
datapoint_info!("cluster_info-num_nodes", ("count", 1, i64));
return Ok(());
}
shreds.iter().zip(seeds).for_each(|(shred, seed)| {
let broadcast_table = self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(*seed));
broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len());
let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed));
if !broadcast_table.is_empty() {
if let Err(e) = s.send_to(shred, &broadcast_table[0].tvu) {
if let Err(e) = s.send_to(shred, &peers[broadcast_index].tvu) {
trace!("{}: broadcast result {:?}", self.id(), e);
last_err = Err(e);
}
}
});
last_err?;
datapoint_info!(
"cluster_info-num_nodes",
("count", broadcast_table_len + 1, i64)
);
datapoint_info!("cluster_info-num_nodes", ("count", broadcast_len + 1, i64));
Ok(())
}
@ -2384,4 +2409,41 @@ mod tests {
let protocol = Protocol::PullRequest(filter, value.clone());
assert!(serialized_size(&protocol).unwrap() <= PACKET_DATA_SIZE as u64);
}
#[test]
fn test_tvu_peers_and_stakes() {
let d = ContactInfo::new_localhost(&Pubkey::new(&[0; 32]), timestamp());
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone());
let mut stakes = HashMap::new();
// no stake
let id = Pubkey::new(&[1u8; 32]);
let contact_info = ContactInfo::new_localhost(&id, timestamp());
cluster_info.insert_info(contact_info);
// normal
let id2 = Pubkey::new(&[2u8; 32]);
let mut contact_info = ContactInfo::new_localhost(&id2, timestamp());
cluster_info.insert_info(contact_info.clone());
stakes.insert(id2, 10);
// duplicate
contact_info.wallclock = timestamp() + 1;
cluster_info.insert_info(contact_info);
// no tvu
let id3 = Pubkey::new(&[3u8; 32]);
let mut contact_info = ContactInfo::new_localhost(&id3, timestamp());
contact_info.tvu = "0.0.0.0:0".parse().unwrap();
cluster_info.insert_info(contact_info);
stakes.insert(id3, 10);
let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(&stakes));
assert_eq!(peers.len(), 2);
assert_eq!(peers[0].id, id);
assert_eq!(peers[1].id, id2);
assert_eq!(peers_and_stakes.len(), 2);
assert_eq!(peers_and_stakes[0].0, 10);
assert_eq!(peers_and_stakes[1].0, 1);
}
}

View File

@ -33,6 +33,31 @@ where
.collect()
}
/// Returns the highest index after computing a weighted shuffle.
/// Saves doing any sorting for O(n) max calculation.
pub fn weighted_best(weights_and_indicies: &[(u64, usize)], rng: ChaChaRng) -> usize {
let mut rng = rng;
if weights_and_indicies.is_empty() {
return 0;
}
let total_weight: u64 = weights_and_indicies.iter().map(|x| x.0).sum();
let mut best_weight = 0;
let mut best_index = 0;
for v in weights_and_indicies {
let x = (total_weight / v.0)
.to_u64()
.expect("values > u64::max are not supported");
// capture the u64 into u128s to prevent overflow
let weight = (&mut rng).gen_range(1, u128::from(std::u16::MAX)) * u128::from(x);
if weight > best_weight {
best_weight = weight;
best_index = v.1;
}
}
best_index
}
#[cfg(test)]
mod tests {
use super::*;
@ -92,4 +117,12 @@ mod tests {
}
});
}
#[test]
fn test_weighted_best() {
let mut weights = vec![(std::u32::MAX as u64, 0); 3];
weights.push((1, 5));
let best = weighted_best(&weights, ChaChaRng::from_seed([0x5b; 32]));
assert_eq!(best, 5);
}
}