Reduce cluster-info metrics. (#9465)
This commit is contained in:
parent
2b2b2cac1f
commit
69f1e487b3
|
@ -10,7 +10,11 @@ use solana_ledger::shred::Shred;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
net::UdpSocket,
|
||||||
|
sync::{atomic::AtomicU64, Arc},
|
||||||
|
};
|
||||||
use test::Bencher;
|
use test::Bencher;
|
||||||
|
|
||||||
#[bench]
|
#[bench]
|
||||||
|
@ -35,6 +39,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
||||||
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
||||||
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone()));
|
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone()));
|
||||||
let shreds = Arc::new(shreds);
|
let shreds = Arc::new(shreds);
|
||||||
|
let last_datapoint = Arc::new(AtomicU64::new(0));
|
||||||
bencher.iter(move || {
|
bencher.iter(move || {
|
||||||
let shreds = shreds.clone();
|
let shreds = shreds.clone();
|
||||||
broadcast_shreds(
|
broadcast_shreds(
|
||||||
|
@ -42,7 +47,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
||||||
&shreds,
|
&shreds,
|
||||||
&peers_and_stakes,
|
&peers_and_stakes,
|
||||||
&peers,
|
&peers,
|
||||||
&mut Instant::now(),
|
&last_datapoint,
|
||||||
&mut 0,
|
&mut 0,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
@ -20,10 +20,10 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils};
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::timing::duration_as_s;
|
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||||
use solana_streamer::sendmmsg::send_mmsg;
|
use solana_streamer::sendmmsg::send_mmsg;
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
|
@ -335,14 +335,21 @@ impl BroadcastStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_peer_stats(num_live_peers: i64, broadcast_len: i64, last_datapoint_submit: &mut Instant) {
|
fn update_peer_stats(
|
||||||
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
|
num_live_peers: i64,
|
||||||
|
broadcast_len: i64,
|
||||||
|
last_datapoint_submit: &Arc<AtomicU64>,
|
||||||
|
) {
|
||||||
|
let now = timestamp();
|
||||||
|
let last = last_datapoint_submit.load(Ordering::Relaxed);
|
||||||
|
if now - last > 1000
|
||||||
|
&& last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
|
||||||
|
{
|
||||||
datapoint_info!(
|
datapoint_info!(
|
||||||
"cluster_info-num_nodes",
|
"cluster_info-num_nodes",
|
||||||
("live_count", num_live_peers, i64),
|
("live_count", num_live_peers, i64),
|
||||||
("broadcast_count", broadcast_len, i64)
|
("broadcast_count", broadcast_len, i64)
|
||||||
);
|
);
|
||||||
*last_datapoint_submit = Instant::now();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,7 +370,7 @@ pub fn broadcast_shreds(
|
||||||
shreds: &Arc<Vec<Shred>>,
|
shreds: &Arc<Vec<Shred>>,
|
||||||
peers_and_stakes: &[(u64, usize)],
|
peers_and_stakes: &[(u64, usize)],
|
||||||
peers: &[ContactInfo],
|
peers: &[ContactInfo],
|
||||||
last_datapoint_submit: &mut Instant,
|
last_datapoint_submit: &Arc<AtomicU64>,
|
||||||
send_mmsg_total: &mut u64,
|
send_mmsg_total: &mut u64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let broadcast_len = peers_and_stakes.len();
|
let broadcast_len = peers_and_stakes.len();
|
||||||
|
|
|
@ -87,7 +87,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||||
&shreds,
|
&shreds,
|
||||||
&peers_and_stakes,
|
&peers_and_stakes,
|
||||||
&peers,
|
&peers,
|
||||||
&mut Instant::now(),
|
&Arc::new(AtomicU64::new(0)),
|
||||||
&mut send_mmsg_total,
|
&mut send_mmsg_total,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ pub struct StandardBroadcastRun {
|
||||||
slot_broadcast_start: Option<Instant>,
|
slot_broadcast_start: Option<Instant>,
|
||||||
keypair: Arc<Keypair>,
|
keypair: Arc<Keypair>,
|
||||||
shred_version: u16,
|
shred_version: u16,
|
||||||
last_datapoint_submit: Instant,
|
last_datapoint_submit: Arc<AtomicU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StandardBroadcastRun {
|
impl StandardBroadcastRun {
|
||||||
|
@ -46,7 +46,7 @@ impl StandardBroadcastRun {
|
||||||
slot_broadcast_start: None,
|
slot_broadcast_start: None,
|
||||||
keypair,
|
keypair,
|
||||||
shred_version,
|
shred_version,
|
||||||
last_datapoint_submit: Instant::now(),
|
last_datapoint_submit: Arc::new(AtomicU64::new(0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,7 +269,7 @@ impl StandardBroadcastRun {
|
||||||
&shreds,
|
&shreds,
|
||||||
&peers_and_stakes,
|
&peers_and_stakes,
|
||||||
&peers,
|
&peers,
|
||||||
&mut self.last_datapoint_submit,
|
&self.last_datapoint_submit,
|
||||||
&mut send_mmsg_total,
|
&mut send_mmsg_total,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue