blockstore: atomize slot clearing, relax parent slot meta check (#35124)

* blockstore: atomize slot clearing, relax parent slot meta check

clear_unconfirmed_slot can leave the blockstore in an irrecoverable state
if it panics partway through. perform this function's deletes and writes in a
single write batch, so that a failure can be recovered after restart.

additionally, relax the constraint that the parent slot meta must exist,
as it may already have been cleaned up if outdated.

* pr feedback: use PurgeType, don't pass slot_meta

* pr feedback: add unit test

* pr feedback: refactor into separate function

* pr feedback: add special columns to helper, err msg, comments

* pr feedback: reword comments and write batch error message

* pr feedback: bubble write_batch error to caller

* pr feedback: reword comments

Co-authored-by: steviez <stevecz@umich.edu>

---------

Co-authored-by: steviez <stevecz@umich.edu>
Ashwin Sekar 2024-03-02 20:23:55 -08:00 committed by GitHub
parent ccc6a6bf6f
commit cc4072bce8
2 changed files with 205 additions and 121 deletions
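
The heart of the fix is the atomicity of a RocksDB write batch: every staged delete and put lands in one commit, or none of them do, so a panic mid-function can no longer strand the blockstore between states. Below is a minimal sketch of the pattern using the public rust-rocksdb crate rather than Solana's internal database wrappers; the keys and values are illustrative only, not the Blockstore's actual schema.

    use rocksdb::{WriteBatch, DB};

    fn atomic_clear(db: &DB) -> Result<(), rocksdb::Error> {
        // Stage every mutation; nothing reaches the database yet.
        let mut batch = WriteBatch::default();
        batch.delete(b"slot/5/shreds");
        batch.delete(b"slot/5/meta");
        batch.put(b"slot/4/meta", b"next_slots: []");

        // Single atomic commit: a crash before this line leaves the store
        // untouched, and a failed write persists no partial state.
        db.write(batch)
    }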


@@ -1154,9 +1154,8 @@ impl Blockstore {
         self.completed_slots_senders.lock().unwrap().clear();
     }
 
-    /// Range-delete all entries which prefix matches the specified `slot`,
-    /// remove `slot` its' parents SlotMeta next_slots list, and
-    /// clear `slot`'s SlotMeta (except for next_slots).
+    /// Clear `slot` from the Blockstore, see `Blockstore::purge_slot_cleanup_chaining`
+    /// for more details.
     ///
     /// This function currently requires `insert_shreds_lock`, as both
     /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()`
@@ -1164,40 +1163,19 @@ impl Blockstore {
     /// family.
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
         let _lock = self.insert_shreds_lock.lock().unwrap();
-        if let Some(mut slot_meta) = self
-            .meta(slot)
-            .expect("Couldn't fetch from SlotMeta column family")
-        {
-            // Clear all slot related information
-            self.run_purge(slot, slot, PurgeType::Exact)
-                .expect("Purge database operations failed");
-
-            // Clear this slot as a next slot from parent
-            if let Some(parent_slot) = slot_meta.parent_slot {
-                let mut parent_slot_meta = self
-                    .meta(parent_slot)
-                    .expect("Couldn't fetch from SlotMeta column family")
-                    .expect("Unconfirmed slot should have had parent slot set");
-                // .retain() is a linear scan; however, next_slots should
-                // only contain several elements so this isn't so bad
-                parent_slot_meta
-                    .next_slots
-                    .retain(|&next_slot| next_slot != slot);
-                self.meta_cf
-                    .put(parent_slot, &parent_slot_meta)
-                    .expect("Couldn't insert into SlotMeta column family");
-            }
-            // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots`
-            // field.
-            slot_meta.clear_unconfirmed_slot();
-            self.meta_cf
-                .put(slot, &slot_meta)
-                .expect("Couldn't insert into SlotMeta column family");
-        } else {
-            error!(
+        // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved.
+        // Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list
+        // will be updated when the child is inserted (see `Blockstore::handle_chaining()`).
+        // However, we are only purging and repairing the parent slot here. Since the child will not be
+        // reinserted the chaining will be lost. In order for bank forks discovery to ingest the child,
+        // we must retain the chain by preserving `next_slots`.
+        match self.purge_slot_cleanup_chaining(slot) {
+            Ok(_) => {}
+            Err(BlockstoreError::SlotUnavailable) => error!(
                 "clear_unconfirmed_slot() called on slot {} with no SlotMeta",
                 slot
-            );
+            ),
+            Err(e) => panic!("Purge database operations failed {}", e),
         }
     }
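
To make the `next_slots` comment above concrete: the purged slot's children are never reinserted, so nothing would ever re-link them to the fork graph if the chaining were dropped. A toy sketch of the preserve-the-chain idea follows; `ToySlotMeta` is a simplified stand-in for illustration, not the real `SlotMeta` type.

    #[derive(Default, Debug, PartialEq)]
    struct ToySlotMeta {
        slot: u64,
        parent_slot: Option<u64>,
        next_slots: Vec<u64>, // children discovered so far
        consumed: u64,        // shred progress, wiped on purge
    }

    /// Reset everything about the slot except the child links, so that
    /// fork discovery can still walk from this slot to its children.
    fn clear_unconfirmed(meta: ToySlotMeta) -> ToySlotMeta {
        ToySlotMeta {
            slot: meta.slot,
            next_slots: meta.next_slots, // the one field worth keeping
            ..ToySlotMeta::default()
        }
    }

    fn main() {
        let meta = ToySlotMeta {
            slot: 5,
            parent_slot: Some(4),
            next_slots: vec![6, 12],
            consumed: 40,
        };
        let cleared = clear_unconfirmed(meta);
        assert_eq!(cleared.next_slots, vec![6, 12]); // chain to children survives
        assert_eq!(cleared.parent_slot, None);       // everything else is reset
    }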


@@ -135,6 +135,7 @@ impl Blockstore {
         }
     }
 
+    #[cfg(test)]
     pub(crate) fn run_purge(
         &self,
         from_slot: Slot,
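
Marking `run_purge` with `#[cfg(test)]` compiles it only under `cargo test`, which enforces at build time that production code now goes through `purge_slot_cleanup_chaining` or `run_purge_with_stats` instead. A tiny illustration of the attribute on a hypothetical type:

    struct Store;

    impl Store {
        // Visible to unit tests only; production code referencing this
        // function fails to compile.
        #[cfg(test)]
        #[allow(dead_code)]
        fn purge_for_tests(&self) {}
    }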
@@ -144,11 +145,60 @@ impl Blockstore {
         self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
     }
 
+    /// Purges all columns relating to `slot`.
+    ///
+    /// Additionally, we cleanup the parent of `slot` by clearing `slot` from
+    /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot`
+    /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is
+    /// replayable upon repair of `slot`.
+    pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
+        let Some(mut slot_meta) = self.meta(slot)? else {
+            return Err(BlockstoreError::SlotUnavailable);
+        };
+        let mut write_batch = self.db.batch()?;
+
+        let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
+        if let Some(parent_slot) = slot_meta.parent_slot {
+            let parent_slot_meta = self.meta(parent_slot)?;
+            if let Some(mut parent_slot_meta) = parent_slot_meta {
+                // .retain() is a linear scan; however, next_slots should
+                // only contain several elements so this isn't so bad
+                parent_slot_meta
+                    .next_slots
+                    .retain(|&next_slot| next_slot != slot);
+                write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
+            } else {
+                error!(
+                    "Parent slot meta {} for child {} is missing or cleaned up.
+                     Falling back to orphan repair to remedy the situation",
+                    parent_slot, slot
+                );
+            }
+        }
+
+        // Retain a SlotMeta for `slot` with the `next_slots` field retained
+        slot_meta.clear_unconfirmed_slot();
+        write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
+
+        self.db.write(write_batch).inspect_err(|e| {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?}",
+                e, slot
+            )
+        })?;
+        Ok(columns_purged)
+    }
+
     /// A helper function to `purge_slots` that executes the ledger clean up.
     /// The cleanup applies to \[`from_slot`, `to_slot`\].
     ///
     /// When `from_slot` is 0, any sst-file with a key-range completely older
     /// than `to_slot` will also be deleted.
+    ///
+    /// Note: slots > `to_slot` that chained to a purged slot are not properly
+    /// cleaned up. This function is not intended to be used if such slots need
+    /// to be replayed.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
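
The `let ... else` binding that opens `purge_slot_cleanup_chaining` (stable since Rust 1.65) is what converts the old `expect`-style panic on a missing `SlotMeta` into a typed `BlockstoreError::SlotUnavailable` the caller can match on. A sketch of that pattern, with a stand-in lookup and error type rather than the real Blockstore APIs:

    #[derive(Debug)]
    enum StoreError {
        SlotUnavailable,
    }

    fn lookup_meta(slot: u64) -> Option<String> {
        // Stand-in for `self.meta(slot)?`, which yields an Option.
        (slot % 2 == 0).then(|| format!("meta for slot {slot}"))
    }

    fn require_meta(slot: u64) -> Result<String, StoreError> {
        // Binds on Some(..); the else arm must diverge (return, panic, etc.).
        let Some(meta) = lookup_meta(slot) else {
            return Err(StoreError::SlotUnavailable);
        };
        Ok(meta)
    }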
@@ -156,97 +206,19 @@ impl Blockstore {
         purge_type: PurgeType,
         purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
-        let mut write_batch = self
-            .db
-            .batch()
-            .expect("Database Error: Failed to get write batch");
+        let mut write_batch = self.db.batch()?;
         let mut delete_range_timer = Measure::start("delete_range");
-        let columns_purged = self
-            .db
-            .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
-            .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::BlockHeight>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
-                .is_ok();
-        match purge_type {
-            PurgeType::Exact => {
-                self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
-            }
-            PurgeType::CompactionFilter => {
-                // No explicit action is required here because this purge type completely and
-                // indefinitely relies on the proper working of compaction filter for those
-                // special column families, never toggling the primary index from the current
-                // one. Overall, this enables well uniformly distributed writes, resulting
-                // in no spiky periodic huge delete_range for them.
-            }
-        }
+        let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
         delete_range_timer.stop();
 
         let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
-            error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
+        self.db.write(write_batch).inspect_err(|e| {
+            error!(
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot
+            )
+        })?;
         write_timer.stop();
 
         let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
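
Both write paths above use `Result::inspect_err` (stable since Rust 1.76) in place of the old log-and-return-Err block: the closure observes the error by reference for logging, and the `?` operator then propagates the very same error. A minimal standalone illustration; the file read is just a placeholder operation:

    use std::fs;

    fn read_config(path: &str) -> std::io::Result<String> {
        // The closure sees a reference to the error for logging; the
        // Ok/Err value itself passes through unchanged to the caller.
        fs::read_to_string(path)
            .inspect_err(|e| eprintln!("Error: {e:?} while reading {path}"))
    }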
@@ -273,6 +245,93 @@ impl Blockstore {
         Ok(columns_purged)
     }
 
+    fn purge_range(
+        &self,
+        write_batch: &mut WriteBatch,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+    ) -> Result<bool> {
+        let columns_purged = self
+            .db
+            .delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
+                .is_ok();
+
+        match purge_type {
+            PurgeType::Exact => {
+                self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
+            }
+            PurgeType::CompactionFilter => {
+                // No explicit action is required here because this purge type completely and
+                // indefinitely relies on the proper working of compaction filter for those
+                // special column families, never toggling the primary index from the current
+                // one. Overall, this enables well uniformly distributed writes, resulting
+                // in no spiky periodic huge delete_range for them.
+            }
+        }
+        Ok(columns_purged)
+    }
+
     fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
         self.db
             .delete_file_in_range_cf::<cf::SlotMeta>(from_slot, to_slot)
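
One subtlety in `purge_range`, carried over from the old `run_purge_with_stats` body: the per-column results are combined with the non-short-circuiting `&` rather than `&&`, so a failure in one column family still lets every later `delete_range_cf` stage its deletes into the batch, and `columns_purged` merely records whether all of them succeeded. The same accumulation expressed as a fold, with a hypothetical `delete_column_range` helper standing in for the per-column delete:

    fn delete_column_range(column: &str) -> Result<(), String> {
        // Placeholder for a per-column-family range delete.
        if column.is_empty() {
            Err("empty column name".to_string())
        } else {
            Ok(())
        }
    }

    fn purge_all(columns: &[&str]) -> bool {
        // `&` evaluates both sides, so every column is attempted even after
        // a failure, mirroring the chained `& ... .is_ok()` expressions.
        columns
            .iter()
            .fold(true, |all_ok, column| all_ok & delete_column_range(column).is_ok())
    }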
@@ -1103,4 +1162,51 @@ pub mod tests {
         }
         assert_eq!(count, 1);
     }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        assert!(matches!(
+            blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
+            BlockstoreError::SlotUnavailable
+        ));
+    }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+        let (slot_11, _) = make_slot_entries(11, 4, 5, true);
+        blockstore.insert_shreds(slot_11, None, false).unwrap();
+        let (slot_12, _) = make_slot_entries(12, 5, 5, true);
+        blockstore.insert_shreds(slot_12, None, false).unwrap();
+
+        blockstore.purge_slot_cleanup_chaining(5).unwrap();
+
+        let slot_meta = blockstore.meta(5).unwrap().unwrap();
+        let expected_slot_meta = SlotMeta {
+            slot: 5,
+            // Only the next_slots should be preserved
+            next_slots: vec![6, 12],
+            ..SlotMeta::default()
+        };
+        assert_eq!(slot_meta, expected_slot_meta);
+
+        let parent_slot_meta = blockstore.meta(4).unwrap().unwrap();
+        assert_eq!(parent_slot_meta.next_slots, vec![11]);
+
+        let child_slot_meta = blockstore.meta(6).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+
+        let child_slot_meta = blockstore.meta(12).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+    }
 }
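
For orientation, the fork tree that test_purge_slot_cleanup_chaining builds before purging, read directly off the insert_shreds calls above, is:

    0 - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - 9
                    |   |
                    11  12      (11 chains to parent 4, 12 to parent 5)

Purging slot 5 deletes its columns and removes 5 from parent 4's next_slots (leaving [11]), while the reinserted orphaned SlotMeta for 5 keeps next_slots = [6, 12], so both children remain reachable by fork discovery once slot 5 is repaired.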