extends crds values timeouts if stakes are unknown (#17261)

If stakes are unknown, then timeouts will be short, resulting in values
being purged from the crds table, and consequently higher pull-response
load when they are obtained again from gossip. In particular, this slows
down validator start, where almost all values obtained from the entrypoint
are immediately discarded.
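As an illustration of the fix, below is a minimal, self-contained Rust sketch of the timeout selection this commit introduces. It mirrors the new CrdsGossipPull::make_timeouts shown in the diff below, but the Pubkey type alias, the locally defined constants, and the main function are stand-ins added here for illustration only; Solana's defaults of 432,000 slots per epoch and 400 ms per slot yield the 48-hour fallback epoch duration used when no bank is available.

use std::collections::HashMap;
use std::time::Duration;

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15_000; // 15 seconds
const DEFAULT_SLOTS_PER_EPOCH: u64 = 432_000;
const DEFAULT_MS_PER_SLOT: u64 = 400; // 432_000 slots * 400 ms = 48 hours

// Staked nodes always get a timeout extended to at least one epoch; when no
// stakes are known at all, the default timeout (keyed by Pubkey::default())
// is extended as well, so values pulled at validator start are not purged
// and re-fetched while stakes are still unknown.
fn make_timeouts(
    self_pubkey: Pubkey,
    stakes: &HashMap<Pubkey, u64>,
    epoch_duration: Duration,
) -> HashMap<Pubkey, u64> {
    let extended_timeout =
        CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS.max(epoch_duration.as_millis() as u64);
    let default_timeout = if stakes.values().all(|&stake| stake == 0) {
        extended_timeout
    } else {
        CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
    };
    let mut timeouts: HashMap<Pubkey, u64> = stakes
        .iter()
        .filter(|(_, stake)| **stake > 0)
        .map(|(pubkey, _)| (*pubkey, extended_timeout))
        .collect();
    timeouts.insert(Pubkey::default(), default_timeout);
    timeouts.insert(self_pubkey, u64::MAX); // never purge this node's own values
    timeouts
}

fn main() {
    // With unknown (empty) stakes and the fallback epoch duration, the default
    // timeout jumps from 15_000 ms to 432_000 * 400 = 172_800_000 ms (48 hours).
    let epoch_duration = Duration::from_millis(DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT);
    let timeouts = make_timeouts([1u8; 32], &HashMap::new(), epoch_duration);
    assert_eq!(timeouts[&Pubkey::default()], 172_800_000);
    assert_eq!(timeouts[&[1u8; 32]], u64::MAX);
}

This is only a sketch under the stated assumptions; the actual change lives in CrdsGossipPull::make_timeouts in the diff below. The effect is that, with empty stakes, freshly pulled values survive for a full (fallback) epoch instead of the 15-second CRDS pull timeout, avoiding the purge-and-refetch churn described above.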
behzad nouri 2021-05-21 15:55:22 +00:00 committed by GitHub
parent 44831c18d2
commit 2adce67260
4 changed files with 87 additions and 89 deletions


@@ -1720,21 +1720,14 @@ impl ClusterInfo {
fn handle_purge(
&self,
thread_pool: &ThreadPool,
bank_forks: &Option<Arc<RwLock<BankForks>>>,
bank_forks: Option<&RwLock<BankForks>>,
stakes: &HashMap<Pubkey, u64>,
) {
let timeout = {
if let Some(ref bank_forks) = bank_forks {
let bank = bank_forks.read().unwrap().working_bank();
let epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
} else {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
}
let epoch_duration = get_epoch_duration(bank_forks);
let timeouts = {
let gossip = self.gossip.read().unwrap();
gossip.make_timeouts(stakes, epoch_duration)
};
let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, timeout);
let num_purged = self
.time_gossip_write_lock("purge", &self.stats.purge)
.purge(thread_pool, timestamp(), &timeouts);
@@ -1848,10 +1841,8 @@ impl ClusterInfo {
if exit.load(Ordering::Relaxed) {
return;
}
self.handle_purge(&thread_pool, &bank_forks, &stakes);
self.handle_purge(&thread_pool, bank_forks.as_deref(), &stakes);
entrypoints_processed = entrypoints_processed || self.process_entrypoints();
//TODO: possibly tune this parameter
//we saw a deadlock passing a self.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
@@ -2140,7 +2131,7 @@ impl ClusterInfo {
responses: Vec<(Pubkey, Vec<CrdsValue>)>,
thread_pool: &ThreadPool,
stakes: &HashMap<Pubkey, u64>,
epoch_time_ms: u64,
epoch_duration: Duration,
) {
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time);
if responses.is_empty() {
@@ -2189,11 +2180,10 @@ impl ClusterInfo {
.reduce(HashMap::new, merge)
});
if !responses.is_empty() {
let timeouts = self
.gossip
.read()
.unwrap()
.make_timeouts(&stakes, epoch_time_ms);
let timeouts = {
let gossip = self.gossip.read().unwrap();
gossip.make_timeouts(&stakes, epoch_duration)
};
for (from, data) in responses {
self.handle_pull_response(&from, data, &timeouts);
}
@@ -2479,28 +2469,6 @@ impl ClusterInfo {
let _ = response_sender.send(packets);
}
fn get_stakes_and_epoch_time(
bank_forks: Option<&Arc<RwLock<BankForks>>>,
) -> (
HashMap<Pubkey, u64>, // staked nodes
u64, // epoch time ms
) {
match bank_forks {
Some(ref bank_forks) => {
let bank = bank_forks.read().unwrap().root_bank();
let epoch = bank.epoch();
(
bank.staked_nodes(),
bank.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT,
)
}
None => {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
(HashMap::new(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
}
}
}
fn require_stake_for_gossip(
&self,
feature_set: Option<&FeatureSet>,
@@ -2536,7 +2504,7 @@ impl ClusterInfo {
response_sender: &PacketSender,
stakes: &HashMap<Pubkey, u64>,
feature_set: Option<&FeatureSet>,
epoch_time_ms: u64,
epoch_duration: Duration,
should_check_duplicate_instance: bool,
) -> Result<()> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
@@ -2628,7 +2596,7 @@ impl ClusterInfo {
response_sender,
require_stake_for_gossip,
);
self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_time_ms);
self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration);
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
self.handle_batch_pong_messages(pong_messages, Instant::now());
self.handle_batch_pull_requests(
@@ -2646,7 +2614,7 @@ impl ClusterInfo {
fn run_listen(
&self,
recycler: &PacketsRecycler,
bank_forks: Option<&Arc<RwLock<BankForks>>>,
bank_forks: Option<&RwLock<BankForks>>,
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
thread_pool: &ThreadPool,
@@ -2667,19 +2635,17 @@ impl ClusterInfo {
.add_relaxed(excess_count as u64);
}
}
let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
// Using root_bank instead of working_bank here so that an enabled
// feature does not roll back (if the feature happens to get enabled in
// a minority fork).
let feature_set = bank_forks.map(|bank_forks| {
bank_forks
.read()
.unwrap()
.root_bank()
.deref()
.feature_set
.clone()
});
let (feature_set, stakes) = match bank_forks {
None => (None, HashMap::default()),
Some(bank_forks) => {
let bank = bank_forks.read().unwrap().root_bank();
let feature_set = bank.feature_set.clone();
(Some(feature_set), bank.staked_nodes())
}
};
self.process_packets(
packets,
thread_pool,
@@ -2687,7 +2653,7 @@ impl ClusterInfo {
response_sender,
&stakes,
feature_set.as_deref(),
epoch_time_ms,
get_epoch_duration(bank_forks),
should_check_duplicate_instance,
)?;
if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
@@ -2719,7 +2685,7 @@ impl ClusterInfo {
while !exit.load(Ordering::Relaxed) {
if let Err(err) = self.run_listen(
&recycler,
bank_forks.as_ref(),
bank_forks.as_deref(),
&requests_receiver,
&response_sender,
&thread_pool,
@@ -2791,6 +2757,23 @@ impl ClusterInfo {
}
}
// Returns root bank's epoch duration. Falls back on
// DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
// if there are no working banks.
fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>) -> Duration {
let num_slots = match bank_forks {
None => {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
DEFAULT_SLOTS_PER_EPOCH
}
Some(bank_forks) => {
let bank = bank_forks.read().unwrap().root_bank();
bank.get_slots_in_epoch(bank.epoch())
}
};
Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
}
/// Turbine logic
/// 1 - For the current node find out if it is in layer 1
/// 1.1 - If yes, then broadcast to all layer 1 nodes
@@ -3836,7 +3819,13 @@ mod tests {
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
let cluster_info = Arc::new(cluster_info);
let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test();
let timeouts = {
let gossip = cluster_info.gossip.read().unwrap();
gossip.make_timeouts(
&HashMap::default(), // stakes,
Duration::from_millis(gossip.pull.crds_timeout),
)
};
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
@@ -4490,4 +4479,12 @@ mod tests {
CRDS_UNIQUE_PUBKEY_CAPACITY
);
}
#[test]
fn test_get_epoch_millis_no_bank() {
assert_eq!(
get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64,
DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours
);
}
}


@@ -27,6 +27,7 @@ use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::Mutex,
time::Duration,
};
pub struct CrdsGossip {
@@ -297,16 +298,12 @@ impl CrdsGossip {
);
}
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
self.make_timeouts(&HashMap::new(), self.pull.crds_timeout)
}
pub fn make_timeouts(
&self,
stakes: &HashMap<Pubkey, u64>,
epoch_ms: u64,
epoch_duration: Duration,
) -> HashMap<Pubkey, u64> {
self.pull.make_timeouts(&self.id, stakes, epoch_ms)
self.pull.make_timeouts(self.id, stakes, epoch_duration)
}
pub fn purge(
@@ -322,9 +319,8 @@ impl CrdsGossip {
}
if now > self.pull.crds_timeout {
//sanity check
let min = self.pull.crds_timeout;
assert_eq!(timeouts[&self.id], std::u64::MAX);
assert_eq!(timeouts[&Pubkey::default()], min);
assert!(timeouts.contains_key(&Pubkey::default()));
rv = self
.pull
.purge_active(thread_pool, &mut self.crds, now, &timeouts);


@@ -34,7 +34,7 @@ use std::{
convert::TryInto,
net::SocketAddr,
sync::Mutex,
time::Instant,
time::{Duration, Instant},
};
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@@ -547,26 +547,28 @@ impl CrdsGossipPull {
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret
}
pub fn make_timeouts_def(
&self,
self_id: &Pubkey,
stakes: &HashMap<Pubkey, u64>,
epoch_ms: u64,
min_ts: u64,
) -> HashMap<Pubkey, u64> {
let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
timeouts.insert(*self_id, std::u64::MAX);
timeouts.insert(Pubkey::default(), min_ts);
timeouts
}
pub fn make_timeouts(
pub(crate) fn make_timeouts(
&self,
self_id: &Pubkey,
self_pubkey: Pubkey,
stakes: &HashMap<Pubkey, u64>,
epoch_ms: u64,
epoch_duration: Duration,
) -> HashMap<Pubkey, u64> {
self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout)
let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64);
let default_timeout = if stakes.values().all(|stake| *stake == 0) {
extended_timeout
} else {
self.crds_timeout
};
stakes
.iter()
.filter(|(_, stake)| **stake > 0)
.map(|(pubkey, _)| (*pubkey, extended_timeout))
.chain(vec![
(Pubkey::default(), default_timeout),
(self_pubkey, u64::MAX),
])
.collect()
}
/// Purge values from the crds that are older than `active_timeout`
@@ -1340,7 +1342,7 @@ mod test {
.process_pull_response(
&mut node_crds,
&node_pubkey,
&node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
&node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
rsp.into_iter().flatten().collect(),
1,
)
@@ -1389,8 +1391,8 @@ mod test {
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
// purge
let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts);
let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
node.purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts);
//verify self is still valid after purge
assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
@@ -1406,7 +1408,7 @@ mod test {
}
// purge the value
node.purge_purged(3);
node.purge_purged(node.crds_timeout + 1);
assert_eq!(node.purged_values.len(), 0);
}
#[test]


@@ -333,7 +333,10 @@ fn network_run_push(
.par_iter()
.map(|node| {
let mut node_lock = node.lock().unwrap();
let timeouts = node_lock.make_timeouts_test();
let timeouts = node_lock.make_timeouts(
&HashMap::default(), // stakes
Duration::from_millis(node_lock.pull.crds_timeout),
);
node_lock.purge(thread_pool, now, &timeouts);
(node_lock.id, node_lock.new_push_messages(vec![], now))
})