diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 867761639..f8c833084 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1154,9 +1154,8 @@ impl Blockstore {
         self.completed_slots_senders.lock().unwrap().clear();
     }
 
-    /// Range-delete all entries which prefix matches the specified `slot`,
-    /// remove `slot` its' parents SlotMeta next_slots list, and
-    /// clear `slot`'s SlotMeta (except for next_slots).
+    /// Clear `slot` from the Blockstore, see `Blockstore::purge_slot_cleanup_chaining`
+    /// for more details.
     ///
     /// This function currently requires `insert_shreds_lock`, as both
     /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()`
@@ -1164,40 +1163,19 @@ impl Blockstore {
     /// family.
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
         let _lock = self.insert_shreds_lock.lock().unwrap();
-        if let Some(mut slot_meta) = self
-            .meta(slot)
-            .expect("Couldn't fetch from SlotMeta column family")
-        {
-            // Clear all slot related information
-            self.run_purge(slot, slot, PurgeType::Exact)
-                .expect("Purge database operations failed");
-
-            // Clear this slot as a next slot from parent
-            if let Some(parent_slot) = slot_meta.parent_slot {
-                let mut parent_slot_meta = self
-                    .meta(parent_slot)
-                    .expect("Couldn't fetch from SlotMeta column family")
-                    .expect("Unconfirmed slot should have had parent slot set");
-                // .retain() is a linear scan; however, next_slots should
-                // only contain several elements so this isn't so bad
-                parent_slot_meta
-                    .next_slots
-                    .retain(|&next_slot| next_slot != slot);
-                self.meta_cf
-                    .put(parent_slot, &parent_slot_meta)
-                    .expect("Couldn't insert into SlotMeta column family");
-            }
-            // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots`
-            // field.
-            slot_meta.clear_unconfirmed_slot();
-            self.meta_cf
-                .put(slot, &slot_meta)
-                .expect("Couldn't insert into SlotMeta column family");
-        } else {
-            error!(
+        // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved.
+        // Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list
+        // will be updated when the child is inserted (see `Blockstore::handle_chaining()`).
+        // However, we are only purging and repairing the parent slot here. Since the child will not
+        // be reinserted, the chaining will be lost. In order for bank forks discovery to ingest the
+        // child, we must retain the chain by preserving `next_slots`.
+        match self.purge_slot_cleanup_chaining(slot) {
+            Ok(_) => {}
+            Err(BlockstoreError::SlotUnavailable) => error!(
                "clear_unconfirmed_slot() called on slot {} with no SlotMeta",
                slot
-            );
+            ),
+            Err(e) => panic!("Purge database operations failed {}", e),
         }
     }
 
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 4b599a353..15a5c4890 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -135,6 +135,7 @@ impl Blockstore {
         }
     }
 
+    #[cfg(test)]
    pub(crate) fn run_purge(
        &self,
        from_slot: Slot,
@@ -144,11 +145,60 @@ impl Blockstore {
         self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
     }
 
+    /// Purges all columns relating to `slot`.
+    ///
+    /// Additionally, we clean up the parent of `slot` by clearing `slot` from
+    /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot`
+    /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is
+    /// replayable upon repair of `slot`.
+    pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
+        let Some(mut slot_meta) = self.meta(slot)? else {
+            return Err(BlockstoreError::SlotUnavailable);
+        };
+        let mut write_batch = self.db.batch()?;
+
+        let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
+
+        if let Some(parent_slot) = slot_meta.parent_slot {
+            let parent_slot_meta = self.meta(parent_slot)?;
+            if let Some(mut parent_slot_meta) = parent_slot_meta {
+                // .retain() is a linear scan; however, next_slots should
+                // only contain several elements so this isn't so bad
+                parent_slot_meta
+                    .next_slots
+                    .retain(|&next_slot| next_slot != slot);
+                write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
+            } else {
+                error!(
+                    "Parent slot meta {} for child {} is missing or cleaned up.
+                     Falling back to orphan repair to remedy the situation",
+                    parent_slot, slot
+                );
+            }
+        }
+
+        // Retain a SlotMeta for `slot` with the `next_slots` field retained
+        slot_meta.clear_unconfirmed_slot();
+        write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
+
+        self.db.write(write_batch).inspect_err(|e| {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?}",
+                e, slot
+            )
+        })?;
+        Ok(columns_purged)
+    }
+
     /// A helper function to `purge_slots` that executes the ledger clean up.
     /// The cleanup applies to \[`from_slot`, `to_slot`\].
     ///
     /// When `from_slot` is 0, any sst-file with a key-range completely older
     /// than `to_slot` will also be deleted.
+    ///
+    /// Note: slots > `to_slot` that chained to a purged slot are not properly
+    /// cleaned up. This function is not intended to be used if such slots need
+    /// to be replayed.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -156,97 +206,19 @@ impl Blockstore {
         purge_type: PurgeType,
         purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
-        let mut write_batch = self
-            .db
-            .batch()
-            .expect("Database Error: Failed to get write batch");
+        let mut write_batch = self.db.batch()?;
+
         let mut delete_range_timer = Measure::start("delete_range");
-        let columns_purged = self
-            .db
-            .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-            .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok()
-            & self
-                .db
-                .delete_range_cf::(&mut write_batch, from_slot, to_slot)
-                .is_ok();
-        match purge_type {
-            PurgeType::Exact => {
-                self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
-            }
-            PurgeType::CompactionFilter => {
-                // No explicit action is required here because this purge type completely and
-                // indefinitely relies on the proper working of compaction filter for those
-                // special column families, never toggling the primary index from the current
-                // one. Overall, this enables well uniformly distributed writes, resulting
-                // in no spiky periodic huge delete_range for them.
-            }
-        }
+        let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
         delete_range_timer.stop();
 
         let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
+        self.db.write(write_batch).inspect_err(|e| {
             error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot
+            )
+        })?;
         write_timer.stop();
 
         let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
@@ -273,6 +245,93 @@ impl Blockstore {
         Ok(columns_purged)
     }
 
+    fn purge_range(
+        &self,
+        write_batch: &mut WriteBatch,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+    ) -> Result<bool> {
+        let columns_purged = self
+            .db
+            .delete_range_cf::(write_batch, from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::(write_batch, from_slot, to_slot)
+                .is_ok();
+
+        match purge_type {
+            PurgeType::Exact => {
+                self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
+            }
+            PurgeType::CompactionFilter => {
+                // No explicit action is required here because this purge type completely and
+                // indefinitely relies on the proper working of compaction filter for those
+                // special column families, never toggling the primary index from the current
+                // one. Overall, this enables well uniformly distributed writes, resulting
+                // in no spiky periodic huge delete_range for them.
+            }
+        }
+        Ok(columns_purged)
+    }
+
     fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
         self.db
             .delete_file_in_range_cf::(from_slot, to_slot)
@@ -1103,4 +1162,51 @@ pub mod tests {
         }
         assert_eq!(count, 1);
     }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        assert!(matches!(
+            blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
+            BlockstoreError::SlotUnavailable
+        ));
+    }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+        let (slot_11, _) = make_slot_entries(11, 4, 5, true);
+        blockstore.insert_shreds(slot_11, None, false).unwrap();
+        let (slot_12, _) = make_slot_entries(12, 5, 5, true);
+        blockstore.insert_shreds(slot_12, None, false).unwrap();
+
+        blockstore.purge_slot_cleanup_chaining(5).unwrap();
+
+        let slot_meta = blockstore.meta(5).unwrap().unwrap();
+        let expected_slot_meta = SlotMeta {
+            slot: 5,
+            // Only the next_slots should be preserved
+            next_slots: vec![6, 12],
+            ..SlotMeta::default()
+        };
+        assert_eq!(slot_meta, expected_slot_meta);
+
+        let parent_slot_meta = blockstore.meta(4).unwrap().unwrap();
+        assert_eq!(parent_slot_meta.next_slots, vec![11]);
+
+        let child_slot_meta = blockstore.meta(6).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+
+        let child_slot_meta = blockstore.meta(12).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+    }
 }
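
For context, a minimal sketch of the caller-visible behavior the change above guarantees. It is not part of the patch; it reuses the test helpers already used by the new tests in `blockstore_purge.rs` (`get_tmp_ledger_path_auto_delete!`, `make_many_slot_entries`) and mirrors the assertions in `test_purge_slot_cleanup_chaining`, but goes through the public `clear_unconfirmed_slot()` entry point rather than the crate-private helper:

```rust
// Illustrative sketch only (not part of the diff); assumes the same test
// helpers and imports as the tests added in blockstore_purge.rs.
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

// Build a chain of slots 0..=9, five entries each, where slot N's parent is N-1.
let (shreds, _) = make_many_slot_entries(0, 10, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();

// Clear slot 5: its columns are purged in a single write batch, but the
// chaining needed for fork discovery is preserved.
blockstore.clear_unconfirmed_slot(5);

let slot_meta = blockstore.meta(5).unwrap().unwrap();
assert!(slot_meta.parent_slot.is_none()); // slot 5 is now an orphan awaiting repair
assert_eq!(slot_meta.next_slots, vec![6]); // the link to its child survives

let parent_meta = blockstore.meta(4).unwrap().unwrap();
assert!(parent_meta.next_slots.is_empty()); // the parent no longer chains to slot 5
```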