From ce9808845767baf49ba00c1e08213ebb25dc0dc2 Mon Sep 17 00:00:00 2001 From: sakridge Date: Tue, 29 Sep 2020 14:13:21 -0700 Subject: [PATCH] Track inserted repair shreds (#12455) --- core/src/window_service.rs | 17 +++++++++++++---- ledger/src/blockstore.rs | 17 ++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index ff72bbbef..598e1c81a 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -111,7 +111,7 @@ fn run_check_duplicate( Ok(()) } -fn verify_repair(_shred: &Shred, repair_info: &Option) -> bool { +fn verify_repair(repair_info: &Option) -> bool { repair_info .as_ref() .map(|repair_info| repair_info.nonce == DEFAULT_NONCE) @@ -138,15 +138,24 @@ where assert_eq!(shreds.len(), repair_infos.len()); let mut i = 0; - shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0); + shreds.retain(|_shred| (verify_repair(&repair_infos[i]), i += 1).0); + repair_infos.retain(|repair_info| verify_repair(&repair_info)); + assert_eq!(shreds.len(), repair_infos.len()); - completed_data_sets_sender.try_send(blockstore.insert_shreds_handle_duplicate( + let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), false, &handle_duplicate, metrics, - )?)?; + )?; + for index in inserted_indices { + if repair_infos[index].is_some() { + metrics.num_repair += 1; + } + } + + completed_data_sets_sender.try_send(completed_data_sets)?; Ok(()) } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f1368fcd5..5e07662c8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -171,6 +171,7 @@ pub struct BlockstoreInsertionMetrics { pub write_batch_elapsed: u64, pub total_elapsed: u64, pub num_inserted: u64, + pub num_repair: u64, pub num_recovered: usize, pub num_recovered_inserted: usize, pub num_recovered_failed_sig: usize, @@ -214,6 +215,7 @@ impl BlockstoreInsertionMetrics { ), ("write_batch_elapsed", self.write_batch_elapsed as i64, i64), ("num_inserted", self.num_inserted as i64, i64), + ("num_repair", self.num_repair as i64, i64), ("num_recovered", self.num_recovered as i64, i64), ( "num_recovered_inserted", @@ -744,7 +746,7 @@ impl Blockstore { is_trusted: bool, handle_duplicate: &F, metrics: &mut BlockstoreInsertionMetrics, - ) -> Result> + ) -> Result<(Vec, Vec)> where F: Fn(Shred), { @@ -768,7 +770,8 @@ impl Blockstore { let mut num_inserted = 0; let mut index_meta_time = 0; let mut newly_completed_data_sets: Vec = vec![]; - shreds.into_iter().for_each(|shred| { + let mut inserted_indices = Vec::new(); + shreds.into_iter().enumerate().for_each(|(i, shred)| { if shred.is_data() { let shred_slot = shred.slot(); if let Ok(completed_data_sets) = self.check_insert_data_shred( @@ -791,6 +794,7 @@ impl Blockstore { end_index, }, )); + inserted_indices.push(i); num_inserted += 1; } } else if shred.is_code() { @@ -939,7 +943,7 @@ impl Blockstore { metrics.num_recovered_exists = num_recovered_exists; metrics.index_meta_time += index_meta_time; - Ok(newly_completed_data_sets) + Ok((newly_completed_data_sets, inserted_indices)) } pub fn clear_unconfirmed_slot(&self, slot: Slot) { @@ -971,7 +975,7 @@ impl Blockstore { shreds: Vec, leader_schedule: Option<&Arc>, is_trusted: bool, - ) -> Result> { + ) -> Result<(Vec, Vec)> { self.insert_shreds_handle_duplicate( shreds, leader_schedule, @@ -4113,11 +4117,13 @@ pub mod tests { assert!(blockstore .insert_shreds(shreds[1..].to_vec(), None, false) .unwrap() + .0 .is_empty()); assert_eq!( blockstore .insert_shreds(vec![shreds[0].clone()], None, false) - .unwrap(), + .unwrap() + .0, vec![CompletedDataSetInfo { slot, start_index: 0, @@ -4128,6 +4134,7 @@ pub mod tests { assert!(blockstore .insert_shreds(shreds, None, false) .unwrap() + .0 .is_empty()); }