diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs
index 53e9b0d43c..0fa536a2a2 100644
--- a/runtime/src/accounts.rs
+++ b/runtime/src/accounts.rs
@@ -462,16 +462,16 @@ impl Accounts {
     }
 
     pub fn calculate_capitalization(&self, ancestors: &Ancestors) -> u64 {
-        let balances = self
-            .load_all(ancestors)
-            .into_iter()
-            .map(|(_pubkey, account, _slot)| {
-                AccountsDB::account_balance_for_capitalization(
-                    account.lamports,
-                    &account.owner,
-                    account.executable,
-                )
-            });
+        let balances =
+            self.load_all_unchecked(ancestors)
+                .into_iter()
+                .map(|(_pubkey, account, _slot)| {
+                    AccountsDB::account_balance_for_capitalization(
+                        account.lamports,
+                        &account.owner,
+                        account.executable,
+                    )
+                });
 
         AccountsDB::checked_sum_for_capitalization(balances)
     }
@@ -541,6 +541,19 @@ impl Accounts {
         )
     }
 
+    fn load_all_unchecked(&self, ancestors: &Ancestors) -> Vec<(Pubkey, Account, Slot)> {
+        self.accounts_db.unchecked_scan_accounts(
+            ancestors,
+            |collector: &mut Vec<(Pubkey, Account, Slot)>, some_account_tuple| {
+                if let Some((pubkey, account, slot)) =
+                    some_account_tuple.filter(|(_, account, _)| Self::is_loadable(account))
+                {
+                    collector.push((*pubkey, account, slot))
+                }
+            },
+        )
+    }
+
     pub fn load_to_collect_rent_eagerly<R: RangeBounds<Pubkey>>(
         &self,
         ancestors: &Ancestors,
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 378211daba..b5cdc478c3 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -744,6 +744,14 @@ impl AccountsDB {
         // hold a lock to prevent slot shrinking from running because it might modify some rooted
         // slot storages which can not happen as long as we're cleaning accounts because we're also
         // modifying the rooted slot storages!
+        let max_clean_root = match (self.accounts_index.min_ongoing_scan_root(), max_clean_root) {
+            (None, None) => None,
+            (Some(min_scan_root), None) => Some(min_scan_root),
+            (None, Some(max_clean_root)) => Some(max_clean_root),
+            (Some(min_scan_root), Some(max_clean_root)) => {
+                Some(std::cmp::min(min_scan_root, max_clean_root))
+            }
+        };
         let mut candidates = self.shrink_candidate_slots.lock().unwrap();
 
         self.report_store_stats();
@@ -1289,6 +1297,22 @@ impl AccountsDB {
         collector
     }
 
+    pub fn unchecked_scan_accounts<F, A>(&self, ancestors: &Ancestors, scan_func: F) -> A
+    where
+        F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),
+        A: Default,
+    {
+        let mut collector = A::default();
+        self.accounts_index
+            .unchecked_scan_accounts(ancestors, |pubkey, (account_info, slot)| {
+                let account_slot = self
+                    .get_account_from_storage(slot, account_info)
+                    .map(|account| (pubkey, account, slot));
+                scan_func(&mut collector, account_slot)
+            });
+        collector
+    }
+
     pub fn range_scan_accounts<F, A, R>(&self, ancestors: &Ancestors, range: R, scan_func: F) -> A
     where
         F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index ee5ece0681..142eb1a4fd 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -13,7 +13,7 @@ use std::{
     ops::{Range, RangeBounds},
     sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
 };
-const ITER_BATCH_SIZE: usize = 1000;
+pub const ITER_BATCH_SIZE: usize = 1000;
 
 pub type SlotList<T> = Vec<(Slot, T)>;
 pub type SlotSlice<'s, T> = &'s [(Slot, T)];
@@ -112,6 +112,7 @@ impl<T: 'static + Clone> WriteAccountMapEntry<T> {
 #[derive(Debug, Default)]
 pub struct RootsTracker {
     roots: HashSet<Slot>,
+    max_root: Slot,
     uncleaned_roots: HashSet<Slot>,
     previous_uncleaned_roots: HashSet<Slot>,
 }
@@ -184,6 +185,7 @@ impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> {
 pub struct AccountsIndex<T> {
     pub account_maps: RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
     roots_tracker: RwLock<RootsTracker>,
+    ongoing_scan_roots: RwLock<BTreeMap<Slot, u64>>,
 }
 
 impl<T: 'static + Clone> AccountsIndex<T> {
@@ -194,15 +196,70 @@ impl<T: 'static + Clone> AccountsIndex<T> {
         AccountsIndexIterator::new(&self.account_maps, range)
     }
 
-    fn do_scan_accounts<'a, F, R>(&'a self, ancestors: &Ancestors, mut func: F, range: Option<R>)
-    where
+    fn do_checked_scan_accounts<'a, F, R>(
+        &'a self,
+        ancestors: &Ancestors,
+        func: F,
+        range: Option<R>,
+    ) where
+        F: FnMut(&Pubkey, (&T, Slot)),
+        R: RangeBounds<Pubkey>,
+    {
+        let max_root = {
+            let mut w_ongoing_scan_roots = self
+                // This lock is also grabbed by clean_accounts(), so clean
+                // has at most cleaned up to the current `max_root` (since
+                // clean only happens *after* BankForks::set_root() which sets
+                // the `max_root`)
+                .ongoing_scan_roots
+                .write()
+                .unwrap();
+            // `max_root()` grabs a lock while
+            // the `ongoing_scan_roots` lock is held,
+            // make sure inverse doesn't happen to avoid
+            // deadlock
+            let max_root = self.max_root();
+            *w_ongoing_scan_roots.entry(max_root).or_default() += 1;
+            max_root
+        };
+
+        self.do_scan_accounts(ancestors, func, range, Some(max_root));
+        {
+            let mut ongoing_scan_roots = self.ongoing_scan_roots.write().unwrap();
+            let count = ongoing_scan_roots.get_mut(&max_root).unwrap();
+            *count -= 1;
+            if *count == 0 {
+                ongoing_scan_roots.remove(&max_root);
+            }
+        }
+    }
+
+    fn do_unchecked_scan_accounts<'a, F, R>(
+        &'a self,
+        ancestors: &Ancestors,
+        func: F,
+        range: Option<R>,
+    ) where
+        F: FnMut(&Pubkey, (&T, Slot)),
+        R: RangeBounds<Pubkey>,
+    {
+        self.do_scan_accounts(ancestors, func, range, None);
+    }
+
+    fn do_scan_accounts<'a, F, R>(
+        &'a self,
+        ancestors: &Ancestors,
+        mut func: F,
+        range: Option<R>,
+        max_root: Option<Slot>,
+    ) where
         F: FnMut(&Pubkey, (&T, Slot)),
         R: RangeBounds<Pubkey>,
     {
         for pubkey_list in self.iter(range) {
             for (pubkey, list) in pubkey_list {
                 let list_r = &list.slot_list.read().unwrap();
-                if let Some(index) = self.latest_slot(Some(ancestors), &list_r, None) {
+                if let Some(index) = self.latest_slot(Some(ancestors), &list_r, max_root) {
                     func(&pubkey, (&list_r[index].1, list_r[index].0));
                 }
             }
@@ -269,7 +326,14 @@ impl<T: 'static + Clone> AccountsIndex<T> {
     where
         F: FnMut(&Pubkey, (&T, Slot)),
     {
-        self.do_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
+        self.do_checked_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
+    }
+
+    pub(crate) fn unchecked_scan_accounts<F>(&self, ancestors: &Ancestors, func: F)
+    where
+        F: FnMut(&Pubkey, (&T, Slot)),
+    {
+        self.do_unchecked_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
     }
 
     /// call func with every pubkey and index visible from a given set of ancestors with range
@@ -278,7 +342,8 @@
         F: FnMut(&Pubkey, (&T, Slot)),
         R: RangeBounds<Pubkey>,
     {
-        self.do_scan_accounts(ancestors, func, Some(range));
+        // Only the rent logic should be calling this, which doesn't need the safety checks
+        self.do_unchecked_scan_accounts(ancestors, func, Some(range));
     }
 
     pub fn get_rooted_entries(&self, slice: SlotSlice<T>) -> SlotList<T> {
@@ -324,23 +389,27 @@ impl<T: 'static + Clone> AccountsIndex<T> {
         })
     }
 
+    pub fn min_ongoing_scan_root(&self) -> Option<Slot> {
+        self.ongoing_scan_roots
+            .read()
+            .unwrap()
+            .keys()
+            .next()
+            .cloned()
+    }
+
     // Given a SlotSlice `L`, a list of ancestors and a maximum slot, find the latest element
-    // in `L`, where the slot `S < max_slot`, and `S` is an ancestor or root.
+    // in `L`, where the slot `S` is an ancestor or root, and if `S` is a root, then `S <= max_root`
     fn latest_slot(
         &self,
         ancestors: Option<&Ancestors>,
         slice: SlotSlice<T>,
-        max_slot: Option<Slot>,
+        max_root: Option<Slot>,
     ) -> Option<usize> {
         let mut current_max = 0;
-        let max_slot = max_slot.unwrap_or(std::u64::MAX);
-
         let mut rv = None;
         for (i, (slot, _t)) in slice.iter().rev().enumerate() {
-            if *slot >= current_max
-                && *slot <= max_slot
-                && self.is_ancestor_or_root(ancestors, *slot)
-            {
+            if *slot >= current_max && self.is_ancestor_or_root(*slot, ancestors, max_root) {
                 rv = Some((slice.len() - 1) - i);
                 current_max = *slot;
             }
@@ -352,8 +421,17 @@ impl<T: 'static + Clone> AccountsIndex<T> {
     // Checks that the given slot is either:
     // 1) in the `ancestors` set
    // 2) or is a root
-    fn is_ancestor_or_root(&self, ancestors: Option<&Ancestors>, slot: Slot) -> bool {
-        ancestors.map_or(false, |ancestors| ancestors.contains_key(&slot)) || (self.is_root(slot))
+    fn is_ancestor_or_root(
+        &self,
+        slot: Slot,
+        ancestors: Option<&Ancestors>,
+        max_root: Option<Slot>,
+    ) -> bool {
+        ancestors.map_or(false, |ancestors| ancestors.contains_key(&slot)) ||
+            // If the slot is a root, it must be less than or equal to the maximum root specified.
+            // This allows scans on non-rooted slots to specify and read data from
+            // ancestors > max_root, while not seeing rooted data updates during the scan
+            (max_root.map_or(true, |max_root| slot <= max_root) && (self.is_root(slot)))
     }
 
     /// Get an account
@@ -483,7 +561,13 @@ impl<T: 'static + Clone> AccountsIndex<T> {
         let mut w_roots_tracker = self.roots_tracker.write().unwrap();
         w_roots_tracker.roots.insert(slot);
         w_roots_tracker.uncleaned_roots.insert(slot);
+        w_roots_tracker.max_root = std::cmp::max(slot, w_roots_tracker.max_root);
     }
+
+    fn max_root(&self) -> Slot {
+        self.roots_tracker.read().unwrap().max_root
+    }
+
     /// Remove the slot when the storage for the slot is freed
     /// Accounts no longer reference this slot.
     pub fn clean_dead_slot(&self, slot: Slot) {
@@ -969,19 +1053,20 @@ mod tests {
         index.add_root(5);
         assert_eq!(index.latest_slot(None, &slot_slice, None).unwrap(), 1);
 
-        // Given a maximum == root, should still return the root
+        // Given a max_root == root, should still return the root
        assert_eq!(index.latest_slot(None, &slot_slice, Some(5)).unwrap(), 1);
 
-        // Given a maximum < root, should filter out the root
+        // Given a max_root < root, should filter out the root
         assert!(index.latest_slot(None, &slot_slice, Some(4)).is_none());
 
-        // Given a maximum, should filter out the ancestors > maximum
+        // Given a max_root, should filter out roots > max_root, but specified
+        // ancestors should not be affected
         let ancestors: HashMap<Slot, usize> = vec![(3, 1), (7, 1)].into_iter().collect();
         assert_eq!(
             index
                 .latest_slot(Some(&ancestors), &slot_slice, Some(4))
                 .unwrap(),
-            2
+            3
         );
         assert_eq!(
             index
@@ -990,21 +1075,13 @@
             3
         );
 
-        // Given no maximum, should just return the greatest ancestor or root
+        // Given no max_root, should just return the greatest ancestor or root
         assert_eq!(
             index
                 .latest_slot(Some(&ancestors), &slot_slice, None)
                 .unwrap(),
             3
         );
-
-        // Because the given maximum `m == root`, ancestors > root
-        assert_eq!(
-            index
-                .latest_slot(Some(&ancestors), &slot_slice, Some(5))
-                .unwrap(),
-            1
-        );
     }
 
     #[test]
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index d4b09eb7b1..82ff2a13f8 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -4029,7 +4029,7 @@ impl Bank {
             // accounts that were included in the bank delta hash when the bank was frozen,
             // and if we clean them here, any newly created snapshot's hash for this bank
             // may not match the frozen hash.
-            Some(self.slot() - 1)
+            Some(self.slot().saturating_sub(1))
         } else {
             None
         };
@@ -4295,7 +4295,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
 pub(crate) mod tests {
     use super::*;
     use crate::{
-        accounts_index::{AccountMap, Ancestors},
+        accounts_index::{AccountMap, Ancestors, ITER_BATCH_SIZE},
         genesis_utils::{
             activate_all_features, create_genesis_config_with_leader,
             create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
@@ -4304,6 +4304,7 @@
         native_loader::NativeLoaderError,
         status_cache::MAX_CACHE_ENTRIES,
     };
+    use crossbeam_channel::bounded;
     use solana_sdk::{
         account_utils::StateMut,
         clock::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
@@ -4332,7 +4333,7 @@
         vote_instruction,
         vote_state::{self, BlockTimestamp, Vote, VoteInit, VoteState, MAX_LOCKOUT_HISTORY},
     };
-    use std::{result, time::Duration};
+    use std::{result, thread::Builder, time::Duration};
 
     #[test]
     fn test_hash_age_kind_is_durable_nonce() {
@@ -10445,4 +10446,148 @@ pub(crate) mod tests {
         let debug = format!("{:#?}", bank);
         assert!(!debug.is_empty());
     }
+
+    fn test_store_scan_consistency<F: 'static>(update_f: F)
+    where
+        F: Fn(Arc<Bank>, crossbeam_channel::Sender<Arc<Bank>>, Arc<HashSet<Pubkey>>, Pubkey, u64)
+            + std::marker::Send,
+    {
+        // Set up initial bank
+        let mut genesis_config = create_genesis_config_with_leader(
+            10,
+            &solana_sdk::pubkey::new_rand(),
+            374_999_998_287_840,
+        )
+        .genesis_config;
+        genesis_config.rent = Rent::free();
+        let bank0 = Arc::new(Bank::new(&genesis_config));
+
+        // Set up pubkeys to write to
+        let total_pubkeys = ITER_BATCH_SIZE * 10;
+        let total_pubkeys_to_modify = 10;
+        let all_pubkeys: Vec<Pubkey> = std::iter::repeat_with(solana_sdk::pubkey::new_rand)
+            .take(total_pubkeys)
+            .collect();
+        let program_id = system_program::id();
+        let starting_lamports = 1;
+        let starting_account = Account::new(starting_lamports, 0, &program_id);
+
+        // Write accounts to the store
+        for key in &all_pubkeys {
+            bank0.store_account(&key, &starting_account);
+        }
+
+        // Set aside a subset of accounts to modify
+        let pubkeys_to_modify: Arc<HashSet<Pubkey>> = Arc::new(
+            all_pubkeys
+                .into_iter()
+                .take(total_pubkeys_to_modify)
+                .collect(),
+        );
+        let exit = Arc::new(AtomicBool::new(false));
+
+        // Thread that runs scan and constantly checks for
+        // consistency
+        let pubkeys_to_modify_ = pubkeys_to_modify.clone();
+        let exit_ = exit.clone();
+
+        // Channel over which the bank to scan is sent
+        let (bank_to_scan_sender, bank_to_scan_receiver): (
+            crossbeam_channel::Sender<Arc<Bank>>,
+            crossbeam_channel::Receiver<Arc<Bank>>,
+        ) = bounded(1);
+        let scan_thread = Builder::new()
+            .name("scan".to_string())
+            .spawn(move || loop {
+                if exit_.load(Relaxed) {
+                    return;
+                }
+                if let Ok(bank_to_scan) =
+                    bank_to_scan_receiver.recv_timeout(Duration::from_millis(10))
+                {
+                    let accounts = bank_to_scan.get_program_accounts(&program_id);
+                    // Should never see empty accounts because no slot ever deleted
+                    // any of the original accounts, and the scan should reflect the
+                    // account state at some frozen slot `X` (no partial updates).
+                    assert!(!accounts.is_empty());
+                    let mut expected_lamports = None;
+                    let mut target_accounts_found = HashSet::new();
+                    for (pubkey, account) in accounts {
+                        let account_balance = account.lamports;
+                        if pubkeys_to_modify_.contains(&pubkey) {
+                            target_accounts_found.insert(pubkey);
+                            if let Some(expected_lamports) = expected_lamports {
+                                assert_eq!(account_balance, expected_lamports);
+                            } else {
+                                // All pubkeys in the specified set should have the same balance
+                                expected_lamports = Some(account_balance);
+                            }
+                        }
+                    }
+
+                    // Should've found all the accounts, i.e. no partial cleans should
+                    // be detected
+                    assert_eq!(target_accounts_found.len(), total_pubkeys_to_modify);
+                }
+            })
+            .unwrap();
+
+        // Thread that constantly updates the accounts, sets
+        // roots, and cleans
+        let update_thread = Builder::new()
+            .name("update".to_string())
+            .spawn(move || {
+                update_f(
+                    bank0,
+                    bank_to_scan_sender,
+                    pubkeys_to_modify,
+                    program_id,
+                    starting_lamports,
+                );
+            })
+            .unwrap();
+
+        // Let threads run for a while, check the scans didn't see any mixed slots
+        std::thread::sleep(Duration::new(5, 0));
+        exit.store(true, Relaxed);
+        scan_thread.join().unwrap();
+        update_thread.join().unwrap();
+    }
+
+    #[test]
+    fn test_store_scan_consistency_root() {
+        test_store_scan_consistency(
+            |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
+                let mut current_bank = bank0.clone();
+                let mut prev_bank = bank0;
+                loop {
+                    let lamports_this_round = current_bank.slot() + starting_lamports + 1;
+                    let account = Account::new(lamports_this_round, 0, &program_id);
+                    for key in pubkeys_to_modify.iter() {
+                        current_bank.store_account(key, &account);
+                    }
+                    current_bank.freeze();
+                    // Send the previous bank to the scan thread to perform the scan.
+                    // Meanwhile this thread will squash and update roots immediately after
+                    // so the roots will update while scanning.
+                    //
+                    // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
+                    // the next iteration, allowing the scan to stay in sync with these updates
+                    // such that every scan will see this interruption.
+                    if bank_to_scan_sender.send(prev_bank).is_err() {
+                        // Channel was disconnected, exit
+                        return;
+                    }
+                    current_bank.squash();
+                    current_bank.clean_accounts(true);
+                    prev_bank = current_bank.clone();
+                    current_bank = Arc::new(Bank::new_from_parent(
+                        &current_bank,
+                        &solana_sdk::pubkey::new_rand(),
+                        current_bank.slot() + 1,
+                    ));
+                }
+            },
+        );
+    }
 }