moves cluster-info metrics to a separate module (#16883)

This commit is contained in:
behzad nouri 2021-04-28 02:04:49 +00:00 committed by GitHub
parent 36574c30ef
commit b17d5eeaee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 399 additions and 427 deletions

View File

@ -13,6 +13,7 @@
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
@ -70,8 +71,10 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
sync::{
atomic::{AtomicBool, Ordering},
{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
@ -199,112 +202,6 @@ impl<'a> Drop for GossipReadLock<'a> {
}
}
#[derive(Default)]
struct Counter(AtomicU64);
impl Counter {
fn add_measure(&self, x: &mut Measure) {
x.stop();
self.0.fetch_add(x.as_us(), Ordering::Relaxed);
}
fn add_relaxed(&self, x: u64) {
self.0.fetch_add(x, Ordering::Relaxed);
}
fn clear(&self) -> u64 {
self.0.swap(0, Ordering::Relaxed)
}
}
struct ScopedTimer<'a> {
clock: Instant,
metric: &'a AtomicU64,
}
impl<'a> From<&'a Counter> for ScopedTimer<'a> {
// Output should be assigned to a *named* variable,
// otherwise it is immediately dropped.
#[must_use]
fn from(counter: &'a Counter) -> Self {
Self {
clock: Instant::now(),
metric: &counter.0,
}
}
}
impl Drop for ScopedTimer<'_> {
fn drop(&mut self) {
let micros = self.clock.elapsed().as_micros();
self.metric.fetch_add(micros as u64, Ordering::Relaxed);
}
}
#[derive(Default)]
struct GossipStats {
entrypoint: Counter,
entrypoint2: Counter,
gossip_packets_dropped_count: Counter,
push_vote_read: Counter,
get_votes: Counter,
get_accounts_hash: Counter,
all_tvu_peers: Counter,
tvu_peers: Counter,
repair_peers: Counter,
new_push_requests: Counter,
new_push_requests2: Counter,
new_push_requests_num: Counter,
filter_pull_response: Counter,
handle_batch_ping_messages_time: Counter,
handle_batch_pong_messages_time: Counter,
handle_batch_prune_messages_time: Counter,
handle_batch_pull_requests_time: Counter,
handle_batch_pull_responses_time: Counter,
handle_batch_push_messages_time: Counter,
packets_received_count: Counter,
packets_received_prune_messages_count: Counter,
packets_received_pull_requests_count: Counter,
packets_received_pull_responses_count: Counter,
packets_received_push_messages_count: Counter,
packets_received_verified_count: Counter,
packets_sent_gossip_requests_count: Counter,
packets_sent_prune_messages_count: Counter,
packets_sent_pull_requests_count: Counter,
packets_sent_pull_responses_count: Counter,
packets_sent_push_messages_count: Counter,
process_gossip_packets_time: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_response_fail_insert: Counter,
process_pull_response_fail_timeout: Counter,
process_pull_response_success: Counter,
process_pull_requests: Counter,
generate_pull_responses: Counter,
process_prune: Counter,
process_push_message: Counter,
prune_received_cache: Counter,
prune_message_count: Counter,
prune_message_len: Counter,
pull_request_ping_pong_check_failed_count: Counter,
purge: Counter,
require_stake_for_gossip_unknown_feature_set: Counter,
require_stake_for_gossip_unknown_stakes: Counter,
trim_crds_table_failed: Counter,
trim_crds_table_purged_values_count: Counter,
epoch_slots_lookup: Counter,
new_pull_requests: Counter,
new_pull_requests_count: Counter,
mark_pull_request: Counter,
skip_pull_response_shred_version: Counter,
skip_pull_shred_version: Counter,
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
push_response_count: Counter,
pull_requests_count: Counter,
}
pub struct ClusterInfo {
/// The network
pub gossip: RwLock<CrdsGossip>,
@ -332,17 +229,17 @@ impl Default for ClusterInfo {
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
pub struct PruneData {
struct PruneData {
/// Pubkey of the node that sent this prune data
pub pubkey: Pubkey,
pubkey: Pubkey,
/// Pubkeys of nodes that should be pruned
pub prunes: Vec<Pubkey>,
prunes: Vec<Pubkey>,
/// Signature of this Prune Message
pub signature: Signature,
signature: Signature,
/// The Pubkey of the intended node/destination for this message
pub destination: Pubkey,
destination: Pubkey,
/// Wallclock of the node that generated this message
pub wallclock: u64,
wallclock: u64,
}
impl PruneData {
@ -407,9 +304,9 @@ impl Signable for PruneData {
}
struct PullData {
pub from_addr: SocketAddr,
pub caller: CrdsValue,
pub filter: CrdsFilter,
from_addr: SocketAddr,
caller: CrdsValue,
filter: CrdsFilter,
}
pub fn make_accounts_hashes_message(
@ -2825,319 +2722,11 @@ impl ClusterInfo {
epoch_time_ms,
should_check_duplicate_instance,
)?;
self.print_reset_stats(last_print);
Ok(())
}
fn print_reset_stats(&self, last_print: &mut Instant) {
if last_print.elapsed().as_millis() > 2000 {
let (table_size, purged_values_size, failed_inserts_size) = {
let r_gossip = self.gossip.read().unwrap();
(
r_gossip.crds.len(),
r_gossip.pull.purged_values.len(),
r_gossip.pull.failed_inserts.len(),
)
};
datapoint_info!(
"cluster_info_stats",
("entrypoint", self.stats.entrypoint.clear(), i64),
("entrypoint2", self.stats.entrypoint2.clear(), i64),
("push_vote_read", self.stats.push_vote_read.clear(), i64),
("get_votes", self.stats.get_votes.clear(), i64),
(
"get_accounts_hash",
self.stats.get_accounts_hash.clear(),
i64
),
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
("tvu_peers", self.stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
self.stats.new_push_requests_num.clear(),
i64
),
("table_size", table_size as i64, i64),
("purged_values_size", purged_values_size as i64, i64),
("failed_inserts_size", failed_inserts_size as i64, i64),
);
datapoint_info!(
"cluster_info_stats2",
(
"gossip_packets_dropped_count",
self.stats.gossip_packets_dropped_count.clear(),
i64
),
("repair_peers", self.stats.repair_peers.clear(), i64),
(
"new_push_requests",
self.stats.new_push_requests.clear(),
i64
),
(
"new_push_requests2",
self.stats.new_push_requests2.clear(),
i64
),
("purge", self.stats.purge.clear(), i64),
(
"process_gossip_packets_time",
self.stats.process_gossip_packets_time.clear(),
i64
),
(
"handle_batch_ping_messages_time",
self.stats.handle_batch_ping_messages_time.clear(),
i64
),
(
"handle_batch_pong_messages_time",
self.stats.handle_batch_pong_messages_time.clear(),
i64
),
(
"handle_batch_prune_messages_time",
self.stats.handle_batch_prune_messages_time.clear(),
i64
),
(
"handle_batch_pull_requests_time",
self.stats.handle_batch_pull_requests_time.clear(),
i64
),
(
"handle_batch_pull_responses_time",
self.stats.handle_batch_pull_responses_time.clear(),
i64
),
(
"handle_batch_push_messages_time",
self.stats.handle_batch_push_messages_time.clear(),
i64
),
(
"process_pull_resp",
self.stats.process_pull_response.clear(),
i64
),
(
"filter_pull_resp",
self.stats.filter_pull_response.clear(),
i64
),
(
"process_pull_resp_count",
self.stats.process_pull_response_count.clear(),
i64
),
(
"pull_response_fail_insert",
self.stats.process_pull_response_fail_insert.clear(),
i64
),
(
"pull_response_fail_timeout",
self.stats.process_pull_response_fail_timeout.clear(),
i64
),
(
"pull_response_success",
self.stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
i64
),
(
"push_response_count",
self.stats.push_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
(
"process_pull_resp_len",
self.stats.process_pull_response_len.clear(),
i64
),
(
"process_pull_requests",
self.stats.process_pull_requests.clear(),
i64
),
(
"pull_request_ping_pong_check_failed_count",
self.stats.pull_request_ping_pong_check_failed_count.clear(),
i64
),
(
"generate_pull_responses",
self.stats.generate_pull_responses.clear(),
i64
),
("process_prune", self.stats.process_prune.clear(), i64),
(
"process_push_message",
self.stats.process_push_message.clear(),
i64
),
(
"prune_received_cache",
self.stats.prune_received_cache.clear(),
i64
),
(
"epoch_slots_lookup",
self.stats.epoch_slots_lookup.clear(),
i64
),
(
"new_pull_requests",
self.stats.new_pull_requests.clear(),
i64
),
(
"mark_pull_request",
self.stats.mark_pull_request.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats4",
(
"skip_push_message_shred_version",
self.stats.skip_push_message_shred_version.clear(),
i64
),
(
"skip_pull_response_shred_version",
self.stats.skip_pull_response_shred_version.clear(),
i64
),
(
"skip_pull_shred_version",
self.stats.skip_pull_shred_version.clear(),
i64
),
(
"push_message_count",
self.stats.push_message_count.clear(),
i64
),
(
"push_message_value_count",
self.stats.push_message_value_count.clear(),
i64
),
(
"new_pull_requests_count",
self.stats.new_pull_requests_count.clear(),
i64
),
(
"prune_message_count",
self.stats.prune_message_count.clear(),
i64
),
(
"prune_message_len",
self.stats.prune_message_len.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats5",
(
"pull_requests_count",
self.stats.pull_requests_count.clear(),
i64
),
(
"packets_received_count",
self.stats.packets_received_count.clear(),
i64
),
(
"packets_received_prune_messages_count",
self.stats.packets_received_prune_messages_count.clear(),
i64
),
(
"packets_received_pull_requests_count",
self.stats.packets_received_pull_requests_count.clear(),
i64
),
(
"packets_received_pull_responses_count",
self.stats.packets_received_pull_responses_count.clear(),
i64
),
(
"packets_received_push_messages_count",
self.stats.packets_received_push_messages_count.clear(),
i64
),
(
"packets_received_verified_count",
self.stats.packets_received_verified_count.clear(),
i64
),
(
"packets_sent_gossip_requests_count",
self.stats.packets_sent_gossip_requests_count.clear(),
i64
),
(
"packets_sent_prune_messages_count",
self.stats.packets_sent_prune_messages_count.clear(),
i64
),
(
"packets_sent_pull_requests_count",
self.stats.packets_sent_pull_requests_count.clear(),
i64
),
(
"packets_sent_pull_responses_count",
self.stats.packets_sent_pull_responses_count.clear(),
i64
),
(
"packets_sent_push_messages_count",
self.stats.packets_sent_push_messages_count.clear(),
i64
),
(
"require_stake_for_gossip_unknown_feature_set",
self.stats
.require_stake_for_gossip_unknown_feature_set
.clear(),
i64
),
(
"require_stake_for_gossip_unknown_stakes",
self.stats.require_stake_for_gossip_unknown_stakes.clear(),
i64
),
(
"trim_crds_table_failed",
self.stats.trim_crds_table_failed.clear(),
i64
),
(
"trim_crds_table_purged_values_count",
self.stats.trim_crds_table_purged_values_count.clear(),
i64
),
);
submit_gossip_stats(&self.stats, &self.gossip);
*last_print = Instant::now();
}
Ok(())
}
pub fn listen(

View File

@ -0,0 +1,382 @@
use crate::crds_gossip::CrdsGossip;
use solana_measure::measure::Measure;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
RwLock,
},
time::Instant,
};
#[derive(Default)]
pub(crate) struct Counter(AtomicU64);
impl Counter {
pub(crate) fn add_measure(&self, x: &mut Measure) {
x.stop();
self.0.fetch_add(x.as_us(), Ordering::Relaxed);
}
pub(crate) fn add_relaxed(&self, x: u64) {
self.0.fetch_add(x, Ordering::Relaxed);
}
fn clear(&self) -> u64 {
self.0.swap(0, Ordering::Relaxed)
}
}
pub(crate) struct ScopedTimer<'a> {
clock: Instant,
metric: &'a AtomicU64,
}
impl<'a> From<&'a Counter> for ScopedTimer<'a> {
// Output should be assigned to a *named* variable, otherwise it is
// immediately dropped.
#[must_use]
fn from(counter: &'a Counter) -> Self {
Self {
clock: Instant::now(),
metric: &counter.0,
}
}
}
impl Drop for ScopedTimer<'_> {
fn drop(&mut self) {
let micros = self.clock.elapsed().as_micros();
self.metric.fetch_add(micros as u64, Ordering::Relaxed);
}
}
#[derive(Default)]
pub(crate) struct GossipStats {
pub(crate) all_tvu_peers: Counter,
pub(crate) entrypoint2: Counter,
pub(crate) entrypoint: Counter,
pub(crate) epoch_slots_lookup: Counter,
pub(crate) filter_pull_response: Counter,
pub(crate) generate_pull_responses: Counter,
pub(crate) get_accounts_hash: Counter,
pub(crate) get_votes: Counter,
pub(crate) gossip_packets_dropped_count: Counter,
pub(crate) handle_batch_ping_messages_time: Counter,
pub(crate) handle_batch_pong_messages_time: Counter,
pub(crate) handle_batch_prune_messages_time: Counter,
pub(crate) handle_batch_pull_requests_time: Counter,
pub(crate) handle_batch_pull_responses_time: Counter,
pub(crate) handle_batch_push_messages_time: Counter,
pub(crate) mark_pull_request: Counter,
pub(crate) new_pull_requests: Counter,
pub(crate) new_pull_requests_count: Counter,
pub(crate) new_push_requests2: Counter,
pub(crate) new_push_requests: Counter,
pub(crate) new_push_requests_num: Counter,
pub(crate) packets_received_count: Counter,
pub(crate) packets_received_prune_messages_count: Counter,
pub(crate) packets_received_pull_requests_count: Counter,
pub(crate) packets_received_pull_responses_count: Counter,
pub(crate) packets_received_push_messages_count: Counter,
pub(crate) packets_received_verified_count: Counter,
pub(crate) packets_sent_gossip_requests_count: Counter,
pub(crate) packets_sent_prune_messages_count: Counter,
pub(crate) packets_sent_pull_requests_count: Counter,
pub(crate) packets_sent_pull_responses_count: Counter,
pub(crate) packets_sent_push_messages_count: Counter,
pub(crate) process_gossip_packets_time: Counter,
pub(crate) process_prune: Counter,
pub(crate) process_pull_requests: Counter,
pub(crate) process_pull_response: Counter,
pub(crate) process_pull_response_count: Counter,
pub(crate) process_pull_response_fail_insert: Counter,
pub(crate) process_pull_response_fail_timeout: Counter,
pub(crate) process_pull_response_len: Counter,
pub(crate) process_pull_response_success: Counter,
pub(crate) process_pull_response_timeout: Counter,
pub(crate) process_push_message: Counter,
pub(crate) prune_message_count: Counter,
pub(crate) prune_message_len: Counter,
pub(crate) prune_received_cache: Counter,
pub(crate) pull_request_ping_pong_check_failed_count: Counter,
pub(crate) pull_requests_count: Counter,
pub(crate) purge: Counter,
pub(crate) push_message_count: Counter,
pub(crate) push_message_value_count: Counter,
pub(crate) push_response_count: Counter,
pub(crate) push_vote_read: Counter,
pub(crate) repair_peers: Counter,
pub(crate) require_stake_for_gossip_unknown_feature_set: Counter,
pub(crate) require_stake_for_gossip_unknown_stakes: Counter,
pub(crate) skip_pull_response_shred_version: Counter,
pub(crate) skip_pull_shred_version: Counter,
pub(crate) skip_push_message_shred_version: Counter,
pub(crate) trim_crds_table_failed: Counter,
pub(crate) trim_crds_table_purged_values_count: Counter,
pub(crate) tvu_peers: Counter,
}
pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock<CrdsGossip>) {
let (table_size, purged_values_size, failed_inserts_size) = {
let gossip = gossip.read().unwrap();
(
gossip.crds.len(),
gossip.pull.purged_values.len(),
gossip.pull.failed_inserts.len(),
)
};
datapoint_info!(
"cluster_info_stats",
("entrypoint", stats.entrypoint.clear(), i64),
("entrypoint2", stats.entrypoint2.clear(), i64),
("push_vote_read", stats.push_vote_read.clear(), i64),
("get_votes", stats.get_votes.clear(), i64),
("get_accounts_hash", stats.get_accounts_hash.clear(), i64),
("all_tvu_peers", stats.all_tvu_peers.clear(), i64),
("tvu_peers", stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
stats.new_push_requests_num.clear(),
i64
),
("table_size", table_size as i64, i64),
("purged_values_size", purged_values_size as i64, i64),
("failed_inserts_size", failed_inserts_size as i64, i64),
);
datapoint_info!(
"cluster_info_stats2",
(
"gossip_packets_dropped_count",
stats.gossip_packets_dropped_count.clear(),
i64
),
("repair_peers", stats.repair_peers.clear(), i64),
("new_push_requests", stats.new_push_requests.clear(), i64),
("new_push_requests2", stats.new_push_requests2.clear(), i64),
("purge", stats.purge.clear(), i64),
(
"process_gossip_packets_time",
stats.process_gossip_packets_time.clear(),
i64
),
(
"handle_batch_ping_messages_time",
stats.handle_batch_ping_messages_time.clear(),
i64
),
(
"handle_batch_pong_messages_time",
stats.handle_batch_pong_messages_time.clear(),
i64
),
(
"handle_batch_prune_messages_time",
stats.handle_batch_prune_messages_time.clear(),
i64
),
(
"handle_batch_pull_requests_time",
stats.handle_batch_pull_requests_time.clear(),
i64
),
(
"handle_batch_pull_responses_time",
stats.handle_batch_pull_responses_time.clear(),
i64
),
(
"handle_batch_push_messages_time",
stats.handle_batch_push_messages_time.clear(),
i64
),
(
"process_pull_resp",
stats.process_pull_response.clear(),
i64
),
("filter_pull_resp", stats.filter_pull_response.clear(), i64),
(
"process_pull_resp_count",
stats.process_pull_response_count.clear(),
i64
),
(
"pull_response_fail_insert",
stats.process_pull_response_fail_insert.clear(),
i64
),
(
"pull_response_fail_timeout",
stats.process_pull_response_fail_timeout.clear(),
i64
),
(
"pull_response_success",
stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
stats.process_pull_response_timeout.clear(),
i64
),
(
"push_response_count",
stats.push_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
(
"process_pull_resp_len",
stats.process_pull_response_len.clear(),
i64
),
(
"process_pull_requests",
stats.process_pull_requests.clear(),
i64
),
(
"pull_request_ping_pong_check_failed_count",
stats.pull_request_ping_pong_check_failed_count.clear(),
i64
),
(
"generate_pull_responses",
stats.generate_pull_responses.clear(),
i64
),
("process_prune", stats.process_prune.clear(), i64),
(
"process_push_message",
stats.process_push_message.clear(),
i64
),
(
"prune_received_cache",
stats.prune_received_cache.clear(),
i64
),
("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64),
("new_pull_requests", stats.new_pull_requests.clear(), i64),
("mark_pull_request", stats.mark_pull_request.clear(), i64),
);
datapoint_info!(
"cluster_info_stats4",
(
"skip_push_message_shred_version",
stats.skip_push_message_shred_version.clear(),
i64
),
(
"skip_pull_response_shred_version",
stats.skip_pull_response_shred_version.clear(),
i64
),
(
"skip_pull_shred_version",
stats.skip_pull_shred_version.clear(),
i64
),
("push_message_count", stats.push_message_count.clear(), i64),
(
"push_message_value_count",
stats.push_message_value_count.clear(),
i64
),
(
"new_pull_requests_count",
stats.new_pull_requests_count.clear(),
i64
),
(
"prune_message_count",
stats.prune_message_count.clear(),
i64
),
("prune_message_len", stats.prune_message_len.clear(), i64),
);
datapoint_info!(
"cluster_info_stats5",
(
"pull_requests_count",
stats.pull_requests_count.clear(),
i64
),
(
"packets_received_count",
stats.packets_received_count.clear(),
i64
),
(
"packets_received_prune_messages_count",
stats.packets_received_prune_messages_count.clear(),
i64
),
(
"packets_received_pull_requests_count",
stats.packets_received_pull_requests_count.clear(),
i64
),
(
"packets_received_pull_responses_count",
stats.packets_received_pull_responses_count.clear(),
i64
),
(
"packets_received_push_messages_count",
stats.packets_received_push_messages_count.clear(),
i64
),
(
"packets_received_verified_count",
stats.packets_received_verified_count.clear(),
i64
),
(
"packets_sent_gossip_requests_count",
stats.packets_sent_gossip_requests_count.clear(),
i64
),
(
"packets_sent_prune_messages_count",
stats.packets_sent_prune_messages_count.clear(),
i64
),
(
"packets_sent_pull_requests_count",
stats.packets_sent_pull_requests_count.clear(),
i64
),
(
"packets_sent_pull_responses_count",
stats.packets_sent_pull_responses_count.clear(),
i64
),
(
"packets_sent_push_messages_count",
stats.packets_sent_push_messages_count.clear(),
i64
),
(
"require_stake_for_gossip_unknown_feature_set",
stats.require_stake_for_gossip_unknown_feature_set.clear(),
i64
),
(
"require_stake_for_gossip_unknown_stakes",
stats.require_stake_for_gossip_unknown_stakes.clear(),
i64
),
(
"trim_crds_table_failed",
stats.trim_crds_table_failed.clear(),
i64
),
(
"trim_crds_table_purged_values_count",
stats.trim_crds_table_purged_values_count.clear(),
i64
),
);
}

View File

@ -22,6 +22,7 @@ pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;
pub mod cluster_info;
mod cluster_info_metrics;
pub mod cluster_slot_state_verifier;
pub mod cluster_slots;
pub mod cluster_slots_service;