From c5d72fc480f76903eb77a7847cc11030aa6fd4c3 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 18 May 2023 01:25:39 -0500 Subject: [PATCH] Condense and rename pubsub counters into a single datapoint (#31691) Condense and rename pubsub counters into a single datapoint The counters have been combined into fields within a struct that are accumulated locally and submitted on a set interval (10s). Doing so decreases overhead on the system as well as makes it easier to compare these similar fields in the metrics explorer by having them all under the same measurement. --- rpc/src/rpc_pubsub_service.rs | 160 ++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 38 deletions(-) diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 59ff6ccbc..0d24449e5 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -12,11 +12,15 @@ use { jsonrpc_core::IoHandler, soketto::handshake::{server, Server}, solana_metrics::TokenCounter, + solana_sdk::timing::AtomicInterval, std::{ io, net::SocketAddr, str, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, thread::{self, Builder, JoinHandle}, }, stream_cancel::{Trigger, Tripwire}, @@ -115,49 +119,133 @@ impl PubSubService { } } -struct BroadcastHandler { - current_subscriptions: Arc>, +const METRICS_REPORT_INTERVAL_MS: u64 = 10_000; + +#[derive(Default)] +struct SentNotificationStats { + num_account: AtomicUsize, + num_logs: AtomicUsize, + num_program: AtomicUsize, + num_signature: AtomicUsize, + num_slot: AtomicUsize, + num_slots_updates: AtomicUsize, + num_root: AtomicUsize, + num_vote: AtomicUsize, + num_block: AtomicUsize, + last_report: AtomicInterval, } -fn count_final(params: &SubscriptionParams) { - match params { - SubscriptionParams::Account(_) => { - inc_new_counter_info!("rpc-pubsub-final-accounts", 1); - } - SubscriptionParams::Logs(_) => { - inc_new_counter_info!("rpc-pubsub-final-logs", 1); - } - SubscriptionParams::Program(_) => { - inc_new_counter_info!("rpc-pubsub-final-programs", 1); - } - SubscriptionParams::Signature(_) => { - inc_new_counter_info!("rpc-pubsub-final-signatures", 1); - } - SubscriptionParams::Slot => { - inc_new_counter_info!("rpc-pubsub-final-slots", 1); - } - SubscriptionParams::SlotsUpdates => { - inc_new_counter_info!("rpc-pubsub-final-slots-updates", 1); - } - SubscriptionParams::Root => { - inc_new_counter_info!("rpc-pubsub-final-roots", 1); - } - SubscriptionParams::Vote => { - inc_new_counter_info!("rpc-pubsub-final-votes", 1); - } - SubscriptionParams::Block(_) => { - inc_new_counter_info!("rpc-pubsub-final-slot-txs", 1); +impl SentNotificationStats { + fn maybe_report(&self) { + if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) { + datapoint_info!( + "rpc_pubsub-sent_notifications", + ( + "num_account", + self.num_account.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_logs", + self.num_logs.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_program", + self.num_program.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_signature", + self.num_signature.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_slot", + self.num_slot.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_slots_updates", + self.num_slots_updates.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_root", + self.num_root.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_vote", + self.num_vote.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_block", + self.num_block.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ); } } } +struct BroadcastHandler { + current_subscriptions: Arc>, + sent_stats: Arc, +} + +fn increment_sent_notification_stats( + params: &SubscriptionParams, + stats: &Arc, +) { + match params { + SubscriptionParams::Account(_) => { + stats.num_account.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Logs(_) => { + stats.num_logs.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Program(_) => { + stats.num_program.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Signature(_) => { + stats.num_signature.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Slot => { + stats.num_slot.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::SlotsUpdates => { + stats.num_slots_updates.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Root => { + stats.num_root.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Vote => { + stats.num_vote.fetch_add(1, Ordering::Relaxed); + } + SubscriptionParams::Block(_) => { + stats.num_block.fetch_add(1, Ordering::Relaxed); + } + } + stats.maybe_report(); +} + impl BroadcastHandler { + fn new(current_subscriptions: Arc>) -> Self { + let sent_stats = Arc::new(SentNotificationStats::default()); + Self { + current_subscriptions, + sent_stats, + } + } + fn handle(&self, notification: RpcNotification) -> Result>, Error> { if let Entry::Occupied(entry) = self .current_subscriptions .entry(notification.subscription_id) { - count_final(entry.get().params()); + increment_sent_notification_stats(entry.get().params(), &self.sent_stats); let time_since_created = notification.created_at.elapsed(); @@ -243,9 +331,7 @@ pub fn test_connection( subscriptions.control().clone(), Arc::clone(¤t_subscriptions), ); - let broadcast_handler = BroadcastHandler { - current_subscriptions, - }; + let broadcast_handler = BroadcastHandler::new(current_subscriptions); let receiver = TestBroadcastReceiver { inner: subscriptions.control().broadcast_receiver(), handler: broadcast_handler, @@ -291,9 +377,7 @@ async fn handle_connection( Arc::clone(¤t_subscriptions), ); json_rpc_handler.extend_with(rpc_impl.to_delegate()); - let broadcast_handler = BroadcastHandler { - current_subscriptions, - }; + let broadcast_handler = BroadcastHandler::new(current_subscriptions); loop { // Extra block for dropping `receive_future`. {