report pubsub stats every 2s (#21192)

This commit is contained in:
Jeff Biseda 2021-11-05 19:59:54 -07:00 committed by GitHub
parent 706b60b5c8
commit 7659a2edc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 40 additions and 8 deletions

View File

@ -431,6 +431,40 @@ fn initial_last_notified_slot(
}
}
#[derive(Default)]
struct PubsubNotificationStats {
since: Option<Instant>,
notification_entry_processing_count: u64,
notification_entry_processing_time_us: u64,
}
impl PubsubNotificationStats {
fn maybe_submit(&mut self) {
const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
let elapsed = self.since.as_ref().map(Instant::elapsed);
if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
return;
}
datapoint_info!(
"pubsub_notification_entries",
(
"notification_entry_processing_count",
self.notification_entry_processing_count,
i64
),
(
"notification_entry_processing_time_us",
self.notification_entry_processing_time_us,
i64
),
);
*self = Self {
since: Some(Instant::now()),
..Self::default()
};
}
}
pub struct RpcSubscriptions {
notification_sender: Sender<TimestampedNotificationEntry>,
t_cleanup: Option<JoinHandle<()>>,
@ -618,6 +652,8 @@ impl RpcSubscriptions {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) {
let mut stats = PubsubNotificationStats::default();
loop {
if exit.load(Ordering::Relaxed) {
break;
@ -738,14 +774,9 @@ impl RpcSubscriptions {
}
}
}
datapoint_info!(
"pubsub_notification_entries",
(
"notification_entry_processing_time_us",
queued_at.elapsed().as_micros() as i64,
i64
)
);
stats.notification_entry_processing_time_us +=
queued_at.elapsed().as_micros() as u64;
stats.notification_entry_processing_count += 1;
}
Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again
@ -755,6 +786,7 @@ impl RpcSubscriptions {
break;
}
}
stats.maybe_submit();
}
}