prioritizes contact-infos in pull responses (#16541)

Expired crds values where the contact-info does not exist are wasted:
https://github.com/solana-labs/solana/blob/f804ce63c/core/src/crds_gossip_pull.rs#L353-L378
and then are sent again over the next pull-request.

Also, the stake of the first response (which can be anything) is used to
weight all pull-responses to a node, while the rest of responses can
have different stake.
https://github.com/solana-labs/solana/blob/f804ce63c/core/src/cluster_info.rs#L2231
This commit is contained in:
behzad nouri 2021-04-14 18:45:20 +00:00 committed by GitHub
parent ad71e27a0d
commit f35a6a8be0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 33 deletions

View File

@ -27,9 +27,7 @@ use crate::{
result::{Error, Result}, result::{Error, Result},
weighted_shuffle::weighted_shuffle, weighted_shuffle::weighted_shuffle,
}; };
use rand::distributions::{Distribution, WeightedIndex}; use rand::{CryptoRng, Rng};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_ledger::shred::Shred; use solana_ledger::shred::Shred;
use solana_sdk::sanitize::{Sanitize, SanitizeError}; use solana_sdk::sanitize::{Sanitize, SanitizeError};
@ -2224,42 +2222,41 @@ impl ClusterInfo {
return packets; return packets;
} }
let mut stats: Vec<_> = pull_responses let stats: Vec<_> = pull_responses
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, (responses, _from_addr))| { .map(|(i, (responses, _from_addr))| {
let score: u64 = if stakes.get(&responses[0].pubkey()).is_some() { responses
.iter()
.enumerate()
.map(|(j, response)| {
let score = if stakes.contains_key(&response.pubkey()) {
2 2
} else { } else {
1 1
}; };
responses let score = match response.data {
.iter() CrdsData::ContactInfo(_) => 2 * score,
.enumerate() _ => score,
.map(|(j, _response)| ResponseScore { };
ResponseScore {
to: i, to: i,
responses_index: j, responses_index: j,
score, score,
}
}) })
.collect::<Vec<ResponseScore>>() .collect::<Vec<ResponseScore>>()
}) })
.flatten() .flatten()
.collect(); .collect();
stats.sort_by(|a, b| a.score.cmp(&b.score)); let mut seed = [0; 32];
rand::thread_rng().fill(&mut seed[..]);
let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect(); let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect();
let seed = [48u8; 32];
let rng = &mut ChaChaRng::from_seed(seed);
let weighted_index = WeightedIndex::new(weights).unwrap();
let mut total_bytes = 0; let mut total_bytes = 0;
let mut sent = HashSet::new(); let mut sent = 0;
while sent.len() < stats.len() { for index in weighted_shuffle(&weights, seed) {
let index = weighted_index.sample(rng);
if sent.contains(&index) {
continue;
}
let stat = &stats[index]; let stat = &stats[index];
let from_addr = pull_responses[stat.to].1; let from_addr = pull_responses[stat.to].1;
let response = pull_responses[stat.to].0[stat.responses_index].clone(); let response = pull_responses[stat.to].0[stat.responses_index].clone();
@ -2268,9 +2265,9 @@ impl ClusterInfo {
Err(err) => error!("failed to write pull-response packet: {:?}", err), Err(err) => error!("failed to write pull-response packet: {:?}", err),
Ok(packet) => { Ok(packet) => {
if self.outbound_budget.take(packet.meta.size) { if self.outbound_budget.take(packet.meta.size) {
sent.insert(index);
total_bytes += packet.meta.size; total_bytes += packet.meta.size;
packets.packets.push(packet) packets.packets.push(packet);
sent += 1;
} else { } else {
inc_new_counter_info!("gossip_pull_request-no_budget", 1); inc_new_counter_info!("gossip_pull_request-no_budget", 1);
break; break;
@ -2279,15 +2276,12 @@ impl ClusterInfo {
} }
} }
time.stop(); time.stop();
inc_new_counter_info!("gossip_pull_request-sent_requests", sent.len()); inc_new_counter_info!("gossip_pull_request-sent_requests", sent);
inc_new_counter_info!( inc_new_counter_info!("gossip_pull_request-dropped_requests", stats.len() - sent);
"gossip_pull_request-dropped_requests",
stats.len() - sent.len()
);
debug!( debug!(
"handle_pull_requests: {} sent: {} total: {} total_bytes: {}", "handle_pull_requests: {} sent: {} total: {} total_bytes: {}",
time, time,
sent.len(), sent,
stats.len(), stats.len(),
total_bytes total_bytes
); );
@ -3467,7 +3461,8 @@ mod tests {
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
}; };
use itertools::izip; use itertools::izip;
use rand::seq::SliceRandom; use rand::{seq::SliceRandom, SeedableRng};
use rand_chacha::ChaChaRng;
use serial_test::serial; use serial_test::serial;
use solana_ledger::shred::Shredder; use solana_ledger::shred::Shredder;
use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::signature::{Keypair, Signer};