diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 2d0e8398a6..95463e6ced 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -40,7 +40,8 @@ use { }, solana_runtime::{ accounts_background_service::{ - AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler, + AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver, + SnapshotRequestHandler, }, accounts_db::AccountShrinkThreshold, bank_forks::BankForks, @@ -57,7 +58,6 @@ use { }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, std::{ - boxed::Box, collections::HashSet, net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, @@ -147,6 +147,7 @@ impl Tvu { last_full_snapshot_slot: Option, block_metadata_notifier: Option, wait_to_vote_slot: Option, + pruned_banks_receiver: DroppedSlotsReceiver, ) -> Self { let TvuSockets { repair: repair_socket, @@ -246,23 +247,6 @@ impl Tvu { } }; - let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); - - // Before replay starts, set the callbacks in each of the banks in BankForks - // Note after this callback is created, only the AccountsBackgroundService should be calling - // AccountsDb::purge_slot() to clean up dropped banks. - let callback = bank_forks - .read() - .unwrap() - .root_bank() - .rc - .accounts - .accounts_db - .create_drop_bank_callback(pruned_banks_sender); - for bank in bank_forks.read().unwrap().banks().values() { - bank.set_callback(Some(Box::new(callback.clone()))); - } - let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender); let accounts_background_request_handler = AbsRequestHandler { @@ -462,6 +446,7 @@ pub mod tests { let tower = Tower::default(); let accounts_package_channel = unbounded(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -511,6 +496,7 @@ pub mod tests { None, None, None, + pruned_banks_receiver, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index a0128f68a0..49d3a11c68 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -68,6 +68,7 @@ use { transaction_status_service::TransactionStatusService, }, solana_runtime::{ + accounts_background_service::DroppedSlotsReceiver, accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -508,6 +509,7 @@ impl Validator { }, blockstore_process_options, blockstore_root_scan, + pruned_banks_receiver, ) = load_blockstore( config, ledger_path, @@ -528,6 +530,7 @@ impl Validator { config.snapshot_config.as_ref(), accounts_package_channel.0.clone(), blockstore_root_scan, + pruned_banks_receiver.clone(), ); let last_full_snapshot_slot = last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0)); @@ -935,6 +938,7 @@ impl Validator { last_full_snapshot_slot, block_metadata_notifier, config.wait_to_vote_slot, + pruned_banks_receiver, ); let tpu = Tpu::new( @@ -1279,6 +1283,7 @@ fn load_blockstore( TransactionHistoryServices, blockstore_processor::ProcessOptions, BlockstoreRootScan, + DroppedSlotsReceiver, ) { info!("loading ledger from {:?}...", ledger_path); *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger; @@ -1358,19 +1363,23 @@ fn load_blockstore( TransactionHistoryServices::default() }; - let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = - bank_forks_utils::load_bank_forks( - &genesis_config, - &blockstore, - config.account_paths.clone(), - config.account_shrink_paths.clone(), - config.snapshot_config.as_ref(), - &process_options, - transaction_history_services - .cache_block_meta_sender - .as_ref(), - accounts_update_notifier, - ); + let ( + mut bank_forks, + mut leader_schedule_cache, + starting_snapshot_hashes, + pruned_banks_receiver, + ) = bank_forks_utils::load_bank_forks( + &genesis_config, + &blockstore, + config.account_paths.clone(), + config.account_shrink_paths.clone(), + config.snapshot_config.as_ref(), + &process_options, + transaction_history_services + .cache_block_meta_sender + .as_ref(), + accounts_update_notifier, + ); leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); bank_forks.set_snapshot_config(config.snapshot_config.clone()); @@ -1392,9 +1401,11 @@ fn load_blockstore( transaction_history_services, process_options, blockstore_root_scan, + pruned_banks_receiver, ) } +#[allow(clippy::too_many_arguments)] fn process_blockstore( blockstore: &Blockstore, bank_forks: &mut BankForks, @@ -1405,6 +1416,7 @@ fn process_blockstore( snapshot_config: Option<&SnapshotConfig>, accounts_package_sender: AccountsPackageSender, blockstore_root_scan: BlockstoreRootScan, + pruned_banks_receiver: DroppedSlotsReceiver, ) -> Option { let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root( blockstore, @@ -1415,6 +1427,7 @@ fn process_blockstore( cache_block_meta_sender, snapshot_config, accounts_package_sender, + pruned_banks_receiver, ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 1ee8b11069..efaf5aca99 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -7,8 +7,10 @@ use { }, leader_schedule_cache::LeaderScheduleCache, }, + crossbeam_channel::unbounded, log::*, solana_runtime::{ + accounts_background_service::DroppedSlotsReceiver, accounts_update_notifier_interface::AccountsUpdateNotifier, bank_forks::BankForks, snapshot_archive_info::SnapshotArchiveInfoGetter, @@ -47,16 +49,17 @@ pub fn load( accounts_package_sender: AccountsPackageSender, accounts_update_notifier: Option, ) -> LoadResult { - let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks( - genesis_config, - blockstore, - account_paths, - shrink_paths, - snapshot_config, - &process_options, - cache_block_meta_sender, - accounts_update_notifier, - ); + let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) = + load_bank_forks( + genesis_config, + blockstore, + account_paths, + shrink_paths, + snapshot_config, + &process_options, + cache_block_meta_sender, + accounts_update_notifier, + ); blockstore_processor::process_blockstore_from_root( blockstore, @@ -67,6 +70,7 @@ pub fn load( cache_block_meta_sender, snapshot_config, accounts_package_sender, + pruned_banks_receiver, ) .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes)) } @@ -85,6 +89,7 @@ pub fn load_bank_forks( BankForks, LeaderScheduleCache, Option, + DroppedSlotsReceiver, ) { let snapshot_present = if let Some(snapshot_config) = snapshot_config { info!( @@ -144,12 +149,30 @@ pub fn load_bank_forks( ) }; - let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank()); + // Before replay starts, set the callbacks in each of the banks in BankForks so that + // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank + // drop behavior can be safely synchronized with any other ongoing accounts activity like + // cache flush, clean, shrink, as long as the same thread performing those activities also + // is processing the dropped banks from the `pruned_banks_receiver` channel. + + // There should only be one bank, the root bank in BankForks. Thus all banks added to + // BankForks from now on will be descended from the root bank and thus will inherit + // the bank drop callback. + assert_eq!(bank_forks.banks().len(), 1); + let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let root_bank = bank_forks.root_bank(); + let callback = root_bank + .rc + .accounts + .accounts_db + .create_drop_bank_callback(pruned_banks_sender); + root_bank.set_callback(Some(Box::new(callback))); + + let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank); if process_options.full_leader_cache { leader_schedule_cache.set_max_schedules(std::usize::MAX); } - assert_eq!(bank_forks.banks().len(), 1); if let Some(ref new_hard_forks) = process_options.new_hard_forks { let root_bank = bank_forks.root_bank(); let hard_forks = root_bank.hard_forks(); @@ -166,7 +189,12 @@ pub fn load_bank_forks( } } - (bank_forks, leader_schedule_cache, starting_snapshot_hashes) + ( + bank_forks, + leader_schedule_cache, + starting_snapshot_hashes, + pruned_banks_receiver, + ) } #[allow(clippy::too_many_arguments)] diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 5b9b83bc95..a6190eb8af 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -17,6 +17,7 @@ use { solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{ + accounts_background_service::DroppedSlotsReceiver, accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -567,16 +568,17 @@ pub fn test_process_blockstore( blockstore: &Blockstore, opts: ProcessOptions, ) -> (BankForks, LeaderScheduleCache) { - let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks( - genesis_config, - blockstore, - Vec::new(), - None, - None, - &opts, - None, - None, - ); + let (mut bank_forks, leader_schedule_cache, .., pruned_banks_receiver) = + crate::bank_forks_utils::load_bank_forks( + genesis_config, + blockstore, + Vec::new(), + None, + None, + &opts, + None, + None, + ); let (accounts_package_sender, _) = unbounded(); process_blockstore_from_root( blockstore, @@ -587,6 +589,7 @@ pub fn test_process_blockstore( None, None, accounts_package_sender, + pruned_banks_receiver, ) .unwrap(); (bank_forks, leader_schedule_cache) @@ -637,6 +640,7 @@ pub fn process_blockstore_from_root( cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, accounts_package_sender: AccountsPackageSender, + pruned_banks_receiver: DroppedSlotsReceiver, ) -> result::Result, BlockstoreProcessorError> { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { @@ -696,6 +700,7 @@ pub fn process_blockstore_from_root( accounts_package_sender, &mut timing, &mut last_full_snapshot_slot, + pruned_banks_receiver, )?; } else { // If there's no meta for the input `start_slot`, then we started from a snapshot @@ -1117,6 +1122,7 @@ fn load_frozen_forks( accounts_package_sender: AccountsPackageSender, timing: &mut ExecuteTimings, last_full_snapshot_slot: &mut Option, + pruned_banks_receiver: DroppedSlotsReceiver, ) -> result::Result<(), BlockstoreProcessorError> { let recyclers = VerifyRecyclers::default(); let mut all_banks = HashMap::new(); @@ -1285,6 +1291,17 @@ fn load_frozen_forks( } if last_free.elapsed() > Duration::from_secs(10) { + // Purge account state for all dropped banks + for (pruned_slot, pruned_bank_id) in pruned_banks_receiver.try_iter() { + // Simulate this purge being from the AccountsBackgroundService + let is_from_abs = true; + new_root_bank.rc.accounts.purge_slot( + pruned_slot, + pruned_bank_id, + is_from_abs, + ); + } + // Must be called after `squash()`, so that AccountsDb knows what // the roots are for the cache flushing in exhaustively_free_unused_resource(). // This could take few secs; so update last_free later @@ -3147,6 +3164,7 @@ pub mod tests { // Test process_blockstore_from_root() from slot 1 onwards let (accounts_package_sender, _) = unbounded(); + let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); process_blockstore_from_root( &blockstore, &mut bank_forks, @@ -3156,6 +3174,7 @@ pub mod tests { None, None, accounts_package_sender, + pruned_banks_receiver, ) .unwrap(); @@ -3256,6 +3275,7 @@ pub mod tests { let (accounts_package_sender, accounts_package_receiver) = unbounded(); let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); + let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); process_blockstore_from_root( &blockstore, &mut bank_forks, @@ -3265,6 +3285,7 @@ pub mod tests { None, Some(&snapshot_config), accounts_package_sender.clone(), + pruned_banks_receiver, ) .unwrap(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index b20b9b3e6a..2c467c1978 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6753,10 +6753,7 @@ impl Drop for Bank { if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() { drop_callback.callback(self); } else { - // Default case - // 1. Tests - // 2. At startup when replaying blockstore and there's no - // AccountsBackgroundService to perform cleanups yet. + // Default case for tests self.rc .accounts .purge_slot(self.slot(), self.bank_id(), false);