Add random distribution for avalanche peers (#4493)

* Add random distribution for avalanche peers

* fix clippy warnings

* bug fixes

* nits
This commit is contained in:
Pankaj Garg 2019-06-01 07:55:43 -07:00 committed by GitHub
parent e15246746d
commit 3574469052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 188 additions and 49 deletions

1
Cargo.lock generated
View File

@ -2199,6 +2199,7 @@ dependencies = [
"matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -38,6 +38,7 @@ libc = "0.2.55"
log = "0.4.2"
memmap = { version = "0.7.0", optional = true }
nix = "0.14.0"
num-traits = "0.2"
rand = "0.6.5"
rand_chacha = "0.1.1"
rayon = "1.0.0"

View File

@ -9,6 +9,8 @@ use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_metrics::{
@ -85,10 +87,12 @@ impl Broadcast {
}
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let mut broadcast_table = cluster_info
.read()
.unwrap()
.sorted_tvu_peers(staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref());
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&bank.slot().to_le_bytes());
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(
staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref(),
ChaChaRng::from_seed(seed),
);
inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table.len() + 1);
// Layer 1, leader nodes are limited to the fanout size.

View File

@ -24,9 +24,12 @@ use crate::repair_service::RepairType;
use crate::result::Result;
use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::weighted_shuffle;
use bincode::{deserialize, serialize};
use core::cmp;
use itertools::Itertools;
use rand::{thread_rng, Rng};
use rand_chacha::ChaChaRng;
use rayon::prelude::*;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_netutil::{
@ -489,38 +492,48 @@ impl ClusterInfo {
&& !ContactInfo::is_valid_address(&contact_info.tpu)
}
fn sort_by_stake<S: std::hash::BuildHasher>(
fn stake_weighted_shuffle<S: std::hash::BuildHasher>(
peers: &[ContactInfo],
stakes: Option<&HashMap<Pubkey, u64, S>>,
rng: ChaChaRng,
) -> Vec<(u64, ContactInfo)> {
let mut peers_with_stakes: Vec<_> = peers
let (stake_weights, peers_with_stakes): (Vec<_>, Vec<_>) = peers
.iter()
.map(|c| {
(
stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)),
c.clone(),
)
let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0));
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
// assumed to be missing entry. So let's make sure stake weights are atleast 1
(cmp::max(1, stake), (stake, c.clone()))
})
.sorted_by(|(_, (l_stake, l_info)), (_, (r_stake, r_info))| {
if r_stake == l_stake {
r_info.id.cmp(&l_info.id)
} else {
r_stake.cmp(&l_stake)
}
})
.unzip();
let shuffle = weighted_shuffle(stake_weights, rng);
let mut out: Vec<(u64, ContactInfo)> = shuffle
.iter()
.map(|x| peers_with_stakes[*x].clone())
.collect();
peers_with_stakes.sort_unstable_by(|(l_stake, l_info), (r_stake, r_info)| {
if r_stake == l_stake {
r_info.id.cmp(&l_info.id)
} else {
r_stake.cmp(&l_stake)
}
});
peers_with_stakes.dedup();
peers_with_stakes
out.dedup();
out
}
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
fn sorted_peers_and_index<S: std::hash::BuildHasher>(
fn shuffle_peers_and_index<S: std::hash::BuildHasher>(
&self,
stakes: Option<&HashMap<Pubkey, u64, S>>,
rng: ChaChaRng,
) -> (usize, Vec<ContactInfo>) {
let mut peers = self.retransmit_peers();
peers.push(self.lookup(&self.id()).unwrap().clone());
let contacts_and_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
let contacts_and_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
let mut index = 0;
let peers: Vec<_> = contacts_and_stakes
.into_iter()
@ -537,9 +550,13 @@ impl ClusterInfo {
(index, peers)
}
pub fn sorted_tvu_peers(&self, stakes: Option<&HashMap<Pubkey, u64>>) -> Vec<ContactInfo> {
pub fn sorted_tvu_peers(
&self,
stakes: Option<&HashMap<Pubkey, u64>>,
rng: ChaChaRng,
) -> Vec<ContactInfo> {
let peers = self.tvu_peers();
let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
let peers_with_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
peers_with_stakes
.iter()
.map(|(_, peer)| (*peer).clone())
@ -1498,8 +1515,12 @@ pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
stakes: Option<&HashMap<Pubkey, u64, S>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
fanout: usize,
rng: ChaChaRng,
) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
let (my_index, peers) = cluster_info.read().unwrap().sorted_peers_and_index(stakes);
let (my_index, peers) = cluster_info
.read()
.unwrap()
.shuffle_peers_and_index(stakes, rng);
//calc num_layers and num_neighborhoods using the total number of nodes
let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);

View File

@ -14,11 +14,14 @@ use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use crate::weighted_shuffle::weighted_shuffle;
use bincode::serialized_size;
use indexmap::map::IndexMap;
use itertools::Itertools;
use rand;
use rand::distributions::{Distribution, WeightedIndex};
use rand::seq::SliceRandom;
use rand::{thread_rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
@ -169,29 +172,36 @@ impl CrdsGossipPush {
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new();
let mut options: Vec<_> = self.push_options(crds, &self_id, stakes);
let options: Vec<_> = self.push_options(crds, &self_id, stakes);
if options.is_empty() {
return;
}
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
let mut shuffle = weighted_shuffle(
options.iter().map(|weighted| weighted.0).collect_vec(),
ChaChaRng::from_seed(seed),
)
.into_iter();
while new_items.len() < need {
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0));
if index.is_err() {
break;
match shuffle.next() {
Some(index) => {
let item = options[index].1;
if self.active_set.get(&item.id).is_some() {
continue;
}
if new_items.get(&item.id).is_some() {
continue;
}
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
bloom.add(&item.id);
new_items.insert(item.id, bloom);
}
_ => break,
}
let index = index.unwrap();
let index = index.sample(&mut rand::thread_rng());
let item = options[index].1;
options.remove(index);
if self.active_set.get(&item.id).is_some() {
continue;
}
if new_items.get(&item.id).is_some() {
continue;
}
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
bloom.add(&item.id);
new_items.insert(item.id, bloom);
}
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
keys.shuffle(&mut rand::thread_rng());

View File

@ -68,6 +68,7 @@ pub mod test_tx;
pub mod tpu;
pub mod tvu;
pub mod validator;
pub mod weighted_shuffle;
pub mod window_service;
#[macro_use]

View File

@ -3,6 +3,7 @@
use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT};
use crate::contact_info::ContactInfo;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::repair_service::RepairStrategy;
use crate::result::{Error, Result};
@ -10,6 +11,9 @@ use crate::service::Service;
use crate::staking_utils;
use crate::streamer::BlobReceiver;
use crate::window_service::{should_retransmit_and_persist, WindowService};
use hashbrown::HashMap;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use solana_metrics::{datapoint_info, inc_new_counter_error};
use solana_runtime::epoch_schedule::EpochSchedule;
use std::net::UdpSocket;
@ -26,6 +30,8 @@ fn retransmit(
cluster_info: &Arc<RwLock<ClusterInfo>>,
r: &BlobReceiver,
sock: &UdpSocket,
avalanche_topology_cache: &mut HashMap<u64, (Vec<ContactInfo>, Vec<ContactInfo>)>,
cache_history: &mut Vec<u64>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut blobs = r.recv_timeout(timer)?;
@ -37,12 +43,19 @@ fn retransmit(
let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
let (neighbors, children) = compute_retransmit_peers(
staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
cluster_info,
DATA_PLANE_FANOUT,
);
for blob in &blobs {
let slot = blob.read().unwrap().slot();
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&slot.to_le_bytes());
let (neighbors, children) = avalanche_topology_cache.entry(slot).or_insert_with(|| {
cache_history.push(slot);
compute_retransmit_peers(
staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
cluster_info,
DATA_PLANE_FANOUT,
ChaChaRng::from_seed(seed),
)
});
let leader = leader_schedule_cache
.slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref()));
if blob.read().unwrap().meta.forward {
@ -52,6 +65,10 @@ fn retransmit(
ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, true)?;
}
}
while cache_history.len() > 5 {
avalanche_topology_cache.remove(&cache_history.pop().unwrap());
}
Ok(())
}
@ -76,6 +93,8 @@ fn retransmitter(
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
let mut avalanche_topology_cache = HashMap::new();
let mut cache_history = vec![];
loop {
if let Err(e) = retransmit(
&bank_forks,
@ -83,6 +102,8 @@ fn retransmitter(
&cluster_info,
&r,
&sock,
&mut avalanche_topology_cache,
&mut cache_history,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,

View File

@ -0,0 +1,76 @@
//! The `weighted_shuffle` module provides an iterator over shuffled weights.
use itertools::Itertools;
use num_traits::{FromPrimitive, ToPrimitive};
use rand::Rng;
use rand_chacha::ChaChaRng;
use std::iter;
use std::ops::Div;
pub fn weighted_shuffle<T>(weights: Vec<T>, rng: ChaChaRng) -> Vec<usize>
where
T: Copy + PartialOrd + iter::Sum + Div<T, Output = T> + FromPrimitive + ToPrimitive,
{
let mut rng = rng;
let total_weight: T = weights.clone().into_iter().sum();
weights
.into_iter()
.enumerate()
.map(|(i, v)| {
let x = (total_weight / v).to_u32().unwrap();
(
i,
(&mut rng).gen_range(1, u64::from(std::u16::MAX)) * u64::from(x),
)
})
.sorted_by(|(_, l_val), (_, r_val)| l_val.cmp(r_val))
.map(|x| x.0)
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use rand::SeedableRng;
#[test]
fn test_weighted_shuffle_iterator() {
let mut test_set = [0; 6];
let mut count = 0;
let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
shuffle.into_iter().for_each(|x| {
assert_eq!(test_set[x], 0);
test_set[x] = 1;
count += 1;
});
assert_eq!(count, 6);
}
#[test]
fn test_weighted_shuffle_iterator_large() {
let mut test_set = [0; 100];
let mut test_weights = vec![0; 100];
(0..100).for_each(|i| test_weights[i] = (i + 1) as u64);
let mut count = 0;
let shuffle = weighted_shuffle(test_weights, ChaChaRng::from_seed([0xa5; 32]));
shuffle.into_iter().for_each(|x| {
assert_eq!(test_set[x], 0);
test_set[x] = 1;
count += 1;
});
assert_eq!(count, 100);
}
#[test]
fn test_weighted_shuffle_compare() {
let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
let shuffle1 = weighted_shuffle(vec![50, 10, 2, 1, 1, 1], ChaChaRng::from_seed([0x5a; 32]));
shuffle1
.into_iter()
.zip(shuffle.into_iter())
.for_each(|(x, y)| {
assert_eq!(x, y);
});
}
}

View File

@ -1,3 +1,5 @@
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use solana::cluster_info::{compute_retransmit_peers, ClusterInfo};
@ -72,7 +74,8 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
// pretend to broadcast from leader - cluster_info::create_broadcast_orders
let mut broadcast_table = cluster_info.sorted_tvu_peers(Some(&staked_nodes));
let mut broadcast_table =
cluster_info.sorted_tvu_peers(Some(&staked_nodes), ChaChaRng::from_seed([0x5a; 32]));
broadcast_table.truncate(fanout);
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
@ -109,6 +112,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
Some(&staked_nodes),
&Arc::new(RwLock::new(cluster.clone())),
fanout,
ChaChaRng::from_seed([0x5a; 32]),
);
let vec_children: Vec<_> = children
.iter()