Track inserted repair shreds (#12455)

sakridge 2020-09-29 14:13:21 -07:00 committed by GitHub
parent adf06b635b
commit ce98088457
2 changed files with 25 additions and 9 deletions
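
The window service receives repair metadata alongside each batch of shreds, but the insert path previously discarded the link between the two once the shreds went into the blockstore. This change makes Blockstore::insert_shreds_handle_duplicate (and its insert_shreds wrapper) return the indices of newly inserted shreds alongside the completed data sets, so run_insert can count how many inserted shreds arrived via repair into a new num_repair field on BlockstoreInsertionMetrics.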

File 1 of 2

@@ -111,7 +111,7 @@ fn run_check_duplicate(
     Ok(())
 }
 
-fn verify_repair(_shred: &Shred, repair_info: &Option<RepairMeta>) -> bool {
+fn verify_repair(repair_info: &Option<RepairMeta>) -> bool {
     repair_info
         .as_ref()
         .map(|repair_info| repair_info.nonce == DEFAULT_NONCE)
@@ -138,15 +138,24 @@ where
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
-    shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0);
+    shreds.retain(|_shred| (verify_repair(&repair_infos[i]), i += 1).0);
+    repair_infos.retain(|repair_info| verify_repair(&repair_info));
+    assert_eq!(shreds.len(), repair_infos.len());
 
-    completed_data_sets_sender.try_send(blockstore.insert_shreds_handle_duplicate(
+    let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
         shreds,
         Some(leader_schedule_cache),
         false,
         &handle_duplicate,
         metrics,
-    )?)?;
+    )?;
+    for index in inserted_indices {
+        if repair_infos[index].is_some() {
+            metrics.num_repair += 1;
+        }
+    }
+
+    completed_data_sets_sender.try_send(completed_data_sets)?;
 
     Ok(())
 }
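
The retain calls in the hunk above lean on a side-effecting counter because Vec::retain hands the closure only a reference to the element, never its index. Below is a minimal standalone sketch of that idiom, with made-up data rather than the shred types from this diff: the tuple's second element advances the counter as a side effect, and the .0 projection yields the keep/drop decision.

    fn main() {
        // Stand-in for verify_repair's per-index verdict.
        let keep = [true, false, true, true, false];
        let mut items = vec!["a", "b", "c", "d", "e"];
        let mut i = 0;
        // `(keep[i], i += 1)` is a `(bool, ())` tuple: `.0` is the predicate
        // result, and the increment runs as a side effect on every element.
        items.retain(|_item| (keep[i], i += 1).0);
        assert_eq!(items, vec!["a", "c", "d"]);
    }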

File 2 of 2

@@ -171,6 +171,7 @@ pub struct BlockstoreInsertionMetrics {
     pub write_batch_elapsed: u64,
     pub total_elapsed: u64,
     pub num_inserted: u64,
+    pub num_repair: u64,
     pub num_recovered: usize,
     pub num_recovered_inserted: usize,
     pub num_recovered_failed_sig: usize,
@@ -214,6 +215,7 @@ impl BlockstoreInsertionMetrics {
             ),
             ("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
             ("num_inserted", self.num_inserted as i64, i64),
+            ("num_repair", self.num_repair as i64, i64),
             ("num_recovered", self.num_recovered as i64, i64),
             (
                 "num_recovered_inserted",
@@ -744,7 +746,7 @@ impl Blockstore {
         is_trusted: bool,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
-    ) -> Result<Vec<CompletedDataSetInfo>>
+    ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
     where
         F: Fn(Shred),
     {
@@ -768,7 +770,8 @@ impl Blockstore {
         let mut num_inserted = 0;
         let mut index_meta_time = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
-        shreds.into_iter().for_each(|shred| {
+        let mut inserted_indices = Vec::new();
+        shreds.into_iter().enumerate().for_each(|(i, shred)| {
             if shred.is_data() {
                 let shred_slot = shred.slot();
                 if let Ok(completed_data_sets) = self.check_insert_data_shred(
@@ -791,6 +794,7 @@ impl Blockstore {
                             end_index,
                         },
                     ));
+                    inserted_indices.push(i);
                     num_inserted += 1;
                 }
             } else if shred.is_code() {
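
The two hunks above implement the core bookkeeping: the shred iterator is enumerated, and the index of every shred that actually lands in the blockstore is recorded so the caller can map insertions back to the input batch. A self-contained sketch of the pattern under hypothetical names (insert_all and accept are illustrative, not from the codebase):

    // Returns the accepted items plus their positions in the input vector.
    fn insert_all<T>(items: Vec<T>, accept: impl Fn(&T) -> bool) -> (Vec<T>, Vec<usize>) {
        let mut inserted = Vec::new();
        let mut inserted_indices = Vec::new();
        items.into_iter().enumerate().for_each(|(i, item)| {
            if accept(&item) {
                inserted.push(item);
                inserted_indices.push(i); // index into the *input*, as in the diff
            }
        });
        (inserted, inserted_indices)
    }

    fn main() {
        let (kept, indices) = insert_all(vec![3, -1, 7], |v| *v > 0);
        assert_eq!(kept, vec![3, 7]);
        assert_eq!(indices, vec![0, 2]);
    }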
@@ -939,7 +943,7 @@ impl Blockstore {
         metrics.num_recovered_exists = num_recovered_exists;
         metrics.index_meta_time += index_meta_time;
 
-        Ok(newly_completed_data_sets)
+        Ok((newly_completed_data_sets, inserted_indices))
     }
 
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
@@ -971,7 +975,7 @@ impl Blockstore {
         shreds: Vec<Shred>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
-    ) -> Result<Vec<CompletedDataSetInfo>> {
+    ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
         self.insert_shreds_handle_duplicate(
             shreds,
             leader_schedule,
@@ -4113,11 +4117,13 @@ pub mod tests {
         assert!(blockstore
             .insert_shreds(shreds[1..].to_vec(), None, false)
             .unwrap()
+            .0
             .is_empty());
         assert_eq!(
             blockstore
                 .insert_shreds(vec![shreds[0].clone()], None, false)
-                .unwrap(),
+                .unwrap()
+                .0,
             vec![CompletedDataSetInfo {
                 slot,
                 start_index: 0,
@@ -4128,6 +4134,7 @@ pub mod tests {
         assert!(blockstore
             .insert_shreds(shreds, None, false)
             .unwrap()
+            .0
             .is_empty());
     }
 
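
The test updates are mechanical: insert_shreds now returns a (Vec<CompletedDataSetInfo>, Vec<usize>) tuple, so pre-existing assertions select .0. A caller interested in the repair count consumes the second element the way run_insert does; a hedged, self-contained sketch with stand-in types in place of the real shred and repair structures:

    fn main() {
        // Stand-ins for the widened return value and the repair metadata;
        // repair_infos[i].is_some() marks input shred i as repair traffic.
        let (completed_data_sets, inserted_indices): (Vec<&str>, Vec<usize>) =
            (vec![], vec![0, 2]);
        let repair_infos: Vec<Option<u32>> = vec![Some(42), None, Some(7)];

        let mut num_repair = 0;
        for index in inserted_indices {
            if repair_infos[index].is_some() {
                num_repair += 1;
            }
        }
        assert!(completed_data_sets.is_empty());
        assert_eq!(num_repair, 2);
    }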