diff --git a/core/src/validator.rs b/core/src/validator.rs index cdec8993f9..df4c39529e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -31,7 +31,6 @@ use { crossbeam_channel::{bounded, unbounded, Receiver}, lazy_static::lazy_static, quinn::Endpoint, - rand::{thread_rng, Rng}, solana_client::connection_cache::{ConnectionCache, Protocol}, solana_entry::poh::compute_hash_time_ns, solana_geyser_plugin_manager::{ @@ -552,15 +551,21 @@ impl Validator { )); } - if let Some(shred_version) = config.expected_shred_version { + if let Some(expected_shred_version) = config.expected_shred_version { if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority { *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore; backup_and_clear_blockstore( ledger_path, config, wait_for_supermajority_slot + 1, - shred_version, - ); + expected_shred_version, + ) + .map_err(|err| { + format!( + "Failed to backup and clear shreds with incorrect \ + shred version from blockstore: {err}" + ) + })?; } } @@ -1951,85 +1956,104 @@ fn maybe_warp_slot( Ok(()) } +/// Searches the blockstore for data shreds with the incorrect shred version. 
fn blockstore_contains_bad_shred_version( blockstore: &Blockstore, start_slot: Slot, - shred_version: u16, -) -> bool { - let now = Instant::now(); + expected_shred_version: u16, +) -> Result<bool, BlockstoreError> { + const TIMEOUT: Duration = Duration::from_secs(60); + let timer = Instant::now(); // Search for shreds with incompatible version in blockstore - if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) { - info!("Searching for incorrect shreds.."); - for (slot, _meta) in slot_meta_iterator { - if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { - for shred in &shreds { - if shred.version() != shred_version { - return true; - } - } - } - if now.elapsed().as_secs() > 60 { - info!("Didn't find incorrect shreds after 60 seconds, aborting"); - return false; + let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?; + + info!("Searching blockstore for shred with incorrect version.."); + for (slot, _meta) in slot_meta_iterator { + let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?; + for shred in &shreds { + if shred.version() != expected_shred_version { + return Ok(true); } } + if timer.elapsed() > TIMEOUT { + info!("Didn't find incorrect shreds after 60 seconds, aborting"); + break; + } } - false + Ok(false) } +/// If the blockstore contains any shreds with the incorrect shred version, +/// copy them to a backup blockstore and purge them from the actual blockstore. 
fn backup_and_clear_blockstore( ledger_path: &Path, config: &ValidatorConfig, start_slot: Slot, - shred_version: u16, -) { + expected_shred_version: u16, +) -> Result<(), BlockstoreError> { let blockstore = - Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config)).unwrap(); + Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config))?; let do_copy_and_clear = - blockstore_contains_bad_shred_version(&blockstore, start_slot, shred_version); + blockstore_contains_bad_shred_version(&blockstore, start_slot, expected_shred_version)?; - // If found, then copy shreds to another db and clear from start_slot if do_copy_and_clear { - let folder_name = format!( - "backup_{}_{}", + // .unwrap() safe because getting to this point implies blockstore has slots/shreds + let end_slot = blockstore.highest_slot()?.unwrap(); + + // Backing up the shreds that will be deleted from primary blockstore is + // not critical, so swallow errors from backup blockstore operations. 
+ let backup_folder = format!( + "{}_backup_{}_{}_{}", config .ledger_column_options .shred_storage_type .blockstore_directory(), - thread_rng().gen_range(0, 99999) + expected_shred_version, + start_slot, + end_slot ); - let backup_blockstore = Blockstore::open_with_options( - &ledger_path.join(folder_name), + match Blockstore::open_with_options( + &ledger_path.join(backup_folder), blockstore_options_from_config(config), - ); - let mut last_print = Instant::now(); - let mut copied = 0; - let mut last_slot = None; - let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot).unwrap(); - for (slot, _meta) in slot_meta_iterator { - if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { - if let Ok(ref backup_blockstore) = backup_blockstore { - copied += shreds.len(); + ) { + Ok(backup_blockstore) => { + info!("Backing up slots from {start_slot} to {end_slot}"); + let mut timer = Measure::start("blockstore backup"); + + const PRINT_INTERVAL: Duration = Duration::from_secs(5); + let mut print_timer = Instant::now(); + let mut num_slots_copied = 0; + let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?; + for (slot, _meta) in slot_meta_iterator { + let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?; let _ = backup_blockstore.insert_shreds(shreds, None, true); + num_slots_copied += 1; + + if print_timer.elapsed() > PRINT_INTERVAL { + info!("Backed up {num_slots_copied} slots thus far"); + print_timer = Instant::now(); + } } + + timer.stop(); + info!("Backing up slots done. 
{timer}"); } - if last_print.elapsed().as_millis() > 3000 { - info!( - "Copying shreds from slot {} copied {} so far.", - start_slot, copied - ); - last_print = Instant::now(); + Err(err) => { + warn!("Unable to backup shreds with incorrect shred version: {err}"); } - last_slot = Some(slot); } - let end_slot = last_slot.unwrap(); - info!("Purging slots {} to {}", start_slot, end_slot); + info!("Purging slots {start_slot} to {end_slot} from blockstore"); + let mut timer = Measure::start("blockstore purge"); blockstore.purge_from_next_slots(start_slot, end_slot); blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); - info!("done"); + timer.stop(); + info!("Purging slots done. {timer}"); + } else { + info!("Only shreds with the correct version were found in the blockstore"); } - drop(blockstore); + + Ok(()) } fn initialize_rpc_transaction_history_services( @@ -2316,8 +2340,12 @@ mod tests { use { super::*, crossbeam_channel::{bounded, RecvTimeoutError}, + solana_entry::entry, solana_gossip::contact_info::{ContactInfo, LegacyContactInfo}, - solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, + solana_ledger::{ + blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader, + get_tmp_ledger_path_auto_delete, + }, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, @@ -2375,51 +2403,37 @@ mod tests { #[test] fn test_backup_and_clear_blockstore() { - use std::time::Instant; solana_logger::setup(); - use { - solana_entry::entry, - solana_ledger::{blockstore, get_tmp_ledger_path}, - }; let validator_config = ValidatorConfig::default_for_test(); - let blockstore_path = get_tmp_ledger_path!(); - { - let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = 
Blockstore::open(ledger_path.path()).unwrap(); - let entries = entry::create_ticks(1, 0, Hash::default()); + let entries = entry::create_ticks(1, 0, Hash::default()); + for i in 1..10 { + let shreds = blockstore::entries_to_test_shreds( + &entries, + i, // slot + i - 1, // parent_slot + true, // is_full_slot + 1, // version + true, // merkle_variant + ); + blockstore.insert_shreds(shreds, None, true).unwrap(); + } + drop(blockstore); - info!("creating shreds"); - let mut last_print = Instant::now(); - for i in 1..10 { - let shreds = blockstore::entries_to_test_shreds( - &entries, - i, // slot - i - 1, // parent_slot - true, // is_full_slot - 1, // version - true, // merkle_variant - ); - blockstore.insert_shreds(shreds, None, true).unwrap(); - if last_print.elapsed().as_millis() > 5000 { - info!("inserted {}", i); - last_print = Instant::now(); - } - } - drop(blockstore); + // this purges and compacts all slots greater than or equal to 5 + backup_and_clear_blockstore(ledger_path.path(), &validator_config, 5, 2).unwrap(); - // this purges and compacts all slots greater than or equal to 5 - backup_and_clear_blockstore(&blockstore_path, &validator_config, 5, 2); - - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - // assert that slots less than 5 aren't affected - assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty()); - for i in 5..10 { - assert!(blockstore - .get_data_shreds_for_slot(i, 0) - .unwrap() - .is_empty()); - } + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + // assert that slots less than 5 aren't affected + assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty()); + for i in 5..10 { + assert!(blockstore + .get_data_shreds_for_slot(i, 0) + .unwrap() + .is_empty()); } }