diff --git a/Cargo.lock b/Cargo.lock index e26b78abe..9d5e5ad87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4117,6 +4117,7 @@ dependencies = [ "futures 0.3.5", "futures-util", "histogram", + "itertools 0.9.0", "log 0.4.8", "regex", "serde_json", diff --git a/core/src/validator.rs b/core/src/validator.rs index 14664005b..ffe4c7dd0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -934,8 +934,8 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi let end_slot = last_slot.unwrap(); info!("Purging slots {} to {}", start_slot, end_slot); - blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); blockstore.purge_from_next_slots(start_slot, end_slot); + blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); info!("Purging done, compacting db.."); if let Err(e) = blockstore.compact_storage(start_slot, end_slot) { warn!( diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index cba01e925..284f2fe6b 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -15,6 +15,7 @@ clap = "2.33.1" futures = "0.3.5" futures-util = "0.3.5" histogram = "*" +itertools = "0.9.0" log = { version = "0.4.8" } regex = "1" serde_json = "1.0.56" diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 04a1c96a2..b3b17ee9c 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -2,6 +2,7 @@ use clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg, ArgMatches, SubCommand, }; +use itertools::Itertools; use log::*; use regex::Regex; use serde_json::json; @@ -889,6 +890,11 @@ fn main() { ) .arg(&allow_dead_slots_arg) ) + .subcommand( + SubCommand::with_name("dead-slots") + .arg(&starting_slot_arg) + .about("Print all of dead slots") + ) .subcommand( SubCommand::with_name("set-dead-slot") .about("Mark one or more slots dead") @@ -1203,6 +1209,14 @@ fn main() { .value_name("SLOT") .help("Ending slot to stop purging (inclusive) [default: the highest slot in the ledger]"), ) + .arg( + Arg::with_name("batch_size") + .long("batch-size") + .value_name("NUM") + .takes_value(true) + .default_value("1000") + .help("Removes at most BATCH_SIZE slots while purging in loop"), + ) .arg( Arg::with_name("no_compaction") .long("no-compaction") @@ -1210,6 +1224,13 @@ fn main() { .takes_value(false) .help("Skip ledger compaction after purge") ) + .arg( + Arg::with_name("dead_slots_only") + .long("dead-slots-only") + .required(false) + .takes_value(false) + .help("Limit puring to dead slots only") + ) ) .subcommand( SubCommand::with_name("list-roots") @@ -1445,6 +1466,17 @@ fn main() { true, ); } + ("dead-slots", Some(arg_matches)) => { + let blockstore = open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ); + let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); + for slot in blockstore.dead_slots_iterator(starting_slot).unwrap() { + println!("{}", slot); + } + } ("set-dead-slot", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); let blockstore = @@ -2045,9 +2077,15 @@ fn main() { ("purge", Some(arg_matches)) => { let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot); let end_slot = value_t!(arg_matches, "end_slot", Slot).ok(); - let no_compaction = arg_matches.is_present("no-compaction"); - let blockstore = - open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode); + let no_compaction = arg_matches.is_present("no_compaction"); + let dead_slots_only = arg_matches.is_present("dead_slots_only"); + let batch_size = value_t_or_exit!(arg_matches, "batch_size", usize); + let access_type = if !no_compaction { + AccessType::PrimaryOnly + } else { + AccessType::PrimaryOnlyForMaintenance + }; + let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode); let end_slot = match end_slot { Some(end_slot) => end_slot, @@ -2074,13 +2112,48 @@ fn main() { ); exit(1); } - println!("Purging data from slots {} to {}", start_slot, end_slot); - if no_compaction { - blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); + info!( + "Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})", + start_slot, + end_slot, + end_slot - start_slot, + no_compaction, + dead_slots_only, + ); + let purge_from_blockstore = |start_slot, end_slot| { + blockstore.purge_from_next_slots(start_slot, end_slot); + if no_compaction { + blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); + } else { + blockstore.purge_and_compact_slots(start_slot, end_slot); + } + }; + if !dead_slots_only { + let slots_iter = &(start_slot..=end_slot).chunks(batch_size); + for slots in slots_iter { + let slots = slots.collect::>(); + assert!(!slots.is_empty()); + + let start_slot = *slots.first().unwrap(); + let end_slot = *slots.last().unwrap(); + info!( + "Purging chunked slots from {} to {} ({} slots)", + start_slot, + end_slot, + end_slot - start_slot + ); + purge_from_blockstore(start_slot, end_slot); + } } else { - blockstore.purge_and_compact_slots(start_slot, end_slot); + let dead_slots_iter = blockstore + .dead_slots_iterator(start_slot) + .unwrap() + .take_while(|s| *s <= end_slot); + for dead_slot in dead_slots_iter { + info!("Purging dead slot {}", dead_slot); + purge_from_blockstore(dead_slot, dead_slot); + } } - blockstore.purge_from_next_slots(start_slot, end_slot); } ("list-roots", Some(arg_matches)) => { let blockstore = open_blockstore( diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 5030858c8..0d4ad0a91 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -59,7 +59,11 @@ impl Blockstore { meta.next_slots .retain(|slot| *slot < from_slot || *slot > to_slot); if meta.next_slots.len() != original_len { - info!("purge_from_next_slots: adjusted meta for slot {}", slot); + info!( + "purge_from_next_slots: meta for slot {} no longer refers to slots {:?}", + slot, + from_slot..=to_slot + ); self.put_meta_bytes( slot, &bincode::serialize(&meta).expect("couldn't update meta"), diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 9337d764e..e77a09c00 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -154,6 +154,7 @@ pub mod columns { pub enum AccessType { PrimaryOnly, + PrimaryOnlyForMaintenance, // this indicates no compaction TryPrimaryThenSecondary, } @@ -217,37 +218,45 @@ impl Rocks { fs::create_dir_all(&path)?; // Use default database options - let mut db_options = get_db_options(); + if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { + warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update..."); + } + let mut db_options = get_db_options(&access_type); if let Some(recovery_mode) = recovery_mode { db_options.set_wal_recovery_mode(recovery_mode.into()); } // Column family names - let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); + let meta_cf_descriptor = + ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type)); let dead_slots_cf_descriptor = - ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type)); let duplicate_slots_cf_descriptor = - ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type)); let erasure_meta_cf_descriptor = - ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); - let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); - let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options()); - let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type)); + let orphans_cf_descriptor = + ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type)); + let root_cf_descriptor = + ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type)); + let index_cf_descriptor = + ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type)); let shred_data_cf_descriptor = - ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type)); let shred_code_cf_descriptor = - ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type)); let transaction_status_cf_descriptor = - ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type)); let address_signatures_cf_descriptor = - ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type)); let transaction_status_index_cf_descriptor = - ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options()); - let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type)); + let rewards_cf_descriptor = + ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type)); let blocktime_cf_descriptor = - ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type)); let perf_samples_cf_descriptor = - ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options()); + ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type)); let cfs = vec![ (SlotMeta::NAME, meta_cf_descriptor), @@ -272,7 +281,7 @@ impl Rocks { // Open the database let db = match access_type { - AccessType::PrimaryOnly => Rocks( + AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks( DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?, ActualAccessType::Primary, ), @@ -1003,7 +1012,7 @@ impl<'a> WriteBatch<'a> { } } -fn get_cf_options() -> Options { +fn get_cf_options(access_type: &AccessType) -> Options { let mut options = Options::default(); // 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM options.set_max_write_buffer_number(8); @@ -1017,10 +1026,14 @@ fn get_cf_options() -> Options { options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32); options.set_max_bytes_for_level_base(total_size_base); options.set_target_file_size_base(file_size_base); + if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { + options.set_disable_auto_compactions(true); + } + options } -fn get_db_options() -> Options { +fn get_db_options(access_type: &AccessType) -> Options { let mut options = Options::default(); options.create_if_missing(true); options.create_missing_column_families(true); @@ -1029,6 +1042,9 @@ fn get_db_options() -> Options { // Set max total wal size to 4G. options.set_max_total_wal_size(4 * 1024 * 1024 * 1024); + if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { + options.set_disable_auto_compactions(true); + } options }