From 83e041299a159eb605607e9fb887ac90170b588e Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 18 Mar 2022 12:43:20 -0700
Subject: [PATCH] Run real snapshot packager while processing blockstore at
 validator startup

---
 core/src/validator.rs              | 177 +++++++++++-----------
 ledger-tool/src/main.rs            |   2 -
 ledger/src/bank_forks_utils.rs     |  72 +++------
 ledger/src/blockstore_processor.rs | 233 +++--------------------------
 runtime/src/bank.rs                |  28 ----
 5 files changed, 128 insertions(+), 384 deletions(-)

diff --git a/core/src/validator.rs b/core/src/validator.rs
index 3321c609d..0b3081b6d 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -115,6 +115,9 @@ use {
 const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
 const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
 
+/// maximum drop bank signal queue length
+const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
+
 pub struct ValidatorConfig {
     pub halt_at_slot: Option<Slot>,
     pub expected_genesis_hash: Option<Hash>,
@@ -529,66 +532,18 @@ impl Validator {
             Some(poh_timing_point_sender.clone()),
         );
 
-        let pending_accounts_package = PendingAccountsPackage::default();
-        let last_full_snapshot_slot = process_blockstore(
-            &blockstore,
-            &bank_forks,
-            &leader_schedule_cache,
-            &blockstore_process_options,
-            transaction_status_sender.as_ref(),
-            cache_block_meta_sender.as_ref(),
-            config.snapshot_config.as_ref(),
-            Arc::clone(&pending_accounts_package),
-            blockstore_root_scan,
-            pruned_banks_receiver,
-            &start_progress,
-        );
-        let last_full_snapshot_slot =
-            last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
-
-        maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);
-
-        let tower = {
-            let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
-            if let Ok(tower) = &restored_tower {
-                reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
-                    error!("Failed to reconcile blockstore with tower: {:?}", err);
-                    abort()
-                });
-            }
-
-            post_process_restored_tower(
-                restored_tower,
-                &id,
-                vote_account,
-                config,
-                &bank_forks.read().unwrap(),
-            )
-        };
-        info!("Tower state: {:?}", tower);
-
-        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
-
-        let leader_schedule_cache = Arc::new(leader_schedule_cache);
-
-        let sample_performance_service =
-            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
-                Some(SamplePerformanceService::new(
-                    &bank_forks,
-                    &blockstore,
-                    &exit,
-                ))
-            } else {
-                None
-            };
-
-        let bank = bank_forks.read().unwrap().working_bank();
-        info!("Starting validator with working bank slot {}", bank.slot());
-
         node.info.wallclock = timestamp();
         node.info.shred_version = compute_shred_version(
             &genesis_config.hash(),
-            Some(&bank.hard_forks().read().unwrap()),
+            Some(
+                &bank_forks
+                    .read()
+                    .unwrap()
+                    .working_bank()
+                    .hard_forks()
+                    .read()
+                    .unwrap(),
+            ),
         );
         Self::print_node_info(&node);
 
@@ -613,28 +568,13 @@ impl Validator {
         cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
         let cluster_info = Arc::new(cluster_info);
 
-        // Before replay starts, set the callbacks in each of the banks in BankForks
-        // Note after this callback is created, only the AccountsBackgroundService should be calling
-        // AccountsDb::purge_slot() to clean up dropped banks.
-        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
-        let callback = bank_forks
-            .read()
-            .unwrap()
-            .root_bank()
-            .rc
-            .accounts
-            .accounts_db
-            .create_drop_bank_callback(pruned_banks_sender);
-        for bank in bank_forks.read().unwrap().banks().values() {
-            bank.set_callback(Some(Box::new(callback.clone())));
-        }
-
         let (
             accounts_background_service,
             accounts_hash_verifier,
             snapshot_packager_service,
             accounts_background_request_sender,
         ) = {
+            let pending_accounts_package = PendingAccountsPackage::default();
             let (
                 accounts_background_request_sender,
                 snapshot_request_handler,
@@ -694,6 +634,7 @@ impl Validator {
                 config.snapshot_config.clone(),
             );
 
+            let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.hash.0);
             let accounts_background_service = AccountsBackgroundService::new(
                 bank_forks.clone(),
                 &exit,
@@ -714,6 +655,57 @@ impl Validator {
             )
         };
 
+        process_blockstore(
+            &blockstore,
+            &bank_forks,
+            &leader_schedule_cache,
+            &blockstore_process_options,
+            transaction_status_sender.as_ref(),
+            cache_block_meta_sender.as_ref(),
+            blockstore_root_scan,
+            &accounts_background_request_sender,
+            &start_progress,
+        );
+
+        maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);
+
+        let tower = {
+            let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
+            if let Ok(tower) = &restored_tower {
+                reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
+                    error!("Failed to reconcile blockstore with tower: {:?}", err);
+                    abort()
+                });
+            }
+
+            post_process_restored_tower(
+                restored_tower,
+                &id,
+                vote_account,
+                config,
+                &bank_forks.read().unwrap(),
+            )
+        };
+        info!("Tower state: {:?}", tower);
+
+        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
+
+        let leader_schedule_cache = Arc::new(leader_schedule_cache);
+
+        let sample_performance_service =
+            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
+                Some(SamplePerformanceService::new(
+                    &bank_forks,
+                    &blockstore,
+                    &exit,
+                ))
+            } else {
+                None
+            };
+
+        let bank = bank_forks.read().unwrap().working_bank();
+        info!("Starting validator with working bank slot {}", bank.slot());
+
         let mut block_commitment_cache = BlockCommitmentCache::default();
         block_commitment_cache.initialize_slots(bank.slot(), bank_forks.read().unwrap().root());
         let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
@@ -1420,7 +1412,7 @@ fn load_blockstore(
         TransactionHistoryServices::default()
     };
 
-    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
+    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
         bank_forks_utils::load_bank_forks(
             &genesis_config,
             &blockstore,
@@ -1434,6 +1426,28 @@ fn load_blockstore(
         accounts_update_notifier,
     );
 
+    // Before replay starts, set the callbacks in each of the banks in BankForks so that
+    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
+    // drop behavior can be safely synchronized with any other ongoing accounts activity like
+    // cache flush, clean, shrink, as long as the same thread performing those activities also
+    // is processing the dropped banks from the `pruned_banks_receiver` channel.
+
+    // There should only be one bank, the root bank in BankForks. Thus all banks added to
+    // BankForks from now on will be descended from the root bank and thus will inherit
+    // the bank drop callback.
+    assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
+    let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
+    {
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        root_bank.set_callback(Some(Box::new(
+            root_bank
+                .rc
+                .accounts
+                .accounts_db
+                .create_drop_bank_callback(pruned_banks_sender),
+        )));
+    }
+
     {
         let hard_forks: Vec<_> = bank_forks
             .read()
@@ -1508,12 +1522,10 @@ fn process_blockstore(
     process_options: &blockstore_processor::ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    pending_accounts_package: PendingAccountsPackage,
     blockstore_root_scan: BlockstoreRootScan,
-    pruned_banks_receiver: DroppedSlotsReceiver,
+    accounts_background_request_sender: &AbsRequestSender,
     start_progress: &Arc<RwLock<ValidatorStartProgress>>,
-) -> Option<Slot> {
+) {
     let exit = Arc::new(AtomicBool::new(false));
     if let Some(max_slot) = highest_slot(blockstore) {
         let bank_forks = bank_forks.clone();
@@ -1529,17 +1541,14 @@ fn process_blockstore(
             }
         });
    }
-
-    let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
+    blockstore_processor::process_blockstore_from_root(
         blockstore,
         bank_forks,
         leader_schedule_cache,
         process_options,
         transaction_status_sender,
         cache_block_meta_sender,
-        snapshot_config,
-        pending_accounts_package,
-        pruned_banks_receiver,
+        accounts_background_request_sender,
     )
     .unwrap_or_else(|err| {
         error!("Failed to load ledger: {:?}", err);
@@ -1549,8 +1558,6 @@ fn process_blockstore(
 
     exit.store(true, Ordering::Relaxed);
     blockstore_root_scan.join();
-
-    last_full_snapshot_slot
 }
 
 fn maybe_warp_slot(
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index bf8feab7e..11b65c7f2 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -43,7 +43,6 @@ use {
         snapshot_archive_info::SnapshotArchiveInfoGetter,
         snapshot_config::SnapshotConfig,
         snapshot_hash::StartingSnapshotHashes,
-        snapshot_package::PendingAccountsPackage,
         snapshot_utils::{
             self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
             DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
@@ -778,7 +777,6 @@ fn load_bank_forks(
         process_options,
         None,
         None,
-        PendingAccountsPackage::default(),
         None,
     )
     .map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes))
diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index 802c7ed05..c7c29f8da 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -7,16 +7,13 @@ use {
         },
         leader_schedule_cache::LeaderScheduleCache,
     },
-    crossbeam_channel::bounded,
     log::*,
     solana_runtime::{
-        accounts_background_service::DroppedSlotsReceiver,
         accounts_update_notifier_interface::AccountsUpdateNotifier,
         bank_forks::BankForks,
         snapshot_archive_info::SnapshotArchiveInfoGetter,
         snapshot_config::SnapshotConfig,
         snapshot_hash::{FullSnapshotHash, IncrementalSnapshotHash, StartingSnapshotHashes},
-        snapshot_package::PendingAccountsPackage,
         snapshot_utils,
     },
     solana_sdk::genesis_config::GenesisConfig,
@@ -37,9 +34,6 @@ pub type LoadResult = result::Result<
     BlockstoreProcessorError,
 >;
 
-/// maximum drop bank signal queue length
-const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
-
 /// Load the banks via genesis or a snapshot then processes all full blocks in blockstore
 ///
 /// If a snapshot config is given, and a snapshot is found, it will be loaded.  Otherwise, load
@@ -54,20 +48,18 @@ pub fn load(
     process_options: ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    pending_accounts_package: PendingAccountsPackage,
     accounts_update_notifier: Option<AccountsUpdateNotifier>,
 ) -> LoadResult {
-    let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
-        load_bank_forks(
-            genesis_config,
-            blockstore,
-            account_paths,
-            shrink_paths,
-            snapshot_config,
-            &process_options,
-            cache_block_meta_sender,
-            accounts_update_notifier,
-        );
+    let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks(
+        genesis_config,
+        blockstore,
+        account_paths,
+        shrink_paths,
+        snapshot_config,
+        &process_options,
+        cache_block_meta_sender,
+        accounts_update_notifier,
+    );
 
     blockstore_processor::process_blockstore_from_root(
         blockstore,
@@ -76,9 +68,7 @@ pub fn load(
         &process_options,
         transaction_status_sender,
         cache_block_meta_sender,
-        snapshot_config,
-        pending_accounts_package,
-        pruned_banks_receiver,
+        &solana_runtime::accounts_background_service::AbsRequestSender::default(),
     )
     .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
 }
@@ -97,7 +87,6 @@ pub fn load_bank_forks(
     Arc<RwLock<BankForks>>,
     LeaderScheduleCache,
     Option<StartingSnapshotHashes>,
-    DroppedSlotsReceiver,
 ) {
     let snapshot_present = if let Some(snapshot_config) = snapshot_config {
         info!(
@@ -156,33 +145,11 @@ pub fn load_bank_forks(
         )
     };
 
-    // Before replay starts, set the callbacks in each of the banks in BankForks so that
-    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
-    // drop behavior can be safely synchronized with any other ongoing accounts activity like
-    // cache flush, clean, shrink, as long as the same thread performing those activities also
-    // is processing the dropped banks from the `pruned_banks_receiver` channel.
-
-    // There should only be one bank, the root bank in BankForks. Thus all banks added to
-    // BankForks from now on will be descended from the root bank and thus will inherit
-    // the bank drop callback.
-    assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
-    let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
-
-    let leader_schedule_cache = {
-        let root_bank = bank_forks.read().unwrap().root_bank();
-        let callback = root_bank
-            .rc
-            .accounts
-            .accounts_db
-            .create_drop_bank_callback(pruned_banks_sender);
-        root_bank.set_callback(Some(Box::new(callback)));
-
-        let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
-        if process_options.full_leader_cache {
-            leader_schedule_cache.set_max_schedules(std::usize::MAX);
-        }
-        leader_schedule_cache
-    };
+    let mut leader_schedule_cache =
+        LeaderScheduleCache::new_from_bank(&bank_forks.read().unwrap().root_bank());
+    if process_options.full_leader_cache {
+        leader_schedule_cache.set_max_schedules(std::usize::MAX);
+    }
 
     if let Some(ref new_hard_forks) = process_options.new_hard_forks {
         let root_bank = bank_forks.read().unwrap().root_bank();
@@ -200,12 +167,7 @@ pub fn load_bank_forks(
         }
     }
 
-    (
-        bank_forks,
-        leader_schedule_cache,
-        starting_snapshot_hashes,
-        pruned_banks_receiver,
-    )
+    (bank_forks, leader_schedule_cache, starting_snapshot_hashes)
 }
 
 #[allow(clippy::too_many_arguments)]
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 9cd67c5e6..1160c6e1c 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -17,7 +17,7 @@ use {
     solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::{
-        accounts_background_service::DroppedSlotsReceiver,
+        accounts_background_service::AbsRequestSender,
         accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
         accounts_index::AccountSecondaryIndexes,
         accounts_update_notifier_interface::AccountsUpdateNotifier,
@@ -31,9 +31,6 @@ use {
         commitment::VOTE_THRESHOLD_SIZE,
         cost_model::CostModel,
         runtime_config::RuntimeConfig,
-        snapshot_config::SnapshotConfig,
-        snapshot_package::{PendingAccountsPackage, SnapshotType},
-        snapshot_utils,
         transaction_batch::TransactionBatch,
         transaction_cost_metrics_sender::TransactionCostMetricsSender,
         vote_account::VoteAccountsHashMap,
@@ -569,17 +566,16 @@ pub fn test_process_blockstore(
     blockstore: &Blockstore,
     opts: ProcessOptions,
 ) -> (Arc<RwLock<BankForks>>, LeaderScheduleCache) {
-    let (bank_forks, leader_schedule_cache, .., pruned_banks_receiver) =
-        crate::bank_forks_utils::load_bank_forks(
-            genesis_config,
-            blockstore,
-            Vec::new(),
-            None,
-            None,
-            &opts,
-            None,
-            None,
-        );
+    let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
+        genesis_config,
+        blockstore,
+        Vec::new(),
+        None,
+        None,
+        &opts,
+        None,
+        None,
+    );
     process_blockstore_from_root(
         blockstore,
         &bank_forks,
@@ -587,9 +583,7 @@ pub fn test_process_blockstore(
         &opts,
         None,
         None,
-        None,
-        PendingAccountsPackage::default(),
-        pruned_banks_receiver,
+        &AbsRequestSender::default(),
     )
     .unwrap();
     (bank_forks, leader_schedule_cache)
 }
@@ -639,10 +633,8 @@ pub fn process_blockstore_from_root(
     opts: &ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    pending_accounts_package: PendingAccountsPackage,
-    pruned_banks_receiver: DroppedSlotsReceiver,
-) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
+    accounts_background_request_sender: &AbsRequestSender,
+) -> result::Result<(), BlockstoreProcessorError> {
     if let Some(num_threads) = opts.override_num_threads {
         PAR_THREAD_POOL.with(|pool| {
             *pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
@@ -682,8 +674,6 @@ pub fn process_blockstore_from_root(
     let mut timing = ExecuteTimings::default();
 
     // Iterate and replay slots from blockstore starting from `start_slot`
-    let mut last_full_snapshot_slot = None;
-
     if let Some(start_slot_meta) = blockstore
         .meta(start_slot)
         .unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
@@ -697,11 +687,8 @@ pub fn process_blockstore_from_root(
             opts,
             transaction_status_sender,
             cache_block_meta_sender,
-            snapshot_config,
-            pending_accounts_package,
             &mut timing,
-            &mut last_full_snapshot_slot,
-            pruned_banks_receiver,
+            accounts_background_request_sender,
         )?;
     } else {
         // If there's no meta for the input `start_slot`, then we started from a snapshot
@@ -761,7 +748,7 @@ pub fn process_blockstore_from_root(
         assert!(bank_forks.active_banks().is_empty());
     }
 
-    Ok(last_full_snapshot_slot)
+    Ok(())
 }
 
 /// Verify that a segment of entries has the correct number of ticks and hashes
@@ -1159,16 +1146,12 @@ fn load_frozen_forks(
     opts: &ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    pending_accounts_package: PendingAccountsPackage,
     timing: &mut ExecuteTimings,
-    last_full_snapshot_slot: &mut Option<Slot>,
-    pruned_banks_receiver: DroppedSlotsReceiver,
+    accounts_background_request_sender: &AbsRequestSender,
 ) -> result::Result<(), BlockstoreProcessorError> {
     let recyclers = VerifyRecyclers::default();
     let mut all_banks = HashMap::new();
     let mut last_status_report = Instant::now();
-    let mut last_free = Instant::now();
     let mut pending_slots = vec![];
     let mut slots_elapsed = 0;
     let mut txs = 0;
@@ -1292,63 +1275,10 @@ fn load_frozen_forks(
                     leader_schedule_cache.set_root(new_root_bank);
                     let _ = bank_forks.write().unwrap().set_root(
                         root,
-                        &solana_runtime::accounts_background_service::AbsRequestSender::default(),
+                        accounts_background_request_sender,
                         None,
                     );
 
-                    if let Some(snapshot_config) = snapshot_config {
-                        let block_height = new_root_bank.block_height();
-                        if snapshot_utils::should_take_full_snapshot(
-                            block_height,
-                            snapshot_config.full_snapshot_archive_interval_slots,
-                        ) {
-                            info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", root);
-                            *last_full_snapshot_slot = Some(root);
-                            new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
-                            last_free = Instant::now();
-                            new_root_bank.update_accounts_hash_with_index_option(
-                                false,
-                                snapshot_config.accounts_hash_debug_verify,
-                                false,
-                            );
-                            snapshot_utils::snapshot_bank(
-                                new_root_bank,
-                                new_root_bank.src.slot_deltas(&new_root_bank.src.roots()),
-                                &pending_accounts_package,
-                                &snapshot_config.bank_snapshots_dir,
-                                &snapshot_config.snapshot_archives_dir,
-                                snapshot_config.snapshot_version,
-                                snapshot_config.archive_format,
-                                None,
-                                Some(SnapshotType::FullSnapshot),
-                            )
-                            .expect("Failed to snapshot bank while loading frozen banks");
-                            trace!(
-                                "took bank snapshot for new root bank, block height: {}, slot: {}",
-                                block_height,
-                                root
-                            );
-                        }
-                    }
-
-                    if last_free.elapsed() > Duration::from_secs(10) {
-                        // Purge account state for all dropped banks
-                        for (pruned_slot, pruned_bank_id) in pruned_banks_receiver.try_iter() {
-                            // Simulate this purge is from AccountBackgroundService
-                            new_root_bank.rc.accounts.accounts_db.purge_slot(
-                                pruned_slot,
-                                pruned_bank_id,
-                                true,
-                            );
-                        }
-
-                        // Must be called after `squash()`, so that AccountsDb knows what
-                        // the roots are for the cache flushing in exhaustively_free_unused_resource().
-                        // This could take few secs; so update last_free later
-                        new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
-                        last_free = Instant::now();
-                    }
-
                     // Filter out all non descendants of the new root
                     pending_slots
                         .retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(&root));
@@ -3211,7 +3141,6 @@ pub mod tests {
         let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1);
 
         // Test process_blockstore_from_root() from slot 1 onwards
-        let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
         let bank_forks = RwLock::new(bank_forks);
         process_blockstore_from_root(
             &blockstore,
@@ -3220,9 +3149,7 @@ pub mod tests {
             &opts,
             None,
             None,
-            None,
-            PendingAccountsPackage::default(),
-            pruned_banks_receiver,
+            &AbsRequestSender::default(),
         )
         .unwrap();
 
@@ -3246,128 +3173,6 @@ pub mod tests {
         verify_fork_infos(&bank_forks);
     }
 
-    /// Test that processing the blockstore is aware of incremental snapshots. When processing the
-    /// blockstore from a root, like what happens when loading from a snapshot, there may be new
-    /// roots that cross a full snapshot interval. In these cases, a bank snapshot must be taken,
-    /// so that a full snapshot archive is created and available by the time the background
-    /// services spin up.
-    ///
-    /// For this test, process enough roots to cross the full snapshot interval multiple times.
-    /// Ensure afterwards that the snapshots were created.
-    #[test]
-    fn test_process_blockstore_from_root_with_snapshots() {
-        solana_logger::setup();
-        let GenesisConfigInfo {
-            mut genesis_config, ..
-        } = create_genesis_config(123);
-
-        let ticks_per_slot = 1;
-        genesis_config.ticks_per_slot = ticks_per_slot;
-        let (ledger_path, blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
-        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
-
-        const ROOT_INTERVAL_SLOTS: Slot = 2;
-        const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = ROOT_INTERVAL_SLOTS * 5;
-        const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 4;
-
-        let mut last_hash = blockhash;
-        for i in 1..=LAST_SLOT {
-            last_hash =
-                fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, i, i - 1, last_hash);
-        }
-
-        let roots_to_set = (0..=LAST_SLOT)
-            .step_by(ROOT_INTERVAL_SLOTS as usize)
-            .collect_vec();
-        blockstore.set_roots(roots_to_set.iter()).unwrap();
-
-        // Set up bank1
-        let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
-        let bank0 = bank_forks.get(0).unwrap().clone();
-        let opts = ProcessOptions {
-            poh_verify: true,
-            accounts_db_test_hash_calculation: true,
-            ..ProcessOptions::default()
-        };
-        let recyclers = VerifyRecyclers::default();
-        process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
-
-        let slot_start_processing = 1;
-        let bank = bank_forks.insert(Bank::new_from_parent(
-            &bank0,
-            &Pubkey::default(),
-            slot_start_processing,
-        ));
-        confirm_full_slot(
-            &blockstore,
-            &bank,
-            &opts,
-            &recyclers,
-            &mut ConfirmationProgress::new(bank0.last_blockhash()),
-            None,
-            None,
-            &mut ExecuteTimings::default(),
-        )
-        .unwrap();
-        bank_forks.set_root(
-            1,
-            &solana_runtime::accounts_background_service::AbsRequestSender::default(),
-            None,
-        );
-
-        let bank_snapshots_tempdir = tempfile::TempDir::new().unwrap();
-        let snapshot_config = SnapshotConfig {
-            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
-            bank_snapshots_dir: bank_snapshots_tempdir.path().to_path_buf(),
-            ..SnapshotConfig::default()
-        };
-
-        let pending_accounts_package = PendingAccountsPackage::default();
-        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
-
-        let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
-        process_blockstore_from_root(
-            &blockstore,
-            &RwLock::new(bank_forks),
-            &leader_schedule_cache,
-            &opts,
-            None,
-            None,
-            Some(&snapshot_config),
-            Arc::clone(&pending_accounts_package),
-            pruned_banks_receiver,
-        )
-        .unwrap();
-
-        // Ensure the last AccountsPackage was created and is pending
-        let received_accounts_package_slot = pending_accounts_package
-            .lock()
-            .unwrap()
-            .take()
-            .unwrap()
-            .slot;
-        let expected_accounts_package_slot = (slot_start_processing..=LAST_SLOT)
-            .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
-            .max()
-            .unwrap();
-        assert_eq!(
-            received_accounts_package_slot,
-            expected_accounts_package_slot
-        );
-
-        // Ensure all the bank snapshots were created
-        let bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir);
-        let mut bank_snapshot_slots = bank_snapshots
-            .into_iter()
-            .map(|bank_snapshot| bank_snapshot.slot)
-            .collect::<Vec<_>>();
-        bank_snapshot_slots.sort_unstable();
-        let expected_slots = (slot_start_processing..=LAST_SLOT)
-            .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
-            .collect::<Vec<_>>();
-        assert_eq!(bank_snapshot_slots, expected_slots);
-    }
-
     #[test]
     #[ignore]
     fn test_process_entries_stress() {
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index c0d2dc4b6..2adab82da 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -3261,34 +3261,6 @@ impl Bank {
         }
     }
 
-    // Should not be called outside of startup, will race with
-    // concurrent cleaning logic in AccountsBackgroundService
-    pub fn exhaustively_free_unused_resource(&self, last_full_snapshot_slot: Option<Slot>) {
-        const IS_STARTUP: bool = true; // this is only called at startup, and we want to use more threads
-        let mut flush = Measure::start("flush");
-        // Flush all the rooted accounts. Must be called after `squash()`,
-        // so that AccountsDb knows what the roots are.
-        self.force_flush_accounts_cache();
-        flush.stop();
-
-        let mut clean = Measure::start("clean");
-        // Don't clean the slot we're snapshotting because it may have zero-lamport
-        // accounts that were included in the bank delta hash when the bank was frozen,
-        // and if we clean them here, any newly created snapshot's hash for this bank
-        // may not match the frozen hash.
-        self.clean_accounts(true, IS_STARTUP, last_full_snapshot_slot);
-        clean.stop();
-
-        let mut shrink = Measure::start("shrink");
-        self.shrink_all_slots(IS_STARTUP, last_full_snapshot_slot);
-        shrink.stop();
-
-        info!(
-            "exhaustively_free_unused_resource() {} {} {}",
-            flush, clean, shrink,
-        );
-    }
-
     pub fn epoch_schedule(&self) -> &EpochSchedule {
         &self.epoch_schedule
     }
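
Note: the core mechanism this patch repositions -- a bounded "dropped bank" signal queue
whose receiving end is drained by one background thread -- can be summarized with the
minimal, std-only Rust sketch below. It is a sketch under stated assumptions, not the
solana-runtime implementation: Bank, DropBankCallback, and the purge loop here are
simplified stand-ins for AccountsDb::create_drop_bank_callback() and
AccountsBackgroundService; only the bounded-queue pattern and
MAX_DROP_BANK_SIGNAL_QUEUE_SIZE correspond directly to the patch.

use std::{
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    thread,
};

type Slot = u64;
type BankId = u64;

/// Stand-in for the drop callback installed on the root bank: it owns the
/// sending half of a bounded channel, so the queue depth is capped.
struct DropBankCallback(SyncSender<(Slot, BankId)>);

struct Bank {
    slot: Slot,
    id: BankId,
    drop_callback: Option<DropBankCallback>,
}

impl Drop for Bank {
    fn drop(&mut self) {
        if let Some(cb) = &self.drop_callback {
            // try_send never blocks; if the queue is full the signal is simply
            // dropped, since purging is an optimization rather than a
            // correctness requirement at this point.
            let _ = cb.0.try_send((self.slot, self.id));
        }
    }
}

/// Stand-in for the AccountsBackgroundService side: the single thread allowed
/// to purge dropped banks, keeping purges serialized with flush/clean/shrink.
fn spawn_purger(receiver: Receiver<(Slot, BankId)>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for (slot, bank_id) in receiver {
            println!("purging slot {slot} (bank id {bank_id})");
        }
    })
}

fn main() {
    const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
    let (sender, receiver) = sync_channel(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
    let purger = spawn_purger(receiver);

    // load_blockstore() installs the callback on the single root bank; banks
    // descended from it inherit the callback, so every dropped bank signals
    // the same queue. Here one bank stands in for the whole fork tree.
    let root = Bank {
        slot: 0,
        id: 0,
        drop_callback: Some(DropBankCallback(sender)),
    };
    drop(root); // enqueues (0, 0)

    // Dropping the last sender closed the channel, so the purger loop ends.
    purger.join().unwrap();
}

This separation is also what lets the patch run the real snapshot packager while the
blockstore is still being processed: replay no longer purges dropped banks or takes
snapshots inline, it routes drop signals and root-advance requests through the same
queues that AccountsBackgroundService and the snapshot services consume once started.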