diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 50c112fdf..f32c51304 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -550,9 +550,9 @@ mod tests { full_leader_cache: true, ..ProcessOptions::default() }; - let (bank_forks, cached_leader_schedule, _) = + let (bank_forks, leader_schedule_cache) = test_process_blockstore(&genesis_config, &blockstore, opts); - let leader_schedule_cache = Arc::new(cached_leader_schedule); + let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank_forks = Arc::new(RwLock::new(bank_forks)); let mut me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); diff --git a/core/src/validator.rs b/core/src/validator.rs index 4fee2a58c..2e1ce6648 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -488,6 +488,19 @@ impl Validator { transaction_notifier, ); + let tower = { + let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id); + if let Ok(tower) = &restored_tower { + reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| { + error!("Failed to reconcile blockstore with tower: {:?}", err); + abort() + }); + } + + post_process_restored_tower(restored_tower, &id, &vote_account, config, &bank_forks) + }; + info!("Tower state: {:?}", tower); + *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; if !config.no_os_network_stats_reporting { @@ -1319,33 +1332,38 @@ fn new_banks_from_ledger( .cache_block_meta_sender .as_ref(); - let (bank_forks, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( - &genesis_config, - &blockstore, - config.account_paths.clone(), - config.account_shrink_paths.clone(), - config.snapshot_config.as_ref(), - &process_options, - cache_block_meta_sender, - accounts_update_notifier, - ); - - let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot) = - blockstore_processor::process_blockstore_from_root( + let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = + bank_forks_utils::load_bank_forks( + &genesis_config, &blockstore, - bank_forks, - &process_options, - transaction_history_services - .transaction_status_sender - .as_ref(), - cache_block_meta_sender, + config.account_paths.clone(), + config.account_shrink_paths.clone(), config.snapshot_config.as_ref(), - accounts_package_sender, - ) - .unwrap_or_else(|err| { - error!("Failed to load ledger: {:?}", err); - abort() - }); + &process_options, + cache_block_meta_sender, + accounts_update_notifier, + ); + + leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); + bank_forks.set_snapshot_config(config.snapshot_config.clone()); + bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots); + + let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root( + &blockstore, + &mut bank_forks, + &leader_schedule_cache, + &process_options, + transaction_history_services + .transaction_status_sender + .as_ref(), + cache_block_meta_sender, + config.snapshot_config.as_ref(), + accounts_package_sender, + ) + .unwrap_or_else(|err| { + error!("Failed to load ledger: {:?}", err); + abort() + }); let last_full_snapshot_slot = last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0)); @@ -1399,21 +1417,6 @@ fn new_banks_from_ledger( ); } - let tower = post_process_restored_tower( - restored_tower, - validator_identity, - vote_account, - config, - &bank_forks, - ); - - info!("Tower state: {:?}", tower); - - leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); - - bank_forks.set_snapshot_config(config.snapshot_config.clone()); - bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots); - if let Some(blockstore_root_scan) = blockstore_root_scan { if let Err(err) = blockstore_root_scan.join() { warn!("blockstore_root_scan failed to join {:?}", err); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 25b43c0fe..4459178ef 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -47,7 +47,7 @@ pub fn load( accounts_package_sender: AccountsPackageSender, accounts_update_notifier: Option, ) -> LoadResult { - let (bank_forks, starting_snapshot_hashes) = load_bank_forks( + let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks( genesis_config, blockstore, account_paths, @@ -60,16 +60,15 @@ pub fn load( blockstore_processor::process_blockstore_from_root( blockstore, - bank_forks, + &mut bank_forks, + &leader_schedule_cache, &process_options, transaction_status_sender, cache_block_meta_sender, snapshot_config, accounts_package_sender, ) - .map(|(bank_forks, leader_schedule_cache, ..)| { - (bank_forks, leader_schedule_cache, starting_snapshot_hashes) - }) + .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes)) } #[allow(clippy::too_many_arguments)] @@ -82,7 +81,11 @@ pub fn load_bank_forks( process_options: &ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, accounts_update_notifier: Option, -) -> (BankForks, Option) { +) -> ( + BankForks, + LeaderScheduleCache, + Option, +) { let snapshot_present = if let Some(snapshot_config) = snapshot_config { info!( "Initializing bank snapshot path: {}", @@ -107,7 +110,7 @@ pub fn load_bank_forks( false }; - if snapshot_present { + let (bank_forks, starting_snapshot_hashes) = if snapshot_present { bank_forks_from_snapshot( genesis_config, account_paths, @@ -139,7 +142,13 @@ pub fn load_bank_forks( ), None, ) + }; + + let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank()); + if process_options.full_leader_cache { + leader_schedule_cache.set_max_schedules(std::usize::MAX); } + (bank_forks, leader_schedule_cache, starting_snapshot_hashes) } #[allow(clippy::too_many_arguments)] diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 268d896c6..2426ef0e4 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -95,11 +95,6 @@ impl BlockCostCapacityMeter { } } -pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option); - -pub type BlockstoreProcessorResult = - result::Result; - thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .thread_name(|ix| format!("blockstore_processor_{}", ix)) @@ -570,8 +565,8 @@ pub fn test_process_blockstore( genesis_config: &GenesisConfig, blockstore: &Blockstore, opts: ProcessOptions, -) -> BlockstoreProcessorInner { - let (bank_forks, ..) = crate::bank_forks_utils::load_bank_forks( +) -> (BankForks, LeaderScheduleCache) { + let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks( genesis_config, blockstore, Vec::new(), @@ -584,14 +579,16 @@ pub fn test_process_blockstore( let (accounts_package_sender, _) = unbounded(); process_blockstore_from_root( blockstore, - bank_forks, + &mut bank_forks, + &leader_schedule_cache, &opts, None, None, None, accounts_package_sender, ) - .unwrap() + .unwrap(); + (bank_forks, leader_schedule_cache) } pub(crate) fn process_blockstore_for_bank_0( @@ -632,13 +629,14 @@ pub(crate) fn process_blockstore_for_bank_0( #[allow(clippy::too_many_arguments)] pub fn process_blockstore_from_root( blockstore: &Blockstore, - mut bank_forks: BankForks, + bank_forks: &mut BankForks, + leader_schedule_cache: &LeaderScheduleCache, opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, accounts_package_sender: AccountsPackageSender, -) -> BlockstoreProcessorResult { +) -> result::Result, BlockstoreProcessorError> { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { *pool.borrow_mut() = rayon::ThreadPoolBuilder::new() @@ -692,10 +690,6 @@ pub fn process_blockstore_from_root( let mut timing = ExecuteTimings::default(); // Iterate and replay slots from blockstore starting from `start_slot` - let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); - if opts.full_leader_cache { - leader_schedule_cache.set_max_schedules(std::usize::MAX); - } let mut last_full_snapshot_slot = None; @@ -704,11 +698,11 @@ pub fn process_blockstore_from_root( .unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot)) { load_frozen_forks( - &mut bank_forks, + bank_forks, start_slot, &start_slot_meta, blockstore, - &leader_schedule_cache, + leader_schedule_cache, opts, transaction_status_sender, cache_block_meta_sender, @@ -764,7 +758,7 @@ pub fn process_blockstore_from_root( ); assert!(bank_forks.active_banks().is_empty()); - Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot)) + Ok(last_full_snapshot_slot) } /// Verify that a segment of entries has the correct number of ticks and hashes @@ -2374,7 +2368,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - let (_bank_forks, leader_schedule, _) = + let (_bank_forks, leader_schedule) = test_process_blockstore(&genesis_config, &blockstore, opts); assert_eq!(leader_schedule.max_schedules(), std::usize::MAX); } @@ -3160,11 +3154,14 @@ pub mod tests { None, ); + let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1); + // Test process_blockstore_from_root() from slot 1 onwards let (accounts_package_sender, _) = unbounded(); - let (bank_forks, ..) = process_blockstore_from_root( + process_blockstore_from_root( &blockstore, - bank_forks, + &mut bank_forks, + &leader_schedule_cache, &opts, None, None, @@ -3268,10 +3265,12 @@ pub mod tests { }; let (accounts_package_sender, accounts_package_receiver) = unbounded(); + let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); process_blockstore_from_root( &blockstore, - bank_forks, + &mut bank_forks, + &leader_schedule_cache, &opts, None, None,