Add ledger-tool dead-slots and improve purge a lot (#13065)
* Add ledger-tool dead-slots and improve purge a lot * Reduce batch size... * Add --dead-slots-only and fixed purge ordering
This commit is contained in:
parent
e10de86440
commit
0776fa05c7
|
@ -4117,6 +4117,7 @@ dependencies = [
|
|||
"futures 0.3.5",
|
||||
"futures-util",
|
||||
"histogram",
|
||||
"itertools 0.9.0",
|
||||
"log 0.4.8",
|
||||
"regex",
|
||||
"serde_json",
|
||||
|
|
|
@ -934,8 +934,8 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi
|
|||
|
||||
let end_slot = last_slot.unwrap();
|
||||
info!("Purging slots {} to {}", start_slot, end_slot);
|
||||
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
|
||||
blockstore.purge_from_next_slots(start_slot, end_slot);
|
||||
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
|
||||
info!("Purging done, compacting db..");
|
||||
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
|
||||
warn!(
|
||||
|
|
|
@ -15,6 +15,7 @@ clap = "2.33.1"
|
|||
futures = "0.3.5"
|
||||
futures-util = "0.3.5"
|
||||
histogram = "*"
|
||||
itertools = "0.9.0"
|
||||
log = { version = "0.4.8" }
|
||||
regex = "1"
|
||||
serde_json = "1.0.56"
|
||||
|
|
|
@ -2,6 +2,7 @@ use clap::{
|
|||
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
|
||||
ArgMatches, SubCommand,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use serde_json::json;
|
||||
|
@ -889,6 +890,11 @@ fn main() {
|
|||
)
|
||||
.arg(&allow_dead_slots_arg)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("dead-slots")
|
||||
.arg(&starting_slot_arg)
|
||||
.about("Print all of dead slots")
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("set-dead-slot")
|
||||
.about("Mark one or more slots dead")
|
||||
|
@ -1203,6 +1209,14 @@ fn main() {
|
|||
.value_name("SLOT")
|
||||
.help("Ending slot to stop purging (inclusive) [default: the highest slot in the ledger]"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("batch_size")
|
||||
.long("batch-size")
|
||||
.value_name("NUM")
|
||||
.takes_value(true)
|
||||
.default_value("1000")
|
||||
.help("Removes at most BATCH_SIZE slots while purging in loop"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("no_compaction")
|
||||
.long("no-compaction")
|
||||
|
@ -1210,6 +1224,13 @@ fn main() {
|
|||
.takes_value(false)
|
||||
.help("Skip ledger compaction after purge")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("dead_slots_only")
|
||||
.long("dead-slots-only")
|
||||
.required(false)
|
||||
.takes_value(false)
|
||||
.help("Limit puring to dead slots only")
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("list-roots")
|
||||
|
@ -1445,6 +1466,17 @@ fn main() {
|
|||
true,
|
||||
);
|
||||
}
|
||||
("dead-slots", Some(arg_matches)) => {
|
||||
let blockstore = open_blockstore(
|
||||
&ledger_path,
|
||||
AccessType::TryPrimaryThenSecondary,
|
||||
wal_recovery_mode,
|
||||
);
|
||||
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
|
||||
for slot in blockstore.dead_slots_iterator(starting_slot).unwrap() {
|
||||
println!("{}", slot);
|
||||
}
|
||||
}
|
||||
("set-dead-slot", Some(arg_matches)) => {
|
||||
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
|
||||
let blockstore =
|
||||
|
@ -2045,9 +2077,15 @@ fn main() {
|
|||
("purge", Some(arg_matches)) => {
|
||||
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
|
||||
let end_slot = value_t!(arg_matches, "end_slot", Slot).ok();
|
||||
let no_compaction = arg_matches.is_present("no-compaction");
|
||||
let blockstore =
|
||||
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
|
||||
let no_compaction = arg_matches.is_present("no_compaction");
|
||||
let dead_slots_only = arg_matches.is_present("dead_slots_only");
|
||||
let batch_size = value_t_or_exit!(arg_matches, "batch_size", usize);
|
||||
let access_type = if !no_compaction {
|
||||
AccessType::PrimaryOnly
|
||||
} else {
|
||||
AccessType::PrimaryOnlyForMaintenance
|
||||
};
|
||||
let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode);
|
||||
|
||||
let end_slot = match end_slot {
|
||||
Some(end_slot) => end_slot,
|
||||
|
@ -2074,13 +2112,48 @@ fn main() {
|
|||
);
|
||||
exit(1);
|
||||
}
|
||||
println!("Purging data from slots {} to {}", start_slot, end_slot);
|
||||
info!(
|
||||
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
|
||||
start_slot,
|
||||
end_slot,
|
||||
end_slot - start_slot,
|
||||
no_compaction,
|
||||
dead_slots_only,
|
||||
);
|
||||
let purge_from_blockstore = |start_slot, end_slot| {
|
||||
blockstore.purge_from_next_slots(start_slot, end_slot);
|
||||
if no_compaction {
|
||||
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
|
||||
} else {
|
||||
blockstore.purge_and_compact_slots(start_slot, end_slot);
|
||||
}
|
||||
blockstore.purge_from_next_slots(start_slot, end_slot);
|
||||
};
|
||||
if !dead_slots_only {
|
||||
let slots_iter = &(start_slot..=end_slot).chunks(batch_size);
|
||||
for slots in slots_iter {
|
||||
let slots = slots.collect::<Vec<_>>();
|
||||
assert!(!slots.is_empty());
|
||||
|
||||
let start_slot = *slots.first().unwrap();
|
||||
let end_slot = *slots.last().unwrap();
|
||||
info!(
|
||||
"Purging chunked slots from {} to {} ({} slots)",
|
||||
start_slot,
|
||||
end_slot,
|
||||
end_slot - start_slot
|
||||
);
|
||||
purge_from_blockstore(start_slot, end_slot);
|
||||
}
|
||||
} else {
|
||||
let dead_slots_iter = blockstore
|
||||
.dead_slots_iterator(start_slot)
|
||||
.unwrap()
|
||||
.take_while(|s| *s <= end_slot);
|
||||
for dead_slot in dead_slots_iter {
|
||||
info!("Purging dead slot {}", dead_slot);
|
||||
purge_from_blockstore(dead_slot, dead_slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
("list-roots", Some(arg_matches)) => {
|
||||
let blockstore = open_blockstore(
|
||||
|
|
|
@ -59,7 +59,11 @@ impl Blockstore {
|
|||
meta.next_slots
|
||||
.retain(|slot| *slot < from_slot || *slot > to_slot);
|
||||
if meta.next_slots.len() != original_len {
|
||||
info!("purge_from_next_slots: adjusted meta for slot {}", slot);
|
||||
info!(
|
||||
"purge_from_next_slots: meta for slot {} no longer refers to slots {:?}",
|
||||
slot,
|
||||
from_slot..=to_slot
|
||||
);
|
||||
self.put_meta_bytes(
|
||||
slot,
|
||||
&bincode::serialize(&meta).expect("couldn't update meta"),
|
||||
|
|
|
@ -154,6 +154,7 @@ pub mod columns {
|
|||
|
||||
pub enum AccessType {
|
||||
PrimaryOnly,
|
||||
PrimaryOnlyForMaintenance, // this indicates no compaction
|
||||
TryPrimaryThenSecondary,
|
||||
}
|
||||
|
||||
|
@ -217,37 +218,45 @@ impl Rocks {
|
|||
fs::create_dir_all(&path)?;
|
||||
|
||||
// Use default database options
|
||||
let mut db_options = get_db_options();
|
||||
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
|
||||
warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
|
||||
}
|
||||
let mut db_options = get_db_options(&access_type);
|
||||
if let Some(recovery_mode) = recovery_mode {
|
||||
db_options.set_wal_recovery_mode(recovery_mode.into());
|
||||
}
|
||||
|
||||
// Column family names
|
||||
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
|
||||
let meta_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
|
||||
let dead_slots_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
|
||||
let duplicate_slots_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
|
||||
let erasure_meta_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
|
||||
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
|
||||
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
|
||||
let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
|
||||
let orphans_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
|
||||
let root_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
|
||||
let index_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
|
||||
let shred_data_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
|
||||
let shred_code_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
|
||||
let transaction_status_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
|
||||
let address_signatures_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
|
||||
let transaction_status_index_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
|
||||
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
|
||||
let rewards_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
|
||||
let blocktime_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
|
||||
let perf_samples_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
|
||||
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));
|
||||
|
||||
let cfs = vec![
|
||||
(SlotMeta::NAME, meta_cf_descriptor),
|
||||
|
@ -272,7 +281,7 @@ impl Rocks {
|
|||
|
||||
// Open the database
|
||||
let db = match access_type {
|
||||
AccessType::PrimaryOnly => Rocks(
|
||||
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
|
||||
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
|
||||
ActualAccessType::Primary,
|
||||
),
|
||||
|
@ -1003,7 +1012,7 @@ impl<'a> WriteBatch<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
fn get_cf_options() -> Options {
|
||||
fn get_cf_options(access_type: &AccessType) -> Options {
|
||||
let mut options = Options::default();
|
||||
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
|
||||
options.set_max_write_buffer_number(8);
|
||||
|
@ -1017,10 +1026,14 @@ fn get_cf_options() -> Options {
|
|||
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
|
||||
options.set_max_bytes_for_level_base(total_size_base);
|
||||
options.set_target_file_size_base(file_size_base);
|
||||
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
|
||||
options.set_disable_auto_compactions(true);
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
|
||||
fn get_db_options() -> Options {
|
||||
fn get_db_options(access_type: &AccessType) -> Options {
|
||||
let mut options = Options::default();
|
||||
options.create_if_missing(true);
|
||||
options.create_missing_column_families(true);
|
||||
|
@ -1029,6 +1042,9 @@ fn get_db_options() -> Options {
|
|||
|
||||
// Set max total wal size to 4G.
|
||||
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
|
||||
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
|
||||
options.set_disable_auto_compactions(true);
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue