diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 99eab0a23..7d12feac7 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -18,7 +18,7 @@ use { net::SocketAddr, str, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, thread::{self, Builder, JoinHandle}, @@ -132,6 +132,7 @@ struct SentNotificationStats { num_root: AtomicUsize, num_vote: AtomicUsize, num_block: AtomicUsize, + total_creation_to_queue_time_us: AtomicU64, last_report: AtomicInterval, } @@ -185,6 +186,12 @@ impl SentNotificationStats { self.num_block.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "total_creation_to_queue_time_us", + self.total_creation_to_queue_time_us + .swap(0, Ordering::Relaxed) as i64, + i64 + ) ); } } @@ -197,6 +204,7 @@ struct BroadcastHandler { fn increment_sent_notification_stats( params: &SubscriptionParams, + notification: &RpcNotification, stats: &Arc, ) { match params { @@ -228,6 +236,11 @@ fn increment_sent_notification_stats( stats.num_block.fetch_add(1, Ordering::Relaxed); } } + stats.total_creation_to_queue_time_us.fetch_add( + notification.created_at.elapsed().as_micros() as u64, + Ordering::Relaxed, + ); + stats.maybe_report(); } @@ -245,17 +258,10 @@ impl BroadcastHandler { .current_subscriptions .entry(notification.subscription_id) { - increment_sent_notification_stats(entry.get().params(), &self.sent_stats); - - let time_since_created = notification.created_at.elapsed(); - - datapoint_info!( - "pubsub_notifications", - ( - "created_to_queue_time_us", - time_since_created.as_micros() as i64, - i64 - ), + increment_sent_notification_stats( + entry.get().params(), + ¬ification, + &self.sent_stats, ); if notification.is_final {