From cd39a6afd35288a0c2d3b2cf8995b29790889e69 Mon Sep 17 00:00:00 2001
From: steviez
Date: Tue, 25 Jul 2023 10:25:12 -0500
Subject: [PATCH] Make use of Blockstore's LedgerColumn objects (#32506)

* Move functions that take a &Database under impl Blockstore {...}

* Replace &Database with &self in those functions and update callers

* Use LedgerColumn's from Blockstore instead of re-fetching

* Add missing roots LedgerColumn and have it report metrics like others

* Remove several redundant comments
---
 ledger/src/blockstore.rs | 645 +++++++++++++++++++--------------------
 1 file changed, 319 insertions(+), 326 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index b65393fba0..6e0bdf46b0 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -176,6 +176,7 @@ pub struct Blockstore {
     meta_cf: LedgerColumn,
     dead_slots_cf: LedgerColumn,
     duplicate_slots_cf: LedgerColumn,
+    roots_cf: LedgerColumn,
     erasure_meta_cf: LedgerColumn,
     orphans_cf: LedgerColumn,
     index_cf: LedgerColumn,
@@ -274,20 +275,13 @@ impl Blockstore {
         info!("Opening database at {:?}", blockstore_path);
         let db = Database::open(&blockstore_path, options)?;
 
-        // Create the metadata column family
         let meta_cf = db.column();
-
-        // Create the dead slots column family
         let dead_slots_cf = db.column();
         let duplicate_slots_cf = db.column();
+        let roots_cf = db.column();
         let erasure_meta_cf = db.column();
-
-        // Create the orphans column family. An "orphan" is defined as
-        // the head of a detached chain of slots, i.e. a slot with no
-        // known parent
        let orphans_cf = db.column();
        let index_cf = db.column();
-
        let data_shred_cf = db.column();
        let code_shred_cf = db.column();
        let transaction_status_cf = db.column();
@@ -336,6 +330,7 @@ impl Blockstore {
             meta_cf,
             dead_slots_cf,
             duplicate_slots_cf,
+            roots_cf,
             erasure_meta_cf,
             orphans_cf,
             index_cf,
@@ -705,6 +700,7 @@ impl Blockstore {
         self.meta_cf.submit_rocksdb_cf_metrics();
         self.dead_slots_cf.submit_rocksdb_cf_metrics();
         self.duplicate_slots_cf.submit_rocksdb_cf_metrics();
+        self.roots_cf.submit_rocksdb_cf_metrics();
         self.erasure_meta_cf.submit_rocksdb_cf_metrics();
         self.orphans_cf.submit_rocksdb_cf_metrics();
         self.index_cf.submit_rocksdb_cf_metrics();
@@ -724,14 +720,12 @@ impl Blockstore {
     }
 
     fn try_shred_recovery(
-        db: &Database,
+        &self,
         erasure_metas: &HashMap,
         index_working_set: &mut HashMap,
         prev_inserted_shreds: &HashMap,
         reed_solomon_cache: &ReedSolomonCache,
     ) -> Vec {
-        let data_cf = db.column::();
-        let code_cf = db.column::();
         let mut recovered_shreds = vec![];
         // Recovery rules:
         // 1. Only try recovery around indexes for which new data or coding shreds are received
@@ -749,8 +743,8 @@ impl Blockstore {
                     erasure_meta,
                     prev_inserted_shreds,
                     &mut recovered_shreds,
-                    &data_cf,
-                    &code_cf,
+                    &self.data_shred_cf,
+                    &self.code_shred_cf,
                     reed_solomon_cache,
                 );
             }
@@ -839,8 +833,7 @@ impl Blockstore {
         start.stop();
         metrics.insert_lock_elapsed_us += start.as_us();
 
-        let db = &*self.db;
-        let mut write_batch = db.batch()?;
+        let mut write_batch = self.db.batch()?;
 
         let mut just_inserted_shreds = HashMap::with_capacity(shreds.len());
         let mut erasure_metas = HashMap::new();
@@ -917,8 +910,7 @@ impl Blockstore {
         metrics.insert_shreds_elapsed_us += start.as_us();
         let mut start = Measure::start("Shred recovery");
         if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_shreds = Self::try_shred_recovery(
-                db,
+            let recovered_shreds = self.try_shred_recovery(
                 &erasure_metas,
                 &mut index_working_set,
                 &just_inserted_shreds,
@@ -997,7 +989,7 @@ impl Blockstore {
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
-        handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
+        self.handle_chaining(&mut write_batch, &mut slot_meta_working_set)?;
         start.stop();
         metrics.chaining_elapsed_us += start.as_us();
 
@@ -1182,7 +1174,7 @@ impl Blockstore {
         let shred_index = u64::from(shred.index());
 
         let index_meta_working_set_entry =
-            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
+            self.get_index_meta_entry(slot, index_working_set, index_meta_time_us);
 
         let index_meta = &mut index_meta_working_set_entry.index;
 
@@ -1354,11 +1346,9 @@ impl Blockstore {
         let shred_index = u64::from(shred.index());
 
         let index_meta_working_set_entry =
-            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
-
+            self.get_index_meta_entry(slot, index_working_set, index_meta_time_us);
         let index_meta = &mut index_meta_working_set_entry.index;
-        let slot_meta_entry = get_slot_meta_entry(
-            &self.db,
+        let slot_meta_entry = self.get_slot_meta_entry(
             slot_meta_working_set,
             slot,
             shred
@@ -1691,10 +1681,9 @@ impl Blockstore {
         buffer: &mut [u8],
     ) -> Result<(u64, usize)> {
         let _lock = self.check_lowest_cleanup_slot(slot)?;
-        let meta_cf = self.db.column::();
         let mut buffer_offset = 0;
         let mut last_index = 0;
-        if let Some(meta) = meta_cf.get(slot)? {
+        if let Some(meta) = self.meta_cf.get(slot)? {
             if !meta.is_full() {
                 warn!("The slot is not yet full. Will not return any shreds");
                 return Ok((last_index, buffer_offset));
@@ -2008,8 +1997,7 @@ impl Blockstore {
         slot: Slot,
         require_previous_blockhash: bool,
     ) -> Result {
-        let slot_meta_cf = self.db.column::();
-        let Some(slot_meta) = slot_meta_cf.get(slot)? else {
+        let Some(slot_meta) = self.meta_cf.get(slot)? else {
             info!("SlotMeta not found for slot {}", slot);
             return Err(BlockstoreError::SlotUnavailable);
         };
@@ -2941,8 +2929,7 @@ impl Blockstore {
         slot: Slot,
         start_index: u64,
     ) -> Result<(CompletedRanges, Option)> {
-        let slot_meta_cf = self.db.column::();
-        let slot_meta = slot_meta_cf.get(slot)?;
+        let slot_meta = self.meta_cf.get(slot)?;
         if slot_meta.is_none() {
             return Ok((vec![], slot_meta));
         }
@@ -3353,8 +3340,8 @@ impl Blockstore {
     /// Note that the reported size does not include those recently inserted
     /// shreds that are still in memory.
     pub fn total_data_shred_storage_size(&self) -> Result {
-        let shred_data_cf = self.db.column::();
-        shred_data_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
+        self.data_shred_cf
+            .get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
     }
 
     /// Returns the total physical storage size contributed by all coding shreds.
@@ -3362,8 +3349,8 @@ impl Blockstore {
     /// Note that the reported size does not include those recently inserted
     /// shreds that are still in memory.
     pub fn total_coding_shred_storage_size(&self) -> Result {
-        let shred_code_cf = self.db.column::();
-        shred_code_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
+        self.code_shred_cf
+            .get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
     }
 
     /// Returns whether the blockstore has primary (read and write) access
@@ -3491,6 +3478,305 @@ impl Blockstore {
         self.db.write(write_batch)?;
         Ok(())
     }
+
+    /// For each entry in `working_set` whose `did_insert_occur` is true, this
+    /// function handles its chaining effect by updating the SlotMeta of both
+    /// the slot and its parent slot to reflect the slot descends from the
+    /// parent slot. In addition, when a slot is newly connected, it also
+    /// checks whether any of its direct and indirect children slots are connected
+    /// or not.
+    ///
+    /// This function may update column families [`cf::SlotMeta`] and
+    /// [`cf::Orphans`].
+    ///
+    /// For more information about the chaining, check the previous discussion here:
+    /// https://github.com/solana-labs/solana/pull/2253
+    ///
+    /// Arguments:
+    /// - `db`: the blockstore db that stores both shreds and their metadata.
+    /// - `write_batch`: the write batch which includes all the updates of
+    ///   the current write and ensures their atomicity.
+    /// - `working_set`: a slot-id to SlotMetaWorkingSetEntry map. This function
+    ///   will remove all entries whose insertion did not actually occur.
+    fn handle_chaining(
+        &self,
+        write_batch: &mut WriteBatch,
+        working_set: &mut HashMap,
+    ) -> Result<()> {
+        // Handle chaining for all the SlotMetas that were inserted into
+        working_set.retain(|_, entry| entry.did_insert_occur);
+        let mut new_chained_slots = HashMap::new();
+        let working_set_slots: Vec<_> = working_set.keys().collect();
+        for slot in working_set_slots {
+            self.handle_chaining_for_slot(write_batch, working_set, &mut new_chained_slots, *slot)?;
+        }
+
+        // Write all the newly changed slots in new_chained_slots to the write_batch
+        for (slot, meta) in new_chained_slots.iter() {
+            let meta: &SlotMeta = &RefCell::borrow(meta);
+            write_batch.put::(*slot, meta)?;
+        }
+        Ok(())
+    }
+
+    /// A helper function of handle_chaining which handles the chaining based
+    /// on the `SlotMetaWorkingSetEntry` of the specified `slot`. Specifically,
+    /// it handles the following two things:
+    ///
+    /// 1. based on the `SlotMetaWorkingSetEntry` for `slot`, check if `slot`
+    /// did not previously have a parent slot but does now. If `slot` satisfies
+    /// this condition, update the Orphan property of both `slot` and its parent
+    /// slot based on their current orphan status. Specifically:
+    /// - updates the orphan property of slot to no longer be an orphan because
+    ///   it has a parent.
+    /// - adds the parent to the orphan column family if the parent's parent is
+    ///   currently unknown.
+    ///
+    /// 2. if the `SlotMetaWorkingSetEntry` for `slot` indicates this slot
+    /// is newly connected to a parent slot, then this function will update
+    /// the is_connected property of all its direct and indirect children slots.
+    ///
+    /// This function may update column family [`cf::Orphans`] and indirectly
+    /// update SlotMeta from its output parameter `new_chained_slots`.
+    ///
+    /// Arguments:
+    /// `db`: the underlying db for blockstore
+    /// `write_batch`: the write batch which includes all the updates of
+    ///     the current write and ensures their atomicity.
+    /// `working_set`: the working set which includes the specified `slot`
+    /// `new_chained_slots`: an output parameter which includes all the slots
+    ///     whose connectivity has been updated.
+    /// `slot`: the slot whose chaining effect we want to handle.
+    fn handle_chaining_for_slot(
+        &self,
+        write_batch: &mut WriteBatch,
+        working_set: &HashMap,
+        new_chained_slots: &mut HashMap>>,
+        slot: Slot,
+    ) -> Result<()> {
+        let slot_meta_entry = working_set
+            .get(&slot)
+            .expect("Slot must exist in the working_set hashmap");
+
+        let meta = &slot_meta_entry.new_slot_meta;
+        let meta_backup = &slot_meta_entry.old_slot_meta;
+        {
+            let mut meta_mut = meta.borrow_mut();
+            let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
+
+            // If:
+            // 1) This is a new slot
+            // 2) slot != 0
+            // then try to chain this slot to a previous slot
+            if slot != 0 && meta_mut.parent_slot.is_some() {
+                let prev_slot = meta_mut.parent_slot.unwrap();
+
+                // Check if the slot represented by meta_mut is either a new slot or an orphan.
+                // In both cases we need to run the chaining logic b/c the parent on the slot was
+                // previously unknown.
+                if meta_backup.is_none() || was_orphan_slot {
+                    let prev_slot_meta =
+                        self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?;
+
+                    // This is a newly inserted slot/orphan so run the chaining logic to link it to a
+                    // newly discovered parent
+                    chain_new_slot_to_prev_slot(
+                        &mut prev_slot_meta.borrow_mut(),
+                        slot,
+                        &mut meta_mut,
+                    );
+
+                    // If the parent of `slot` is a newly inserted orphan, insert it into the orphans
+                    // column family
+                    if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
+                        write_batch.put::(prev_slot, &true)?;
+                    }
+                }
+            }
+
+            // At this point this slot has received a parent, so it's no longer an orphan
+            if was_orphan_slot {
+                write_batch.delete::(slot)?;
+            }
+        }
+
+        // If this is a newly completed slot and the parent is connected, then the
+        // slot is now connected. Mark the slot as connected, and then traverse the
+        // children to update their parent_connected and connected status.
+        let should_propagate_is_connected =
+            is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
+                && RefCell::borrow(meta).is_parent_connected();
+
+        if should_propagate_is_connected {
+            meta.borrow_mut().set_connected();
+            self.traverse_children_mut(
+                meta,
+                working_set,
+                new_chained_slots,
+                SlotMeta::set_parent_connected,
+            )?;
+        }
+
+        Ok(())
+    }
+
+    /// Traverse all the children (direct and indirect) of `slot_meta`, and apply
+    /// `slot_function` to each of the children (but not `slot_meta`).
+    ///
+    /// Arguments:
+    /// `db`: the blockstore db that stores shreds and their metadata.
+    /// `slot_meta`: the SlotMeta of the above `slot`.
+    /// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
+    ///     to traverse the graph.
+    /// `passed_visited_slots`: all the traversed slots which have passed the
+    ///     slot_function. This may also include the input `slot`.
+    /// `slot_function`: a function which updates the SlotMeta of the visited
+    ///     slots and determines whether to further traverse the children slots of
+    ///     a given slot.
+    fn traverse_children_mut(
+        &self,
+        slot_meta: &Rc>,
+        working_set: &HashMap,
+        passed_visisted_slots: &mut HashMap>>,
+        slot_function: F,
+    ) -> Result<()>
+    where
+        F: Fn(&mut SlotMeta) -> bool,
+    {
+        let slot_meta = slot_meta.borrow();
+        let mut next_slots: VecDeque = slot_meta.next_slots.to_vec().into();
+        while !next_slots.is_empty() {
+            let slot = next_slots.pop_front().unwrap();
+            let meta_ref =
+                self.find_slot_meta_else_create(working_set, passed_visisted_slots, slot)?;
+            let mut meta = meta_ref.borrow_mut();
+            if slot_function(&mut meta) {
+                meta.next_slots
+                    .iter()
+                    .for_each(|slot| next_slots.push_back(*slot));
+            }
+        }
+        Ok(())
+    }
+
+    /// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
+    /// it from the database if it does not exist in slot_meta_working_set.
+    ///
+    /// In case none of the above has the specified SlotMeta, a new one will
+    /// be created.
+    ///
+    /// Note that this function will also update the parent slot of the specified
+    /// slot.
+    ///
+    /// Arguments:
+    /// - `db`: the database
+    /// - `slot_meta_working_set`: an in-memory structure for storing the cached
+    ///   SlotMeta.
+    /// - `slot`: the slot for loading its meta.
+    /// - `parent_slot`: the parent slot to be assigned to the specified slot meta
+    ///
+    /// This function returns the matched `SlotMetaWorkingSetEntry`. If such entry
+    /// does not exist in the database, a new entry will be created.
+    fn get_slot_meta_entry<'a>(
+        &self,
+        slot_meta_working_set: &'a mut HashMap,
+        slot: Slot,
+        parent_slot: Slot,
+    ) -> &'a mut SlotMetaWorkingSetEntry {
+        // Check if we've already inserted the slot metadata for this shred's slot
+        slot_meta_working_set.entry(slot).or_insert_with(|| {
+            // Store a 2-tuple of the metadata (working copy, backup copy)
+            if let Some(mut meta) = self
+                .meta_cf
+                .get(slot)
+                .expect("Expect database get to succeed")
+            {
+                let backup = Some(meta.clone());
+                // If parent_slot == None, then this is one of the orphans inserted
+                // during the chaining process, see the function find_slot_meta_in_cached_state()
+                // for details. Slots that are orphans are missing a parent_slot, so we should
+                // fill in the parent now that we know it.
+                if is_orphan(&meta) {
+                    meta.parent_slot = Some(parent_slot);
+                }
+
+                SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
+            } else {
+                SlotMetaWorkingSetEntry::new(
+                    Rc::new(RefCell::new(SlotMeta::new(slot, Some(parent_slot)))),
+                    None,
+                )
+            }
+        })
+    }
+
+    /// Returns the `SlotMeta` with the specified `slot_index`. The resulting
+    /// `SlotMeta` could be either from the cache or from the DB. Specifically,
+    /// the function:
+    ///
+    /// 1) Finds the slot metadata in the cache of dirty slot metadata we've
+    ///    previously touched, otherwise:
+    /// 2) Searches the database for that slot metadata. If still no luck, then:
+    /// 3) Creates a dummy orphan slot in the database.
+    ///
+    /// Also see [`find_slot_meta_in_cached_state`] and [`find_slot_meta_in_db_else_create`].
+    fn find_slot_meta_else_create<'a>(
+        &self,
+        working_set: &'a HashMap,
+        chained_slots: &'a mut HashMap>>,
+        slot_index: u64,
+    ) -> Result>> {
+        let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index);
+        if let Some(slot) = result {
+            Ok(slot)
+        } else {
+            self.find_slot_meta_in_db_else_create(slot_index, chained_slots)
+        }
+    }
+
+    /// A helper function to [`find_slot_meta_else_create`] that searches the
+    /// `SlotMeta` based on the specified `slot` in `db` and updates `insert_map`.
+    ///
+    /// If the specified `db` does not contain a matched entry, then it will create
+    /// a dummy orphan slot in the database.
+    fn find_slot_meta_in_db_else_create(
+        &self,
+        slot: Slot,
+        insert_map: &mut HashMap>>,
+    ) -> Result>> {
+        if let Some(slot_meta) = self.meta_cf.get(slot)? {
+            insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
+        } else {
+            // If this slot doesn't exist, make an orphan slot. This way we
+            // remember which slots chained to this one when we eventually get a real shred
+            // for this slot
+            insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot))));
+        }
+        Ok(insert_map.get(&slot).unwrap().clone())
+    }
+
+    fn get_index_meta_entry<'a>(
+        &self,
+        slot: Slot,
+        index_working_set: &'a mut HashMap,
+        index_meta_time_us: &mut u64,
+    ) -> &'a mut IndexMetaWorkingSetEntry {
+        let mut total_start = Measure::start("Total elapsed");
+        let res = index_working_set.entry(slot).or_insert_with(|| {
+            let newly_inserted_meta = self
+                .index_cf
+                .get(slot)
+                .unwrap()
+                .unwrap_or_else(|| Index::new(slot));
+            IndexMetaWorkingSetEntry {
+                index: newly_inserted_meta,
+                did_insert_occur: false,
+            }
+        });
+        total_start.stop();
+        *index_meta_time_us += total_start.as_us();
+        res
+    }
 }
 
 // Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
@@ -3563,78 +3849,6 @@ fn update_slot_meta(
     )
 }
 
-fn get_index_meta_entry<'a>(
-    db: &Database,
-    slot: Slot,
-    index_working_set: &'a mut HashMap,
-    index_meta_time_us: &mut u64,
-) -> &'a mut IndexMetaWorkingSetEntry {
-    let index_cf = db.column::();
-    let mut total_start = Measure::start("Total elapsed");
-    let res = index_working_set.entry(slot).or_insert_with(|| {
-        let newly_inserted_meta = index_cf
-            .get(slot)
-            .unwrap()
-            .unwrap_or_else(|| Index::new(slot));
-        IndexMetaWorkingSetEntry {
-            index: newly_inserted_meta,
-            did_insert_occur: false,
-        }
-    });
-    total_start.stop();
-    *index_meta_time_us += total_start.as_us();
-    res
-}
-
-/// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
-/// it from the database if it does not exist in slot_meta_working_set.
-///
-/// In case none of the above has the specified SlotMeta, a new one will
-/// be created.
-///
-/// Note that this function will also update the parent slot of the specified
-/// slot.
-///
-/// Arguments:
-/// - `db`: the database
-/// - `slot_meta_working_set`: a in-memory structure for storing the cached
-///   SlotMeta.
-/// - `slot`: the slot for loading its meta.
-/// - `parent_slot`: the parent slot to be assigned to the specified slot meta
-///
-/// This function returns the matched `SlotMetaWorkingSetEntry`. If such entry
-/// does not exist in the database, a new entry will be created.
-fn get_slot_meta_entry<'a>(
-    db: &Database,
-    slot_meta_working_set: &'a mut HashMap,
-    slot: Slot,
-    parent_slot: Slot,
-) -> &'a mut SlotMetaWorkingSetEntry {
-    let meta_cf = db.column::();
-
-    // Check if we've already inserted the slot metadata for this shred's slot
-    slot_meta_working_set.entry(slot).or_insert_with(|| {
-        // Store a 2-tuple of the metadata (working copy, backup copy)
-        if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
-            let backup = Some(meta.clone());
-            // If parent_slot == None, then this is one of the orphans inserted
-            // during the chaining process, see the function find_slot_meta_in_cached_state()
-            // for details. Slots that are orphans are missing a parent_slot, so we should
-            // fill in the parent now that we know it.
-            if is_orphan(&meta) {
-                meta.parent_slot = Some(parent_slot);
-            }
-
-            SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
-        } else {
-            SlotMetaWorkingSetEntry::new(
-                Rc::new(RefCell::new(SlotMeta::new(slot, Some(parent_slot)))),
-                None,
-            )
-        }
-    })
-}
-
 fn get_last_hash<'a>(iterator: impl Iterator + 'a) -> Option {
     iterator.last().map(|entry| entry.hash)
 }
@@ -3727,51 +3941,6 @@ fn commit_slot_meta_working_set(
     Ok((should_signal, newly_completed_slots))
 }
 
-/// Returns the `SlotMeta` with the specified `slot_index`. The resulting
-/// `SlotMeta` could be either from the cache or from the DB. Specifically,
-/// the function:
-///
-/// 1) Finds the slot metadata in the cache of dirty slot metadata we've
-///    previously touched, otherwise:
-/// 2) Searches the database for that slot metadata. If still no luck, then:
-/// 3) Create a dummy orphan slot in the database.
-///
-/// Also see [`find_slot_meta_in_cached_state`] and [`find_slot_meta_in_db_else_create`].
-fn find_slot_meta_else_create<'a>(
-    db: &Database,
-    working_set: &'a HashMap,
-    chained_slots: &'a mut HashMap>>,
-    slot_index: u64,
-) -> Result>> {
-    let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index);
-    if let Some(slot) = result {
-        Ok(slot)
-    } else {
-        find_slot_meta_in_db_else_create(db, slot_index, chained_slots)
-    }
-}
-
-/// A helper function to [`find_slot_meta_else_create`] that searches the
-/// `SlotMeta` based on the specified `slot` in `db` and updates `insert_map`.
-///
-/// If the specified `db` does not contain a matched entry, then it will create
-/// a dummy orphan slot in the database.
-fn find_slot_meta_in_db_else_create(
-    db: &Database,
-    slot: Slot,
-    insert_map: &mut HashMap>>,
-) -> Result>> {
-    if let Some(slot_meta) = db.column::().get(slot)? {
-        insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
-    } else {
-        // If this slot doesn't exist, make a orphan slot. This way we
-        // remember which slots chained to this one when we eventually get a real shred
-        // for this slot
-        insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot))));
-    }
-    Ok(insert_map.get(&slot).unwrap().clone())
-}
-
 /// Returns the `SlotMeta` of the specified `slot` from the two cached states:
 /// `working_set` and `chained_slots`. If both contain the `SlotMeta`, then
 /// the latest one from the `working_set` will be returned.
@@ -3787,182 +3956,6 @@ fn find_slot_meta_in_cached_state<'a>(
     }
 }
 
-/// For each entry in `working_set` whose `did_insert_occur` is true, this
-/// function handles its chaining effect by updating the SlotMeta of both
-/// the slot and its parent slot to reflect the slot descends from the
-/// parent slot. In addition, when a slot is newly connected, it also
-/// checks whether any of its direct and indirect children slots are connected
-/// or not.
-///
-/// This function may update column families [`cf::SlotMeta`] and
-/// [`cf::Orphans`].
-///
-/// For more information about the chaining, check the previous discussion here:
-/// https://github.com/solana-labs/solana/pull/2253
-///
-/// Arguments:
-/// - `db`: the blockstore db that stores both shreds and their metadata.
-/// - `write_batch`: the write batch which includes all the updates of the
-///   the current write and ensures their atomicity.
-/// - `working_set`: a slot-id to SlotMetaWorkingSetEntry map. This function
-///   will remove all entries which insertion did not actually occur.
-fn handle_chaining(
-    db: &Database,
-    write_batch: &mut WriteBatch,
-    working_set: &mut HashMap,
-) -> Result<()> {
-    // Handle chaining for all the SlotMetas that were inserted into
-    working_set.retain(|_, entry| entry.did_insert_occur);
-    let mut new_chained_slots = HashMap::new();
-    let working_set_slots: Vec<_> = working_set.keys().collect();
-    for slot in working_set_slots {
-        handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?;
-    }
-
-    // Write all the newly changed slots in new_chained_slots to the write_batch
-    for (slot, meta) in new_chained_slots.iter() {
-        let meta: &SlotMeta = &RefCell::borrow(meta);
-        write_batch.put::(*slot, meta)?;
-    }
-    Ok(())
-}
-
-/// A helper function of handle_chaining which handles the chaining based
-/// on the `SlotMetaWorkingSetEntry` of the specified `slot`. Specifically,
-/// it handles the following two things:
-///
-/// 1. based on the `SlotMetaWorkingSetEntry` for `slot`, check if `slot`
-/// did not previously have a parent slot but does now. If `slot` satisfies
-/// this condition, update the Orphan property of both `slot` and its parent
-/// slot based on their current orphan status. Specifically:
-/// - updates the orphan property of slot to no longer be an orphan because
-///   it has a parent.
-/// - adds the parent to the orphan column family if the parent's parent is
-///   currently unknown.
-///
-/// 2. if the `SlotMetaWorkingSetEntry` for `slot` indicates this slot
-/// is newly connected to a parent slot, then this function will update
-/// the is_connected property of all its direct and indirect children slots.
-///
-/// This function may update column family [`cf::Orphans`] and indirectly
-/// update SlotMeta from its output parameter `new_chained_slots`.
-///
-/// Arguments:
-/// `db`: the underlying db for blockstore
-/// `write_batch`: the write batch which includes all the updates of the
-///     the current write and ensures their atomicity.
-/// `working_set`: the working set which include the specified `slot`
-/// `new_chained_slots`: an output parameter which includes all the slots
-///     which connectivity have been updated.
-/// `slot`: the slot which we want to handle its chaining effect.
-fn handle_chaining_for_slot(
-    db: &Database,
-    write_batch: &mut WriteBatch,
-    working_set: &HashMap,
-    new_chained_slots: &mut HashMap>>,
-    slot: Slot,
-) -> Result<()> {
-    let slot_meta_entry = working_set
-        .get(&slot)
-        .expect("Slot must exist in the working_set hashmap");
-
-    let meta = &slot_meta_entry.new_slot_meta;
-    let meta_backup = &slot_meta_entry.old_slot_meta;
-    {
-        let mut meta_mut = meta.borrow_mut();
-        let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
-
-        // If:
-        // 1) This is a new slot
-        // 2) slot != 0
-        // then try to chain this slot to a previous slot
-        if slot != 0 && meta_mut.parent_slot.is_some() {
-            let prev_slot = meta_mut.parent_slot.unwrap();
-
-            // Check if the slot represented by meta_mut is either a new slot or a orphan.
-            // In both cases we need to run the chaining logic b/c the parent on the slot was
-            // previously unknown.
-            if meta_backup.is_none() || was_orphan_slot {
-                let prev_slot_meta =
-                    find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
-
-                // This is a newly inserted slot/orphan so run the chaining logic to link it to a
-                // newly discovered parent
-                chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
-
-                // If the parent of `slot` is a newly inserted orphan, insert it into the orphans
-                // column family
-                if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
-                    write_batch.put::(prev_slot, &true)?;
-                }
-            }
-        }
-
-        // At this point this slot has received a parent, so it's no longer an orphan
-        if was_orphan_slot {
-            write_batch.delete::(slot)?;
-        }
-    }
-
-    // If this is a newly completed slot and the parent is connected, then the
-    // slot is now connected. Mark the slot as connected, and then traverse the
-    // children to update their parent_connected and connected status.
-    let should_propagate_is_connected =
-        is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
-            && RefCell::borrow(meta).is_parent_connected();
-
-    if should_propagate_is_connected {
-        meta.borrow_mut().set_connected();
-        traverse_children_mut(
-            db,
-            meta,
-            working_set,
-            new_chained_slots,
-            SlotMeta::set_parent_connected,
-        )?;
-    }
-
-    Ok(())
-}
-
-/// Traverse all the children (direct and indirect) of `slot_meta`, and apply
-/// `slot_function` to each of the children (but not `slot_meta`).
-///
-/// Arguments:
-/// `db`: the blockstore db that stores shreds and their metadata.
-/// `slot_meta`: the SlotMeta of the above `slot`.
-/// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
-///     to traverse the graph.
-/// `passed_visited_slots`: all the traversed slots which have passed the
-///     slot_function. This may also include the input `slot`.
-/// `slot_function`: a function which updates the SlotMeta of the visisted
-///     slots and determine whether to further traverse the children slots of
-///     a given slot.
-fn traverse_children_mut(
-    db: &Database,
-    slot_meta: &Rc>,
-    working_set: &HashMap,
-    passed_visisted_slots: &mut HashMap>>,
-    slot_function: F,
-) -> Result<()>
-where
-    F: Fn(&mut SlotMeta) -> bool,
-{
-    let slot_meta = slot_meta.borrow();
-    let mut next_slots: VecDeque = slot_meta.next_slots.to_vec().into();
-    while !next_slots.is_empty() {
-        let slot = next_slots.pop_front().unwrap();
-        let meta_ref = find_slot_meta_else_create(db, working_set, passed_visisted_slots, slot)?;
-        let mut meta = meta_ref.borrow_mut();
-        if slot_function(&mut meta) {
-            meta.next_slots
-                .iter()
-                .for_each(|slot| next_slots.push_back(*slot));
-        }
-    }
-    Ok(())
-}
-
 fn is_orphan(meta: &SlotMeta) -> bool {
     // If we have no parent, then this is the head of a detached chain of
     // slots