diff --git a/accounts-bench/src/main.rs b/accounts-bench/src/main.rs index 30ded4282..3abb883d6 100644 --- a/accounts-bench/src/main.rs +++ b/accounts-bench/src/main.rs @@ -88,7 +88,7 @@ fn main() { for x in 0..iterations { if clean { let mut time = Measure::start("clean"); - accounts.accounts_db.clean_accounts(); + accounts.accounts_db.clean_accounts(None); time.stop(); println!("{}", time); for slot in 0..num_slots { diff --git a/core/src/accounts_background_service.rs b/core/src/accounts_background_service.rs deleted file mode 100644 index 92c052c75..000000000 --- a/core/src/accounts_background_service.rs +++ /dev/null @@ -1,59 +0,0 @@ -// Service to clean up dead slots in accounts_db -// -// This can be expensive since we have to walk the append vecs being cleaned up. - -use rand::{thread_rng, Rng}; -use solana_runtime::bank_forks::BankForks; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, -}; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; - -pub struct AccountsBackgroundService { - t_background: JoinHandle<()>, -} - -const INTERVAL_MS: u64 = 100; -const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250; -const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = - SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize); -const CLEAN_INTERVAL_SLOTS: u64 = 100; - -impl AccountsBackgroundService { - pub fn new(bank_forks: Arc>, exit: &Arc) -> Self { - info!("AccountsBackgroundService active"); - let exit = exit.clone(); - let mut consumed_budget = 0; - let mut last_cleaned_slot = 0; - let t_background = Builder::new() - .name("solana-accounts-background".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - let bank = bank_forks.read().unwrap().root_bank().clone(); - - bank.process_dead_slots(); - - consumed_budget = bank - .process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL); - - if bank.block_height() - last_cleaned_slot - > (CLEAN_INTERVAL_SLOTS + thread_rng().gen_range(0, 
10)) - { - bank.clean_accounts(); - last_cleaned_slot = bank.block_height(); - } - - sleep(Duration::from_millis(INTERVAL_MS)); - }) - .unwrap(); - Self { t_background } - } - - pub fn join(self) -> thread::Result<()> { - self.t_background.join() - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index ed08e7da9..d7c2130d4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,7 +9,6 @@ #[macro_use] extern crate solana_bpf_loader_program; -pub mod accounts_background_service; pub mod accounts_hash_verifier; pub mod banking_stage; pub mod bigtable_upload_service; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 09852ced6..cbef8f42a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -29,8 +29,8 @@ use solana_ledger::{ use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::inc_new_counter_info; use solana_runtime::{ - bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, - snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender, + accounts_background_service::SnapshotRequestSender, bank::Bank, bank_forks::BankForks, + commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, @@ -103,7 +103,7 @@ pub struct ReplayStageConfig { pub subscriptions: Arc, pub leader_schedule_cache: Arc, pub latest_root_senders: Vec>, - pub accounts_hash_sender: Option, + pub snapshot_request_sender: Option, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, @@ -234,7 +234,7 @@ impl ReplayStage { subscriptions, leader_schedule_cache, latest_root_senders, - accounts_hash_sender, + snapshot_request_sender, block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -455,7 +455,7 @@ impl ReplayStage { &blockstore, &leader_schedule_cache, &lockouts_sender, - &accounts_hash_sender, + &snapshot_request_sender, &latest_root_senders, &mut 
all_pubkeys, &subscriptions, @@ -1025,7 +1025,7 @@ impl ReplayStage { blockstore: &Arc, leader_schedule_cache: &Arc, lockouts_sender: &Sender, - accounts_hash_sender: &Option, + snapshot_request_sender: &Option, latest_root_senders: &[Sender], all_pubkeys: &mut PubkeyReferences, subscriptions: &Arc, @@ -1081,7 +1081,7 @@ impl ReplayStage { new_root, &bank_forks, progress, - accounts_hash_sender, + snapshot_request_sender, all_pubkeys, highest_confirmed_root, heaviest_subtree_fork_choice, @@ -1778,7 +1778,7 @@ impl ReplayStage { new_root: Slot, bank_forks: &RwLock, progress: &mut ProgressMap, - accounts_hash_sender: &Option, + snapshot_request_sender: &Option, all_pubkeys: &mut PubkeyReferences, highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, @@ -1786,7 +1786,7 @@ impl ReplayStage { let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); bank_forks.write().unwrap().set_root( new_root, - accounts_hash_sender, + snapshot_request_sender, highest_confirmed_root, ); let r_bank_forks = bank_forks.read().unwrap(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 489e0768b..a0d7674dd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -2,7 +2,6 @@ //! validation pipeline in software. 
use crate::{ - accounts_background_service::AccountsBackgroundService, accounts_hash_verifier::AccountsHashVerifier, broadcast_stage::RetransmitSlotsSender, cache_block_time_service::CacheBlockTimeSender, @@ -28,8 +27,11 @@ use solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, }; use solana_runtime::{ - bank_forks::BankForks, commitment::BlockCommitmentCache, - snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender, + accounts_background_service::{AccountsBackgroundService, SnapshotRequestHandler}, + bank_forks::{BankForks, SnapshotConfig}, + commitment::BlockCommitmentCache, + snapshot_package::AccountsPackageSender, + vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ pubkey::Pubkey, @@ -100,7 +102,7 @@ impl Tvu { transaction_status_sender: Option, rewards_recorder_sender: Option, cache_block_time_sender: Option, - snapshot_package_sender: Option, + snapshot_config_and_package_sender: Option<(SnapshotConfig, AccountsPackageSender)>, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, verified_vote_receiver: VerifiedVoteReceiver, @@ -171,10 +173,15 @@ impl Tvu { } }; info!("snapshot_interval_slots: {}", snapshot_interval_slots); + let (snapshot_config, accounts_package_sender) = snapshot_config_and_package_sender + .map(|(snapshot_config, accounts_package_sender)| { + (Some(snapshot_config), Some(accounts_package_sender)) + }) + .unwrap_or((None, None)); let (accounts_hash_sender, accounts_hash_receiver) = channel(); let accounts_hash_verifier = AccountsHashVerifier::new( accounts_hash_receiver, - snapshot_package_sender, + accounts_package_sender, exit, &cluster_info, tvu_config.trusted_validators.clone(), @@ -183,6 +190,22 @@ impl Tvu { snapshot_interval_slots, ); + let (snapshot_request_sender, snapshot_request_handler) = { + snapshot_config + .map(|snapshot_config| { + let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); + ( + Some(snapshot_request_sender), + Some(SnapshotRequestHandler 
{ + snapshot_config, + snapshot_request_receiver, + accounts_package_sender: accounts_hash_sender, + }), + ) + }) + .unwrap_or((None, None)) + }; + let replay_stage_config = ReplayStageConfig { my_pubkey: keypair.pubkey(), vote_account: *vote_account, @@ -191,7 +214,7 @@ impl Tvu { subscriptions: subscriptions.clone(), leader_schedule_cache: leader_schedule_cache.clone(), latest_root_senders: vec![ledger_cleanup_slot_sender], - accounts_hash_sender: Some(accounts_hash_sender), + snapshot_request_sender, block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -222,7 +245,8 @@ impl Tvu { ) }); - let accounts_background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit); + let accounts_background_service = + AccountsBackgroundService::new(bank_forks.clone(), &exit, snapshot_request_handler); Tvu { fetch_stage, diff --git a/core/src/validator.rs b/core/src/validator.rs index 44cdfa4a3..59ace24e3 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -454,13 +454,16 @@ impl Validator { cluster_info.set_entrypoint(cluster_entrypoint.clone()); } - let (snapshot_packager_service, snapshot_package_sender) = - if config.snapshot_config.is_some() { + let (snapshot_packager_service, snapshot_config_and_package_sender) = + if let Some(snapshot_config) = config.snapshot_config.clone() { // Start a snapshot packaging service let (sender, receiver) = channel(); let snapshot_packager_service = SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info); - (Some(snapshot_packager_service), Some(sender)) + ( + Some(snapshot_packager_service), + Some((snapshot_config, sender)), + ) } else { (None, None) }; @@ -523,7 +526,7 @@ impl Validator { transaction_status_sender.clone(), rewards_recorder_sender, cache_block_time_sender, - snapshot_package_sender, + snapshot_config_and_package_sender, vote_tracker.clone(), retransmit_slots_sender, verified_vote_receiver, diff --git a/core/tests/bank_forks.rs 
b/core/tests/snapshots.rs similarity index 93% rename from core/tests/bank_forks.rs rename to core/tests/snapshots.rs index b31ed8f15..ee9cb0599 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/snapshots.rs @@ -35,12 +35,15 @@ macro_rules! DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS { #[cfg(test)] mod tests { use bincode::serialize_into; + use crossbeam_channel::unbounded; use fs_extra::dir::CopyOptions; use itertools::Itertools; - use solana_core::cluster_info::ClusterInfo; - use solana_core::contact_info::ContactInfo; - use solana_core::snapshot_packager_service::SnapshotPackagerService; + use solana_core::{ + cluster_info::ClusterInfo, contact_info::ContactInfo, + snapshot_packager_service::SnapshotPackagerService, + }; use solana_runtime::{ + accounts_background_service::SnapshotRequestHandler, bank::{Bank, BankSlotDelta}, bank_forks::{BankForks, CompressionType, SnapshotConfig}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -182,8 +185,14 @@ mod tests { let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; - let (s, _r) = channel(); - let sender = Some(s); + let (s, snapshot_request_receiver) = unbounded(); + let (accounts_package_sender, _r) = channel(); + let snapshot_request_sender = Some(s); + let snapshot_request_handler = SnapshotRequestHandler { + snapshot_config: snapshot_test_config.snapshot_config.clone(), + snapshot_request_receiver, + accounts_package_sender, + }; for slot in 0..last_slot { let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1); f(&mut bank, &mint_keypair); @@ -192,7 +201,9 @@ mod tests { // and to allow snapshotting of bank and the purging logic on status_cache to // kick in if slot % set_root_interval == 0 || slot == last_slot - 1 { - bank_forks.set_root(bank.slot(), &sender, None); + // set_root should send a snapshot request + bank_forks.set_root(bank.slot(), &snapshot_request_sender, None); 
+ snapshot_request_handler.handle_snapshot_requests(); } } @@ -207,7 +218,7 @@ mod tests { last_bank, &last_slot_snapshot_path, snapshot_path, - &last_bank.src.roots(), + last_bank.src.slot_deltas(&last_bank.src.roots()), &snapshot_config.snapshot_package_output_path, last_bank.get_snapshot_storages(), CompressionType::Bzip2, @@ -312,7 +323,6 @@ mod tests { assert_eq!(bank.process_transaction(&tx), Ok(())); bank.squash(); let accounts_hash = bank.update_accounts_hash(); - bank_forks.insert(bank); let package_sender = { if slot == saved_slot as u64 { @@ -325,10 +335,18 @@ mod tests { } }; - bank_forks - .generate_accounts_package(slot, &[], &package_sender) - .unwrap(); + snapshot_utils::snapshot_bank( + &bank, + vec![], + &package_sender, + &snapshot_path, + &snapshot_package_output_path, + snapshot_config.snapshot_version, + &snapshot_config.compression, + ) + .unwrap(); + bank_forks.insert(bank); if slot == saved_slot as u64 { let options = CopyOptions::new(); fs_extra::dir::copy(accounts_dir, &saved_accounts_dir, &options).unwrap(); @@ -359,7 +377,7 @@ mod tests { // Purge all the outdated snapshots, including the ones needed to generate the package // currently sitting in the channel - bank_forks.purge_old_snapshots(); + snapshot_utils::purge_old_snapshots(&snapshot_path); assert!(snapshot_utils::get_snapshot_paths(&snapshots_dir) .into_iter() .map(|path| path.slot) @@ -418,7 +436,7 @@ mod tests { let num_set_roots = MAX_CACHE_ENTRIES * 2; for add_root_interval in &[1, 3, 9] { - let (snapshot_sender, _snapshot_receiver) = channel(); + let (snapshot_sender, _snapshot_receiver) = unbounded(); // Make sure this test never clears bank.slots_since_snapshot let mut snapshot_test_config = SnapshotTestConfig::new( snapshot_version, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 14c7b35f0..4772cfc46 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -1805,7 +1805,7 @@ fn main() { ); assert!(bank.is_complete()); bank.squash(); - 
bank.clean_accounts(); + bank.clean_accounts(true); bank.update_accounts_hash(); if rehash { bank.rehash(); @@ -1823,7 +1823,7 @@ fn main() { &bank, &slot_snapshot_paths, &temp_dir, - &bank.src.roots(), + bank.src.slot_deltas(&bank.src.roots()), output_directory, storages, CompressionType::Bzip2, diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index b14c4ea09..b1d17b7ea 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -137,6 +137,6 @@ fn bench_delete_dependencies(bencher: &mut Bencher) { accounts.add_root(i); } bencher.iter(|| { - accounts.accounts_db.clean_accounts(); + accounts.accounts_db.clean_accounts(None); }); } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 191fe78ed..cc7d02bf9 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1810,7 +1810,7 @@ mod tests { } } info!("done..cleaning.."); - accounts.accounts_db.clean_accounts(); + accounts.accounts_db.clean_accounts(None); } fn load_accounts_no_store( diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs new file mode 100644 index 000000000..5edb88f3d --- /dev/null +++ b/runtime/src/accounts_background_service.rs @@ -0,0 +1,182 @@ +// Service to clean up dead slots in accounts_db +// +// This can be expensive since we have to walk the append vecs being cleaned up. 
+ +use crate::{ + bank::{Bank, BankSlotDelta}, + bank_forks::{BankForks, SnapshotConfig}, + snapshot_package::AccountsPackageSender, + snapshot_utils, +}; +use crossbeam_channel::{Receiver, Sender}; +use log::*; +use rand::{thread_rng, Rng}; +use solana_measure::measure::Measure; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, +}; +use std::thread::{self, sleep, Builder, JoinHandle}; +use std::time::Duration; + +const INTERVAL_MS: u64 = 100; +const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250; +const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = + SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize); +const CLEAN_INTERVAL_BLOCKS: u64 = 100; + +pub type SnapshotRequestSender = Sender; +pub type SnapshotRequestReceiver = Receiver; + +pub struct SnapshotRequest { + pub snapshot_root_bank: Arc, + pub status_cache_slot_deltas: Vec, +} + +pub struct SnapshotRequestHandler { + pub snapshot_config: SnapshotConfig, + pub snapshot_request_receiver: SnapshotRequestReceiver, + pub accounts_package_sender: AccountsPackageSender, +} + +impl SnapshotRequestHandler { + // Returns the latest requested snapshot slot, if one exists + pub fn handle_snapshot_requests(&self) -> Option { + self.snapshot_request_receiver + .try_iter() + .last() + .map(|snapshot_request| { + let SnapshotRequest { + snapshot_root_bank, + status_cache_slot_deltas, + } = snapshot_request; + + let mut shrink_time = Measure::start("shrink_time"); + snapshot_root_bank.process_stale_slot_with_budget(0, SHRUNKEN_ACCOUNT_PER_INTERVAL); + shrink_time.stop(); + + let mut clean_time = Measure::start("clean_time"); + // Don't clean the slot we're snapshotting because it may have zero-lamport + // accounts that were included in the bank delta hash when the bank was frozen, + // and if we clean them here, the newly created snapshot's hash may not match + // the frozen hash. 
+ snapshot_root_bank.clean_accounts(true); + clean_time.stop(); + + // Generate an accounts package + let mut snapshot_time = Measure::start("snapshot_time"); + let r = snapshot_utils::snapshot_bank( + &snapshot_root_bank, + status_cache_slot_deltas, + &self.accounts_package_sender, + &self.snapshot_config.snapshot_path, + &self.snapshot_config.snapshot_package_output_path, + self.snapshot_config.snapshot_version, + &self.snapshot_config.compression, + ); + if r.is_err() { + warn!( + "Error generating snapshot for bank: {}, err: {:?}", + snapshot_root_bank.slot(), + r + ); + } + snapshot_time.stop(); + + // Cleanup outdated snapshots + let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time"); + snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path); + purge_old_snapshots_time.stop(); + + datapoint_info!( + "handle_snapshot_requests-timing", + ("shrink_time", shrink_time.as_us(), i64), + ("clean_time", clean_time.as_us(), i64), + ("snapshot_time", snapshot_time.as_us(), i64), + ( + "purge_old_snapshots_time", + purge_old_snapshots_time.as_us(), + i64 + ) + ); + snapshot_root_bank.block_height() + }) + } +} + +pub struct AccountsBackgroundService { + t_background: JoinHandle<()>, +} + +impl AccountsBackgroundService { + pub fn new( + bank_forks: Arc>, + exit: &Arc, + snapshot_request_handler: Option, + ) -> Self { + info!("AccountsBackgroundService active"); + let exit = exit.clone(); + let mut consumed_budget = 0; + let mut last_cleaned_block_height = 0; + let t_background = Builder::new() + .name("solana-accounts-background".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + // Grab the current root bank + let bank = bank_forks.read().unwrap().root_bank().clone(); + + // Check to see if there were any requests for snapshotting banks + // < the current root bank `bank` above. 
+ + // Claim: Any snapshot request for slot `N` found here implies that the last cleanup + // slot `M` satisfies `M < N` + // + // Proof: Assume for contradiction that we find a snapshot request for slot `N` here, + // but cleanup has already happened on some slot `M >= N`. Because the call to + // `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`, + // then that means in some *previous* iteration of this loop, we must have gotten a root + // bank for some slot `R` where `R > N`, but did not see the snapshot for `N` in the + // snapshot request channel. + // + // However, this is impossible because BankForks.set_root() will always flush the snapshot + // request for `N` to the snapshot request channel before setting a root `R > N`, and + // snapshot_request_handler.handle_snapshot_requests() will always look for the latest + // available snapshot in the channel. + let snapshot_block_height = + snapshot_request_handler + .as_ref() + .and_then(|snapshot_request_handler| { + snapshot_request_handler.handle_snapshot_requests() + }); + + if let Some(snapshot_block_height) = snapshot_block_height { + // Safe, see proof above + assert!(last_cleaned_block_height <= snapshot_block_height); + last_cleaned_block_height = snapshot_block_height; + } else { + consumed_budget = bank.process_stale_slot_with_budget( + consumed_budget, + SHRUNKEN_ACCOUNT_PER_INTERVAL, + ); + + if bank.block_height() - last_cleaned_block_height + > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10)) + { + bank.clean_accounts(true); + last_cleaned_block_height = bank.block_height(); + } + } + + sleep(Duration::from_millis(INTERVAL_MS)); + }) + .unwrap(); + Self { t_background } + } + + pub fn join(self) -> thread::Result<()> { + self.t_background.join() + } +} diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3dd057114..0a19c200f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -46,7 +46,7 @@ use 
std::{ ops::RangeBounds, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}, time::Instant, }; use tempfile::TempDir; @@ -414,8 +414,6 @@ pub struct AccountsDB { pub bank_hashes: RwLock>, - dead_slots: RwLock>, - stats: AccountsStats, pub cluster_type: Option, @@ -478,7 +476,6 @@ impl Default for AccountsDB { min_num_stores: num_threads, bank_hashes: RwLock::new(bank_hashes), frozen_accounts: HashMap::new(), - dead_slots: RwLock::new(HashSet::new()), stats: AccountsStats::default(), cluster_type: None, } @@ -513,6 +510,10 @@ impl AccountsDB { new } + pub fn file_size(&self) -> u64 { + self.file_size + } + #[cfg(test)] pub fn new_single() -> Self { AccountsDB { @@ -539,7 +540,7 @@ impl AccountsDB { // Reclaim older states of rooted non-zero lamport accounts as a general // AccountsDB bloat mitigation and preprocess for better zero-lamport purging. - fn clean_old_rooted_accounts(&self, purges_in_root: Vec) { + fn clean_old_rooted_accounts(&self, purges_in_root: Vec, max_clean_root: Option) { // This number isn't carefully chosen; just guessed randomly such that // the hot loop will be the order of ~Xms. 
const INDEX_CLEAN_BULK_COUNT: usize = 4096; @@ -552,7 +553,7 @@ impl AccountsDB { let mut reclaims = Vec::new(); let accounts_index = self.accounts_index.read().unwrap(); for pubkey in pubkeys { - accounts_index.clean_rooted_entries(&pubkey, &mut reclaims); + accounts_index.clean_rooted_entries(&pubkey, &mut reclaims, max_clean_root); } reclaims }); @@ -561,20 +562,28 @@ impl AccountsDB { inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize); let mut measure = Measure::start("clean_old_root_reclaims"); - self.handle_reclaims_maybe_cleanup(&reclaims); + self.handle_reclaims(&reclaims, None, false); measure.stop(); debug!("{} {}", clean_rooted, measure); inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize); } - fn do_reset_uncleaned_roots(&self, candidates: &mut MutexGuard>) { - let previous_roots = self.accounts_index.write().unwrap().reset_uncleaned_roots(); + fn do_reset_uncleaned_roots( + &self, + candidates: &mut MutexGuard>, + max_clean_root: Option, + ) { + let previous_roots = self + .accounts_index + .write() + .unwrap() + .reset_uncleaned_roots(max_clean_root); candidates.extend(previous_roots); } #[cfg(test)] fn reset_uncleaned_roots(&self) { - self.do_reset_uncleaned_roots(&mut self.shrink_candidate_slots.lock().unwrap()); + self.do_reset_uncleaned_roots(&mut self.shrink_candidate_slots.lock().unwrap(), None); } fn calc_delete_dependencies( @@ -650,7 +659,7 @@ impl AccountsDB { // collection // Only remove those accounts where the entire rooted history of the account // can be purged because there are no live append vecs in the ancestors - pub fn clean_accounts(&self) { + pub fn clean_accounts(&self, max_clean_root: Option) { // hold a lock to prevent slot shrinking from running because it might modify some rooted // slot storages which can not happen as long as we're cleaning accounts because we're also // modifying the rooted slot storages! 
@@ -668,7 +677,7 @@ impl AccountsDB { let mut purges_in_root = Vec::new(); let mut purges = HashMap::new(); for pubkey in pubkeys { - if let Some((list, index)) = accounts_index.get(pubkey, None) { + if let Some((list, index)) = accounts_index.get(pubkey, None, max_clean_root) { let (slot, account_info) = &list[index]; if account_info.lamports == 0 { purges.insert(*pubkey, accounts_index.would_purge(pubkey)); @@ -694,9 +703,9 @@ impl AccountsDB { let mut clean_old_rooted = Measure::start("clean_old_roots"); if !purges_in_root.is_empty() { - self.clean_old_rooted_accounts(purges_in_root); + self.clean_old_rooted_accounts(purges_in_root, max_clean_root); } - self.do_reset_uncleaned_roots(&mut candidates); + self.do_reset_uncleaned_roots(&mut candidates, max_clean_root); clean_old_rooted.stop(); let mut store_counts_time = Measure::start("store_counts"); @@ -758,7 +767,7 @@ impl AccountsDB { self.handle_dead_keys(dead_keys); - self.handle_reclaims_maybe_cleanup(&reclaims); + self.handle_reclaims(&reclaims, None, false); reclaims_time.stop(); datapoint_info!( @@ -784,40 +793,49 @@ impl AccountsDB { } } - fn handle_reclaims_maybe_cleanup(&self, reclaims: SlotSlice) { - let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts"); - let dead_slots = self.remove_dead_accounts(reclaims); - dead_accounts.stop(); - let dead_slots_len = { - let mut dead_slots_w = self.dead_slots.write().unwrap(); - dead_slots_w.extend(dead_slots); - dead_slots_w.len() - }; - if dead_slots_len > 5000 { - self.process_dead_slots(None); + // Removes the accounts in the input `reclaims` from the tracked "count" of + // their corresponding storage entries. Note this does not actually free + // the memory from the storage entries until all the storage entries for + // a given slot `S` are empty, at which point `process_dead_slots` will + // remove all the storage entries for `S`. 
+ // + /// # Arguments + /// * `reclaims` - The accounts to remove from storage entries' "count" + /// * `expected_single_dead_slot` - A correctness assertion. If this is equal to `Some(S)`, + /// then the function will check that the only slot being cleaned up in `reclaims` + /// is the slot == `S`. This is true for instance when `handle_reclaims` is called + /// from store or slot shrinking, as those should only touch the slot they are + /// currently storing to or shrinking. + /// * `no_dead_slot` - A correctness assertion. If this is equal to + /// `true`, the function will check that no slots are cleaned up/removed via + /// `process_dead_slots`. For instance, on store, no slots should be cleaned up, + /// but the background clean purges accounts from old rooted slots, + /// so outdated slots may be removed. + fn handle_reclaims( + &self, + reclaims: SlotSlice, + expected_single_dead_slot: Option, + no_dead_slot: bool, + ) { + if !reclaims.is_empty() { + let dead_slots = self.remove_dead_accounts(reclaims, expected_single_dead_slot); + if no_dead_slot { + assert!(dead_slots.is_empty()); + } else if let Some(expected_single_dead_slot) = expected_single_dead_slot { + assert!(dead_slots.len() <= 1); + if dead_slots.len() == 1 { + assert!(dead_slots.contains(&expected_single_dead_slot)); + } + } + if !dead_slots.is_empty() { + self.process_dead_slots(&dead_slots); + } } } - // Atomically process reclaims and new dead_slots in this thread, guaranteeing - // complete data removal for slots in reclaims. 
- fn handle_reclaims_ensure_cleanup(&self, reclaims: SlotSlice) { - let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts"); - let dead_slots = self.remove_dead_accounts(reclaims); - dead_accounts.stop(); - let mut dead_slots_w = self.dead_slots.write().unwrap(); - dead_slots_w.extend(dead_slots); - self.process_dead_slots(Some(dead_slots_w)); - } - - pub fn process_dead_slots<'a>( - &'a self, - dead_slots_w: Option>>, - ) { - let empty = HashSet::new(); - let mut dead_slots_w = dead_slots_w.unwrap_or_else(|| self.dead_slots.write().unwrap()); - let dead_slots = std::mem::replace(&mut *dead_slots_w, empty); - drop(dead_slots_w); - + // Must be kept private!, does sensitive cleanup that should only be called from + // supported pipelines in AccountsDb + fn process_dead_slots(&self, dead_slots: &HashSet) { let mut clean_dead_slots = Measure::start("reclaims::purge_slots"); self.clean_dead_slots(&dead_slots); clean_dead_slots.stop(); @@ -914,7 +932,7 @@ impl AccountsDB { (store_id, offset), _write_version, )| { - if let Some((list, _)) = accounts_index.get(pubkey, None) { + if let Some((list, _)) = accounts_index.get(pubkey, None, None) { list.iter() .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset) } else { @@ -993,7 +1011,7 @@ impl AccountsDB { update_index_elapsed = start.as_us(); let mut start = Measure::start("update_index_elapsed"); - self.handle_reclaims_maybe_cleanup(&reclaims); + self.handle_reclaims(&reclaims, Some(slot), true); start.stop(); handle_reclaims_elapsed = start.as_us(); @@ -1213,7 +1231,7 @@ impl AccountsDB { accounts_index: &AccountsIndex, pubkey: &Pubkey, ) -> Option<(Account, Slot)> { - let (lock, index) = accounts_index.get(pubkey, Some(ancestors))?; + let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None)?; let slot = lock[index].0; //TODO: thread this as a ref if let Some(slot_storage) = storage.0.get(&slot) { @@ -1230,7 +1248,7 @@ impl AccountsDB { #[cfg(test)] fn load_account_hash(&self, 
ancestors: &Ancestors, pubkey: &Pubkey) -> Hash { let accounts_index = self.accounts_index.read().unwrap(); - let (lock, index) = accounts_index.get(pubkey, Some(ancestors)).unwrap(); + let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None).unwrap(); let slot = lock[index].0; let storage = self.storage.read().unwrap(); let slot_storage = storage.0.get(&slot).unwrap(); @@ -1309,7 +1327,7 @@ impl AccountsDB { self.purge_slots(&slots); } - pub fn purge_slots(&self, slots: &HashSet) { + fn purge_slots(&self, slots: &HashSet) { //add_root should be called first let accounts_index = self.accounts_index.read().unwrap(); let non_roots: Vec<_> = slots @@ -1395,7 +1413,7 @@ impl AccountsDB { // 1) Remove old bank hash from self.bank_hashes // 2) Purge this slot's storage entries from self.storage - self.handle_reclaims_ensure_cleanup(&reclaims); + self.handle_reclaims(&reclaims, Some(remove_slot), false); assert!(self.storage.read().unwrap().0.get(&remove_slot).is_none()); } @@ -1834,7 +1852,7 @@ impl AccountsDB { let hashes: Vec<_> = keys .par_iter() .filter_map(|pubkey| { - if let Some((list, index)) = accounts_index.get(pubkey, Some(ancestors)) { + if let Some((list, index)) = accounts_index.get(pubkey, Some(ancestors), None) { let (slot, account_info) = &list[index]; if account_info.lamports != 0 { storage @@ -1961,7 +1979,7 @@ impl AccountsDB { ); scan.stop(); let mut merge = Measure::start("merge"); - let mut account_maps = accumulator.pop().unwrap(); + let mut account_maps = HashMap::new(); while let Some(maps) = accumulator.pop() { AccountsDB::merge(&mut account_maps, &maps); } @@ -1994,7 +2012,6 @@ impl AccountsDB { ) -> SlotList { let mut reclaims = SlotList::::with_capacity(infos.len() * 2); let index = self.accounts_index.read().unwrap(); - let mut update_index_work = Measure::start("update_index_work"); let inserts: Vec<_> = infos .into_iter() .zip(accounts.iter()) @@ -2013,14 +2030,20 @@ impl AccountsDB { index.insert(slot, pubkey, info, &mut 
reclaims); } } - update_index_work.stop(); reclaims } - fn remove_dead_accounts(&self, reclaims: SlotSlice) -> HashSet { + fn remove_dead_accounts( + &self, + reclaims: SlotSlice, + expected_slot: Option, + ) -> HashSet { let storage = self.storage.read().unwrap(); let mut dead_slots = HashSet::new(); for (slot, account_info) in reclaims { + if let Some(expected_slot) = expected_slot { + assert_eq!(*slot, expected_slot); + } if let Some(slot_storage) = storage.0.get(slot) { if let Some(store) = slot_storage.get(&account_info.store_id) { assert_eq!( @@ -2049,56 +2072,54 @@ impl AccountsDB { dead_slots } - pub fn clean_dead_slots(&self, dead_slots: &HashSet) { - if !dead_slots.is_empty() { - { - let mut measure = Measure::start("clean_dead_slots-ms"); - let storage = self.storage.read().unwrap(); - let mut stores: Vec> = vec![]; - for slot in dead_slots.iter() { - if let Some(slot_storage) = storage.0.get(slot) { - for store in slot_storage.values() { - stores.push(store.clone()); - } + fn clean_dead_slots(&self, dead_slots: &HashSet) { + { + let mut measure = Measure::start("clean_dead_slots-ms"); + let storage = self.storage.read().unwrap(); + let mut stores: Vec> = vec![]; + for slot in dead_slots.iter() { + if let Some(slot_storage) = storage.0.get(slot) { + for store in slot_storage.values() { + stores.push(store.clone()); } } - drop(storage); - datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64)); - let slot_pubkeys: HashSet<(Slot, Pubkey)> = { - self.thread_pool_clean.install(|| { - stores - .into_par_iter() - .map(|store| { - let accounts = store.accounts.accounts(0); - accounts - .into_iter() - .map(|account| (store.slot, account.meta.pubkey)) - .collect::>() - }) - .reduce(HashSet::new, |mut reduced, store_pubkeys| { - reduced.extend(store_pubkeys); - reduced - }) - }) - }; - let index = self.accounts_index.read().unwrap(); - for (_slot, pubkey) in slot_pubkeys { - index.unref_from_storage(&pubkey); - } - drop(index); - measure.stop(); - 
inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize); - - let mut index = self.accounts_index.write().unwrap(); - for slot in dead_slots.iter() { - index.clean_dead_slot(*slot); - } } - { - let mut bank_hashes = self.bank_hashes.write().unwrap(); - for slot in dead_slots.iter() { - bank_hashes.remove(slot); - } + drop(storage); + datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64)); + let slot_pubkeys: HashSet<(Slot, Pubkey)> = { + self.thread_pool_clean.install(|| { + stores + .into_par_iter() + .map(|store| { + let accounts = store.accounts.accounts(0); + accounts + .into_iter() + .map(|account| (store.slot, account.meta.pubkey)) + .collect::>() + }) + .reduce(HashSet::new, |mut reduced, store_pubkeys| { + reduced.extend(store_pubkeys); + reduced + }) + }) + }; + let index = self.accounts_index.read().unwrap(); + for (_slot, pubkey) in slot_pubkeys { + index.unref_from_storage(&pubkey); + } + drop(index); + measure.stop(); + inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize); + + let mut index = self.accounts_index.write().unwrap(); + for slot in dead_slots.iter() { + index.clean_dead_slot(*slot); + } + } + { + let mut bank_hashes = self.bank_hashes.write().unwrap(); + for slot in dead_slots.iter() { + bank_hashes.remove(slot); } } } @@ -2190,16 +2211,18 @@ impl AccountsDB { } fn store_with_hashes(&self, slot: Slot, accounts: &[(&Pubkey, &Account)], hashes: &[Hash]) { - let mut store_accounts = Measure::start("store::store_accounts"); let infos = self.store_accounts(slot, accounts, hashes); - store_accounts.stop(); - - let mut update_index = Measure::start("store::update_index"); let reclaims = self.update_index(slot, infos, accounts); - update_index.stop(); - trace!("reclaim: {}", reclaims.len()); - self.handle_reclaims_maybe_cleanup(&reclaims); + // A store for a single slot should: + // 1) Only make "reclaims" for the same slot + // 2) Should not cause any slots to be removed from the storage + 
// database because + // a) this slot has at least one account (the one being stored), + // b)From 1) we know no other slots are included in the "reclaims" + // + // From 1) and 2) we guarantee passing Some(slot), true is safe + self.handle_reclaims(&reclaims, Some(slot), true); } pub fn add_root(&self, slot: Slot) { @@ -2568,6 +2591,7 @@ pub mod tests { // overwrite old rooted account version; only the slot_0_stores.count() should be // decremented db.store(2, &[(&pubkeys[0], &account)]); + db.clean_accounts(None); { let stores = db.storage.read().unwrap(); let slot_0_stores = &stores.0.get(&0).unwrap(); @@ -2618,7 +2642,7 @@ pub mod tests { .accounts_index .read() .unwrap() - .get(&key, Some(&ancestors)) + .get(&key, Some(&ancestors), None) .is_some()); assert_load_account(&db, unrooted_slot, key, 1); @@ -2639,7 +2663,7 @@ pub mod tests { .accounts_index .read() .unwrap() - .get(&key, Some(&ancestors)) + .get(&key, Some(&ancestors), None) .is_none()); // Test we can store for the same slot again and get the right information @@ -2943,7 +2967,7 @@ pub mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); let id = { let index = accounts.accounts_index.read().unwrap(); - let (list, idx) = index.get(&pubkey, Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&pubkey, Some(&ancestors), None).unwrap(); list[idx].1.store_id }; accounts.add_root(1); @@ -2956,8 +2980,7 @@ pub mod tests { //slot is gone accounts.print_accounts_stats("pre-clean"); - accounts.clean_accounts(); - accounts.process_dead_slots(None); + accounts.clean_accounts(None); assert!(accounts.storage.read().unwrap().0.get(&0).is_none()); //new value is there @@ -3028,7 +3051,7 @@ pub mod tests { assert_eq!(accounts.alive_account_count_in_store(0), 1); assert_eq!(accounts.alive_account_count_in_store(1), 1); - accounts.clean_accounts(); + accounts.clean_accounts(None); //now old state is cleaned up assert_eq!(accounts.alive_account_count_in_store(0), 0); @@ -3058,7 +3081,7 @@ pub mod tests 
{ assert_eq!(accounts.alive_account_count_in_store(0), 2); assert_eq!(accounts.alive_account_count_in_store(1), 2); - accounts.clean_accounts(); + accounts.clean_accounts(None); //still old state behind zero-lamport account isn't cleaned up assert_eq!(accounts.alive_account_count_in_store(0), 1); @@ -3092,7 +3115,7 @@ pub mod tests { assert_eq!(accounts.alive_account_count_in_store(1), 1); assert_eq!(accounts.alive_account_count_in_store(2), 1); - accounts.clean_accounts(); + accounts.clean_accounts(None); //both zero lamport and normal accounts are cleaned up assert_eq!(accounts.alive_account_count_in_store(0), 0); @@ -3108,7 +3131,7 @@ pub mod tests { .accounts_index .read() .unwrap() - .get(&pubkey1, None) + .get(&pubkey1, None, None) .is_none()); } @@ -3128,7 +3151,7 @@ pub mod tests { assert_eq!(accounts.uncleaned_root_count(), 1); //now uncleaned roots are cleaned up - accounts.clean_accounts(); + accounts.clean_accounts(None); assert_eq!(accounts.uncleaned_root_count(), 0); } @@ -3145,7 +3168,7 @@ pub mod tests { assert_eq!(accounts.uncleaned_root_count(), 1); //now uncleaned roots are cleaned up - accounts.clean_accounts(); + accounts.clean_accounts(None); assert_eq!(accounts.uncleaned_root_count(), 0); } @@ -3157,15 +3180,18 @@ pub mod tests { // Create 100 accounts in slot 0 create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - assert_eq!(check_storage(&accounts, 0, 100), true); + accounts.clean_accounts(None); check_accounts(&accounts, &pubkeys, 0, 100, 1); // do some updates to those accounts and re-check modify_accounts(&accounts, &pubkeys, 0, 100, 2); + assert_eq!(check_storage(&accounts, 0, 100), true); check_accounts(&accounts, &pubkeys, 0, 100, 2); accounts.add_root(0); let mut pubkeys1: Vec = vec![]; + + // CREATE SLOT 1 let latest_slot = 1; // Modify the first 10 of the slot 0 accounts as updates in slot 1 @@ -3174,27 +3200,38 @@ pub mod tests { // Create 10 new accounts in slot 1 create_account(&accounts, &mut pubkeys1, latest_slot, 10, 0, 
0); - // Store a lamports=0 account in slot 1 + // Store a lamports=0 account in slot 1. Slot 1 should now have + // 10 + 10 + 1 = 21 accounts let account = Account::new(0, 0, &Account::default().owner); accounts.store(latest_slot, &[(&pubkeys[30], &account)]); accounts.add_root(latest_slot); - info!("added root 1"); + assert!(check_storage(&accounts, 1, 21)); + // CREATE SLOT 2 let latest_slot = 2; let mut pubkeys2: Vec = vec![]; + // Modify original slot 0 accounts in slot 2 modify_accounts(&accounts, &pubkeys, latest_slot, 20, 4); + accounts.clean_accounts(None); // Create 10 new accounts in slot 2 create_account(&accounts, &mut pubkeys2, latest_slot, 10, 0, 0); - // Store a lamports=0 account in slot 2 + // Store a lamports=0 account in slot 2. Slot 2 should now have + // 10 + 10 + 10 + 1 = 31 accounts let account = Account::new(0, 0, &Account::default().owner); accounts.store(latest_slot, &[(&pubkeys[31], &account)]); accounts.add_root(latest_slot); + assert!(check_storage(&accounts, 2, 31)); - assert!(check_storage(&accounts, 0, 90)); - assert!(check_storage(&accounts, 1, 21)); + accounts.clean_accounts(None); + // The first 20 accounts have been modified in slot 2, so only + // 80 accounts left in slot 0. + assert!(check_storage(&accounts, 0, 80)); + // 10 of the 21 accounts have been modified in slot 2, so only 11 + // accounts left in slot 1. 
+ assert!(check_storage(&accounts, 1, 11)); assert!(check_storage(&accounts, 2, 31)); let daccounts = reconstruct_accounts_db_via_serialization(&accounts, latest_slot); @@ -3300,7 +3337,7 @@ pub mod tests { accounts.print_accounts_stats("pre_purge"); - accounts.clean_accounts(); + accounts.clean_accounts(None); accounts.print_accounts_stats("post_purge"); @@ -3364,8 +3401,7 @@ pub mod tests { info!("ancestors: {:?}", ancestors); let hash = accounts.update_accounts_hash(current_slot, &ancestors); - accounts.clean_accounts(); - accounts.process_dead_slots(None); + accounts.clean_accounts(None); assert_eq!( accounts.update_accounts_hash(current_slot, &ancestors), @@ -3436,7 +3472,7 @@ pub mod tests { accounts.print_accounts_stats("accounts"); - accounts.clean_accounts(); + accounts.clean_accounts(None); accounts.print_accounts_stats("accounts_post_purge"); let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); @@ -3509,7 +3545,7 @@ pub mod tests { fn test_accounts_purge_chained_purge_before_snapshot_restore() { solana_logger::setup(); with_chained_zero_lamport_accounts(|accounts, current_slot| { - accounts.clean_accounts(); + accounts.clean_accounts(None); reconstruct_accounts_db_via_serialization(&accounts, current_slot) }); } @@ -3520,7 +3556,7 @@ pub mod tests { with_chained_zero_lamport_accounts(|accounts, current_slot| { let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); accounts.print_accounts_stats("after_reconstruct"); - accounts.clean_accounts(); + accounts.clean_accounts(None); reconstruct_accounts_db_via_serialization(&accounts, current_slot) }); } @@ -4194,7 +4230,7 @@ pub mod tests { accounts.print_count_and_status("before reconstruct"); let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); accounts.print_count_and_status("before purge zero"); - accounts.clean_accounts(); + accounts.clean_accounts(None); accounts.print_count_and_status("after purge zero"); 
assert_load_account(&accounts, current_slot, pubkey, old_lamport); @@ -4253,7 +4289,8 @@ pub mod tests { current_slot += 1; assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]); - accounts.process_dead_slots(None); + accounts.clean_accounts(None); + assert_eq!( // Removed one reference from the dead slot (reference only counted once // even though there were two stores to the pubkey in that slot) @@ -4275,9 +4312,9 @@ pub mod tests { // If step C and step D should be purged, snapshot restore would cause // pubkey1 to be revived as the state of step A. // So, prevent that from happening by introducing refcount - accounts.clean_accounts(); + accounts.clean_accounts(None); let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); - accounts.clean_accounts(); + accounts.clean_accounts(None); assert_load_account(&accounts, current_slot, pubkey1, zero_lamport); assert_load_account(&accounts, current_slot, pubkey2, old_lamport); @@ -4289,7 +4326,7 @@ pub mod tests { accounts.add_root(current_slot); // Do clean - accounts.clean_accounts(); + accounts.clean_accounts(None); // Ensure pubkey2 is cleaned from the index finally assert_not_load_account(&accounts, current_slot, pubkey1); @@ -4416,7 +4453,7 @@ pub mod tests { } accounts.add_root(current_slot); - accounts.clean_accounts(); + accounts.clean_accounts(None); assert_eq!( pubkey_count, @@ -4480,7 +4517,7 @@ pub mod tests { } accounts.add_root(current_slot); - accounts.clean_accounts(); + accounts.clean_accounts(None); assert_eq!( pubkey_count, @@ -4614,7 +4651,7 @@ pub mod tests { // let's dance. 
for _ in 0..10 { - accounts.clean_accounts(); + accounts.clean_accounts(None); std::thread::sleep(std::time::Duration::from_millis(100)); } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index f697f4329..451d7f301 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -31,7 +31,7 @@ impl<'a, T: 'a + Clone> AccountsIndex { { for (pubkey, list) in iter { let list_r = &list.1.read().unwrap(); - if let Some(index) = self.latest_slot(Some(ancestors), &list_r) { + if let Some(index) = self.latest_slot(Some(ancestors), &list_r, None) { func(pubkey, (&list_r[index].1, list_r[index].0)); } } @@ -92,34 +92,50 @@ impl<'a, T: 'a + Clone> AccountsIndex { (reclaims, list.is_empty()) } - // find the latest slot and T in a slice for a given ancestor - // returns index into 'slice' if found, None if not. - fn latest_slot(&self, ancestors: Option<&Ancestors>, slice: SlotSlice) -> Option { - let mut max = 0; + // Given a SlotSlice `L`, a list of ancestors and a maximum slot, find the latest element + // in `L`, where the slot `S < max_slot`, and `S` is an ancestor or root. 
+ fn latest_slot( + &self, + ancestors: Option<&Ancestors>, + slice: SlotSlice, + max_slot: Option, + ) -> Option { + let mut current_max = 0; + let max_slot = max_slot.unwrap_or(std::u64::MAX); + let mut rv = None; for (i, (slot, _t)) in slice.iter().rev().enumerate() { - if *slot >= max - && (ancestors.map_or(false, |ancestors| ancestors.contains_key(slot)) - || self.is_root(*slot)) + if *slot >= current_max + && *slot <= max_slot + && self.is_ancestor_or_root(ancestors, *slot) { rv = Some((slice.len() - 1) - i); - max = *slot; + current_max = *slot; } } + rv } + // Checks that the given slot is either: + // 1) in the `ancestors` set + // 2) or is a root + fn is_ancestor_or_root(&self, ancestors: Option<&Ancestors>, slot: Slot) -> bool { + ancestors.map_or(false, |ancestors| ancestors.contains_key(&slot)) || (self.is_root(slot)) + } + /// Get an account /// The latest account that appears in `ancestors` or `roots` is returned. pub(crate) fn get( &self, pubkey: &Pubkey, ancestors: Option<&Ancestors>, + max_root: Option, ) -> Option<(RwLockReadGuard>, usize)> { self.account_maps.get(pubkey).and_then(|list| { let list_r = list.1.read().unwrap(); let lock = &list_r; - let found_index = self.latest_slot(ancestors, &lock)?; + let found_index = self.latest_slot(ancestors, &lock, max_root)?; Some((list_r, found_index)) }) } @@ -183,9 +199,6 @@ impl<'a, T: 'a + Clone> AccountsIndex { lock.0.fetch_add(1, Ordering::Relaxed); } list.push((slot, account_info)); - // now, do lazy clean - self.purge_older_root_entries(list, reclaims); - None } else { Some(account_info) @@ -208,10 +221,18 @@ impl<'a, T: 'a + Clone> AccountsIndex { } } - fn purge_older_root_entries(&self, list: &mut SlotList, reclaims: &mut SlotList) { + fn purge_older_root_entries( + &self, + list: &mut SlotList, + reclaims: &mut SlotList, + max_clean_root: Option, + ) { let roots = &self.roots; - let max_root = Self::get_max_root(roots, &list); + let mut max_root = Self::get_max_root(roots, &list); + if let 
Some(max_clean_root) = max_clean_root { + max_root = std::cmp::min(max_root, max_clean_root); + } reclaims.extend( list.iter() @@ -221,10 +242,15 @@ impl<'a, T: 'a + Clone> AccountsIndex { list.retain(|(slot, _)| !Self::can_purge(max_root, *slot)); } - pub fn clean_rooted_entries(&self, pubkey: &Pubkey, reclaims: &mut SlotList) { + pub fn clean_rooted_entries( + &self, + pubkey: &Pubkey, + reclaims: &mut SlotList, + max_clean_root: Option, + ) { if let Some(locked_entry) = self.account_maps.get(pubkey) { let mut list = locked_entry.1.write().unwrap(); - self.purge_older_root_entries(&mut list, reclaims); + self.purge_older_root_entries(&mut list, reclaims, max_clean_root); } } @@ -273,10 +299,19 @@ impl<'a, T: 'a + Clone> AccountsIndex { self.previous_uncleaned_roots.remove(&slot); } - pub fn reset_uncleaned_roots(&mut self) -> HashSet { - let empty = HashSet::new(); - let new_previous = std::mem::replace(&mut self.uncleaned_roots, empty); - std::mem::replace(&mut self.previous_uncleaned_roots, new_previous) + pub fn reset_uncleaned_roots(&mut self, max_clean_root: Option) -> HashSet { + let mut cleaned_roots = HashSet::new(); + self.uncleaned_roots.retain(|root| { + let is_cleaned = max_clean_root + .map(|max_clean_root| *root <= max_clean_root) + .unwrap_or(true); + if is_cleaned { + cleaned_roots.insert(*root); + } + // Only keep the slots that have yet to be cleaned + !is_cleaned + }); + std::mem::replace(&mut self.previous_uncleaned_roots, cleaned_roots) } } @@ -290,8 +325,8 @@ mod tests { let key = Keypair::new(); let index = AccountsIndex::::default(); let ancestors = HashMap::new(); - assert!(index.get(&key.pubkey(), Some(&ancestors)).is_none()); - assert!(index.get(&key.pubkey(), None).is_none()); + assert!(index.get(&key.pubkey(), Some(&ancestors), None).is_none()); + assert!(index.get(&key.pubkey(), None, None).is_none()); let mut num = 0; index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); @@ -307,8 +342,8 @@ mod tests { assert!(gc.is_empty()); 
let ancestors = HashMap::new(); - assert!(index.get(&key.pubkey(), Some(&ancestors)).is_none()); - assert!(index.get(&key.pubkey(), None).is_none()); + assert!(index.get(&key.pubkey(), Some(&ancestors), None).is_none()); + assert!(index.get(&key.pubkey(), None, None).is_none()); let mut num = 0; index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); @@ -324,7 +359,7 @@ mod tests { assert!(gc.is_empty()); let ancestors = vec![(1, 1)].into_iter().collect(); - assert!(index.get(&key.pubkey(), Some(&ancestors)).is_none()); + assert!(index.get(&key.pubkey(), Some(&ancestors), None).is_none()); let mut num = 0; index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); @@ -340,7 +375,7 @@ mod tests { assert!(gc.is_empty()); let ancestors = vec![(0, 0)].into_iter().collect(); - let (list, idx) = index.get(&key.pubkey(), Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&key.pubkey(), Some(&ancestors), None).unwrap(); assert_eq!(list[idx], (0, true)); let mut num = 0; @@ -372,7 +407,7 @@ mod tests { assert!(gc.is_empty()); index.add_root(0); - let (list, idx) = index.get(&key.pubkey(), None).unwrap(); + let (list, idx) = index.get(&key.pubkey(), None, None).unwrap(); assert_eq!(list[idx], (0, true)); } @@ -406,7 +441,7 @@ mod tests { assert_eq!(2, index.uncleaned_roots.len()); assert_eq!(0, index.previous_uncleaned_roots.len()); - index.reset_uncleaned_roots(); + index.reset_uncleaned_roots(None); assert_eq!(2, index.roots.len()); assert_eq!(0, index.uncleaned_roots.len()); assert_eq!(2, index.previous_uncleaned_roots.len()); @@ -436,14 +471,14 @@ mod tests { let mut gc = Vec::new(); index.insert(0, &key.pubkey(), true, &mut gc); assert!(gc.is_empty()); - let (list, idx) = index.get(&key.pubkey(), Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&key.pubkey(), Some(&ancestors), None).unwrap(); assert_eq!(list[idx], (0, true)); drop(list); let mut gc = Vec::new(); index.insert(0, &key.pubkey(), false, &mut gc); assert_eq!(gc, vec![(0, true)]); - let 
(list, idx) = index.get(&key.pubkey(), Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&key.pubkey(), Some(&ancestors), None).unwrap(); assert_eq!(list[idx], (0, false)); } @@ -458,10 +493,10 @@ mod tests { assert!(gc.is_empty()); index.insert(1, &key.pubkey(), false, &mut gc); assert!(gc.is_empty()); - let (list, idx) = index.get(&key.pubkey(), Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&key.pubkey(), Some(&ancestors), None).unwrap(); assert_eq!(list[idx], (0, true)); let ancestors = vec![(1, 0)].into_iter().collect(); - let (list, idx) = index.get(&key.pubkey(), Some(&ancestors)).unwrap(); + let (list, idx) = index.get(&key.pubkey(), Some(&ancestors), None).unwrap(); assert_eq!(list[idx], (1, false)); } @@ -479,8 +514,11 @@ mod tests { index.add_root(1); index.add_root(3); index.insert(4, &key.pubkey(), true, &mut gc); - assert_eq!(gc, vec![(0, true), (1, false), (2, true)]); - let (list, idx) = index.get(&key.pubkey(), None).unwrap(); + + // Updating index should not purge older roots, only purges + // previous updates within the same slot + assert_eq!(gc, vec![]); + let (list, idx) = index.get(&key.pubkey(), None, None).unwrap(); assert_eq!(list[idx], (3, true)); let mut num = 0; @@ -516,4 +554,54 @@ mod tests { assert_eq!(None, index.update(1, &key.pubkey(), 9, &mut gc)); } + + #[test] + fn test_latest_slot() { + let slot_slice = vec![(0, true), (5, true), (3, true), (7, true)]; + let mut index = AccountsIndex::::default(); + + // No ancestors, no root, should return None + assert!(index.latest_slot(None, &slot_slice, None).is_none()); + + // Given a root, should return the root + index.add_root(5); + assert_eq!(index.latest_slot(None, &slot_slice, None).unwrap(), 1); + + // Given a maximum -= root, should still return the root + assert_eq!(index.latest_slot(None, &slot_slice, Some(5)).unwrap(), 1); + + // Given a maximum < root, should filter out the root + assert!(index.latest_slot(None, &slot_slice, Some(4)).is_none()); + + // Given 
a maximum, should filter out the ancestors > maximum + let ancestors: HashMap = vec![(3, 1), (7, 1)].into_iter().collect(); + assert_eq!( + index + .latest_slot(Some(&ancestors), &slot_slice, Some(4)) + .unwrap(), + 2 + ); + assert_eq!( + index + .latest_slot(Some(&ancestors), &slot_slice, Some(7)) + .unwrap(), + 3 + ); + + // Given no maximum, should just return the greatest ancestor or root + assert_eq!( + index + .latest_slot(Some(&ancestors), &slot_slice, None) + .unwrap(), + 3 + ); + + // Because the given maximum `m == root`, ancestors > root + assert_eq!( + index + .latest_slot(Some(&ancestors), &slot_slice, Some(5)) + .unwrap(), + 1 + ); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f815cfc7e..85aef4663 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1306,13 +1306,15 @@ impl Bank { } } + // Should not be called outside of startup, will race with + // concurrent cleaning logic in AccountsBackgroundService pub fn exhaustively_free_unused_resource(&self) { - let mut reclaim = Measure::start("reclaim"); - self.process_dead_slots(); - reclaim.stop(); - let mut clean = Measure::start("clean"); - self.clean_accounts(); + // Don't clean the slot we're snapshotting because it may have zero-lamport + // accounts that were included in the bank delta hash when the bank was frozen, + // and if we clean them here, any newly created snapshot's hash for this bank + // may not match the frozen hash. + self.clean_accounts(true); clean.stop(); let mut shrink = Measure::start("shrink"); @@ -1320,8 +1322,10 @@ impl Bank { shrink.stop(); info!( - "exhaustively_free_unused_resource(): {} {} {}", - reclaim, clean, shrink, + "exhaustively_free_unused_resource() + clean: {}, + shrink: {}", + clean, shrink, ); } @@ -3254,8 +3258,10 @@ impl Bank { /// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash /// calculation and could shield other real accounts. 
pub fn verify_snapshot_bank(&self) -> bool { - self.clean_accounts(); - self.shrink_all_slots(); + if self.slot() > 0 { + self.clean_accounts(true); + self.shrink_all_slots(); + } // Order and short-circuiting is significant; verify_hash requires a valid bank hash self.verify_bank_hash() && self.verify_hash() } @@ -3534,12 +3540,17 @@ impl Bank { ); } - pub fn clean_accounts(&self) { - self.rc.accounts.accounts_db.clean_accounts(); - } - - pub fn process_dead_slots(&self) { - self.rc.accounts.accounts_db.process_dead_slots(None); + pub fn clean_accounts(&self, skip_last: bool) { + let max_clean_slot = if skip_last { + // Don't clean the slot we're snapshotting because it may have zero-lamport + // accounts that were included in the bank delta hash when the bank was frozen, + // and if we clean them here, any newly created snapshot's hash for this bank + // may not match the frozen hash. + Some(self.slot() - 1) + } else { + None + }; + self.rc.accounts.accounts_db.clean_accounts(max_clean_slot); } pub fn shrink_all_slots(&self) { @@ -5263,7 +5274,7 @@ mod tests { impl Bank { fn slots_by_pubkey(&self, pubkey: &Pubkey, ancestors: &Ancestors) -> Vec { let accounts_index = self.rc.accounts.accounts_db.accounts_index.read().unwrap(); - let (accounts, _) = accounts_index.get(&pubkey, Some(&ancestors)).unwrap(); + let (accounts, _) = accounts_index.get(&pubkey, Some(&ancestors), None).unwrap(); accounts .iter() .map(|(slot, _)| *slot) @@ -5638,7 +5649,7 @@ mod tests { } let hash = bank.update_accounts_hash(); - bank.clean_accounts(); + bank.clean_accounts(false); assert_eq!(bank.update_accounts_hash(), hash); let bank0 = Arc::new(new_from_parent(&bank)); @@ -5658,14 +5669,14 @@ mod tests { info!("bank0 purge"); let hash = bank0.update_accounts_hash(); - bank0.clean_accounts(); + bank0.clean_accounts(false); assert_eq!(bank0.update_accounts_hash(), hash); assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10); assert_eq!(bank1.get_account(&keypair.pubkey()), 
None); info!("bank1 purge"); - bank1.clean_accounts(); + bank1.clean_accounts(false); assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10); assert_eq!(bank1.get_account(&keypair.pubkey()), None); @@ -5683,7 +5694,7 @@ mod tests { // keypair should have 0 tokens on both forks assert_eq!(bank0.get_account(&keypair.pubkey()), None); assert_eq!(bank1.get_account(&keypair.pubkey()), None); - bank1.clean_accounts(); + bank1.clean_accounts(false); assert!(bank1.verify_bank_hash()); } @@ -6471,7 +6482,6 @@ mod tests { let pubkey = Pubkey::new_rand(); let (genesis_config, mint_keypair) = create_genesis_config(2_000); let bank = Bank::new(&genesis_config); - bank.transfer(1_000, &mint_keypair, &pubkey).unwrap(); bank.freeze(); bank.update_accounts_hash(); @@ -8574,7 +8584,7 @@ mod tests { goto_end_of_slot(Arc::::get_mut(&mut bank).unwrap()); bank.squash(); - bank.clean_accounts(); + bank.clean_accounts(false); let force_to_return_alive_account = 0; assert_eq!( bank.process_stale_slot_with_budget(22, force_to_return_alive_account), @@ -9187,4 +9197,80 @@ mod tests { assert_eq!(bank.get_balance(&inline_spl_token_v2_0::id()), 0); assert_eq!(bank.capitalization(), original_capitalization - 100); } + + fn setup_bank_with_removable_zero_lamport_account() -> Arc { + let (genesis_config, _mint_keypair) = create_genesis_config(2000); + let bank0 = Bank::new(&genesis_config); + bank0.freeze(); + + let bank1 = Arc::new(Bank::new_from_parent( + &Arc::new(bank0), + &Pubkey::default(), + 1, + )); + + let zero_lamport_pubkey = Pubkey::new_rand(); + + bank1.add_account_and_update_capitalization( + &zero_lamport_pubkey, + &Account::new(0, 0, &Pubkey::default()), + ); + // Store another account in a separate AppendVec than `zero_lamport_pubkey` + // (guaranteed because of large file size). We need this to ensure slot is + // not cleaned up after clean is called, so that the bank hash still exists + // when we call rehash() later in this test. 
+ let large_account_pubkey = Pubkey::new_rand(); + bank1.add_account_and_update_capitalization( + &large_account_pubkey, + &Account::new( + 1000, + bank1.rc.accounts.accounts_db.file_size() as usize, + &Pubkey::default(), + ), + ); + + bank1.freeze(); + let bank1_hash = bank1.hash(); + + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank2.freeze(); + + // Set a root so clean will happen on this slot + bank1.squash(); + + // All accounts other than `zero_lamport_pubkey` should be updated, which + // means clean should be able to delete the `zero_lamport_pubkey` + bank2.squash(); + + // Bank 1 hash should not change + bank1.rehash(); + let new_bank1_hash = bank1.hash(); + assert_eq!(bank1_hash, new_bank1_hash); + + bank1 + } + + #[test] + fn test_clean_zero_lamport_account_different_hash() { + let bank1 = setup_bank_with_removable_zero_lamport_account(); + let old_hash = bank1.hash(); + + // `zero_lamport_pubkey` should have been deleted, hashes will not match + bank1.clean_accounts(false); + bank1.rehash(); + let new_bank1_hash = bank1.hash(); + assert_ne!(old_hash, new_bank1_hash); + } + + #[test] + fn test_clean_zero_lamport_account_same_hash() { + let bank1 = setup_bank_with_removable_zero_lamport_account(); + let old_hash = bank1.hash(); + + // `zero_lamport_pubkey` will not be deleted, hashes will match + bank1.clean_accounts(true); + bank1.rehash(); + let new_bank1_hash = bank1.hash(); + assert_eq!(old_hash, new_bank1_hash); + } } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index ca30bbee1..290ee72d5 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -1,10 +1,10 @@ //! 
The `bank_forks` module implements BankForks a DAG of checkpointed Banks -use crate::snapshot_package::{AccountsPackageSendError, AccountsPackageSender}; -use crate::snapshot_utils::{self, SnapshotError}; -use crate::{bank::Bank, status_cache::MAX_CACHE_ENTRIES}; +use crate::{ + accounts_background_service::{SnapshotRequest, SnapshotRequestSender}, + bank::Bank, +}; use log::*; -use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; use solana_sdk::{clock::Slot, timing}; use std::{ @@ -14,7 +14,6 @@ use std::{ sync::Arc, time::Instant, }; -use thiserror::Error; pub use crate::snapshot_utils::SnapshotVersion; @@ -43,21 +42,10 @@ pub struct SnapshotConfig { pub snapshot_version: SnapshotVersion, } -#[derive(Error, Debug)] -pub enum BankForksError { - #[error("snapshot error")] - SnapshotError(#[from] SnapshotError), - - #[error("accounts package send error")] - AccountsPackageSendError(#[from] AccountsPackageSendError), -} -type Result = std::result::Result; - pub struct BankForks { pub banks: HashMap>, root: Slot, pub snapshot_config: Option, - last_snapshot_slot: Slot, pub accounts_hash_interval_slots: Slot, last_accounts_hash_slot: Slot, @@ -155,7 +143,6 @@ impl BankForks { root, banks, snapshot_config: None, - last_snapshot_slot: root, accounts_hash_interval_slots: std::u64::MAX, last_accounts_hash_slot: root, } @@ -183,7 +170,7 @@ impl BankForks { pub fn set_root( &mut self, root: Slot, - accounts_package_sender: &Option, + snapshot_request_sender: &Option, highest_confirmed_root: Option, ) { let old_epoch = self.root_bank().epoch(); @@ -231,27 +218,26 @@ impl BankForks { bank.update_accounts_hash(); - if self.snapshot_config.is_some() && accounts_package_sender.is_some() { - // Generate an accounts package - let mut snapshot_time = Measure::start("total-snapshot-ms"); - let r = self.generate_accounts_package( - bank_slot, - &bank.src.roots(), - accounts_package_sender.as_ref().unwrap(), - ); - if r.is_err() { + if 
self.snapshot_config.is_some() && snapshot_request_sender.is_some() { + let snapshot_root_bank = self.root_bank().clone(); + let root_slot = snapshot_root_bank.slot(); + if let Err(e) = + snapshot_request_sender + .as_ref() + .unwrap() + .send(SnapshotRequest { + snapshot_root_bank, + // Save off the status cache because these may get pruned + // if another `set_root()` is called before the snapshots package + // can be generated + status_cache_slot_deltas: bank.src.slot_deltas(&bank.src.roots()), + }) + { warn!( - "Error generating snapshot for bank: {}, err: {:?}", - bank_slot, r + "Error sending snapshot request for bank: {}, err: {:?}", + root_slot, e ); - } else { - self.last_snapshot_slot = bank_slot; } - - // Cleanup outdated snapshots - self.purge_old_snapshots(); - snapshot_time.stop(); - inc_new_counter_info!("total-snapshot-ms", snapshot_time.as_ms() as usize); } break; } @@ -277,67 +263,6 @@ impl BankForks { self.root } - pub fn purge_old_snapshots(&self) { - // Remove outdated snapshots - let config = self.snapshot_config.as_ref().unwrap(); - let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path); - let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES); - for slot_files in &slot_snapshot_paths[..num_to_remove] { - let r = snapshot_utils::remove_snapshot(slot_files.slot, &config.snapshot_path); - if r.is_err() { - warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path); - } - } - } - - pub fn generate_accounts_package( - &self, - root: Slot, - slots_to_snapshot: &[Slot], - accounts_package_sender: &AccountsPackageSender, - ) -> Result<()> { - let config = self.snapshot_config.as_ref().unwrap(); - - // Add a snapshot for the new root - let bank = self - .get(root) - .cloned() - .expect("root must exist in BankForks"); - - let storages: Vec<_> = bank.get_snapshot_storages(); - let mut add_snapshot_time = Measure::start("add-snapshot-ms"); - snapshot_utils::add_snapshot( - 
&config.snapshot_path, - &bank, - &storages, - config.snapshot_version, - )?; - add_snapshot_time.stop(); - inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); - - // Package the relevant snapshots - let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path); - let latest_slot_snapshot_paths = slot_snapshot_paths - .last() - .expect("no snapshots found in config snapshot_path"); - // We only care about the last bank's snapshot. - // We'll ask the bank for MAX_CACHE_ENTRIES (on the rooted path) worth of statuses - let package = snapshot_utils::package_snapshot( - &bank, - latest_slot_snapshot_paths, - &config.snapshot_path, - slots_to_snapshot, - &config.snapshot_package_output_path, - storages, - config.compression.clone(), - config.snapshot_version, - )?; - - accounts_package_sender.send(package)?; - - Ok(()) - } - fn prune_non_root(&mut self, root: Slot, highest_confirmed_root: Option) { let descendants = self.descendants(); self.banks.retain(|slot, _| { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 137f1d44a..11e975f5e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(specialization))] pub mod accounts; +pub mod accounts_background_service; pub mod accounts_db; pub mod accounts_index; pub mod append_vec; diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 85c7456e7..b6b06cd44 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -5,7 +5,8 @@ use crate::{ serde_snapshot::{ bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, }, - snapshot_package::AccountsPackage, + snapshot_package::{AccountsPackage, AccountsPackageSendError, AccountsPackageSender}, + status_cache::MAX_CACHE_ENTRIES, }; use bincode::{config::Options, serialize_into}; use bzip2::bufread::BzDecoder; @@ -124,6 +125,9 @@ pub enum SnapshotError { #[error("Unpack error: {0}")] 
UnpackError(#[from] UnpackError), + + #[error("accounts package send error")] + AccountsPackageSendError(#[from] AccountsPackageSendError), } pub type Result = std::result::Result; @@ -159,7 +163,7 @@ pub fn package_snapshot, Q: AsRef>( bank: &Bank, snapshot_files: &SlotSnapshotPaths, snapshot_path: Q, - slots_to_snapshot: &[Slot], + status_cache_slot_deltas: Vec, snapshot_package_output_path: P, snapshot_storages: SnapshotStorages, compression: CompressionType, @@ -188,7 +192,7 @@ pub fn package_snapshot, Q: AsRef>( let package = AccountsPackage::new( bank.slot(), bank.block_height(), - bank.src.slot_deltas(slots_to_snapshot), + status_cache_slot_deltas, snapshot_hard_links_dir, snapshot_storages, snapshot_package_output_file, @@ -846,6 +850,57 @@ pub fn verify_snapshot_archive( assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); } +pub fn purge_old_snapshots(snapshot_path: &Path) { + // Remove outdated snapshots + let slot_snapshot_paths = get_snapshot_paths(snapshot_path); + let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES); + for slot_files in &slot_snapshot_paths[..num_to_remove] { + let r = remove_snapshot(slot_files.slot, snapshot_path); + if r.is_err() { + warn!("Couldn't remove snapshot at: {:?}", snapshot_path); + } + } +} + +// Gather the necessary elements for a snapshot of the given `root_bank` +pub fn snapshot_bank( + root_bank: &Bank, + status_cache_slot_deltas: Vec, + accounts_package_sender: &AccountsPackageSender, + snapshot_path: &Path, + snapshot_package_output_path: &Path, + snapshot_version: SnapshotVersion, + compression: &CompressionType, +) -> Result<()> { + let storages: Vec<_> = root_bank.get_snapshot_storages(); + let mut add_snapshot_time = Measure::start("add-snapshot-ms"); + add_snapshot(snapshot_path, &root_bank, &storages, snapshot_version)?; + add_snapshot_time.stop(); + inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); + + // Package the 
relevant snapshots + let slot_snapshot_paths = get_snapshot_paths(snapshot_path); + let latest_slot_snapshot_paths = slot_snapshot_paths + .last() + .expect("no snapshots found in config snapshot_path"); + // We only care about the last bank's snapshot. + // We'll ask the bank for MAX_CACHE_ENTRIES (on the rooted path) worth of statuses + let package = package_snapshot( + &root_bank, + latest_slot_snapshot_paths, + snapshot_path, + status_cache_slot_deltas, + snapshot_package_output_path, + storages, + compression.clone(), + snapshot_version, + )?; + + accounts_package_sender.send(package)?; + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/runtime/src/system_instruction_processor.rs b/runtime/src/system_instruction_processor.rs index 8189f097f..63253321d 100644 --- a/runtime/src/system_instruction_processor.rs +++ b/runtime/src/system_instruction_processor.rs @@ -1073,7 +1073,7 @@ mod tests { .transfer_and_confirm(50, &alice_keypair, &Pubkey::new_rand()) .unwrap(); - // super fun time; callback chooses to .clean_accounts() or not + // super fun time; callback chooses to .clean_accounts(None) or not callback(&*bank); // create a normal account at the same pubkey as the zero-lamports account @@ -1091,7 +1091,7 @@ mod tests { bank.squash(); // do clean and assert that it actually did its job assert_eq!(3, bank.get_snapshot_storages().len()); - bank.clean_accounts(); + bank.clean_accounts(false); assert_eq!(2, bank.get_snapshot_storages().len()); }); }