From f35a6a8be0fec579957746ebf67bfb8e46ec7aba Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 14 Apr 2021 18:45:20 +0000 Subject: [PATCH] prioritizes contact-infos in pull responses (#16541) Expired crds values where the contact-info does not exist are wasted: https://github.com/solana-labs/solana/blob/f804ce63c/core/src/crds_gossip_pull.rs#L353-L378 and then are sent again over the next pull-request. Also, the stake of the first response (which can be anything) is used to weight all pull-responses to a node, while the rest of responses can have different stake. https://github.com/solana-labs/solana/blob/f804ce63c/core/src/cluster_info.rs#L2231 --- core/src/cluster_info.rs | 61 ++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c13115f55b..b60b5240e0 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -27,9 +27,7 @@ use crate::{ result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; -use rand::distributions::{Distribution, WeightedIndex}; -use rand::{CryptoRng, Rng, SeedableRng}; -use rand_chacha::ChaChaRng; +use rand::{CryptoRng, Rng}; use solana_ledger::shred::Shred; use solana_sdk::sanitize::{Sanitize, SanitizeError}; @@ -2224,42 +2222,41 @@ impl ClusterInfo { return packets; } - let mut stats: Vec<_> = pull_responses + let stats: Vec<_> = pull_responses .iter() .enumerate() .map(|(i, (responses, _from_addr))| { - let score: u64 = if stakes.get(&responses[0].pubkey()).is_some() { - 2 - } else { - 1 - }; responses .iter() .enumerate() - .map(|(j, _response)| ResponseScore { - to: i, - responses_index: j, - score, + .map(|(j, response)| { + let score = if stakes.contains_key(&response.pubkey()) { + 2 + } else { + 1 + }; + let score = match response.data { + CrdsData::ContactInfo(_) => 2 * score, + _ => score, + }; + ResponseScore { + to: i, + responses_index: j, + score, + } }) .collect::>() }) .flatten() .collect(); - stats.sort_by(|a, b| a.score.cmp(&b.score)); + let mut seed = [0; 32]; + rand::thread_rng().fill(&mut seed[..]); let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect(); - let seed = [48u8; 32]; - let rng = &mut ChaChaRng::from_seed(seed); - let weighted_index = WeightedIndex::new(weights).unwrap(); - let mut total_bytes = 0; - let mut sent = HashSet::new(); - while sent.len() < stats.len() { - let index = weighted_index.sample(rng); - if sent.contains(&index) { - continue; - } + let mut sent = 0; + for index in weighted_shuffle(&weights, seed) { let stat = &stats[index]; let from_addr = pull_responses[stat.to].1; let response = pull_responses[stat.to].0[stat.responses_index].clone(); @@ -2268,9 +2265,9 @@ impl ClusterInfo { Err(err) => error!("failed to write pull-response packet: {:?}", err), Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { - sent.insert(index); total_bytes += packet.meta.size; - packets.packets.push(packet) + packets.packets.push(packet); + sent += 1; } else { inc_new_counter_info!("gossip_pull_request-no_budget", 1); break; @@ -2279,15 +2276,12 @@ impl ClusterInfo { } } time.stop(); - inc_new_counter_info!("gossip_pull_request-sent_requests", sent.len()); - inc_new_counter_info!( - "gossip_pull_request-dropped_requests", - stats.len() - sent.len() - ); + inc_new_counter_info!("gossip_pull_request-sent_requests", sent); + inc_new_counter_info!("gossip_pull_request-dropped_requests", stats.len() - sent); debug!( "handle_pull_requests: {} sent: {} total: {} total_bytes: {}", time, - sent.len(), + sent, stats.len(), total_bytes ); @@ -3467,7 +3461,8 @@ mod tests { duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, }; use itertools::izip; - use rand::seq::SliceRandom; + use rand::{seq::SliceRandom, SeedableRng}; + use rand_chacha::ChaChaRng; use serial_test::serial; use solana_ledger::shred::Shredder; use solana_sdk::signature::{Keypair, Signer};