Accumulate blockstore metrics and submit every 2s (#9075)

Author: sakridge, 2020-03-26 12:51:41 -07:00 (committed by GitHub)
Parent: 284920433f
Commit: ed036b978d
2 changed files with 40 additions and 21 deletions
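
For context, the change below replaces per-call metric reporting with a single accumulator that the window-insert loop flushes roughly every two seconds. The following is a minimal, self-contained sketch of that pattern; `InsertionMetrics` and `insert_batch` are illustrative stand-ins only (the real code uses `BlockstoreInsertionMetrics`, `run_insert`, and `datapoint_info!`).

    use std::time::{Duration, Instant};

    #[derive(Default)]
    struct InsertionMetrics {
        num_shreds: usize,
        total_elapsed_us: u64,
    }

    impl InsertionMetrics {
        fn report(&self, name: &str) {
            // The real code submits these fields as a datapoint via datapoint_info!.
            println!(
                "{}: num_shreds={} total_elapsed_us={}",
                name, self.num_shreds, self.total_elapsed_us
            );
        }
    }

    // Stand-in for one insertion pass; it adds into the caller's accumulator
    // instead of returning a fresh metrics value.
    fn insert_batch(batch: &[u8], metrics: &mut InsertionMetrics) {
        let start = Instant::now();
        // ... insertion work would happen here ...
        metrics.num_shreds += batch.len();
        metrics.total_elapsed_us += start.elapsed().as_micros() as u64;
    }

    fn main() {
        let mut metrics = InsertionMetrics::default();
        let mut last_print = Instant::now();
        for _ in 0..8 {
            insert_batch(&[0u8; 16], &mut metrics);
            std::thread::sleep(Duration::from_millis(500));
            // Flush the accumulated totals at most every ~2 seconds, then reset.
            if last_print.elapsed().as_secs() > 2 {
                metrics.report("recv-window-insert-shreds");
                metrics = InsertionMetrics::default();
                last_print = Instant::now();
            }
        }
    }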

window_service.rs

@@ -14,7 +14,9 @@ use rayon::iter::IntoParallelRefMutIterator;
 use rayon::iter::ParallelIterator;
 use rayon::ThreadPool;
 use solana_ledger::bank_forks::BankForks;
-use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT};
+use solana_ledger::blockstore::{
+    self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT,
+};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
 use solana_ledger::shred::Shred;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
@@ -110,6 +112,7 @@ fn run_insert<F>(
     blockstore: &Arc<Blockstore>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     handle_duplicate: F,
+    metrics: &mut BlockstoreInsertionMetrics,
 ) -> Result<()>
 where
     F: Fn(Shred) -> (),
@@ -121,14 +124,13 @@
         shreds.append(&mut more_shreds)
     }
-    let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate(
+    blockstore.insert_shreds_handle_duplicate(
         shreds,
         Some(leader_schedule_cache),
         false,
         &handle_duplicate,
+        metrics,
     )?;
-    blockstore_insert_metrics.report_metrics("recv-window-insert-shreds");
     Ok(())
 }
@@ -358,6 +360,8 @@ impl WindowService {
                 let handle_duplicate = |shred| {
                     let _ = duplicate_sender.send(shred);
                 };
+                let mut metrics = BlockstoreInsertionMetrics::default();
+                let mut last_print = Instant::now();
                 loop {
                     if exit.load(Ordering::Relaxed) {
                         break;
@@ -368,11 +372,18 @@
                         &blockstore,
                         &leader_schedule_cache,
                         &handle_duplicate,
+                        &mut metrics,
                     ) {
                         if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
                             break;
                         }
                     }
+                    if last_print.elapsed().as_secs() > 2 {
+                        metrics.report_metrics("recv-window-insert-shreds");
+                        metrics = BlockstoreInsertionMetrics::default();
+                        last_print = Instant::now();
+                    }
                 }
             })
             .unwrap()

blockstore.rs

@@ -111,6 +111,7 @@ pub struct SlotMetaWorkingSetEntry {
     did_insert_occur: bool,
 }
+#[derive(Default)]
 pub struct BlockstoreInsertionMetrics {
     pub num_shreds: usize,
     pub insert_lock_elapsed: u64,
@@ -137,7 +138,7 @@ impl SlotMetaWorkingSetEntry {
 impl BlockstoreInsertionMetrics {
     pub fn report_metrics(&self, metric_name: &'static str) {
-        datapoint_debug!(
+        datapoint_info!(
             metric_name,
             ("num_shreds", self.num_shreds as i64, i64),
             ("total_elapsed", self.total_elapsed as i64, i64),
@@ -625,7 +626,8 @@ impl Blockstore {
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
         handle_duplicate: &F,
-    ) -> Result<BlockstoreInsertionMetrics>
+        metrics: &mut BlockstoreInsertionMetrics,
+    ) -> Result<()>
     where
         F: Fn(Shred) -> (),
     {
@@ -764,19 +766,19 @@
         total_start.stop();
-        Ok(BlockstoreInsertionMetrics {
-            num_shreds,
-            total_elapsed: total_start.as_us(),
-            insert_lock_elapsed,
-            insert_shreds_elapsed,
-            shred_recovery_elapsed,
-            chaining_elapsed,
-            commit_working_sets_elapsed,
-            write_batch_elapsed,
-            num_inserted,
-            num_recovered,
-            index_meta_time,
-        })
+        metrics.num_shreds += num_shreds;
+        metrics.total_elapsed += total_start.as_us();
+        metrics.insert_lock_elapsed += insert_lock_elapsed;
+        metrics.insert_shreds_elapsed += insert_shreds_elapsed;
+        metrics.shred_recovery_elapsed += shred_recovery_elapsed;
+        metrics.chaining_elapsed += chaining_elapsed;
+        metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
+        metrics.write_batch_elapsed += write_batch_elapsed;
+        metrics.num_inserted += num_inserted;
+        metrics.num_recovered += num_recovered;
+        metrics.index_meta_time += index_meta_time;
+        Ok(())
     }
     pub fn insert_shreds(
@@ -784,8 +786,14 @@
         shreds: Vec<Shred>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
-    ) -> Result<BlockstoreInsertionMetrics> {
-        self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {})
+    ) -> Result<()> {
+        self.insert_shreds_handle_duplicate(
+            shreds,
+            leader_schedule,
+            is_trusted,
+            &|_| {},
+            &mut BlockstoreInsertionMetrics::default(),
+        )
     }
     fn check_insert_coding_shred(
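
The net effect on the Blockstore API: insert_shreds_handle_duplicate and insert_shreds now return Result<()> and fold their timings into a caller-owned BlockstoreInsertionMetrics, with the convenience wrapper passing a throwaway default. A small sketch of that out-parameter shape, using hypothetical stand-in types rather than the real Blockstore:

    // Stand-ins for illustration only; the real types are Blockstore and
    // BlockstoreInsertionMetrics.
    #[derive(Default)]
    struct Metrics {
        inserted: usize,
    }

    struct Store;

    impl Store {
        // Accumulates into the caller-supplied metrics instead of returning them.
        fn insert_with_metrics(&self, items: &[u32], metrics: &mut Metrics) -> Result<(), ()> {
            metrics.inserted += items.len();
            Ok(())
        }

        // Convenience wrapper for callers that do not track metrics:
        // pass a throwaway default, mirroring insert_shreds above.
        fn insert(&self, items: &[u32]) -> Result<(), ()> {
            self.insert_with_metrics(items, &mut Metrics::default())
        }
    }

    fn main() {
        let store = Store;
        let mut metrics = Metrics::default();
        // Accumulate across several calls, then inspect the totals once.
        for batch in [[1, 2, 3], [4, 5, 6]] {
            store.insert_with_metrics(&batch, &mut metrics).unwrap();
        }
        assert_eq!(metrics.inserted, 6);
        store.insert(&[7, 8]).unwrap();
    }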