diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index d981b3456c..3e6dd0dde5 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -73,6 +73,7 @@ pub use {
     crate::{
         blockstore_db::BlockstoreError,
         blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
+        blockstore_metrics::BlockstoreInsertionMetrics,
     },
     blockstore_purge::PurgeType,
     rocksdb::properties as RocksProperties,
@@ -198,34 +199,6 @@ pub struct SlotMetaWorkingSetEntry {
     did_insert_occur: bool,
 }
 
-#[derive(Default)]
-pub struct BlockstoreInsertionMetrics {
-    pub num_shreds: usize,
-    pub insert_lock_elapsed: u64,
-    pub insert_shreds_elapsed: u64,
-    pub shred_recovery_elapsed: u64,
-    pub chaining_elapsed: u64,
-    pub commit_working_sets_elapsed: u64,
-    pub write_batch_elapsed: u64,
-    pub total_elapsed: u64,
-    pub num_inserted: u64,
-    pub num_repair: u64,
-    pub num_recovered: usize,
-    num_recovered_blockstore_error: usize,
-    pub num_recovered_inserted: usize,
-    pub num_recovered_failed_sig: usize,
-    pub num_recovered_failed_invalid: usize,
-    pub num_recovered_exists: usize,
-    pub index_meta_time: u64,
-    num_data_shreds_exists: usize,
-    num_data_shreds_invalid: usize,
-    num_data_shreds_blockstore_error: usize,
-    num_coding_shreds_exists: usize,
-    num_coding_shreds_invalid: usize,
-    num_coding_shreds_invalid_erasure_config: usize,
-    num_coding_shreds_inserted: usize,
-}
-
 impl SlotMetaWorkingSetEntry {
     /// Construct a new SlotMetaWorkingSetEntry with the specified `new_slot_meta`
     /// and `old_slot_meta`. `did_insert_occur` is set to false.
@@ -238,89 +211,6 @@ impl SlotMetaWorkingSetEntry {
     }
 }
 
-impl BlockstoreInsertionMetrics {
-    pub fn report_metrics(&self, metric_name: &'static str) {
-        datapoint_info!(
-            metric_name,
-            ("num_shreds", self.num_shreds as i64, i64),
-            ("total_elapsed", self.total_elapsed as i64, i64),
-            ("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64),
-            (
-                "insert_shreds_elapsed",
-                self.insert_shreds_elapsed as i64,
-                i64
-            ),
-            (
-                "shred_recovery_elapsed",
-                self.shred_recovery_elapsed as i64,
-                i64
-            ),
-            ("chaining_elapsed", self.chaining_elapsed as i64, i64),
-            (
-                "commit_working_sets_elapsed",
-                self.commit_working_sets_elapsed as i64,
-                i64
-            ),
-            ("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
-            ("num_inserted", self.num_inserted as i64, i64),
-            ("num_repair", self.num_repair as i64, i64),
-            ("num_recovered", self.num_recovered as i64, i64),
-            (
-                "num_recovered_inserted",
-                self.num_recovered_inserted as i64,
-                i64
-            ),
-            (
-                "num_recovered_failed_sig",
-                self.num_recovered_failed_sig as i64,
-                i64
-            ),
-            (
-                "num_recovered_failed_invalid",
-                self.num_recovered_failed_invalid as i64,
-                i64
-            ),
-            (
-                "num_recovered_exists",
-                self.num_recovered_exists as i64,
-                i64
-            ),
-            (
-                "num_recovered_blockstore_error",
-                self.num_recovered_blockstore_error,
-                i64
-            ),
-            ("num_data_shreds_exists", self.num_data_shreds_exists, i64),
-            ("num_data_shreds_invalid", self.num_data_shreds_invalid, i64),
-            (
-                "num_data_shreds_blockstore_error",
-                self.num_data_shreds_blockstore_error,
-                i64
-            ),
-            (
-                "num_coding_shreds_exists",
-                self.num_coding_shreds_exists,
-                i64
-            ),
-            (
-                "num_coding_shreds_invalid",
-                self.num_coding_shreds_invalid,
-                i64
-            ),
-            (
-                "num_coding_shreds_invalid_erasure_config",
-                self.num_coding_shreds_invalid_erasure_config,
-                i64
-            ),
-            (
-                "num_coding_shreds_inserted",
-                self.num_coding_shreds_inserted,
-                i64
-            ),
-        );
-    }
-}
-
 impl Blockstore {
     pub fn db(self) -> Arc<Database> {
         self.db
@@ -880,7 +770,7 @@ impl Blockstore {
         let mut start = Measure::start("Blockstore lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
         start.stop();
-        metrics.insert_lock_elapsed += start.as_us();
+        metrics.insert_lock_elapsed_us += start.as_us();
 
         let db = &*self.db;
         let mut write_batch = db.batch()?;
@@ -892,7 +782,7 @@ impl Blockstore {
 
         metrics.num_shreds += shreds.len();
         let mut start = Measure::start("Shred insertion");
-        let mut index_meta_time = 0;
+        let mut index_meta_time_us = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
         for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
@@ -910,7 +800,7 @@ impl Blockstore {
                         &mut slot_meta_working_set,
                         &mut write_batch,
                         &mut just_inserted_shreds,
-                        &mut index_meta_time,
+                        &mut index_meta_time_us,
                         is_trusted,
                         handle_duplicate,
                         leader_schedule,
@@ -938,7 +828,7 @@ impl Blockstore {
                         &mut index_working_set,
                         &mut write_batch,
                         &mut just_inserted_shreds,
-                        &mut index_meta_time,
+                        &mut index_meta_time_us,
                         handle_duplicate,
                         is_trusted,
                         shred_source,
@@ -949,7 +839,7 @@ impl Blockstore {
 
         }
         start.stop();
-        metrics.insert_shreds_elapsed += start.as_us();
+        metrics.insert_shreds_elapsed_us += start.as_us();
         let mut start = Measure::start("Shred recovery");
         if let Some(leader_schedule_cache) = leader_schedule {
             let recovered_data_shreds = Self::try_shred_recovery(
@@ -976,7 +866,7 @@ impl Blockstore {
                             &mut slot_meta_working_set,
                             &mut write_batch,
                             &mut just_inserted_shreds,
-                            &mut index_meta_time,
+                            &mut index_meta_time_us,
                             is_trusted,
                             &handle_duplicate,
                             leader_schedule,
@@ -1012,14 +902,14 @@ impl Blockstore {
             }
         }
         start.stop();
-        metrics.shred_recovery_elapsed += start.as_us();
+        metrics.shred_recovery_elapsed_us += start.as_us();
 
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
         handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
         start.stop();
-        metrics.chaining_elapsed += start.as_us();
+        metrics.chaining_elapsed_us += start.as_us();
 
         let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@@ -1038,12 +928,12 @@ impl Blockstore {
             }
         }
         start.stop();
-        metrics.commit_working_sets_elapsed += start.as_us();
+        metrics.commit_working_sets_elapsed_us += start.as_us();
 
         let mut start = Measure::start("Write Batch");
         self.db.write(write_batch)?;
         start.stop();
-        metrics.write_batch_elapsed += start.as_us();
+        metrics.write_batch_elapsed_us += start.as_us();
 
         send_signals(
             &self.new_shreds_signals.lock().unwrap(),
@@ -1054,8 +944,8 @@ impl Blockstore {
 
         total_start.stop();
 
-        metrics.total_elapsed += total_start.as_us();
-        metrics.index_meta_time += index_meta_time;
+        metrics.total_elapsed_us += total_start.as_us();
+        metrics.index_meta_time_us += index_meta_time_us;
 
         Ok((newly_completed_data_sets, inserted_indices))
     }
@@ -1138,7 +1028,7 @@ impl Blockstore {
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
         just_received_shreds: &mut HashMap<ShredId, Shred>,
-        index_meta_time: &mut u64,
+        index_meta_time_us: &mut u64,
         handle_duplicate: &F,
         is_trusted: bool,
         shred_source: ShredSource,
@@ -1151,7 +1041,7 @@ impl Blockstore {
         let shred_index = u64::from(shred.index());
 
         let index_meta_working_set_entry =
-            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
+            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
 
         let index_meta = &mut index_meta_working_set_entry.index;
 
@@ -1296,7 +1186,7 @@ impl Blockstore {
     /// be committed atomically.
     /// - `just_inserted_data_shreds`: a (slot, shred index within the slot)
     ///   to shred map which maintains which data shreds have been inserted.
-    /// - `index_meta_time`: the time spent on loading or creating the
+    /// - `index_meta_time_us`: the time spent on loading or creating the
     ///   index meta entry from the db.
     /// - `is_trusted`: if false, this function will check whether the
     ///   input shred is duplicate.
@@ -1313,7 +1203,7 @@ impl Blockstore {
         slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
         just_inserted_shreds: &mut HashMap<ShredId, Shred>,
-        index_meta_time: &mut u64,
+        index_meta_time_us: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
         leader_schedule: Option<&LeaderScheduleCache>,
@@ -1326,7 +1216,7 @@ impl Blockstore {
         let shred_index = u64::from(shred.index());
 
         let index_meta_working_set_entry =
-            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
+            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
 
         let index_meta = &mut index_meta_working_set_entry.index;
         let slot_meta_entry = get_slot_meta_entry(
@@ -3349,7 +3239,7 @@ fn get_index_meta_entry<'a>(
     db: &Database,
     slot: Slot,
     index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
-    index_meta_time: &mut u64,
+    index_meta_time_us: &mut u64,
 ) -> &'a mut IndexMetaWorkingSetEntry {
     let index_cf = db.column::<cf::Index>();
     let mut total_start = Measure::start("Total elapsed");
@@ -3364,7 +3254,7 @@ fn get_index_meta_entry<'a>(
         }
     });
     total_start.stop();
-    *index_meta_time += total_start.as_us();
+    *index_meta_time_us += total_start.as_us();
     res
 }
 
@@ -5997,14 +5887,14 @@ pub mod tests {
         let mut index_working_set = HashMap::new();
         let mut just_received_shreds = HashMap::new();
         let mut write_batch = blockstore.db.batch().unwrap();
-        let mut index_meta_time = 0;
+        let mut index_meta_time_us = 0;
         assert!(blockstore.check_insert_coding_shred(
             coding_shred.clone(),
             &mut erasure_metas,
             &mut index_working_set,
             &mut write_batch,
             &mut just_received_shreds,
-            &mut index_meta_time,
+            &mut index_meta_time_us,
             &|_shred| {
                 panic!("no dupes");
             },
@@ -6022,7 +5912,7 @@ pub mod tests {
            &mut index_working_set,
             &mut write_batch,
             &mut just_received_shreds,
-            &mut index_meta_time,
+            &mut index_meta_time_us,
             &|_shred| {
                 counter.fetch_add(1, Ordering::Relaxed);
             },
diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs
index 027e7283cd..136d205444 100644
--- a/ledger/src/blockstore_metrics.rs
+++ b/ledger/src/blockstore_metrics.rs
@@ -14,6 +14,125 @@ use {
     },
 };
 
+#[derive(Default)]
+pub struct BlockstoreInsertionMetrics {
+    pub insert_lock_elapsed_us: u64,
+    pub insert_shreds_elapsed_us: u64,
+    pub shred_recovery_elapsed_us: u64,
+    pub chaining_elapsed_us: u64,
+    pub commit_working_sets_elapsed_us: u64,
+    pub write_batch_elapsed_us: u64,
+    pub total_elapsed_us: u64,
+    pub index_meta_time_us: u64,
+    pub num_shreds: usize,
+    pub num_inserted: u64,
+    pub num_repair: u64,
+    pub num_recovered: usize,
+    pub num_recovered_blockstore_error: usize,
+    pub num_recovered_inserted: usize,
+    pub num_recovered_failed_sig: usize,
+    pub num_recovered_failed_invalid: usize,
+    pub num_recovered_exists: usize,
+    pub num_data_shreds_exists: usize,
+    pub num_data_shreds_invalid: usize,
+    pub num_data_shreds_blockstore_error: usize,
+    pub num_coding_shreds_exists: usize,
+    pub num_coding_shreds_invalid: usize,
+    pub num_coding_shreds_invalid_erasure_config: usize,
+    pub num_coding_shreds_inserted: usize,
+}
+
+impl BlockstoreInsertionMetrics {
+    pub fn report_metrics(&self, metric_name: &'static str) {
+        datapoint_info!(
+            metric_name,
+            ("num_shreds", self.num_shreds as i64, i64),
+            ("total_elapsed_us", self.total_elapsed_us as i64, i64),
+            (
+                "insert_lock_elapsed_us",
+                self.insert_lock_elapsed_us as i64,
+                i64
+            ),
+            (
+                "insert_shreds_elapsed_us",
+                self.insert_shreds_elapsed_us as i64,
+                i64
+            ),
+            (
+                "shred_recovery_elapsed_us",
+                self.shred_recovery_elapsed_us as i64,
+                i64
+            ),
+            ("chaining_elapsed_us", self.chaining_elapsed_us as i64, i64),
+            (
+                "commit_working_sets_elapsed_us",
+                self.commit_working_sets_elapsed_us as i64,
+                i64
+            ),
+            (
+                "write_batch_elapsed_us",
+                self.write_batch_elapsed_us as i64,
+                i64
+            ),
+            ("num_inserted", self.num_inserted as i64, i64),
+            ("num_repair", self.num_repair as i64, i64),
+            ("num_recovered", self.num_recovered as i64, i64),
+            (
+                "num_recovered_inserted",
+                self.num_recovered_inserted as i64,
+                i64
+            ),
+            (
+                "num_recovered_failed_sig",
+                self.num_recovered_failed_sig as i64,
+                i64
+            ),
+            (
+                "num_recovered_failed_invalid",
+                self.num_recovered_failed_invalid as i64,
+                i64
+            ),
+            (
+                "num_recovered_exists",
+                self.num_recovered_exists as i64,
+                i64
+            ),
+            (
+                "num_recovered_blockstore_error",
+                self.num_recovered_blockstore_error,
+                i64
+            ),
+            ("num_data_shreds_exists", self.num_data_shreds_exists, i64),
+            ("num_data_shreds_invalid", self.num_data_shreds_invalid, i64),
+            (
+                "num_data_shreds_blockstore_error",
+                self.num_data_shreds_blockstore_error,
+                i64
+            ),
+            (
+                "num_coding_shreds_exists",
+                self.num_coding_shreds_exists,
+                i64
+            ),
+            (
+                "num_coding_shreds_invalid",
+                self.num_coding_shreds_invalid,
+                i64
+            ),
+            (
+                "num_coding_shreds_invalid_erasure_config",
+                self.num_coding_shreds_invalid_erasure_config,
+                i64
+            ),
+            (
+                "num_coding_shreds_inserted",
+                self.num_coding_shreds_inserted,
+                i64
+            ),
+        );
+    }
+}
+
 /// A metrics struct that exposes RocksDB's column family properties.
 ///
 /// Here we only expose a subset of all the internal properties which are
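
Note for downstream readers: because `blockstore.rs` re-exports the relocated struct (see the `pub use` hunk above), existing callers keep compiling unchanged; the only visible change is the `_us` suffix making the microsecond unit of each timing field explicit. A minimal caller-side sketch follows; the shred count, timing values, and datapoint name are illustrative placeholders, not taken from this patch:

```rust
use solana_ledger::blockstore::BlockstoreInsertionMetrics;

fn main() {
    // Accumulate counters and timings across one insertion pass; every
    // `_us` field now unambiguously holds microseconds.
    let mut metrics = BlockstoreInsertionMetrics::default();
    metrics.num_shreds += 64; // illustrative values only
    metrics.insert_lock_elapsed_us += 15;
    metrics.insert_shreds_elapsed_us += 1_250;

    // Emit everything as a single datapoint; the metric name is chosen by
    // the caller ("blockstore-insert-shreds" is hypothetical here).
    metrics.report_metrics("blockstore-insert-shreds");
}
```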