From ab373bb1a9ad0dd4d4b91c0d019476a4bdf542a4 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 15 Mar 2022 20:53:46 -0700 Subject: [PATCH] Refactor new_banks_from_ledger() into load and process steps --- core/src/validator.rs | 147 ++++++++++++++++++++++++++---------------- 1 file changed, 93 insertions(+), 54 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index bc0fdeed1f..f9c297f81c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -35,7 +35,9 @@ use { }, solana_ledger::{ bank_forks_utils, - blockstore::{Blockstore, BlockstoreSignals, CompletedSlotsReceiver, PurgeType}, + blockstore::{ + Blockstore, BlockstoreError, BlockstoreSignals, CompletedSlotsReceiver, PurgeType, + }, blockstore_db::{BlockstoreAdvancedOptions, BlockstoreOptions, BlockstoreRecoveryMode}, blockstore_processor::{self, TransactionStatusSender}, leader_schedule::FixedSchedule, @@ -268,6 +270,39 @@ impl Default for ValidatorStartProgress { } } +struct BlockstoreRootScan { + thread: Option>>, +} + +impl BlockstoreRootScan { + fn new(config: &ValidatorConfig, blockstore: &Arc, exit: &Arc) -> Self { + let thread = if config.rpc_addrs.is_some() + && config.rpc_config.enable_rpc_transaction_history + && config.rpc_config.rpc_scan_and_fix_roots + { + let blockstore = blockstore.clone(); + let exit = exit.clone(); + Some( + Builder::new() + .name("blockstore-root-scan".to_string()) + .spawn(move || blockstore.scan_and_fix_roots(&exit)) + .unwrap(), + ) + } else { + None + }; + Self { thread } + } + + fn join(self) { + if let Some(blockstore_root_scan) = self.thread { + if let Err(err) = blockstore_root_scan.join() { + warn!("blockstore_root_scan failed to join {:?}", err); + } + } + } +} + #[derive(Default)] struct TransactionHistoryServices { transaction_status_sender: Option, @@ -456,7 +491,6 @@ impl Validator { ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, - last_full_snapshot_slot, starting_snapshot_hashes, TransactionHistoryServices { transaction_status_sender, @@ -467,16 +501,31 @@ impl Validator { cache_block_meta_sender, cache_block_meta_service, }, - ) = new_banks_from_ledger( + blockstore_process_options, + blockstore_root_scan, + ) = load_blockstore( config, ledger_path, &exit, &start_progress, - accounts_package_channel.0.clone(), accounts_update_notifier, transaction_notifier, ); + let last_full_snapshot_slot = process_blockstore( + &blockstore, + &mut bank_forks, + &leader_schedule_cache, + &blockstore_process_options, + transaction_status_sender.as_ref(), + cache_block_meta_sender.as_ref(), + config.snapshot_config.as_ref(), + accounts_package_channel.0.clone(), + blockstore_root_scan, + ); + let last_full_snapshot_slot = + last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0)); + maybe_warp_slot(config, ledger_path, &mut bank_forks, &leader_schedule_cache); let tower = { @@ -1197,12 +1246,11 @@ fn post_process_restored_tower( } #[allow(clippy::type_complexity)] -fn new_banks_from_ledger( +fn load_blockstore( config: &ValidatorConfig, ledger_path: &Path, exit: &Arc, start_progress: &Arc>, - accounts_package_sender: AccountsPackageSender, accounts_update_notifier: Option, transaction_notifier: Option, ) -> ( @@ -1212,9 +1260,10 @@ fn new_banks_from_ledger( Receiver, CompletedSlotsReceiver, LeaderScheduleCache, - Option, Option, TransactionHistoryServices, + blockstore_processor::ProcessOptions, + BlockstoreRootScan, ) { info!("loading ledger from {:?}...", ledger_path); *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger; @@ -1259,21 +1308,7 @@ fn new_banks_from_ledger( blockstore.set_no_compaction(config.no_rocksdb_compaction); let blockstore = Arc::new(blockstore); - let blockstore_root_scan = if config.rpc_addrs.is_some() - && config.rpc_config.enable_rpc_transaction_history - && config.rpc_config.rpc_scan_and_fix_roots - { - let blockstore = blockstore.clone(); - let exit = exit.clone(); - Some( - Builder::new() - .name("blockstore-root-scan".to_string()) - .spawn(move || blockstore.scan_and_fix_roots(&exit)) - .unwrap(), - ) - } else { - None - }; + let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit); let process_options = blockstore_processor::ProcessOptions { bpf_jit: config.bpf_jit, @@ -1306,10 +1341,6 @@ fn new_banks_from_ledger( TransactionHistoryServices::default() }; - let cache_block_meta_sender = transaction_history_services - .cache_block_meta_sender - .as_ref(); - let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( &genesis_config, @@ -1318,7 +1349,9 @@ fn new_banks_from_ledger( config.account_shrink_paths.clone(), config.snapshot_config.as_ref(), &process_options, - cache_block_meta_sender, + transaction_history_services + .cache_block_meta_sender + .as_ref(), accounts_update_notifier, ); @@ -1326,32 +1359,6 @@ fn new_banks_from_ledger( bank_forks.set_snapshot_config(config.snapshot_config.clone()); bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots); - let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root( - &blockstore, - &mut bank_forks, - &leader_schedule_cache, - &process_options, - transaction_history_services - .transaction_status_sender - .as_ref(), - cache_block_meta_sender, - config.snapshot_config.as_ref(), - accounts_package_sender, - ) - .unwrap_or_else(|err| { - error!("Failed to load ledger: {:?}", err); - abort() - }); - - let last_full_snapshot_slot = - last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0)); - - if let Some(blockstore_root_scan) = blockstore_root_scan { - if let Err(err) = blockstore_root_scan.join() { - warn!("blockstore_root_scan failed to join {:?}", err); - } - } - ( genesis_config, bank_forks, @@ -1359,12 +1366,44 @@ fn new_banks_from_ledger( ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, - last_full_snapshot_slot, starting_snapshot_hashes, transaction_history_services, + process_options, + blockstore_root_scan, ) } +fn process_blockstore( + blockstore: &Blockstore, + bank_forks: &mut BankForks, + leader_schedule_cache: &LeaderScheduleCache, + process_options: &blockstore_processor::ProcessOptions, + transaction_status_sender: Option<&TransactionStatusSender>, + cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, + blockstore_root_scan: BlockstoreRootScan, +) -> Option { + let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root( + blockstore, + bank_forks, + leader_schedule_cache, + process_options, + transaction_status_sender, + cache_block_meta_sender, + snapshot_config, + accounts_package_sender, + ) + .unwrap_or_else(|err| { + error!("Failed to load ledger: {:?}", err); + abort() + }); + + blockstore_root_scan.join(); + + last_full_snapshot_slot +} + fn maybe_warp_slot( config: &ValidatorConfig, ledger_path: &Path,