diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 42743f4006..ffa158d526 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -188,9 +188,14 @@ where
         }
         prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
+        let repairs: Vec<_> = repair_infos
+            .iter()
+            .map(|repair_info| repair_info.is_some())
+            .collect();
 
         let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
             shreds,
+            repairs,
             Some(leader_schedule_cache),
             false,
             &handle_duplicate,
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index f4c867dff5..b055aa4356 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -45,7 +45,7 @@ use std::{
     borrow::Cow,
     cell::RefCell,
     cmp,
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     convert::TryInto,
     fs,
     io::{Error as IoError, ErrorKind},
@@ -56,6 +56,7 @@ use std::{
         mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
         Arc, Mutex, RwLock, RwLockWriteGuard,
     },
+    time::Instant,
 };
 use thiserror::Error;
 use trees::{Tree, TreeWalk};
@@ -148,6 +149,27 @@ pub struct Blockstore {
     pub completed_slots_senders: Vec<CompletedSlotsSender>,
     pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
     no_compaction: bool,
+    slots_stats: Arc<Mutex<SlotsStats>>,
+}
+
+struct SlotsStats {
+    last_cleanup_ts: Instant,
+    stats: BTreeMap<Slot, SlotStats>,
+}
+
+impl Default for SlotsStats {
+    fn default() -> Self {
+        SlotsStats {
+            last_cleanup_ts: Instant::now(),
+            stats: BTreeMap::new(),
+        }
+    }
+}
+
+#[derive(Default)]
+struct SlotStats {
+    num_repaired: usize,
+    num_recovered: usize,
 }
 
 pub struct IndexMetaWorkingSetEntry {
@@ -165,6 +187,13 @@ pub struct SlotMetaWorkingSetEntry {
     did_insert_occur: bool,
 }
 
+#[derive(PartialEq, Debug, Clone)]
+enum ShredSource {
+    Turbine,
+    Repaired,
+    Recovered,
+}
+
 #[derive(Default)]
 pub struct BlockstoreInsertionMetrics {
     pub num_shreds: usize,
@@ -367,6 +396,7 @@ impl Blockstore {
             last_root,
             lowest_cleanup_slot: Arc::new(RwLock::new(0)),
             no_compaction: false,
+            slots_stats: Arc::new(Mutex::new(SlotsStats::default())),
         };
         if initialize_transaction_status_index {
             blockstore.initialize_transaction_status_index()?;
@@ -758,6 +788,7 @@ impl Blockstore {
     pub fn insert_shreds_handle_duplicate<F>(
         &self,
         shreds: Vec<Shred>,
+        is_repaired: Vec<bool>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
         handle_duplicate: &F,
@@ -766,6 +797,7 @@
     where
         F: Fn(Shred),
     {
+        assert_eq!(shreds.len(), is_repaired.len());
         let mut total_start = Measure::start("Total elapsed");
         let mut start = Measure::start("Blockstore lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
@@ -787,46 +819,56 @@ impl Blockstore {
         let mut index_meta_time = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
-        shreds.into_iter().enumerate().for_each(|(i, shred)| {
-            if shred.is_data() {
-                let shred_slot = shred.slot();
-                if let Ok(completed_data_sets) = self.check_insert_data_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut slot_meta_working_set,
-                    &mut write_batch,
-                    &mut just_inserted_data_shreds,
-                    &mut index_meta_time,
-                    is_trusted,
-                    handle_duplicate,
-                    leader_schedule,
-                    false,
-                ) {
-                    newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
-                        |(start_index, end_index)| CompletedDataSetInfo {
-                            slot: shred_slot,
-                            start_index,
-                            end_index,
-                        },
-                    ));
-                    inserted_indices.push(i);
-                    num_inserted += 1;
-                }
-            } else if shred.is_code() {
-                self.check_cache_coding_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut just_inserted_coding_shreds,
-                    &mut index_meta_time,
-                    handle_duplicate,
-                    is_trusted,
-                );
-            } else {
-                panic!("There should be no other case");
-            }
-        });
+        shreds
+            .into_iter()
+            .zip(is_repaired.into_iter())
+            .enumerate()
+            .for_each(|(i, (shred, is_repaired))| {
+                if shred.is_data() {
+                    let shred_slot = shred.slot();
+                    let shred_source = if is_repaired {
+                        ShredSource::Repaired
+                    } else {
+                        ShredSource::Turbine
+                    };
+                    if let Ok(completed_data_sets) = self.check_insert_data_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                        &mut index_meta_time,
+                        is_trusted,
+                        handle_duplicate,
+                        leader_schedule,
+                        shred_source,
+                    ) {
+                        newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
+                            |(start_index, end_index)| CompletedDataSetInfo {
+                                slot: shred_slot,
+                                start_index,
+                                end_index,
+                            },
+                        ));
+                        inserted_indices.push(i);
+                        num_inserted += 1;
+                    }
+                } else if shred.is_code() {
+                    self.check_cache_coding_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut just_inserted_coding_shreds,
+                        &mut index_meta_time,
+                        handle_duplicate,
+                        is_trusted,
+                        is_repaired,
+                    );
+                } else {
+                    panic!("There should be no other case");
+                }
+            });
 
         start.stop();
         let insert_shreds_elapsed = start.as_us();
@@ -861,7 +903,7 @@ impl Blockstore {
                         is_trusted,
                         &handle_duplicate,
                         leader_schedule,
-                        true,
+                        ShredSource::Recovered,
                     ) {
                         Err(InsertDataShredError::Exists) => {
                             num_recovered_exists += 1;
@@ -993,8 +1035,10 @@
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
+        let shreds_len = shreds.len();
         self.insert_shreds_handle_duplicate(
             shreds,
+            vec![false; shreds_len],
             leader_schedule,
             is_trusted,
             &|_| {},
@@ -1038,6 +1082,7 @@
         index_meta_time: &mut u64,
         handle_duplicate: &F,
         is_trusted: bool,
+        is_repaired: bool,
     ) -> bool
     where
         F: Fn(Shred),
     {
@@ -1105,6 +1150,12 @@
             return false;
         }
 
+        if is_repaired {
+            let mut slots_stats = self.slots_stats.lock().unwrap();
+            let mut e = slots_stats.stats.entry(slot).or_default();
+            e.num_repaired += 1;
+        }
+
         // Should be safe to modify index_meta here. Two cases
         // 1) Recovery happens: Then all inserted erasure metas are removed
         // from just_received_coding_shreds, and nothing will be committed by
@@ -1165,7 +1216,7 @@
         is_trusted: bool,
         handle_duplicate: &F,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-        is_recovered: bool,
+        shred_source: ShredSource,
     ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
     where
         F: Fn(Shred),
     {
@@ -1208,15 +1259,20 @@
                 just_inserted_data_shreds,
                 &self.last_root,
                 leader_schedule,
-                is_recovered,
+                shred_source.clone(),
             ) {
                 return Err(InsertDataShredError::InvalidShred);
             }
         }
 
         let set_index = u64::from(shred.common_header.fec_set_index);
-        let newly_completed_data_sets =
-            self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?;
+        let newly_completed_data_sets = self.insert_data_shred(
+            slot_meta,
+            index_meta.data_mut(),
+            &shred,
+            write_batch,
+            shred_source,
+        )?;
         just_inserted_data_shreds.insert((slot, shred_index), shred);
         index_meta_working_set_entry.did_insert_occur = true;
         slot_meta_entry.did_insert_occur = true;
@@ -1295,7 +1351,7 @@
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-        is_recovered: bool,
+        shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;
         let shred_index = u64::from(shred.index());
@@ -1371,8 +1427,8 @@
                 (
                     "error",
                     format!(
-                        "Leader {:?}, slot {}: received index {} >= slot.last_index {}, is_recovered: {}",
-                        leader_pubkey, slot, shred_index, last_index, is_recovered
+                        "Leader {:?}, slot {}: received index {} >= slot.last_index {}, shred_source: {:?}",
+                        leader_pubkey, slot, shred_index, last_index, shred_source
                     ),
                     String
                 )
@@ -1407,8 +1463,8 @@
                 (
                     "error",
                     format!(
-                        "Leader {:?}, slot {}: received shred_index {} < slot.received {}, is_recovered: {}",
-                        leader_pubkey, slot, shred_index, slot_meta.received, is_recovered
+                        "Leader {:?}, slot {}: received shred_index {} < slot.received {}, shred_source: {:?}",
+                        leader_pubkey, slot, shred_index, slot_meta.received, shred_source
                     ),
                     String
                 )
@@ -1426,6 +1482,7 @@
         data_index: &mut ShredIndex,
         shred: &Shred,
         write_batch: &mut WriteBatch,
+        shred_source: ShredSource,
     ) -> Result<Vec<(u32, u32)>> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
@@ -1476,7 +1533,30 @@
             shred.reference_tick(),
             data_index,
         );
+        if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
+            let mut slots_stats = self.slots_stats.lock().unwrap();
+            let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
+            if shred_source == ShredSource::Repaired {
+                e.num_repaired += 1;
+            }
+            if shred_source == ShredSource::Recovered {
+                e.num_recovered += 1;
+            }
+        }
         if slot_meta.is_full() {
+            let (num_repaired, num_recovered) = {
+                let mut slots_stats = self.slots_stats.lock().unwrap();
+                if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) {
+                    if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 {
+                        let root = self.last_root();
+                        slots_stats.stats = slots_stats.stats.split_off(&root);
+                        slots_stats.last_cleanup_ts = Instant::now();
+                    }
+                    (e.num_repaired, e.num_recovered)
+                } else {
+                    (0, 0)
+                }
+            };
             datapoint_info!(
                 "shred_insert_is_full",
                 (
@@ -1486,6 +1566,8 @@
                 ),
                 ("slot", slot_meta.slot, i64),
                 ("last_index", slot_meta.last_index, i64),
+                ("num_repaired", num_repaired, i64),
+                ("num_recovered", num_recovered, i64),
             );
         }
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
@@ -5405,7 +5487,7 @@ pub mod tests {
             &HashMap::new(),
             &last_root,
             None,
-            false
+            ShredSource::Turbine
         ));
 
         // Ensure that an empty shred (one with no data) would get inserted. Such shreds
@@ -5428,7 +5510,7 @@ pub mod tests {
             &HashMap::new(),
             &last_root,
             None,
-            false
+            ShredSource::Repaired,
         ));
         empty_shred.data_header.size = 0;
         assert!(!blockstore.should_insert_data_shred(
@@ -5437,7 +5519,7 @@ pub mod tests {
             &HashMap::new(),
             &last_root,
             None,
-            false
+            ShredSource::Recovered,
         ));
 
         // Trying to insert another "is_last" shred with index < the received index should fail
@@ -5461,7 +5543,7 @@ pub mod tests {
             &HashMap::new(),
             &last_root,
             None,
-            false
+            ShredSource::Repaired,
         ));
         assert!(blockstore.has_duplicate_shreds_in_slot(0));
 
@@ -5482,7 +5564,7 @@ pub mod tests {
             &HashMap::new(),
             &last_root,
             None,
-            false
+            ShredSource::Repaired,
         ));
     }
     Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
@@ -5550,6 +5632,7 @@ pub mod tests {
                 panic!("no dupes");
             },
             false,
+            false,
         ));
 
         // insert again fails on dupe
@@ -5565,6 +5648,7 @@ pub mod tests {
                 counter.fetch_add(1, Ordering::Relaxed);
            },
             false,
+            false,
         ));
         assert_eq!(counter.load(Ordering::Relaxed), 1);
     }
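
Caller-side sketch: window_service collapses the optional repair metadata into one bool per shred, and insert_shreds_handle_duplicate asserts the two vectors stay in lockstep before zipping them. A minimal, self-contained illustration of that handoff follows; Shred and RepairMeta are toy stand-ins here, not the real core/ledger types.

    // Toy stand-ins for the real types; only the collect/zip pattern is shown.
    struct RepairMeta;
    struct Shred;

    fn main() {
        let shreds = vec![Shred, Shred, Shred];
        // One entry per shred: Some(_) if it arrived via repair, None for turbine.
        let repair_infos: Vec<Option<RepairMeta>> = vec![None, Some(RepairMeta), None];

        // Mirrors the window_service change: collapse repair metadata into a
        // per-shred bool before handing the batch to the blockstore.
        let repairs: Vec<_> = repair_infos
            .iter()
            .map(|repair_info| repair_info.is_some())
            .collect();
        assert_eq!(shreds.len(), repairs.len());

        // Mirrors insert_shreds_handle_duplicate: zip shreds with their flags so
        // each one carries its provenance (Repaired vs Turbine) through insertion.
        for (i, (_shred, is_repaired)) in shreds.iter().zip(repairs.iter()).enumerate() {
            println!("shred {}: repaired = {}", i, is_repaired);
        }
    }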
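
Pruning sketch: the periodic cleanup added in insert_data_shred relies on BTreeMap::split_off returning the tail of the map, so reassigning slots_stats.stats to slots_stats.stats.split_off(&root) keeps entries for slots at or above the last root and drops everything older. A runnable illustration of just that step, with a toy map standing in for SlotsStats:

    use std::collections::BTreeMap;

    fn main() {
        // Toy stand-in for SlotsStats::stats (slot -> per-slot counters).
        let mut stats: BTreeMap<u64, usize> = (0..6u64).map(|slot| (slot, 1)).collect();

        // Mirrors the cleanup: split_off(&root) returns every entry with
        // slot >= root; assigning the result back discards stats for slots
        // below the root, which are final and will never be reported.
        let root: u64 = 3;
        stats = stats.split_off(&root);

        assert_eq!(stats.keys().copied().collect::<Vec<_>>(), vec![3, 4, 5]);
    }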