From 9369ea86ea7d018b346cbad55edd898e908fb190 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 29 Mar 2019 16:07:24 -0700 Subject: [PATCH] Track detached slots in blocktree (#3536) * Add contains_all_parents flag to SlotMeta to prep for tracking detached heads * Add new DetachedHeads column family * Remove has_complete_parents * Fix test --- core/src/blocktree.rs | 243 ++++++++++++++++++++++++++++-------- core/src/blocktree/db.rs | 6 + core/src/blocktree/kvs.rs | 34 +++++ core/src/blocktree/rocks.rs | 40 ++++++ 4 files changed, 268 insertions(+), 55 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 49f6db56c8..98344de353 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -41,7 +41,7 @@ macro_rules! db_imports { LedgerColumnFamilyRaw, }; - pub use $mod::{$db, ErasureCf, MetaCf, DataCf}; + pub use $mod::{$db, ErasureCf, MetaCf, DataCf, DetachedHeadsCf}; pub type BlocktreeRawIterator = <$db as Database>::Cursor; pub type WriteBatch = <$db as Database>::WriteBatch; pub type OwnedKey = <$db as Database>::OwnedKey; @@ -125,6 +125,7 @@ pub struct Blocktree { meta_cf: MetaCf, data_cf: DataCf, erasure_cf: ErasureCf, + detached_heads_cf: DetachedHeadsCf, pub new_blobs_signals: Vec>, } @@ -134,6 +135,8 @@ pub const META_CF: &str = "meta"; pub const DATA_CF: &str = "data"; // Column family for erasure data pub const ERASURE_CF: &str = "erasure"; +// Column family for detached heads data +pub const DETACHED_HEADS_CF: &str = "detached_heads"; impl Blocktree { pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver)> { @@ -148,6 +151,10 @@ impl Blocktree { self.meta_cf.get(&MetaCf::key(&slot)) } + pub fn detached_head(&self, slot: u64) -> Result> { + self.detached_heads_cf.get(&DetachedHeadsCf::key(&slot)) + } + pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> { let meta_key = MetaCf::key(&slot); if let Some(mut meta) = self.meta_cf.get(&meta_key)? { @@ -277,18 +284,16 @@ impl Blocktree { .meta(blob_slot) .expect("Expect database get to succeed") { - // If parent_slot == std::u64::MAX, then this is one of the dummy metadatas inserted + let backup = Some(meta.clone()); + // If parent_slot == std::u64::MAX, then this is one of the detached heads inserted // during the chaining process, see the function find_slot_meta_in_cached_state() - // for details - if meta.parent_slot == std::u64::MAX { + // for details. Slots that are detached heads are missing a parent_slot, so we should + // fill in the parent now that we know it. + if Self::is_detached_head(&meta) { meta.parent_slot = parent_slot; - // Set backup as None so that all the logic for inserting new slots - // still runs, as this placeholder slot is essentially equivalent to - // inserting a new slot - (Rc::new(RefCell::new(meta.clone())), None) - } else { - (Rc::new(RefCell::new(meta.clone())), Some(meta)) } + + (Rc::new(RefCell::new(meta)), backup) } else { ( Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))), @@ -318,8 +323,8 @@ impl Blocktree { // Check if any metadata was changed, if so, insert the new version of the // metadata into the write batch - for (slot, (meta_copy, meta_backup)) in slot_meta_working_set.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta_copy); + for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta); // Check if the working copy of the metadata has changed if Some(meta) != meta_backup.as_ref() { should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); @@ -637,12 +642,12 @@ impl Blocktree { let mut new_chained_slots = HashMap::new(); let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect(); for slot in working_set_slots { - self.handle_chaining_for_slot(working_set, &mut new_chained_slots, slot)?; + self.handle_chaining_for_slot(write_batch, working_set, &mut new_chained_slots, slot)?; } // Write all the newly changed slots in new_chained_slots to the write_batch - for (slot, meta_copy) in new_chained_slots.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta_copy); + for (slot, meta) in new_chained_slots.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta); write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?; } Ok(()) @@ -650,61 +655,108 @@ impl Blocktree { fn handle_chaining_for_slot( &self, + write_batch: &mut WriteBatch, working_set: &HashMap>, Option)>, new_chained_slots: &mut HashMap>>, slot: u64, ) -> Result<()> { - let (meta_copy, meta_backup) = working_set + let (meta, meta_backup) = working_set .get(&slot) .expect("Slot must exist in the working_set hashmap"); + { - let mut slot_meta = meta_copy.borrow_mut(); + let is_detached_head = + meta_backup.is_some() && Self::is_detached_head(meta_backup.as_ref().unwrap()); + + let mut meta_mut = meta.borrow_mut(); // If: // 1) This is a new slot // 2) slot != 0 // then try to chain this slot to a previous slot if slot != 0 { - let prev_slot = slot_meta.parent_slot; + let prev_slot = meta_mut.parent_slot; - // Check if slot_meta is a new slot - if meta_backup.is_none() { - let prev_slot = + // Check if the slot represented by meta_mut is either a new slot or a detached head. + // In both cases we need to run the chaining logic b/c the parent on the slot was + // previously unknown. + if meta_backup.is_none() || is_detached_head { + let prev_slot_meta = self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?; - // This is a newly inserted slot so: - // 1) Chain to the previous slot, and also - // 2) Determine whether to set the is_connected flag + // This is a newly inserted slot so run the chaining logic self.chain_new_slot_to_prev_slot( - &mut prev_slot.borrow_mut(), + &mut prev_slot_meta.borrow_mut(), slot, - &mut slot_meta, + &mut meta_mut, ); + + if Self::is_detached_head(&RefCell::borrow(&*prev_slot_meta)) { + write_batch.put_cf( + self.detached_heads_cf.handle(), + &DetachedHeadsCf::key(&prev_slot), + &serialize(&true)?, + )?; + } } } + + // At this point this slot has received a parent, so no longer a detached head + if is_detached_head { + write_batch.delete_cf( + self.detached_heads_cf.handle(), + &DetachedHeadsCf::key(&slot), + )?; + } } - if self.is_newly_completed_slot(&RefCell::borrow(&*meta_copy), meta_backup) - && RefCell::borrow(&*meta_copy).is_connected - { - // This is a newly inserted slot and slot.is_connected is true, so go through - // and update all child slots with is_connected if applicable - let mut next_slots: Vec<(u64, Rc>)> = - vec![(slot, meta_copy.clone())]; - while !next_slots.is_empty() { - let (_, current_slot) = next_slots.pop().unwrap(); - current_slot.borrow_mut().is_connected = true; + // This is a newly inserted slot and slot.is_connected is true, so update all + // child slots so that their `is_connected` = true + let should_propagate_is_connected = + Self::is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup) + && RefCell::borrow(&*meta).is_connected; + if should_propagate_is_connected { + // slot_function returns a boolean indicating whether to explore the children + // of the input slot + let slot_function = |slot: &mut SlotMeta| { + slot.is_connected = true; + + // We don't want to set the is_connected flag on the children of non-full + // slots + slot.is_full() + }; + + self.traverse_children_mut(slot, &meta, working_set, new_chained_slots, slot_function)?; + } + + Ok(()) + } + + fn traverse_children_mut( + &self, + slot: u64, + slot_meta: &Rc>, + working_set: &HashMap>, Option)>, + new_chained_slots: &mut HashMap>>, + slot_function: F, + ) -> Result<()> + where + F: Fn(&mut SlotMeta) -> bool, + { + let mut next_slots: Vec<(u64, Rc>)> = vec![(slot, slot_meta.clone())]; + while !next_slots.is_empty() { + let (_, current_slot) = next_slots.pop().unwrap(); + // Check whether we should explore the children of this slot + if slot_function(&mut current_slot.borrow_mut()) { let current_slot = &RefCell::borrow(&*current_slot); - if current_slot.is_full() { - for next_slot_index in current_slot.next_slots.iter() { - let next_slot = self.find_slot_meta_else_create( - working_set, - new_chained_slots, - *next_slot_index, - )?; - next_slots.push((*next_slot_index, next_slot)); - } + for next_slot_index in current_slot.next_slots.iter() { + let next_slot = self.find_slot_meta_else_create( + working_set, + new_chained_slots, + *next_slot_index, + )?; + next_slots.push((*next_slot_index, next_slot)); } } } @@ -712,6 +764,14 @@ impl Blocktree { Ok(()) } + fn is_detached_head(meta: &SlotMeta) -> bool { + // If we have children, but no parent, then this is the head of a detached chain of + // slots + meta.parent_slot == std::u64::MAX + } + + // 1) Chain current_slot to the previous slot defined by prev_slot_meta + // 2) Determine whether to set the is_connected flag fn chain_new_slot_to_prev_slot( &self, prev_slot_meta: &mut SlotMeta, @@ -722,20 +782,17 @@ impl Blocktree { current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full(); } - fn is_newly_completed_slot( - &self, - slot_meta: &SlotMeta, - backup_slot_meta: &Option, - ) -> bool { + fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option) -> bool { slot_meta.is_full() && (backup_slot_meta.is_none() + || Self::is_detached_head(&backup_slot_meta.as_ref().unwrap()) || slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed) } // 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched, // else: - // 2) Search the database for that slot metadata. If still no luck, then - // 3) Create a dummy placeholder slot in the database + // 2) Search the database for that slot metadata. If still no luck, then: + // 3) Create a dummy `detached head` slot in the database fn find_slot_meta_else_create<'a>( &self, working_set: &'a HashMap>, Option)>, @@ -751,7 +808,7 @@ impl Blocktree { } // Search the database for that slot metadata. If still no luck, then - // create a dummy placeholder slot in the database + // create a dummy `detached head` slot in the database fn find_slot_meta_in_db_else_create<'a>( &self, slot: u64, @@ -761,7 +818,7 @@ impl Blocktree { insert_map.insert(slot, Rc::new(RefCell::new(slot_meta))); Ok(insert_map.get(&slot).unwrap().clone()) } else { - // If this slot doesn't exist, make a placeholder slot. This way we + // If this slot doesn't exist, make a `detached head` slot. This way we // remember which slots chained to this one when we eventually get a real blob // for this slot insert_map.insert( @@ -1020,6 +1077,7 @@ pub fn tmp_copy_blocktree(from: &str, name: &str) -> String { #[cfg(test)] pub mod tests { use super::*; + use crate::blocktree::db::Database; use crate::entry::{ create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice, }; @@ -1817,7 +1875,7 @@ pub mod tests { // If "i" is the index of a slot we just inserted, then next_slots should be empty // for slot "i" because no slots chain to that slot, because slot i + 1 is missing. // However, if it's a slot we haven't inserted, aka one of the gaps, then one of the slots - // we just inserted will chain to that gap, so next_slots for that placeholder + // we just inserted will chain to that gap, so next_slots for that `detached head` // slot won't be empty, but the parent slot is unknown so should equal std::u64::MAX. let s = blocktree.meta(i as u64).unwrap().unwrap(); if i % 2 == 0 { @@ -1990,6 +2048,7 @@ pub mod tests { let slot_meta = blocktree.meta(slot).unwrap().unwrap(); assert_eq!(slot_meta.consumed, entries_per_slot); assert_eq!(slot_meta.received, entries_per_slot); + assert!(slot_meta.is_connected); let slot_parent = { if slot == 0 { 0 @@ -2017,6 +2076,9 @@ pub mod tests { } assert_eq!(expected_children, result); } + + // Detached heads is empty + assert!(blocktree.detached_heads_cf.is_empty().unwrap()) } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2065,6 +2127,63 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + fn test_detached_head() { + let blocktree_path = get_tmp_ledger_path("test_is_detached_head"); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + // Create blobs and entries + let entries_per_slot = 1; + let (blobs, _) = make_many_slot_entries(0, 3, entries_per_slot); + + // Write slot 2, which chains to slot 1. We're missing slot 0, + // so slot 1 is the detached head + blocktree.write_blobs(once(&blobs[2])).unwrap(); + let meta = blocktree + .meta(1) + .expect("Expect database get to succeed") + .unwrap(); + assert!(Blocktree::is_detached_head(&meta)); + assert_eq!(get_detached_heads(&blocktree), vec![1]); + + // Write slot 1 which chains to slot 0, so now slot 0 is the + // detached head, and slot 1 is no longer the detached head. + blocktree.write_blobs(once(&blobs[1])).unwrap(); + let meta = blocktree + .meta(1) + .expect("Expect database get to succeed") + .unwrap(); + assert!(!Blocktree::is_detached_head(&meta)); + let meta = blocktree + .meta(0) + .expect("Expect database get to succeed") + .unwrap(); + assert!(Blocktree::is_detached_head(&meta)); + assert_eq!(get_detached_heads(&blocktree), vec![0]); + + // Write some slot that also chains to existing slots and detached head, + // nothing should change + let blob4 = &make_slot_entries(4, 0, 1).0[0]; + let blob5 = &make_slot_entries(5, 1, 1).0[0]; + blocktree.write_blobs(vec![blob4, blob5]).unwrap(); + assert_eq!(get_detached_heads(&blocktree), vec![0]); + + // Write zeroth slot, no more detached heads + blocktree.write_blobs(once(&blobs[0])).unwrap(); + for i in 0..3 { + let meta = blocktree + .meta(i) + .expect("Expect database get to succeed") + .unwrap(); + assert!(!Blocktree::is_detached_head(&meta)); + } + // Detached heads is empty + assert!(blocktree.detached_heads_cf.is_empty().unwrap()) + } + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + fn test_insert_data_blobs_slots(name: &str, should_bulk_write: bool) { let blocktree_path = get_tmp_ledger_path(name); { @@ -2325,4 +2444,18 @@ pub mod tests { (blobs, entries) } + + fn get_detached_heads(blocktree: &Blocktree) -> Vec { + let mut results = vec![]; + let mut iter = blocktree + .db + .raw_iterator_cf(blocktree.detached_heads_cf.handle()) + .unwrap(); + iter.seek_to_first(); + while iter.valid() { + results.push(DetachedHeadsCf::index(&iter.key().unwrap())); + iter.next(); + } + results + } } diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index a522f14140..723b196007 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -72,6 +72,12 @@ pub trait LedgerColumnFamily: LedgerColumnFamilyRaw { db.put_cf(self.handle(), key, &serialized)?; Ok(()) } + + fn is_empty(&self) -> Result { + let mut db_iterator = self.db().raw_iterator_cf(self.handle())?; + db_iterator.seek_to_first(); + Ok(!db_iterator.valid()) + } } pub trait LedgerColumnFamilyRaw { diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index 354e05cb6b..eb071debd8 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -31,6 +31,12 @@ pub struct ErasureCf { db: Arc, } +/// The detached heads column family +#[derive(Debug)] +pub struct DetachedHeadsCf { + db: Arc, +} + /// Dummy struct to get things compiling /// TODO: all this goes away with Blocktree pub struct EntryIterator(i32); @@ -208,6 +214,34 @@ impl IndexColumn for MetaCf { } } +impl LedgerColumnFamilyRaw for DetachedHeadsCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() + } +} + +impl LedgerColumnFamily for DetachedHeadsCf { + type ValueType = bool; +} + +impl IndexColumn for DetachedHeadsCf { + type Index = u64; + + fn index(key: &Key) -> u64 { + BigEndian::read_u64(&key.0[8..16]) + } + + fn key(slot: &u64) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], *slot); + key + } +} + impl std::convert::From for Error { fn from(e: kvstore::Error) -> Error { Error::BlocktreeError(BlocktreeError::KvsDb(e)) diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 1b9e3a2086..ba858944cb 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -48,6 +48,12 @@ pub struct ErasureCf { db: Arc, } +/// The detached heads column family +#[derive(Debug)] +pub struct DetachedHeadsCf { + db: Arc, +} + /// TODO: all this goes away with Blocktree pub struct EntryIterator { db_iterator: DBRawIterator, @@ -82,10 +88,13 @@ impl Blocktree { ColumnFamilyDescriptor::new(super::DATA_CF, Blocktree::get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(super::ERASURE_CF, Blocktree::get_cf_options()); + let detached_heads_descriptor = + ColumnFamilyDescriptor::new(super::DETACHED_HEADS_CF, Blocktree::get_cf_options()); let cfs = vec![ meta_cf_descriptor, data_cf_descriptor, erasure_cf_descriptor, + detached_heads_descriptor, ]; // Open the database @@ -104,11 +113,14 @@ impl Blocktree { // Create the erasure column family let erasure_cf = ErasureCf { db: db.clone() }; + let detached_heads_cf = DetachedHeadsCf { db: db.clone() }; + Ok(Blocktree { db, meta_cf, data_cf, erasure_cf, + detached_heads_cf, new_blobs_signals: vec![], }) } @@ -313,6 +325,34 @@ impl IndexColumn for MetaCf { } } +impl LedgerColumnFamilyRaw for DetachedHeadsCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() + } +} + +impl LedgerColumnFamily for DetachedHeadsCf { + type ValueType = bool; +} + +impl IndexColumn for DetachedHeadsCf { + type Index = u64; + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } + + fn key(slot: &u64) -> Vec { + let mut key = vec![0; 8]; + BigEndian::write_u64(&mut key[..], *slot); + key + } +} + impl std::convert::From for Error { fn from(e: rocksdb::Error) -> Error { Error::BlocktreeError(BlocktreeError::RocksDb(e))