Cleanup backup_and_clear_blockstore() (#32461)

Some of the cleanup tasks include ...
- Make subfunctions return a Result and allow error handling above
- Add some clarifying comments
- Give backup directory name a more meaningful name
- Add some additional logs (with timing info) for long running parts
This commit is contained in:
steviez 2023-07-28 06:43:04 -05:00 committed by GitHub
parent e3f253d559
commit e337631f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 105 additions and 91 deletions

View File

@ -31,7 +31,6 @@ use {
crossbeam_channel::{bounded, unbounded, Receiver}, crossbeam_channel::{bounded, unbounded, Receiver},
lazy_static::lazy_static, lazy_static::lazy_static,
quinn::Endpoint, quinn::Endpoint,
rand::{thread_rng, Rng},
solana_client::connection_cache::{ConnectionCache, Protocol}, solana_client::connection_cache::{ConnectionCache, Protocol},
solana_entry::poh::compute_hash_time_ns, solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::{ solana_geyser_plugin_manager::{
@ -552,15 +551,21 @@ impl Validator {
)); ));
} }
if let Some(shred_version) = config.expected_shred_version { if let Some(expected_shred_version) = config.expected_shred_version {
if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority { if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority {
*start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore; *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
backup_and_clear_blockstore( backup_and_clear_blockstore(
ledger_path, ledger_path,
config, config,
wait_for_supermajority_slot + 1, wait_for_supermajority_slot + 1,
shred_version, expected_shred_version,
); )
.map_err(|err| {
format!(
"Failed to backup and clear shreds with incorrect \
shred version from blockstore: {err}"
)
})?;
} }
} }
@ -1951,85 +1956,104 @@ fn maybe_warp_slot(
Ok(()) Ok(())
} }
/// Searches the blockstore for data shreds with the incorrect shred version.
fn blockstore_contains_bad_shred_version( fn blockstore_contains_bad_shred_version(
blockstore: &Blockstore, blockstore: &Blockstore,
start_slot: Slot, start_slot: Slot,
shred_version: u16, expected_shred_version: u16,
) -> bool { ) -> Result<bool, BlockstoreError> {
let now = Instant::now(); const TIMEOUT: Duration = Duration::from_secs(60);
let timer = Instant::now();
// Search for shreds with incompatible version in blockstore // Search for shreds with incompatible version in blockstore
if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) { let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
info!("Searching for incorrect shreds..");
for (slot, _meta) in slot_meta_iterator { info!("Searching blockstore for shred with incorrect version..");
if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { for (slot, _meta) in slot_meta_iterator {
for shred in &shreds { let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
if shred.version() != shred_version { for shred in &shreds {
return true; if shred.version() != expected_shred_version {
} return Ok(true);
}
}
if now.elapsed().as_secs() > 60 {
info!("Didn't find incorrect shreds after 60 seconds, aborting");
return false;
} }
} }
if timer.elapsed() > TIMEOUT {
info!("Didn't find incorrect shreds after 60 seconds, aborting");
break;
}
} }
false Ok(false)
} }
/// If the blockstore contains any shreds with the incorrect shred version,
/// copy them to a backup blockstore and purge them from the actual blockstore.
fn backup_and_clear_blockstore( fn backup_and_clear_blockstore(
ledger_path: &Path, ledger_path: &Path,
config: &ValidatorConfig, config: &ValidatorConfig,
start_slot: Slot, start_slot: Slot,
shred_version: u16, expected_shred_version: u16,
) { ) -> Result<(), BlockstoreError> {
let blockstore = let blockstore =
Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config)).unwrap(); Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config))?;
let do_copy_and_clear = let do_copy_and_clear =
blockstore_contains_bad_shred_version(&blockstore, start_slot, shred_version); blockstore_contains_bad_shred_version(&blockstore, start_slot, expected_shred_version)?;
// If found, then copy shreds to another db and clear from start_slot
if do_copy_and_clear { if do_copy_and_clear {
let folder_name = format!( // .unwrap() safe because getting to this point implies blockstore has slots/shreds
"backup_{}_{}", let end_slot = blockstore.highest_slot()?.unwrap();
// Backing up the shreds that will be deleted from primary blockstore is
// not critical, so swallow errors from backup blockstore operations.
let backup_folder = format!(
"{}_backup_{}_{}_{}",
config config
.ledger_column_options .ledger_column_options
.shred_storage_type .shred_storage_type
.blockstore_directory(), .blockstore_directory(),
thread_rng().gen_range(0, 99999) expected_shred_version,
start_slot,
end_slot
); );
let backup_blockstore = Blockstore::open_with_options( match Blockstore::open_with_options(
&ledger_path.join(folder_name), &ledger_path.join(backup_folder),
blockstore_options_from_config(config), blockstore_options_from_config(config),
); ) {
let mut last_print = Instant::now(); Ok(backup_blockstore) => {
let mut copied = 0; info!("Backing up slots from {start_slot} to {end_slot}");
let mut last_slot = None; let mut timer = Measure::start("blockstore backup");
let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot).unwrap();
for (slot, _meta) in slot_meta_iterator { const PRINT_INTERVAL: Duration = Duration::from_secs(5);
if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { let mut print_timer = Instant::now();
if let Ok(ref backup_blockstore) = backup_blockstore { let mut num_slots_copied = 0;
copied += shreds.len(); let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
for (slot, _meta) in slot_meta_iterator {
let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
let _ = backup_blockstore.insert_shreds(shreds, None, true); let _ = backup_blockstore.insert_shreds(shreds, None, true);
num_slots_copied += 1;
if print_timer.elapsed() > PRINT_INTERVAL {
info!("Backed up {num_slots_copied} slots thus far");
print_timer = Instant::now();
}
} }
timer.stop();
info!("Backing up slots done. {timer}");
} }
if last_print.elapsed().as_millis() > 3000 { Err(err) => {
info!( warn!("Unable to backup shreds with incorrect shred version: {err}");
"Copying shreds from slot {} copied {} so far.",
start_slot, copied
);
last_print = Instant::now();
} }
last_slot = Some(slot);
} }
let end_slot = last_slot.unwrap(); info!("Purging slots {start_slot} to {end_slot} from blockstore");
info!("Purging slots {} to {}", start_slot, end_slot); let mut timer = Measure::start("blockstore purge");
blockstore.purge_from_next_slots(start_slot, end_slot); blockstore.purge_from_next_slots(start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!("done"); timer.stop();
info!("Purging slots done. {timer}");
} else {
info!("Only shreds with the correct version were found in the blockstore");
} }
drop(blockstore);
Ok(())
} }
fn initialize_rpc_transaction_history_services( fn initialize_rpc_transaction_history_services(
@ -2316,8 +2340,12 @@ mod tests {
use { use {
super::*, super::*,
crossbeam_channel::{bounded, RecvTimeoutError}, crossbeam_channel::{bounded, RecvTimeoutError},
solana_entry::entry,
solana_gossip::contact_info::{ContactInfo, LegacyContactInfo}, solana_gossip::contact_info::{ContactInfo, LegacyContactInfo},
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_ledger::{
blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{ solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
@ -2375,51 +2403,37 @@ mod tests {
#[test]
fn test_backup_and_clear_blockstore() {
    solana_logger::setup();

    let validator_config = ValidatorConfig::default_for_test();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // Populate slots 1..=9, each claiming the previous slot as parent, with
    // shred version 1.
    let entries = entry::create_ticks(1, 0, Hash::default());
    for slot in 1..10 {
        let shreds = blockstore::entries_to_test_shreds(
            &entries,
            slot,     // slot
            slot - 1, // parent_slot
            true,     // is_full_slot
            1,        // version
            true,     // merkle_variant
        );
        blockstore.insert_shreds(shreds, None, true).unwrap();
    }
    drop(blockstore);

    // this purges and compacts all slots greater than or equal to 5
    backup_and_clear_blockstore(ledger_path.path(), &validator_config, 5, 2).unwrap();

    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    // assert that slots less than 5 aren't affected
    assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
    for slot in 5..10 {
        assert!(blockstore
            .get_data_shreds_for_slot(slot, 0)
            .unwrap()
            .is_empty());
    }
}