From 69f1e487b3a55785a6bbd71e7af30856eb8294d9 Mon Sep 17 00:00:00 2001 From: sakridge Date: Tue, 14 Apr 2020 21:21:58 -0700 Subject: [PATCH] Reduce cluster-info metrics. (#9465) --- core/benches/cluster_info.rs | 9 +++++++-- core/src/broadcast_stage.rs | 17 ++++++++++++----- .../fail_entry_verification_broadcast_run.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 6 +++--- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 9f3a99351c..3e8b8701fa 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -10,7 +10,11 @@ use solana_ledger::shred::Shred; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::sync::RwLock; -use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant}; +use std::{ + collections::HashMap, + net::UdpSocket, + sync::{atomic::AtomicU64, Arc}, +}; use test::Bencher; #[bench] @@ -35,6 +39,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { let cluster_info = Arc::new(RwLock::new(cluster_info)); let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone())); let shreds = Arc::new(shreds); + let last_datapoint = Arc::new(AtomicU64::new(0)); bencher.iter(move || { let shreds = shreds.clone(); broadcast_shreds( @@ -42,7 +47,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { &shreds, &peers_and_stakes, &peers, - &mut Instant::now(), + &last_datapoint, &mut 0, ) .unwrap(); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 73d75d15eb..1c61183031 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -20,10 +20,10 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils}; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_runtime::bank::Bank; -use solana_sdk::timing::duration_as_s; use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use solana_streamer::sendmmsg::send_mmsg; +use std::sync::atomic::AtomicU64; use std::{ collections::HashMap, net::UdpSocket, @@ -335,14 +335,21 @@ impl BroadcastStage { } } -fn update_peer_stats(num_live_peers: i64, broadcast_len: i64, last_datapoint_submit: &mut Instant) { - if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 { +fn update_peer_stats( + num_live_peers: i64, + broadcast_len: i64, + last_datapoint_submit: &Arc, +) { + let now = timestamp(); + let last = last_datapoint_submit.load(Ordering::Relaxed); + if now - last > 1000 + && last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last + { datapoint_info!( "cluster_info-num_nodes", ("live_count", num_live_peers, i64), ("broadcast_count", broadcast_len, i64) ); - *last_datapoint_submit = Instant::now(); } } @@ -363,7 +370,7 @@ pub fn broadcast_shreds( shreds: &Arc>, peers_and_stakes: &[(u64, usize)], peers: &[ContactInfo], - last_datapoint_submit: &mut Instant, + last_datapoint_submit: &Arc, send_mmsg_total: &mut u64, ) -> Result<()> { let broadcast_len = peers_and_stakes.len(); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 51b4f5f5dc..8e65e182e8 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -87,7 +87,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { &shreds, &peers_and_stakes, &peers, - &mut Instant::now(), + &Arc::new(AtomicU64::new(0)), &mut send_mmsg_total, )?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ef1f11f580..3cd554d40e 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -34,7 +34,7 @@ pub struct StandardBroadcastRun { slot_broadcast_start: Option, keypair: Arc, shred_version: u16, - last_datapoint_submit: Instant, + last_datapoint_submit: Arc, } impl StandardBroadcastRun { @@ -46,7 +46,7 @@ impl StandardBroadcastRun { slot_broadcast_start: None, keypair, shred_version, - last_datapoint_submit: Instant::now(), + last_datapoint_submit: Arc::new(AtomicU64::new(0)), } } @@ -269,7 +269,7 @@ impl StandardBroadcastRun { &shreds, &peers_and_stakes, &peers, - &mut self.last_datapoint_submit, + &self.last_datapoint_submit, &mut send_mmsg_total, )?;