prioritizes more recent values in pull responses (#17238)

On the receiving end, the outdated values are discarded, and they will
only waste bandwidth:
https://github.com/solana-labs/solana/blob/3f0480d06/core/src/crds_gossip_pull.rs#L385-L400

This is also exacerbating validator start, since the entrypoint is
returning old values in pull responses, and the validator immediately
discards those; resulting in huge delay until the validator obtains
contact-info of the entrypoint and is able to adopt shred-version and
fully start.
This commit is contained in:
behzad nouri 2021-05-21 14:07:46 +00:00 committed by GitHub
parent e8b35a4f7b
commit 5e6b00fe98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 66 deletions

View File

@ -70,8 +70,9 @@ use std::{
fmt::Debug, fmt::Debug,
fs::{self, File}, fs::{self, File},
io::BufReader, io::BufReader,
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut, Div},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -428,17 +429,6 @@ impl Sanitize for Protocol {
} }
} }
// Rating for pull requests
// A response table is generated as a
// 2-d table arranged by target nodes and a
// list of responses for that node,
// to/responses_index is a location in that table.
struct ResponseScore {
to: usize, // to, index of who the response is to
responses_index: usize, // index into the list of responses for a given to
score: u64, // Relative score of the response
}
// Retains only CRDS values associated with nodes with enough stake. // Retains only CRDS values associated with nodes with enough stake.
// (some crds types are exempted) // (some crds types are exempted)
fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) { fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
@ -1566,7 +1556,7 @@ impl ClusterInfo {
let self_info = CrdsValue::new_signed(self_info, &self.keypair); let self_info = CrdsValue::new_signed(self_info, &self.keypair);
let pulls = pulls let pulls = pulls
.into_iter() .into_iter()
.flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters)) .flat_map(|(peer, filters)| repeat(peer.gossip).zip(filters))
.map(|(gossip_addr, filter)| { .map(|(gossip_addr, filter)| {
let request = Protocol::PullRequest(filter, self_info.clone()); let request = Protocol::PullRequest(filter, self_info.clone());
(gossip_addr, request) (gossip_addr, request)
@ -2050,6 +2040,7 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
require_stake_for_gossip: bool, require_stake_for_gossip: bool,
) -> Packets { ) -> Packets {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests"); let mut time = Measure::start("handle_pull_requests");
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
@ -2081,56 +2072,42 @@ impl ClusterInfo {
retain_staked(resp, stakes); retain_staked(resp, stakes);
} }
} }
let pull_responses: Vec<_> = pull_responses let (responses, scores): (Vec<_>, Vec<_>) = addrs
.into_iter()
.zip(addrs.into_iter())
.filter(|(response, _)| !response.is_empty())
.collect();
if pull_responses.is_empty() {
return packets;
}
let stats: Vec<_> = pull_responses
.iter() .iter()
.enumerate() .zip(pull_responses)
.map(|(i, (responses, _from_addr))| { .flat_map(|(addr, responses)| repeat(addr).zip(responses))
responses .map(|(addr, response)| {
.iter() let age = now.saturating_sub(response.wallclock());
.enumerate() let score = DEFAULT_EPOCH_DURATION_MS
.map(|(j, response)| { .saturating_sub(age)
.div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
.max(1);
let score = if stakes.contains_key(&response.pubkey()) { let score = if stakes.contains_key(&response.pubkey()) {
2 2 * score
} else { } else {
1 score
}; };
let score = match response.data { let score = match response.data {
CrdsData::ContactInfo(_) => 2 * score, CrdsData::ContactInfo(_) => 2 * score,
_ => score, _ => score,
}; };
ResponseScore { ((addr, response), score)
to: i, })
responses_index: j, .unzip();
score, if responses.is_empty() {
return packets;
} }
}) let shuffle = {
.collect::<Vec<ResponseScore>>()
})
.flatten()
.collect();
let mut seed = [0; 32]; let mut seed = [0; 32];
rand::thread_rng().fill(&mut seed[..]); rand::thread_rng().fill(&mut seed[..]);
let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect(); weighted_shuffle(&scores, seed).into_iter()
};
let mut total_bytes = 0; let mut total_bytes = 0;
let mut sent = 0; let mut sent = 0;
for index in weighted_shuffle(&weights, seed) { for (addr, response) in shuffle.map(|i| &responses[i]) {
let stat = &stats[index]; let response = vec![response.clone()];
let from_addr = pull_responses[stat.to].1; let response = Protocol::PullResponse(self_id, response);
let response = pull_responses[stat.to].0[stat.responses_index].clone(); match Packet::from_data(Some(addr), response) {
let protocol = Protocol::PullResponse(self_id, vec![response]);
match Packet::from_data(Some(&from_addr), protocol) {
Err(err) => error!("failed to write pull-response packet: {:?}", err), Err(err) => error!("failed to write pull-response packet: {:?}", err),
Ok(packet) => { Ok(packet) => {
if self.outbound_budget.take(packet.meta.size) { if self.outbound_budget.take(packet.meta.size) {
@ -2145,13 +2122,14 @@ impl ClusterInfo {
} }
} }
time.stop(); time.stop();
let dropped_responses = responses.len() - sent;
inc_new_counter_info!("gossip_pull_request-sent_requests", sent); inc_new_counter_info!("gossip_pull_request-sent_requests", sent);
inc_new_counter_info!("gossip_pull_request-dropped_requests", stats.len() - sent); inc_new_counter_info!("gossip_pull_request-dropped_requests", dropped_responses);
debug!( debug!(
"handle_pull_requests: {} sent: {} total: {} total_bytes: {}", "handle_pull_requests: {} sent: {} total: {} total_bytes: {}",
time, time,
sent, sent,
stats.len(), responses.len(),
total_bytes total_bytes
); );
packets packets
@ -2437,7 +2415,7 @@ impl ClusterInfo {
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
.into_iter() .into_iter()
.flat_map(|(from, prunes)| { .flat_map(|(from, prunes)| {
std::iter::repeat(from).zip( repeat(from).zip(
prunes prunes
.into_iter() .into_iter()
.chunks(MAX_PRUNE_DATA_NODES) .chunks(MAX_PRUNE_DATA_NODES)
@ -3890,7 +3868,7 @@ mod tests {
fn test_split_gossip_messages() { fn test_split_gossip_messages() {
const NUM_CRDS_VALUES: usize = 2048; const NUM_CRDS_VALUES: usize = 2048;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let values: Vec<_> = std::iter::repeat_with(|| CrdsValue::new_rand(&mut rng, None)) let values: Vec<_> = repeat_with(|| CrdsValue::new_rand(&mut rng, None))
.take(NUM_CRDS_VALUES) .take(NUM_CRDS_VALUES)
.collect(); .collect();
let splits: Vec<_> = let splits: Vec<_> =

View File

@ -503,8 +503,8 @@ impl CrdsGossipPull {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4); let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
//skip filters from callers that are too old //skip filters from callers that are too old
let future = now.saturating_add(msg_timeout); let caller_wallclock_window =
let past = now.saturating_sub(msg_timeout); now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
let mut dropped_requests = 0; let mut dropped_requests = 0;
let mut total_skipped = 0; let mut total_skipped = 0;
let ret: Vec<_> = filters let ret: Vec<_> = filters
@ -514,7 +514,7 @@ impl CrdsGossipPull {
return None; return None;
} }
let caller_wallclock = caller.wallclock(); let caller_wallclock = caller.wallclock();
if caller_wallclock >= future || caller_wallclock < past { if !caller_wallclock_window.contains(&caller_wallclock) {
dropped_requests += 1; dropped_requests += 1;
return Some(vec![]); return Some(vec![]);
} }