Cleanup BlockstoreInsertionMetrics (#25618)

* Move BlockstoreInsertionMetrics to blockstore_metrics.rs

* Specify unit (us) in metric fields
steviez 2022-06-01 10:54:11 -05:00 committed by GitHub
parent 94b0ce5d43
commit 17995c7e67
2 changed files with 142 additions and 133 deletions

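For orientation, here is a minimal, hypothetical usage sketch of the moved struct after this change: durations are accumulated into the renamed `*_us` fields in microseconds and then emitted with `report_metrics`. The struct, its field names, and `report_metrics` come from the diff below; the metric name, the `main` wrapper, and the `solana-ledger`/`solana-measure` dependencies are illustrative assumptions, not part of the commit.

```rust
// Hypothetical usage sketch (not part of the commit): accumulate elapsed
// time in microseconds into the renamed `*_us` fields, then report once.
use solana_ledger::blockstore::BlockstoreInsertionMetrics; // re-exported per this diff
use solana_measure::measure::Measure;

fn main() {
    let mut metrics = BlockstoreInsertionMetrics::default();

    // Time a stand-in piece of work, mirroring the Measure::start / stop /
    // as_us pattern used by the shred-insertion path in the diff below.
    let mut timer = Measure::start("example work");
    std::thread::sleep(std::time::Duration::from_millis(1));
    timer.stop();
    metrics.insert_shreds_elapsed_us += timer.as_us();
    metrics.num_shreds += 1;

    // Emits a single datapoint whose duration fields now carry the `_us` suffix.
    metrics.report_metrics("example-blockstore-insert-metrics");
}
```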
blockstore.rs

@@ -73,6 +73,7 @@ pub use {
crate::{
blockstore_db::BlockstoreError,
blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
blockstore_metrics::BlockstoreInsertionMetrics,
},
blockstore_purge::PurgeType,
rocksdb::properties as RocksProperties,
@@ -198,34 +199,6 @@ pub struct SlotMetaWorkingSetEntry {
did_insert_occur: bool,
}
#[derive(Default)]
pub struct BlockstoreInsertionMetrics {
pub num_shreds: usize,
pub insert_lock_elapsed: u64,
pub insert_shreds_elapsed: u64,
pub shred_recovery_elapsed: u64,
pub chaining_elapsed: u64,
pub commit_working_sets_elapsed: u64,
pub write_batch_elapsed: u64,
pub total_elapsed: u64,
pub num_inserted: u64,
pub num_repair: u64,
pub num_recovered: usize,
num_recovered_blockstore_error: usize,
pub num_recovered_inserted: usize,
pub num_recovered_failed_sig: usize,
pub num_recovered_failed_invalid: usize,
pub num_recovered_exists: usize,
pub index_meta_time: u64,
num_data_shreds_exists: usize,
num_data_shreds_invalid: usize,
num_data_shreds_blockstore_error: usize,
num_coding_shreds_exists: usize,
num_coding_shreds_invalid: usize,
num_coding_shreds_invalid_erasure_config: usize,
num_coding_shreds_inserted: usize,
}
impl SlotMetaWorkingSetEntry {
/// Construct a new SlotMetaWorkingSetEntry with the specified `new_slot_meta`
/// and `old_slot_meta`. `did_insert_occur` is set to false.
@@ -238,89 +211,6 @@ impl SlotMetaWorkingSetEntry {
}
}
impl BlockstoreInsertionMetrics {
pub fn report_metrics(&self, metric_name: &'static str) {
datapoint_info!(
metric_name,
("num_shreds", self.num_shreds as i64, i64),
("total_elapsed", self.total_elapsed as i64, i64),
("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
(
"shred_recovery_elapsed",
self.shred_recovery_elapsed as i64,
i64
),
("chaining_elapsed", self.chaining_elapsed as i64, i64),
(
"commit_working_sets_elapsed",
self.commit_working_sets_elapsed as i64,
i64
),
("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
("num_inserted", self.num_inserted as i64, i64),
("num_repair", self.num_repair as i64, i64),
("num_recovered", self.num_recovered as i64, i64),
(
"num_recovered_inserted",
self.num_recovered_inserted as i64,
i64
),
(
"num_recovered_failed_sig",
self.num_recovered_failed_sig as i64,
i64
),
(
"num_recovered_failed_invalid",
self.num_recovered_failed_invalid as i64,
i64
),
(
"num_recovered_exists",
self.num_recovered_exists as i64,
i64
),
(
"num_recovered_blockstore_error",
self.num_recovered_blockstore_error,
i64
),
("num_data_shreds_exists", self.num_data_shreds_exists, i64),
("num_data_shreds_invalid", self.num_data_shreds_invalid, i64),
(
"num_data_shreds_blockstore_error",
self.num_data_shreds_blockstore_error,
i64
),
(
"num_coding_shreds_exists",
self.num_coding_shreds_exists,
i64
),
(
"num_coding_shreds_invalid",
self.num_coding_shreds_invalid,
i64
),
(
"num_coding_shreds_invalid_erasure_config",
self.num_coding_shreds_invalid_erasure_config,
i64
),
(
"num_coding_shreds_inserted",
self.num_coding_shreds_inserted,
i64
),
);
}
}
impl Blockstore {
pub fn db(self) -> Arc<Database> {
self.db
@@ -880,7 +770,7 @@ impl Blockstore {
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
start.stop();
metrics.insert_lock_elapsed += start.as_us();
metrics.insert_lock_elapsed_us += start.as_us();
let db = &*self.db;
let mut write_batch = db.batch()?;
@@ -892,7 +782,7 @@ impl Blockstore {
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
let mut index_meta_time = 0;
let mut index_meta_time_us = 0;
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
let mut inserted_indices = Vec::new();
for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
@@ -910,7 +800,7 @@ impl Blockstore {
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_shreds,
&mut index_meta_time,
&mut index_meta_time_us,
is_trusted,
handle_duplicate,
leader_schedule,
@@ -938,7 +828,7 @@ impl Blockstore {
&mut index_working_set,
&mut write_batch,
&mut just_inserted_shreds,
&mut index_meta_time,
&mut index_meta_time_us,
handle_duplicate,
is_trusted,
shred_source,
@@ -949,7 +839,7 @@ impl Blockstore {
}
start.stop();
metrics.insert_shreds_elapsed += start.as_us();
metrics.insert_shreds_elapsed_us += start.as_us();
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data_shreds = Self::try_shred_recovery(
@@ -976,7 +866,7 @@ impl Blockstore {
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_shreds,
&mut index_meta_time,
&mut index_meta_time_us,
is_trusted,
&handle_duplicate,
leader_schedule,
@@ -1012,14 +902,14 @@ impl Blockstore {
}
}
start.stop();
metrics.shred_recovery_elapsed += start.as_us();
metrics.shred_recovery_elapsed_us += start.as_us();
let mut start = Measure::start("Shred recovery");
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
// drop the others
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
start.stop();
metrics.chaining_elapsed += start.as_us();
metrics.chaining_elapsed_us += start.as_us();
let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@@ -1038,12 +928,12 @@ impl Blockstore {
}
}
start.stop();
metrics.commit_working_sets_elapsed += start.as_us();
metrics.commit_working_sets_elapsed_us += start.as_us();
let mut start = Measure::start("Write Batch");
self.db.write(write_batch)?;
start.stop();
metrics.write_batch_elapsed += start.as_us();
metrics.write_batch_elapsed_us += start.as_us();
send_signals(
&self.new_shreds_signals.lock().unwrap(),
@@ -1054,8 +944,8 @@ impl Blockstore {
total_start.stop();
metrics.total_elapsed += total_start.as_us();
metrics.index_meta_time += index_meta_time;
metrics.total_elapsed_us += total_start.as_us();
metrics.index_meta_time_us += index_meta_time_us;
Ok((newly_completed_data_sets, inserted_indices))
}
@@ -1138,7 +1028,7 @@ impl Blockstore {
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time: &mut u64,
index_meta_time_us: &mut u64,
handle_duplicate: &F,
is_trusted: bool,
shred_source: ShredSource,
@@ -1151,7 +1041,7 @@ impl Blockstore {
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
let index_meta = &mut index_meta_working_set_entry.index;
@@ -1296,7 +1186,7 @@ impl Blockstore {
/// be committed atomically.
/// - `just_inserted_data_shreds`: a (slot, shred index within the slot)
/// to shred map which maintains which data shreds have been inserted.
/// - `index_meta_time`: the time spent on loading or creating the
/// - `index_meta_time_us`: the time spent on loading or creating the
/// index meta entry from the db.
/// - `is_trusted`: if false, this function will check whether the
/// input shred is duplicate.
@@ -1313,7 +1203,7 @@ impl Blockstore {
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time: &mut u64,
index_meta_time_us: &mut u64,
is_trusted: bool,
handle_duplicate: &F,
leader_schedule: Option<&LeaderScheduleCache>,
@@ -1326,7 +1216,7 @@ impl Blockstore {
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
let index_meta = &mut index_meta_working_set_entry.index;
let slot_meta_entry = get_slot_meta_entry(
@@ -3349,7 +3239,7 @@ fn get_index_meta_entry<'a>(
db: &Database,
slot: Slot,
index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
index_meta_time: &mut u64,
index_meta_time_us: &mut u64,
) -> &'a mut IndexMetaWorkingSetEntry {
let index_cf = db.column::<cf::Index>();
let mut total_start = Measure::start("Total elapsed");
@@ -3364,7 +3254,7 @@ fn get_index_meta_entry<'a>(
}
});
total_start.stop();
*index_meta_time += total_start.as_us();
*index_meta_time_us += total_start.as_us();
res
}
@@ -5997,14 +5887,14 @@ pub mod tests {
let mut index_working_set = HashMap::new();
let mut just_received_shreds = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
let mut index_meta_time = 0;
let mut index_meta_time_us = 0;
assert!(blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time,
&mut index_meta_time_us,
&|_shred| {
panic!("no dupes");
},
@@ -6022,7 +5912,7 @@ pub mod tests {
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time,
&mut index_meta_time_us,
&|_shred| {
counter.fetch_add(1, Ordering::Relaxed);
},

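The rename also flows through helper signatures such as `get_index_meta_entry`, which takes its accumulator as a `&mut u64` out-parameter. Below is a hedged sketch of that pattern; the helper name and its placeholder body are illustrative, and only the `&mut u64` accumulator and the `Measure` timing mirror the diff above.

```rust
// Hedged sketch of the out-parameter timing pattern used above: the caller
// owns a `_us` accumulator and each helper adds its own elapsed microseconds.
use solana_measure::measure::Measure;

// Stand-in for get_index_meta_entry(); the real helper loads or creates an
// index meta entry from RocksDB.
fn load_index_meta(slot: u64, index_meta_time_us: &mut u64) -> u64 {
    let mut total_start = Measure::start("Total elapsed");
    let entry = slot; // placeholder for the db lookup result
    total_start.stop();
    *index_meta_time_us += total_start.as_us();
    entry
}

fn main() {
    let mut index_meta_time_us = 0u64;
    let _entry = load_index_meta(42, &mut index_meta_time_us);
    println!("index_meta_time_us = {index_meta_time_us}");
}
```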
blockstore_metrics.rs

@@ -14,6 +14,125 @@ use {
},
};
#[derive(Default)]
pub struct BlockstoreInsertionMetrics {
pub insert_lock_elapsed_us: u64,
pub insert_shreds_elapsed_us: u64,
pub shred_recovery_elapsed_us: u64,
pub chaining_elapsed_us: u64,
pub commit_working_sets_elapsed_us: u64,
pub write_batch_elapsed_us: u64,
pub total_elapsed_us: u64,
pub index_meta_time_us: u64,
pub num_shreds: usize,
pub num_inserted: u64,
pub num_repair: u64,
pub num_recovered: usize,
pub num_recovered_blockstore_error: usize,
pub num_recovered_inserted: usize,
pub num_recovered_failed_sig: usize,
pub num_recovered_failed_invalid: usize,
pub num_recovered_exists: usize,
pub num_data_shreds_exists: usize,
pub num_data_shreds_invalid: usize,
pub num_data_shreds_blockstore_error: usize,
pub num_coding_shreds_exists: usize,
pub num_coding_shreds_invalid: usize,
pub num_coding_shreds_invalid_erasure_config: usize,
pub num_coding_shreds_inserted: usize,
}
impl BlockstoreInsertionMetrics {
pub fn report_metrics(&self, metric_name: &'static str) {
datapoint_info!(
metric_name,
("num_shreds", self.num_shreds as i64, i64),
("total_elapsed_us", self.total_elapsed_us as i64, i64),
(
"insert_lock_elapsed_us",
self.insert_lock_elapsed_us as i64,
i64
),
(
"insert_shreds_elapsed_us",
self.insert_shreds_elapsed_us as i64,
i64
),
(
"shred_recovery_elapsed_us",
self.shred_recovery_elapsed_us as i64,
i64
),
("chaining_elapsed_us", self.chaining_elapsed_us as i64, i64),
(
"commit_working_sets_elapsed_us",
self.commit_working_sets_elapsed_us as i64,
i64
),
(
"write_batch_elapsed_us",
self.write_batch_elapsed_us as i64,
i64
),
("num_inserted", self.num_inserted as i64, i64),
("num_repair", self.num_repair as i64, i64),
("num_recovered", self.num_recovered as i64, i64),
(
"num_recovered_inserted",
self.num_recovered_inserted as i64,
i64
),
(
"num_recovered_failed_sig",
self.num_recovered_failed_sig as i64,
i64
),
(
"num_recovered_failed_invalid",
self.num_recovered_failed_invalid as i64,
i64
),
(
"num_recovered_exists",
self.num_recovered_exists as i64,
i64
),
(
"num_recovered_blockstore_error",
self.num_recovered_blockstore_error,
i64
),
("num_data_shreds_exists", self.num_data_shreds_exists, i64),
("num_data_shreds_invalid", self.num_data_shreds_invalid, i64),
(
"num_data_shreds_blockstore_error",
self.num_data_shreds_blockstore_error,
i64
),
(
"num_coding_shreds_exists",
self.num_coding_shreds_exists,
i64
),
(
"num_coding_shreds_invalid",
self.num_coding_shreds_invalid,
i64
),
(
"num_coding_shreds_invalid_erasure_config",
self.num_coding_shreds_invalid_erasure_config,
i64
),
(
"num_coding_shreds_inserted",
self.num_coding_shreds_inserted,
i64
),
);
}
}
/// A metrics struct that exposes RocksDB's column family properties.
///
/// Here we only expose a subset of all the internal properties which are