From 8b1724bb706f19bc3c708f80fd488924a41235fc Mon Sep 17 00:00:00 2001 From: "Mark E. Sinclair" <48664490+mark-solana@users.noreply.github.com> Date: Fri, 26 Apr 2019 10:52:10 -0500 Subject: [PATCH] Serialize blocktree writes by locking the database (#4008) Move several private methods to free functions --- core/src/blocktree.rs | 1483 ++++++++++++++++++++------------------ core/src/blocktree/db.rs | 93 +-- 2 files changed, 839 insertions(+), 737 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 6f8cbdf089..4e292c9efc 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,7 +3,7 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; -use crate::erasure; +use crate::erasure::{self, Session}; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; #[cfg(feature = "kvstore")] @@ -73,7 +73,7 @@ pub enum BlocktreeError { // ledger window pub struct Blocktree { - db: Arc, + db: Arc>, meta_cf: LedgerColumn, data_cf: LedgerColumn, erasure_cf: LedgerColumn, @@ -103,27 +103,29 @@ impl Blocktree { let ledger_path = Path::new(&ledger_path).join(BLOCKTREE_DIRECTORY); // Open the database - let db = Arc::new(Database::open(&ledger_path)?); + let db = Database::open(&ledger_path)?; // Create the metadata column family - let meta_cf = LedgerColumn::new(&db); + let meta_cf = db.column(); // Create the data column family - let data_cf = LedgerColumn::new(&db); + let data_cf = db.column(); // Create the erasure column family - let erasure_cf = LedgerColumn::new(&db); + let erasure_cf = db.column(); - let erasure_meta_cf = LedgerColumn::new(&db); + let erasure_meta_cf = db.column(); // Create the orphans column family. An "orphan" is defined as // the head of a detached chain of slots, i.e. 
a slot with no // known parent - let orphans_cf = LedgerColumn::new(&db); + let orphans_cf = db.column(); // setup erasure let session = Arc::new(erasure::Session::default()); + let db = Arc::new(RwLock::new(db)); + Ok(Blocktree { db, meta_cf, @@ -153,15 +155,22 @@ impl Blocktree { } pub fn meta(&self, slot: u64) -> Result> { - self.meta_cf.get(slot) + self.meta_cf.get(&*self.db.read().unwrap(), slot) + } + + pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result> { + self.erasure_meta_cf + .get(&*self.db.read().unwrap(), (slot, set_index)) } pub fn orphan(&self, slot: u64) -> Result> { - self.orphans_cf.get(slot) + self.orphans_cf.get(&*self.db.read().unwrap(), slot) } pub fn get_next_slot(&self, slot: u64) -> Result> { - let mut db_iterator = self.db.cursor::()?; + let db = self.db.read().unwrap(); + let mut db_iterator = db.cursor::()?; + db_iterator.seek(slot + 1); if !db_iterator.valid() { Ok(None) @@ -255,7 +264,9 @@ impl Blocktree { I: IntoIterator, I::Item: Borrow, { - let mut write_batch = self.db.batch()?; + let mut db = self.db.write().unwrap(); + let mut write_batch = db.batch()?; + let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let mut recovered_data = vec![]; @@ -274,14 +285,15 @@ impl Blocktree { .entry((blob_slot, set_index)) .or_insert_with(|| { self.erasure_meta_cf - .get((blob_slot, set_index)) + .get(&db, (blob_slot, set_index)) .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::new(set_index)) }); } - self.insert_data_blob_batch( + insert_data_blob_batch( new_blobs.iter().map(Borrow::borrow), + &db, &mut slot_meta_working_set, &mut erasure_meta_working_set, &mut prev_inserted_blob_datas, @@ -289,9 +301,14 @@ impl Blocktree { )?; for (&(slot, _), erasure_meta) in erasure_meta_working_set.iter_mut() { - if let Some((data, coding)) = - self.try_erasure_recover(&erasure_meta, slot, &prev_inserted_blob_datas, None)? - { + if let Some((data, coding)) = try_erasure_recover( + &db, + &self.session, + &erasure_meta, + slot, + &prev_inserted_blob_datas, + None, + )? 
{ for data_blob in data { recovered_data.push(data_blob); } @@ -307,8 +324,9 @@ impl Blocktree { } } - self.insert_data_blob_batch( + insert_data_blob_batch( recovered_data.iter(), + &db, &mut slot_meta_working_set, &mut erasure_meta_working_set, &mut prev_inserted_blob_datas, @@ -316,7 +334,7 @@ impl Blocktree { )?; // Handle chaining for the working set - self.handle_chaining(&mut write_batch, &slot_meta_working_set)?; + handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; let mut should_signal = false; // Check if any metadata was changed, if so, insert the new version of the @@ -325,7 +343,7 @@ impl Blocktree { let meta: &SlotMeta = &RefCell::borrow(&*meta); // Check if the working copy of the metadata has changed if Some(meta) != meta_backup.as_ref() { - should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); + should_signal = should_signal || slot_has_updates(meta, &meta_backup); write_batch.put::(*slot, &meta)?; } } @@ -340,7 +358,7 @@ impl Blocktree { } } - self.db.write(write_batch)?; + db.write(write_batch)?; Ok(()) } @@ -356,7 +374,9 @@ impl Blocktree { buf: &mut [u8], slot: u64, ) -> Result<(u64, u64)> { - let mut db_iterator = self.db.cursor::()?; + let db = self.db.read().unwrap(); + let mut db_iterator = db.cursor::()?; + db_iterator.seek((slot, start_index)); let mut total_blobs = 0; let mut total_current_size = 0; @@ -407,68 +427,81 @@ impl Blocktree { } pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.erasure_cf.get_bytes((slot, index)) + let db = self.db.read().unwrap(); + self.erasure_cf.get_bytes(&db, (slot, index)) } pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { let set_index = ErasureMeta::set_index_for(index); + let mut db = self.db.write().unwrap(); let mut erasure_meta = self .erasure_meta_cf - .get((slot, set_index))? + .get(&db, (slot, set_index))? .unwrap_or_else(|| ErasureMeta::new(set_index)); erasure_meta.set_coding_present(index, false); - let mut batch = self.db.batch()?; + let mut batch = db.batch()?; batch.delete::((slot, index))?; batch.put::((slot, set_index), &erasure_meta)?; - self.db.write(batch)?; + db.write(batch)?; Ok(()) } pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.data_cf.get_bytes((slot, index)) + let db = self.db.read().unwrap(); + self.data_cf.get_bytes(&db, (slot, index)) } /// For benchmarks, testing, and setup. /// Does no metadata tracking. Use with care. pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.data_cf.put_bytes((slot, index), bytes) + let mut db = self.db.write().unwrap(); + self.data_cf.put_bytes(&mut db, (slot, index), bytes) } /// For benchmarks, testing, and setup. /// Does no metadata tracking. Use with care. pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.erasure_cf.put_bytes((slot, index), bytes) + let mut db = self.db.write().unwrap(); + self.erasure_cf.put_bytes(&mut db, (slot, index), bytes) } /// this function will insert coding blobs and also automatically track erasure-related /// metadata. If recovery is available it will be done pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { let set_index = ErasureMeta::set_index_for(index); + let mut db = self.db.write().unwrap(); + let mut erasure_meta = self .erasure_meta_cf - .get((slot, set_index))? + .get(&db, (slot, set_index))? 
.unwrap_or_else(|| ErasureMeta::new(set_index)); erasure_meta.set_coding_present(index, true); erasure_meta.set_size(bytes.len() - BLOB_HEADER_SIZE); - let mut writebatch = self.db.batch()?; + let mut writebatch = db.batch()?; writebatch.put_bytes::((slot, index), bytes)?; - if let Some((data, coding)) = - self.try_erasure_recover(&erasure_meta, slot, &HashMap::new(), Some((index, bytes)))? - { + if let Some((data, coding)) = try_erasure_recover( + &db, + &self.session, + &erasure_meta, + slot, + &HashMap::new(), + Some((index, bytes)), + )? { let mut erasure_meta_working_set = HashMap::new(); erasure_meta_working_set.insert((slot, set_index), erasure_meta); - self.insert_data_blob_batch( + insert_data_blob_batch( &data[..], + &db, &mut HashMap::new(), &mut erasure_meta_working_set, &mut HashMap::new(), @@ -489,96 +522,7 @@ impl Blocktree { writebatch.put::((slot, set_index), &erasure_meta)?; - self.db.write(writebatch)?; - - Ok(()) - } - - fn try_erasure_recover( - &self, - erasure_meta: &ErasureMeta, - slot: u64, - prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - new_coding_blob: Option<(u64, &[u8])>, - ) -> Result, Vec)>> { - use crate::erasure::ERASURE_SET_SIZE; - - let blobs = match erasure_meta.status() { - ErasureMetaStatus::CanRecover => { - let erasure_result = self.recover( - slot, - erasure_meta, - prev_inserted_blob_datas, - new_coding_blob, - ); - - match erasure_result { - Ok((data, coding)) => { - let recovered = data.len() + coding.len(); - assert_eq!( - ERASURE_SET_SIZE, - recovered - + (erasure_meta.coding.count_ones() - + erasure_meta.data.count_ones()) - as usize, - "Recovery should always complete a set" - ); - - info!("[try_erasure] recovered {} blobs", recovered); - inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); - Some((data, coding)) - } - Err(Error::ErasureError(e)) => { - inc_new_counter_info!("blocktree-erasure-recovery_failed", 1); - error!( - "[try_erasure] recovery failed: slot: {}, set_index: {}, cause: {}", - slot, erasure_meta.set_index, e - ); - None - } - - Err(e) => return Err(e), - } - } - ErasureMetaStatus::StillNeed(needed) => { - inc_new_counter_info!("blocktree-erasure-blobs_needed", needed); - None - } - ErasureMetaStatus::DataFull => { - inc_new_counter_info!("blocktree-erasure-complete", 1); - None - } - }; - - Ok(blobs) - } - - fn insert_data_blob_batch<'a, I>( - &self, - new_blobs: I, - slot_meta_working_set: &mut HashMap>, Option)>, - erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>, - prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, - write_batch: &mut WriteBatch, - ) -> Result<()> - where - I: IntoIterator, - { - for blob in new_blobs.into_iter() { - let inserted = self.check_insert_data_blob( - blob, - slot_meta_working_set, - prev_inserted_blob_datas, - write_batch, - ); - - if inserted { - erasure_meta_working_set - .get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index()))) - .unwrap() - .set_data_present(blob.index(), true); - } - } + db.write(writebatch)?; Ok(()) } @@ -675,7 +619,8 @@ impl Blocktree { end_index: u64, max_missing: usize, ) -> Vec { - if let Ok(mut db_iterator) = self.data_cf.cursor() { + let db = self.db.read().unwrap(); + if let Ok(mut db_iterator) = db.cursor::() { Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) } else { vec![] @@ -693,11 +638,11 @@ impl Blocktree { .map(|x| x.0) } - pub fn read_ledger_blobs(&self) -> impl Iterator { - self.data_cf - .iter() - .unwrap() - .map(|(_, blob_data)| 
Blob::new(&blob_data)) + pub fn read_ledger_blobs(&self) -> impl Iterator + '_ { + let db = self.db.read().unwrap(); + + let iter = db.iter::().unwrap(); + iter.map(|(_, blob_data)| Blob::new(&blob_data)) } /// Return an iterator for all the entries in the given file. @@ -754,7 +699,7 @@ impl Blocktree { None } } - let mut db_iterator = self.data_cf.cursor()?; + let mut db_iterator = self.db.read().unwrap().cursor::()?; db_iterator.seek_to_first(); Ok(EntryIterator { @@ -771,10 +716,15 @@ impl Blocktree { max_entries: Option, ) -> Result<(Vec, usize)> { // Find the next consecutive block of blobs. - let consecutive_blobs = - self.get_slot_consecutive_blobs(slot, &HashMap::new(), blob_start_index, max_entries)?; + let consecutive_blobs = get_slot_consecutive_blobs( + slot, + &self.db.read().unwrap(), + &HashMap::new(), + blob_start_index, + max_entries, + )?; let num = consecutive_blobs.len(); - Ok((Self::deserialize_blobs(&consecutive_blobs), num)) + Ok((deserialize_blobs(&consecutive_blobs), num)) } // Returns slots connecting to any element of the list `slots`. @@ -808,18 +758,22 @@ impl Blocktree { } pub fn set_root(&self, slot: u64) -> Result<()> { - *self.root_slot.write().unwrap() = slot; + let mut root_slot = self.root_slot.write().unwrap(); + let mut db = self.db.write().unwrap(); - if let Some(mut meta) = self.meta_cf.get(slot)? { + *root_slot = slot; + if let Some(mut meta) = self.meta_cf.get(&db, slot)? { meta.is_root = true; - self.meta_cf.put(slot, &meta)?; + self.meta_cf.put(&mut db, slot, &meta)?; } Ok(()) } pub fn get_orphans(&self, max: Option) -> Vec { let mut results = vec![]; - let mut iter = self.orphans_cf.cursor().unwrap(); + let db = self.db.read().unwrap(); + + let mut iter = db.cursor::().unwrap(); iter.seek_to_first(); while iter.valid() { if let Some(max) = max { @@ -833,461 +787,6 @@ impl Blocktree { results } - fn deserialize_blobs(blob_datas: &[I]) -> Vec - where - I: Borrow<[u8]>, - { - blob_datas - .iter() - .flat_map(|blob_data| { - let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..]; - Self::deserialize_blob_data(serialized_entries_data) - .expect("Ledger should only contain well formed data") - }) - .collect() - } - - fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option) -> bool { - // We should signal that there are updates if we extended the chain of consecutive blocks starting - // from block 0, which is true iff: - // 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks - // starting from block 0, - slot_meta.is_connected && - // AND either: - // 1) The slot didn't exist in the database before, and now we have a consecutive - // block for that slot - ((slot_meta_backup.is_none() && slot_meta.consumed != 0) || - // OR - // 2) The slot did exist, but now we have a new consecutive block for that slot - (slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed)) - } - - // Chaining based on latest discussion here: https://github.com/solana-labs/solana/pull/2253 - fn handle_chaining( - &self, - write_batch: &mut WriteBatch, - working_set: &HashMap>, Option)>, - ) -> Result<()> { - let mut new_chained_slots = HashMap::new(); - let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect(); - for slot in working_set_slots { - self.handle_chaining_for_slot(write_batch, working_set, &mut new_chained_slots, slot)?; - } - - // Write all the newly changed slots in new_chained_slots to the write_batch - for (slot, meta) in 
new_chained_slots.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta); - write_batch.put::(*slot, meta)?; - } - Ok(()) - } - - fn handle_chaining_for_slot( - &self, - write_batch: &mut WriteBatch, - working_set: &HashMap>, Option)>, - new_chained_slots: &mut HashMap>>, - slot: u64, - ) -> Result<()> { - let (meta, meta_backup) = working_set - .get(&slot) - .expect("Slot must exist in the working_set hashmap"); - - { - let is_orphan = meta_backup.is_some() && Self::is_orphan(meta_backup.as_ref().unwrap()); - - let mut meta_mut = meta.borrow_mut(); - - // If: - // 1) This is a new slot - // 2) slot != 0 - // then try to chain this slot to a previous slot - if slot != 0 { - let prev_slot = meta_mut.parent_slot; - - // Check if the slot represented by meta_mut is either a new slot or a orphan. - // In both cases we need to run the chaining logic b/c the parent on the slot was - // previously unknown. - if meta_backup.is_none() || is_orphan { - let prev_slot_meta = - self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?; - - // This is a newly inserted slot so run the chaining logic - self.chain_new_slot_to_prev_slot( - &mut prev_slot_meta.borrow_mut(), - slot, - &mut meta_mut, - ); - - if Self::is_orphan(&RefCell::borrow(&*prev_slot_meta)) { - write_batch.put::(prev_slot, &true)?; - } - } - } - - // At this point this slot has received a parent, so no longer a orphan - if is_orphan { - write_batch.delete::(slot)?; - } - } - - // This is a newly inserted slot and slot.is_connected is true, so update all - // child slots so that their `is_connected` = true - let should_propagate_is_connected = - Self::is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup) - && RefCell::borrow(&*meta).is_connected; - - if should_propagate_is_connected { - // slot_function returns a boolean indicating whether to explore the children - // of the input slot - let slot_function = |slot: &mut SlotMeta| { - slot.is_connected = true; - - // We don't want to set the is_connected flag on the children of non-full - // slots - slot.is_full() - }; - - self.traverse_children_mut(slot, &meta, working_set, new_chained_slots, slot_function)?; - } - - Ok(()) - } - - fn traverse_children_mut( - &self, - slot: u64, - slot_meta: &Rc>, - working_set: &HashMap>, Option)>, - new_chained_slots: &mut HashMap>>, - slot_function: F, - ) -> Result<()> - where - F: Fn(&mut SlotMeta) -> bool, - { - let mut next_slots: Vec<(u64, Rc>)> = vec![(slot, slot_meta.clone())]; - while !next_slots.is_empty() { - let (_, current_slot) = next_slots.pop().unwrap(); - // Check whether we should explore the children of this slot - if slot_function(&mut current_slot.borrow_mut()) { - let current_slot = &RefCell::borrow(&*current_slot); - for next_slot_index in current_slot.next_slots.iter() { - let next_slot = self.find_slot_meta_else_create( - working_set, - new_chained_slots, - *next_slot_index, - )?; - next_slots.push((*next_slot_index, next_slot)); - } - } - } - - Ok(()) - } - - fn data_blob_exists(&self, slot: u64, index: u64) -> bool { - self.erasure_meta_cf - .get((slot, ErasureMeta::set_index_for(index))) - .expect("Expect database get to succeed") - .map(|e| e.is_data_present(index)) - .unwrap_or(false) - } - - fn is_orphan(meta: &SlotMeta) -> bool { - // If we have no parent, then this is the head of a detached chain of - // slots - !meta.is_parent_set() - } - - // 1) Chain current_slot to the previous slot defined by prev_slot_meta - // 2) Determine whether to set the is_connected flag - fn 
chain_new_slot_to_prev_slot( - &self, - prev_slot_meta: &mut SlotMeta, - current_slot: u64, - current_slot_meta: &mut SlotMeta, - ) { - prev_slot_meta.next_slots.push(current_slot); - current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full(); - } - - fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option) -> bool { - slot_meta.is_full() - && (backup_slot_meta.is_none() - || Self::is_orphan(&backup_slot_meta.as_ref().unwrap()) - || slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed) - } - - // 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched, - // else: - // 2) Search the database for that slot metadata. If still no luck, then: - // 3) Create a dummy orphan slot in the database - fn find_slot_meta_else_create<'a>( - &self, - working_set: &'a HashMap>, Option)>, - chained_slots: &'a mut HashMap>>, - slot_index: u64, - ) -> Result>> { - let result = self.find_slot_meta_in_cached_state(working_set, chained_slots, slot_index)?; - if let Some(slot) = result { - Ok(slot) - } else { - self.find_slot_meta_in_db_else_create(slot_index, chained_slots) - } - } - - // Search the database for that slot metadata. If still no luck, then - // create a dummy orphan slot in the database - fn find_slot_meta_in_db_else_create<'a>( - &self, - slot: u64, - insert_map: &'a mut HashMap>>, - ) -> Result>> { - if let Some(slot_meta) = self.meta(slot)? { - insert_map.insert(slot, Rc::new(RefCell::new(slot_meta))); - Ok(insert_map.get(&slot).unwrap().clone()) - } else { - // If this slot doesn't exist, make a orphan slot. This way we - // remember which slots chained to this one when we eventually get a real blob - // for this slot - insert_map.insert( - slot, - Rc::new(RefCell::new(SlotMeta::new(slot, std::u64::MAX))), - ); - Ok(insert_map.get(&slot).unwrap().clone()) - } - } - - // Find the slot metadata in the cache of dirty slot metadata we've previously touched - fn find_slot_meta_in_cached_state<'a>( - &self, - working_set: &'a HashMap>, Option)>, - chained_slots: &'a HashMap>>, - slot: u64, - ) -> Result>>> { - if let Some((entry, _)) = working_set.get(&slot) { - Ok(Some(entry.clone())) - } else if let Some(entry) = chained_slots.get(&slot) { - Ok(Some(entry.clone())) - } else { - Ok(None) - } - } - - /// Checks to see if the data blob passes integrity checks for insertion. Proceeds with - /// insertion if it does. - fn check_insert_data_blob<'a>( - &self, - blob: &'a Blob, - slot_meta_working_set: &mut HashMap>, Option)>, - prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, - write_batch: &mut WriteBatch, - ) -> bool { - let blob_slot = blob.slot(); - let parent_slot = blob.parent(); - - // Check if we've already inserted the slot metadata for this blob's slot - let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| { - // Store a 2-tuple of the metadata (working copy, backup copy) - if let Some(mut meta) = self - .meta(blob_slot) - .expect("Expect database get to succeed") - { - let backup = Some(meta.clone()); - // If parent_slot == std::u64::MAX, then this is one of the orphans inserted - // during the chaining process, see the function find_slot_meta_in_cached_state() - // for details. Slots that are orphans are missing a parent_slot, so we should - // fill in the parent now that we know it. 
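// A minimal sketch (not the patched code) of the orphan back-fill rule the
// comment above describes, assuming a stripped-down `SlotMeta` with only the
// fields involved. An orphan is stored with `parent_slot == std::u64::MAX`
// until a blob arrives that names the real parent.
pub struct SlotMetaSketch {
    pub slot: u64,
    pub parent_slot: u64,
}

impl SlotMetaSketch {
    pub fn is_parent_set(&self) -> bool {
        self.parent_slot != std::u64::MAX
    }
}

pub fn is_orphan_sketch(meta: &SlotMetaSketch) -> bool {
    // No parent means this slot heads a detached chain.
    !meta.is_parent_set()
}

pub fn adopt_parent(meta: &mut SlotMetaSketch, parent_slot: u64) {
    // Mirrors the branch just below: only a dummy orphan entry gets its
    // parent overwritten once the true parent becomes known.
    if is_orphan_sketch(meta) {
        meta.parent_slot = parent_slot;
    }
}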
- if Blocktree::is_orphan(&meta) { - meta.parent_slot = parent_slot; - } - - (Rc::new(RefCell::new(meta)), backup) - } else { - ( - Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))), - None, - ) - } - }); - - let slot_meta = &mut entry.0.borrow_mut(); - - // This slot is full, skip the bogus blob - // Check if this blob should be inserted - if !self.should_insert_blob(&slot_meta, &prev_inserted_blob_datas, blob) { - false - } else { - let _ = self.insert_data_blob(blob, prev_inserted_blob_datas, slot_meta, write_batch); - true - } - } - - /// Insert a blob into ledger, updating the slot_meta if necessary - fn insert_data_blob<'a>( - &self, - blob_to_insert: &'a Blob, - prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, - slot_meta: &mut SlotMeta, - write_batch: &mut WriteBatch, - ) -> Result<()> { - let blob_index = blob_to_insert.index(); - let blob_slot = blob_to_insert.slot(); - let blob_size = blob_to_insert.size(); - - let new_consumed = { - if slot_meta.consumed == blob_index { - let blob_datas = self.get_slot_consecutive_blobs( - blob_slot, - prev_inserted_blob_datas, - // Don't start looking for consecutive blobs at blob_index, - // because we haven't inserted/committed the new blob_to_insert - // into the database or prev_inserted_blob_datas hashmap yet. - blob_index + 1, - None, - )?; - - // Add one because we skipped this current blob when calling - // get_slot_consecutive_blobs() earlier - slot_meta.consumed + blob_datas.len() as u64 + 1 - } else { - slot_meta.consumed - } - }; - - let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; - - // Commit step: commit all changes to the mutable structures at once, or none at all. - // We don't want only some of these changes going through. - write_batch.put_bytes::((blob_slot, blob_index), serialized_blob_data)?; - prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); - // Index is zero-indexed, while the "received" height starts from 1, - // so received = index + 1 for the same blob. - slot_meta.received = cmp::max(blob_index + 1, slot_meta.received); - slot_meta.consumed = new_consumed; - slot_meta.last_index = { - // If the last slot hasn't been set before, then - // set it to this blob index - if slot_meta.last_index == std::u64::MAX { - if blob_to_insert.is_last_in_slot() { - blob_index - } else { - std::u64::MAX - } - } else { - slot_meta.last_index - } - }; - Ok(()) - } - - /// Attempts recovery using erasure coding - fn recover( - &self, - slot: u64, - erasure_meta: &ErasureMeta, - prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - new_coding: Option<(u64, &[u8])>, - ) -> Result<(Vec, Vec)> { - use crate::erasure::ERASURE_SET_SIZE; - - let start_idx = erasure_meta.start_index(); - let size = erasure_meta.size(); - - let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); - - let present = &mut [true; ERASURE_SET_SIZE]; - let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); - - for i in start_idx..coding_end_idx { - if erasure_meta.is_coding_present(i) { - let mut blob_bytes = match new_coding { - Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(), - _ => self - .erasure_cf - .get_bytes((slot, i))? 
- .expect("ErasureMeta must have no false positives"), - }; - - blob_bytes.drain(..BLOB_HEADER_SIZE); - - blobs.push(blob_bytes); - } else { - let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize; - blobs.push(vec![0; size]); - present[set_relative_idx] = false; - } - } - - assert_ne!(size, 0); - - for i in start_idx..data_end_idx { - let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize; - - if erasure_meta.is_data_present(i) { - let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) { - Some(bytes) => bytes.to_vec(), - None => self - .data_cf - .get_bytes((slot, i))? - .expect("erasure_meta must have no false positives"), - }; - - // If data is too short, extend it with zeroes - blob_bytes.resize(size, 0u8); - - blobs.insert(set_relative_idx, blob_bytes); - } else { - blobs.insert(set_relative_idx, vec![0u8; size]); - // data erasures must come before any coding erasures if present - present[set_relative_idx] = false; - } - } - - let (recovered_data, recovered_coding) = self - .session - .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; - - trace!( - "[recover] reconstruction OK slot: {}, indexes: [{},{})", - slot, - start_idx, - data_end_idx - ); - - Ok((recovered_data, recovered_coding)) - } - - /// Returns the next consumed index and the number of ticks in the new consumed - /// range - fn get_slot_consecutive_blobs<'a>( - &self, - slot: u64, - prev_inserted_blob_datas: &HashMap<(u64, u64), &'a [u8]>, - mut current_index: u64, - max_blobs: Option, - ) -> Result>> { - let mut blobs: Vec> = vec![]; - loop { - if Some(blobs.len() as u64) == max_blobs { - break; - } - // Try to find the next blob we're looking for in the prev_inserted_blob_datas - if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) { - blobs.push(Cow::Borrowed(*prev_blob_data)); - } else if let Some(blob_data) = self.data_cf.get_bytes((slot, current_index))? { - // Try to find the next blob we're looking for in the database - blobs.push(Cow::Owned(blob_data)); - } else { - break; - } - - current_index += 1; - } - - Ok(blobs) - } - // Handle special case of writing genesis blobs. 
For instance, the first two entries // don't count as ticks, even if they're empty entries fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { @@ -1295,76 +794,627 @@ impl Blocktree { let mut bootstrap_meta = SlotMeta::new(0, 1); let last = blobs.last().unwrap(); + let mut db = self.db.write().unwrap(); + bootstrap_meta.consumed = last.index() + 1; bootstrap_meta.received = last.index() + 1; bootstrap_meta.is_connected = true; - let mut batch = self.db.batch()?; + let mut batch = db.batch()?; batch.put::(0, &bootstrap_meta)?; for blob in blobs { let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; batch.put_bytes::((blob.slot(), blob.index()), serialized_blob_datas)?; } - self.db.write(batch)?; + db.write(batch)?; Ok(()) } +} - fn should_insert_blob( - &self, - slot: &SlotMeta, - prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - blob: &Blob, - ) -> bool { - let blob_index = blob.index(); - let blob_slot = blob.slot(); +fn insert_data_blob_batch<'a, I>( + new_blobs: I, + db: &Database, + slot_meta_working_set: &mut HashMap>, Option)>, + erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + write_batch: &mut WriteBatch, +) -> Result<()> +where + I: IntoIterator, +{ + for blob in new_blobs.into_iter() { + let inserted = check_insert_data_blob( + blob, + db, + slot_meta_working_set, + prev_inserted_blob_datas, + write_batch, + ); - // Check that the blob doesn't already exist - if blob_index < slot.consumed - || prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) - || self.data_blob_exists(blob_slot, blob_index) + if inserted { + erasure_meta_working_set + .get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index()))) + .unwrap() + .set_data_present(blob.index(), true); + } + } + + Ok(()) +} + +/// Insert a blob into ledger, updating the slot_meta if necessary +fn insert_data_blob<'a>( + blob_to_insert: &'a Blob, + db: &Database, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + slot_meta: &mut SlotMeta, + write_batch: &mut WriteBatch, +) -> Result<()> { + let blob_index = blob_to_insert.index(); + let blob_slot = blob_to_insert.slot(); + let blob_size = blob_to_insert.size(); + + let new_consumed = { + if slot_meta.consumed == blob_index { + let blob_datas = get_slot_consecutive_blobs( + blob_slot, + db, + prev_inserted_blob_datas, + // Don't start looking for consecutive blobs at blob_index, + // because we haven't inserted/committed the new blob_to_insert + // into the database or prev_inserted_blob_datas hashmap yet. + blob_index + 1, + None, + )?; + + // Add one because we skipped this current blob when calling + // get_slot_consecutive_blobs() earlier + slot_meta.consumed + blob_datas.len() as u64 + 1 + } else { + slot_meta.consumed + } + }; + + let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; + + // Commit step: commit all changes to the mutable structures at once, or none at all. + // We don't want only some of these changes going through. + write_batch.put_bytes::((blob_slot, blob_index), serialized_blob_data)?; + prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); + // Index is zero-indexed, while the "received" height starts from 1, + // so received = index + 1 for the same blob. 
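// A toy model (separate from the function above) of how `consumed` advances:
// it only moves when the inserted blob is exactly the next expected index,
// and then jumps past every consecutive blob already on hand, matching the
// `slot_meta.consumed + blob_datas.len() as u64 + 1` computation above.
use std::collections::HashSet;

pub fn advance_consumed(consumed: u64, blob_index: u64, present: &HashSet<u64>) -> u64 {
    if consumed != blob_index {
        // Out-of-order blob: the consecutive run from index 0 is unchanged.
        return consumed;
    }
    // Count the blob itself, then every consecutive successor already present.
    let mut next = blob_index + 1;
    while present.contains(&next) {
        next += 1;
    }
    next
}

// e.g. consumed = 3, inserting index 3 with {4, 5} present => new consumed = 6.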
+ slot_meta.received = cmp::max(blob_index + 1, slot_meta.received); + slot_meta.consumed = new_consumed; + slot_meta.last_index = { + // If the last slot hasn't been set before, then + // set it to this blob index + if slot_meta.last_index == std::u64::MAX { + if blob_to_insert.is_last_in_slot() { + blob_index + } else { + std::u64::MAX + } + } else { + slot_meta.last_index + } + }; + Ok(()) +} + +/// Checks to see if the data blob passes integrity checks for insertion. Proceeds with +/// insertion if it does. +fn check_insert_data_blob<'a>( + blob: &'a Blob, + db: &Database, + slot_meta_working_set: &mut HashMap>, Option)>, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + write_batch: &mut WriteBatch, +) -> bool { + let blob_slot = blob.slot(); + let parent_slot = blob.parent(); + let meta_cf = db.column::(); + + // Check if we've already inserted the slot metadata for this blob's slot + let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| { + // Store a 2-tuple of the metadata (working copy, backup copy) + if let Some(mut meta) = meta_cf + .get(db, blob_slot) + .expect("Expect database get to succeed") { - return false; - } + let backup = Some(meta.clone()); + // If parent_slot == std::u64::MAX, then this is one of the orphans inserted + // during the chaining process, see the function find_slot_meta_in_cached_state() + // for details. Slots that are orphans are missing a parent_slot, so we should + // fill in the parent now that we know it. + if is_orphan(&meta) { + meta.parent_slot = parent_slot; + } - // Check that we do not receive blobs >= than the last_index - // for the slot - let last_index = slot.last_index; - if blob_index >= last_index { - solana_metrics::submit( - solana_metrics::influxdb::Point::new("blocktree_error") - .add_field( - "error", - solana_metrics::influxdb::Value::String(format!( - "Received last blob with index {} >= slot.last_index {}", - blob_index, last_index - )), - ) - .to_owned(), - ); - return false; + (Rc::new(RefCell::new(meta)), backup) + } else { + ( + Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))), + None, + ) } + }); - // Check that we do not receive a blob with "last_index" true, but index - // less than our current received - if blob.is_last_in_slot() && blob_index < slot.received { - solana_metrics::submit( - solana_metrics::influxdb::Point::new("blocktree_error") - .add_field( - "error", - solana_metrics::influxdb::Value::String(format!( - "Received last blob with index {} < slot.received {}", - blob_index, slot.received - )), - ) - .to_owned(), - ); - return false; - } + let slot_meta = &mut entry.0.borrow_mut(); + // This slot is full, skip the bogus blob + // Check if this blob should be inserted + if !should_insert_blob(&slot_meta, db, &prev_inserted_blob_datas, blob) { + false + } else { + let _ = insert_data_blob(blob, db, prev_inserted_blob_datas, slot_meta, write_batch); true } } +fn should_insert_blob( + slot: &SlotMeta, + db: &Database, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + blob: &Blob, +) -> bool { + let blob_index = blob.index(); + let blob_slot = blob.slot(); + let data_cf = db.column::(); + + // Check that the blob doesn't already exist + if blob_index < slot.consumed + || prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) + || data_cf + .get_bytes(db, (blob_slot, blob_index)) + .map(|opt| opt.is_some()) + .unwrap_or(false) + { + return false; + } + + // Check that we do not receive blobs >= than the last_index + // for the slot + let last_index = 
slot.last_index; + if blob_index >= last_index { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String(format!( + "Received last blob with index {} >= slot.last_index {}", + blob_index, last_index + )), + ) + .to_owned(), + ); + return false; + } + + // Check that we do not receive a blob with "last_index" true, but index + // less than our current received + if blob.is_last_in_slot() && blob_index < slot.received { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String(format!( + "Received last blob with index {} < slot.received {}", + blob_index, slot.received + )), + ) + .to_owned(), + ); + return false; + } + + true +} + +// 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched, +// else: +// 2) Search the database for that slot metadata. If still no luck, then: +// 3) Create a dummy orphan slot in the database +fn find_slot_meta_else_create<'a>( + db: &Database, + working_set: &'a HashMap>, Option)>, + chained_slots: &'a mut HashMap>>, + slot_index: u64, +) -> Result>> { + let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index)?; + if let Some(slot) = result { + Ok(slot) + } else { + find_slot_meta_in_db_else_create(db, slot_index, chained_slots) + } +} + +// Search the database for that slot metadata. If still no luck, then +// create a dummy orphan slot in the database +fn find_slot_meta_in_db_else_create<'a>( + db: &Database, + slot: u64, + insert_map: &'a mut HashMap>>, +) -> Result>> { + if let Some(slot_meta) = db.column::().get(db, slot)? { + insert_map.insert(slot, Rc::new(RefCell::new(slot_meta))); + Ok(insert_map.get(&slot).unwrap().clone()) + } else { + // If this slot doesn't exist, make a orphan slot. This way we + // remember which slots chained to this one when we eventually get a real blob + // for this slot + insert_map.insert( + slot, + Rc::new(RefCell::new(SlotMeta::new(slot, std::u64::MAX))), + ); + Ok(insert_map.get(&slot).unwrap().clone()) + } +} + +// Find the slot metadata in the cache of dirty slot metadata we've previously touched +fn find_slot_meta_in_cached_state<'a>( + working_set: &'a HashMap>, Option)>, + chained_slots: &'a HashMap>>, + slot: u64, +) -> Result>>> { + if let Some((entry, _)) = working_set.get(&slot) { + Ok(Some(entry.clone())) + } else if let Some(entry) = chained_slots.get(&slot) { + Ok(Some(entry.clone())) + } else { + Ok(None) + } +} + +/// Returns the next consumed index and the number of ticks in the new consumed +/// range +fn get_slot_consecutive_blobs<'a>( + slot: u64, + db: &Database, + prev_inserted_blob_datas: &HashMap<(u64, u64), &'a [u8]>, + mut current_index: u64, + max_blobs: Option, +) -> Result>> { + let mut blobs: Vec> = vec![]; + let data_cf = db.column::(); + + loop { + if Some(blobs.len() as u64) == max_blobs { + break; + } + // Try to find the next blob we're looking for in the prev_inserted_blob_datas + if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) { + blobs.push(Cow::Borrowed(*prev_blob_data)); + } else if let Some(blob_data) = data_cf.get_bytes(db, (slot, current_index))? 
{ + // Try to find the next blob we're looking for in the database + blobs.push(Cow::Owned(blob_data)); + } else { + break; + } + + current_index += 1; + } + + Ok(blobs) +} + +// Chaining based on latest discussion here: https://github.com/solana-labs/solana/pull/2253 +fn handle_chaining( + db: &Database, + write_batch: &mut WriteBatch, + working_set: &HashMap>, Option)>, +) -> Result<()> { + let mut new_chained_slots = HashMap::new(); + let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect(); + for slot in working_set_slots { + handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, slot)?; + } + + // Write all the newly changed slots in new_chained_slots to the write_batch + for (slot, meta) in new_chained_slots.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta); + write_batch.put::(*slot, meta)?; + } + Ok(()) +} + +fn handle_chaining_for_slot( + db: &Database, + write_batch: &mut WriteBatch, + working_set: &HashMap>, Option)>, + new_chained_slots: &mut HashMap>>, + slot: u64, +) -> Result<()> { + let (meta, meta_backup) = working_set + .get(&slot) + .expect("Slot must exist in the working_set hashmap"); + + { + let is_orphaned = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap()); + + let mut meta_mut = meta.borrow_mut(); + + // If: + // 1) This is a new slot + // 2) slot != 0 + // then try to chain this slot to a previous slot + if slot != 0 { + let prev_slot = meta_mut.parent_slot; + + // Check if the slot represented by meta_mut is either a new slot or a orphan. + // In both cases we need to run the chaining logic b/c the parent on the slot was + // previously unknown. + if meta_backup.is_none() || is_orphaned { + let prev_slot_meta = + find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?; + + // This is a newly inserted slot so run the chaining logic + chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut); + + if is_orphan(&RefCell::borrow(&*prev_slot_meta)) { + write_batch.put::(prev_slot, &true)?; + } + } + } + + // At this point this slot has received a parent, so no longer a orphan + if is_orphaned { + write_batch.delete::(slot)?; + } + } + + // This is a newly inserted slot and slot.is_connected is true, so update all + // child slots so that their `is_connected` = true + let should_propagate_is_connected = + is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup) + && RefCell::borrow(&*meta).is_connected; + + if should_propagate_is_connected { + // slot_function returns a boolean indicating whether to explore the children + // of the input slot + let slot_function = |slot: &mut SlotMeta| { + slot.is_connected = true; + + // We don't want to set the is_connected flag on the children of non-full + // slots + slot.is_full() + }; + + traverse_children_mut( + db, + slot, + &meta, + working_set, + new_chained_slots, + slot_function, + )?; + } + + Ok(()) +} + +fn traverse_children_mut( + db: &Database, + slot: u64, + slot_meta: &Rc>, + working_set: &HashMap>, Option)>, + new_chained_slots: &mut HashMap>>, + slot_function: F, +) -> Result<()> +where + F: Fn(&mut SlotMeta) -> bool, +{ + let mut next_slots: Vec<(u64, Rc>)> = vec![(slot, slot_meta.clone())]; + while !next_slots.is_empty() { + let (_, current_slot) = next_slots.pop().unwrap(); + // Check whether we should explore the children of this slot + if slot_function(&mut current_slot.borrow_mut()) { + let current_slot = &RefCell::borrow(&*current_slot); + for next_slot_index in current_slot.next_slots.iter() { + 
let next_slot = find_slot_meta_else_create( + db, + working_set, + new_chained_slots, + *next_slot_index, + )?; + next_slots.push((*next_slot_index, next_slot)); + } + } + } + + Ok(()) +} + +fn is_orphan(meta: &SlotMeta) -> bool { + // If we have no parent, then this is the head of a detached chain of + // slots + !meta.is_parent_set() +} + +// 1) Chain current_slot to the previous slot defined by prev_slot_meta +// 2) Determine whether to set the is_connected flag +fn chain_new_slot_to_prev_slot( + prev_slot_meta: &mut SlotMeta, + current_slot: u64, + current_slot_meta: &mut SlotMeta, +) { + prev_slot_meta.next_slots.push(current_slot); + current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full(); +} + +fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option) -> bool { + slot_meta.is_full() + && (backup_slot_meta.is_none() + || is_orphan(&backup_slot_meta.as_ref().unwrap()) + || slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed) +} + +/// Attempts recovery using erasure coding +fn try_erasure_recover( + db: &Database, + session: &Session, + erasure_meta: &ErasureMeta, + slot: u64, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + new_coding_blob: Option<(u64, &[u8])>, +) -> Result, Vec)>> { + use crate::erasure::ERASURE_SET_SIZE; + + let blobs = match erasure_meta.status() { + ErasureMetaStatus::CanRecover => { + let erasure_result = recover( + db, + session, + slot, + erasure_meta, + prev_inserted_blob_datas, + new_coding_blob, + ); + + match erasure_result { + Ok((data, coding)) => { + let recovered = data.len() + coding.len(); + assert_eq!( + ERASURE_SET_SIZE, + recovered + + (erasure_meta.coding.count_ones() + erasure_meta.data.count_ones()) + as usize, + "Recovery should always complete a set" + ); + + info!("[try_erasure] recovered {} blobs", recovered); + inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); + Some((data, coding)) + } + Err(Error::ErasureError(e)) => { + inc_new_counter_info!("blocktree-erasure-recovery_failed", 1); + error!( + "[try_erasure] recovery failed: slot: {}, set_index: {}, cause: {}", + slot, erasure_meta.set_index, e + ); + None + } + + Err(e) => return Err(e), + } + } + ErasureMetaStatus::StillNeed(needed) => { + inc_new_counter_info!("blocktree-erasure-blobs_needed", needed); + None + } + ErasureMetaStatus::DataFull => { + inc_new_counter_info!("blocktree-erasure-complete", 1); + None + } + }; + + Ok(blobs) +} + +fn recover( + db: &Database, + session: &Session, + slot: u64, + erasure_meta: &ErasureMeta, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + new_coding: Option<(u64, &[u8])>, +) -> Result<(Vec, Vec)> { + use crate::erasure::ERASURE_SET_SIZE; + + let start_idx = erasure_meta.start_index(); + let size = erasure_meta.size(); + let data_cf = db.column::(); + let erasure_cf = db.column::(); + + let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); + + let present = &mut [true; ERASURE_SET_SIZE]; + let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); + + for i in start_idx..coding_end_idx { + if erasure_meta.is_coding_present(i) { + let mut blob_bytes = match new_coding { + Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(), + _ => erasure_cf + .get_bytes(db, (slot, i))? 
+ .expect("ErasureMeta must have no false positives"), + }; + + blob_bytes.drain(..BLOB_HEADER_SIZE); + + blobs.push(blob_bytes); + } else { + let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize; + blobs.push(vec![0; size]); + present[set_relative_idx] = false; + } + } + + assert_ne!(size, 0); + + for i in start_idx..data_end_idx { + let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize; + + if erasure_meta.is_data_present(i) { + let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) { + Some(bytes) => bytes.to_vec(), + None => data_cf + .get_bytes(db, (slot, i))? + .expect("erasure_meta must have no false positives"), + }; + + // If data is too short, extend it with zeroes + blob_bytes.resize(size, 0u8); + + blobs.insert(set_relative_idx, blob_bytes); + } else { + blobs.insert(set_relative_idx, vec![0u8; size]); + // data erasures must come before any coding erasures if present + present[set_relative_idx] = false; + } + } + + let (recovered_data, recovered_coding) = + session.reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; + + trace!( + "[recover] reconstruction OK slot: {}, indexes: [{},{})", + slot, + start_idx, + data_end_idx + ); + + Ok((recovered_data, recovered_coding)) +} + +fn deserialize_blobs(blob_datas: &[I]) -> Vec +where + I: Borrow<[u8]>, +{ + blob_datas + .iter() + .flat_map(|blob_data| { + let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..]; + Blocktree::deserialize_blob_data(serialized_entries_data) + .expect("Ledger should only contain well formed data") + }) + .collect() +} + +fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option) -> bool { + // We should signal that there are updates if we extended the chain of consecutive blocks starting + // from block 0, which is true iff: + // 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks + // starting from block 0, + slot_meta.is_connected && + // AND either: + // 1) The slot didn't exist in the database before, and now we have a consecutive + // block for that slot + ((slot_meta_backup.is_none() && slot_meta.consumed != 0) || + // OR + // 2) The slot did exist, but now we have a new consecutive block for that slot + (slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed)) +} + // Creates a new ledger with slot 0 full of ticks (and only ticks). // // Returns the blockhash that can be used to append entries with. 
@@ -1571,13 +1621,14 @@ pub mod tests { fn test_put_get_simple() { let ledger_path = get_tmp_ledger_path("test_put_get_simple"); let ledger = Blocktree::open(&ledger_path).unwrap(); + let mut db = ledger.db.write().unwrap(); // Test meta column family let meta = SlotMeta::new(0, 1); - ledger.meta_cf.put(0, &meta).unwrap(); + ledger.meta_cf.put(&mut db, 0, &meta).unwrap(); let result = ledger .meta_cf - .get(0) + .get(&db, 0) .unwrap() .expect("Expected meta object to exist"); @@ -1586,11 +1637,14 @@ pub mod tests { // Test erasure column family let erasure = vec![1u8; 16]; let erasure_key = (0, 0); - ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap(); + ledger + .erasure_cf + .put_bytes(&mut db, erasure_key, &erasure) + .unwrap(); let result = ledger .erasure_cf - .get_bytes(erasure_key) + .get_bytes(&db, erasure_key) .unwrap() .expect("Expected erasure object to exist"); @@ -1599,17 +1653,18 @@ pub mod tests { // Test data column family let data = vec![2u8; 16]; let data_key = (0, 0); - ledger.data_cf.put_bytes(data_key, &data).unwrap(); + ledger.data_cf.put_bytes(&mut db, data_key, &data).unwrap(); let result = ledger .data_cf - .get_bytes(data_key) + .get_bytes(&db, data_key) .unwrap() .expect("Expected data object to exist"); assert_eq!(result, data); // Destroying database without closing it first is undefined behavior + drop(db); drop(ledger); Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } @@ -1697,8 +1752,7 @@ pub mod tests { assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty()); let meta = ledger - .meta_cf - .get(0) + .meta(0) .unwrap() .expect("Expected new metadata object to be created"); assert!(meta.consumed == 0 && meta.received == num_entries); @@ -1712,8 +1766,7 @@ pub mod tests { assert_eq!(result, entries); let meta = ledger - .meta_cf - .get(0) + .meta(0) .unwrap() .expect("Expected new metadata object to exist"); assert_eq!(meta.consumed, num_entries); @@ -1742,8 +1795,7 @@ pub mod tests { let result = ledger.get_slot_entries(0, 0, None).unwrap(); let meta = ledger - .meta_cf - .get(0) + .meta(0) .unwrap() .expect("Expected metadata object to exist"); assert_eq!(meta.parent_slot, 0); @@ -1791,6 +1843,8 @@ pub mod tests { let mut db_iterator = blocktree .db + .read() + .unwrap() .cursor::() .expect("Expected to be able to open database iterator"); @@ -1920,7 +1974,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + let meta = blocktree.meta(slot).unwrap().unwrap(); if num_entries % 2 == 0 { assert_eq!(meta.received, num_entries); } else { @@ -1942,7 +1996,7 @@ pub mod tests { original_entries, ); - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + let meta = blocktree.meta(slot).unwrap().unwrap(); assert_eq!(meta.received, num_entries); assert_eq!(meta.consumed, num_entries); assert_eq!(meta.parent_slot, parent_slot); @@ -1995,7 +2049,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,); - let meta = blocktree.meta_cf.get(0).unwrap().unwrap(); + let meta = blocktree.meta(0).unwrap().unwrap(); assert_eq!(meta.consumed, num_unique_entries); assert_eq!(meta.received, num_unique_entries); assert_eq!(meta.parent_slot, 0); @@ -2440,7 +2494,10 @@ pub mod tests { } // No orphan slots should exist - assert!(blocktree.orphans_cf.is_empty().unwrap()) + assert!(blocktree + .orphans_cf + .is_empty(&blocktree.db.read().unwrap()) + .unwrap()) } 
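// A rough sketch of the bookkeeping behind `ErasureMeta`, which the erasure
// tests further below poke at. It assumes the 16-blob set is split into
// 8 data + 8 coding blobs (see erasure.rs for the real constants): one
// presence bit per blob, with `status()` derived from the counts.
#[derive(Debug, PartialEq)]
pub enum StatusSketch {
    DataFull,
    CanRecover,
    StillNeed(u32),
}

pub const NUM_DATA: u32 = 8; // assumed split
pub const NUM_CODING: u32 = 8;

pub struct ErasureMetaSketch {
    pub data: u64,   // presence bit per data blob in the set
    pub coding: u64, // presence bit per coding blob in the set
}

impl ErasureMetaSketch {
    pub fn set_data_present(&mut self, index: u64, present: bool) {
        let bit = 1u64 << (index % u64::from(NUM_DATA));
        if present {
            self.data |= bit;
        } else {
            self.data &= !bit;
        }
    }

    pub fn status(&self) -> StatusSketch {
        let data = self.data.count_ones();
        let total = data + self.coding.count_ones();
        if data == NUM_DATA {
            StatusSketch::DataFull // every data blob already on hand
        } else if total >= NUM_DATA {
            StatusSketch::CanRecover // enough shards to reconstruct the rest
        } else {
            StatusSketch::StillNeed(NUM_DATA - total)
        }
    }
}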
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2457,14 +2514,20 @@ pub mod tests { assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty()); let mut meta0 = SlotMeta::new(0, 0); - blocktree.meta_cf.put(0, &meta0).unwrap(); + blocktree + .meta_cf + .put(&mut blocktree.db.write().unwrap(), 0, &meta0) + .unwrap(); // Slot exists, chains to nothing let expected: HashMap> = HashMap::from_iter(vec![(0, vec![])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected); meta0.next_slots = vec![1, 2]; - blocktree.meta_cf.put(0, &meta0).unwrap(); + blocktree + .meta_cf + .put(&mut blocktree.db.write().unwrap(), 0, &meta0) + .unwrap(); // Slot exists, chains to some other slots let expected: HashMap> = @@ -2474,7 +2537,10 @@ pub mod tests { let mut meta3 = SlotMeta::new(3, 1); meta3.next_slots = vec![10, 5]; - blocktree.meta_cf.put(3, &meta3).unwrap(); + blocktree + .meta_cf + .put(&mut blocktree.db.write().unwrap(), 3, &meta3) + .unwrap(); let expected: HashMap> = HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected); @@ -2500,7 +2566,7 @@ pub mod tests { .meta(1) .expect("Expect database get to succeed") .unwrap(); - assert!(Blocktree::is_orphan(&meta)); + assert!(is_orphan(&meta)); assert_eq!(blocktree.get_orphans(None), vec![1]); // Write slot 1 which chains to slot 0, so now slot 0 is the @@ -2510,12 +2576,12 @@ pub mod tests { .meta(1) .expect("Expect database get to succeed") .unwrap(); - assert!(!Blocktree::is_orphan(&meta)); + assert!(!is_orphan(&meta)); let meta = blocktree .meta(0) .expect("Expect database get to succeed") .unwrap(); - assert!(Blocktree::is_orphan(&meta)); + assert!(is_orphan(&meta)); assert_eq!(blocktree.get_orphans(None), vec![0]); // Write some slot that also chains to existing slots and orphan, @@ -2532,10 +2598,13 @@ pub mod tests { .meta(i) .expect("Expect database get to succeed") .unwrap(); - assert!(!Blocktree::is_orphan(&meta)); + assert!(!is_orphan(&meta)); } // Orphans cf is empty - assert!(blocktree.orphans_cf.is_empty().unwrap()) + assert!(blocktree + .orphans_cf + .is_empty(&blocktree.db.read().unwrap()) + .unwrap()) } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } @@ -2580,7 +2649,7 @@ pub mod tests { entries[i as usize] ); - let meta = blocktree.meta_cf.get(i).unwrap().unwrap(); + let meta = blocktree.meta(i).unwrap().unwrap(); assert_eq!(meta.received, i + 1); assert_eq!(meta.last_index, i); if i != 0 { @@ -2766,12 +2835,22 @@ pub mod tests { // Trying to insert a blob less than consumed should fail let slot_meta = blocktree.meta(0).unwrap().unwrap(); assert_eq!(slot_meta.consumed, 5); - assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[4].clone())); + assert!(!should_insert_blob( + &slot_meta, + &blocktree.db.read().unwrap(), + &HashMap::new(), + &blobs[4].clone() + )); // Trying to insert the same blob again should fail blocktree.insert_data_blobs(&blobs[7..8]).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); - assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[7].clone())); + assert!(!should_insert_blob( + &slot_meta, + &blocktree.db.read().unwrap(), + &HashMap::new(), + &blobs[7].clone() + )); // Trying to insert another "is_last" blob with index < the received index // should fail @@ -2779,7 +2858,12 @@ pub mod tests { let slot_meta = blocktree.meta(0).unwrap().unwrap(); 
assert_eq!(slot_meta.received, 9); blobs[8].set_is_last_in_slot(); - assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[8].clone())); + assert!(!should_insert_blob( + &slot_meta, + &blocktree.db.read().unwrap(), + &HashMap::new(), + &blobs[8].clone() + )); // Insert the 10th blob, which is marked as "is_last" blobs[9].set_is_last_in_slot(); @@ -2787,7 +2871,12 @@ pub mod tests { let slot_meta = blocktree.meta(0).unwrap().unwrap(); // Trying to insert a blob with index > the "is_last" blob should fail - assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[10].clone())); + assert!(!should_insert_blob( + &slot_meta, + &blocktree.db.read().unwrap(), + &HashMap::new(), + &blobs[10].clone() + )); drop(blocktree); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2852,8 +2941,7 @@ pub mod tests { blocktree.write_blobs(&blobs[8..16]).unwrap(); let erasure_meta_opt = blocktree - .erasure_meta_cf - .get((slot, 0)) + .erasure_meta(slot, 0) .expect("DB get must succeed"); assert!(erasure_meta_opt.is_some()); @@ -2864,8 +2952,7 @@ pub mod tests { blocktree.write_blobs(&blobs[..8]).unwrap(); let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 0)) + .erasure_meta(slot, 0) .expect("DB get must succeed") .unwrap(); @@ -2886,8 +2973,7 @@ pub mod tests { } let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 0)) + .erasure_meta(slot, 0) .expect("DB get must succeed") .unwrap(); @@ -2899,8 +2985,7 @@ pub mod tests { blocktree.write_blobs(&blobs[16..24]).unwrap(); let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) + .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); @@ -2921,8 +3006,7 @@ pub mod tests { } let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) + .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); @@ -2934,8 +3018,7 @@ pub mod tests { blocktree.write_blobs(&blobs[24..27]).unwrap(); let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) + .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); @@ -2947,8 +3030,7 @@ pub mod tests { blocktree.write_blobs(&blobs[28..29]).unwrap(); let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) + .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); @@ -2963,8 +3045,7 @@ pub mod tests { } let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) + .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); @@ -3018,9 +3099,11 @@ pub mod tests { (slot, blob.index()); } + let db = blocktree.db.read().unwrap(); + let erasure_meta = blocktree .erasure_meta_cf - .get((slot, set_index as u64)) + .get(&db, (slot, set_index as u64)) .expect("Erasure Meta should be present") .unwrap(); @@ -3029,7 +3112,7 @@ pub mod tests { let retrieved_data = blocktree .data_cf - .get_bytes((slot, focused_index as u64)) + .get_bytes(&db, (slot, focused_index as u64)) .unwrap(); assert!(retrieved_data.is_some()); @@ -3080,7 +3163,7 @@ pub mod tests { // try recovery even though there aren't enough blobs let erasure_meta = blocktree .erasure_meta_cf - .get((SLOT, SET_INDEX)) + .get(&blocktree.db.read().unwrap(), (SLOT, SET_INDEX)) .unwrap() .unwrap(); @@ -3088,8 +3171,14 @@ pub mod tests { let prev_inserted_blob_datas = HashMap::new(); - let attempt_result = - blocktree.try_erasure_recover(&erasure_meta, SLOT, &prev_inserted_blob_datas, None); + let attempt_result = try_erasure_recover( + &blocktree.db.read().unwrap(), + &blocktree.session, + &erasure_meta, + SLOT, + 
&prev_inserted_blob_datas, + None, + ); assert!(attempt_result.is_ok()); let recovered_blobs_opt = attempt_result.unwrap(); @@ -3098,7 +3187,6 @@ pub mod tests { } #[test] - #[ignore] fn test_recovery_multi_slot_multi_thread() { use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use std::thread; @@ -3227,14 +3315,17 @@ pub mod tests { // triggering recovery. let erasure_meta = blocktree .erasure_meta_cf - .get((slot, erasure_set.set_index)) + .get( + &blocktree.db.read().unwrap(), + (slot, erasure_set.set_index), + ) .unwrap() .unwrap(); let status = erasure_meta.status(); attempt += 1; - info!( + debug!( "[multi_slot] thread_id: {}, attempt: {}, slot: {}, set_index: {}, status: {:?}", thread_id, attempt, slot, erasure_set.set_index, status ); @@ -3261,7 +3352,7 @@ pub mod tests { let erasure_meta = blocktree .erasure_meta_cf - .get((slot, set_index)) + .get(&blocktree.db.read().unwrap(), (slot, set_index)) .expect("DB get must succeed") .expect("ErasureMeta must be present for each erasure set"); diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 365673792a..3cd4fff9c1 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -9,7 +9,6 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::marker::PhantomData; use std::path::Path; -use std::sync::Arc; pub mod columns { #[derive(Debug)] @@ -133,7 +132,7 @@ where B: Backend, C: Column, { - pub db: Arc>, + backend: PhantomData, column: PhantomData, } @@ -171,7 +170,7 @@ where .get_cf(self.cf_handle::(), C::key(key).borrow()) } - pub fn put_bytes(&self, key: C::Index, data: &[u8]) -> Result<()> + pub fn put_bytes(&mut self, key: C::Index, data: &[u8]) -> Result<()> where C: Column, { @@ -179,7 +178,7 @@ where .put_cf(self.cf_handle::(), C::key(key).borrow(), data) } - pub fn delete(&self, key: C::Index) -> Result<()> + pub fn delete(&mut self, key: C::Index) -> Result<()> where C: Column, { @@ -203,7 +202,7 @@ where } } - pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> + pub fn put(&mut self, key: C::Index, value: &C::Type) -> Result<()> where C: TypedColumn, { @@ -241,7 +240,7 @@ where Ok(iter) } - pub fn batch(&self) -> Result> { + pub fn batch(&mut self) -> Result> { let db_write_batch = self.backend.batch()?; let map = self .backend @@ -257,7 +256,7 @@ where }) } - pub fn write(&self, batch: WriteBatch) -> Result<()> { + pub fn write(&mut self, batch: WriteBatch) -> Result<()> { self.backend.write(batch.write_batch) } @@ -268,6 +267,16 @@ where { self.backend.cf_handle(C::NAME).clone() } + + pub fn column(&self) -> LedgerColumn + where + C: Column, + { + LedgerColumn { + backend: PhantomData, + column: PhantomData, + } + } } impl Cursor @@ -324,43 +333,24 @@ where B: Backend, C: Column, { - pub fn new(db: &Arc>) -> Self { - LedgerColumn { - db: Arc::clone(db), - column: PhantomData, - } + pub fn get_bytes(&self, db: &Database, key: C::Index) -> Result>> { + db.backend.get_cf(self.handle(db), C::key(key).borrow()) } - pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { - self.db - .backend - .put_cf(self.handle(), C::key(key).borrow(), value) + pub fn cursor(&self, db: &Database) -> Result> { + db.cursor() } - pub fn get_bytes(&self, key: C::Index) -> Result>> { - self.db.backend.get_cf(self.handle(), C::key(key).borrow()) + pub fn iter(&self, db: &Database) -> Result)>> { + db.iter::() } - pub fn delete(&self, key: C::Index) -> Result<()> { - self.db - .backend - .delete_cf(self.handle(), C::key(key).borrow()) + pub fn handle(&self, db: &Database) -> 
B::ColumnFamily { + db.cf_handle::() } - pub fn cursor(&self) -> Result> { - self.db.cursor() - } - - pub fn iter(&self) -> Result)>> { - self.db.iter::() - } - - pub fn handle(&self) -> B::ColumnFamily { - self.db.cf_handle::() - } - - pub fn is_empty(&self) -> Result { - let mut cursor = self.cursor()?; + pub fn is_empty(&self, db: &Database) -> Result { + let mut cursor = self.cursor(db)?; cursor.seek_to_first(); Ok(!cursor.valid()) } @@ -369,14 +359,35 @@ where impl LedgerColumn where B: Backend, - C: TypedColumn, + C: Column, { - pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { - self.db.put::(key, value) + pub fn put_bytes(&self, db: &mut Database, key: C::Index, value: &[u8]) -> Result<()> { + db.backend + .put_cf(self.handle(db), C::key(key).borrow(), value) } - pub fn get(&self, key: C::Index) -> Result> { - self.db.get::(key) + pub fn delete(&self, db: &mut Database, key: C::Index) -> Result<()> { + db.backend.delete_cf(self.handle(db), C::key(key).borrow()) + } +} + +impl LedgerColumn +where + B: Backend, + C: TypedColumn, +{ + pub fn get(&self, db: &Database, key: C::Index) -> Result> { + db.get::(key) + } +} + +impl LedgerColumn +where + B: Backend, + C: TypedColumn, +{ + pub fn put(&self, db: &mut Database, key: C::Index, value: &C::Type) -> Result<()> { + db.put::(key, value) } }
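The db.rs half of the patch turns `LedgerColumn` from an owner of an `Arc<Database>` into a stateless, zero-sized typed handle: every accessor now borrows the database explicitly, so exclusive access for writes is enforced by the borrow checker at each call site rather than by convention. A minimal sketch of that pattern, using hypothetical `DbSketch`/`ColumnSketch` stand-ins instead of the real `Backend`/`Column` traits:

use std::marker::PhantomData;

pub trait ColumnSketch {
    const NAME: &'static str;
    type Index;
}

pub struct DbSketch; // stand-in for the backend-owning Database

// Zero-sized handle: carries only the column type, no database reference.
pub struct LedgerColumnSketch<C: ColumnSketch> {
    column: PhantomData<C>,
}

impl DbSketch {
    pub fn column<C: ColumnSketch>(&self) -> LedgerColumnSketch<C> {
        LedgerColumnSketch { column: PhantomData }
    }
}

impl<C: ColumnSketch> LedgerColumnSketch<C> {
    // Reads take a shared borrow; callers reach this through the read guard.
    pub fn get_bytes(&self, _db: &DbSketch, _key: C::Index) -> Option<Vec<u8>> {
        None
    }

    // Writes demand `&mut`, which only the `RwLock` write guard can hand out.
    pub fn put_bytes(&self, _db: &mut DbSketch, _key: C::Index, _bytes: &[u8]) {}
}

With this shape, the `&mut Database` methods (`put`, `delete`, `batch`, `write`) are only reachable through the write guard, so the serialization of blocktree writers becomes a compile-time property rather than a convention.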