diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs
index 0a4f5c8a3..3e7cbb3ab 100644
--- a/core/benches/cluster_info.rs
+++ b/core/benches/cluster_info.rs
@@ -3,6 +3,7 @@
 extern crate test;
 
 use rand::{thread_rng, Rng};
+use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
 use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::contact_info::ContactInfo;
@@ -47,7 +48,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
             &peers_and_stakes,
             &peers,
             &last_datapoint,
-            &mut 0,
+            &mut TransmitShredsStats::default(),
         )
         .unwrap();
     });
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 9e9e6c31b..4cf7c866e 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -35,7 +35,7 @@ use std::{
 };
 
 mod broadcast_fake_shreds_run;
-pub(crate) mod broadcast_metrics;
+pub mod broadcast_metrics;
 pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;
@@ -374,13 +374,14 @@ pub fn broadcast_shreds(
     peers_and_stakes: &[(u64, usize)],
     peers: &[ContactInfo],
     last_datapoint_submit: &Arc<AtomicU64>,
-    send_mmsg_total: &mut u64,
+    transmit_stats: &mut TransmitShredsStats,
 ) -> Result<()> {
     let broadcast_len = peers_and_stakes.len();
     if broadcast_len == 0 {
         update_peer_stats(1, 1, last_datapoint_submit);
         return Ok(());
     }
+    let mut shred_select = Measure::start("shred_select");
     let packets: Vec<_> = shreds
         .iter()
         .map(|shred| {
@@ -389,6 +390,8 @@ pub fn broadcast_shreds(
             (&shred.payload, &peers[broadcast_index].tvu)
         })
         .collect();
+    shred_select.stop();
+    transmit_stats.shred_select += shred_select.as_us();
 
     let mut sent = 0;
     let mut send_mmsg_time = Measure::start("send_mmsg");
@@ -401,7 +404,7 @@
         }
     }
     send_mmsg_time.stop();
-    *send_mmsg_total += send_mmsg_time.as_us();
+    transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
 
     let num_live_peers = num_live_peers(&peers);
     update_peer_stats(
diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs
index f6084c5b9..af821de38 100644
--- a/core/src/broadcast_stage/broadcast_metrics.rs
+++ b/core/src/broadcast_stage/broadcast_metrics.rs
@@ -29,11 +29,12 @@ impl ProcessShredsStats {
 }
 
 #[derive(Default, Clone)]
-pub(crate) struct TransmitShredsStats {
-    pub(crate) transmit_elapsed: u64,
-    pub(crate) send_mmsg_elapsed: u64,
-    pub(crate) get_peers_elapsed: u64,
-    pub(crate) num_shreds: usize,
+pub struct TransmitShredsStats {
+    pub transmit_elapsed: u64,
+    pub send_mmsg_elapsed: u64,
+    pub get_peers_elapsed: u64,
+    pub shred_select: u64,
+    pub num_shreds: usize,
 }
 
 impl BroadcastStats for TransmitShredsStats {
@@ -42,6 +43,7 @@ impl BroadcastStats for TransmitShredsStats {
         self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed;
         self.get_peers_elapsed += new_stats.get_peers_elapsed;
         self.num_shreds += new_stats.num_shreds;
+        self.shred_select += new_stats.shred_select;
     }
     fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
         datapoint_info!(
@@ -58,6 +60,7 @@
             ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
             ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
             ("num_shreds", self.num_shreds as i64, i64),
+            ("shred_select", self.shred_select as i64, i64),
         );
     }
 }
@@ -176,15 +179,16 @@ mod test {
     }
 
     #[test]
-    fn test_update() {
+    fn test_update_broadcast() {
         let start = Instant::now();
         let mut slot_broadcast_stats = SlotBroadcastStats::default();
         slot_broadcast_stats.update(
             &TransmitShredsStats {
                 transmit_elapsed: 1,
-                get_peers_elapsed: 1,
-                send_mmsg_elapsed: 1,
-                num_shreds: 1,
+                get_peers_elapsed: 2,
+                send_mmsg_elapsed: 3,
+                shred_select: 4,
+                num_shreds: 5,
             },
             &Some(BroadcastShredBatchInfo {
                 slot: 0,
@@ -198,16 +202,18 @@
         assert_eq!(slot_0_stats.num_batches, 1);
         assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
         assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
 
         slot_broadcast_stats.update(
             &TransmitShredsStats {
-                transmit_elapsed: 1,
-                get_peers_elapsed: 1,
-                send_mmsg_elapsed: 1,
-                num_shreds: 1,
+                transmit_elapsed: 7,
+                get_peers_elapsed: 8,
+                send_mmsg_elapsed: 9,
+                shred_select: 10,
+                num_shreds: 11,
             },
             &None,
         );
@@ -217,9 +223,10 @@
         assert_eq!(slot_0_stats.num_batches, 1);
         assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
         assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
 
         // If another batch is given, then total number of batches == num_expected_batches == 2,
         // so the batch should be purged from the HashMap
@@ -228,6 +235,7 @@
                 transmit_elapsed: 1,
                 get_peers_elapsed: 1,
                 send_mmsg_elapsed: 1,
+                shred_select: 1,
                 num_shreds: 1,
             },
             &Some(BroadcastShredBatchInfo {
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 1b57f4d69..a53596bd0 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -137,14 +137,13 @@
         // Broadcast data
         let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
 
-        let mut send_mmsg_total = 0;
         broadcast_shreds(
             sock,
             &shreds,
             &peers_and_stakes,
             &peers,
             &Arc::new(AtomicU64::new(0)),
-            &mut send_mmsg_total,
+            &mut TransmitShredsStats::default(),
         )?;
 
         Ok(())
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 3f3a888da..1228c3854 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,6 +9,7 @@ use solana_ledger::{
 };
 use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
 use std::collections::HashMap;
+use std::sync::RwLock;
 use std::time::Duration;
 
 #[derive(Clone)]
@@ -23,6 +24,14 @@ pub struct StandardBroadcastRun {
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicU64>,
     num_batches: usize,
+    broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
+    last_peer_update: Arc<AtomicU64>,
+}
+
+#[derive(Default)]
+struct BroadcastPeerCache {
+    peers: Vec<ContactInfo>,
+    peers_and_stakes: Vec<(u64, usize)>,
 }
 
 impl StandardBroadcastRun {
@@ -38,6 +47,8 @@ impl StandardBroadcastRun {
             shred_version,
             last_datapoint_submit: Arc::new(AtomicU64::new(0)),
             num_batches: 0,
+            broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())),
+            last_peer_update: Arc::new(AtomicU64::new(0)),
         }
     }
 
@@ -293,33 +304,46 @@ impl StandardBroadcastRun {
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
     ) -> Result<()> {
+        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
-        let get_peers_start = Instant::now();
-        let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
-        let get_peers_elapsed = get_peers_start.elapsed();
+        let mut get_peers_time = Measure::start("broadcast::get_peers");
+        let now = timestamp();
+        let last = self.last_peer_update.load(Ordering::Relaxed);
+        if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
+            && self
+                .last_peer_update
+                .compare_and_swap(last, now, Ordering::Relaxed)
+                == last
+        {
+            let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
+            let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
+            w_broadcast_peer_cache.peers = peers;
+            w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
+        }
+        get_peers_time.stop();
+        let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();
 
+        let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
-        let transmit_start = Instant::now();
-        let mut send_mmsg_total = 0;
+        let mut transmit_time = Measure::start("broadcast_shreds");
         broadcast_shreds(
             sock,
             &shreds,
-            &peers_and_stakes,
-            &peers,
+            &r_broadcast_peer_cache.peers_and_stakes,
+            &r_broadcast_peer_cache.peers,
             &self.last_datapoint_submit,
-            &mut send_mmsg_total,
+            &mut transmit_stats,
         )?;
-        let transmit_elapsed = transmit_start.elapsed();
-        let new_transmit_shreds_stats = TransmitShredsStats {
-            transmit_elapsed: duration_as_us(&transmit_elapsed),
-            get_peers_elapsed: duration_as_us(&get_peers_elapsed),
-            send_mmsg_elapsed: send_mmsg_total,
-            num_shreds: shreds.len(),
-        };
+        drop(r_broadcast_peer_cache);
+        transmit_time.stop();
+
+        transmit_stats.transmit_elapsed = transmit_time.as_us();
+        transmit_stats.get_peers_elapsed = get_peers_time.as_us();
+        transmit_stats.num_shreds = shreds.len();
 
         // Process metrics
-        self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info);
+        self.update_transmit_metrics(&transmit_stats, &broadcast_shred_batch_info);
         Ok(())
     }
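
A note on the peer-cache refresh in `standard_broadcast_run.rs`: the cache pairs an `RwLock<BroadcastPeerCache>` with an `AtomicU64` timestamp. Every caller reads through the lock, and at most one caller per `BROADCAST_PEER_UPDATE_INTERVAL_MS` window wins the compare-and-swap on `last_peer_update` and rebuilds the cache, so the expensive `get_broadcast_peers` call runs roughly once a second no matter how many transmit threads are active. Below is a minimal, self-contained sketch of that pattern; the `CachedPeers`/`recompute` names are illustrative rather than the PR's API, and it uses `compare_exchange`, the non-deprecated successor of `compare_and_swap`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};

const UPDATE_INTERVAL_MS: u64 = 1_000;

fn timestamp_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Hypothetical stand-in for BroadcastPeerCache.
#[derive(Default, Clone)]
struct PeerCache {
    peers: Vec<String>,
}

struct CachedPeers {
    last_update: AtomicU64,
    cache: RwLock<PeerCache>,
}

impl CachedPeers {
    // Return the cached peers, refreshing at most once per interval.
    // The compare_exchange succeeds for exactly one caller per window,
    // so concurrent threads never rebuild the cache redundantly.
    fn get(&self, recompute: impl FnOnce() -> Vec<String>) -> PeerCache {
        let now = timestamp_ms();
        let last = self.last_update.load(Ordering::Relaxed);
        if now.saturating_sub(last) > UPDATE_INTERVAL_MS
            && self
                .last_update
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            // Only the winning thread pays for the recomputation.
            self.cache.write().unwrap().peers = recompute();
        }
        self.cache.read().unwrap().clone()
    }
}

fn main() {
    let cached = CachedPeers {
        last_update: AtomicU64::new(0),
        cache: RwLock::new(PeerCache::default()),
    };
    let peers = cached.get(|| vec!["validator-a".into(), "validator-b".into()]);
    println!("{} peers", peers.peers.len());
}
```

An `RwLock` fits here because reads (every shred batch) vastly outnumber writes (once per second), so transmit threads almost never contend on the cache.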
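
The other change threaded through every caller is replacing the bare `&mut u64` (`send_mmsg_total`) with `&mut TransmitShredsStats`, so `broadcast_shreds` can record several timings (`shred_select`, `send_mmsg_elapsed`) without growing its signature each time a counter is added. A rough sketch of that shape, with `std::time::Instant` standing in for `solana_measure`'s `Measure` and purely illustrative names:

```rust
use std::time::Instant;

#[derive(Default, Debug)]
struct TransmitStats {
    select_elapsed_us: u64,
    send_elapsed_us: u64,
    num_items: usize,
}

// One stats struct is threaded through instead of one `&mut u64` per
// timing; adding a counter later touches only the struct and this body.
fn transmit(items: &[Vec<u8>], stats: &mut TransmitStats) {
    let select_start = Instant::now();
    // Stand-in for the weighted peer selection done per shred.
    let targets: Vec<usize> = (0..items.len()).map(|i| i % 4).collect();
    stats.select_elapsed_us += select_start.elapsed().as_micros() as u64;

    let send_start = Instant::now();
    for (item, target) in items.iter().zip(&targets) {
        // Stand-in for the actual socket send.
        let _ = (item.len(), target);
    }
    stats.send_elapsed_us += send_start.elapsed().as_micros() as u64;
    stats.num_items += items.len();
}

fn main() {
    let mut stats = TransmitStats::default();
    transmit(&[vec![0u8; 8], vec![1u8; 8]], &mut stats);
    println!("{:?}", stats);
}
```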