Budget for gossip traffic (#9550)

This commit is contained in:
sakridge 2020-04-18 22:11:17 -07:00 committed by GitHub
parent 3205361163
commit 65a9658b13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 144 additions and 21 deletions

View File

@ -24,6 +24,11 @@ use crate::{
result::{Error, Result},
weighted_shuffle::weighted_shuffle,
};
use rand::distributions::{Distribution, WeightedIndex};
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
@ -31,6 +36,7 @@ use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::{bank_forks::BankForks, staking_utils};
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_net_utils::{
@ -94,6 +100,12 @@ pub enum ClusterInfoError {
BadGossipAddress,
}
#[derive(Clone)]
pub struct DataBudget {
bytes: usize, // amount of bytes we have in the budget to send
last_timestamp_ms: u64, // Last time that we upped the bytes count,
// used to detect when to up the bytes budget again
}
#[derive(Clone)]
pub struct ClusterInfo {
/// The network
pub gossip: CrdsGossip,
@ -101,6 +113,8 @@ pub struct ClusterInfo {
pub(crate) keypair: Arc<Keypair>,
/// The network entrypoint
entrypoint: Option<ContactInfo>,
outbound_budget: DataBudget,
}
#[derive(Default, Clone)]
@ -197,6 +211,17 @@ enum Protocol {
PruneMessage(Pubkey, PruneData),
}
// Rating for pull requests
// A response table is generated as a
// 2-d table arranged by target nodes and a
// list of responses for that node,
// to/responses_index is a location in that table.
struct ResponseScore {
to: usize, // to, index of who the response is to
responses_index: usize, // index into the list of responses for a given to
score: u64, // Relative score of the response
}
impl ClusterInfo {
/// Without a valid keypair gossip will not function. Only useful for tests.
pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
@ -208,6 +233,10 @@ impl ClusterInfo {
gossip: CrdsGossip::default(),
keypair,
entrypoint: None,
outbound_budget: DataBudget {
bytes: 0,
last_timestamp_ms: 0,
},
};
let id = contact_info.id;
me.gossip.set_self(&id);
@ -1335,20 +1364,43 @@ impl ClusterInfo {
})
});
// process the collected pulls together
let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data);
let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
}
// Pull requests take an incoming bloom filter of contained entries from a node
// and tries to send back to them the values it detects are missing.
fn handle_pull_requests(
me: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
// split the requests into addrs and filters
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
{
let mut cluster_info = me.write().unwrap();
let now = timestamp();
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
if now - cluster_info.outbound_budget.last_timestamp_ms > INTERVAL_MS {
let len = std::cmp::max(stakes.len(), 2);
cluster_info.outbound_budget.bytes += len * BYTES_PER_INTERVAL;
cluster_info.outbound_budget.bytes = std::cmp::min(
cluster_info.outbound_budget.bytes,
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
);
cluster_info.outbound_budget.last_timestamp_ms = now;
}
}
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
@ -1360,30 +1412,101 @@ impl ClusterInfo {
.unwrap()
.gossip
.process_pull_requests(caller_and_filters, now);
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
pull_responses
// Filter bad to addresses
let pull_responses: Vec<_> = pull_responses
.into_iter()
.zip(addrs.into_iter())
.for_each(|(response, from_addr)| {
if !from_addr.ip().is_unspecified() && from_addr.port() != 0 {
let len = response.len();
trace!("get updates since response {}", len);
inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
Self::split_gossip_messages(response)
.into_iter()
.for_each(|payload| {
let protocol = Protocol::PullResponse(self_id, payload);
// The remote node may not know its public IP:PORT. Instead of responding to the caller's
// gossip addr, respond to the origin addr. The last origin addr is picked from the list of
// addrs.
packets
.packets
.push(Packet::from_data(&from_addr, protocol))
})
.filter_map(|(responses, from_addr)| {
if !from_addr.ip().is_unspecified()
&& from_addr.port() != 0
&& !responses.is_empty()
{
Some((responses, from_addr))
} else {
trace!("Dropping Gossip pull response, as destination is unknown");
None
}
});
})
.collect();
if pull_responses.is_empty() {
return None;
}
let mut stats: Vec<_> = pull_responses
.iter()
.enumerate()
.map(|(i, (responses, _from_addr))| {
let score: u64 = if stakes.get(&responses[0].pubkey()).is_some() {
2
} else {
1
};
responses
.iter()
.enumerate()
.map(|(j, _response)| ResponseScore {
to: i,
responses_index: j,
score,
})
.collect::<Vec<ResponseScore>>()
})
.flatten()
.collect();
stats.sort_by(|a, b| a.score.cmp(&b.score));
let weights: Vec<_> = stats.iter().map(|stat| stat.score).collect();
let seed = [48u8; 32];
let rng = &mut ChaChaRng::from_seed(seed);
let weighted_index = WeightedIndex::new(weights).unwrap();
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut total_bytes = 0;
let outbound_budget = me.read().unwrap().outbound_budget.bytes;
let mut sent = HashSet::new();
while sent.len() < stats.len() {
let index = weighted_index.sample(rng);
if sent.contains(&index) {
continue;
}
sent.insert(index);
let stat = &stats[index];
let from_addr = pull_responses[stat.to].1;
let response = pull_responses[stat.to].0[stat.responses_index].clone();
let protocol = Protocol::PullResponse(self_id, vec![response]);
packets
.packets
.push(Packet::from_data(&from_addr, protocol));
let len = packets.packets.len();
total_bytes += packets.packets[len - 1].meta.size;
if total_bytes > outbound_budget {
inc_new_counter_info!("gossip_pull_request-no_budget", 1);
break;
}
}
{
let mut cluster_info = me.write().unwrap();
cluster_info.outbound_budget.bytes = cluster_info
.outbound_budget
.bytes
.saturating_sub(total_bytes);
}
time.stop();
inc_new_counter_info!("gossip_pull_request-sent_requests", sent.len());
inc_new_counter_info!(
"gossip_pull_request-dropped_requests",
stats.len() - sent.len()
);
debug!(
"handle_pull_requests: {} sent: {} total: {} total_bytes: {}",
time,
sent.len(),
stats.len(),
total_bytes
);
if packets.is_empty() {
return None;
}