From 55fc963595c96eafb334bdcccf4559c2307a3cb6 Mon Sep 17 00:00:00 2001 From: carllin Date: Sat, 12 Dec 2020 17:22:34 -0800 Subject: [PATCH] Move slot cleanup to AccountsBackgroundService (#13911) * Move bank drop to AccountsBackgroundService * Send to ABS on drop instead, protects against other places banks are dropped * Fix Abi * test Co-authored-by: Carl Lin --- banking-bench/src/main.rs | 6 +- core/src/commitment_service.rs | 15 +- core/src/consensus.rs | 3 +- .../optimistically_confirmed_bank_tracker.rs | 9 +- core/src/replay_stage.rs | 32 ++-- core/src/rpc.rs | 9 +- core/src/tvu.rs | 31 ++- core/tests/snapshots.rs | 10 +- runtime/src/accounts_background_service.rs | 177 ++++++++++++++++-- runtime/src/bank.rs | 42 ++++- runtime/src/bank_forks.rs | 30 ++- 11 files changed, 297 insertions(+), 67 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 67b61a8fd3..7bdf2b7275 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -17,7 +17,9 @@ use solana_ledger::{ }; use solana_measure::measure::Measure; use solana_perf::packet::to_packets_chunked; -use solana_runtime::{bank::Bank, bank_forks::BankForks}; +use solana_runtime::{ + accounts_background_service::ABSRequestSender, bank::Bank, bank_forks::BankForks, +}; use solana_sdk::{ hash::Hash, signature::Keypair, @@ -323,7 +325,7 @@ fn main() { poh_recorder.lock().unwrap().set_bank(&bank); assert!(poh_recorder.lock().unwrap().bank().is_some()); if bank.slot() > 32 { - bank_forks.set_root(root, &None, None); + bank_forks.set_root(root, &ABSRequestSender::default(), None); root += 1; } debug!( diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index 333602b821..d73f769181 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -249,6 +249,7 @@ mod tests { use super::*; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_runtime::{ + accounts_background_service::ABSRequestSender, bank_forks::BankForks, genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, }; @@ -539,7 +540,7 @@ mod tests { &working_bank, ); for x in 0..root { - bank_forks.set_root(x, &None, None); + bank_forks.set_root(x, &ABSRequestSender::default(), None); } // Add an additional bank/vote that will root slot 2 @@ -576,7 +577,11 @@ mod tests { .read() .unwrap() .highest_confirmed_root(); - bank_forks.set_root(root, &None, Some(highest_confirmed_root)); + bank_forks.set_root( + root, + &ABSRequestSender::default(), + Some(highest_confirmed_root), + ); let highest_confirmed_root_bank = bank_forks.get(highest_confirmed_root); assert!(highest_confirmed_root_bank.is_some()); @@ -641,7 +646,11 @@ mod tests { .read() .unwrap() .highest_confirmed_root(); - bank_forks.set_root(root, &None, Some(highest_confirmed_root)); + bank_forks.set_root( + root, + &ABSRequestSender::default(), + Some(highest_confirmed_root), + ); let highest_confirmed_root_bank = bank_forks.get(highest_confirmed_root); assert!(highest_confirmed_root_bank.is_some()); } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 7b12a45ada..8838e1dcee 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1245,6 +1245,7 @@ pub mod test { }; use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path}; use solana_runtime::{ + accounts_background_service::ABSRequestSender, bank::Bank, bank_forks::BankForks, genesis_utils::{ @@ -1417,7 +1418,7 @@ pub mod test { new_root, &self.bank_forks, &mut self.progress, - &None, + &ABSRequestSender::default(), &mut PubkeyReferences::default(), None, &mut self.heaviest_subtree_fork_choice, diff --git a/core/src/optimistically_confirmed_bank_tracker.rs b/core/src/optimistically_confirmed_bank_tracker.rs index 8de96bac88..7dec2b8d20 100644 --- a/core/src/optimistically_confirmed_bank_tracker.rs +++ b/core/src/optimistically_confirmed_bank_tracker.rs @@ -167,7 +167,9 @@ impl OptimisticallyConfirmedBankTracker { mod tests { use super::*; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_runtime::commitment::BlockCommitmentCache; + use solana_runtime::{ + accounts_background_service::ABSRequestSender, commitment::BlockCommitmentCache, + }; use solana_sdk::pubkey::Pubkey; #[test] @@ -279,7 +281,10 @@ mod tests { let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone(); let bank7 = Bank::new_from_parent(&bank5, &Pubkey::default(), 7); bank_forks.write().unwrap().insert(bank7); - bank_forks.write().unwrap().set_root(7, &None, None); + bank_forks + .write() + .unwrap() + .set_root(7, &ABSRequestSender::default(), None); OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(6), &bank_forks, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index e6e511f655..22b34ef04e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -29,7 +29,7 @@ use solana_ledger::{ use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::inc_new_counter_info; use solana_runtime::{ - accounts_background_service::SnapshotRequestSender, bank::Bank, bank_forks::BankForks, + accounts_background_service::ABSRequestSender, bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ @@ -100,7 +100,7 @@ pub struct ReplayStageConfig { pub subscriptions: Arc, pub leader_schedule_cache: Arc, pub latest_root_senders: Vec>, - pub snapshot_request_sender: Option, + pub accounts_background_request_sender: ABSRequestSender, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, @@ -232,7 +232,7 @@ impl ReplayStage { subscriptions, leader_schedule_cache, latest_root_senders, - snapshot_request_sender, + accounts_background_request_sender, block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -447,7 +447,7 @@ impl ReplayStage { &blockstore, &leader_schedule_cache, &lockouts_sender, - &snapshot_request_sender, + &accounts_background_request_sender, &latest_root_senders, &mut all_pubkeys, &subscriptions, @@ -1039,7 +1039,7 @@ impl ReplayStage { blockstore: &Arc, leader_schedule_cache: &Arc, lockouts_sender: &Sender, - snapshot_request_sender: &Option, + accounts_background_request_sender: &ABSRequestSender, latest_root_senders: &[Sender], all_pubkeys: &mut PubkeyReferences, subscriptions: &Arc, @@ -1096,7 +1096,7 @@ impl ReplayStage { new_root, &bank_forks, progress, - snapshot_request_sender, + accounts_background_request_sender, all_pubkeys, highest_confirmed_root, heaviest_subtree_fork_choice, @@ -1816,7 +1816,7 @@ impl ReplayStage { new_root: Slot, bank_forks: &RwLock, progress: &mut ProgressMap, - snapshot_request_sender: &Option, + accounts_background_request_sender: &ABSRequestSender, all_pubkeys: &mut PubkeyReferences, highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, @@ -1824,7 +1824,7 @@ impl ReplayStage { let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); bank_forks.write().unwrap().set_root( new_root, - snapshot_request_sender, + accounts_background_request_sender, highest_confirmed_root, ); let r_bank_forks = bank_forks.read().unwrap(); @@ -2002,6 +2002,7 @@ pub(crate) mod tests { }, }; use solana_runtime::{ + accounts_background_service::ABSRequestSender, commitment::BlockCommitment, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, }; @@ -2240,7 +2241,7 @@ pub(crate) mod tests { root, &bank_forks, &mut progress, - &None, + &ABSRequestSender::default(), &mut PubkeyReferences::default(), None, &mut heaviest_subtree_fork_choice, @@ -2285,7 +2286,7 @@ pub(crate) mod tests { root, &bank_forks, &mut progress, - &None, + &ABSRequestSender::default(), &mut PubkeyReferences::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, @@ -3269,7 +3270,7 @@ pub(crate) mod tests { bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 9)); let bank9 = bank_forks.get(9).unwrap().clone(); bank_forks.insert(Bank::new_from_parent(&bank9, &Pubkey::default(), 10)); - bank_forks.set_root(9, &None, None); + bank_forks.set_root(9, &ABSRequestSender::default(), None); let total_epoch_stake = bank0.total_epoch_stake(); // Insert new ForkProgress for slot 10 and its @@ -3361,7 +3362,7 @@ pub(crate) mod tests { .get_propagated_stats_mut(0) .unwrap() .is_leader_slot = true; - bank_forks.set_root(0, &None, None); + bank_forks.set_root(0, &ABSRequestSender::default(), None); let total_epoch_stake = bank_forks.root_bank().total_epoch_stake(); // Insert new ForkProgress representing a slot for all slots 1..=num_banks. Only @@ -3442,7 +3443,7 @@ pub(crate) mod tests { .get_propagated_stats_mut(0) .unwrap() .is_leader_slot = true; - bank_forks.set_root(0, &None, None); + bank_forks.set_root(0, &ABSRequestSender::default(), None); let total_epoch_stake = num_validators as u64 * stake_per_validator; @@ -3788,7 +3789,10 @@ pub(crate) mod tests { )); // Try to purge the root - bank_forks.write().unwrap().set_root(3, &None, None); + bank_forks + .write() + .unwrap() + .set_root(3, &ABSRequestSender::default(), None); let mut descendants = bank_forks.read().unwrap().descendants(); let mut ancestors = bank_forks.read().unwrap().ancestors(); let slot_3_descendants = descendants.get(&3).unwrap().clone(); diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 5e753f11ed..e53d09577f 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -2732,7 +2732,9 @@ pub mod tests { blockstore_processor::fill_blockstore_slot_with_ticks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }; - use solana_runtime::commitment::BlockCommitment; + use solana_runtime::{ + accounts_background_service::ABSRequestSender, commitment::BlockCommitment, + }; use solana_sdk::{ clock::MAX_RECENT_BLOCKHASHES, fee_calculator::DEFAULT_BURN_PERCENT, @@ -2833,7 +2835,10 @@ pub mod tests { bank_forks.write().unwrap().insert(new_bank); for root in roots.iter() { - bank_forks.write().unwrap().set_root(*root, &None, Some(0)); + bank_forks + .write() + .unwrap() + .set_root(*root, &ABSRequestSender::default(), Some(0)); let mut stakes = HashMap::new(); stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default())); let block_time = bank_forks diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 97e1413c08..51bb8f9614 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -28,7 +28,10 @@ use solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, }; use solana_runtime::{ - accounts_background_service::{AccountsBackgroundService, SnapshotRequestHandler}, + accounts_background_service::{ + ABSRequestHandler, ABSRequestSender, AccountsBackgroundService, SendDroppedBankCallback, + SnapshotRequestHandler, + }, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, snapshot_package::AccountsPackageSender, @@ -39,6 +42,7 @@ use solana_sdk::{ signature::{Keypair, Signer}, }; use std::{ + boxed::Box, collections::HashSet, net::UdpSocket, sync::{ @@ -208,6 +212,22 @@ impl Tvu { .unwrap_or((None, None)) }; + let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); + + // Before replay starts, set the callbacks in each of the banks in BankForks + for bank in bank_forks.read().unwrap().banks.values() { + bank.set_callback(Some(Box::new(SendDroppedBankCallback::new( + pruned_banks_sender.clone(), + )))); + } + + let accounts_background_request_sender = ABSRequestSender::new(snapshot_request_sender); + + let accounts_background_request_handler = ABSRequestHandler { + snapshot_request_handler, + pruned_banks_receiver, + }; + let replay_stage_config = ReplayStageConfig { my_pubkey: keypair.pubkey(), vote_account: *vote_account, @@ -216,7 +236,7 @@ impl Tvu { subscriptions: subscriptions.clone(), leader_schedule_cache: leader_schedule_cache.clone(), latest_root_senders: vec![ledger_cleanup_slot_sender], - snapshot_request_sender, + accounts_background_request_sender, block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -248,8 +268,11 @@ impl Tvu { ) }); - let accounts_background_service = - AccountsBackgroundService::new(bank_forks.clone(), &exit, snapshot_request_handler); + let accounts_background_service = AccountsBackgroundService::new( + bank_forks.clone(), + &exit, + accounts_background_request_handler, + ); Tvu { fetch_stage, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index df46b43ce7..eb5adc5464 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -43,7 +43,7 @@ mod tests { snapshot_packager_service::SnapshotPackagerService, }; use solana_runtime::{ - accounts_background_service::SnapshotRequestHandler, + accounts_background_service::{ABSRequestSender, SnapshotRequestHandler}, bank::{Bank, BankSlotDelta}, bank_forks::{BankForks, CompressionType, SnapshotConfig}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -187,7 +187,7 @@ mod tests { let (s, snapshot_request_receiver) = unbounded(); let (accounts_package_sender, _r) = channel(); - let snapshot_request_sender = Some(s); + let request_sender = ABSRequestSender::new(Some(s)); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, @@ -202,7 +202,7 @@ mod tests { // kick in if slot % set_root_interval == 0 || slot == last_slot - 1 { // set_root should send a snapshot request - bank_forks.set_root(bank.slot(), &snapshot_request_sender, None); + bank_forks.set_root(bank.slot(), &request_sender, None); snapshot_request_handler.handle_snapshot_requests(); } } @@ -444,7 +444,7 @@ mod tests { (*add_root_interval * num_set_roots * 2) as u64, ); let mut current_bank = snapshot_test_config.bank_forks[0].clone(); - let snapshot_sender = Some(snapshot_sender); + let request_sender = ABSRequestSender::new(Some(snapshot_sender)); for _ in 0..num_set_roots { for _ in 0..*add_root_interval { let new_slot = current_bank.slot() + 1; @@ -455,7 +455,7 @@ mod tests { } snapshot_test_config.bank_forks.set_root( current_bank.slot(), - &snapshot_sender, + &request_sender, None, ); } diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 6d82b5643a..1b9f3ba612 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -3,21 +3,26 @@ // This can be expensive since we have to walk the append vecs being cleaned up. use crate::{ - bank::{Bank, BankSlotDelta}, + bank::{Bank, BankSlotDelta, DropCallback}, bank_forks::{BankForks, SnapshotConfig}, snapshot_package::AccountsPackageSender, snapshot_utils, }; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::{Receiver, SendError, Sender}; use log::*; use rand::{thread_rng, Rng}; use solana_measure::measure::Measure; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, +use solana_sdk::clock::Slot; +use std::{ + boxed::Box, + fmt::{Debug, Formatter}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, }; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; const INTERVAL_MS: u64 = 100; const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250; @@ -27,6 +32,37 @@ const CLEAN_INTERVAL_BLOCKS: u64 = 100; pub type SnapshotRequestSender = Sender; pub type SnapshotRequestReceiver = Receiver; +pub type DroppedSlotsSender = Sender; +pub type DroppedSlotsReceiver = Receiver; + +#[derive(Clone)] +pub struct SendDroppedBankCallback { + sender: DroppedSlotsSender, +} + +impl DropCallback for SendDroppedBankCallback { + fn callback(&self, bank: &Bank) { + if let Err(e) = self.sender.send(bank.slot()) { + warn!("Error sending dropped banks: {:?}", e); + } + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Debug for SendDroppedBankCallback { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "SendDroppedBankCallback({:p})", self) + } +} + +impl SendDroppedBankCallback { + pub fn new(sender: DroppedSlotsSender) -> Self { + Self { sender } + } +} pub struct SnapshotRequest { pub snapshot_root_bank: Arc, @@ -109,6 +145,60 @@ impl SnapshotRequestHandler { } } +#[derive(Default)] +pub struct ABSRequestSender { + snapshot_request_sender: Option, +} + +impl ABSRequestSender { + pub fn new(snapshot_request_sender: Option) -> Self { + ABSRequestSender { + snapshot_request_sender, + } + } + + pub fn is_snapshot_creation_enabled(&self) -> bool { + self.snapshot_request_sender.is_some() + } + + pub fn send_snapshot_request( + &self, + snapshot_request: SnapshotRequest, + ) -> Result<(), SendError> { + if let Some(ref snapshot_request_sender) = self.snapshot_request_sender { + snapshot_request_sender.send(snapshot_request) + } else { + Ok(()) + } + } +} + +pub struct ABSRequestHandler { + pub snapshot_request_handler: Option, + pub pruned_banks_receiver: DroppedSlotsReceiver, +} + +impl ABSRequestHandler { + // Returns the latest requested snapshot block height, if one exists + pub fn handle_snapshot_requests(&self) -> Option { + self.snapshot_request_handler + .as_ref() + .and_then(|snapshot_request_handler| { + snapshot_request_handler.handle_snapshot_requests() + }) + } + + pub fn handle_pruned_banks<'a>(&'a self, bank: &Bank) -> usize { + let mut count = 0; + for pruned_slot in self.pruned_banks_receiver.try_iter() { + count += 1; + bank.rc.accounts.purge_slot(pruned_slot); + } + + count + } +} + pub struct AccountsBackgroundService { t_background: JoinHandle<()>, } @@ -117,12 +207,14 @@ impl AccountsBackgroundService { pub fn new( bank_forks: Arc>, exit: &Arc, - snapshot_request_handler: Option, + request_handler: ABSRequestHandler, ) -> Self { info!("AccountsBackgroundService active"); let exit = exit.clone(); let mut consumed_budget = 0; let mut last_cleaned_block_height = 0; + let mut removed_slots_count = 0; + let mut total_remove_slots_time = 0; let t_background = Builder::new() .name("solana-accounts-background".to_string()) .spawn(move || loop { @@ -133,6 +225,14 @@ impl AccountsBackgroundService { // Grab the current root bank let bank = bank_forks.read().unwrap().root_bank().clone(); + // Purge accounts of any dead slots + Self::remove_dead_slots( + &bank, + &request_handler, + &mut removed_slots_count, + &mut total_remove_slots_time, + ); + // Check to see if there were any requests for snapshotting banks // < the current root bank `bank` above. @@ -148,14 +248,9 @@ impl AccountsBackgroundService { // // However, this is impossible because BankForks.set_root() will always flush the snapshot // request for `N` to the snapshot request channel before setting a root `R > N`, and - // snapshot_request_handler.handle_snapshot_requests() will always look for the latest + // snapshot_request_handler.handle_requests() will always look for the latest // available snapshot in the channel. - let snapshot_block_height = - snapshot_request_handler - .as_ref() - .and_then(|snapshot_request_handler| { - snapshot_request_handler.handle_snapshot_requests() - }); + let snapshot_block_height = request_handler.handle_snapshot_requests(); if let Some(snapshot_block_height) = snapshot_block_height { // Safe, see proof above @@ -174,7 +269,6 @@ impl AccountsBackgroundService { last_cleaned_block_height = bank.block_height(); } } - sleep(Duration::from_millis(INTERVAL_MS)); }) .unwrap(); @@ -184,4 +278,55 @@ impl AccountsBackgroundService { pub fn join(self) -> thread::Result<()> { self.t_background.join() } + + fn remove_dead_slots( + bank: &Bank, + request_handler: &ABSRequestHandler, + removed_slots_count: &mut usize, + total_remove_slots_time: &mut u64, + ) { + let mut remove_slots_time = Measure::start("remove_slots_time"); + *removed_slots_count += request_handler.handle_pruned_banks(&bank); + remove_slots_time.stop(); + *total_remove_slots_time += remove_slots_time.as_us(); + + if *removed_slots_count >= 100 { + datapoint_info!( + "remove_slots_timing", + ("remove_slots_time", *total_remove_slots_time, i64), + ("removed_slots_count", *removed_slots_count, i64), + ); + *total_remove_slots_time = 0; + *removed_slots_count = 0; + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::genesis_utils::create_genesis_config; + use crossbeam_channel::unbounded; + use solana_sdk::{account::Account, pubkey::Pubkey}; + + #[test] + fn test_accounts_background_service_remove_dead_slots() { + let genesis = create_genesis_config(10); + let bank0 = Arc::new(Bank::new(&genesis.genesis_config)); + let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let request_handler = ABSRequestHandler { + snapshot_request_handler: None, + pruned_banks_receiver, + }; + + // Store an account in slot 0 + let account_key = Pubkey::new_unique(); + bank0.store_account(&account_key, &Account::new(264, 0, &Pubkey::default())); + assert!(bank0.get_account(&account_key).is_some()); + pruned_banks_sender.send(0).unwrap(); + AccountsBackgroundService::remove_dead_slots(&bank0, &request_handler, &mut 0, &mut 0); + + // Slot should be removed + assert!(bank0.get_account(&account_key).is_none()); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 2d7c278bcd..7ab392e2ac 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -699,6 +699,11 @@ impl fmt::Display for RewardType { } } +pub trait DropCallback: fmt::Debug { + fn callback(&self, b: &Bank); + fn clone_box(&self) -> Box; +} + #[derive(Debug, PartialEq, Serialize, Deserialize, AbiExample, Clone, Copy)] pub struct RewardInfo { pub reward_type: RewardType, @@ -706,6 +711,16 @@ pub struct RewardInfo { pub post_balance: u64, // Account balance in lamports after `lamports` was applied } +#[derive(Debug, Default)] +pub struct OptionalDropCallback(Option>); + +#[cfg(RUSTC_WITH_SPECIALIZATION)] +impl AbiExample for OptionalDropCallback { + fn example() -> Self { + Self(None) + } +} + /// Manager for the state of all accounts and programs after processing its entries. /// AbiExample is needed even without Serialize/Deserialize; actual (de-)serialization /// are implemented elsewhere for versioning @@ -849,6 +864,8 @@ pub struct Bank { pub transaction_log_collector: Arc>, pub feature_set: Arc, + + pub drop_callback: RwLock, } impl Default for BlockhashQueue { @@ -995,6 +1012,15 @@ impl Bank { transaction_log_collector_config: parent.transaction_log_collector_config.clone(), transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())), feature_set: parent.feature_set.clone(), + drop_callback: RwLock::new(OptionalDropCallback( + parent + .drop_callback + .read() + .unwrap() + .0 + .as_ref() + .map(|drop_callback| drop_callback.clone_box()), + )), }; datapoint_info!( @@ -1035,6 +1061,10 @@ impl Bank { new } + pub fn set_callback(&self, callback: Option>) { + *self.drop_callback.write().unwrap() = OptionalDropCallback(callback); + } + /// Like `new_from_parent` but additionally: /// * Doesn't assume that the parent is anywhere near `slot`, parent could be millions of slots /// in the past @@ -1113,6 +1143,7 @@ impl Bank { transaction_log_collector_config: new(), transaction_log_collector: new(), feature_set: new(), + drop_callback: RwLock::new(OptionalDropCallback(None)), }; bank.finish_init(genesis_config, additional_builtins); @@ -4615,9 +4646,16 @@ impl Bank { impl Drop for Bank { fn drop(&mut self) { - // For root slots this is a noop if !self.skip_drop.load(Relaxed) { - self.rc.accounts.purge_slot(self.slot()); + if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() { + drop_callback.callback(self); + } else { + // Default case + // 1. Tests + // 2. At startup when replaying blockstore and there's no + // AccountsBackgroundService to perform cleanups yet. + self.rc.accounts.purge_slot(self.slot()); + } } } } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 21b1e4082f..b5e8feac28 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -1,7 +1,7 @@ //! The `bank_forks` module implements BankForks a DAG of checkpointed Banks use crate::{ - accounts_background_service::{SnapshotRequest, SnapshotRequestSender}, + accounts_background_service::{ABSRequestSender, SnapshotRequest}, bank::Bank, }; use log::*; @@ -170,7 +170,7 @@ impl BankForks { pub fn set_root( &mut self, root: Slot, - snapshot_request_sender: &Option, + accounts_background_request_sender: &ABSRequestSender, highest_confirmed_root: Option, ) { let old_epoch = self.root_bank().epoch(); @@ -216,20 +216,19 @@ impl BankForks { bank.squash(); is_root_bank_squashed = bank_slot == root; - if self.snapshot_config.is_some() && snapshot_request_sender.is_some() { + if self.snapshot_config.is_some() + && accounts_background_request_sender.is_snapshot_creation_enabled() + { let snapshot_root_bank = self.root_bank().clone(); let root_slot = snapshot_root_bank.slot(); if let Err(e) = - snapshot_request_sender - .as_ref() - .unwrap() - .send(SnapshotRequest { - snapshot_root_bank, - // Save off the status cache because these may get pruned - // if another `set_root()` is called before the snapshots package - // can be generated - status_cache_slot_deltas: bank.src.slot_deltas(&bank.src.roots()), - }) + accounts_background_request_sender.send_snapshot_request(SnapshotRequest { + snapshot_root_bank, + // Save off the status cache because these may get pruned + // if another `set_root()` is called before the snapshots package + // can be generated + status_cache_slot_deltas: bank.src.slot_deltas(&bank.src.roots()), + }) { warn!( "Error sending snapshot request for bank: {}, err: {:?}", @@ -244,7 +243,6 @@ impl BankForks { root_bank.squash(); } let new_tx_count = root_bank.transaction_count(); - self.prune_non_root(root, highest_confirmed_root); inc_new_counter_info!( @@ -405,7 +403,7 @@ mod tests { let bank0 = Bank::new(&genesis_config); let mut bank_forks0 = BankForks::new(bank0); - bank_forks0.set_root(0, &None, None); + bank_forks0.set_root(0, &ABSRequestSender::default(), None); let bank1 = Bank::new(&genesis_config); let mut bank_forks1 = BankForks::new(bank1); @@ -438,7 +436,7 @@ mod tests { // Set root in bank_forks0 to truncate the ancestor history bank_forks0.insert(child1); - bank_forks0.set_root(slot, &None, None); + bank_forks0.set_root(slot, &ABSRequestSender::default(), None); // Don't set root in bank_forks1 to keep the ancestor history bank_forks1.insert(child2);