diff --git a/core/src/window_service.rs b/core/src/window_service.rs index bbc4b54027..bb17116c22 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -14,7 +14,9 @@ use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; use solana_ledger::bank_forks::BankForks; -use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT}; +use solana_ledger::blockstore::{ + self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT, +}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::Shred; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; @@ -110,6 +112,7 @@ fn run_insert<F>( blockstore: &Arc<Blockstore>, leader_schedule_cache: &Arc<LeaderScheduleCache>, handle_duplicate: F, + metrics: &mut BlockstoreInsertionMetrics, ) -> Result<()> where F: Fn(Shred) -> (), @@ -121,14 +124,13 @@ where shreds.append(&mut more_shreds) } - let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate( + blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), false, &handle_duplicate, + metrics, )?; - blockstore_insert_metrics.report_metrics("recv-window-insert-shreds"); - Ok(()) } @@ -358,6 +360,8 @@ impl WindowService { let handle_duplicate = |shred| { let _ = duplicate_sender.send(shred); }; + let mut metrics = BlockstoreInsertionMetrics::default(); + let mut last_print = Instant::now(); loop { if exit.load(Ordering::Relaxed) { break; } @@ -368,11 +372,18 @@ &blockstore, &leader_schedule_cache, &handle_duplicate, + &mut metrics, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; } } + + if last_print.elapsed().as_secs() > 2 { + metrics.report_metrics("recv-window-insert-shreds"); + metrics = BlockstoreInsertionMetrics::default(); + last_print = Instant::now(); + } } }) .unwrap() diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 366fec4d8d..a215379ce4 100644 --- 
a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -111,6 +112,7 @@ pub struct SlotMetaWorkingSetEntry { did_insert_occur: bool, } +#[derive(Default)] pub struct BlockstoreInsertionMetrics { pub num_shreds: usize, pub insert_lock_elapsed: u64, @@ -137,7 +138,7 @@ impl SlotMetaWorkingSetEntry { impl BlockstoreInsertionMetrics { pub fn report_metrics(&self, metric_name: &'static str) { - datapoint_debug!( + datapoint_info!( metric_name, ("num_shreds", self.num_shreds as i64, i64), ("total_elapsed", self.total_elapsed as i64, i64), @@ -625,7 +626,8 @@ impl Blockstore { leader_schedule: Option<&Arc<LeaderScheduleCache>>, is_trusted: bool, handle_duplicate: &F, - ) -> Result<BlockstoreInsertionMetrics> + metrics: &mut BlockstoreInsertionMetrics, + ) -> Result<()> where F: Fn(Shred) -> (), { @@ -764,19 +766,19 @@ impl Blockstore { total_start.stop(); - Ok(BlockstoreInsertionMetrics { - num_shreds, - total_elapsed: total_start.as_us(), - insert_lock_elapsed, - insert_shreds_elapsed, - shred_recovery_elapsed, - chaining_elapsed, - commit_working_sets_elapsed, - write_batch_elapsed, - num_inserted, - num_recovered, - index_meta_time, - }) + metrics.num_shreds += num_shreds; + metrics.total_elapsed += total_start.as_us(); + metrics.insert_lock_elapsed += insert_lock_elapsed; + metrics.insert_shreds_elapsed += insert_shreds_elapsed; + metrics.shred_recovery_elapsed += shred_recovery_elapsed; + metrics.chaining_elapsed += chaining_elapsed; + metrics.commit_working_sets_elapsed += commit_working_sets_elapsed; + metrics.write_batch_elapsed += write_batch_elapsed; + metrics.num_inserted += num_inserted; + metrics.num_recovered += num_recovered; + metrics.index_meta_time += index_meta_time; + + Ok(()) } pub fn insert_shreds( @@ -784,8 +786,14 @@ impl Blockstore { shreds: Vec<Shred>, leader_schedule: Option<&Arc<LeaderScheduleCache>>, is_trusted: bool, - ) -> Result<BlockstoreInsertionMetrics> { - self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {}) + ) -> Result<()> { + self.insert_shreds_handle_duplicate( + shreds, + leader_schedule, + 
is_trusted, + &|_| {}, + &mut BlockstoreInsertionMetrics::default(), + ) } fn check_insert_coding_shred(