Prevent scans from seeing root updates/clean (#13464)

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
carllin 2020-11-16 17:23:11 -08:00 committed by GitHub
parent 8c922a0198
commit 6276360468
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 301 additions and 42 deletions

View File

@ -462,16 +462,16 @@ impl Accounts {
}
pub fn calculate_capitalization(&self, ancestors: &Ancestors) -> u64 {
let balances = self
.load_all(ancestors)
.into_iter()
.map(|(_pubkey, account, _slot)| {
AccountsDB::account_balance_for_capitalization(
account.lamports,
&account.owner,
account.executable,
)
});
let balances =
self.load_all_unchecked(ancestors)
.into_iter()
.map(|(_pubkey, account, _slot)| {
AccountsDB::account_balance_for_capitalization(
account.lamports,
&account.owner,
account.executable,
)
});
AccountsDB::checked_sum_for_capitalization(balances)
}
@ -541,6 +541,19 @@ impl Accounts {
)
}
fn load_all_unchecked(&self, ancestors: &Ancestors) -> Vec<(Pubkey, Account, Slot)> {
self.accounts_db.unchecked_scan_accounts(
ancestors,
|collector: &mut Vec<(Pubkey, Account, Slot)>, some_account_tuple| {
if let Some((pubkey, account, slot)) =
some_account_tuple.filter(|(_, account, _)| Self::is_loadable(account))
{
collector.push((*pubkey, account, slot))
}
},
)
}
pub fn load_to_collect_rent_eagerly<R: RangeBounds<Pubkey>>(
&self,
ancestors: &Ancestors,

View File

@ -744,6 +744,14 @@ impl AccountsDB {
// hold a lock to prevent slot shrinking from running because it might modify some rooted
// slot storages which can not happen as long as we're cleaning accounts because we're also
// modifying the rooted slot storages!
let max_clean_root = match (self.accounts_index.min_ongoing_scan_root(), max_clean_root) {
(None, None) => None,
(Some(min_scan_root), None) => Some(min_scan_root),
(None, Some(max_clean_root)) => Some(max_clean_root),
(Some(min_scan_root), Some(max_clean_root)) => {
Some(std::cmp::min(min_scan_root, max_clean_root))
}
};
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
self.report_store_stats();
@ -1289,6 +1297,22 @@ impl AccountsDB {
collector
}
pub fn unchecked_scan_accounts<F, A>(&self, ancestors: &Ancestors, scan_func: F) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),
A: Default,
{
let mut collector = A::default();
self.accounts_index
.unchecked_scan_accounts(ancestors, |pubkey, (account_info, slot)| {
let account_slot = self
.get_account_from_storage(slot, account_info)
.map(|account| (pubkey, account, slot));
scan_func(&mut collector, account_slot)
});
collector
}
pub fn range_scan_accounts<F, A, R>(&self, ancestors: &Ancestors, range: R, scan_func: F) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),

View File

@ -13,7 +13,7 @@ use std::{
ops::{Range, RangeBounds},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
const ITER_BATCH_SIZE: usize = 1000;
pub const ITER_BATCH_SIZE: usize = 1000;
pub type SlotList<T> = Vec<(Slot, T)>;
pub type SlotSlice<'s, T> = &'s [(Slot, T)];
@ -112,6 +112,7 @@ impl<T: 'static + Clone> WriteAccountMapEntry<T> {
#[derive(Debug, Default)]
pub struct RootsTracker {
roots: HashSet<Slot>,
max_root: Slot,
uncleaned_roots: HashSet<Slot>,
previous_uncleaned_roots: HashSet<Slot>,
}
@ -184,6 +185,7 @@ impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> {
pub struct AccountsIndex<T> {
pub account_maps: RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
roots_tracker: RwLock<RootsTracker>,
ongoing_scan_roots: RwLock<BTreeMap<Slot, u64>>,
}
impl<T: 'static + Clone> AccountsIndex<T> {
@ -194,15 +196,70 @@ impl<T: 'static + Clone> AccountsIndex<T> {
AccountsIndexIterator::new(&self.account_maps, range)
}
fn do_scan_accounts<'a, F, R>(&'a self, ancestors: &Ancestors, mut func: F, range: Option<R>)
where
fn do_checked_scan_accounts<'a, F, R>(
&'a self,
ancestors: &Ancestors,
func: F,
range: Option<R>,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey>,
{
let max_root = {
let mut w_ongoing_scan_roots = self
// This lock is also grabbed by clean_accounts(), so clean
// has at most cleaned up to the current `max_root` (since
// clean only happens *after* BankForks::set_root() which sets
// the `max_root`)
.ongoing_scan_roots
.write()
.unwrap();
// `max_root()` grabs a lock while
// the `ongoing_scan_roots` lock is held,
// make sure inverse doesn't happen to avoid
// deadlock
let max_root = self.max_root();
*w_ongoing_scan_roots.entry(max_root).or_default() += 1;
max_root
};
self.do_scan_accounts(ancestors, func, range, Some(max_root));
{
let mut ongoing_scan_roots = self.ongoing_scan_roots.write().unwrap();
let count = ongoing_scan_roots.get_mut(&max_root).unwrap();
*count -= 1;
if *count == 0 {
ongoing_scan_roots.remove(&max_root);
}
}
}
fn do_unchecked_scan_accounts<'a, F, R>(
&'a self,
ancestors: &Ancestors,
func: F,
range: Option<R>,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey>,
{
self.do_scan_accounts(ancestors, func, range, None);
}
fn do_scan_accounts<'a, F, R>(
&'a self,
ancestors: &Ancestors,
mut func: F,
range: Option<R>,
max_root: Option<Slot>,
) where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey>,
{
for pubkey_list in self.iter(range) {
for (pubkey, list) in pubkey_list {
let list_r = &list.slot_list.read().unwrap();
if let Some(index) = self.latest_slot(Some(ancestors), &list_r, None) {
if let Some(index) = self.latest_slot(Some(ancestors), &list_r, max_root) {
func(&pubkey, (&list_r[index].1, list_r[index].0));
}
}
@ -269,7 +326,14 @@ impl<T: 'static + Clone> AccountsIndex<T> {
where
F: FnMut(&Pubkey, (&T, Slot)),
{
self.do_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
self.do_checked_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
}
pub(crate) fn unchecked_scan_accounts<F>(&self, ancestors: &Ancestors, func: F)
where
F: FnMut(&Pubkey, (&T, Slot)),
{
self.do_unchecked_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
}
/// call func with every pubkey and index visible from a given set of ancestors with range
@ -278,7 +342,8 @@ impl<T: 'static + Clone> AccountsIndex<T> {
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey>,
{
self.do_scan_accounts(ancestors, func, Some(range));
// Only the rent logic should be calling this, which doesn't need the safety checks
self.do_unchecked_scan_accounts(ancestors, func, Some(range));
}
pub fn get_rooted_entries(&self, slice: SlotSlice<T>) -> SlotList<T> {
@ -324,23 +389,27 @@ impl<T: 'static + Clone> AccountsIndex<T> {
})
}
pub fn min_ongoing_scan_root(&self) -> Option<Slot> {
self.ongoing_scan_roots
.read()
.unwrap()
.keys()
.next()
.cloned()
}
// Given a SlotSlice `L`, a list of ancestors and a maximum slot, find the latest element
// in `L`, where the slot `S < max_slot`, and `S` is an ancestor or root.
// in `L`, where the slot `S` is an ancestor or root, and if `S` is a root, then `S <= max_root`
fn latest_slot(
&self,
ancestors: Option<&Ancestors>,
slice: SlotSlice<T>,
max_slot: Option<Slot>,
max_root: Option<Slot>,
) -> Option<usize> {
let mut current_max = 0;
let max_slot = max_slot.unwrap_or(std::u64::MAX);
let mut rv = None;
for (i, (slot, _t)) in slice.iter().rev().enumerate() {
if *slot >= current_max
&& *slot <= max_slot
&& self.is_ancestor_or_root(ancestors, *slot)
{
if *slot >= current_max && self.is_ancestor_or_root(*slot, ancestors, max_root) {
rv = Some((slice.len() - 1) - i);
current_max = *slot;
}
@ -352,8 +421,17 @@ impl<T: 'static + Clone> AccountsIndex<T> {
// Checks that the given slot is either:
// 1) in the `ancestors` set
// 2) or is a root
fn is_ancestor_or_root(&self, ancestors: Option<&Ancestors>, slot: Slot) -> bool {
ancestors.map_or(false, |ancestors| ancestors.contains_key(&slot)) || (self.is_root(slot))
fn is_ancestor_or_root(
&self,
slot: Slot,
ancestors: Option<&Ancestors>,
max_root: Option<Slot>,
) -> bool {
ancestors.map_or(false, |ancestors| ancestors.contains_key(&slot)) ||
// If the slot is a root, it must be less than the maximum root specified. This
// allows scans on non-rooted slots to specify and read data from
// ancestors > max_root, while not seeing rooted data update during the scan
(max_root.map_or(true, |max_root| slot <= max_root) && (self.is_root(slot)))
}
/// Get an account
@ -483,7 +561,13 @@ impl<T: 'static + Clone> AccountsIndex<T> {
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.roots.insert(slot);
w_roots_tracker.uncleaned_roots.insert(slot);
w_roots_tracker.max_root = std::cmp::max(slot, w_roots_tracker.max_root);
}
fn max_root(&self) -> Slot {
self.roots_tracker.read().unwrap().max_root
}
/// Remove the slot when the storage for the slot is freed
/// Accounts no longer reference this slot.
pub fn clean_dead_slot(&self, slot: Slot) {
@ -969,19 +1053,20 @@ mod tests {
index.add_root(5);
assert_eq!(index.latest_slot(None, &slot_slice, None).unwrap(), 1);
// Given a maximum -= root, should still return the root
// Given a max_root == root, should still return the root
assert_eq!(index.latest_slot(None, &slot_slice, Some(5)).unwrap(), 1);
// Given a maximum < root, should filter out the root
// Given a max_root < root, should filter out the root
assert!(index.latest_slot(None, &slot_slice, Some(4)).is_none());
// Given a maximum, should filter out the ancestors > maximum
// Given a max_root, should filter out roots < max_root, but specified
// ancestors should not be affected
let ancestors: HashMap<Slot, usize> = vec![(3, 1), (7, 1)].into_iter().collect();
assert_eq!(
index
.latest_slot(Some(&ancestors), &slot_slice, Some(4))
.unwrap(),
2
3
);
assert_eq!(
index
@ -990,21 +1075,13 @@ mod tests {
3
);
// Given no maximum, should just return the greatest ancestor or root
// Given no max_root, should just return the greatest ancestor or root
assert_eq!(
index
.latest_slot(Some(&ancestors), &slot_slice, None)
.unwrap(),
3
);
// Because the given maximum `m == root`, ancestors > root
assert_eq!(
index
.latest_slot(Some(&ancestors), &slot_slice, Some(5))
.unwrap(),
1
);
}
#[test]

View File

@ -4029,7 +4029,7 @@ impl Bank {
// accounts that were included in the bank delta hash when the bank was frozen,
// and if we clean them here, any newly created snapshot's hash for this bank
// may not match the frozen hash.
Some(self.slot() - 1)
Some(self.slot().saturating_sub(1))
} else {
None
};
@ -4295,7 +4295,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
pub(crate) mod tests {
use super::*;
use crate::{
accounts_index::{AccountMap, Ancestors},
accounts_index::{AccountMap, Ancestors, ITER_BATCH_SIZE},
genesis_utils::{
activate_all_features, create_genesis_config_with_leader,
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
@ -4304,6 +4304,7 @@ pub(crate) mod tests {
native_loader::NativeLoaderError,
status_cache::MAX_CACHE_ENTRIES,
};
use crossbeam_channel::bounded;
use solana_sdk::{
account_utils::StateMut,
clock::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
@ -4332,7 +4333,7 @@ pub(crate) mod tests {
vote_instruction,
vote_state::{self, BlockTimestamp, Vote, VoteInit, VoteState, MAX_LOCKOUT_HISTORY},
};
use std::{result, time::Duration};
use std::{result, thread::Builder, time::Duration};
#[test]
fn test_hash_age_kind_is_durable_nonce() {
@ -10445,4 +10446,148 @@ pub(crate) mod tests {
let debug = format!("{:#?}", bank);
assert!(!debug.is_empty());
}
fn test_store_scan_consistency<F: 'static>(update_f: F)
where
F: Fn(Arc<Bank>, crossbeam_channel::Sender<Arc<Bank>>, Arc<HashSet<Pubkey>>, Pubkey, u64)
+ std::marker::Send,
{
// Set up initial bank
let mut genesis_config = create_genesis_config_with_leader(
10,
&solana_sdk::pubkey::new_rand(),
374_999_998_287_840,
)
.genesis_config;
genesis_config.rent = Rent::free();
let bank0 = Arc::new(Bank::new(&genesis_config));
// Set up pubkeys to write to
let total_pubkeys = ITER_BATCH_SIZE * 10;
let total_pubkeys_to_modify = 10;
let all_pubkeys: Vec<Pubkey> = std::iter::repeat_with(solana_sdk::pubkey::new_rand)
.take(total_pubkeys)
.collect();
let program_id = system_program::id();
let starting_lamports = 1;
let starting_account = Account::new(starting_lamports, 0, &program_id);
// Write accounts to the store
for key in &all_pubkeys {
bank0.store_account(&key, &starting_account);
}
// Set aside a subset of accounts to modify
let pubkeys_to_modify: Arc<HashSet<Pubkey>> = Arc::new(
all_pubkeys
.into_iter()
.take(total_pubkeys_to_modify)
.collect(),
);
let exit = Arc::new(AtomicBool::new(false));
// Thread that runs scan and constantly checks for
// consistency
let pubkeys_to_modify_ = pubkeys_to_modify.clone();
let exit_ = exit.clone();
// Channel over which the bank to scan is sent
let (bank_to_scan_sender, bank_to_scan_receiver): (
crossbeam_channel::Sender<Arc<Bank>>,
crossbeam_channel::Receiver<Arc<Bank>>,
) = bounded(1);
let scan_thread = Builder::new()
.name("scan".to_string())
.spawn(move || loop {
if exit_.load(Relaxed) {
return;
}
if let Ok(bank_to_scan) =
bank_to_scan_receiver.recv_timeout(Duration::from_millis(10))
{
let accounts = bank_to_scan.get_program_accounts(&program_id);
// Should never seen empty accounts because no slot ever deleted
// any of the original accounts, and the scan should reflect the
// account state at some frozen slot `X` (no partial updates).
assert!(!accounts.is_empty());
let mut expected_lamports = None;
let mut target_accounts_found = HashSet::new();
for (pubkey, account) in accounts {
let account_balance = account.lamports;
if pubkeys_to_modify_.contains(&pubkey) {
target_accounts_found.insert(pubkey);
if let Some(expected_lamports) = expected_lamports {
assert_eq!(account_balance, expected_lamports);
} else {
// All pubkeys in the specified set should have the same balance
expected_lamports = Some(account_balance);
}
}
}
// Should've found all the accounts, i.e. no partial cleans should
// be detected
assert_eq!(target_accounts_found.len(), total_pubkeys_to_modify);
}
})
.unwrap();
// Thread that constantly updates the accounts, sets
// roots, and cleans
let update_thread = Builder::new()
.name("update".to_string())
.spawn(move || {
update_f(
bank0,
bank_to_scan_sender,
pubkeys_to_modify,
program_id,
starting_lamports,
);
})
.unwrap();
// Let threads run for a while, check the scans didn't see any mixed slots
std::thread::sleep(Duration::new(5, 0));
exit.store(true, Relaxed);
scan_thread.join().unwrap();
update_thread.join().unwrap();
}
#[test]
fn test_store_scan_consistency_root() {
test_store_scan_consistency(
|bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
let mut current_bank = bank0.clone();
let mut prev_bank = bank0;
loop {
let lamports_this_round = current_bank.slot() + starting_lamports + 1;
let account = Account::new(lamports_this_round, 0, &program_id);
for key in pubkeys_to_modify.iter() {
current_bank.store_account(key, &account);
}
current_bank.freeze();
// Send the previous bank to the scan thread to perform the scan.
// Meanwhile this thread will squash and update roots immediately after
// so the roots will update while scanning.
//
// The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
// the next iteration, allowing the scan to stay in sync with these updates
// such that every scan will see this interruption.
if bank_to_scan_sender.send(prev_bank).is_err() {
// Channel was disconnected, exit
return;
}
current_bank.squash();
current_bank.clean_accounts(true);
prev_bank = current_bank.clone();
current_bank = Arc::new(Bank::new_from_parent(
&current_bank,
&solana_sdk::pubkey::new_rand(),
current_bank.slot() + 1,
));
}
},
);
}
}