Remove dated Blockstore PurgeType::PrimaryIndex code (#33631)

* Update instances of PurgeType::PrimaryIndex to PurgeType::Exact
* Remove now unused functions
* Remove unused active_transaction_status_index field
This commit is contained in:
steviez 2023-10-10 16:35:42 -05:00 committed by GitHub
parent 73a9a14731
commit 33e1dd71f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 128 deletions

View File

@ -206,7 +206,6 @@ pub struct Blockstore {
address_signatures_cf: LedgerColumn<cf::AddressSignatures>, address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
transaction_memos_cf: LedgerColumn<cf::TransactionMemos>, transaction_memos_cf: LedgerColumn<cf::TransactionMemos>,
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>, transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>, rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>, blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>, perf_samples_cf: LedgerColumn<cf::PerfSamples>,
@ -326,21 +325,11 @@ impl Blockstore {
.unwrap_or(0); .unwrap_or(0);
let last_root = RwLock::new(max_root); let last_root = RwLock::new(max_root);
// Get active transaction-status index or 0 // Initialize transaction status index if entries are not present
let active_transaction_status_index = db let initialize_transaction_status_index = db
.iter::<cf::TransactionStatusIndex>(IteratorMode::Start)? .iter::<cf::TransactionStatusIndex>(IteratorMode::Start)?
.next(); .next()
let initialize_transaction_status_index = active_transaction_status_index.is_none(); .is_none();
let active_transaction_status_index = active_transaction_status_index
.and_then(|(_, data)| {
let index0: TransactionStatusIndexMeta = deserialize(&data).unwrap();
if index0.frozen {
Some(1)
} else {
None
}
})
.unwrap_or(0);
measure.stop(); measure.stop();
info!("{:?} {}", blockstore_path, measure); info!("{:?} {}", blockstore_path, measure);
@ -360,7 +349,6 @@ impl Blockstore {
address_signatures_cf, address_signatures_cf,
transaction_memos_cf, transaction_memos_cf,
transaction_status_index_cf, transaction_status_index_cf,
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf, rewards_cf,
blocktime_cf, blocktime_cf,
perf_samples_cf, perf_samples_cf,
@ -1125,7 +1113,7 @@ impl Blockstore {
.expect("Couldn't fetch from SlotMeta column family") .expect("Couldn't fetch from SlotMeta column family")
{ {
// Clear all slot related information // Clear all slot related information
self.run_purge(slot, slot, PurgeType::PrimaryIndex) self.run_purge(slot, slot, PurgeType::Exact)
.expect("Purge database operations failed"); .expect("Purge database operations failed");
// Clear this slot as a next slot from parent // Clear this slot as a next slot from parent
@ -2155,61 +2143,6 @@ impl Blockstore {
Ok(()) Ok(())
} }
/// Toggles the active primary index between `0` and `1`, and clears the
/// stored max-slot of the frozen index in preparation for pruning.
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<Option<u64>> {
let index0 = self.transaction_status_index_cf.get(0)?;
if index0.is_none() {
return Ok(None);
}
let mut index0 = index0.unwrap();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap();
if !index0.frozen && !index1.frozen {
index0.frozen = true;
*w_active_transaction_status_index = 1;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
info!(
"Pruning expired primary index 0 up to slot {} (max requested: {})",
index0.max_slot, to_slot
);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
info!(
"Pruning expired primary index 1 up to slot {} (max requested: {})",
index1.max_slot, to_slot
);
Some(1)
} else {
None
};
if let Some(purge_target_primary_index) = purge_target_primary_index {
*w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen {
index0.max_slot = 0
};
index0.frozen = !index0.frozen;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
if index1.frozen {
index1.max_slot = 0
};
index1.frozen = !index1.frozen;
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}
Ok(purge_target_primary_index)
}
}
fn read_deprecated_transaction_status( fn read_deprecated_transaction_status(
&self, &self,
index: (Signature, Slot), index: (Signature, Slot),
@ -4938,7 +4871,7 @@ pub mod tests {
let max_purge_slot = 1; let max_purge_slot = 1;
blockstore blockstore
.run_purge(0, max_purge_slot, PurgeType::PrimaryIndex) .run_purge(0, max_purge_slot, PurgeType::Exact)
.unwrap(); .unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = max_purge_slot; *blockstore.lowest_cleanup_slot.write().unwrap() = max_purge_slot;
@ -9012,7 +8945,7 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
} }
assert_eq!(blockstore.lowest_slot(), 1); assert_eq!(blockstore.lowest_slot(), 1);
blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap(); blockstore.run_purge(0, 5, PurgeType::Exact).unwrap();
assert_eq!(blockstore.lowest_slot(), 6); assert_eq!(blockstore.lowest_slot(), 6);
} }
@ -9028,12 +8961,10 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), Some(slot)); assert_eq!(blockstore.highest_slot().unwrap(), Some(slot));
} }
blockstore blockstore.run_purge(5, 10, PurgeType::Exact).unwrap();
.run_purge(5, 10, PurgeType::PrimaryIndex)
.unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), Some(4)); assert_eq!(blockstore.highest_slot().unwrap(), Some(4));
blockstore.run_purge(0, 4, PurgeType::PrimaryIndex).unwrap(); blockstore.run_purge(0, 4, PurgeType::Exact).unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), None); assert_eq!(blockstore.highest_slot().unwrap(), None);
} }
@ -9768,7 +9699,7 @@ pub mod tests {
// Cleanup the slot // Cleanup the slot
blockstore blockstore
.run_purge(slot, slot, PurgeType::PrimaryIndex) .run_purge(slot, slot, PurgeType::Exact)
.expect("Purge database operations failed"); .expect("Purge database operations failed");
assert!(blockstore.meta(slot).unwrap().is_none()); assert!(blockstore.meta(slot).unwrap().is_none());

View File

@ -16,9 +16,6 @@ pub enum PurgeType {
/// A slower but more accurate way to purge slots by also ensuring higher /// A slower but more accurate way to purge slots by also ensuring higher
/// level of consistency between data during the clean up process. /// level of consistency between data during the clean up process.
Exact, Exact,
/// A faster approximation of `Exact` where the purge process only takes
/// care of the primary index and does not update the associated entries.
PrimaryIndex,
/// The fastest purge mode that relies on the slot-id based TTL /// The fastest purge mode that relies on the slot-id based TTL
/// compaction filter to do the cleanup. /// compaction filter to do the cleanup.
CompactionFilter, CompactionFilter,
@ -158,7 +155,7 @@ impl Blockstore {
.batch() .batch()
.expect("Database Error: Failed to get write batch"); .expect("Database Error: Failed to get write batch");
let mut delete_range_timer = Measure::start("delete_range"); let mut delete_range_timer = Measure::start("delete_range");
let mut columns_purged = self let columns_purged = self
.db .db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.is_ok() .is_ok()
@ -218,20 +215,10 @@ impl Blockstore {
.db .db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.is_ok(); .is_ok();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
match purge_type { match purge_type {
PurgeType::Exact => { PurgeType::Exact => {
self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
} }
PurgeType::PrimaryIndex => {
self.purge_special_columns_with_primary_index(
&mut write_batch,
&mut columns_purged,
&mut w_active_transaction_status_index,
to_slot,
)?;
}
PurgeType::CompactionFilter => { PurgeType::CompactionFilter => {
// No explicit action is required here because this purge type completely and // No explicit action is required here because this purge type completely and
// indefinitely relies on the proper working of compaction filter for those // indefinitely relies on the proper working of compaction filter for those
@ -273,10 +260,6 @@ impl Blockstore {
purge_stats.write_batch += write_timer.as_us(); purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us(); purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();
// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused with inconsistent state between
// self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged) Ok(columns_purged)
} }
@ -458,37 +441,6 @@ impl Blockstore {
} }
Ok(()) Ok(())
} }
/// Purges special columns (using a non-Slot primary-index) by range. Purge
/// occurs if frozen primary index has a max-slot less than the highest slot
/// being purged.
fn purge_special_columns_with_primary_index(
&self,
write_batch: &mut WriteBatch,
columns_purged: &mut bool,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<()> {
if let Some(purged_index) = self.toggle_transaction_status_index(
write_batch,
w_active_transaction_status_index,
to_slot + 1,
)? {
*columns_purged &= self
.db
.delete_range_cf::<cf::TransactionStatus>(write_batch, purged_index, purged_index)
.is_ok()
& self
.db
.delete_range_cf::<cf::AddressSignatures>(
write_batch,
purged_index,
purged_index,
)
.is_ok();
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]