Gossip cleanup remove duplicate gossip metrics and name worker threads (#10435)

Refactor into functions
This commit is contained in:
sakridge 2020-06-06 15:05:45 -07:00 committed by GitHub
parent ebb612ab4e
commit 0645a0c96d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 74 additions and 66 deletions

View File

@ -144,7 +144,6 @@ impl<'a> DerefMut for GossipWriteLock<'a> {
impl<'a> Drop for GossipWriteLock<'a> {
fn drop(&mut self) {
self.timer.stop();
self.counter.add_measure(&mut self.timer);
}
}
@ -178,7 +177,6 @@ impl<'a> Deref for GossipReadLock<'a> {
impl<'a> Drop for GossipReadLock<'a> {
fn drop(&mut self) {
self.timer.stop();
self.counter.add_measure(&mut self.timer);
}
}
@ -1393,7 +1391,11 @@ impl ClusterInfo {
messages
}
fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
// Generate new push and pull requests
fn generate_new_gossip_requests(
&self,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(SocketAddr, Protocol)> {
let pulls: Vec<_> = self.new_pull_requests(stakes);
let pushes: Vec<_> = self.new_push_requests();
vec![pulls, pushes].into_iter().flatten().collect()
@ -1406,7 +1408,7 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
sender: &PacketSender,
) -> Result<()> {
let reqs = obj.gossip_request(&stakes);
let reqs = obj.generate_new_gossip_requests(&stakes);
if !reqs.is_empty() {
let packets = to_packets_with_destination(recycler.clone(), &reqs);
sender.send(packets)?;
@ -1414,6 +1416,65 @@ impl ClusterInfo {
Ok(())
}
fn handle_adopt_shred_version(obj: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
if let Some(entrypoint) = entrypoint {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id
);
obj.my_contact_info.write().unwrap().shred_version =
entrypoint.shred_version;
obj.gossip
.write()
.unwrap()
.set_shred_version(entrypoint.shred_version);
obj.insert_self();
*adopt_shred_version = false;
}
}
}
}
}
fn handle_purge(
obj: &Arc<Self>,
bank_forks: &Option<Arc<RwLock<BankForks>>>,
stakes: &HashMap<Pubkey, u64>,
) {
let timeout = {
if let Some(ref bank_forks) = bank_forks {
let bank = bank_forks.read().unwrap().working_bank();
let epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
} else {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
}
};
let timeouts = obj.gossip.read().unwrap().make_timeouts(stakes, timeout);
let num_purged = obj
.time_gossip_write_lock("purge", &obj.stats.purge)
.purge(timestamp(), &timeouts);
inc_new_counter_info!("cluster_info-purge-count", num_purged);
let table_size = obj.gossip.read().unwrap().crds.table.len();
datapoint_debug!(
"cluster_info-purge",
("table_size", table_size as i64, i64),
("purge_stake_timeout", timeout as i64, i64)
);
}
/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(
obj: Arc<Self>,
@ -1447,60 +1508,16 @@ impl ClusterInfo {
}
None => HashMap::new(),
};
let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender);
if exit.load(Ordering::Relaxed) {
return;
}
let timeout = {
if let Some(ref bank_forks) = bank_forks {
let bank = bank_forks.read().unwrap().working_bank();
let epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
} else {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
}
};
let timeouts = obj.gossip.read().unwrap().make_timeouts(&stakes, timeout);
let num_purged = obj
.time_gossip_write_lock("purge", &obj.stats.purge)
.purge(timestamp(), &timeouts);
inc_new_counter_info!("cluster_info-purge-count", num_purged);
let table_size = obj.gossip.read().unwrap().crds.table.len();
datapoint_debug!(
"cluster_info-purge",
("table_size", table_size as i64, i64),
("purge_stake_timeout", timeout as i64, i64)
);
// Adopt the entrypoint's `shred_version` if ours is unset
if adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint =
obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone());
if let Some(entrypoint) = entrypoint {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id
);
obj.my_contact_info.write().unwrap().shred_version =
entrypoint.shred_version;
obj.gossip
.write()
.unwrap()
.set_shred_version(entrypoint.shred_version);
obj.insert_self();
adopt_shred_version = false;
}
}
}
}
Self::handle_purge(&obj, &bank_forks, &stakes);
Self::handle_adopt_shred_version(&obj, &mut adopt_shred_version);
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
@ -2009,7 +2026,7 @@ impl ClusterInfo {
("tvu_peers", self.stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
self.stats.new_push_requests2.clear(),
self.stats.new_push_requests_num.clear(),
i64
),
);
@ -2038,21 +2055,11 @@ impl ClusterInfo {
self.stats.process_pull_response_count.clear(),
i64
),
(
"process_pull_resp_success",
self.stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
i64
),
(
"process_pull_resp_fail",
self.stats.process_pull_response_fail.clear(),
i64
),
(
"push_response_count",
self.stats.push_response_count.clear(),
@ -2167,6 +2174,7 @@ impl ClusterInfo {
.spawn(move || {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("sol-gossip-work-{}", i))
.build()
.unwrap();
let mut last_print = Instant::now();
@ -2559,7 +2567,7 @@ mod tests {
.write()
.unwrap()
.refresh_push_active_set(&HashMap::new());
let reqs = cluster_info.gossip_request(&HashMap::new());
let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new());
//assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr);