Remove ledger purge batching (#10830)

This commit is contained in:
parent 59aa299d05
commit 583cec922b
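In short: `Blockstore::purge_slots_with_delay()` is removed in favor of an un-batched `purge_slots(from_slot, to_slot, purge_type)`, the old purge-then-compact convenience `purge_slots(from_slot, to_slot)` is renamed to `purge_and_compact_slots()`, and `DEFAULT_DELAY_BETWEEN_PURGES` together with the `delay_between_purges` parameter is dropped from the `LedgerCleanupService` purge path. The hunks below update the cleanup service, `backup_and_clear_blockstore`, the CLI purge command, and the blockstore tests accordingly.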
@@ -29,9 +29,6 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
 // and starve other blockstore users.
 pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

-// Delay between purges to cooperate with other blockstore users
-pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);
-
 // Compacting at a slower interval than purging helps keep IOPS down.
 // Once a day should be ample
 const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;
@@ -67,7 +64,6 @@ impl LedgerCleanupService {
                     max_ledger_shreds,
                     &mut last_purge_slot,
                     DEFAULT_PURGE_SLOT_INTERVAL,
-                    Some(DEFAULT_DELAY_BETWEEN_PURGES),
                     &mut last_compaction_slot,
                     DEFAULT_COMPACTION_SLOT_INTERVAL,
                 ) {
@@ -142,7 +138,6 @@ impl LedgerCleanupService {
         max_ledger_shreds: u64,
         last_purge_slot: &mut u64,
         purge_interval: u64,
-        delay_between_purges: Option<Duration>,
         last_compaction_slot: &mut u64,
         compaction_interval: u64,
     ) -> Result<(), RecvTimeoutError> {
@@ -156,6 +151,7 @@ impl LedgerCleanupService {
                 "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}",
                 root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre
             );
+
             *last_purge_slot = root;

             let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
@@ -183,11 +179,10 @@ impl LedgerCleanupService {
                 purge_first_slot, lowest_cleanup_slot
             );

-            let mut purge_time = Measure::start("purge_slots_with_delay");
-            blockstore.purge_slots_with_delay(
+            let mut purge_time = Measure::start("purge_slots");
+            blockstore.purge_slots(
                 purge_first_slot,
                 lowest_cleanup_slot,
-                delay_between_purges,
                 PurgeType::PrimaryIndex,
             );
             purge_time.stop();
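Note: with `delay_between_purges` removed, each cleanup pass now issues a single purge over the whole `purge_first_slot..=lowest_cleanup_slot` range using `PurgeType::PrimaryIndex`, instead of sleeping between fixed-size batches as the removed `purge_slots_with_delay` did (see the `impl Blockstore` hunk further below); the `Measure` label is renamed from "purge_slots_with_delay" to "purge_slots" to match.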
@@ -275,7 +270,6 @@ mod tests {
             5,
             &mut last_purge_slot,
             10,
-            None,
             &mut last_compaction_slot,
             10,
         )
@@ -333,7 +327,6 @@ mod tests {
             initial_slots,
             &mut last_purge_slot,
             10,
-            None,
             &mut last_compaction_slot,
             10,
         )
@@ -684,7 +684,7 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi

         let end_slot = last_slot.unwrap();
         info!("Purging slots {} to {}", start_slot, end_slot);
-        blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
+        blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
         blockstore.purge_from_next_slots(start_slot, end_slot);
         info!("Purging done, compacting db..");
         if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
@@ -382,7 +382,6 @@ mod tests {
             max_ledger_shreds,
             &mut last_purge_slot,
             10,
-            None,
             &mut last_compaction_slot,
             10,
         )
@@ -1432,7 +1432,7 @@ fn main() {
             let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
             let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
             let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
-            blockstore.purge_slots(start_slot, end_slot);
+            blockstore.purge_and_compact_slots(start_slot, end_slot);
             blockstore.purge_from_next_slots(start_slot, end_slot);
         }
         ("list-roots", Some(arg_matches)) => {
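The purge command above keeps its purge-then-compact behavior: the helper it called, `Blockstore::purge_slots(from, to)`, is renamed to `purge_and_compact_slots(from, to)` in the blockstore hunks below, so only the method name at the call site changes.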
@@ -6155,14 +6155,14 @@ pub mod tests {
             .insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test inserting just the codes, enough for recovery
         blockstore
             .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test inserting some codes, but not enough for recovery
         blockstore
@@ -6173,7 +6173,7 @@ pub mod tests {
             )
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test inserting just the codes, and some data, enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
@@ -6185,7 +6185,7 @@ pub mod tests {
             .insert_shreds(shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test inserting some codes, and some data, but enough for recovery
         let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6197,7 +6197,7 @@ pub mod tests {
             .insert_shreds(shreds, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test inserting all shreds in 2 rounds, make sure nothing is lost
         let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6217,7 +6217,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
         // make sure nothing is lost
@@ -6242,7 +6242,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);

         // Test insert shreds in 2 rounds, but not enough to trigger
         // recovery, make sure nothing is lost
@@ -6267,7 +6267,7 @@ pub mod tests {
             .insert_shreds(shreds2, Some(&leader_schedule_cache), false)
             .unwrap();
         verify_index_integrity(&blockstore, slot);
-        blockstore.purge_slots(0, slot);
+        blockstore.purge_and_compact_slots(0, slot);
     }
     Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
 }
@@ -1,5 +1,4 @@
 use super::*;
-use std::time::Instant;

 #[derive(Default)]
 pub struct PurgeStats {
@@ -12,61 +11,28 @@ impl Blockstore {
     /// Dangerous; Use with care:
     /// Does not check for integrity and does not update slot metas that refer to deleted slots
     /// Modifies multiple column families simultaneously
-    pub fn purge_slots_with_delay(
-        &self,
-        from_slot: Slot,
-        to_slot: Slot,
-        delay_between_purges: Option<Duration>,
-        purge_type: PurgeType,
-    ) {
-        // if there's no upper bound, split the purge request into batches of 1000 slots
-        const PURGE_BATCH_SIZE: u64 = 1000;
-        let mut batch_start = from_slot;
+    pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot, purge_type: PurgeType) {
         let mut purge_stats = PurgeStats::default();
-        let mut last_datapoint = Instant::now();
-        let mut datapoint_start = batch_start;
-        while batch_start < to_slot {
-            let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
+        let purge_result =
+            self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut purge_stats);

-            let purge_result =
-                self.run_purge_with_stats(batch_start, batch_end, purge_type, &mut purge_stats);
-
-            if last_datapoint.elapsed().as_millis() > 1000 {
-                datapoint_info!(
-                    "blockstore-purge",
-                    ("from_slot", datapoint_start as i64, i64),
-                    ("to_slot", batch_end as i64, i64),
-                    ("delete_range_us", purge_stats.delete_range as i64, i64),
-                    ("write_batch_us", purge_stats.write_batch as i64, i64)
-                );
-                last_datapoint = Instant::now();
-                purge_stats = PurgeStats::default();
-                datapoint_start = batch_end;
-            }
-
-            match purge_result {
-                Ok(_all_columns_purged) => {
-                    batch_start = batch_end;
-
-                    if let Some(ref duration) = delay_between_purges {
-                        // Cooperate with other blockstore users
-                        std::thread::sleep(*duration);
-                    }
-                }
-                Err(e) => {
-                    error!(
-                        "Error: {:?}; Purge failed in range {:?} to {:?}",
-                        e, batch_start, batch_end
-                    );
-                    break;
-                }
-            }
-        }
+        datapoint_info!(
+            "blockstore-purge",
+            ("from_slot", from_slot as i64, i64),
+            ("to_slot", to_slot as i64, i64),
+            ("delete_range_us", purge_stats.delete_range as i64, i64),
+            ("write_batch_us", purge_stats.write_batch as i64, i64)
+        );
+        if let Err(e) = purge_result {
+            error!(
+                "Error: {:?}; Purge failed in range {:?} to {:?}",
+                e, from_slot, to_slot
+            );
         }
     }

-    // TODO: rename purge_slots() to purge_and_compact_slots()
-    pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
-        self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact);
+    pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
+        self.purge_slots(from_slot, to_slot, PurgeType::Exact);
         if let Err(e) = self.compact_storage(from_slot, to_slot) {
             // This error is not fatal and indicates an internal error?
             error!(
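For readers comparing the two control flows, here is a minimal, self-contained sketch of what was removed versus what replaced it. It is illustrative only: `run_purge` stands in for `Blockstore::run_purge_with_stats`, the datapoint and error reporting are reduced to prints, and only the 1000-slot `PURGE_BATCH_SIZE` and the optional sleep mirror the deleted code.

    use std::time::Duration;

    // Stand-in for Blockstore::run_purge_with_stats(); here it only records the range.
    fn run_purge(from_slot: u64, to_slot: u64) -> Result<(), String> {
        println!("purging slots {} to {}", from_slot, to_slot);
        Ok(())
    }

    // Shape of the batched purge removed by this commit: fixed 1000-slot batches,
    // with an optional sleep between batches to cooperate with other blockstore users.
    fn purge_batched(from_slot: u64, to_slot: u64, delay_between_purges: Option<Duration>) {
        const PURGE_BATCH_SIZE: u64 = 1000;
        let mut batch_start = from_slot;
        while batch_start < to_slot {
            let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
            match run_purge(batch_start, batch_end) {
                Ok(()) => {
                    batch_start = batch_end;
                    if let Some(duration) = delay_between_purges {
                        // Cooperate with other blockstore users
                        std::thread::sleep(duration);
                    }
                }
                Err(e) => {
                    eprintln!("Error: {:?}; Purge failed in range {} to {}", e, batch_start, batch_end);
                    break;
                }
            }
        }
    }

    // Shape of the replacement: a single purge call covering the whole range.
    fn purge_unbatched(from_slot: u64, to_slot: u64) {
        if let Err(e) = run_purge(from_slot, to_slot) {
            eprintln!("Error: {:?}; Purge failed in range {} to {}", e, from_slot, to_slot);
        }
    }

    fn main() {
        // 2500 slots: three batches (0..1000, 1000..2000, 2000..2500) versus one call.
        purge_batched(0, 2500, Some(Duration::from_millis(500)));
        purge_unbatched(0, 2500);
    }

Running the sketch shows the batched path splitting 2500 slots into three sub-ranges with a 500 ms pause after each, while the replacement issues a single call over the whole range.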
@@ -443,11 +409,11 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
         blockstore.insert_shreds(shreds, None, false).unwrap();

-        blockstore.purge_slots(0, 5);
+        blockstore.purge_and_compact_slots(0, 5);

         test_all_empty_or_min(&blockstore, 6);

-        blockstore.purge_slots(0, 50);
+        blockstore.purge_and_compact_slots(0, 50);

         // min slot shouldn't matter, blockstore should be empty
         test_all_empty_or_min(&blockstore, 100);
@@ -471,7 +437,7 @@ pub mod tests {
         let (shreds, _) = make_many_slot_entries(0, 5000, 10);
         blockstore.insert_shreds(shreds, None, false).unwrap();

-        blockstore.purge_slots(0, 4999);
+        blockstore.purge_and_compact_slots(0, 4999);

         test_all_empty_or_min(&blockstore, 5000);

@@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() {
         assert_eq!(meta0.next_slots, expected_next_slots);

         // Delete slots for next iteration
-        blockstore.purge_slots(0, num_threads + 1);
+        blockstore.purge_and_compact_slots(0, num_threads + 1);
     }

     // Cleanup