From 58a475b7898d14a5a5278fedb9834d21970ddeaa Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 6 Jul 2020 12:43:45 -0700 Subject: [PATCH] Add db recovery methods (#10838) --- core/src/tvu.rs | 2 +- core/src/validator.rs | 6 ++- ledger-tool/src/main.rs | 88 ++++++++++++++++++++++++++++++------- ledger/src/blockstore.rs | 31 ++++++++----- ledger/src/blockstore_db.rs | 55 +++++++++++++++++++++-- validator/src/main.rs | 19 ++++++++ 6 files changed, 167 insertions(+), 34 deletions(-) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 76132040e..3af920f42 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -271,7 +271,7 @@ pub mod tests { let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config); let (blockstore, l_receiver, completed_slots_receiver) = - Blockstore::open_with_signal(&blockstore_path) + Blockstore::open_with_signal(&blockstore_path, None) .expect("Expected to successfully open ledger"); let blockstore = Arc::new(blockstore); let bank = bank_forks.working_bank(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 9c4e38690..27d538345 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -26,6 +26,7 @@ use rand::{thread_rng, Rng}; use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, + blockstore_db::BlockstoreRecoveryMode, blockstore_processor, create_new_tmp_ledger, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, @@ -84,6 +85,7 @@ pub struct ValidatorConfig { pub no_rocksdb_compaction: bool, pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, + pub wal_recovery_mode: Option, } impl Default for ValidatorConfig { @@ -111,6 +113,7 @@ impl Default for ValidatorConfig { no_rocksdb_compaction: false, accounts_hash_interval_slots: std::u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + wal_recovery_mode: None, } } } @@ -600,7 +603,8 @@ fn new_banks_from_ledger( } let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) = - Blockstore::open_with_signal(ledger_path).expect("Failed to open ledger database"); + Blockstore::open_with_signal(ledger_path, config.wal_recovery_mode.clone()) + .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); let process_options = blockstore_processor::ProcessOptions { diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 0749ca410..9f8ae01de 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -10,7 +10,7 @@ use solana_ledger::{ ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore, - blockstore_db::{self, AccessType, Column, Database}, + blockstore_db::{self, AccessType, BlockstoreRecoveryMode, Column, Database}, blockstore_processor::ProcessOptions, rooted_slot_iterator::RootedSlotIterator, }; @@ -594,8 +594,12 @@ fn analyze_storage(database: &Database) -> Result<(), String> { Ok(()) } -fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore { - match Blockstore::open_with_access_type(ledger_path, access_type) { +fn open_blockstore( + ledger_path: &Path, + access_type: AccessType, + wal_recovery_mode: Option, +) -> Blockstore { + match Blockstore::open_with_access_type(ledger_path, access_type, wal_recovery_mode) { Ok(blockstore) => blockstore, Err(err) => { eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err); @@ -605,7 +609,7 @@ fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore { } fn open_database(ledger_path: &Path, access_type: AccessType) -> Database { - match Database::open(&ledger_path.join("rocksdb"), access_type) { + match Database::open(&ledger_path.join("rocksdb"), access_type, None) { Ok(database) => database, Err(err) => { eprintln!("Unable to read the Ledger rocksdb: {:?}", err); @@ -629,8 +633,9 @@ fn load_bank_forks( genesis_config: &GenesisConfig, process_options: ProcessOptions, access_type: AccessType, + wal_recovery_mode: Option, ) -> bank_forks_utils::LoadResult { - let blockstore = open_blockstore(&ledger_path, access_type); + let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode); let snapshot_path = ledger_path.clone().join(if blockstore.is_primary_access() { "snapshot" } else { @@ -754,6 +759,21 @@ fn main() { .global(true) .help("Use DIR for ledger location"), ) + .arg( + Arg::with_name("wal_recovery_mode") + .long("wal-recovery-mode") + .value_name("MODE") + .takes_value(true) + .global(true) + .possible_values(&[ + "tolerate_corrupted_tail_records", + "absolute_consistency", + "point_in_time", + "skip_any_corrupted_record"]) + .help( + "Mode to recovery the ledger db write ahead log." + ), + ) .subcommand( SubCommand::with_name("print") .about("Print the ledger") @@ -1048,6 +1068,10 @@ fn main() { exit(1); }); + let wal_recovery_mode = matches + .value_of("wal_recovery_mode") + .map(BlockstoreRecoveryMode::from); + match matches.subcommand() { ("print", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); @@ -1056,7 +1080,11 @@ fn main() { let only_rooted = arg_matches.is_present("only_rooted"); let verbose = arg_matches.occurrences_of("verbose"); output_ledger( - open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary), + open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ), starting_slot, allow_dead_slots, LedgerOutputMethod::Print, @@ -1069,8 +1097,8 @@ fn main() { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot); let target_db = PathBuf::from(value_t_or_exit!(arg_matches, "target_db", String)); - let source = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); - let target = open_blockstore(&target_db, AccessType::PrimaryOnly); + let source = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary, None); + let target = open_blockstore(&target_db, AccessType::PrimaryOnly, None); for (slot, _meta) in source.slot_meta_iterator(starting_slot).unwrap() { if slot > ending_slot { break; @@ -1105,6 +1133,7 @@ fn main() { &genesis_config, process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) { Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { println!( @@ -1124,7 +1153,11 @@ fn main() { ("slot", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); let allow_dead_slots = arg_matches.is_present("allow_dead_slots"); - let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); + let blockstore = open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ); for slot in slots { println!("Slot {}", slot); if let Err(err) = output_slot( @@ -1142,7 +1175,11 @@ fn main() { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); let allow_dead_slots = arg_matches.is_present("allow_dead_slots"); output_ledger( - open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary), + open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ), starting_slot, allow_dead_slots, LedgerOutputMethod::Json, @@ -1153,7 +1190,8 @@ fn main() { } ("set-dead-slot", Some(arg_matches)) => { let slots = values_t_or_exit!(arg_matches, "slots", Slot); - let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly); + let blockstore = + open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode); for slot in slots { match blockstore.set_dead_slot(slot) { Ok(_) => println!("Slot {} dead", slot), @@ -1164,7 +1202,11 @@ fn main() { ("parse_full_frozen", Some(arg_matches)) => { let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot); let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot); - let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); + let blockstore = open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ); let mut ancestors = BTreeSet::new(); if blockstore.meta(ending_slot).unwrap().is_none() { panic!("Ending slot doesn't exist"); @@ -1243,6 +1285,7 @@ fn main() { &open_genesis_config_by(&ledger_path, arg_matches), process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) .unwrap_or_else(|err| { eprintln!("Ledger verification failed: {:?}", err); @@ -1266,6 +1309,7 @@ fn main() { &open_genesis_config_by(&ledger_path, arg_matches), process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) { Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes")); @@ -1318,6 +1362,7 @@ fn main() { &genesis_config, process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) { Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { let bank = bank_forks @@ -1415,6 +1460,7 @@ fn main() { &genesis_config, process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) { Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { let slot = bank_forks.working_bank().slot(); @@ -1463,6 +1509,7 @@ fn main() { &genesis_config, process_options, AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, ) { Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { let slot = bank_forks.working_bank().slot(); @@ -1527,12 +1574,17 @@ fn main() { ("purge", Some(arg_matches)) => { let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot); let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot); - let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly); + let blockstore = + open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode); blockstore.purge_and_compact_slots(start_slot, end_slot); blockstore.purge_from_next_slots(start_slot, end_slot); } ("list-roots", Some(arg_matches)) => { - let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary); + let blockstore = open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ); let max_height = if let Some(height) = arg_matches.value_of("max_height") { usize::from_str(height).expect("Maximum height must be a number") } else { @@ -1585,8 +1637,12 @@ fn main() { }); } ("bounds", Some(arg_matches)) => { - match open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary) - .slot_meta_iterator(0) + match open_blockstore( + &ledger_path, + AccessType::TryPrimaryThenSecondary, + wal_recovery_mode, + ) + .slot_meta_iterator(0) { Ok(metas) => { let all = arg_matches.is_present("all"); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index ad189cb51..082941b47 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -4,8 +4,8 @@ pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta}; use crate::{ blockstore_db::{ - columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, - Result, WriteBatch, + columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection, + IteratorMode, LedgerColumn, Result, WriteBatch, }, blockstore_meta::*, entry::{create_ticks, Entry}, @@ -231,17 +231,22 @@ impl Blockstore { /// Opens a Ledger in directory, provides "infinite" window of shreds pub fn open(ledger_path: &Path) -> Result { - Self::do_open(ledger_path, AccessType::PrimaryOnly) + Self::do_open(ledger_path, AccessType::PrimaryOnly, None) } pub fn open_with_access_type( ledger_path: &Path, access_type: AccessType, + recovery_mode: Option, ) -> Result { - Self::do_open(ledger_path, access_type) + Self::do_open(ledger_path, access_type, recovery_mode) } - fn do_open(ledger_path: &Path, access_type: AccessType) -> Result { + fn do_open( + ledger_path: &Path, + access_type: AccessType, + recovery_mode: Option, + ) -> Result { fs::create_dir_all(&ledger_path)?; let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY); @@ -250,7 +255,7 @@ impl Blockstore { // Open the database let mut measure = Measure::start("open"); info!("Opening database at {:?}", blockstore_path); - let db = Database::open(&blockstore_path, access_type)?; + let db = Database::open(&blockstore_path, access_type, recovery_mode)?; // Create the metadata column family let meta_cf = db.column(); @@ -331,8 +336,10 @@ impl Blockstore { pub fn open_with_signal( ledger_path: &Path, + recovery_mode: Option, ) -> Result<(Self, Receiver, CompletedSlotsReceiver)> { - let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?; + let mut blockstore = + Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?; let (signal_sender, signal_receiver) = sync_channel(1); let (completed_slots_sender, completed_slots_receiver) = sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); @@ -2717,7 +2724,7 @@ pub fn create_new_ledger( genesis_config.write(&ledger_path)?; // Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger. - let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?; + let blockstore = Blockstore::open_with_access_type(ledger_path, access_type, None)?; let ticks_per_slot = genesis_config.ticks_per_slot; let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0); let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash()); @@ -3654,7 +3661,7 @@ pub mod tests { pub fn test_new_shreds_signal() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path).unwrap(); + let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 50; @@ -3734,7 +3741,7 @@ pub mod tests { pub fn test_completed_shreds_signal() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap(); + let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; @@ -3756,7 +3763,7 @@ pub mod tests { pub fn test_completed_shreds_signal_orphans() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap(); + let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; @@ -3796,7 +3803,7 @@ pub mod tests { pub fn test_completed_shreds_signal_many() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path).unwrap(); + let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index af06b44c3..edfcd985a 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -4,7 +4,7 @@ use byteorder::{BigEndian, ByteOrder}; use log::*; pub use rocksdb::Direction as IteratorDirection; use rocksdb::{ - self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, + self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB, }; use serde::de::DeserializeOwned; @@ -138,11 +138,51 @@ pub enum ActualAccessType { Secondary, } +#[derive(Debug, Clone)] +pub enum BlockstoreRecoveryMode { + TolerateCorruptedTailRecords, + AbsoluteConsistency, + PointInTime, + SkipAnyCorruptedRecord, +} + +impl From<&str> for BlockstoreRecoveryMode { + fn from(string: &str) -> Self { + match string { + "tolerate_corrupted_tail_records" => { + BlockstoreRecoveryMode::TolerateCorruptedTailRecords + } + "absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency, + "point_in_time" => BlockstoreRecoveryMode::PointInTime, + "skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord, + bad_mode => panic!("Invalid recovery mode: {}", bad_mode), + } + } +} +impl Into for BlockstoreRecoveryMode { + fn into(self) -> DBRecoveryMode { + match self { + BlockstoreRecoveryMode::TolerateCorruptedTailRecords => { + DBRecoveryMode::TolerateCorruptedTailRecords + } + BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency, + BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime, + BlockstoreRecoveryMode::SkipAnyCorruptedRecord => { + DBRecoveryMode::SkipAnyCorruptedRecord + } + } + } +} + #[derive(Debug)] struct Rocks(rocksdb::DB, ActualAccessType); impl Rocks { - fn open(path: &Path, access_type: AccessType) -> Result { + fn open( + path: &Path, + access_type: AccessType, + recovery_mode: Option, + ) -> Result { use columns::{ AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, @@ -152,6 +192,9 @@ impl Rocks { // Use default database options let mut db_options = get_db_options(); + if let Some(recovery_mode) = recovery_mode { + db_options.set_wal_recovery_mode(recovery_mode.into()); + } // Column family names let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); @@ -627,8 +670,12 @@ pub struct WriteBatch<'a> { } impl Database { - pub fn open(path: &Path, access_type: AccessType) -> Result { - let backend = Arc::new(Rocks::open(path, access_type)?); + pub fn open( + path: &Path, + access_type: AccessType, + recovery_mode: Option, + ) -> Result { + let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?); Ok(Database { backend, diff --git a/validator/src/main.rs b/validator/src/main.rs index 4496ad2b4..f334a3bd8 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -23,6 +23,7 @@ use solana_core::{ validator::{Validator, ValidatorConfig}, }; use solana_download_utils::{download_genesis_if_missing, download_snapshot}; +use solana_ledger::blockstore_db::BlockstoreRecoveryMode; use solana_perf::recycler::enable_recycler_warming; use solana_runtime::{ bank_forks::{CompressionType, SnapshotConfig, SnapshotVersion}, @@ -852,6 +853,20 @@ pub fn main() { "maximum total uncompressed file size of downloaded genesis archive", ), ) + .arg( + Arg::with_name("wal_recovery_mode") + .long("wal-recovery-mode") + .value_name("MODE") + .takes_value(true) + .possible_values(&[ + "tolerate_corrupted_tail_records", + "absolute_consistency", + "point_in_time", + "skip_any_corrupted_record"]) + .help( + "Mode to recovery the ledger db write ahead log." + ), + ) .get_matches(); let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new)); @@ -869,6 +884,9 @@ pub fn main() { let no_check_vote_account = matches.is_present("no_check_vote_account"); let private_rpc = matches.is_present("private_rpc"); let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction"); + let wal_recovery_mode = matches + .value_of("wal_recovery_mode") + .map(BlockstoreRecoveryMode::from); // Canonicalize ledger path to avoid issues with symlink creation let _ = fs::create_dir_all(&ledger_path); @@ -927,6 +945,7 @@ pub fn main() { trusted_validators, frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), no_rocksdb_compaction, + wal_recovery_mode, ..ValidatorConfig::default() };