Ledger cleanup fixes (#9131)
* Fix purging happening every slot when cleanup service is not started at slot 0 * Purge by shred count instead of slots since slots can have variable number of shreds
This commit is contained in:
parent
e6db701c17
commit
73e99cc513
|
@ -1,6 +1,8 @@
|
||||||
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
|
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
|
||||||
|
|
||||||
use solana_ledger::blockstore::Blockstore;
|
use solana_ledger::blockstore::Blockstore;
|
||||||
|
use solana_ledger::blockstore_db::Result as BlockstoreResult;
|
||||||
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::datapoint_debug;
|
use solana_metrics::datapoint_debug;
|
||||||
use solana_sdk::clock::Slot;
|
use solana_sdk::clock::Slot;
|
||||||
use std::string::ToString;
|
use std::string::ToString;
|
||||||
|
@ -11,13 +13,22 @@ use std::thread;
|
||||||
use std::thread::{Builder, JoinHandle};
|
use std::thread::{Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
// - To try and keep the RocksDB size under 512GB:
|
||||||
|
// Seeing about 1600b/shred, using 2000b/shred for margin, so 250m shreds can be stored in 512gb.
|
||||||
|
// at 5k shreds/slot at 50k tps, this is 500k slots (~5.5 hours).
|
||||||
|
// At idle, 60 shreds/slot this is about 4m slots (18 days)
|
||||||
// This is chosen to allow enough time for
|
// This is chosen to allow enough time for
|
||||||
// - To try and keep the RocksDB size under 512GB at 50k tps (100 slots take ~2GB).
|
|
||||||
// - A validator to download a snapshot from a peer and boot from it
|
// - A validator to download a snapshot from a peer and boot from it
|
||||||
// - To make sure that if a validator needs to reboot from its own snapshot, it has enough slots locally
|
// - To make sure that if a validator needs to reboot from its own snapshot, it has enough slots locally
|
||||||
// to catch back up to where it was when it stopped
|
// to catch back up to where it was when it stopped
|
||||||
pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = 270_000;
|
pub const DEFAULT_MAX_LEDGER_SHREDS: u64 = 250_000_000;
|
||||||
// Remove a fixed number of slots at a time, it's more efficient than doing it one-by-one
|
|
||||||
|
// Check for removing slots at this interval so we don't purge too often
|
||||||
|
// and starve other blockstore users.
|
||||||
|
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
|
||||||
|
|
||||||
|
// Remove a limited number of slots at a time, so the operation
|
||||||
|
// does not take too long and block other blockstore users.
|
||||||
pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;
|
pub const DEFAULT_PURGE_BATCH_SIZE: u64 = 256;
|
||||||
|
|
||||||
pub struct LedgerCleanupService {
|
pub struct LedgerCleanupService {
|
||||||
|
@ -36,7 +47,7 @@ impl LedgerCleanupService {
|
||||||
max_ledger_slots
|
max_ledger_slots
|
||||||
);
|
);
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
let mut next_purge_batch = max_ledger_slots;
|
let mut last_purge_slot = 0;
|
||||||
let t_cleanup = Builder::new()
|
let t_cleanup = Builder::new()
|
||||||
.name("solana-ledger-cleanup".to_string())
|
.name("solana-ledger-cleanup".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
|
@ -47,7 +58,8 @@ impl LedgerCleanupService {
|
||||||
&new_root_receiver,
|
&new_root_receiver,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
max_ledger_slots,
|
max_ledger_slots,
|
||||||
&mut next_purge_batch,
|
&mut last_purge_slot,
|
||||||
|
DEFAULT_PURGE_SLOT_INTERVAL,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
RecvTimeoutError::Disconnected => break,
|
RecvTimeoutError::Disconnected => break,
|
||||||
|
@ -59,45 +71,123 @@ impl LedgerCleanupService {
|
||||||
Self { t_cleanup }
|
Self { t_cleanup }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn find_slots_to_clean(
|
||||||
|
blockstore: &Arc<Blockstore>,
|
||||||
|
root: Slot,
|
||||||
|
max_ledger_shreds: u64,
|
||||||
|
) -> (u64, Slot, Slot) {
|
||||||
|
let mut shreds = Vec::new();
|
||||||
|
let mut iterate_time = Measure::start("iterate_time");
|
||||||
|
let mut total_shreds = 0;
|
||||||
|
let mut first_slot = 0;
|
||||||
|
for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
|
||||||
|
if i == 0 {
|
||||||
|
first_slot = slot;
|
||||||
|
debug!("purge: searching from slot: {}", slot);
|
||||||
|
}
|
||||||
|
// Not exact since non-full slots will have holes
|
||||||
|
total_shreds += meta.received;
|
||||||
|
shreds.push((slot, meta.received));
|
||||||
|
if slot > root {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
iterate_time.stop();
|
||||||
|
info!(
|
||||||
|
"checking for ledger purge: max_shreds: {} slots: {} total_shreds: {} {}",
|
||||||
|
max_ledger_shreds,
|
||||||
|
shreds.len(),
|
||||||
|
total_shreds,
|
||||||
|
iterate_time
|
||||||
|
);
|
||||||
|
if (total_shreds as u64) < max_ledger_shreds {
|
||||||
|
return (0, 0, 0);
|
||||||
|
}
|
||||||
|
let mut cur_shreds = 0;
|
||||||
|
let mut lowest_slot_to_clean = shreds[0].0;
|
||||||
|
for (slot, num_shreds) in shreds.iter().rev() {
|
||||||
|
cur_shreds += *num_shreds as u64;
|
||||||
|
if cur_shreds > max_ledger_shreds {
|
||||||
|
lowest_slot_to_clean = *slot;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(cur_shreds, lowest_slot_to_clean, first_slot)
|
||||||
|
}
|
||||||
|
|
||||||
fn cleanup_ledger(
|
fn cleanup_ledger(
|
||||||
new_root_receiver: &Receiver<Slot>,
|
new_root_receiver: &Receiver<Slot>,
|
||||||
blockstore: &Arc<Blockstore>,
|
blockstore: &Arc<Blockstore>,
|
||||||
max_ledger_slots: u64,
|
max_ledger_shreds: u64,
|
||||||
next_purge_batch: &mut u64,
|
last_purge_slot: &mut u64,
|
||||||
|
purge_interval: u64,
|
||||||
) -> Result<(), RecvTimeoutError> {
|
) -> Result<(), RecvTimeoutError> {
|
||||||
|
let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||||
|
// Get the newest root
|
||||||
|
while let Ok(new_root) = new_root_receiver.try_recv() {
|
||||||
|
root = new_root;
|
||||||
|
}
|
||||||
|
|
||||||
|
if root - *last_purge_slot > purge_interval {
|
||||||
let disk_utilization_pre = blockstore.storage_size();
|
let disk_utilization_pre = blockstore.storage_size();
|
||||||
|
info!(
|
||||||
|
"purge: new root: {} last_purge: {} purge_interval: {} disk: {:?}",
|
||||||
|
root, last_purge_slot, purge_interval, disk_utilization_pre
|
||||||
|
);
|
||||||
|
*last_purge_slot = root;
|
||||||
|
|
||||||
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
let (num_shreds_to_clean, lowest_slot_to_clean, mut first_slot) =
|
||||||
|
Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);
|
||||||
|
|
||||||
// Notify blockstore of impending purge
|
if num_shreds_to_clean > 0 {
|
||||||
if root > *next_purge_batch {
|
debug!(
|
||||||
//cleanup
|
"cleaning up to: {} shreds: {} first: {}",
|
||||||
let lowest_slot = root - max_ledger_slots;
|
lowest_slot_to_clean, num_shreds_to_clean, first_slot
|
||||||
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_slot;
|
);
|
||||||
blockstore.purge_slots(0, Some(lowest_slot));
|
loop {
|
||||||
*next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
|
let current_lowest =
|
||||||
|
std::cmp::min(lowest_slot_to_clean, first_slot + DEFAULT_PURGE_BATCH_SIZE);
|
||||||
|
|
||||||
|
let mut slot_update_time = Measure::start("slot_update");
|
||||||
|
*blockstore.lowest_cleanup_slot.write().unwrap() = current_lowest;
|
||||||
|
slot_update_time.stop();
|
||||||
|
|
||||||
|
let mut clean_time = Measure::start("ledger_clean");
|
||||||
|
blockstore.purge_slots(first_slot, Some(current_lowest));
|
||||||
|
clean_time.stop();
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"ledger purge {} -> {}: {} {}",
|
||||||
|
first_slot, current_lowest, slot_update_time, clean_time
|
||||||
|
);
|
||||||
|
first_slot += DEFAULT_PURGE_BATCH_SIZE;
|
||||||
|
if current_lowest == lowest_slot_to_clean {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
thread::sleep(Duration::from_millis(500));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let disk_utilization_post = blockstore.storage_size();
|
let disk_utilization_post = blockstore.storage_size();
|
||||||
|
|
||||||
if let (Ok(disk_utilization_pre), Ok(disk_utilization_post)) =
|
Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post);
|
||||||
(disk_utilization_pre, disk_utilization_post)
|
|
||||||
{
|
|
||||||
datapoint_debug!(
|
|
||||||
"ledger_disk_utilization",
|
|
||||||
("disk_utilization_pre", disk_utilization_pre as i64, i64),
|
|
||||||
("disk_utilization_post", disk_utilization_post as i64, i64),
|
|
||||||
(
|
|
||||||
"disk_utilization_delta",
|
|
||||||
(disk_utilization_pre as i64 - disk_utilization_post as i64),
|
|
||||||
i64
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn report_disk_metrics(pre: BlockstoreResult<u64>, post: BlockstoreResult<u64>) {
|
||||||
|
if let (Ok(pre), Ok(post)) = (pre, post) {
|
||||||
|
datapoint_debug!(
|
||||||
|
"ledger_disk_utilization",
|
||||||
|
("disk_utilization_pre", pre as i64, i64),
|
||||||
|
("disk_utilization_post", post as i64, i64),
|
||||||
|
("disk_utilization_delta", (pre as i64 - post as i64), i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
self.t_cleanup.join()
|
self.t_cleanup.join()
|
||||||
}
|
}
|
||||||
|
@ -111,6 +201,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_cleanup() {
|
fn test_cleanup() {
|
||||||
|
solana_logger::setup();
|
||||||
let blockstore_path = get_tmp_ledger_path!();
|
let blockstore_path = get_tmp_ledger_path!();
|
||||||
let blockstore = Blockstore::open(&blockstore_path).unwrap();
|
let blockstore = Blockstore::open(&blockstore_path).unwrap();
|
||||||
let (shreds, _) = make_many_slot_entries(0, 50, 5);
|
let (shreds, _) = make_many_slot_entries(0, 50, 5);
|
||||||
|
@ -118,10 +209,10 @@ mod tests {
|
||||||
let blockstore = Arc::new(blockstore);
|
let blockstore = Arc::new(blockstore);
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
|
|
||||||
//send a signal to kill slots 0-40
|
//send a signal to kill all but 5 shreds, which will be in the newest slots
|
||||||
let mut next_purge_slot = 0;
|
let mut last_purge_slot = 0;
|
||||||
sender.send(50).unwrap();
|
sender.send(50).unwrap();
|
||||||
LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 10, &mut next_purge_slot)
|
LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
//check that 0-40 don't exist
|
//check that 0-40 don't exist
|
||||||
|
@ -134,6 +225,62 @@ mod tests {
|
||||||
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cleanup_speed() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let blockstore_path = get_tmp_ledger_path!();
|
||||||
|
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
|
||||||
|
blockstore.set_no_compaction(true);
|
||||||
|
let blockstore = Arc::new(blockstore);
|
||||||
|
let (sender, receiver) = channel();
|
||||||
|
|
||||||
|
let mut first_insert = Measure::start("first_insert");
|
||||||
|
let initial_slots = 50;
|
||||||
|
let initial_entries = 5;
|
||||||
|
let (shreds, _) = make_many_slot_entries(0, initial_slots, initial_entries);
|
||||||
|
blockstore.insert_shreds(shreds, None, false).unwrap();
|
||||||
|
first_insert.stop();
|
||||||
|
info!("{}", first_insert);
|
||||||
|
|
||||||
|
let mut last_purge_slot = 0;
|
||||||
|
let mut slot = initial_slots;
|
||||||
|
let mut num_slots = 6;
|
||||||
|
for _ in 0..5 {
|
||||||
|
let mut insert_time = Measure::start("insert time");
|
||||||
|
let batch_size = 2;
|
||||||
|
let batches = num_slots / batch_size;
|
||||||
|
for i in 0..batches {
|
||||||
|
let (shreds, _) = make_many_slot_entries(slot + i * batch_size, batch_size, 5);
|
||||||
|
blockstore.insert_shreds(shreds, None, false).unwrap();
|
||||||
|
if i % 100 == 0 {
|
||||||
|
info!("inserting..{} of {}", i, batches);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
insert_time.stop();
|
||||||
|
|
||||||
|
let mut time = Measure::start("purge time");
|
||||||
|
sender.send(slot + num_slots).unwrap();
|
||||||
|
LedgerCleanupService::cleanup_ledger(
|
||||||
|
&receiver,
|
||||||
|
&blockstore,
|
||||||
|
initial_slots,
|
||||||
|
&mut last_purge_slot,
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
time.stop();
|
||||||
|
info!(
|
||||||
|
"slot: {} size: {} {} {}",
|
||||||
|
slot, num_slots, insert_time, time
|
||||||
|
);
|
||||||
|
slot += num_slots;
|
||||||
|
num_slots *= 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(blockstore);
|
||||||
|
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_compaction() {
|
fn test_compaction() {
|
||||||
let blockstore_path = get_tmp_ledger_path!();
|
let blockstore_path = get_tmp_ledger_path!();
|
||||||
|
@ -142,7 +289,7 @@ mod tests {
|
||||||
let n = 10_000;
|
let n = 10_000;
|
||||||
let batch_size = 100;
|
let batch_size = 100;
|
||||||
let batches = n / batch_size;
|
let batches = n / batch_size;
|
||||||
let max_ledger_slots = 100;
|
let max_ledger_shreds = 100;
|
||||||
|
|
||||||
for i in 0..batches {
|
for i in 0..batches {
|
||||||
let (shreds, _) = make_many_slot_entries(i * batch_size, batch_size, 1);
|
let (shreds, _) = make_many_slot_entries(i * batch_size, batch_size, 1);
|
||||||
|
@ -158,8 +305,9 @@ mod tests {
|
||||||
LedgerCleanupService::cleanup_ledger(
|
LedgerCleanupService::cleanup_ledger(
|
||||||
&receiver,
|
&receiver,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
max_ledger_slots,
|
max_ledger_shreds,
|
||||||
&mut next_purge_batch,
|
&mut next_purge_batch,
|
||||||
|
10,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -170,7 +318,7 @@ mod tests {
|
||||||
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
|
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
|
||||||
|
|
||||||
// check that early slots don't exist
|
// check that early slots don't exist
|
||||||
let max_slot = n - max_ledger_slots;
|
let max_slot = n - max_ledger_shreds - 1;
|
||||||
blockstore
|
blockstore
|
||||||
.slot_meta_iterator(0)
|
.slot_meta_iterator(0)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
@ -53,7 +53,7 @@ pub enum BlockstoreError {
|
||||||
FsExtraError(#[from] fs_extra::error::Error),
|
FsExtraError(#[from] fs_extra::error::Error),
|
||||||
SlotCleanedUp,
|
SlotCleanedUp,
|
||||||
}
|
}
|
||||||
pub(crate) type Result<T> = std::result::Result<T, BlockstoreError>;
|
pub type Result<T> = std::result::Result<T, BlockstoreError>;
|
||||||
|
|
||||||
impl std::fmt::Display for BlockstoreError {
|
impl std::fmt::Display for BlockstoreError {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
|
|
@ -10,7 +10,7 @@ use solana_clap_utils::{
|
||||||
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
|
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
|
||||||
};
|
};
|
||||||
use solana_client::rpc_client::RpcClient;
|
use solana_client::rpc_client::RpcClient;
|
||||||
use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
|
use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SHREDS;
|
||||||
use solana_core::{
|
use solana_core::{
|
||||||
cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
|
cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
|
@ -401,7 +401,7 @@ fn download_then_check_genesis_hash(
|
||||||
pub fn main() {
|
pub fn main() {
|
||||||
let default_dynamic_port_range =
|
let default_dynamic_port_range =
|
||||||
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
||||||
let default_limit_ledger_size = &DEFAULT_MAX_LEDGER_SLOTS.to_string();
|
let default_limit_ledger_size = &DEFAULT_MAX_LEDGER_SHREDS.to_string();
|
||||||
|
|
||||||
let matches = App::new(crate_name!()).about(crate_description!())
|
let matches = App::new(crate_name!()).about(crate_description!())
|
||||||
.version(solana_clap_utils::version!())
|
.version(solana_clap_utils::version!())
|
||||||
|
@ -615,12 +615,12 @@ pub fn main() {
|
||||||
.arg(
|
.arg(
|
||||||
clap::Arg::with_name("limit_ledger_size")
|
clap::Arg::with_name("limit_ledger_size")
|
||||||
.long("limit-ledger-size")
|
.long("limit-ledger-size")
|
||||||
.value_name("SLOT_COUNT")
|
.value_name("SHRED_COUNT")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.min_values(0)
|
.min_values(0)
|
||||||
.max_values(1)
|
.max_values(1)
|
||||||
.default_value(default_limit_ledger_size)
|
.default_value(default_limit_ledger_size)
|
||||||
.help("Drop ledger data for slots older than this value"),
|
.help("Keep this amount of shreds in root slots."),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
clap::Arg::with_name("skip_poh_verify")
|
clap::Arg::with_name("skip_poh_verify")
|
||||||
|
@ -878,10 +878,10 @@ pub fn main() {
|
||||||
|
|
||||||
if matches.is_present("limit_ledger_size") {
|
if matches.is_present("limit_ledger_size") {
|
||||||
let limit_ledger_size = value_t_or_exit!(matches, "limit_ledger_size", u64);
|
let limit_ledger_size = value_t_or_exit!(matches, "limit_ledger_size", u64);
|
||||||
if limit_ledger_size < DEFAULT_MAX_LEDGER_SLOTS {
|
if limit_ledger_size < DEFAULT_MAX_LEDGER_SHREDS {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"The provided --limit-ledger-size value was too small, the minimum value is {}",
|
"The provided --limit-ledger-size value was too small, the minimum value is {}",
|
||||||
DEFAULT_MAX_LEDGER_SLOTS
|
DEFAULT_MAX_LEDGER_SHREDS
|
||||||
);
|
);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue