Purge TransactionStatus and AddressSignatures exactly from ledger-tool (#10358)

* Add failing test

* Add execution path to purge primary-index columns exactly

* Fail gracefully if older TransactionStatus rocksdb keys are present

* Remove columns_empty check for special columns

* Move blockstore purge methods to submodule

* Remove unused column empty check
This commit is contained in:
Tyera Eulberg 2020-06-02 19:49:31 -06:00 committed by GitHub
parent 1303c4964b
commit eee9a08376
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1210 additions and 574 deletions

View File

@ -1,6 +1,6 @@
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
use solana_ledger::blockstore::Blockstore; use solana_ledger::blockstore::{Blockstore, PurgeType};
use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_ledger::blockstore_db::Result as BlockstoreResult;
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
@ -172,6 +172,7 @@ impl LedgerCleanupService {
first_slot, first_slot,
lowest_cleanup_slot, lowest_cleanup_slot,
delay_between_purges, delay_between_purges,
PurgeType::PrimaryIndex,
); );
purge_time.stop(); purge_time.stop();
info!("{}", purge_time); info!("{}", purge_time);

View File

@ -56,6 +56,8 @@ use std::{
time::Duration, time::Duration,
}; };
pub mod blockstore_purge;
pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb"; pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@ -64,6 +66,12 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
.build() .build()
.unwrap())); .unwrap()));
thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|ix| format!("blockstore_{}", ix))
.build()
.unwrap()));
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
@ -75,6 +83,13 @@ const TIMESTAMP_SLOT_RANGE: usize = 16;
pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>; pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
type CompletedRanges = Vec<(u32, u32)>;
#[derive(Clone, Copy)]
pub enum PurgeType {
Exact,
PrimaryIndex,
}
// ledger window // ledger window
pub struct Blockstore { pub struct Blockstore {
@ -301,235 +316,6 @@ impl Blockstore {
false false
} }
/// Silently deletes all blockstore column families in the range [from_slot,to_slot]
/// Dangerous; Use with care:
/// Does not check for integrity and does not update slot metas that refer to deleted slots
/// Modifies multiple column families simultaneously
pub fn purge_slots_with_delay(
&self,
from_slot: Slot,
to_slot: Slot,
delay_between_purges: Option<Duration>,
) {
// if there's no upper bound, split the purge request into batches of 1000 slots
const PURGE_BATCH_SIZE: u64 = 1000;
let mut batch_start = from_slot;
while batch_start < to_slot {
let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
match self.run_purge(batch_start, batch_end) {
Ok(_all_columns_purged) => {
batch_start = batch_end;
if let Some(ref duration) = delay_between_purges {
// Cooperate with other blockstore users
std::thread::sleep(*duration);
}
}
Err(e) => {
error!(
"Error: {:?}; Purge failed in range {:?} to {:?}",
e, batch_start, batch_end
);
break;
}
}
}
if !self.no_compaction {
if let Err(e) = self.compact_storage(from_slot, to_slot) {
// This error is not fatal and indicates an internal error
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
e, from_slot, to_slot
);
}
}
}
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots_with_delay(from_slot, to_slot, None)
}
/// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the
/// [from_slot,to_slot] range
///
/// Dangerous; Use with care
pub fn purge_from_next_slots(&self, from_slot: Slot, to_slot: Slot) {
for (slot, mut meta) in self
.slot_meta_iterator(0)
.expect("unable to iterate over meta")
{
if slot > to_slot {
break;
}
let original_len = meta.next_slots.len();
meta.next_slots
.retain(|slot| *slot < from_slot || *slot > to_slot);
if meta.next_slots.len() != original_len {
info!("purge_from_next_slots: adjusted meta for slot {}", slot);
self.put_meta_bytes(
slot,
&bincode::serialize(&meta).expect("couldn't update meta"),
)
.expect("couldn't update meta");
}
}
}
// Returns whether or not all columns successfully purged the slot range
fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
let mut write_batch = self
.db
.batch()
.expect("Database Error: Failed to get write batch");
// delete range cf is not inclusive
let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
let mut delete_range_timer = Measure::start("delete_range");
let mut columns_empty = self
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false);
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
if let Some(index) = self.toggle_transaction_status_index(
&mut write_batch,
&mut w_active_transaction_status_index,
to_slot,
)? {
columns_empty &= self
.db
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, index, index + 1)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::AddressSignatures>(&mut write_batch, index, index + 1)
.unwrap_or(false);
}
delete_range_timer.stop();
let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) {
error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...",
e, from_slot
);
return Err(e);
}
write_timer.stop();
datapoint_info!(
"blockstore-purge",
("from_slot", from_slot as i64, i64),
("to_slot", to_slot as i64, i64),
("delete_range_us", delete_range_timer.as_us() as i64, i64),
("write_batch_us", write_timer.as_us() as i64, i64)
);
Ok(columns_empty)
}
pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
info!("compact_storage: from {} to {}", from_slot, to_slot);
let mut compact_timer = Measure::start("compact_range");
let result = self
.meta_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.db
.column::<cf::Root>()
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.data_shred_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.code_shred_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.dead_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.duplicate_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.erasure_meta_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.orphans_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.index_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.transaction_status_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.address_signatures_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.transaction_status_index_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.rewards_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false);
compact_timer.stop();
if !result {
info!("compact_storage incomplete");
}
datapoint_info!(
"blockstore-compact",
("compact_range_us", compact_timer.as_us() as i64, i64),
);
Ok(result)
}
pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> { pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get((slot, set_index)) self.erasure_meta_cf.get((slot, set_index))
} }
@ -987,7 +773,7 @@ impl Blockstore {
.expect("Couldn't fetch from SlotMeta column family") .expect("Couldn't fetch from SlotMeta column family")
{ {
// Clear all slot related information // Clear all slot related information
self.run_purge(slot, slot) self.run_purge(slot, slot, PurgeType::PrimaryIndex)
.expect("Purge database operations failed"); .expect("Purge database operations failed");
// Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots`
@ -2083,29 +1869,11 @@ impl Blockstore {
return Err(BlockstoreError::DeadSlot); return Err(BlockstoreError::DeadSlot);
} }
// lowest_cleanup_slot is the last slot that was not cleaned up by let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
// LedgerCleanupService
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() {
return Ok((vec![], 0, false));
}
let slot_meta = slot_meta.unwrap();
// Find all the ranges for the completed data blocks
let completed_ranges = Self::get_completed_data_ranges(
start_index as u32,
&slot_meta.completed_data_indexes[..],
slot_meta.consumed as u32,
);
if completed_ranges.is_empty() { if completed_ranges.is_empty() {
return Ok((vec![], 0, false)); return Ok((vec![], 0, false));
} }
let slot_meta = slot_meta.unwrap();
let num_shreds = completed_ranges let num_shreds = completed_ranges
.last() .last()
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1) .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
@ -2126,12 +1894,41 @@ impl Blockstore {
Ok((entries, num_shreds, slot_meta.is_full())) Ok((entries, num_shreds, slot_meta.is_full()))
} }
fn get_completed_ranges(
&self,
slot: Slot,
start_index: u64,
) -> Result<(CompletedRanges, Option<SlotMeta>)> {
// lowest_cleanup_slot is the last slot that was not cleaned up by
// LedgerCleanupService
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > slot {
return Err(BlockstoreError::SlotCleanedUp);
}
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() {
return Ok((vec![], slot_meta));
}
let slot_meta = slot_meta.unwrap();
// Find all the ranges for the completed data blocks
let completed_ranges = Self::get_completed_data_ranges(
start_index as u32,
&slot_meta.completed_data_indexes[..],
slot_meta.consumed as u32,
);
Ok((completed_ranges, Some(slot_meta)))
}
// Get the range of indexes [start_index, end_index] of every completed data block // Get the range of indexes [start_index, end_index] of every completed data block
fn get_completed_data_ranges( fn get_completed_data_ranges(
mut start_index: u32, mut start_index: u32,
completed_data_end_indexes: &[u32], completed_data_end_indexes: &[u32],
consumed: u32, consumed: u32,
) -> Vec<(u32, u32)> { ) -> CompletedRanges {
let mut completed_data_ranges = vec![]; let mut completed_data_ranges = vec![];
let floor = completed_data_end_indexes let floor = completed_data_end_indexes
.iter() .iter()
@ -2213,6 +2010,30 @@ impl Blockstore {
}) })
} }
fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {
let (completed_ranges, slot_meta) = self
.get_completed_ranges(slot, start_index)
.unwrap_or_default();
if completed_ranges.is_empty() {
return vec![];
}
let slot_meta = slot_meta.unwrap();
let entries: Vec<Vec<Entry>> = PAR_THREAD_POOL_ALL_CPUS.with(|thread_pool| {
thread_pool.borrow().install(|| {
completed_ranges
.par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta)
.unwrap_or_default()
})
.collect()
})
});
entries.into_iter().flatten().collect()
}
// Returns slots connecting to any element of the list `slots`. // Returns slots connecting to any element of the list `slots`.
pub fn get_slots_since(&self, slots: &[u64]) -> Result<HashMap<u64, Vec<u64>>> { pub fn get_slots_since(&self, slots: &[u64]) -> Result<HashMap<u64, Vec<u64>>> {
// Return error if there was a database error during lookup of any of the // Return error if there was a database error during lookup of any of the
@ -3098,7 +2919,7 @@ pub mod tests {
use std::{iter::FromIterator, time::Duration}; use std::{iter::FromIterator, time::Duration};
// used for tests only // used for tests only
fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> { pub(crate) fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new(); let mut entries: Vec<Entry> = Vec::new();
for x in 0..num_entries { for x in 0..num_entries {
let transaction = Transaction::new_with_compiled_instructions( let transaction = Transaction::new_with_compiled_instructions(
@ -3115,99 +2936,6 @@ pub mod tests {
entries entries
} }
// check that all columns are either empty or start at `min_slot`
fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) {
let condition_met = blockstore
.db
.iter::<cf::SlotMeta>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::Root>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::ShredData>(IteratorMode::Start)
.unwrap()
.next()
.map(|((slot, _), _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::ShredCode>(IteratorMode::Start)
.unwrap()
.next()
.map(|((slot, _), _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::DeadSlots>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::DuplicateSlots>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::ErasureMeta>(IteratorMode::Start)
.unwrap()
.next()
.map(|((slot, _), _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::Orphans>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::Index>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::Start)
.unwrap()
.next()
.map(|((primary_index, _, slot), _)| {
slot >= min_slot || (primary_index == 2 && slot == 0)
})
.unwrap_or(true)
& blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::Start)
.unwrap()
.next()
.map(|((primary_index, _, slot, _), _)| {
slot >= min_slot || (primary_index == 2 && slot == 0)
})
.unwrap_or(true)
& blockstore
.db
.iter::<cf::Rewards>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true);
assert!(condition_met);
}
#[test] #[test]
fn test_create_new_ledger() { fn test_create_new_ledger() {
let mint_total = 1_000_000_000_000; let mint_total = 1_000_000_000_000;
@ -3478,7 +3206,9 @@ pub mod tests {
assert!(ledger.get_data_shreds(slot, 0, 1, &mut buf).is_ok()); assert!(ledger.get_data_shreds(slot, 0, 1, &mut buf).is_ok());
let max_purge_slot = 1; let max_purge_slot = 1;
ledger.run_purge(0, max_purge_slot).unwrap(); ledger
.run_purge(0, max_purge_slot, PurgeType::PrimaryIndex)
.unwrap();
*ledger.lowest_cleanup_slot.write().unwrap() = max_purge_slot; *ledger.lowest_cleanup_slot.write().unwrap() = max_purge_slot;
let mut buf = [0; 4096]; let mut buf = [0; 4096];
@ -5034,49 +4764,6 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_purge_slots() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.purge_slots(0, 5);
test_all_empty_or_min(&blockstore, 6);
blockstore.purge_slots(0, 50);
// min slot shouldn't matter, blockstore should be empty
test_all_empty_or_min(&blockstore, 100);
test_all_empty_or_min(&blockstore, 0);
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(_, _)| {
panic!();
});
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_huge() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 5000, 10);
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.purge_slots(0, 4999);
test_all_empty_or_min(&blockstore, 5000);
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
fn test_iter_bounds() { fn test_iter_bounds() {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
@ -5791,7 +5478,7 @@ pub mod tests {
assert_eq!(first_address_entry.0, 0); assert_eq!(first_address_entry.0, 0);
assert_eq!(first_address_entry.2, slot0); assert_eq!(first_address_entry.2, slot0);
blockstore.run_purge(0, 8).unwrap(); blockstore.run_purge(0, 8, PurgeType::PrimaryIndex).unwrap();
// First successful prune freezes index 0 // First successful prune freezes index 0
assert_eq!( assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(), transaction_status_index_cf.get(0).unwrap().unwrap(),
@ -5886,7 +5573,9 @@ pub mod tests {
assert_eq!(index1_first_address_entry.0, 1); assert_eq!(index1_first_address_entry.0, 1);
assert_eq!(index1_first_address_entry.2, slot1); assert_eq!(index1_first_address_entry.2, slot1);
blockstore.run_purge(0, 18).unwrap(); blockstore
.run_purge(0, 18, PurgeType::PrimaryIndex)
.unwrap();
// Successful prune toggles TransactionStatusIndex // Successful prune toggles TransactionStatusIndex
assert_eq!( assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(), transaction_status_index_cf.get(0).unwrap().unwrap(),
@ -5932,169 +5621,6 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
#[allow(clippy::cognitive_complexity)]
fn test_purge_transaction_status() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_index_cf = blockstore.db.column::<cf::TransactionStatusIndex>();
let slot = 10;
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
slot,
Signature::new(&random_bytes),
vec![&Pubkey::new(&random_bytes[0..32])],
vec![&Pubkey::new(&random_bytes[32..])],
&TransactionStatusMeta::default(),
)
.unwrap();
}
// Purge to freeze index 0
blockstore.run_purge(0, 1).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
let mut address_transactions_iterator = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
(0, Pubkey::default(), 0, Signature::default()),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..10 {
let entry = address_transactions_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
// Low purge should not affect state
blockstore.run_purge(0, 5).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
let mut address_transactions_iterator = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..10 {
let entry = address_transactions_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
// Test boundary conditions: < slot should not purge statuses; <= slot should
blockstore.run_purge(0, 9).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
let mut address_transactions_iterator = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..10 {
let entry = address_transactions_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
blockstore.run_purge(0, 10).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
let padding_entry = status_entry_iterator.next().unwrap().0;
assert_eq!(padding_entry.0, 2);
assert_eq!(padding_entry.2, 0);
assert!(status_entry_iterator.next().is_none());
let mut address_transactions_iterator = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap();
let padding_entry = address_transactions_iterator.next().unwrap().0;
assert_eq!(padding_entry.0, 2);
assert_eq!(padding_entry.2, 0);
assert!(address_transactions_iterator.next().is_none());
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: true,
}
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
fn test_get_transaction_status() { fn test_get_transaction_status() {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
@ -6286,7 +5812,7 @@ pub mod tests {
); );
} }
blockstore.run_purge(0, 2).unwrap(); blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = slot; *blockstore.lowest_cleanup_slot.write().unwrap() = slot;
for (transaction, _) in expected_transactions { for (transaction, _) in expected_transactions {
let signature = transaction.signatures[0]; let signature = transaction.signatures[0];
@ -6322,7 +5848,7 @@ pub mod tests {
.unwrap(); .unwrap();
} }
// Purge to freeze index 0 // Purge to freeze index 0
blockstore.run_purge(0, 1).unwrap(); blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
let slot1 = 20; let slot1 = 20;
for x in 5..9 { for x in 5..9 {
let signature = Signature::new(&[x; 64]); let signature = Signature::new(&[x; 64]);
@ -6382,7 +5908,9 @@ pub mod tests {
} }
// Purge index 0 // Purge index 0
blockstore.run_purge(0, 10).unwrap(); blockstore
.run_purge(0, 10, PurgeType::PrimaryIndex)
.unwrap();
assert_eq!( assert_eq!(
blockstore blockstore
.get_confirmed_signatures_for_address(address0, 0, 50) .get_confirmed_signatures_for_address(address0, 0, 50)
@ -6519,7 +6047,7 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
} }
assert_eq!(blockstore.lowest_slot(), 1); assert_eq!(blockstore.lowest_slot(), 1);
blockstore.run_purge(0, 5).unwrap(); blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap();
assert_eq!(blockstore.lowest_slot(), 6); assert_eq!(blockstore.lowest_slot(), 6);
} }
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");

File diff suppressed because it is too large Load Diff

View File

@ -337,11 +337,15 @@ impl Column for columns::TransactionStatus {
} }
fn index(key: &[u8]) -> (u64, Signature, Slot) { fn index(key: &[u8]) -> (u64, Signature, Slot) {
if key.len() != 80 {
Self::as_index(0)
} else {
let index = BigEndian::read_u64(&key[0..8]); let index = BigEndian::read_u64(&key[0..8]);
let signature = Signature::new(&key[8..72]); let signature = Signature::new(&key[8..72]);
let slot = BigEndian::read_u64(&key[72..80]); let slot = BigEndian::read_u64(&key[72..80]);
(index, signature, slot) (index, signature, slot)
} }
}
fn primary_index(index: Self::Index) -> u64 { fn primary_index(index: Self::Index) -> u64 {
index.0 index.0
@ -660,23 +664,15 @@ impl Database {
Ok(fs_extra::dir::get_size(&self.path)?) Ok(fs_extra::dir::get_size(&self.path)?)
} }
// Adds a range to delete to the given write batch and returns whether or not the column has reached // Adds a range to delete to the given write batch
// its end pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<bool>
where where
C: Column + ColumnName, C: Column + ColumnName,
{ {
let cf = self.cf_handle::<C>(); let cf = self.cf_handle::<C>();
let from_index = C::as_index(from); let from_index = C::as_index(from);
let to_index = C::as_index(to); let to_index = C::as_index(to);
let result = batch.delete_range_cf::<C>(cf, from_index, to_index); batch.delete_range_cf::<C>(cf, from_index, to_index)
let max_slot = self
.iter::<C>(IteratorMode::End)?
.next()
.map(|(i, _)| C::primary_index(i))
.unwrap_or(0);
let end = max_slot <= to;
result.map(|_| end)
} }
} }