Fix the boundary inconsistency between delete_file_in_range and delete_range (#27201)
#### Problem RocksDB's delete_range applies to [from, to) while delete_file_in_range applies to [from, to] by default, and the rust-rocksdb api does not include the option to make delete_file_in_range apply to [from, to). Such inconsistency might cause `blockstore::run_purge` to produce an inconsistent result as it invokes both delete_range and delete_file_in_range. #### Summary of Changes This PR makes all our purge / delete related functions to be inclusive on both starting and ending slots.
This commit is contained in:
parent
9094076de7
commit
6d070cee08
|
@ -2059,8 +2059,8 @@ impl Blockstore {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Toggles the active primary index between `0` and `1`, and clears the stored max-slot of the
|
/// Toggles the active primary index between `0` and `1`, and clears the
|
||||||
/// frozen index in preparation for pruning.
|
/// stored max-slot of the frozen index in preparation for pruning.
|
||||||
fn toggle_transaction_status_index(
|
fn toggle_transaction_status_index(
|
||||||
&self,
|
&self,
|
||||||
batch: &mut WriteBatch,
|
batch: &mut WriteBatch,
|
||||||
|
|
|
@ -145,8 +145,8 @@ impl Blockstore {
|
||||||
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
|
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A helper function to `purge_slots` that executes the ledger clean up
|
/// A helper function to `purge_slots` that executes the ledger clean up.
|
||||||
/// from `from_slot` to `to_slot`.
|
/// The cleanup applies to \[`from_slot`, `to_slot`\].
|
||||||
///
|
///
|
||||||
/// When `from_slot` is 0, any sst-file with a key-range completely older
|
/// When `from_slot` is 0, any sst-file with a key-range completely older
|
||||||
/// than `to_slot` will also be deleted.
|
/// than `to_slot` will also be deleted.
|
||||||
|
@ -161,9 +161,6 @@ impl Blockstore {
|
||||||
.db
|
.db
|
||||||
.batch()
|
.batch()
|
||||||
.expect("Database Error: Failed to get write batch");
|
.expect("Database Error: Failed to get write batch");
|
||||||
// delete range cf is not inclusive
|
|
||||||
let to_slot = to_slot.saturating_add(1);
|
|
||||||
|
|
||||||
let mut delete_range_timer = Measure::start("delete_range");
|
let mut delete_range_timer = Measure::start("delete_range");
|
||||||
let mut columns_purged = self
|
let mut columns_purged = self
|
||||||
.db
|
.db
|
||||||
|
@ -444,15 +441,18 @@ impl Blockstore {
|
||||||
/// deserializing each slot being purged and iterating through all
|
/// deserializing each slot being purged and iterating through all
|
||||||
/// transactions to determine the keys of individual records.
|
/// transactions to determine the keys of individual records.
|
||||||
///
|
///
|
||||||
|
/// The purge range applies to \[`from_slot`, `to_slot`\].
|
||||||
|
///
|
||||||
/// **This method is very slow.**
|
/// **This method is very slow.**
|
||||||
fn purge_special_columns_exact(
|
fn purge_special_columns_exact(
|
||||||
&self,
|
&self,
|
||||||
batch: &mut WriteBatch,
|
batch: &mut WriteBatch,
|
||||||
from_slot: Slot,
|
from_slot: Slot,
|
||||||
to_slot: Slot, // Exclusive
|
to_slot: Slot,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
|
let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
|
||||||
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
|
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
|
||||||
|
let to_slot = to_slot.saturating_add(1);
|
||||||
for slot in from_slot..to_slot {
|
for slot in from_slot..to_slot {
|
||||||
let slot_entries = self.get_any_valid_slot_entries(slot, 0);
|
let slot_entries = self.get_any_valid_slot_entries(slot, 0);
|
||||||
let transactions = slot_entries
|
let transactions = slot_entries
|
||||||
|
@ -501,22 +501,18 @@ impl Blockstore {
|
||||||
if let Some(purged_index) = self.toggle_transaction_status_index(
|
if let Some(purged_index) = self.toggle_transaction_status_index(
|
||||||
write_batch,
|
write_batch,
|
||||||
w_active_transaction_status_index,
|
w_active_transaction_status_index,
|
||||||
to_slot,
|
to_slot + 1,
|
||||||
)? {
|
)? {
|
||||||
*columns_purged &= self
|
*columns_purged &= self
|
||||||
.db
|
.db
|
||||||
.delete_range_cf::<cf::TransactionStatus>(
|
.delete_range_cf::<cf::TransactionStatus>(write_batch, purged_index, purged_index)
|
||||||
write_batch,
|
|
||||||
purged_index,
|
|
||||||
purged_index + 1,
|
|
||||||
)
|
|
||||||
.is_ok()
|
.is_ok()
|
||||||
& self
|
& self
|
||||||
.db
|
.db
|
||||||
.delete_range_cf::<cf::AddressSignatures>(
|
.delete_range_cf::<cf::AddressSignatures>(
|
||||||
write_batch,
|
write_batch,
|
||||||
purged_index,
|
purged_index,
|
||||||
purged_index + 1,
|
purged_index,
|
||||||
)
|
)
|
||||||
.is_ok();
|
.is_ok();
|
||||||
}
|
}
|
||||||
|
@ -783,7 +779,7 @@ pub mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_and_repopulate_transaction_statuses(
|
fn clear_and_repopulate_transaction_statuses_for_test(
|
||||||
blockstore: &mut Blockstore,
|
blockstore: &mut Blockstore,
|
||||||
index0_max_slot: u64,
|
index0_max_slot: u64,
|
||||||
index1_max_slot: u64,
|
index1_max_slot: u64,
|
||||||
|
@ -795,11 +791,11 @@ pub mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
blockstore
|
blockstore
|
||||||
.db
|
.db
|
||||||
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, 0, 3)
|
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, 0, 2)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
blockstore
|
blockstore
|
||||||
.db
|
.db
|
||||||
.delete_range_cf::<cf::TransactionStatusIndex>(&mut write_batch, 0, 3)
|
.delete_range_cf::<cf::TransactionStatusIndex>(&mut write_batch, 0, 2)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
blockstore.db.write(write_batch).unwrap();
|
blockstore.db.write(write_batch).unwrap();
|
||||||
blockstore.initialize_transaction_status_index().unwrap();
|
blockstore.initialize_transaction_status_index().unwrap();
|
||||||
|
@ -897,7 +893,7 @@ pub mod tests {
|
||||||
let index1_max_slot = 19;
|
let index1_max_slot = 19;
|
||||||
|
|
||||||
// Test purge outside bounds
|
// Test purge outside bounds
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -944,7 +940,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge inside index 0
|
// Test purge inside index 0
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -993,7 +989,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge inside index 0 at upper boundary
|
// Test purge inside index 0 at upper boundary
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -1044,7 +1040,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge inside index 1 at lower boundary
|
// Test purge inside index 1 at lower boundary
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -1092,7 +1088,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge across index boundaries
|
// Test purge across index boundaries
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -1142,7 +1138,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge include complete index 1
|
// Test purge include complete index 1
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
@ -1189,7 +1185,7 @@ pub mod tests {
|
||||||
drop(status_entry_iterator);
|
drop(status_entry_iterator);
|
||||||
|
|
||||||
// Test purge all
|
// Test purge all
|
||||||
clear_and_repopulate_transaction_statuses(
|
clear_and_repopulate_transaction_statuses_for_test(
|
||||||
&mut blockstore,
|
&mut blockstore,
|
||||||
index0_max_slot,
|
index0_max_slot,
|
||||||
index1_max_slot,
|
index1_max_slot,
|
||||||
|
|
|
@ -455,6 +455,7 @@ impl Rocks {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete files whose slot range is within \[`from`, `to`\].
|
||||||
fn delete_file_in_range_cf(
|
fn delete_file_in_range_cf(
|
||||||
&self,
|
&self,
|
||||||
cf: &ColumnFamily,
|
cf: &ColumnFamily,
|
||||||
|
@ -1119,17 +1120,23 @@ impl Database {
|
||||||
Ok(fs_extra::dir::get_size(&self.path)?)
|
Ok(fs_extra::dir::get_size(&self.path)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds a range to delete to the given write batch
|
/// Adds a \[`from`, `to`\] range to delete to the given write batch
|
||||||
pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
|
pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
|
||||||
where
|
where
|
||||||
C: Column + ColumnName,
|
C: Column + ColumnName,
|
||||||
{
|
{
|
||||||
let cf = self.cf_handle::<C>();
|
let cf = self.cf_handle::<C>();
|
||||||
|
// Note that the default behavior of rocksdb's delete_range_cf deletes
|
||||||
|
// files within [from, to), while our purge logic applies to [from, to].
|
||||||
|
//
|
||||||
|
// For consistency, we make our delete_range_cf works for [from, to] by
|
||||||
|
// adjusting the `to` slot range by 1.
|
||||||
let from_index = C::as_index(from);
|
let from_index = C::as_index(from);
|
||||||
let to_index = C::as_index(to);
|
let to_index = C::as_index(to.saturating_add(1));
|
||||||
batch.delete_range_cf::<C>(cf, from_index, to_index)
|
batch.delete_range_cf::<C>(cf, from_index, to_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete files whose slot range is within \[`from`, `to`\].
|
||||||
pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
|
pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
|
||||||
where
|
where
|
||||||
C: Column + ColumnName,
|
C: Column + ColumnName,
|
||||||
|
@ -1439,11 +1446,17 @@ impl<'a> WriteBatch<'a> {
|
||||||
self.map[C::NAME]
|
self.map[C::NAME]
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete_range_cf<C: Column>(
|
/// Adds a \[`from`, `to`) range deletion entry to the batch.
|
||||||
|
///
|
||||||
|
/// Note that the \[`from`, `to`) deletion range of WriteBatch::delete_range_cf
|
||||||
|
/// is different from \[`from`, `to`\] of Database::delete_range_cf as we makes
|
||||||
|
/// the semantics of Database::delete_range_cf matches the blockstore purge
|
||||||
|
/// logic.
|
||||||
|
fn delete_range_cf<C: Column>(
|
||||||
&mut self,
|
&mut self,
|
||||||
cf: &ColumnFamily,
|
cf: &ColumnFamily,
|
||||||
from: C::Index,
|
from: C::Index,
|
||||||
to: C::Index,
|
to: C::Index, // exclusive
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.write_batch
|
self.write_batch
|
||||||
.delete_range_cf(cf, C::key(from), C::key(to));
|
.delete_range_cf(cf, C::key(from), C::key(to));
|
||||||
|
|
Loading…
Reference in New Issue