From 65a9658b13e2df76b2598db59c44041308721beb Mon Sep 17 00:00:00 2001 From: sakridge Date: Sat, 18 Apr 2020 22:11:17 -0700 Subject: [PATCH] Budget for gossip traffic (#9550) --- core/src/cluster_info.rs | 165 ++++++++++++++++++++++++++++++++++----- 1 file changed, 144 insertions(+), 21 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index fdf86b7584..aa825b495a 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -24,6 +24,11 @@ use crate::{ result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; + +use rand::distributions::{Distribution, WeightedIndex}; +use rand::SeedableRng; +use rand_chacha::ChaChaRng; + use bincode::{serialize, serialized_size}; use core::cmp; use itertools::Itertools; @@ -31,6 +36,7 @@ use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; use solana_ledger::{bank_forks::BankForks, staking_utils}; +use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; use solana_net_utils::{ @@ -94,6 +100,12 @@ pub enum ClusterInfoError { BadGossipAddress, } #[derive(Clone)] +pub struct DataBudget { + bytes: usize, // amount of bytes we have in the budget to send + last_timestamp_ms: u64, // Last time that we upped the bytes count, + // used to detect when to up the bytes budget again +} +#[derive(Clone)] pub struct ClusterInfo { /// The network pub gossip: CrdsGossip, @@ -101,6 +113,8 @@ pub struct ClusterInfo { pub(crate) keypair: Arc, /// The network entrypoint entrypoint: Option, + + outbound_budget: DataBudget, } #[derive(Default, Clone)] @@ -197,6 +211,17 @@ enum Protocol { PruneMessage(Pubkey, PruneData), } +// Rating for pull requests +// A response table is generated as a +// 2-d table arranged by target nodes and a +// list of responses for that node, +// to/responses_index is a location in that table. +struct ResponseScore { + to: usize, // to, index of who the response is to + responses_index: usize, // index into the list of responses for a given to + score: u64, // Relative score of the response +} + impl ClusterInfo { /// Without a valid keypair gossip will not function. Only useful for tests. pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self { @@ -208,6 +233,10 @@ impl ClusterInfo { gossip: CrdsGossip::default(), keypair, entrypoint: None, + outbound_budget: DataBudget { + bytes: 0, + last_timestamp_ms: 0, + }, }; let id = contact_info.id; me.gossip.set_self(&id); @@ -1335,20 +1364,43 @@ impl ClusterInfo { }) }); // process the collected pulls together - let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data); + let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes); if let Some(rsp) = rsp { let _ignore_disconnect = response_sender.send(rsp); } } + // Pull requests take an incoming bloom filter of contained entries from a node + // and tries to send back to them the values it detects are missing. fn handle_pull_requests( me: &Arc>, recycler: &PacketsRecycler, requests: Vec, + stakes: &HashMap, ) -> Option { // split the requests into addrs and filters let mut caller_and_filters = vec![]; let mut addrs = vec![]; + let mut time = Measure::start("handle_pull_requests"); + { + let mut cluster_info = me.write().unwrap(); + + let now = timestamp(); + const INTERVAL_MS: u64 = 100; + // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s + const BYTES_PER_INTERVAL: usize = 5000; + const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default + + if now - cluster_info.outbound_budget.last_timestamp_ms > INTERVAL_MS { + let len = std::cmp::max(stakes.len(), 2); + cluster_info.outbound_budget.bytes += len * BYTES_PER_INTERVAL; + cluster_info.outbound_budget.bytes = std::cmp::min( + cluster_info.outbound_budget.bytes, + MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, + ); + cluster_info.outbound_budget.last_timestamp_ms = now; + } + } for pull_data in requests { caller_and_filters.push((pull_data.caller, pull_data.filter)); addrs.push(pull_data.from_addr); @@ -1360,30 +1412,101 @@ impl ClusterInfo { .unwrap() .gossip .process_pull_requests(caller_and_filters, now); - let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); - pull_responses + + // Filter bad to addresses + let pull_responses: Vec<_> = pull_responses .into_iter() .zip(addrs.into_iter()) - .for_each(|(response, from_addr)| { - if !from_addr.ip().is_unspecified() && from_addr.port() != 0 { - let len = response.len(); - trace!("get updates since response {}", len); - inc_new_counter_debug!("cluster_info-pull_request-rsp", len); - Self::split_gossip_messages(response) - .into_iter() - .for_each(|payload| { - let protocol = Protocol::PullResponse(self_id, payload); - // The remote node may not know its public IP:PORT. Instead of responding to the caller's - // gossip addr, respond to the origin addr. The last origin addr is picked from the list of - // addrs. - packets - .packets - .push(Packet::from_data(&from_addr, protocol)) - }) + .filter_map(|(responses, from_addr)| { + if !from_addr.ip().is_unspecified() + && from_addr.port() != 0 + && !responses.is_empty() + { + Some((responses, from_addr)) } else { - trace!("Dropping Gossip pull response, as destination is unknown"); + None } - }); + }) + .collect(); + + if pull_responses.is_empty() { + return None; + } + + let mut stats: Vec<_> = pull_responses + .iter() + .enumerate() + .map(|(i, (responses, _from_addr))| { + let score: u64 = if stakes.get(&responses[0].pubkey()).is_some() { + 2 + } else { + 1 + }; + responses + .iter() + .enumerate() + .map(|(j, _response)| ResponseScore { + to: i, + responses_index: j, + score, + }) + .collect::>() + }) + .flatten() + .collect(); + + stats.sort_by(|a, b| a.score.cmp(&b.score)); + let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect(); + + let seed = [48u8; 32]; + let rng = &mut ChaChaRng::from_seed(seed); + let weighted_index = WeightedIndex::new(weights).unwrap(); + + let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let mut total_bytes = 0; + let outbound_budget = me.read().unwrap().outbound_budget.bytes; + let mut sent = HashSet::new(); + while sent.len() < stats.len() { + let index = weighted_index.sample(rng); + if sent.contains(&index) { + continue; + } + sent.insert(index); + let stat = &stats[index]; + let from_addr = pull_responses[stat.to].1; + let response = pull_responses[stat.to].0[stat.responses_index].clone(); + let protocol = Protocol::PullResponse(self_id, vec![response]); + packets + .packets + .push(Packet::from_data(&from_addr, protocol)); + let len = packets.packets.len(); + total_bytes += packets.packets[len - 1].meta.size; + + if total_bytes > outbound_budget { + inc_new_counter_info!("gossip_pull_request-no_budget", 1); + break; + } + } + { + let mut cluster_info = me.write().unwrap(); + cluster_info.outbound_budget.bytes = cluster_info + .outbound_budget + .bytes + .saturating_sub(total_bytes); + } + time.stop(); + inc_new_counter_info!("gossip_pull_request-sent_requests", sent.len()); + inc_new_counter_info!( + "gossip_pull_request-dropped_requests", + stats.len() - sent.len() + ); + debug!( + "handle_pull_requests: {} sent: {} total: {} total_bytes: {}", + time, + sent.len(), + stats.len(), + total_bytes + ); if packets.is_empty() { return None; }