Blockstore special columns: minimize deletes in PurgeType::Exact (#33498)

* Adjust test_purge_transaction_status_exact to test slots that cross primary indexes

* Minimize deletes by checking the primary-index range

* Fix test_purge_special_columns_compaction_filter
This commit is contained in:
Tyera 2023-10-03 18:58:30 -06:00 committed by GitHub
parent eb262aabe3
commit 144e6d6eec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 76 additions and 8 deletions

View File

@ -354,15 +354,32 @@ impl Blockstore {
) -> Result<()> {
let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
let slot_indexes = |slot: Slot| -> Vec<u64> {
let mut indexes = vec![];
if slot <= index0.max_slot && (index0.frozen || slot >= index1.max_slot) {
indexes.push(0);
}
if slot <= index1.max_slot && (index1.frozen || slot >= index0.max_slot) {
indexes.push(1);
}
indexes
};
for slot in from_slot..=to_slot {
let primary_indexes = slot_indexes(slot);
if primary_indexes.is_empty() {
continue;
}
let slot_entries = self.get_any_valid_slot_entries(slot, 0);
let transactions = slot_entries
.into_iter()
.flat_map(|entry| entry.transactions);
for transaction in transactions {
if let Some(&signature) = transaction.signatures.get(0) {
batch.delete::<cf::TransactionStatus>((0, signature, slot))?;
batch.delete::<cf::TransactionStatus>((1, signature, slot))?;
for primary_index in &primary_indexes {
batch.delete::<cf::TransactionStatus>((*primary_index, signature, slot))?;
}
let meta = self.read_transaction_status((signature, slot))?;
let loaded_addresses = meta.map(|meta| meta.loaded_addresses);
@ -372,8 +389,14 @@ impl Blockstore {
);
for pubkey in account_keys.iter() {
batch.delete::<cf::AddressSignatures>((0, *pubkey, slot, signature))?;
batch.delete::<cf::AddressSignatures>((1, *pubkey, slot, signature))?;
for primary_index in &primary_indexes {
batch.delete::<cf::AddressSignatures>((
*primary_index,
*pubkey,
slot,
signature,
))?;
}
}
}
}
@ -697,7 +720,7 @@ pub mod tests {
blockstore.initialize_transaction_status_index().unwrap();
*blockstore.active_transaction_status_index.write().unwrap() = 0;
for x in 0..index0_max_slot + 1 {
for x in 0..index0_max_slot {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
&entries,
@ -726,6 +749,36 @@ pub mod tests {
)
.unwrap();
}
// Add slot that crosses primary indexes
let entries = make_slot_entries_with_transactions(2);
let shreds = entries_to_test_shreds(
&entries,
index0_max_slot, // slot
index0_max_slot.saturating_sub(1), // parent_slot
true, // is_full_slot
0, // version
true, // merkle_variant
);
blockstore.insert_shreds(shreds, None, false).unwrap();
let signatures = entries
.iter()
.filter(|entry| !entry.is_tick())
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| transaction.signatures[0])
.collect::<Vec<Signature>>();
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
index0_max_slot,
signatures[0],
vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()],
vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()],
TransactionStatusMeta::default(),
)
.unwrap();
// Freeze index 0
let mut write_batch = blockstore.db.batch().unwrap();
let mut w_active_transaction_status_index =
@ -740,6 +793,19 @@ pub mod tests {
drop(w_active_transaction_status_index);
blockstore.db.write(write_batch).unwrap();
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
index0_max_slot,
signatures[1],
vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()],
vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()],
TransactionStatusMeta::default(),
)
.unwrap();
// Note: index0_max_slot exists in both indexes
for x in index0_max_slot + 1..index1_max_slot + 1 {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
@ -993,7 +1059,7 @@ pub mod tests {
for _ in 13..index1_max_slot + 1 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 1);
assert!(entry.2 > 12);
assert!(entry.2 == index0_max_slot || entry.2 > 12);
}
drop(status_entry_iterator);
@ -1190,6 +1256,8 @@ pub mod tests {
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let index0_max_slot = 9;
let index1_max_slot = 19;
// includes slot 0, and slot 9 has 2 transactions
let num_total_transactions = index1_max_slot + 2;
clear_and_repopulate_transaction_statuses_for_test(
&blockstore,
@ -1227,7 +1295,7 @@ pub mod tests {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, index1_max_slot - (oldest_slot - 1));
assert_eq!(count, num_total_transactions - oldest_slot);
clear_and_repopulate_transaction_statuses_for_test(
&blockstore,
@ -1265,6 +1333,6 @@ pub mod tests {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, index1_max_slot - (oldest_slot - 1));
assert_eq!(count, num_total_transactions - oldest_slot - 1); // Extra transaction in slot 9
}
}