Blockstore: clean/save old TransactionMemos sensibly (#33678)
* Convert OldestSlot to named struct * Add clean_slot_0 to OldestSlot * Set AtomicBool to true when all primary-index keys returning slot 0 should be purged * Add PurgedFilter::clean_slot_0 * Use clean_slot_0 to preserve deprecated TransactionMemos * Also set AtomicBool to true immediately on boot, if highest_primary_index_slot.is_none * Add test * Fixup test
This commit is contained in:
parent
53925b6182
commit
01a3b1b52f
|
@ -2154,6 +2154,8 @@ impl Blockstore {
|
|||
}
|
||||
if highest_primary_index_slot.is_some() {
|
||||
self.set_highest_primary_index_slot(highest_primary_index_slot);
|
||||
} else {
|
||||
self.db.set_clean_slot_0(true);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2167,6 +2169,9 @@ impl Blockstore {
|
|||
self.transaction_status_index_cf.delete(1)?;
|
||||
}
|
||||
}
|
||||
if w_highest_primary_index_slot.is_none() {
|
||||
self.db.set_clean_slot_0(true);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -990,4 +990,109 @@ pub mod tests {
|
|||
}
|
||||
assert_eq!(count, max_slot - (oldest_slot - 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_purge_transaction_memos_compaction_filter() {
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
|
||||
let oldest_slot = 5;
|
||||
|
||||
fn random_signature() -> Signature {
|
||||
use rand::Rng;
|
||||
|
||||
let mut key = [0u8; 64];
|
||||
rand::thread_rng().fill(&mut key[..]);
|
||||
Signature::from(key)
|
||||
}
|
||||
|
||||
// Insert some deprecated TransactionMemos
|
||||
blockstore
|
||||
.transaction_memos_cf
|
||||
.put_deprecated(random_signature(), &"this is a memo".to_string())
|
||||
.unwrap();
|
||||
blockstore
|
||||
.transaction_memos_cf
|
||||
.put_deprecated(random_signature(), &"another memo".to_string())
|
||||
.unwrap();
|
||||
// Set clean_slot_0 to false, since we have deprecated memos
|
||||
blockstore.db.set_clean_slot_0(false);
|
||||
|
||||
// Insert some current TransactionMemos
|
||||
blockstore
|
||||
.transaction_memos_cf
|
||||
.put(
|
||||
(random_signature(), oldest_slot - 1),
|
||||
&"this is a new memo in slot 4".to_string(),
|
||||
)
|
||||
.unwrap();
|
||||
blockstore
|
||||
.transaction_memos_cf
|
||||
.put(
|
||||
(random_signature(), oldest_slot),
|
||||
&"this is a memo in slot 5 ".to_string(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let first_index = {
|
||||
let mut memos_iterator = blockstore
|
||||
.transaction_memos_cf
|
||||
.iterator_cf_raw_key(IteratorMode::Start);
|
||||
memos_iterator.next().unwrap().unwrap().0
|
||||
};
|
||||
let last_index = {
|
||||
let mut memos_iterator = blockstore
|
||||
.transaction_memos_cf
|
||||
.iterator_cf_raw_key(IteratorMode::End);
|
||||
memos_iterator.next().unwrap().unwrap().0
|
||||
};
|
||||
|
||||
// Purge at slot 0 should not affect any memos
|
||||
blockstore.db.set_oldest_slot(0);
|
||||
blockstore
|
||||
.db
|
||||
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
|
||||
let memos_iterator = blockstore
|
||||
.transaction_memos_cf
|
||||
.iterator_cf_raw_key(IteratorMode::Start);
|
||||
let mut count = 0;
|
||||
for item in memos_iterator {
|
||||
let _item = item.unwrap();
|
||||
count += 1;
|
||||
}
|
||||
assert_eq!(count, 4);
|
||||
|
||||
// Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4
|
||||
blockstore.db.set_oldest_slot(oldest_slot);
|
||||
blockstore
|
||||
.db
|
||||
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
|
||||
let memos_iterator = blockstore
|
||||
.transaction_memos_cf
|
||||
.iterator_cf_raw_key(IteratorMode::Start);
|
||||
let mut count = 0;
|
||||
for item in memos_iterator {
|
||||
let (key, _value) = item.unwrap();
|
||||
let slot = <cf::TransactionMemos as Column>::index(&key).1;
|
||||
assert!(slot == 0 || slot >= oldest_slot);
|
||||
count += 1;
|
||||
}
|
||||
assert_eq!(count, 3);
|
||||
|
||||
// Purge at oldest_slot with clean_slot_0 purges deprecated memos
|
||||
blockstore.db.set_clean_slot_0(true);
|
||||
blockstore
|
||||
.db
|
||||
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
|
||||
let memos_iterator = blockstore
|
||||
.transaction_memos_cf
|
||||
.iterator_cf_raw_key(IteratorMode::Start);
|
||||
let mut count = 0;
|
||||
for item in memos_iterator {
|
||||
let (key, _value) = item.unwrap();
|
||||
let slot = <cf::TransactionMemos as Column>::index(&key).1;
|
||||
assert!(slot >= oldest_slot);
|
||||
count += 1;
|
||||
}
|
||||
assert_eq!(count, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ use {
|
|||
marker::PhantomData,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
},
|
||||
|
@ -348,7 +348,10 @@ pub mod columns {
|
|||
}
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
struct OldestSlot(Arc<AtomicU64>);
|
||||
struct OldestSlot {
|
||||
slot: Arc<AtomicU64>,
|
||||
clean_slot_0: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl OldestSlot {
|
||||
pub fn set(&self, oldest_slot: Slot) {
|
||||
|
@ -356,7 +359,7 @@ impl OldestSlot {
|
|||
// also, compaction_filters are created via its factories, creating short-lived copies of
|
||||
// this atomic value for the single job of compaction. So, Relaxed store can be justified
|
||||
// in total
|
||||
self.0.store(oldest_slot, Ordering::Relaxed);
|
||||
self.slot.store(oldest_slot, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Slot {
|
||||
|
@ -365,7 +368,15 @@ impl OldestSlot {
|
|||
// requirement at the moment
|
||||
// also eventual propagation (very Relaxed) load is Ok, because compaction by nature doesn't
|
||||
// require strictly synchronized semantics in this regard
|
||||
self.0.load(Ordering::Relaxed)
|
||||
self.slot.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
|
||||
self.clean_slot_0.store(clean_slot_0, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn get_clean_slot_0(&self) -> bool {
|
||||
self.clean_slot_0.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1427,6 +1438,10 @@ impl Database {
|
|||
self.backend.oldest_slot.set(oldest_slot);
|
||||
}
|
||||
|
||||
pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
|
||||
self.backend.oldest_slot.set_clean_slot_0(clean_slot_0);
|
||||
}
|
||||
|
||||
pub fn live_files_metadata(&self) -> Result<Vec<LiveFile>> {
|
||||
self.backend.live_files_metadata()
|
||||
}
|
||||
|
@ -1835,6 +1850,10 @@ impl<'a> WriteBatch<'a> {
|
|||
struct PurgedSlotFilter<C: Column + ColumnName> {
|
||||
/// The oldest slot to keep; any slot < oldest_slot will be removed
|
||||
oldest_slot: Slot,
|
||||
/// Whether to preserve keys that return slot 0, even when oldest_slot > 0.
|
||||
// This is used to delete old column data that wasn't keyed with a Slot, and so always returns
|
||||
// `C::slot() == 0`
|
||||
clean_slot_0: bool,
|
||||
name: CString,
|
||||
_phantom: PhantomData<C>,
|
||||
}
|
||||
|
@ -1844,7 +1863,7 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
|
|||
use rocksdb::CompactionDecision::*;
|
||||
|
||||
let slot_in_key = C::slot(C::index(key));
|
||||
if slot_in_key >= self.oldest_slot {
|
||||
if slot_in_key >= self.oldest_slot || (slot_in_key == 0 && !self.clean_slot_0) {
|
||||
Keep
|
||||
} else {
|
||||
Remove
|
||||
|
@ -1867,8 +1886,10 @@ impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory
|
|||
|
||||
fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
|
||||
let copied_oldest_slot = self.oldest_slot.get();
|
||||
let copied_clean_slot_0 = self.oldest_slot.get_clean_slot_0();
|
||||
PurgedSlotFilter::<C> {
|
||||
oldest_slot: copied_oldest_slot,
|
||||
clean_slot_0: copied_clean_slot_0,
|
||||
name: CString::new(format!(
|
||||
"purged_slot_filter({}, {:?})",
|
||||
C::NAME,
|
||||
|
@ -2113,6 +2134,7 @@ pub mod tests {
|
|||
is_manual_compaction: true,
|
||||
};
|
||||
let oldest_slot = OldestSlot::default();
|
||||
oldest_slot.set_clean_slot_0(true);
|
||||
|
||||
let mut factory = PurgedSlotFilterFactory::<ShredData> {
|
||||
oldest_slot: oldest_slot.clone(),
|
||||
|
|
Loading…
Reference in New Issue