From 5e6b00fe9870de3ef6dd65e4da19700007a2a0fe Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 21 May 2021 14:07:46 +0000 Subject: [PATCH] prioritizes more recent values in pull responses (#17238) On the receiving end, the outdated values are discarded, and they will only waste bandwidth: https://github.com/solana-labs/solana/blob/3f0480d06/core/src/crds_gossip_pull.rs#L385-L400 This also slows down validator startup: the entrypoint returns old values in pull responses, and the validator immediately discards them, resulting in a huge delay until the validator obtains the contact-info of the entrypoint and is able to adopt the shred-version and fully start. --- core/src/cluster_info.rs | 104 ++++++++++++++--------------------- core/src/crds_gossip_pull.rs | 6 +- 2 files changed, 44 insertions(+), 66 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 93a44fb7fe..f25fef5798 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -70,8 +70,9 @@ use std::{ fmt::Debug, fs::{self, File}, io::BufReader, + iter::repeat, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, - ops::{Deref, DerefMut}, + ops::{Deref, DerefMut, Div}, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, Ordering}, @@ -428,17 +429,6 @@ impl Sanitize for Protocol { } } -// Rating for pull requests -// A response table is generated as a -// 2-d table arranged by target nodes and a -// list of responses for that node, -// to/responses_index is a location in that table. -struct ResponseScore { - to: usize, // to, index of who the response is to - responses_index: usize, // index into the list of responses for a given to - score: u64, // Relative score of the response -} - // Retains only CRDS values associated with nodes with enough stake. 
// (some crds types are exempted) fn retain_staked(values: &mut Vec, stakes: &HashMap) { @@ -1566,7 +1556,7 @@ impl ClusterInfo { let self_info = CrdsValue::new_signed(self_info, &self.keypair); let pulls = pulls .into_iter() - .flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters)) + .flat_map(|(peer, filters)| repeat(peer.gossip).zip(filters)) .map(|(gossip_addr, filter)| { let request = Protocol::PullRequest(filter, self_info.clone()); (gossip_addr, request) @@ -2050,6 +2040,7 @@ impl ClusterInfo { stakes: &HashMap, require_stake_for_gossip: bool, ) -> Packets { + const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; let mut time = Measure::start("handle_pull_requests"); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) @@ -2081,56 +2072,42 @@ impl ClusterInfo { retain_staked(resp, stakes); } } - let pull_responses: Vec<_> = pull_responses - .into_iter() - .zip(addrs.into_iter()) - .filter(|(response, _)| !response.is_empty()) - .collect(); - - if pull_responses.is_empty() { + let (responses, scores): (Vec<_>, Vec<_>) = addrs + .iter() + .zip(pull_responses) + .flat_map(|(addr, responses)| repeat(addr).zip(responses)) + .map(|(addr, response)| { + let age = now.saturating_sub(response.wallclock()); + let score = DEFAULT_EPOCH_DURATION_MS + .saturating_sub(age) + .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS) + .max(1); + let score = if stakes.contains_key(&response.pubkey()) { + 2 * score + } else { + score + }; + let score = match response.data { + CrdsData::ContactInfo(_) => 2 * score, + _ => score, + }; + ((addr, response), score) + }) + .unzip(); + if responses.is_empty() { return packets; } - - let stats: Vec<_> = pull_responses - .iter() - .enumerate() - .map(|(i, (responses, _from_addr))| { - responses - .iter() - .enumerate() - .map(|(j, response)| { - let score = if 
stakes.contains_key(&response.pubkey()) { - 2 - } else { - 1 - }; - let score = match response.data { - CrdsData::ContactInfo(_) => 2 * score, - _ => score, - }; - ResponseScore { - to: i, - responses_index: j, - score, - } - }) - .collect::>() - }) - .flatten() - .collect(); - - let mut seed = [0; 32]; - rand::thread_rng().fill(&mut seed[..]); - let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect(); - + let shuffle = { + let mut seed = [0; 32]; + rand::thread_rng().fill(&mut seed[..]); + weighted_shuffle(&scores, seed).into_iter() + }; let mut total_bytes = 0; let mut sent = 0; - for index in weighted_shuffle(&weights, seed) { - let stat = &stats[index]; - let from_addr = pull_responses[stat.to].1; - let response = pull_responses[stat.to].0[stat.responses_index].clone(); - let protocol = Protocol::PullResponse(self_id, vec![response]); - match Packet::from_data(Some(&from_addr), protocol) { + for (addr, response) in shuffle.map(|i| &responses[i]) { + let response = vec![response.clone()]; + let response = Protocol::PullResponse(self_id, response); + match Packet::from_data(Some(addr), response) { Err(err) => error!("failed to write pull-response packet: {:?}", err), Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { @@ -2145,13 +2122,14 @@ impl ClusterInfo { } } time.stop(); + let dropped_responses = responses.len() - sent; inc_new_counter_info!("gossip_pull_request-sent_requests", sent); - inc_new_counter_info!("gossip_pull_request-dropped_requests", stats.len() - sent); + inc_new_counter_info!("gossip_pull_request-dropped_requests", dropped_responses); debug!( "handle_pull_requests: {} sent: {} total: {} total_bytes: {}", time, sent, - stats.len(), + responses.len(), total_bytes ); packets @@ -2437,7 +2415,7 @@ impl ClusterInfo { let prunes: Vec<(Pubkey /*from*/, Vec /*origins*/)> = prunes .into_iter() .flat_map(|(from, prunes)| { - std::iter::repeat(from).zip( + repeat(from).zip( prunes .into_iter() .chunks(MAX_PRUNE_DATA_NODES) @@ 
-3890,7 +3868,7 @@ mod tests { fn test_split_gossip_messages() { const NUM_CRDS_VALUES: usize = 2048; let mut rng = rand::thread_rng(); - let values: Vec<_> = std::iter::repeat_with(|| CrdsValue::new_rand(&mut rng, None)) + let values: Vec<_> = repeat_with(|| CrdsValue::new_rand(&mut rng, None)) .take(NUM_CRDS_VALUES) .collect(); let splits: Vec<_> = diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index aa252f1a8b..a87a5ab7d7 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -503,8 +503,8 @@ impl CrdsGossipPull { let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4); //skip filters from callers that are too old - let future = now.saturating_add(msg_timeout); - let past = now.saturating_sub(msg_timeout); + let caller_wallclock_window = + now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout); let mut dropped_requests = 0; let mut total_skipped = 0; let ret: Vec<_> = filters @@ -514,7 +514,7 @@ impl CrdsGossipPull { return None; } let caller_wallclock = caller.wallclock(); - if caller_wallclock >= future || caller_wallclock < past { + if !caller_wallclock_window.contains(&caller_wallclock) { dropped_requests += 1; return Some(vec![]); }