Make use of Blockstore's LedgerColumn objects (#32506)
* Move functions that take a &Database under impl Blockstore {...} * Replace &Database with &self in those functions and update callers * Use LedgerColumn's from Blockstore instead of re-fetching * Add missing roots LedgerColumn and have it report metrics like others * Remove several redundant comments
This commit is contained in:
parent
70a7bae53e
commit
cd39a6afd3
|
@ -176,6 +176,7 @@ pub struct Blockstore {
|
|||
meta_cf: LedgerColumn<cf::SlotMeta>,
|
||||
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
|
||||
duplicate_slots_cf: LedgerColumn<cf::DuplicateSlots>,
|
||||
roots_cf: LedgerColumn<cf::Root>,
|
||||
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
|
||||
orphans_cf: LedgerColumn<cf::Orphans>,
|
||||
index_cf: LedgerColumn<cf::Index>,
|
||||
|
@ -274,20 +275,13 @@ impl Blockstore {
|
|||
info!("Opening database at {:?}", blockstore_path);
|
||||
let db = Database::open(&blockstore_path, options)?;
|
||||
|
||||
// Create the metadata column family
|
||||
let meta_cf = db.column();
|
||||
|
||||
// Create the dead slots column family
|
||||
let dead_slots_cf = db.column();
|
||||
let duplicate_slots_cf = db.column();
|
||||
let roots_cf = db.column();
|
||||
let erasure_meta_cf = db.column();
|
||||
|
||||
// Create the orphans column family. An "orphan" is defined as
|
||||
// the head of a detached chain of slots, i.e. a slot with no
|
||||
// known parent
|
||||
let orphans_cf = db.column();
|
||||
let index_cf = db.column();
|
||||
|
||||
let data_shred_cf = db.column();
|
||||
let code_shred_cf = db.column();
|
||||
let transaction_status_cf = db.column();
|
||||
|
@ -336,6 +330,7 @@ impl Blockstore {
|
|||
meta_cf,
|
||||
dead_slots_cf,
|
||||
duplicate_slots_cf,
|
||||
roots_cf,
|
||||
erasure_meta_cf,
|
||||
orphans_cf,
|
||||
index_cf,
|
||||
|
@ -705,6 +700,7 @@ impl Blockstore {
|
|||
self.meta_cf.submit_rocksdb_cf_metrics();
|
||||
self.dead_slots_cf.submit_rocksdb_cf_metrics();
|
||||
self.duplicate_slots_cf.submit_rocksdb_cf_metrics();
|
||||
self.roots_cf.submit_rocksdb_cf_metrics();
|
||||
self.erasure_meta_cf.submit_rocksdb_cf_metrics();
|
||||
self.orphans_cf.submit_rocksdb_cf_metrics();
|
||||
self.index_cf.submit_rocksdb_cf_metrics();
|
||||
|
@ -724,14 +720,12 @@ impl Blockstore {
|
|||
}
|
||||
|
||||
fn try_shred_recovery(
|
||||
db: &Database,
|
||||
&self,
|
||||
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
prev_inserted_shreds: &HashMap<ShredId, Shred>,
|
||||
reed_solomon_cache: &ReedSolomonCache,
|
||||
) -> Vec<Shred> {
|
||||
let data_cf = db.column::<cf::ShredData>();
|
||||
let code_cf = db.column::<cf::ShredCode>();
|
||||
let mut recovered_shreds = vec![];
|
||||
// Recovery rules:
|
||||
// 1. Only try recovery around indexes for which new data or coding shreds are received
|
||||
|
@ -749,8 +743,8 @@ impl Blockstore {
|
|||
erasure_meta,
|
||||
prev_inserted_shreds,
|
||||
&mut recovered_shreds,
|
||||
&data_cf,
|
||||
&code_cf,
|
||||
&self.data_shred_cf,
|
||||
&self.code_shred_cf,
|
||||
reed_solomon_cache,
|
||||
);
|
||||
}
|
||||
|
@ -839,8 +833,7 @@ impl Blockstore {
|
|||
start.stop();
|
||||
metrics.insert_lock_elapsed_us += start.as_us();
|
||||
|
||||
let db = &*self.db;
|
||||
let mut write_batch = db.batch()?;
|
||||
let mut write_batch = self.db.batch()?;
|
||||
|
||||
let mut just_inserted_shreds = HashMap::with_capacity(shreds.len());
|
||||
let mut erasure_metas = HashMap::new();
|
||||
|
@ -917,8 +910,7 @@ impl Blockstore {
|
|||
metrics.insert_shreds_elapsed_us += start.as_us();
|
||||
let mut start = Measure::start("Shred recovery");
|
||||
if let Some(leader_schedule_cache) = leader_schedule {
|
||||
let recovered_shreds = Self::try_shred_recovery(
|
||||
db,
|
||||
let recovered_shreds = self.try_shred_recovery(
|
||||
&erasure_metas,
|
||||
&mut index_working_set,
|
||||
&just_inserted_shreds,
|
||||
|
@ -997,7 +989,7 @@ impl Blockstore {
|
|||
let mut start = Measure::start("Shred recovery");
|
||||
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
|
||||
// drop the others
|
||||
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
|
||||
self.handle_chaining(&mut write_batch, &mut slot_meta_working_set)?;
|
||||
start.stop();
|
||||
metrics.chaining_elapsed_us += start.as_us();
|
||||
|
||||
|
@ -1182,7 +1174,7 @@ impl Blockstore {
|
|||
let shred_index = u64::from(shred.index());
|
||||
|
||||
let index_meta_working_set_entry =
|
||||
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
|
||||
self.get_index_meta_entry(slot, index_working_set, index_meta_time_us);
|
||||
|
||||
let index_meta = &mut index_meta_working_set_entry.index;
|
||||
|
||||
|
@ -1354,11 +1346,9 @@ impl Blockstore {
|
|||
let shred_index = u64::from(shred.index());
|
||||
|
||||
let index_meta_working_set_entry =
|
||||
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time_us);
|
||||
|
||||
self.get_index_meta_entry(slot, index_working_set, index_meta_time_us);
|
||||
let index_meta = &mut index_meta_working_set_entry.index;
|
||||
let slot_meta_entry = get_slot_meta_entry(
|
||||
&self.db,
|
||||
let slot_meta_entry = self.get_slot_meta_entry(
|
||||
slot_meta_working_set,
|
||||
slot,
|
||||
shred
|
||||
|
@ -1691,10 +1681,9 @@ impl Blockstore {
|
|||
buffer: &mut [u8],
|
||||
) -> Result<(u64, usize)> {
|
||||
let _lock = self.check_lowest_cleanup_slot(slot)?;
|
||||
let meta_cf = self.db.column::<cf::SlotMeta>();
|
||||
let mut buffer_offset = 0;
|
||||
let mut last_index = 0;
|
||||
if let Some(meta) = meta_cf.get(slot)? {
|
||||
if let Some(meta) = self.meta_cf.get(slot)? {
|
||||
if !meta.is_full() {
|
||||
warn!("The slot is not yet full. Will not return any shreds");
|
||||
return Ok((last_index, buffer_offset));
|
||||
|
@ -2008,8 +1997,7 @@ impl Blockstore {
|
|||
slot: Slot,
|
||||
require_previous_blockhash: bool,
|
||||
) -> Result<VersionedConfirmedBlock> {
|
||||
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
||||
let Some(slot_meta) = slot_meta_cf.get(slot)? else {
|
||||
let Some(slot_meta) = self.meta_cf.get(slot)? else {
|
||||
info!("SlotMeta not found for slot {}", slot);
|
||||
return Err(BlockstoreError::SlotUnavailable);
|
||||
};
|
||||
|
@ -2941,8 +2929,7 @@ impl Blockstore {
|
|||
slot: Slot,
|
||||
start_index: u64,
|
||||
) -> Result<(CompletedRanges, Option<SlotMeta>)> {
|
||||
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
||||
let slot_meta = slot_meta_cf.get(slot)?;
|
||||
let slot_meta = self.meta_cf.get(slot)?;
|
||||
if slot_meta.is_none() {
|
||||
return Ok((vec![], slot_meta));
|
||||
}
|
||||
|
@ -3353,8 +3340,8 @@ impl Blockstore {
|
|||
/// Note that the reported size does not include those recently inserted
|
||||
/// shreds that are still in memory.
|
||||
pub fn total_data_shred_storage_size(&self) -> Result<i64> {
|
||||
let shred_data_cf = self.db.column::<cf::ShredData>();
|
||||
shred_data_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
|
||||
self.data_shred_cf
|
||||
.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
|
||||
}
|
||||
|
||||
/// Returns the total physical storage size contributed by all coding shreds.
|
||||
|
@ -3362,8 +3349,8 @@ impl Blockstore {
|
|||
/// Note that the reported size does not include those recently inserted
|
||||
/// shreds that are still in memory.
|
||||
pub fn total_coding_shred_storage_size(&self) -> Result<i64> {
|
||||
let shred_code_cf = self.db.column::<cf::ShredCode>();
|
||||
shred_code_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
|
||||
self.code_shred_cf
|
||||
.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
|
||||
}
|
||||
|
||||
/// Returns whether the blockstore has primary (read and write) access
|
||||
|
@ -3491,6 +3478,305 @@ impl Blockstore {
|
|||
self.db.write(write_batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For each entry in `working_set` whose `did_insert_occur` is true, this
|
||||
/// function handles its chaining effect by updating the SlotMeta of both
|
||||
/// the slot and its parent slot to reflect the slot descends from the
|
||||
/// parent slot. In addition, when a slot is newly connected, it also
|
||||
/// checks whether any of its direct and indirect children slots are connected
|
||||
/// or not.
|
||||
///
|
||||
/// This function may update column families [`cf::SlotMeta`] and
|
||||
/// [`cf::Orphans`].
|
||||
///
|
||||
/// For more information about the chaining, check the previous discussion here:
|
||||
/// https://github.com/solana-labs/solana/pull/2253
|
||||
///
|
||||
/// Arguments:
|
||||
/// - `db`: the blockstore db that stores both shreds and their metadata.
|
||||
/// - `write_batch`: the write batch which includes all the updates of the
|
||||
/// the current write and ensures their atomicity.
|
||||
/// - `working_set`: a slot-id to SlotMetaWorkingSetEntry map. This function
|
||||
/// will remove all entries which insertion did not actually occur.
|
||||
fn handle_chaining(
|
||||
&self,
|
||||
write_batch: &mut WriteBatch,
|
||||
working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
) -> Result<()> {
|
||||
// Handle chaining for all the SlotMetas that were inserted into
|
||||
working_set.retain(|_, entry| entry.did_insert_occur);
|
||||
let mut new_chained_slots = HashMap::new();
|
||||
let working_set_slots: Vec<_> = working_set.keys().collect();
|
||||
for slot in working_set_slots {
|
||||
self.handle_chaining_for_slot(write_batch, working_set, &mut new_chained_slots, *slot)?;
|
||||
}
|
||||
|
||||
// Write all the newly changed slots in new_chained_slots to the write_batch
|
||||
for (slot, meta) in new_chained_slots.iter() {
|
||||
let meta: &SlotMeta = &RefCell::borrow(meta);
|
||||
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A helper function of handle_chaining which handles the chaining based
|
||||
/// on the `SlotMetaWorkingSetEntry` of the specified `slot`. Specifically,
|
||||
/// it handles the following two things:
|
||||
///
|
||||
/// 1. based on the `SlotMetaWorkingSetEntry` for `slot`, check if `slot`
|
||||
/// did not previously have a parent slot but does now. If `slot` satisfies
|
||||
/// this condition, update the Orphan property of both `slot` and its parent
|
||||
/// slot based on their current orphan status. Specifically:
|
||||
/// - updates the orphan property of slot to no longer be an orphan because
|
||||
/// it has a parent.
|
||||
/// - adds the parent to the orphan column family if the parent's parent is
|
||||
/// currently unknown.
|
||||
///
|
||||
/// 2. if the `SlotMetaWorkingSetEntry` for `slot` indicates this slot
|
||||
/// is newly connected to a parent slot, then this function will update
|
||||
/// the is_connected property of all its direct and indirect children slots.
|
||||
///
|
||||
/// This function may update column family [`cf::Orphans`] and indirectly
|
||||
/// update SlotMeta from its output parameter `new_chained_slots`.
|
||||
///
|
||||
/// Arguments:
|
||||
/// `db`: the underlying db for blockstore
|
||||
/// `write_batch`: the write batch which includes all the updates of the
|
||||
/// the current write and ensures their atomicity.
|
||||
/// `working_set`: the working set which include the specified `slot`
|
||||
/// `new_chained_slots`: an output parameter which includes all the slots
|
||||
/// which connectivity have been updated.
|
||||
/// `slot`: the slot which we want to handle its chaining effect.
|
||||
fn handle_chaining_for_slot(
|
||||
&self,
|
||||
write_batch: &mut WriteBatch,
|
||||
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot: Slot,
|
||||
) -> Result<()> {
|
||||
let slot_meta_entry = working_set
|
||||
.get(&slot)
|
||||
.expect("Slot must exist in the working_set hashmap");
|
||||
|
||||
let meta = &slot_meta_entry.new_slot_meta;
|
||||
let meta_backup = &slot_meta_entry.old_slot_meta;
|
||||
{
|
||||
let mut meta_mut = meta.borrow_mut();
|
||||
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
|
||||
|
||||
// If:
|
||||
// 1) This is a new slot
|
||||
// 2) slot != 0
|
||||
// then try to chain this slot to a previous slot
|
||||
if slot != 0 && meta_mut.parent_slot.is_some() {
|
||||
let prev_slot = meta_mut.parent_slot.unwrap();
|
||||
|
||||
// Check if the slot represented by meta_mut is either a new slot or a orphan.
|
||||
// In both cases we need to run the chaining logic b/c the parent on the slot was
|
||||
// previously unknown.
|
||||
if meta_backup.is_none() || was_orphan_slot {
|
||||
let prev_slot_meta =
|
||||
self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?;
|
||||
|
||||
// This is a newly inserted slot/orphan so run the chaining logic to link it to a
|
||||
// newly discovered parent
|
||||
chain_new_slot_to_prev_slot(
|
||||
&mut prev_slot_meta.borrow_mut(),
|
||||
slot,
|
||||
&mut meta_mut,
|
||||
);
|
||||
|
||||
// If the parent of `slot` is a newly inserted orphan, insert it into the orphans
|
||||
// column family
|
||||
if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
|
||||
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point this slot has received a parent, so it's no longer an orphan
|
||||
if was_orphan_slot {
|
||||
write_batch.delete::<cf::Orphans>(slot)?;
|
||||
}
|
||||
}
|
||||
|
||||
// If this is a newly completed slot and the parent is connected, then the
|
||||
// slot is now connected. Mark the slot as connected, and then traverse the
|
||||
// children to update their parent_connected and connected status.
|
||||
let should_propagate_is_connected =
|
||||
is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
|
||||
&& RefCell::borrow(meta).is_parent_connected();
|
||||
|
||||
if should_propagate_is_connected {
|
||||
meta.borrow_mut().set_connected();
|
||||
self.traverse_children_mut(
|
||||
meta,
|
||||
working_set,
|
||||
new_chained_slots,
|
||||
SlotMeta::set_parent_connected,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Traverse all the children (direct and indirect) of `slot_meta`, and apply
|
||||
/// `slot_function` to each of the children (but not `slot_meta`).
|
||||
///
|
||||
/// Arguments:
|
||||
/// `db`: the blockstore db that stores shreds and their metadata.
|
||||
/// `slot_meta`: the SlotMeta of the above `slot`.
|
||||
/// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
|
||||
/// to traverse the graph.
|
||||
/// `passed_visited_slots`: all the traversed slots which have passed the
|
||||
/// slot_function. This may also include the input `slot`.
|
||||
/// `slot_function`: a function which updates the SlotMeta of the visisted
|
||||
/// slots and determine whether to further traverse the children slots of
|
||||
/// a given slot.
|
||||
fn traverse_children_mut<F>(
|
||||
&self,
|
||||
slot_meta: &Rc<RefCell<SlotMeta>>,
|
||||
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
passed_visisted_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot_function: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(&mut SlotMeta) -> bool,
|
||||
{
|
||||
let slot_meta = slot_meta.borrow();
|
||||
let mut next_slots: VecDeque<u64> = slot_meta.next_slots.to_vec().into();
|
||||
while !next_slots.is_empty() {
|
||||
let slot = next_slots.pop_front().unwrap();
|
||||
let meta_ref =
|
||||
self.find_slot_meta_else_create(working_set, passed_visisted_slots, slot)?;
|
||||
let mut meta = meta_ref.borrow_mut();
|
||||
if slot_function(&mut meta) {
|
||||
meta.next_slots
|
||||
.iter()
|
||||
.for_each(|slot| next_slots.push_back(*slot));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
|
||||
/// it from the database if it does not exist in slot_meta_working_set.
|
||||
///
|
||||
/// In case none of the above has the specified SlotMeta, a new one will
|
||||
/// be created.
|
||||
///
|
||||
/// Note that this function will also update the parent slot of the specified
|
||||
/// slot.
|
||||
///
|
||||
/// Arguments:
|
||||
/// - `db`: the database
|
||||
/// - `slot_meta_working_set`: a in-memory structure for storing the cached
|
||||
/// SlotMeta.
|
||||
/// - `slot`: the slot for loading its meta.
|
||||
/// - `parent_slot`: the parent slot to be assigned to the specified slot meta
|
||||
///
|
||||
/// This function returns the matched `SlotMetaWorkingSetEntry`. If such entry
|
||||
/// does not exist in the database, a new entry will be created.
|
||||
fn get_slot_meta_entry<'a>(
|
||||
&self,
|
||||
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
slot: Slot,
|
||||
parent_slot: Slot,
|
||||
) -> &'a mut SlotMetaWorkingSetEntry {
|
||||
// Check if we've already inserted the slot metadata for this shred's slot
|
||||
slot_meta_working_set.entry(slot).or_insert_with(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = self
|
||||
.meta_cf
|
||||
.get(slot)
|
||||
.expect("Expect database get to succeed")
|
||||
{
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == None, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if is_orphan(&meta) {
|
||||
meta.parent_slot = Some(parent_slot);
|
||||
}
|
||||
|
||||
SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
|
||||
} else {
|
||||
SlotMetaWorkingSetEntry::new(
|
||||
Rc::new(RefCell::new(SlotMeta::new(slot, Some(parent_slot)))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the `SlotMeta` with the specified `slot_index`. The resulting
|
||||
/// `SlotMeta` could be either from the cache or from the DB. Specifically,
|
||||
/// the function:
|
||||
///
|
||||
/// 1) Finds the slot metadata in the cache of dirty slot metadata we've
|
||||
/// previously touched, otherwise:
|
||||
/// 2) Searches the database for that slot metadata. If still no luck, then:
|
||||
/// 3) Create a dummy orphan slot in the database.
|
||||
///
|
||||
/// Also see [`find_slot_meta_in_cached_state`] and [`find_slot_meta_in_db_else_create`].
|
||||
fn find_slot_meta_else_create<'a>(
|
||||
&self,
|
||||
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
chained_slots: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot_index: u64,
|
||||
) -> Result<Rc<RefCell<SlotMeta>>> {
|
||||
let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index);
|
||||
if let Some(slot) = result {
|
||||
Ok(slot)
|
||||
} else {
|
||||
self.find_slot_meta_in_db_else_create(slot_index, chained_slots)
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper function to [`find_slot_meta_else_create`] that searches the
|
||||
/// `SlotMeta` based on the specified `slot` in `db` and updates `insert_map`.
|
||||
///
|
||||
/// If the specified `db` does not contain a matched entry, then it will create
|
||||
/// a dummy orphan slot in the database.
|
||||
fn find_slot_meta_in_db_else_create(
|
||||
&self,
|
||||
slot: Slot,
|
||||
insert_map: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
) -> Result<Rc<RefCell<SlotMeta>>> {
|
||||
if let Some(slot_meta) = self.meta_cf.get(slot)? {
|
||||
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
|
||||
} else {
|
||||
// If this slot doesn't exist, make a orphan slot. This way we
|
||||
// remember which slots chained to this one when we eventually get a real shred
|
||||
// for this slot
|
||||
insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot))));
|
||||
}
|
||||
Ok(insert_map.get(&slot).unwrap().clone())
|
||||
}
|
||||
|
||||
fn get_index_meta_entry<'a>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
index_meta_time_us: &mut u64,
|
||||
) -> &'a mut IndexMetaWorkingSetEntry {
|
||||
let mut total_start = Measure::start("Total elapsed");
|
||||
let res = index_working_set.entry(slot).or_insert_with(|| {
|
||||
let newly_inserted_meta = self
|
||||
.index_cf
|
||||
.get(slot)
|
||||
.unwrap()
|
||||
.unwrap_or_else(|| Index::new(slot));
|
||||
IndexMetaWorkingSetEntry {
|
||||
index: newly_inserted_meta,
|
||||
did_insert_occur: false,
|
||||
}
|
||||
});
|
||||
total_start.stop();
|
||||
*index_meta_time_us += total_start.as_us();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
|
||||
|
@ -3563,78 +3849,6 @@ fn update_slot_meta(
|
|||
)
|
||||
}
|
||||
|
||||
fn get_index_meta_entry<'a>(
|
||||
db: &Database,
|
||||
slot: Slot,
|
||||
index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
index_meta_time_us: &mut u64,
|
||||
) -> &'a mut IndexMetaWorkingSetEntry {
|
||||
let index_cf = db.column::<cf::Index>();
|
||||
let mut total_start = Measure::start("Total elapsed");
|
||||
let res = index_working_set.entry(slot).or_insert_with(|| {
|
||||
let newly_inserted_meta = index_cf
|
||||
.get(slot)
|
||||
.unwrap()
|
||||
.unwrap_or_else(|| Index::new(slot));
|
||||
IndexMetaWorkingSetEntry {
|
||||
index: newly_inserted_meta,
|
||||
did_insert_occur: false,
|
||||
}
|
||||
});
|
||||
total_start.stop();
|
||||
*index_meta_time_us += total_start.as_us();
|
||||
res
|
||||
}
|
||||
|
||||
/// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
|
||||
/// it from the database if it does not exist in slot_meta_working_set.
|
||||
///
|
||||
/// In case none of the above has the specified SlotMeta, a new one will
|
||||
/// be created.
|
||||
///
|
||||
/// Note that this function will also update the parent slot of the specified
|
||||
/// slot.
|
||||
///
|
||||
/// Arguments:
|
||||
/// - `db`: the database
|
||||
/// - `slot_meta_working_set`: a in-memory structure for storing the cached
|
||||
/// SlotMeta.
|
||||
/// - `slot`: the slot for loading its meta.
|
||||
/// - `parent_slot`: the parent slot to be assigned to the specified slot meta
|
||||
///
|
||||
/// This function returns the matched `SlotMetaWorkingSetEntry`. If such entry
|
||||
/// does not exist in the database, a new entry will be created.
|
||||
fn get_slot_meta_entry<'a>(
|
||||
db: &Database,
|
||||
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
slot: Slot,
|
||||
parent_slot: Slot,
|
||||
) -> &'a mut SlotMetaWorkingSetEntry {
|
||||
let meta_cf = db.column::<cf::SlotMeta>();
|
||||
|
||||
// Check if we've already inserted the slot metadata for this shred's slot
|
||||
slot_meta_working_set.entry(slot).or_insert_with(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == None, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if is_orphan(&meta) {
|
||||
meta.parent_slot = Some(parent_slot);
|
||||
}
|
||||
|
||||
SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
|
||||
} else {
|
||||
SlotMetaWorkingSetEntry::new(
|
||||
Rc::new(RefCell::new(SlotMeta::new(slot, Some(parent_slot)))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn get_last_hash<'a>(iterator: impl Iterator<Item = &'a Entry> + 'a) -> Option<Hash> {
|
||||
iterator.last().map(|entry| entry.hash)
|
||||
}
|
||||
|
@ -3727,51 +3941,6 @@ fn commit_slot_meta_working_set(
|
|||
Ok((should_signal, newly_completed_slots))
|
||||
}
|
||||
|
||||
/// Returns the `SlotMeta` with the specified `slot_index`. The resulting
|
||||
/// `SlotMeta` could be either from the cache or from the DB. Specifically,
|
||||
/// the function:
|
||||
///
|
||||
/// 1) Finds the slot metadata in the cache of dirty slot metadata we've
|
||||
/// previously touched, otherwise:
|
||||
/// 2) Searches the database for that slot metadata. If still no luck, then:
|
||||
/// 3) Create a dummy orphan slot in the database.
|
||||
///
|
||||
/// Also see [`find_slot_meta_in_cached_state`] and [`find_slot_meta_in_db_else_create`].
|
||||
fn find_slot_meta_else_create<'a>(
|
||||
db: &Database,
|
||||
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
chained_slots: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot_index: u64,
|
||||
) -> Result<Rc<RefCell<SlotMeta>>> {
|
||||
let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index);
|
||||
if let Some(slot) = result {
|
||||
Ok(slot)
|
||||
} else {
|
||||
find_slot_meta_in_db_else_create(db, slot_index, chained_slots)
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper function to [`find_slot_meta_else_create`] that searches the
|
||||
/// `SlotMeta` based on the specified `slot` in `db` and updates `insert_map`.
|
||||
///
|
||||
/// If the specified `db` does not contain a matched entry, then it will create
|
||||
/// a dummy orphan slot in the database.
|
||||
fn find_slot_meta_in_db_else_create(
|
||||
db: &Database,
|
||||
slot: Slot,
|
||||
insert_map: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
) -> Result<Rc<RefCell<SlotMeta>>> {
|
||||
if let Some(slot_meta) = db.column::<cf::SlotMeta>().get(slot)? {
|
||||
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
|
||||
} else {
|
||||
// If this slot doesn't exist, make a orphan slot. This way we
|
||||
// remember which slots chained to this one when we eventually get a real shred
|
||||
// for this slot
|
||||
insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot))));
|
||||
}
|
||||
Ok(insert_map.get(&slot).unwrap().clone())
|
||||
}
|
||||
|
||||
/// Returns the `SlotMeta` of the specified `slot` from the two cached states:
|
||||
/// `working_set` and `chained_slots`. If both contain the `SlotMeta`, then
|
||||
/// the latest one from the `working_set` will be returned.
|
||||
|
@ -3787,182 +3956,6 @@ fn find_slot_meta_in_cached_state<'a>(
|
|||
}
|
||||
}
|
||||
|
||||
/// For each entry in `working_set` whose `did_insert_occur` is true, this
|
||||
/// function handles its chaining effect by updating the SlotMeta of both
|
||||
/// the slot and its parent slot to reflect the slot descends from the
|
||||
/// parent slot. In addition, when a slot is newly connected, it also
|
||||
/// checks whether any of its direct and indirect children slots are connected
|
||||
/// or not.
|
||||
///
|
||||
/// This function may update column families [`cf::SlotMeta`] and
|
||||
/// [`cf::Orphans`].
|
||||
///
|
||||
/// For more information about the chaining, check the previous discussion here:
|
||||
/// https://github.com/solana-labs/solana/pull/2253
|
||||
///
|
||||
/// Arguments:
|
||||
/// - `db`: the blockstore db that stores both shreds and their metadata.
|
||||
/// - `write_batch`: the write batch which includes all the updates of the
|
||||
/// the current write and ensures their atomicity.
|
||||
/// - `working_set`: a slot-id to SlotMetaWorkingSetEntry map. This function
|
||||
/// will remove all entries which insertion did not actually occur.
|
||||
fn handle_chaining(
|
||||
db: &Database,
|
||||
write_batch: &mut WriteBatch,
|
||||
working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
) -> Result<()> {
|
||||
// Handle chaining for all the SlotMetas that were inserted into
|
||||
working_set.retain(|_, entry| entry.did_insert_occur);
|
||||
let mut new_chained_slots = HashMap::new();
|
||||
let working_set_slots: Vec<_> = working_set.keys().collect();
|
||||
for slot in working_set_slots {
|
||||
handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?;
|
||||
}
|
||||
|
||||
// Write all the newly changed slots in new_chained_slots to the write_batch
|
||||
for (slot, meta) in new_chained_slots.iter() {
|
||||
let meta: &SlotMeta = &RefCell::borrow(meta);
|
||||
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A helper function of handle_chaining which handles the chaining based
|
||||
/// on the `SlotMetaWorkingSetEntry` of the specified `slot`. Specifically,
|
||||
/// it handles the following two things:
|
||||
///
|
||||
/// 1. based on the `SlotMetaWorkingSetEntry` for `slot`, check if `slot`
|
||||
/// did not previously have a parent slot but does now. If `slot` satisfies
|
||||
/// this condition, update the Orphan property of both `slot` and its parent
|
||||
/// slot based on their current orphan status. Specifically:
|
||||
/// - updates the orphan property of slot to no longer be an orphan because
|
||||
/// it has a parent.
|
||||
/// - adds the parent to the orphan column family if the parent's parent is
|
||||
/// currently unknown.
|
||||
///
|
||||
/// 2. if the `SlotMetaWorkingSetEntry` for `slot` indicates this slot
|
||||
/// is newly connected to a parent slot, then this function will update
|
||||
/// the is_connected property of all its direct and indirect children slots.
|
||||
///
|
||||
/// This function may update column family [`cf::Orphans`] and indirectly
|
||||
/// update SlotMeta from its output parameter `new_chained_slots`.
|
||||
///
|
||||
/// Arguments:
|
||||
/// `db`: the underlying db for blockstore
|
||||
/// `write_batch`: the write batch which includes all the updates of the
|
||||
/// the current write and ensures their atomicity.
|
||||
/// `working_set`: the working set which include the specified `slot`
|
||||
/// `new_chained_slots`: an output parameter which includes all the slots
|
||||
/// which connectivity have been updated.
|
||||
/// `slot`: the slot which we want to handle its chaining effect.
|
||||
fn handle_chaining_for_slot(
|
||||
db: &Database,
|
||||
write_batch: &mut WriteBatch,
|
||||
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot: Slot,
|
||||
) -> Result<()> {
|
||||
let slot_meta_entry = working_set
|
||||
.get(&slot)
|
||||
.expect("Slot must exist in the working_set hashmap");
|
||||
|
||||
let meta = &slot_meta_entry.new_slot_meta;
|
||||
let meta_backup = &slot_meta_entry.old_slot_meta;
|
||||
{
|
||||
let mut meta_mut = meta.borrow_mut();
|
||||
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
|
||||
|
||||
// If:
|
||||
// 1) This is a new slot
|
||||
// 2) slot != 0
|
||||
// then try to chain this slot to a previous slot
|
||||
if slot != 0 && meta_mut.parent_slot.is_some() {
|
||||
let prev_slot = meta_mut.parent_slot.unwrap();
|
||||
|
||||
// Check if the slot represented by meta_mut is either a new slot or a orphan.
|
||||
// In both cases we need to run the chaining logic b/c the parent on the slot was
|
||||
// previously unknown.
|
||||
if meta_backup.is_none() || was_orphan_slot {
|
||||
let prev_slot_meta =
|
||||
find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
|
||||
|
||||
// This is a newly inserted slot/orphan so run the chaining logic to link it to a
|
||||
// newly discovered parent
|
||||
chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
|
||||
|
||||
// If the parent of `slot` is a newly inserted orphan, insert it into the orphans
|
||||
// column family
|
||||
if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
|
||||
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point this slot has received a parent, so it's no longer an orphan
|
||||
if was_orphan_slot {
|
||||
write_batch.delete::<cf::Orphans>(slot)?;
|
||||
}
|
||||
}
|
||||
|
||||
// If this is a newly completed slot and the parent is connected, then the
|
||||
// slot is now connected. Mark the slot as connected, and then traverse the
|
||||
// children to update their parent_connected and connected status.
|
||||
let should_propagate_is_connected =
|
||||
is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
|
||||
&& RefCell::borrow(meta).is_parent_connected();
|
||||
|
||||
if should_propagate_is_connected {
|
||||
meta.borrow_mut().set_connected();
|
||||
traverse_children_mut(
|
||||
db,
|
||||
meta,
|
||||
working_set,
|
||||
new_chained_slots,
|
||||
SlotMeta::set_parent_connected,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Traverse all the children (direct and indirect) of `slot_meta`, and apply
|
||||
/// `slot_function` to each of the children (but not `slot_meta`).
|
||||
///
|
||||
/// Arguments:
|
||||
/// `db`: the blockstore db that stores shreds and their metadata.
|
||||
/// `slot_meta`: the SlotMeta of the above `slot`.
|
||||
/// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
|
||||
/// to traverse the graph.
|
||||
/// `passed_visited_slots`: all the traversed slots which have passed the
|
||||
/// slot_function. This may also include the input `slot`.
|
||||
/// `slot_function`: a function which updates the SlotMeta of the visisted
|
||||
/// slots and determine whether to further traverse the children slots of
|
||||
/// a given slot.
|
||||
fn traverse_children_mut<F>(
|
||||
db: &Database,
|
||||
slot_meta: &Rc<RefCell<SlotMeta>>,
|
||||
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
passed_visisted_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot_function: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(&mut SlotMeta) -> bool,
|
||||
{
|
||||
let slot_meta = slot_meta.borrow();
|
||||
let mut next_slots: VecDeque<u64> = slot_meta.next_slots.to_vec().into();
|
||||
while !next_slots.is_empty() {
|
||||
let slot = next_slots.pop_front().unwrap();
|
||||
let meta_ref = find_slot_meta_else_create(db, working_set, passed_visisted_slots, slot)?;
|
||||
let mut meta = meta_ref.borrow_mut();
|
||||
if slot_function(&mut meta) {
|
||||
meta.next_slots
|
||||
.iter()
|
||||
.for_each(|slot| next_slots.push_back(*slot));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_orphan(meta: &SlotMeta) -> bool {
|
||||
// If we have no parent, then this is the head of a detached chain of
|
||||
// slots
|
||||
|
|
Loading…
Reference in New Issue