From c52830980a525df502358bef0d3f3708233f3f46 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 31 Oct 2019 14:03:41 -0700 Subject: [PATCH] Rework get_slot_meta (#6642) * Assert slotmeta is not orphan * Clean up get_slot_meta functionality * Add test --- ledger/src/blocktree.rs | 186 ++++++++++++++++++++++++---------------- 1 file changed, 112 insertions(+), 74 deletions(-) diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index ce3ff6e86..d8d764e2d 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -42,7 +42,6 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; -pub type SlotMetaWorkingSetEntry = (Rc>, Option); pub type CompletedSlotsReceiver = Receiver>; // ledger window @@ -63,6 +62,16 @@ pub struct Blocktree { pub struct IndexMetaWorkingSetEntry { index: Index, + // true only if at least one shred for this Index was inserted since the time this + // struct was created + did_insert_occur: bool, +} + +pub struct SlotMetaWorkingSetEntry { + new_slot_meta: Rc>, + old_slot_meta: Option, + // True only if at least one shred for this SlotMeta was inserted since the time this + // struct was created. 
did_insert_occur: bool, } @@ -80,6 +89,16 @@ pub struct BlocktreeInsertionMetrics { pub index_meta_time: u64, } +impl SlotMetaWorkingSetEntry { + fn new(new_slot_meta: Rc>, old_slot_meta: Option) -> Self { + Self { + new_slot_meta, + old_slot_meta, + did_insert_occur: false, + } + } +} + impl BlocktreeInsertionMetrics { pub fn report_metrics(&self, metric_name: &'static str) { datapoint_debug!( @@ -460,8 +479,8 @@ impl Blocktree { } }); start.stop(); - let insert_shreds_elapsed = start.as_us(); + let insert_shreds_elapsed = start.as_us(); let mut start = Measure::start("Shred recovery"); let mut num_recovered = 0; if let Some(leader_schedule_cache) = leader_schedule { @@ -493,12 +512,13 @@ impl Blocktree { let shred_recovery_elapsed = start.as_us(); let mut start = Measure::start("Shred recovery"); - // Handle chaining for the working set - handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?; + // Handle chaining for the members of the slot_meta_working_set that were inserted into, + // drop the others + handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?; start.stop(); let chaining_elapsed = start.as_us(); - let mut start = Measure::start("Commit Worknig Sets"); + let mut start = Measure::start("Commit Working Sets"); let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( &slot_meta_working_set, &self.completed_slots_senders, @@ -602,41 +622,30 @@ impl Blocktree { get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time); let index_meta = &mut index_meta_working_set_entry.index; - let (slot_meta_entry, mut new_slot_meta_entry) = + let slot_meta_entry = get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); - let insert_success = { - let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); - let mut slot_meta = entry.0.borrow_mut(); + let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); - if Blocktree::should_insert_data_shred( - 
&shred, - &slot_meta, - index_meta.data(), - &self.last_root, - ) { - if let Ok(()) = self.insert_data_shred( - &mut slot_meta, - index_meta.data_mut(), - &shred, - write_batch, - ) { - just_inserted_data_shreds.insert((slot, shred_index), shred); - index_meta_working_set_entry.did_insert_occur = true; - true - } else { - false - } + if Blocktree::should_insert_data_shred( + &shred, + slot_meta, + index_meta.data(), + &self.last_root, + ) { + if let Ok(()) = + self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) + { + just_inserted_data_shreds.insert((slot, shred_index), shred); + index_meta_working_set_entry.did_insert_occur = true; + slot_meta_entry.did_insert_occur = true; + true } else { false } - }; - - if insert_success { - new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n)); + } else { + false } - - insert_success } fn should_insert_coding_shred( @@ -775,7 +784,6 @@ impl Blocktree { ) -> Result<()> { let slot = shred.slot(); let index = u64::from(shred.index()); - let parent = shred.parent(); let last_in_slot = if shred.last_in_slot() { debug!("got last in slot"); @@ -791,9 +799,8 @@ impl Blocktree { false }; - if is_orphan(slot_meta) { - slot_meta.parent_slot = parent; - } + // Parent for slot meta should have been set by this point + assert!(!is_orphan(slot_meta)); let data_cf = self.db.column::(); @@ -1351,39 +1358,30 @@ fn get_slot_meta_entry<'a>( slot_meta_working_set: &'a mut HashMap, slot: u64, parent_slot: u64, -) -> ( - Option<&'a mut SlotMetaWorkingSetEntry>, - Option, -) { +) -> &'a mut SlotMetaWorkingSetEntry { let meta_cf = db.column::(); // Check if we've already inserted the slot metadata for this blob's slot - slot_meta_working_set - .get_mut(&slot) - .map(|s| (Some(s), None)) - .unwrap_or_else(|| { - // Store a 2-tuple of the metadata (working copy, backup copy) - if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") { - let backup = Some(meta.clone()); - // If parent_slot 
== std::u64::MAX, then this is one of the orphans inserted - // during the chaining process, see the function find_slot_meta_in_cached_state() - // for details. Slots that are orphans are missing a parent_slot, so we should - // fill in the parent now that we know it. - if is_orphan(&meta) { - meta.parent_slot = parent_slot; - } - - (None, Some((Rc::new(RefCell::new(meta)), backup))) - } else { - ( - None, - Some(( - Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))), - None, - )), - ) + slot_meta_working_set.entry(slot).or_insert_with(|| { + // Store a 2-tuple of the metadata (working copy, backup copy) + if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") { + let backup = Some(meta.clone()); + // If parent_slot == std::u64::MAX, then this is one of the orphans inserted + // during the chaining process, see the function find_slot_meta_in_cached_state() + // for details. Slots that are orphans are missing a parent_slot, so we should + // fill in the parent now that we know it. + if is_orphan(&meta) { + meta.parent_slot = parent_slot; } - }) + + SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup) + } else { + SlotMetaWorkingSetEntry::new( + Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))), + None, + ) + } + }) } fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool { @@ -1437,8 +1435,11 @@ fn commit_slot_meta_working_set( // Check if any metadata was changed, if so, insert the new version of the // metadata into the write batch - for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta); + for (slot, slot_meta_entry) in slot_meta_working_set.iter() { + // Any slot that wasn't written to should have been filtered out by now. 
+ assert!(slot_meta_entry.did_insert_occur); + let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta); + let meta_backup = &slot_meta_entry.old_slot_meta; if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) { newly_completed_slots.push(*slot); } @@ -1498,8 +1499,8 @@ fn find_slot_meta_in_cached_state<'a>( chained_slots: &'a HashMap>>, slot: u64, ) -> Result>>> { - if let Some((entry, _)) = working_set.get(&slot) { - Ok(Some(entry.clone())) + if let Some(entry) = working_set.get(&slot) { + Ok(Some(entry.new_slot_meta.clone())) } else if let Some(entry) = chained_slots.get(&slot) { Ok(Some(entry.clone())) } else { @@ -1511,12 +1512,14 @@ fn find_slot_meta_in_cached_state<'a>( fn handle_chaining( db: &Database, write_batch: &mut WriteBatch, - working_set: &HashMap, + working_set: &mut HashMap, ) -> Result<()> { + // Handle chaining for all the SlotMetas that were inserted into + working_set.retain(|_, entry| entry.did_insert_occur); let mut new_chained_slots = HashMap::new(); - let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect(); + let working_set_slots: Vec<_> = working_set.keys().collect(); for slot in working_set_slots { - handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, slot)?; + handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?; } // Write all the newly changed slots in new_chained_slots to the write_batch @@ -1534,10 +1537,13 @@ fn handle_chaining_for_slot( new_chained_slots: &mut HashMap>>, slot: u64, ) -> Result<()> { - let (meta, meta_backup) = working_set + let slot_meta_entry = working_set .get(&slot) .expect("Slot must exist in the working_set hashmap"); + let meta = &slot_meta_entry.new_slot_meta; + let meta_backup = &slot_meta_entry.old_slot_meta; + { let mut meta_mut = meta.borrow_mut(); let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap()); @@ -3772,5 +3778,37 @@ pub mod tests { 
.expect("Expected successful write of shreds"); assert!(blocktree.get_slot_entries(slot, 0, None).is_err()); } + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_no_insert_but_modify_slot_meta() { + // This tests correctness of the SlotMeta in various cases in which a shred + // gets filtered out by checks + let (shreds0, _) = make_slot_entries(0, 0, 200); + let blocktree_path = get_tmp_ledger_path!(); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + // Insert the first 5 shreds, we don't have a "is_last" shred yet + blocktree + .insert_shreds(shreds0[0..5].to_vec(), None) + .unwrap(); + + // Insert a duplicate shred for slot 's', should get ignored, but also + // insert shreds that chain to 's', should see the update in the SlotMeta + // for 's'. + let (mut shreds2, _) = make_slot_entries(2, 0, 200); + let (mut shreds3, _) = make_slot_entries(3, 0, 200); + shreds2.push(shreds0[1].clone()); + shreds3.insert(0, shreds0[1].clone()); + blocktree.insert_shreds(shreds2, None).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + assert_eq!(slot_meta.next_slots, vec![2]); + blocktree.insert_shreds(shreds3, None).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + assert_eq!(slot_meta.next_slots, vec![2, 3]); + } + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } }