From 7aa4d401f709caa1dc8ffd4c2dae886180408464 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 15 Apr 2020 15:22:16 -0700 Subject: [PATCH] Fix broadcast metrics (#9461) * Rework broadcast metrics to support multiple threads * Update dashboards Co-authored-by: Carl --- Cargo.lock | 1 - core/src/broadcast_stage.rs | 37 +- .../broadcast_fake_shreds_run.rs | 30 +- core/src/broadcast_stage/broadcast_metrics.rs | 288 ++++ .../fail_entry_verification_broadcast_run.rs | 20 +- .../broadcast_stage/standard_broadcast_run.rs | 258 ++-- core/src/crds_gossip_pull.rs | 2 +- .../dashboards/cluster-monitor.json | 1374 +++++++++++------ 8 files changed, 1389 insertions(+), 621 deletions(-) create mode 100644 core/src/broadcast_stage/broadcast_metrics.rs diff --git a/Cargo.lock b/Cargo.lock index f658321b71..1d0abeb6ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4505,7 +4505,6 @@ dependencies = [ "rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", - "solana-bpf-loader-program 1.2.0", "solana-logger 1.2.0", "solana-measure 1.2.0", "solana-metrics 1.2.0", diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 1c61183031..eae1094001 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,6 +1,6 @@ //! A stage to broadcast data from a leader node to validators use self::{ - broadcast_fake_shreds_run::BroadcastFakeShredsRun, + broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*, fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, standard_broadcast_run::StandardBroadcastRun, }; @@ -35,13 +35,16 @@ use std::{ }; mod broadcast_fake_shreds_run; +pub(crate) mod broadcast_metrics; pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; -pub const NUM_INSERT_THREADS: usize = 2; -pub type RetransmitSlotsSender = CrossbeamSender>>; -pub type RetransmitSlotsReceiver = CrossbeamReceiver>>; +pub(crate) const NUM_INSERT_THREADS: usize = 2; +pub(crate) type RetransmitSlotsSender = CrossbeamSender>>; +pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver>>; +pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; +pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option)>; #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { @@ -107,18 +110,18 @@ trait BroadcastRun { &mut self, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()>; fn transmit( &mut self, - receiver: &Arc>>, + receiver: &Arc>, cluster_info: &Arc>, sock: &UdpSocket, ) -> Result<()>; fn record( - &self, - receiver: &Arc>>>>, + &mut self, + receiver: &Arc>, blockstore: &Arc, ) -> Result<()>; } @@ -150,8 +153,8 @@ impl BroadcastStage { fn run( blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { loop { @@ -250,7 +253,7 @@ impl BroadcastStage { let blockstore_receiver = Arc::new(Mutex::new(blockstore_receiver)); for _ in 0..NUM_INSERT_THREADS { let blockstore_receiver = blockstore_receiver.clone(); - let bs_record = broadcast_stage_run.clone(); + let mut bs_record = broadcast_stage_run.clone(); let btree = blockstore.clone(); let t = Builder::new() .name("solana-broadcaster-record".to_string()) @@ -289,7 +292,7 @@ impl BroadcastStage { fn check_retransmit_signals( blockstore: &Blockstore, retransmit_slots_receiver: &RetransmitSlotsReceiver, - socket_sender: &Sender, + socket_sender: &Sender<(TransmitShreds, Option)>, ) -> Result<()> { let timer = Duration::from_millis(100); @@ -310,7 +313,7 @@ impl BroadcastStage { ); if !data_shreds.is_empty() { - socket_sender.send((stakes.clone(), data_shreds))?; + socket_sender.send(((stakes.clone(), data_shreds), None))?; } let coding_shreds = Arc::new( @@ -320,7 +323,7 @@ impl BroadcastStage { ); if !coding_shreds.is_empty() { - socket_sender.send((stakes.clone(), coding_shreds))?; + socket_sender.send(((stakes.clone(), coding_shreds), None))?; } } @@ -478,13 +481,13 @@ pub mod test { } fn check_all_shreds_received( - transmit_receiver: &Receiver, + transmit_receiver: &TransmitReceiver, mut data_index: u64, mut coding_index: u64, num_expected_data_shreds: u64, num_expected_coding_shreds: u64, ) { - while let Ok(new_retransmit_slots) = transmit_receiver.try_recv() { + while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() { if new_retransmit_slots.1[0].is_data() { for data_shred in new_retransmit_slots.1.iter() { assert_eq!(data_shred.index() as u64, data_index); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 850ce6b809..081e83d13e 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -28,8 +28,8 @@ impl BroadcastRun for BroadcastFakeShredsRun { &mut self, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage let receive_results = broadcast_utils::recv_slot_entries(receiver)?; @@ -83,25 +83,31 @@ impl BroadcastRun for BroadcastFakeShredsRun { } let data_shreds = Arc::new(data_shreds); - blockstore_sender.send(data_shreds.clone())?; + blockstore_sender.send((data_shreds.clone(), None))?; // 3) Start broadcast step //some indicates fake shreds - socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)))?; - socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)))?; + socket_sender.send(( + (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)), + None, + ))?; + socket_sender.send(( + (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)), + None, + ))?; //none indicates real shreds - socket_sender.send((None, data_shreds))?; - socket_sender.send((None, Arc::new(coding_shreds)))?; + socket_sender.send(((None, data_shreds), None))?; + socket_sender.send(((None, Arc::new(coding_shreds)), None))?; Ok(()) } fn transmit( &mut self, - receiver: &Arc>>, + receiver: &Arc>, cluster_info: &Arc>, sock: &UdpSocket, ) -> Result<()> { - for (stakes, data_shreds) in receiver.lock().unwrap().iter() { + for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() { let peers = cluster_info.read().unwrap().tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { if i <= self.partition && stakes.is_some() { @@ -119,11 +125,11 @@ impl BroadcastRun for BroadcastFakeShredsRun { Ok(()) } fn record( - &self, - receiver: &Arc>>>>, + &mut self, + receiver: &Arc>, blockstore: &Arc, ) -> Result<()> { - for data_shreds in receiver.lock().unwrap().iter() { + for (data_shreds, _) in receiver.lock().unwrap().iter() { blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; } Ok(()) diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs new file mode 100644 index 0000000000..dbe5af44a4 --- /dev/null +++ b/core/src/broadcast_stage/broadcast_metrics.rs @@ -0,0 +1,288 @@ +use super::*; + +pub(crate) trait BroadcastStats { + fn update(&mut self, new_stats: &Self); + fn report_stats(&mut self, slot: Slot, slot_start: Instant); +} + +#[derive(Clone)] +pub(crate) struct BroadcastShredBatchInfo { + pub(crate) slot: Slot, + pub(crate) num_expected_batches: Option, + pub(crate) slot_start_ts: Instant, +} + +#[derive(Default, Clone)] +pub(crate) struct ProcessShredsStats { + // Per-slot elapsed time + pub(crate) shredding_elapsed: u64, + pub(crate) receive_elapsed: u64, +} +impl ProcessShredsStats { + pub(crate) fn update(&mut self, new_stats: &ProcessShredsStats) { + self.shredding_elapsed += new_stats.shredding_elapsed; + self.receive_elapsed += new_stats.receive_elapsed; + } + pub(crate) fn reset(&mut self) { + *self = Self::default(); + } +} + +#[derive(Default, Clone)] +pub(crate) struct TransmitShredsStats { + pub(crate) transmit_elapsed: u64, + pub(crate) send_mmsg_elapsed: u64, + pub(crate) get_peers_elapsed: u64, + pub(crate) num_shreds: usize, +} + +impl BroadcastStats for TransmitShredsStats { + fn update(&mut self, new_stats: &TransmitShredsStats) { + self.transmit_elapsed += new_stats.transmit_elapsed; + self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed; + self.get_peers_elapsed += new_stats.get_peers_elapsed; + self.num_shreds += new_stats.num_shreds; + } + fn report_stats(&mut self, slot: Slot, slot_start: Instant) { + datapoint_info!( + "broadcast-transmit-shreds-stats", + ("slot", slot as i64, i64), + ( + "end_to_end_elapsed", + // `slot_start` signals when the first batch of shreds was + // received, used to measure duration of broadcast + slot_start.elapsed().as_micros() as i64, + i64 + ), + ("transmit_elapsed", self.transmit_elapsed as i64, i64), + ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64), + ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), + ("num_shreds", self.num_shreds as i64, i64), + ); + } +} + +#[derive(Default, Clone)] +pub(crate) struct InsertShredsStats { + pub(crate) insert_shreds_elapsed: u64, + pub(crate) num_shreds: usize, +} +impl BroadcastStats for InsertShredsStats { + fn update(&mut self, new_stats: &InsertShredsStats) { + self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed; + self.num_shreds += new_stats.num_shreds; + } + fn report_stats(&mut self, slot: Slot, slot_start: Instant) { + datapoint_info!( + "broadcast-insert-shreds-stats", + ("slot", slot as i64, i64), + ( + "end_to_end_elapsed", + // `slot_start` signals when the first batch of shreds was + // received, used to measure duration of broadcast + slot_start.elapsed().as_micros() as i64, + i64 + ), + ( + "insert_shreds_elapsed", + self.insert_shreds_elapsed as i64, + i64 + ), + ("num_shreds", self.num_shreds as i64, i64), + ); + } +} + +// Tracks metrics of type `T` acrosss multiple threads +#[derive(Default)] +pub(crate) struct BatchCounter { + // The number of batches processed across all threads so far + num_batches: usize, + // Filled in when the last batch of shreds is received, + // signals how many batches of shreds to expect + num_expected_batches: Option, + broadcast_shred_stats: T, +} + +impl BatchCounter { + #[cfg(test)] + pub(crate) fn num_batches(&self) -> usize { + self.num_batches + } +} + +#[derive(Default)] +pub(crate) struct SlotBroadcastStats(HashMap>); + +impl SlotBroadcastStats { + #[cfg(test)] + pub(crate) fn get(&self, slot: Slot) -> Option<&BatchCounter> { + self.0.get(&slot) + } + pub(crate) fn update(&mut self, new_stats: &T, batch_info: &Option) { + if let Some(batch_info) = batch_info { + let mut should_delete = false; + { + let slot_batch_counter = self.0.entry(batch_info.slot).or_default(); + slot_batch_counter.broadcast_shred_stats.update(new_stats); + // Only count the ones where `broadcast_shred_batch_info`.is_some(), because + // there could potentially be other `retransmit` slots inserted into the + // transmit pipeline (signaled by ReplayStage) that are not created by the + // main shredding/broadcast pipeline + slot_batch_counter.num_batches += 1; + if let Some(num_expected_batches) = batch_info.num_expected_batches { + slot_batch_counter.num_expected_batches = Some(num_expected_batches); + } + if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches { + if slot_batch_counter.num_batches == num_expected_batches { + slot_batch_counter + .broadcast_shred_stats + .report_stats(batch_info.slot, batch_info.slot_start_ts); + should_delete = true; + } + } + } + if should_delete { + self.0 + .remove(&batch_info.slot) + .expect("delete should be successful"); + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Default)] + struct TestStats { + sender: Option>, + count: usize, + } + + impl BroadcastStats for TestStats { + fn update(&mut self, new_stats: &TestStats) { + self.count += new_stats.count; + self.sender = new_stats.sender.clone(); + } + fn report_stats(&mut self, slot: Slot, slot_start: Instant) { + self.sender + .as_ref() + .unwrap() + .send((self.count, slot, slot_start)) + .unwrap() + } + } + + #[test] + fn test_update() { + let start = Instant::now(); + let mut slot_broadcast_stats = SlotBroadcastStats::default(); + slot_broadcast_stats.update( + &TransmitShredsStats { + transmit_elapsed: 1, + get_peers_elapsed: 1, + send_mmsg_elapsed: 1, + num_shreds: 1, + }, + &Some(BroadcastShredBatchInfo { + slot: 0, + num_expected_batches: Some(2), + slot_start_ts: start.clone(), + }), + ); + + // Singular update + let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap(); + assert_eq!(slot_0_stats.num_batches, 1); + assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2); + assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1); + + slot_broadcast_stats.update( + &TransmitShredsStats { + transmit_elapsed: 1, + get_peers_elapsed: 1, + send_mmsg_elapsed: 1, + num_shreds: 1, + }, + &None, + ); + + // If BroadcastShredBatchInfo == None, then update should be ignored + let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap(); + assert_eq!(slot_0_stats.num_batches, 1); + assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2); + assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1); + assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1); + + // If another batch is given, then total number of batches == num_expected_batches == 2, + // so the batch should be purged from the HashMap + slot_broadcast_stats.update( + &TransmitShredsStats { + transmit_elapsed: 1, + get_peers_elapsed: 1, + send_mmsg_elapsed: 1, + num_shreds: 1, + }, + &Some(BroadcastShredBatchInfo { + slot: 0, + num_expected_batches: None, + slot_start_ts: start.clone(), + }), + ); + + assert!(slot_broadcast_stats.0.get(&0).is_none()); + } + + #[test] + fn test_update_multi_threaded() { + for round in 0..50 { + let start = Instant::now(); + let slot_broadcast_stats = Arc::new(Mutex::new(SlotBroadcastStats::default())); + let num_threads = 5; + let slot = 0; + let (sender, receiver) = channel(); + let thread_handles: Vec<_> = (0..num_threads) + .into_iter() + .map(|i| { + let slot_broadcast_stats = slot_broadcast_stats.clone(); + let sender = Some(sender.clone()); + let test_stats = TestStats { sender, count: 1 }; + let mut broadcast_batch_info = BroadcastShredBatchInfo { + slot, + num_expected_batches: None, + slot_start_ts: start.clone(), + }; + if i == round % num_threads { + broadcast_batch_info.num_expected_batches = Some(num_threads); + } + Builder::new() + .name("test_update_multi_threaded".to_string()) + .spawn(move || { + slot_broadcast_stats + .lock() + .unwrap() + .update(&test_stats, &Some(broadcast_batch_info)) + }) + .unwrap() + }) + .collect(); + + for t in thread_handles { + t.join().unwrap(); + } + + assert!(slot_broadcast_stats.lock().unwrap().0.get(&slot).is_none()); + let (returned_count, returned_slot, returned_instant) = receiver.recv().unwrap(); + assert_eq!(returned_count, num_threads); + assert_eq!(returned_slot, slot); + assert_eq!(returned_instant, returned_instant); + } + } +} diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 8e65e182e8..7fada6a424 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -23,8 +23,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { &mut self, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?; @@ -61,23 +61,23 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ); let data_shreds = Arc::new(data_shreds); - blockstore_sender.send(data_shreds.clone())?; + blockstore_sender.send((data_shreds.clone(), None))?; // 3) Start broadcast step let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); let stakes = stakes.map(Arc::new); - socket_sender.send((stakes.clone(), data_shreds))?; - socket_sender.send((stakes, Arc::new(coding_shreds)))?; + socket_sender.send(((stakes.clone(), data_shreds), None))?; + socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?; Ok(()) } fn transmit( &mut self, - receiver: &Arc>>, + receiver: &Arc>, cluster_info: &Arc>, sock: &UdpSocket, ) -> Result<()> { - let (stakes, shreds) = receiver.lock().unwrap().recv()?; + let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; // Broadcast data let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); @@ -94,11 +94,11 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { Ok(()) } fn record( - &self, - receiver: &Arc>>>>, + &mut self, + receiver: &Arc>, blockstore: &Arc, ) -> Result<()> { - let all_shreds = receiver.lock().unwrap().recv()?; + let (all_shreds, _) = receiver.lock().unwrap().recv()?; blockstore .insert_shreds(all_shreds.to_vec(), None, true) .expect("Failed to insert shreds in blockstore"); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 3cd554d40e..7794d532fe 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -1,5 +1,7 @@ -use super::broadcast_utils::{self, ReceiveResults}; -use super::*; +use super::{ + broadcast_utils::{self, ReceiveResults}, + *, +}; use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; use solana_ledger::{ entry::Entry, @@ -9,44 +11,33 @@ use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; use std::collections::HashMap; use std::time::Duration; -#[derive(Default)] -struct BroadcastStats { - // Per-slot elapsed time - shredding_elapsed: u64, - insert_shreds_elapsed: u64, - broadcast_elapsed: u64, - receive_elapsed: u64, - seed_elapsed: u64, - send_mmsg_elapsed: u64, -} - -impl BroadcastStats { - fn reset(&mut self) { - *self = Self::default(); - } -} - #[derive(Clone)] pub struct StandardBroadcastRun { - stats: Arc>, + process_shreds_stats: ProcessShredsStats, + transmit_shreds_stats: Arc>>, + insert_shreds_stats: Arc>>, unfinished_slot: Option, current_slot_and_parent: Option<(u64, u64)>, slot_broadcast_start: Option, keypair: Arc, shred_version: u16, last_datapoint_submit: Arc, + num_batches: usize, } impl StandardBroadcastRun { pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { Self { - stats: Arc::new(RwLock::new(BroadcastStats::default())), + process_shreds_stats: ProcessShredsStats::default(), + transmit_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())), + insert_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())), unfinished_slot: None, current_slot_and_parent: None, slot_broadcast_start: None, keypair, shred_version, last_datapoint_submit: Arc::new(AtomicU64::new(0)), + num_batches: 0, } } @@ -141,6 +132,7 @@ impl StandardBroadcastRun { let brecv = Arc::new(Mutex::new(brecv)); //data let _ = self.transmit(&srecv, cluster_info, sock); + let _ = self.record(&brecv, blockstore); //coding let _ = self.transmit(&srecv, cluster_info, sock); let _ = self.record(&brecv, blockstore); @@ -150,8 +142,8 @@ impl StandardBroadcastRun { fn process_receive_results( &mut self, blockstore: &Arc, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, receive_results: ReceiveResults, ) -> Result<()> { let mut receive_elapsed = receive_results.time_elapsed; @@ -159,11 +151,13 @@ impl StandardBroadcastRun { let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; inc_new_counter_info!("broadcast_service-entries_received", num_entries); - + let old_broadcast_start = self.slot_broadcast_start; + let old_num_batches = self.num_batches; if self.current_slot_and_parent.is_none() || bank.slot() != self.current_slot_and_parent.unwrap().0 { self.slot_broadcast_start = Some(Instant::now()); + self.num_batches = 0; let slot = bank.slot(); let parent_slot = bank.parent_slot(); @@ -178,19 +172,19 @@ impl StandardBroadcastRun { self.check_for_interrupted_slot(bank.ticks_per_slot() as u8); // 2) Convert entries to shreds and coding shreds - let (shredder, next_shred_index) = self.init_shredder( blockstore, (bank.tick_height() % bank.ticks_per_slot()) as u8, ); - let mut data_shreds = self.entries_to_data_shreds( + let is_last_in_slot = last_tick_height == bank.max_tick_height(); + let data_shreds = self.entries_to_data_shreds( &shredder, next_shred_index, &receive_results.entries, - last_tick_height == bank.max_tick_height(), + is_last_in_slot, ); - //Insert the first shred so blockstore stores that the leader started this block - //This must be done before the blocks are sent out over the wire. + // Insert the first shred so blockstore stores that the leader started this block + // This must be done before the blocks are sent out over the wire. if !data_shreds.is_empty() && data_shreds[0].index() == 0 { let first = vec![data_shreds[0].clone()]; blockstore @@ -198,27 +192,56 @@ impl StandardBroadcastRun { .expect("Failed to insert shreds in blockstore"); } let last_data_shred = data_shreds.len(); - if let Some(last_shred) = last_unfinished_slot_shred { - data_shreds.push(last_shred); - } let to_shreds_elapsed = to_shreds_start.elapsed(); let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); let stakes = stakes.map(Arc::new); - let data_shreds = Arc::new(data_shreds); - socket_sender.send((stakes.clone(), data_shreds.clone()))?; - blockstore_sender.send(data_shreds.clone())?; - let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]); - let coding_shreds = Arc::new(coding_shreds); - socket_sender.send((stakes, coding_shreds.clone()))?; - blockstore_sender.send(coding_shreds)?; - self.update_broadcast_stats(BroadcastStats { - shredding_elapsed: duration_as_us(&to_shreds_elapsed), - receive_elapsed: duration_as_us(&receive_elapsed), - ..BroadcastStats::default() + + // Broadcast the last shred of the interrupted slot if necessary + if let Some(last_shred) = last_unfinished_slot_shred { + let batch_info = Some(BroadcastShredBatchInfo { + slot: last_shred.slot(), + num_expected_batches: Some(old_num_batches + 1), + slot_start_ts: old_broadcast_start.expect( + "Old broadcast start time for previous slot must exist if the previous slot + was interrupted", + ), + }); + let last_shred = Arc::new(vec![last_shred]); + socket_sender.send(((stakes.clone(), last_shred.clone()), batch_info.clone()))?; + blockstore_sender.send((last_shred, batch_info))?; + } + + // Increment by two batches, one for the data batch, one for the coding batch. + self.num_batches += 2; + let num_expected_batches = { + if is_last_in_slot { + Some(self.num_batches) + } else { + None + } + }; + let batch_info = Some(BroadcastShredBatchInfo { + slot: bank.slot(), + num_expected_batches, + slot_start_ts: self + .slot_broadcast_start + .clone() + .expect("Start timestamp must exist for a slot if we're broadcasting the slot"), }); + let data_shreds = Arc::new(data_shreds); + socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?; + blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?; + let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]); + let coding_shreds = Arc::new(coding_shreds); + socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?; + blockstore_sender.send((coding_shreds, batch_info))?; + self.process_shreds_stats.update(&ProcessShredsStats { + shredding_elapsed: duration_as_us(&to_shreds_elapsed), + receive_elapsed: duration_as_us(&receive_elapsed), + }); if last_tick_height == bank.max_tick_height() { self.report_and_reset_stats(); self.unfinished_slot = None; @@ -227,10 +250,15 @@ impl StandardBroadcastRun { Ok(()) } - fn insert(&self, blockstore: &Arc, shreds: Arc>) -> Result<()> { + fn insert( + &mut self, + blockstore: &Arc, + shreds: Arc>, + broadcast_shred_batch_info: Option, + ) -> Result<()> { // Insert shreds into blockstore let insert_shreds_start = Instant::now(); - //The first shred is inserted synchronously + // The first shred is inserted synchronously let data_shreds = if !shreds.is_empty() && shreds[0].index() == 0 { shreds[1..].to_vec() } else { @@ -240,29 +268,39 @@ impl StandardBroadcastRun { .insert_shreds(data_shreds, None, true) .expect("Failed to insert shreds in blockstore"); let insert_shreds_elapsed = insert_shreds_start.elapsed(); - self.update_broadcast_stats(BroadcastStats { + let new_insert_shreds_stats = InsertShredsStats { insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed), - ..BroadcastStats::default() - }); + num_shreds: shreds.len(), + }; + self.update_insertion_metrics(&new_insert_shreds_stats, &broadcast_shred_batch_info); Ok(()) } + fn update_insertion_metrics( + &mut self, + new_insertion_shreds_stats: &InsertShredsStats, + broadcast_shred_batch_info: &Option, + ) { + let mut insert_shreds_stats = self.insert_shreds_stats.lock().unwrap(); + insert_shreds_stats.update(new_insertion_shreds_stats, broadcast_shred_batch_info); + } + fn broadcast( &mut self, sock: &UdpSocket, cluster_info: &Arc>, stakes: Option>>, shreds: Arc>, + broadcast_shred_batch_info: Option, ) -> Result<()> { - let seed_start = Instant::now(); - let seed_elapsed = seed_start.elapsed(); + trace!("Broadcasting {:?} shreds", shreds.len()); + // Get the list of peers to broadcast to + let get_peers_start = Instant::now(); + let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); + let get_peers_elapsed = get_peers_start.elapsed(); // Broadcast the shreds - let broadcast_start = Instant::now(); - trace!("Broadcasting {:?} shreds", shreds.len()); - - let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); - + let transmit_start = Instant::now(); let mut send_mmsg_total = 0; broadcast_shreds( sock, @@ -272,42 +310,38 @@ impl StandardBroadcastRun { &self.last_datapoint_submit, &mut send_mmsg_total, )?; - - let broadcast_elapsed = broadcast_start.elapsed(); - - self.update_broadcast_stats(BroadcastStats { - broadcast_elapsed: duration_as_us(&broadcast_elapsed), - seed_elapsed: duration_as_us(&seed_elapsed), + let transmit_elapsed = transmit_start.elapsed(); + let new_transmit_shreds_stats = TransmitShredsStats { + transmit_elapsed: duration_as_us(&transmit_elapsed), + get_peers_elapsed: duration_as_us(&get_peers_elapsed), send_mmsg_elapsed: send_mmsg_total, - ..BroadcastStats::default() - }); + num_shreds: shreds.len(), + }; + + // Process metrics + self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info); Ok(()) } - fn update_broadcast_stats(&self, stats: BroadcastStats) { - let mut wstats = self.stats.write().unwrap(); - wstats.receive_elapsed += stats.receive_elapsed; - wstats.shredding_elapsed += stats.shredding_elapsed; - wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed; - wstats.broadcast_elapsed += stats.broadcast_elapsed; - wstats.seed_elapsed += stats.seed_elapsed; - wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed; + fn update_transmit_metrics( + &mut self, + new_transmit_shreds_stats: &TransmitShredsStats, + broadcast_shred_batch_info: &Option, + ) { + let mut transmit_shreds_stats = self.transmit_shreds_stats.lock().unwrap(); + transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info); } fn report_and_reset_stats(&mut self) { - let stats = self.stats.read().unwrap(); + let stats = &self.process_shreds_stats; assert!(self.unfinished_slot.is_some()); datapoint_info!( - "broadcast-bank-stats", + "broadcast-process-shreds-stats", ("slot", self.unfinished_slot.unwrap().slot as i64, i64), ("shredding_time", stats.shredding_elapsed as i64, i64), - ("insertion_time", stats.insert_shreds_elapsed as i64, i64), - ("broadcast_time", stats.broadcast_elapsed as i64, i64), ("receive_time", stats.receive_elapsed as i64, i64), - ("send_mmsg", stats.send_mmsg_elapsed as i64, i64), - ("seed", stats.seed_elapsed as i64, i64), ( - "num_shreds", + "num_data_shreds", i64::from(self.unfinished_slot.unwrap().next_shred_index), i64 ), @@ -317,8 +351,7 @@ impl StandardBroadcastRun { i64 ), ); - drop(stats); - self.stats.write().unwrap().reset(); + self.process_shreds_stats.reset(); } } @@ -327,8 +360,8 @@ impl BroadcastRun for StandardBroadcastRun { &mut self, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender, - blockstore_sender: &Sender>>, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { let receive_results = broadcast_utils::recv_slot_entries(receiver)?; self.process_receive_results( @@ -340,20 +373,20 @@ impl BroadcastRun for StandardBroadcastRun { } fn transmit( &mut self, - receiver: &Arc>>, + receiver: &Arc>, cluster_info: &Arc>, sock: &UdpSocket, ) -> Result<()> { - let (stakes, shreds) = receiver.lock().unwrap().recv()?; - self.broadcast(sock, cluster_info, stakes, shreds) + let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; + self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts) } fn record( - &self, - receiver: &Arc>>>>, + &mut self, + receiver: &Arc>, blockstore: &Arc, ) -> Result<()> { - let shreds = receiver.lock().unwrap().recv()?; - self.insert(blockstore, shreds) + let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?; + self.insert(blockstore, shreds, slot_start_ts) } } @@ -469,12 +502,29 @@ mod test { // Make sure the slot is not complete assert!(!blockstore.is_full(0)); // Modify the stats, should reset later - standard_broadcast_run - .stats - .write() - .unwrap() - .receive_elapsed = 10; - + standard_broadcast_run.process_shreds_stats.receive_elapsed = 10; + // Broadcast stats should exist, and 2 batches should have been sent, + // one for data, one for coding + assert_eq!( + standard_broadcast_run + .transmit_shreds_stats + .lock() + .unwrap() + .get(unfinished_slot.slot) + .unwrap() + .num_batches(), + 2 + ); + assert_eq!( + standard_broadcast_run + .insert_shreds_stats + .lock() + .unwrap() + .get(unfinished_slot.slot) + .unwrap() + .num_batches(), + 2 + ); // Try to fetch ticks from blockstore, nothing should break assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0); assert_eq!( @@ -485,7 +535,7 @@ mod test { // Step 2: Make a transmission for another bank that interrupts the transmission for // slot 0 let bank2 = Arc::new(Bank::new_from_parent(&bank0, &leader_keypair.pubkey(), 2)); - + let interrupted_slot = unfinished_slot.slot; // Interrupting the slot should cause the unfinished_slot and stats to reset let num_shreds = 1; assert!(num_shreds < num_shreds_per_slot); @@ -509,10 +559,24 @@ mod test { // Check that the stats were reset as well assert_eq!( - standard_broadcast_run.stats.read().unwrap().receive_elapsed, + standard_broadcast_run.process_shreds_stats.receive_elapsed, 0 ); + // Broadcast stats for interrupted slot should be cleared + assert!(standard_broadcast_run + .transmit_shreds_stats + .lock() + .unwrap() + .get(interrupted_slot) + .is_none()); + assert!(standard_broadcast_run + .insert_shreds_stats + .lock() + .unwrap() + .get(interrupted_slot) + .is_none()); + // Try to fetch the incomplete ticks from blockstore, should succeed assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0); assert_eq!( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index af40826a21..5da4c4f5db 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -278,7 +278,7 @@ impl CrdsGossipPull { failed } // build a set of filters of the current crds table - // num_filters - used to increase the likely hood of a value in crds being added to some filter + // num_filters - used to increase the likelyhood of a value in crds being added to some filter pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec { let num = cmp::max( CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, diff --git a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json index 6c46bf7b60..04d8c02ba9 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json @@ -5150,7 +5150,7 @@ "measurement": "cluster_info-vote-count", "orderByTime": "ASC", "policy": "autogen", - "query": "SELECT mean(\"receive_time\") AS \"receive_time\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT mean(\"receive_time\") AS \"receive_time\" FROM \"$testnet\".\"autogen\".\"broadcast-process-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "A", "resultFormat": "time_series", @@ -5189,7 +5189,7 @@ "measurement": "cluster_info-vote-count", "orderByTime": "ASC", "policy": "autogen", - "query": "SELECT mean(\"shredding_time\") AS \"shredding_time\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT mean(\"shredding_time\") AS \"shredding_time\" FROM \"$testnet\".\"autogen\".\"broadcast-process-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "B", "resultFormat": "time_series", @@ -5228,7 +5228,7 @@ "measurement": "cluster_info-vote-count", "orderByTime": "ASC", "policy": "autogen", - "query": "SELECT mean(\"broadcast_time\") AS \"broadcast_time\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT mean(\"slot_broadcast_time\") AS \"slot_broadcast_time\" FROM \"$testnet\".\"autogen\".\"broadcast-process-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "C", "resultFormat": "time_series", @@ -5267,85 +5267,7 @@ "measurement": "cluster_info-vote-count", "orderByTime": "ASC", "policy": "autogen", - "query": "SELECT mean(\"num_shreds\") AS \"num_shreds\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", - "rawQuery": true, - "refId": "C", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "count" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - }, - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "hide": false, - "measurement": "cluster_info-vote-count", - "orderByTime": "ASC", - "policy": "autogen", - "query": "SELECT mean(\"slot_broadcast_time\") AS \"slot_broadcast_time\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", - "rawQuery": true, - "refId": "C", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "count" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - }, - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "hide": false, - "measurement": "cluster_info-vote-count", - "orderByTime": "ASC", - "policy": "autogen", - "query": "SELECT mean(\"insertion_time\") AS \"insertion_time\" FROM \"$testnet\".\"autogen\".\"broadcast-bank-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT mean(\"num_data_shreds\") AS \"num_data_shreds\" FROM \"$testnet\".\"autogen\".\"broadcast-process-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "D", "resultFormat": "time_series", @@ -5369,7 +5291,493 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Time spent in Broadcast ($hostid)", + "title": "Broadcast Receive/Shredding Stats ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "µs", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 62 + }, + "id": 74, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "broadcast-bank-stats.num_shreds", + "yaxis": 2 + }, + { + "alias": "broadcast-transmit-shreds-stats.num_data_and_coding_shreds", + "yaxis": 2 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"transmit_elapsed\") AS \"transmit_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-transmit-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"send_mmsg_elapsed\") AS \"send_mmsg_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-transmit-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"get_peers_elapsed\") AS \"get_peers_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-transmit-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"num_shreds\") AS \"num_data_and_coding_shreds\" FROM \"$testnet\".\"autogen\".\"broadcast-transmit-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "D", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"end_to_end_elapsed\") AS \"end_to_end_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-transmit-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "E", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Broadcast Transmit Stats ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "µs", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 6, + "w": 8, + "x": 16, + "y": 62 + }, + "id": 75, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "broadcast-bank-stats.num_shreds", + "yaxis": 2 + }, + { + "alias": "broadcast-insert-shreds-stats.num_data_and_coding_shreds", + "yaxis": 2 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"insert_shreds_elapsed\") AS \"insert_shreds_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-insert-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"num_shreds\") AS \"num_data_and_coding_shreds\" FROM \"$testnet\".\"autogen\".\"broadcast-insert-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"end_to_end_elapsed\") AS \"end_to_end_elapsed\" FROM \"$testnet\".\"autogen\".\"broadcast-insert-shreds-stats\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Broadcast Insert Shreds Stats ($hostid)", "tooltip": { "shared": true, "sort": 0, @@ -5423,8 +5831,8 @@ "gridPos": { "h": 6, "w": 8, - "x": 8, - "y": 62 + "x": 0, + "y": 68 }, "id": 44, "legend": { @@ -5773,6 +6181,248 @@ "alignLevel": null } }, + { + "aliasColors": { + "banking_stage-buffered_packets.sum": "#3f6833", + "banking_stage-consumed_buffered_packets.last": "#65c5db", + "banking_stage-consumed_buffered_packets.sum": "#614d93", + "banking_stage-forwarded_packets.last": "#f9ba8f", + "banking_stage-forwarded_packets.sum": "#b7dbab", + "banking_stage-rebuffered_packets.last": "#e5a8e2", + "cluster-info.repair": "#ba43a9", + "fetch_stage-discard_forwards.sum": "#00ffbb", + "fetch_stage-honor_forwards.sum": "#bf1b00", + "poh_recorder-record_lock_contention.sum": "#5195ce", + "poh_recorder-tick_lock_contention.sum": "#962d82", + "replay_stage-new_leader.last": "#00ffbb", + "tower-vote.last": "#00ffbb", + "window-service.receive": "#b7dbab", + "window-stage.consumed": "#5195ce" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 68 + }, + "id": 47, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-tick_lock_contention\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "F", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-record_lock_contention\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-tick_overhead\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-record_ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "PoH Lock Contention ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "µs", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": { "cluster-info.repair": "#ba43a9", @@ -5791,7 +6441,7 @@ "h": 6, "w": 8, "x": 16, - "y": 62 + "y": 68 }, "id": 71, "legend": { @@ -6112,7 +6762,7 @@ "h": 6, "w": 8, "x": 0, - "y": 68 + "y": 74 }, "id": 46, "legend": { @@ -6516,35 +7166,19 @@ } }, { - "aliasColors": { - "banking_stage-buffered_packets.sum": "#3f6833", - "banking_stage-consumed_buffered_packets.last": "#65c5db", - "banking_stage-consumed_buffered_packets.sum": "#614d93", - "banking_stage-forwarded_packets.last": "#f9ba8f", - "banking_stage-forwarded_packets.sum": "#b7dbab", - "banking_stage-rebuffered_packets.last": "#e5a8e2", - "cluster-info.repair": "#ba43a9", - "fetch_stage-discard_forwards.sum": "#00ffbb", - "fetch_stage-honor_forwards.sum": "#bf1b00", - "poh_recorder-record_lock_contention.sum": "#5195ce", - "poh_recorder-tick_lock_contention.sum": "#962d82", - "replay_stage-new_leader.last": "#00ffbb", - "tower-vote.last": "#00ffbb", - "window-service.receive": "#b7dbab", - "window-stage.consumed": "#5195ce" - }, + "aliasColors": {}, "bars": false, "dashLength": 10, "dashes": false, "datasource": "$datasource", "fill": 1, "gridPos": { - "h": 6, + "h": 5, "w": 8, "x": 8, - "y": 68 + "y": 74 }, - "id": 47, + "id": 54, "legend": { "alignAsTable": false, "avg": false, @@ -6556,11 +7190,11 @@ "values": false }, "lines": true, - "linewidth": 2, + "linewidth": 1, "links": [], - "nullPointMode": "null as zero", + "nullPointMode": "null", "percentage": false, - "pointradius": 2, + "pointradius": 5, "points": false, "renderer": "flot", "seriesOverrides": [], @@ -6583,46 +7217,10 @@ "type": "fill" } ], + "measurement": "cluster_info-vote-count", "orderByTime": "ASC", - "policy": "default", - "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-tick_lock_contention\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", - "rawQuery": true, - "refId": "F", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] - }, - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "orderByTime": "ASC", - "policy": "default", - "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-record_lock_contention\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "policy": "autogen", + "query": "SELECT sum(\"count\") AS \"window receive\" FROM \"$testnet\".\"autogen\".\"streamer-recv_window-recv\" WHERE $timeFilter GROUP BY time($__interval) FILL(0)\n", "rawQuery": true, "refId": "A", "resultFormat": "time_series", @@ -6630,13 +7228,13 @@ [ { "params": [ - "value" + "count" ], "type": "field" }, { "params": [], - "type": "mean" + "type": "sum" } ] ], @@ -6659,7 +7257,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-tick_overhead\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT sum(\"count\") AS \"window receive\" FROM \"$testnet\".\"autogen\".\"streamer-recv_window-consume\" WHERE $timeFilter GROUP BY time($__interval) FILL(0)", "rawQuery": true, "refId": "B", "resultFormat": "time_series", @@ -6678,49 +7276,12 @@ ] ], "tags": [] - }, - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "orderByTime": "ASC", - "policy": "default", - "query": "SELECT mean(\"count\") FROM \"$testnet\".\"autogen\".\"poh_recorder-record_ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", - "rawQuery": true, - "refId": "C", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "PoH Lock Contention ($hostid)", + "title": "Window", "tooltip": { "shared": true, "sort": 0, @@ -6736,7 +7297,7 @@ }, "yaxes": [ { - "format": "µs", + "format": "short", "label": null, "logBase": 1, "max": null, @@ -6749,7 +7310,7 @@ "logBase": 1, "max": null, "min": null, - "show": false + "show": true } ], "yaxis": { @@ -6775,7 +7336,7 @@ "h": 6, "w": 8, "x": 16, - "y": 68 + "y": 74 }, "id": 45, "legend": { @@ -7076,6 +7637,123 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "description": "(Must pick a host id for this to make sense)", + "fill": 1, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 79 + }, + "id": 73, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT count(\"slot\") AS \"num_my_leader_slots\" FROM \"$testnet\".\"autogen\".\"replay_stage-my_leader_slot\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time(1s) fill(null)\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "My Leader Slots ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": { "banking_stage-buffered_packets.sum": "#3f6833", @@ -7101,7 +7779,7 @@ "h": 5, "w": 8, "x": 0, - "y": 74 + "y": 80 }, "id": 53, "legend": { @@ -7392,159 +8070,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "$datasource", - "fill": 1, - "gridPos": { - "h": 5, - "w": 8, - "x": 8, - "y": 74 - }, - "id": 54, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "measurement": "cluster_info-vote-count", - "orderByTime": "ASC", - "policy": "autogen", - "query": "SELECT sum(\"count\") AS \"window receive\" FROM \"$testnet\".\"autogen\".\"streamer-recv_window-recv\" WHERE $timeFilter GROUP BY time($__interval) FILL(0)\n", - "rawQuery": true, - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "count" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - }, - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "orderByTime": "ASC", - "policy": "default", - "query": "SELECT sum(\"count\") AS \"window receive\" FROM \"$testnet\".\"autogen\".\"streamer-recv_window-consume\" WHERE $timeFilter GROUP BY time($__interval) FILL(0)", - "rawQuery": true, - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Window", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -7556,7 +8081,7 @@ "h": 5, "w": 8, "x": 16, - "y": 74 + "y": 80 }, "id": 48, "legend": { @@ -7673,7 +8198,7 @@ "h": 6, "w": 8, "x": 0, - "y": 79 + "y": 85 }, "id": 61, "legend": { @@ -7820,123 +8345,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "$datasource", - "description": "(Must pick a host id for this to make sense)", - "fill": 1, - "gridPos": { - "h": 6, - "w": 8, - "x": 8, - "y": 79 - }, - "id": 73, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "measurement": "cluster_info-vote-count", - "orderByTime": "ASC", - "policy": "autogen", - "query": "SELECT count(\"slot\") AS \"num_my_leader_slots\" FROM \"$testnet\".\"autogen\".\"replay_stage-my_leader_slot\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time(1s) fill(null)\n", - "rawQuery": true, - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "count" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "My Leader Slots ($hostid)", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -7948,7 +8356,7 @@ "h": 6, "w": 8, "x": 16, - "y": 79 + "y": 85 }, "id": 52, "legend": { @@ -8059,7 +8467,7 @@ "h": 1, "w": 24, "x": 0, - "y": 85 + "y": 91 }, "id": 55, "panels": [], @@ -8082,7 +8490,7 @@ "h": 5, "w": 8, "x": 0, - "y": 86 + "y": 92 }, "id": 56, "legend": { @@ -8242,7 +8650,7 @@ "h": 5, "w": 8, "x": 8, - "y": 86 + "y": 92 }, "id": 57, "legend": { @@ -8402,7 +8810,7 @@ "h": 5, "w": 8, "x": 16, - "y": 86 + "y": 92 }, "id": 58, "legend": { @@ -8587,7 +8995,7 @@ "h": 1, "w": 24, "x": 0, - "y": 91 + "y": 97 }, "id": 59, "panels": [], @@ -8606,7 +9014,7 @@ "h": 5, "w": 12, "x": 0, - "y": 92 + "y": 98 }, "id": 60, "legend": { @@ -8839,7 +9247,7 @@ "h": 5, "w": 12, "x": 12, - "y": 92 + "y": 98 }, "id": 72, "legend": { @@ -8992,7 +9400,7 @@ "h": 1, "w": 24, "x": 0, - "y": 97 + "y": 103 }, "id": 62, "panels": [], @@ -9010,7 +9418,7 @@ "h": 5, "w": 12, "x": 0, - "y": 98 + "y": 104 }, "id": 63, "legend": { @@ -9212,7 +9620,7 @@ "h": 5, "w": 12, "x": 12, - "y": 98 + "y": 104 }, "id": 64, "legend": { @@ -9361,7 +9769,7 @@ "h": 1, "w": 24, "x": 0, - "y": 103 + "y": 109 }, "id": 65, "panels": [], @@ -9379,7 +9787,7 @@ "h": 6, "w": 8, "x": 0, - "y": 104 + "y": 110 }, "id": 66, "legend": { @@ -9571,7 +9979,7 @@ "h": 6, "w": 8, "x": 8, - "y": 104 + "y": 110 }, "id": 67, "legend": { @@ -9839,7 +10247,7 @@ "h": 6, "w": 8, "x": 16, - "y": 104 + "y": 110 }, "id": 68, "legend": { @@ -10028,7 +10436,7 @@ "h": 1, "w": 24, "x": 0, - "y": 110 + "y": 116 }, "id": 69, "panels": [], @@ -10046,7 +10454,7 @@ "h": 4, "w": 8, "x": 0, - "y": 111 + "y": 117 }, "id": 70, "legend": {