Refactor purge_slots_from_cache_and_store() and handle_reclaims() (#17319)

This commit is contained in:
carllin 2021-05-24 13:51:17 -07:00 committed by GitHub
parent 41ec1c8d50
commit d8bc56fa51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 323 additions and 247 deletions

View File

@ -35,8 +35,7 @@ use solana_rpc::{
};
use solana_runtime::{
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SendDroppedBankCallback,
SnapshotRequestHandler,
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler,
},
bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache,
@ -235,10 +234,18 @@ impl Tvu {
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
// Before replay starts, set the callbacks in each of the banks in BankForks
// Note after this callback is created, only the AccountsBackgroundService should be calling
// AccountsDb::purge_slot() to clean up dropped banks.
let callback = bank_forks
.read()
.unwrap()
.root_bank()
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
for bank in bank_forks.read().unwrap().banks().values() {
bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
pruned_banks_sender.clone(),
))));
bank.set_callback(Some(Box::new(callback.clone())));
}
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);

View File

@ -912,8 +912,9 @@ impl Accounts {
/// Purge a slot if it is not a root
/// Root slots cannot be purged
pub fn purge_slot(&self, slot: Slot) {
self.accounts_db.purge_slot(slot);
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) {
self.accounts_db.purge_slot(slot, is_from_abs);
}
/// Add a slot to root. Root slots cannot be purged

View File

@ -263,11 +263,12 @@ impl AbsRequestHandler {
})
}
pub fn handle_pruned_banks(&self, bank: &Bank) -> usize {
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize {
let mut count = 0;
for pruned_slot in self.pruned_banks_receiver.try_iter() {
count += 1;
bank.rc.accounts.purge_slot(pruned_slot);
bank.rc.accounts.purge_slot(pruned_slot, is_from_abs);
}
count
@ -393,7 +394,7 @@ impl AccountsBackgroundService {
total_remove_slots_time: &mut u64,
) {
let mut remove_slots_time = Measure::start("remove_slots_time");
*removed_slots_count += request_handler.handle_pruned_banks(&bank);
*removed_slots_count += request_handler.handle_pruned_banks(&bank, true);
remove_slots_time.stop();
*total_remove_slots_time += remove_slots_time.as_us();

View File

@ -19,6 +19,7 @@
//! commit for each slot entry would be indexed.
use crate::{
accounts_background_service::{DroppedSlotsSender, SendDroppedBankCallback},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass},
accounts_index::{
@ -828,6 +829,8 @@ pub struct AccountsDb {
#[cfg(test)]
load_limit: AtomicU64,
is_bank_drop_callback_enabled: AtomicBool,
}
#[derive(Debug, Default)]
@ -858,7 +861,8 @@ struct AccountsStats {
struct PurgeStats {
last_report: AtomicU64,
safety_checks_elapsed: AtomicU64,
remove_storages_elapsed: AtomicU64,
remove_cache_elapsed: AtomicU64,
remove_storage_entries_elapsed: AtomicU64,
drop_storage_entries_elapsed: AtomicU64,
num_cached_slots_removed: AtomicUsize,
num_stored_slots_removed: AtomicUsize,
@ -866,6 +870,9 @@ struct PurgeStats {
total_removed_cached_bytes: AtomicU64,
total_removed_stored_bytes: AtomicU64,
recycle_stores_write_elapsed: AtomicU64,
scan_storages_elasped: AtomicU64,
purge_accounts_index_elapsed: AtomicU64,
handle_reclaims_elapsed: AtomicU64,
}
impl PurgeStats {
@ -893,8 +900,14 @@ impl PurgeStats {
i64
),
(
"remove_storages_elapsed",
self.remove_storages_elapsed.swap(0, Ordering::Relaxed) as i64,
"remove_cache_elapsed",
self.remove_cache_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"remove_storage_entries_elapsed",
self.remove_storage_entries_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
@ -933,6 +946,21 @@ impl PurgeStats {
self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"scan_storages_elasped",
self.scan_storages_elasped.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"purge_accounts_index_elapsed",
self.purge_accounts_index_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"handle_reclaims_elapsed",
self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
@ -1232,6 +1260,7 @@ impl Default for AccountsDb {
load_delay: u64::default(),
#[cfg(test)]
load_limit: AtomicU64::default(),
is_bank_drop_callback_enabled: AtomicBool::default(),
}
}
}
@ -1359,7 +1388,7 @@ impl AccountsDb {
self.handle_reclaims(
&reclaims,
None,
false,
Some(&self.clean_accounts_stats.purge_stats),
Some(&mut reclaim_result),
reset_accounts,
);
@ -1472,7 +1501,7 @@ impl AccountsDb {
fn purge_keys_exact<'a, C: 'a>(
&'a self,
pubkey_to_slot_set: &'a [(Pubkey, C)],
pubkey_to_slot_set: impl Iterator<Item = &'a (Pubkey, C)>,
) -> Vec<(u64, AccountInfo)>
where
C: Contains<'a, Slot>,
@ -1780,14 +1809,20 @@ impl AccountsDb {
})
.collect();
let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter());
// Don't reset from clean, since the pubkeys in those stores may need to be unref'ed
// and those stores may be used for background hashing.
let reset_accounts = false;
let mut reclaim_result = ReclaimResult::default();
let reclaim_result = Some(&mut reclaim_result);
self.handle_reclaims(&reclaims, None, false, reclaim_result, reset_accounts);
self.handle_reclaims(
&reclaims,
None,
Some(&self.clean_accounts_stats.purge_stats),
reclaim_result,
reset_accounts,
);
reclaims_time.stop();
@ -1823,7 +1858,9 @@ impl AccountsDb {
/// remove all the storage entries for `S`.
///
/// # Arguments
/// * `reclaims` - The accounts to remove from storage entries' "count"
/// * `reclaims` - The accounts to remove from storage entries' "count". Note here
/// that we should not remove cache entries, only entries for accounts actually
/// stored in a storage entry.
///
/// * `expected_single_dead_slot` - A correctness assertion. If this is equal to `Some(S)`,
/// then the function will check that the only slot being cleaned up in `reclaims`
@ -1831,13 +1868,16 @@ impl AccountsDb {
/// from store or slot shrinking, as those should only touch the slot they are
/// currently storing to or shrinking.
///
/// * `no_dead_slot` - A correctness assertion. If this is equal to
/// `false`, the function will check that no slots are cleaned up/removed via
/// `process_dead_slots`. For instance, on store, no slots should be cleaned up,
/// but during the background clean accounts purges accounts from old rooted slots,
/// so outdated slots may be removed.
/// * `purge_stats` - The stats used to track performance of purging dead slots. This
/// also serves as a correctness assertion. If `purge_stats.is_none()`, this implies
/// there can be no dead slots that happen as a result of this call, and the function
/// will check that no slots are cleaned up/removed via `process_dead_slots`. For instance,
/// on store, no slots should be cleaned up, but during the background clean accounts
/// purges accounts from old rooted slots, so outdated slots may be removed.
///
/// * `reclaim_result` - Information about accounts that were removed from storage, does
/// not include accounts that were removed from the cache
///
/// * `reset_accounts` - Reset the append_vec store when the store is dead (count==0)
/// From the clean and shrink paths it should be false since there may be an in-progress
/// hash operation and the stores may hold accounts that need to be unref'ed.
@ -1845,7 +1885,9 @@ impl AccountsDb {
&self,
reclaims: SlotSlice<AccountInfo>,
expected_single_dead_slot: Option<Slot>,
no_dead_slot: bool,
// TODO: coalesce `purge_stats` and `reclaim_result` together into one option, as they
// are both either Some or None
purge_stats: Option<&PurgeStats>,
reclaim_result: Option<&mut ReclaimResult>,
reset_accounts: bool,
) {
@ -1864,7 +1906,7 @@ impl AccountsDb {
reclaimed_offsets,
reset_accounts,
);
if no_dead_slot {
if purge_stats.is_none() {
assert!(dead_slots.is_empty());
} else if let Some(expected_single_dead_slot) = expected_single_dead_slot {
assert!(dead_slots.len() <= 1);
@ -1872,7 +1914,10 @@ impl AccountsDb {
assert!(dead_slots.contains(&expected_single_dead_slot));
}
}
self.process_dead_slots(&dead_slots, purged_account_slots);
if let Some(purge_stats) = purge_stats {
self.process_dead_slots(&dead_slots, purged_account_slots, purge_stats);
}
}
// Must be kept private!, does sensitive cleanup that should only be called from
@ -1881,6 +1926,7 @@ impl AccountsDb {
&self,
dead_slots: &HashSet<Slot>,
purged_account_slots: Option<&mut AccountSlots>,
purge_stats: &PurgeStats,
) {
if dead_slots.is_empty() {
return;
@ -1890,7 +1936,7 @@ impl AccountsDb {
clean_dead_slots.stop();
let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
self.purge_storage_slots(&dead_slots);
self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats);
purge_removed_slots.stop();
// If the slot is dead, remove the need to shrink the storages as
@ -2586,7 +2632,11 @@ impl AccountsDb {
// | |
// V |
// P3 purge_slots_from_cache_and_store()/ | index
// purge_slot_cache_pubkeys() | (removes existing store_id, offset for caches)
// purge_slot_cache()/ |
// purge_slot_cache_pubkeys() | (removes existing store_id, offset for cache)
// purge_slot_storage()/ |
// purge_keys_exact() | (removes accounts index entries)
// handle_reclaims() | (removes storage entries)
// OR |
// clean_accounts()/ |
// clean_accounts_older_than_root()| (removes existing store_id, offset for stores)
@ -3071,7 +3121,20 @@ impl AccountsDb {
.is_none());
}
pub fn purge_slot(&self, slot: Slot) {
/// Builds the callback that banks use to report their slot when they are dropped.
///
/// As a side effect this flips `is_bank_drop_callback_enabled` to `true`. From then on
/// `purge_slot()` panics for any caller that passes `is_from_abs == false`.
///
/// # Arguments
/// * `pruned_banks_sender` - The channel end handed to the returned
///   `SendDroppedBankCallback`.
pub fn create_drop_bank_callback(
    &self,
    pruned_banks_sender: DroppedSlotsSender,
) -> SendDroppedBankCallback {
    // Record that a drop callback exists before handing it out.
    let callback_enabled = &self.is_bank_drop_callback_enabled;
    callback_enabled.store(true, Ordering::SeqCst);

    SendDroppedBankCallback::new(pruned_banks_sender)
}
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) {
if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs {
panic!("bad drop callpath detected; Bank::drop() must run serially with other logic in ABS like clean_accounts()")
}
let mut slots = HashSet::new();
slots.insert(slot);
self.purge_slots(&slots);
@ -3105,56 +3168,90 @@ impl AccountsDb {
recycle_stores_write_elapsed.as_us()
}
/// Purges every slot in `removed_slots` from both the cache and storage. This includes
/// entries in the accounts index, cache entries, and any backing storage entries.
fn purge_slots_from_cache_and_store<'a>(
&'a self,
can_exist_in_cache: bool,
removed_slots: impl Iterator<Item = &'a Slot>,
purge_stats: &PurgeStats,
) {
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed");
let mut all_removed_slot_storages = vec![];
let mut remove_cache_elapsed_across_slots = 0;
let mut num_cached_slots_removed = 0;
let mut total_removed_cached_bytes = 0;
let mut total_removed_storage_entries = 0;
let mut total_removed_stored_bytes = 0;
for remove_slot in removed_slots {
// This function is only currently safe with respect to `flush_slot_cache()` because
// both functions run serially in AccountsBackgroundService.
let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
// If the slot is still in the cache, remove the backing storages for
// the slot and from the Accounts Index
if !can_exist_in_cache {
panic!("The removed slot must alrady have been flushed from the cache");
}
num_cached_slots_removed += 1;
total_removed_cached_bytes += slot_cache.total_bytes();
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
// function call), then we don't need to worry about an overlapping cache flush
// with this function call. This means, if we get into this case, we can be
// confident that the entire state for this slot has been flushed to the storage
// already.
remove_cache_elapsed.stop();
remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
} else {
self.purge_slot_storage(*remove_slot, purge_stats);
}
// It should not be possible that a slot is neither in the cache nor in storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
}
// Note this only cleans up the storage entries. The accounts index cleaning
// (removing from the slot list, decrementing the account ref count), is handled in
// clean_accounts() -> purge_older_root_entries()
purge_stats
.remove_cache_elapsed
.fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed);
purge_stats
.num_cached_slots_removed
.fetch_add(num_cached_slots_removed, Ordering::Relaxed);
purge_stats
.total_removed_cached_bytes
.fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
}
/// Purge the backing storage entries for the given slot, does not purge from
/// the cache!
fn purge_dead_slots_from_storage<'a>(
&'a self,
removed_slots: impl Iterator<Item = &'a Slot> + Clone,
purge_stats: &PurgeStats,
) {
// Check all slots `removed_slots` are no longer "relevant" roots.
// Note that the slots here could have been rooted slots, but if they're passed here
// for removal it means:
// 1) All updates in that old root have been outdated by updates in newer roots
// 2) Those slots/roots should have already been purged from the accounts index root
// tracking metadata via `accounts_index.clean_dead_slot()`.
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
assert!(self
.accounts_index
.get_rooted_from_list(removed_slots.clone())
.is_empty());
safety_checks_elapsed.stop();
purge_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
let mut total_removed_storage_entries = 0;
let mut total_removed_stored_bytes = 0;
let mut all_removed_slot_storages = vec![];
let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed");
for remove_slot in removed_slots {
// Remove the storage entries and collect some metrics
if let Some((_, slot_storages_to_be_removed)) = self.storage.0.remove(&remove_slot) {
{
let r_slot_removed_storages = slot_removed_storages.read().unwrap();
let r_slot_removed_storages = slot_storages_to_be_removed.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_stored_bytes += r_slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
all_removed_slot_storages.push(slot_removed_storages.clone());
all_removed_slot_storages.push(slot_storages_to_be_removed.clone());
}
// It should not be possible that a slot is neither in the cache nor in storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
}
remove_storages_elapsed.stop();
remove_storage_entries_elapsed.stop();
let num_stored_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_elapsed =
@ -3165,19 +3262,12 @@ impl AccountsDb {
// of any locks
drop(all_removed_slot_storages);
drop_storage_entries_elapsed.stop();
purge_stats
.remove_storages_elapsed
.fetch_add(remove_storages_elapsed.as_us(), Ordering::Relaxed);
.remove_storage_entries_elapsed
.fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed);
purge_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
purge_stats
.num_cached_slots_removed
.fetch_add(num_cached_slots_removed, Ordering::Relaxed);
purge_stats
.total_removed_cached_bytes
.fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
purge_stats
.num_stored_slots_removed
.fetch_add(num_stored_slots_removed, Ordering::Relaxed);
@ -3192,24 +3282,6 @@ impl AccountsDb {
.fetch_add(recycle_stores_write_elapsed, Ordering::Relaxed);
}
fn purge_storage_slots(&self, removed_slots: &HashSet<Slot>) {
// Check all slots `removed_slots` are no longer rooted
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
for slot in removed_slots.iter() {
assert!(!self.accounts_index.is_root(*slot))
}
safety_checks_elapsed.stop();
self.clean_accounts_stats
.purge_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.purge_slots_from_cache_and_store(
false,
removed_slots.iter(),
&self.clean_accounts_stats.purge_stats,
);
}
fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) {
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
@ -3232,10 +3304,10 @@ impl AccountsDb {
// Slot purged from cache should not exist in the backing store
assert!(self.storage.get_slot_stores(purged_slot).is_none());
let num_purged_keys = pubkey_to_slot_set.len();
let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter());
assert_eq!(reclaims.len(), num_purged_keys);
if is_dead {
self.finalize_dead_slot_removal(
self.remove_dead_slots_metadata(
std::iter::once(&purged_slot),
purged_slot_pubkeys,
None,
@ -3243,6 +3315,67 @@ impl AccountsDb {
}
}
/// Purges `remove_slot` from the accounts index and from the backing storage. The
/// accounts cache is not consulted or modified here.
///
/// The elapsed time of each phase (storage scan, accounts index purge, reclaim
/// handling) is added to the matching counter in `purge_stats`.
///
/// # Panics
/// Panics if the storage scan reports cached keys, or if storage entries for
/// `remove_slot` still exist once the reclaims have been handled.
fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) {
    // AccountsBackgroundService flushes the accounts cache synchronously and also
    // handles every Bank::drop() (the cleanup path that leads to this call), so there
    // is no need to worry about a cache flush overlapping with this function. If we
    // get here, the entire state for this slot has already been flushed to storage.
    type StoredKeys = Arc<Mutex<HashSet<(Pubkey, Slot)>>>;

    // Phase 1: collect every (pubkey, slot) pair stored for this slot.
    let mut scan_timer = Measure::start("scan_storages_elasped");
    let scan_result: ScanStorageResult<Pubkey, StoredKeys> = self.scan_account_storage(
        remove_slot,
        |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()),
        |stored_keys: &StoredKeys, loaded_account: LoadedAccount| {
            let key = (*loaded_account.pubkey(), remove_slot);
            stored_keys.lock().unwrap().insert(key);
        },
    );
    scan_timer.stop();
    purge_stats
        .scan_storages_elasped
        .fetch_add(scan_timer.as_us(), Ordering::Relaxed);

    // Phase 2: purge this slot's entries from the accounts index.
    let mut index_purge_timer = Measure::start("purge_accounts_index_elapsed");
    let reclaims = match scan_result {
        ScanStorageResult::Cached(_) => {
            panic!("Should not see cached keys in this `else` branch, since we checked this slot did not exist in the cache above");
        }
        ScanStorageResult::Stored(stored_keys) => {
            let keys_guard = stored_keys.lock().unwrap();
            self.purge_keys_exact(keys_guard.iter())
        }
    };
    index_purge_timer.stop();
    purge_stats
        .purge_accounts_index_elapsed
        .fetch_add(index_purge_timer.as_us(), Ordering::Relaxed);

    // Phase 3: `handle_reclaims()` should remove all the account index entries and
    // storage entries. With every account entry gone, the slot is expected to be dead.
    let mut reclaims_timer = Measure::start("handle_reclaims_elapsed");
    let expected_dead_slot = Some(remove_slot);
    let reset_accounts = false;
    self.handle_reclaims(
        &reclaims,
        expected_dead_slot,
        Some(purge_stats),
        Some(&mut ReclaimResult::default()),
        reset_accounts,
    );
    reclaims_timer.stop();
    purge_stats
        .handle_reclaims_elapsed
        .fetch_add(reclaims_timer.as_us(), Ordering::Relaxed);

    // Once the reclaimed entries are handled, nothing for this slot may remain in
    // self.storage.
    assert!(self.storage.get_slot_stores(remove_slot).is_none());
}
#[allow(clippy::needless_collect)]
fn purge_slots(&self, slots: &HashSet<Slot>) {
// `add_root()` should be called first
@ -3256,7 +3389,6 @@ impl AccountsDb {
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.purge_slots_from_cache_and_store(
true,
non_roots.into_iter(),
&self.external_purge_slots_stats,
);
@ -3273,11 +3405,6 @@ impl AccountsDb {
panic!("Trying to remove accounts for rooted slot {}", remove_slot);
}
if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) {
// If the slot is still in the cache, remove it from the cache
self.purge_slot_cache(remove_slot, slot_cache);
}
// TODO: Handle if the slot was flushed to storage while we were removing the cached
// slot above, i.e. it's possible the storage contains partial version of the current
// slot. One way to handle this is to augment slots to contain a "version", That way,
@ -3287,37 +3414,12 @@ impl AccountsDb {
// Reads will then always read the latest version of a slot. Scans will also know
// which version their parents because banks will also be augmented with this version,
// which handles cases where a deletion of one version happens in the middle of the scan.
let scan_result: ScanStorageResult<Pubkey, DashSet<Pubkey>> = self.scan_account_storage(
remove_slot,
|loaded_account: LoadedAccount| Some(*loaded_account.pubkey()),
|accum: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
accum.insert(*loaded_account.pubkey());
},
let remove_unrooted_purge_stats = PurgeStats::default();
self.purge_slots_from_cache_and_store(
std::iter::once(&remove_slot),
&remove_unrooted_purge_stats,
);
// Purge this slot from the accounts index
let purge_slot: HashSet<Slot> = vec![remove_slot].into_iter().collect();
let mut reclaims = vec![];
match scan_result {
ScanStorageResult::Cached(cached_keys) => {
for pubkey in cached_keys.iter() {
self.accounts_index
.purge_exact(pubkey, &purge_slot, &mut reclaims);
}
}
ScanStorageResult::Stored(stored_keys) => {
for set_ref in stored_keys.iter() {
self.accounts_index
.purge_exact(set_ref.key(), &purge_slot, &mut reclaims);
}
}
}
self.handle_reclaims(&reclaims, Some(remove_slot), false, None, false);
// After handling the reclaimed entries, this slot's
// storage entries should be purged from self.storage
assert!(self.storage.get_slot_stores(remove_slot).is_none());
remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0));
}
pub fn hash_stored_account(slot: Slot, account: &StoredAccountMeta) -> Hash {
@ -4534,7 +4636,27 @@ impl AccountsDb {
dead_slots
}
fn finalize_dead_slot_removal<'a>(
/// Drops the bookkeeping kept for every slot in `dead_slots_iter`: first the accounts
/// index state, via `clean_dead_slots_from_accounts_index()`, then each slot's entry in
/// `bank_hashes`.
///
/// # Arguments
/// * `dead_slots_iter` - The slots to clean up. Must be cloneable because it is walked
///   twice.
/// * `purged_slot_pubkeys` - Forwarded unchanged to the accounts index cleanup.
/// * `purged_stored_account_slots` - Forwarded unchanged to the accounts index cleanup.
fn remove_dead_slots_metadata<'a>(
    &'a self,
    dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
    purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
    // Should only be `Some` for non-cached slots
    purged_stored_account_slots: Option<&mut AccountSlots>,
) {
    // The index cleanup gets its own copy of the iterator so the original can still be
    // consumed for the bank hash removal below.
    let slots_for_index = dead_slots_iter.clone();
    self.clean_dead_slots_from_accounts_index(
        slots_for_index,
        purged_slot_pubkeys,
        purged_stored_account_slots,
    );

    let mut bank_hashes = self.bank_hashes.write().unwrap();
    dead_slots_iter.for_each(|dead_slot| {
        bank_hashes.remove(dead_slot);
    });
}
fn clean_dead_slots_from_accounts_index<'a>(
&'a self,
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
@ -4555,7 +4677,6 @@ impl AccountsDb {
let mut rooted_cleaned_count = 0;
let mut unrooted_cleaned_count = 0;
let dead_slots: Vec<_> = dead_slots_iter
.clone()
.map(|slot| {
if let Some(latest) = self.accounts_index.clean_dead_slot(*slot) {
rooted_cleaned_count += 1;
@ -4566,7 +4687,7 @@ impl AccountsDb {
*slot
})
.collect();
info!("finalize_dead_slot_removal: slots {:?}", dead_slots);
info!("remove_dead_slots_metadata: slots {:?}", dead_slots);
accounts_index_root_stats.rooted_cleaned_count += rooted_cleaned_count;
accounts_index_root_stats.unrooted_cleaned_count += unrooted_cleaned_count;
@ -4574,13 +4695,6 @@ impl AccountsDb {
self.clean_accounts_stats
.latest_accounts_index_roots_stats
.update(&accounts_index_root_stats);
{
let mut bank_hashes = self.bank_hashes.write().unwrap();
for slot in dead_slots_iter {
bank_hashes.remove(slot);
}
}
}
fn clean_stored_dead_slots(
@ -4614,7 +4728,7 @@ impl AccountsDb {
})
})
};
self.finalize_dead_slot_removal(
self.remove_dead_slots_metadata(
dead_slots.iter(),
purged_slot_pubkeys,
purged_account_slots,
@ -4957,9 +5071,11 @@ impl AccountsDb {
// a) this slot has at least one account (the one being stored),
// b)From 1) we know no other slots are included in the "reclaims"
//
// From 1) and 2) we guarantee passing Some(slot), true is safe
// From 1) and 2) we guarantee passing `no_purge_stats` == None, which is
// equivalent to asserting there will be no dead slots, is safe.
let no_purge_stats = None;
let mut handle_reclaims_time = Measure::start("handle_reclaims");
self.handle_reclaims(&reclaims, Some(slot), true, None, reset_accounts);
self.handle_reclaims(&reclaims, Some(slot), no_purge_stats, None, reset_accounts);
handle_reclaims_time.stop();
self.stats
.store_handle_reclaims
@ -6297,15 +6413,18 @@ pub mod tests {
);
}
#[test]
fn test_remove_unrooted_slot() {
fn run_test_remove_unrooted_slot(is_cached: bool) {
let unrooted_slot = 9;
let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development);
db.caching_enabled = true;
let key = Pubkey::default();
let account0 = AccountSharedData::new(1, 0, &key);
let ancestors = vec![(unrooted_slot, 1)].into_iter().collect();
db.store_cached(unrooted_slot, &[(&key, &account0)]);
if is_cached {
db.store_cached(unrooted_slot, &[(&key, &account0)]);
} else {
db.store_uncached(unrooted_slot, &[(&key, &account0)]);
}
db.bank_hashes
.write()
.unwrap()
@ -6320,12 +6439,9 @@ pub mod tests {
db.remove_unrooted_slot(unrooted_slot);
assert!(db.load_without_fixed_root(&ancestors, &key).is_none());
assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none());
assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none());
assert!(db.storage.0.get(&unrooted_slot).is_none());
assert!(db
.accounts_index
.get_account_read_entry(&key)
.map(|locked_entry| locked_entry.slot_list().is_empty())
.unwrap_or(true));
assert!(db.accounts_index.get_account_read_entry(&key).is_none());
assert!(db
.accounts_index
.get(&key, Some(&ancestors), None)
@ -6337,6 +6453,16 @@ pub mod tests {
assert_load_account(&db, unrooted_slot, key, 2);
}
/// Runs the unrooted-slot removal scenario with the account written through the cache.
#[test]
fn test_remove_unrooted_slot_cached() {
    let is_cached = true;
    run_test_remove_unrooted_slot(is_cached);
}
/// Runs the unrooted-slot removal scenario with the account written without the cache.
#[test]
fn test_remove_unrooted_slot_storage() {
    let is_cached = false;
    run_test_remove_unrooted_slot(is_cached);
}
#[test]
fn test_remove_unrooted_slot_snapshot() {
solana_logger::setup();
@ -7628,7 +7754,7 @@ pub mod tests {
let slots: HashSet<Slot> = vec![1].into_iter().collect();
let purge_keys = vec![(key1, slots)];
db.purge_keys_exact(&purge_keys);
db.purge_keys_exact(purge_keys.iter());
let account2 = AccountSharedData::new(3, 0, &key);
db.store_uncached(2, &[(&key1, &account2)]);
@ -9493,7 +9619,7 @@ pub mod tests {
assert_eq!(account.0.lamports(), slot1_account.lamports());
// Simulate dropping the bank, which finally removes the slot from the cache
db.purge_slot(1);
db.purge_slot(1, false);
assert!(db
.do_load(
&scan_ancestors,
@ -10364,76 +10490,6 @@ pub mod tests {
do_test_load_account_and_shrink_race(false);
}
fn do_test_load_account_and_purge_race(with_retry: bool) {
let caching_enabled = true;
let mut db = AccountsDb::new_with_config(
Vec::new(),
&ClusterType::Development,
AccountSecondaryIndexes::default(),
caching_enabled,
);
db.load_delay = RACY_SLEEP_MS;
let db = Arc::new(db);
let pubkey =
Arc::new(Pubkey::from_str("CiDwVBFgWV9E5MvXWoLgnEgn2hK7rJikbvfWavzAQz3").unwrap());
let exit = Arc::new(AtomicBool::new(false));
let slot = 1;
// Store an account
let lamports = 42;
let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
account.set_lamports(lamports);
db.store_uncached(slot, &[(&pubkey, &account)]);
let t_purge_slot = {
let db = db.clone();
let exit = exit.clone();
std::thread::Builder::new()
.name("account-purge".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
// Simulate purge_slots()
db.purge_slot(slot);
sleep(Duration::from_millis(RACY_SLEEP_MS));
})
.unwrap()
};
let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();
let t_do_load =
start_load_thread(with_retry, ancestors, db, exit.clone(), pubkey, move |_| {
lamports
});
sleep(Duration::from_secs(RACE_TIME));
exit.store(true, Ordering::Relaxed);
t_purge_slot.join().unwrap();
// Propagate expected panic! occurred in the do_load thread
t_do_load.join().map_err(std::panic::resume_unwind).unwrap()
}
#[test]
#[should_panic(expected = "assertion failed: load_hint == LoadHint::Unspecified")]
fn test_load_account_and_purge_race_with_retry() {
// this tests impossible situation in the wild, so panic is expected
// Conversely, we show that we're preventing this race condition from occurring
do_test_load_account_and_purge_race(true);
}
#[test]
#[ignore]
#[should_panic(
expected = "Bad index entry detected (CiDwVBFgWV9E5MvXWoLgnEgn2hK7rJikbvfWavzAQz3, 1, 0, 0, Unspecified)"
)]
fn test_load_account_and_purge_race_without_retry() {
// this tests impossible situation in the wild, so panic is expected
// Conversely, we show that we're preventing this race condition from occurring
do_test_load_account_and_purge_race(false);
}
#[test]
fn test_collect_uncleaned_slots_up_to_slot() {
solana_logger::setup();

View File

@ -1388,6 +1388,20 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
slot < max_clean_root && slot != newest_root_in_slot_list
}
/// Returns the subset of `slots` that are currently tracked as roots, in the order the
/// iterator yields them.
pub fn get_rooted_from_list<'a>(&self, slots: impl Iterator<Item = &'a Slot>) -> Vec<Slot> {
    // Take the read lock once for the whole pass rather than once per slot.
    let roots_tracker = self.roots_tracker.read().unwrap();
    slots
        .filter(|slot| roots_tracker.roots.contains(*slot))
        .copied()
        .collect()
}
/// Returns `true` if `slot` is present in the set of roots tracked by this index.
pub fn is_root(&self, slot: Slot) -> bool {
    let roots_tracker = self.roots_tracker.read().unwrap();
    roots_tracker.roots.contains(&slot)
}

View File

@ -5172,24 +5172,6 @@ impl Bank {
.is_active(&feature_set::consistent_recent_blockhashes_sysvar::id()),
}
}
/// Bank cleanup
///
/// If the bank is unfrozen and then dropped, additional cleanup is needed. In particular,
/// cleaning up the pubkeys that are only in this bank. To do that, call into AccountsDb to
/// scan for dirty pubkeys and add them to the uncleaned pubkeys list so they will be cleaned
/// up in AccountsDb::clean_accounts().
fn cleanup(&self) {
if self.is_frozen() {
// nothing to do here
return;
}
self.rc
.accounts
.accounts_db
.scan_slot_and_insert_dirty_pubkeys_into_uncleaned_pubkeys(self.slot);
}
}
impl Drop for Bank {
@ -5198,8 +5180,6 @@ impl Drop for Bank {
return;
}
self.cleanup();
if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() {
drop_callback.callback(self);
} else {
@ -5207,7 +5187,7 @@ impl Drop for Bank {
// 1. Tests
// 2. At startup when replaying blockstore and there's no
// AccountsBackgroundService to perform cleanups yet.
self.rc.accounts.purge_slot(self.slot());
self.rc.accounts.purge_slot(self.slot(), false);
}
}
}
@ -5246,6 +5226,7 @@ fn is_simple_vote_transaction(transaction: &Transaction) -> bool {
pub(crate) mod tests {
use super::*;
use crate::{
accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback},
accounts_db::SHRINK_RATIO,
accounts_index::{AccountIndex, AccountMap, AccountSecondaryIndexes, ITER_BATCH_SIZE},
ancestors::Ancestors,
@ -5257,7 +5238,7 @@ pub(crate) mod tests {
native_loader::NativeLoaderError,
status_cache::MAX_CACHE_ENTRIES,
};
use crossbeam_channel::bounded;
use crossbeam_channel::{bounded, unbounded};
use solana_sdk::{
account::Account,
account_utils::StateMut,
@ -11826,8 +11807,11 @@ pub(crate) mod tests {
assert!(!debug.is_empty());
}
fn test_store_scan_consistency<F: 'static>(accounts_db_caching_enabled: bool, update_f: F)
where
fn test_store_scan_consistency<F: 'static>(
accounts_db_caching_enabled: bool,
update_f: F,
drop_callback: Option<Box<dyn DropCallback + Send + Sync>>,
) where
F: Fn(Arc<Bank>, crossbeam_channel::Sender<Arc<Bank>>, Arc<HashSet<Pubkey>>, Pubkey, u64)
+ std::marker::Send,
{
@ -11844,6 +11828,7 @@ pub(crate) mod tests {
AccountSecondaryIndexes::default(),
accounts_db_caching_enabled,
));
bank0.set_callback(drop_callback);
// Set up pubkeys to write to
let total_pubkeys = ITER_BATCH_SIZE * 10;
@ -11940,9 +11925,18 @@ pub(crate) mod tests {
#[test]
fn test_store_scan_consistency_unrooted() {
for accounts_db_caching_enabled in &[false, true] {
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let abs_request_handler = AbsRequestHandler {
snapshot_request_handler: None,
pruned_banks_receiver,
};
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
move |bank0,
bank_to_scan_sender,
pubkeys_to_modify,
program_id,
starting_lamports| {
let mut current_major_fork_bank = bank0;
loop {
let mut current_minor_fork_bank = current_major_fork_bank.clone();
@ -11993,7 +11987,7 @@ pub(crate) mod tests {
// Send the last new bank to the scan thread to perform the scan.
// Meanwhile this thread will continually set roots on a separate fork
// and squash.
// and squash/clean, purging the account entries from the minor forks
/*
bank 0
/ \
@ -12014,8 +12008,16 @@ pub(crate) mod tests {
// Try to get cache flush/clean to overlap with the scan
current_major_fork_bank.force_flush_accounts_cache();
current_major_fork_bank.clean_accounts(false, false);
// Move purge here so that Bank::drop()->purge_slots() doesn't race
// with clean. Simulates the call from AccountsBackgroundService
let is_abs_service = true;
abs_request_handler
.handle_pruned_banks(&current_major_fork_bank, is_abs_service);
}
},
Some(Box::new(SendDroppedBankCallback::new(
pruned_banks_sender.clone(),
))),
)
}
}
@ -12059,6 +12061,7 @@ pub(crate) mod tests {
));
}
},
None,
);
}
}
@ -12752,7 +12755,6 @@ pub(crate) mod tests {
let key3 = Keypair::new(); // touched in both bank1 and bank2
let key4 = Keypair::new(); // in only bank1, and has zero lamports
let key5 = Keypair::new(); // in both bank1 and bank2, and has zero lamports
bank0.transfer(2, &mint_keypair, &key2.pubkey()).unwrap();
bank0.freeze();
@ -12779,12 +12781,7 @@ pub(crate) mod tests {
bank2.clean_accounts(false, false);
let expected_ref_count_for_cleaned_up_keys = 0;
let expected_ref_count_for_keys_only_in_slot_2 = bank2
.rc
.accounts
.accounts_db
.accounts_index
.ref_count_from_storage(&key2.pubkey());
let expected_ref_count_for_keys_in_both_slot1_and_slot2 = 1;
assert_eq!(
bank2
@ -12820,7 +12817,7 @@ pub(crate) mod tests {
.accounts_db
.accounts_index
.ref_count_from_storage(&key5.pubkey()),
expected_ref_count_for_keys_only_in_slot_2
expected_ref_count_for_keys_in_both_slot1_and_slot2,
);
assert_eq!(