Condense and rename pubsub counters into a single datapoint (#31691)

Condense and rename pubsub counters into a single datapoint

The counters have been combined into fields within a struct that are
accumulated locally and submitted on a set interval (10s). Doing so
decreases overhead on the system as well as makes it easier to compare
these similar fields in the metrics explorer by having them all under
the same measurement.
This commit is contained in:
steviez 2023-05-18 01:25:39 -05:00 committed by GitHub
parent f9b0691eb4
commit c5d72fc480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 122 additions and 38 deletions

View File

@ -12,11 +12,15 @@ use {
jsonrpc_core::IoHandler, jsonrpc_core::IoHandler,
soketto::handshake::{server, Server}, soketto::handshake::{server, Server},
solana_metrics::TokenCounter, solana_metrics::TokenCounter,
solana_sdk::timing::AtomicInterval,
std::{ std::{
io, io,
net::SocketAddr, net::SocketAddr,
str, str,
sync::Arc, sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
}, },
stream_cancel::{Trigger, Tripwire}, stream_cancel::{Trigger, Tripwire},
@ -115,49 +119,133 @@ impl PubSubService {
} }
} }
struct BroadcastHandler { const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
#[derive(Default)]
struct SentNotificationStats {
num_account: AtomicUsize,
num_logs: AtomicUsize,
num_program: AtomicUsize,
num_signature: AtomicUsize,
num_slot: AtomicUsize,
num_slots_updates: AtomicUsize,
num_root: AtomicUsize,
num_vote: AtomicUsize,
num_block: AtomicUsize,
last_report: AtomicInterval,
} }
fn count_final(params: &SubscriptionParams) { impl SentNotificationStats {
match params { fn maybe_report(&self) {
SubscriptionParams::Account(_) => { if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
inc_new_counter_info!("rpc-pubsub-final-accounts", 1); datapoint_info!(
} "rpc_pubsub-sent_notifications",
SubscriptionParams::Logs(_) => { (
inc_new_counter_info!("rpc-pubsub-final-logs", 1); "num_account",
} self.num_account.swap(0, Ordering::Relaxed) as i64,
SubscriptionParams::Program(_) => { i64
inc_new_counter_info!("rpc-pubsub-final-programs", 1); ),
} (
SubscriptionParams::Signature(_) => { "num_logs",
inc_new_counter_info!("rpc-pubsub-final-signatures", 1); self.num_logs.swap(0, Ordering::Relaxed) as i64,
} i64
SubscriptionParams::Slot => { ),
inc_new_counter_info!("rpc-pubsub-final-slots", 1); (
} "num_program",
SubscriptionParams::SlotsUpdates => { self.num_program.swap(0, Ordering::Relaxed) as i64,
inc_new_counter_info!("rpc-pubsub-final-slots-updates", 1); i64
} ),
SubscriptionParams::Root => { (
inc_new_counter_info!("rpc-pubsub-final-roots", 1); "num_signature",
} self.num_signature.swap(0, Ordering::Relaxed) as i64,
SubscriptionParams::Vote => { i64
inc_new_counter_info!("rpc-pubsub-final-votes", 1); ),
} (
SubscriptionParams::Block(_) => { "num_slot",
inc_new_counter_info!("rpc-pubsub-final-slot-txs", 1); self.num_slot.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_slots_updates",
self.num_slots_updates.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_root",
self.num_root.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_vote",
self.num_vote.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_block",
self.num_block.swap(0, Ordering::Relaxed) as i64,
i64
),
);
} }
} }
} }
struct BroadcastHandler {
current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
sent_stats: Arc<SentNotificationStats>,
}
fn increment_sent_notification_stats(
params: &SubscriptionParams,
stats: &Arc<SentNotificationStats>,
) {
match params {
SubscriptionParams::Account(_) => {
stats.num_account.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Logs(_) => {
stats.num_logs.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Program(_) => {
stats.num_program.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Signature(_) => {
stats.num_signature.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Slot => {
stats.num_slot.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::SlotsUpdates => {
stats.num_slots_updates.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Root => {
stats.num_root.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Vote => {
stats.num_vote.fetch_add(1, Ordering::Relaxed);
}
SubscriptionParams::Block(_) => {
stats.num_block.fetch_add(1, Ordering::Relaxed);
}
}
stats.maybe_report();
}
impl BroadcastHandler { impl BroadcastHandler {
fn new(current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>) -> Self {
let sent_stats = Arc::new(SentNotificationStats::default());
Self {
current_subscriptions,
sent_stats,
}
}
fn handle(&self, notification: RpcNotification) -> Result<Option<Arc<String>>, Error> { fn handle(&self, notification: RpcNotification) -> Result<Option<Arc<String>>, Error> {
if let Entry::Occupied(entry) = self if let Entry::Occupied(entry) = self
.current_subscriptions .current_subscriptions
.entry(notification.subscription_id) .entry(notification.subscription_id)
{ {
count_final(entry.get().params()); increment_sent_notification_stats(entry.get().params(), &self.sent_stats);
let time_since_created = notification.created_at.elapsed(); let time_since_created = notification.created_at.elapsed();
@ -243,9 +331,7 @@ pub fn test_connection(
subscriptions.control().clone(), subscriptions.control().clone(),
Arc::clone(&current_subscriptions), Arc::clone(&current_subscriptions),
); );
let broadcast_handler = BroadcastHandler { let broadcast_handler = BroadcastHandler::new(current_subscriptions);
current_subscriptions,
};
let receiver = TestBroadcastReceiver { let receiver = TestBroadcastReceiver {
inner: subscriptions.control().broadcast_receiver(), inner: subscriptions.control().broadcast_receiver(),
handler: broadcast_handler, handler: broadcast_handler,
@ -291,9 +377,7 @@ async fn handle_connection(
Arc::clone(&current_subscriptions), Arc::clone(&current_subscriptions),
); );
json_rpc_handler.extend_with(rpc_impl.to_delegate()); json_rpc_handler.extend_with(rpc_impl.to_delegate());
let broadcast_handler = BroadcastHandler { let broadcast_handler = BroadcastHandler::new(current_subscriptions);
current_subscriptions,
};
loop { loop {
// Extra block for dropping `receive_future`. // Extra block for dropping `receive_future`.
{ {