diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 9fc69348b..47e22979f 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -85,6 +85,7 @@ where total_packets += more_packets.packets.len(); packets.push(more_packets) } + let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", total_packets); @@ -127,7 +128,8 @@ where } } - blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?; + let blocktree_insert_metrics = blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?; + blocktree_insert_metrics.report_metrics("recv-window-insert-shreds"); trace!( "Elapsed processing time in recv_window(): {}", diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index a10419ed3..f5a9cf500 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -17,6 +17,7 @@ use rayon::iter::IntoParallelRefIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; use rocksdb::DBRawIterator; +use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, datapoint_error}; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::clock::Slot; @@ -60,6 +61,49 @@ pub struct Blocktree { pub completed_slots_senders: Vec>>, } +pub struct BlocktreeInsertionMetrics { + pub num_shreds: usize, + pub insert_lock_elapsed: u64, + pub insert_shreds_elapsed: u64, + pub shred_recovery_elapsed: u64, + pub chaining_elapsed: u64, + pub commit_working_sets_elapsed: u64, + pub write_batch_elapsed: u64, + pub total_elapsed: u64, + pub num_inserted: u64, + pub num_recovered: usize, +} + +impl BlocktreeInsertionMetrics { + pub fn report_metrics(&self, metric_name: &'static str) { + datapoint_info!( + metric_name, + ("num_shreds", self.num_shreds as i64, i64), + ("total_elapsed", self.total_elapsed as i64, i64), + ("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64), + ( + "insert_shreds_elapsed", + self.insert_shreds_elapsed as i64, + i64 + ), + ( + "shred_recovery_elapsed", + 
self.shred_recovery_elapsed as i64, + i64 + ), + ("chaining_elapsed", self.chaining_elapsed as i64, i64), + ( + "commit_working_sets_elapsed", + self.commit_working_sets_elapsed as i64, + i64 + ), + ("write_batch_elapsed", self.write_batch_elapsed as i64, i64), + ("num_inserted", self.num_inserted as i64, i64), + ("num_recovered", self.num_recovered as i64, i64), + ); + } +} + impl Blocktree { /// Opens a Ledger in directory, provides "infinite" window of shreds pub fn open(ledger_path: &Path) -> Result { @@ -360,8 +404,13 @@ impl Blocktree { &self, shreds: Vec, leader_schedule: Option<&Arc>, - ) -> Result<()> { + ) -> Result { + let mut total_start = Measure::start("Total elapsed"); + let mut start = Measure::start("Blocktree lock"); let _lock = self.insert_shreds_lock.lock().unwrap(); + start.stop(); + let insert_lock_elapsed = start.as_us(); + let db = &*self.db; let mut write_batch = db.batch()?; @@ -371,26 +420,40 @@ impl Blocktree { let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); + let num_shreds = shreds.len(); + let mut start = Measure::start("Shred insertion"); + let mut num_inserted = 0; shreds.into_iter().for_each(|shred| { - if shred.is_data() { - self.check_insert_data_shred( - shred, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - ); - } else if shred.is_code() { - self.check_insert_coding_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut write_batch, - &mut just_inserted_coding_shreds, - ); + let insert_success = { + if shred.is_data() { + self.check_insert_data_shred( + shred, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + ) + } else if shred.is_code() { + self.check_insert_coding_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut write_batch, + &mut just_inserted_coding_shreds, + ) + } else { + panic!("There should be no other case"); + } 
+ }; + if insert_success { + num_inserted += 1; } }); + start.stop(); + let insert_shreds_elapsed = start.as_us(); + let mut start = Measure::start("Shred recovery"); + let mut num_recovered = 0; if let Some(leader_schedule_cache) = leader_schedule { let recovered_data = Self::try_shred_recovery( &db, @@ -400,6 +463,7 @@ &mut just_inserted_coding_shreds, ); + num_recovered = recovered_data.len(); recovered_data.into_iter().for_each(|shred| { if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { if shred.verify(&leader) { self.check_insert_data_shred( @@ -409,15 +473,21 @@ &mut slot_meta_working_set, &mut write_batch, &mut just_inserted_coding_shreds, - ) + ); } } }); } + start.stop(); + let shred_recovery_elapsed = start.as_us(); + let mut start = Measure::start("Chaining"); // Handle chaining for the working set handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?; + start.stop(); + let chaining_elapsed = start.as_us(); + let mut start = Measure::start("Commit Working Sets"); let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( &slot_meta_working_set, &self.completed_slots_senders, @@ -431,8 +501,13 @@ for (&slot, index) in index_working_set.iter() { write_batch.put::<cf::Index>(slot, index)?; } + start.stop(); + let commit_working_sets_elapsed = start.as_us(); + let mut start = Measure::start("Write Batch"); self.db.write(write_batch)?; + start.stop(); + let write_batch_elapsed = start.as_us(); if should_signal { for signal in &self.new_shreds_signals { @@ -447,7 +522,20 @@ newly_completed_slots, )?; - Ok(()) + total_start.stop(); + + Ok(BlocktreeInsertionMetrics { + num_shreds, + total_elapsed: total_start.as_us(), + insert_lock_elapsed, + insert_shreds_elapsed, + shred_recovery_elapsed, + chaining_elapsed, + commit_working_sets_elapsed, + write_batch_elapsed, + num_inserted, + num_recovered, + }) } fn check_insert_coding_shred( @@ -457,7 +545,7 @@ 
index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, - ) { + ) -> bool { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -468,13 +556,16 @@ impl Blocktree { // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) { - if let Ok(()) = self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) - { - just_inserted_coding_shreds - .entry((slot, shred_index)) - .or_insert_with(|| shred); - new_index_meta.map(|n| index_working_set.insert(slot, n)); - } + self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) + .map(|_| { + just_inserted_coding_shreds + .entry((slot, shred_index)) + .or_insert_with(|| shred); + new_index_meta.map(|n| index_working_set.insert(slot, n)) + }) + .is_ok() + } else { + false } } @@ -485,7 +576,7 @@ impl Blocktree { slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, - ) { + ) -> bool { let slot = shred.slot(); let shred_index = u64::from(shred.index()); let (index_meta, mut new_index_meta) = @@ -524,6 +615,8 @@ impl Blocktree { if insert_success { new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n)); } + + insert_success } fn should_insert_coding_shred(