Aggregate purge and shrink metrics (#14763)

Co-authored-by: Carl Lin <carl@solana.com>
carllin 2021-01-27 01:39:47 -08:00 committed by GitHub
parent daddcd361a
commit 72f10f5f29
3 changed files with 489 additions and 232 deletions

runtime/src/accounts_cache.rs

@@ -70,6 +70,11 @@ impl SlotCacheInner {
pub fn is_frozen(&self) -> bool {
self.is_frozen.load(Ordering::SeqCst)
}
pub fn total_bytes(&self) -> u64 {
self.unique_account_writes_size.load(Ordering::Relaxed)
+ self.same_account_writes_size.load(Ordering::Relaxed)
}
}
impl Deref for SlotCacheInner {
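The new `total_bytes()` accessor lets the purge path account for an evicted slot's entire cache footprint without walking its entries: it just sums the two byte counters the cache already maintains. A minimal standalone sketch of the same pattern (the field names match the diff; the surrounding `CacheSizes` struct is a simplified stand-in for `SlotCacheInner`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Simplified stand-in for SlotCacheInner's size accounting: one counter
// for first-time writes of an account, one for repeat writes to the
// same account within the slot.
struct CacheSizes {
    unique_account_writes_size: AtomicU64,
    same_account_writes_size: AtomicU64,
}

impl CacheSizes {
    fn total_bytes(&self) -> u64 {
        // Relaxed ordering suffices: this is a monitoring read, not a
        // synchronization point.
        self.unique_account_writes_size.load(Ordering::Relaxed)
            + self.same_account_writes_size.load(Ordering::Relaxed)
    }
}
```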

runtime/src/accounts_db.rs

@@ -21,8 +21,8 @@
use crate::{
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_index::{
AccountIndex, AccountsIndex, Ancestors, IndexKey, IsCached, SlotList, SlotSlice,
ZeroLamport,
AccountIndex, AccountsIndex, AccountsIndexRootsStats, Ancestors, IndexKey, IsCached,
SlotList, SlotSlice, ZeroLamport,
},
append_vec::{AppendVec, StoredAccountMeta, StoredMeta},
contains::Contains,
@@ -643,6 +643,13 @@ pub struct AccountsDB {
stats: AccountsStats,
clean_accounts_stats: CleanAccountsStats,
// Stats for purges called outside of clean_accounts()
external_purge_slots_stats: PurgeStats,
shrink_stats: ShrinkStats,
pub cluster_type: Option<ClusterType>,
pub account_indexes: HashSet<AccountIndex>,
@@ -678,6 +685,257 @@ struct AccountsStats {
store_uncleaned_update: AtomicU64,
}
#[derive(Debug, Default)]
struct PurgeStats {
last_report: AtomicU64,
safety_checks_elapsed: AtomicU64,
remove_storages_elapsed: AtomicU64,
drop_storage_entries_elapsed: AtomicU64,
num_cached_slots_removed: AtomicUsize,
num_stored_slots_removed: AtomicUsize,
total_removed_storage_entries: AtomicUsize,
total_removed_cached_bytes: AtomicU64,
total_removed_stored_bytes: AtomicU64,
recycle_stores_write_elapsed: AtomicU64,
}
impl PurgeStats {
fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
let should_report = report_interval_ms
.map(|report_interval_ms| {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
})
.unwrap_or(true);
if should_report {
datapoint_info!(
metric_name,
(
"safety_checks_elapsed",
self.safety_checks_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"remove_storages_elapsed",
self.remove_storages_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"drop_storage_entries_elapsed",
self.drop_storage_entries_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_cached_slots_removed",
self.num_cached_slots_removed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_stored_slots_removed",
self.num_stored_slots_removed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_removed_storage_entries",
self.total_removed_storage_entries
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_removed_cached_bytes",
self.total_removed_cached_bytes.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_removed_stored_bytes",
self.total_removed_stored_bytes.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"recycle_stores_write_elapsed",
self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
}
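`PurgeStats::report()` rate-limits itself with a `compare_exchange` on `last_report`: many threads can accumulate into the same stats concurrently, but only the thread that wins the timestamp swap emits the datapoint, and passing `None` for the interval (as the clean path does below) reports unconditionally. A minimal sketch of just that guard, assuming a hypothetical `now_ms()` wall-clock helper in place of `solana_sdk::timing::timestamp()`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch")
        .as_millis() as u64
}

#[derive(Default)]
struct Reporter {
    last_report: AtomicU64, // ms timestamp of the last emitted report
}

impl Reporter {
    /// Returns true for at most one caller per `interval_ms` window;
    /// `None` means "always report".
    fn should_report(&self, interval_ms: Option<u64>) -> bool {
        interval_ms
            .map(|interval_ms| {
                let last = self.last_report.load(Ordering::Relaxed);
                let now = now_ms();
                // Losers of the compare_exchange observe a fresh
                // timestamp and skip, so concurrent callers produce a
                // single report per window.
                now.saturating_sub(last) > interval_ms
                    && self
                        .last_report
                        .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                        == Ok(last)
            })
            .unwrap_or(true)
    }
}
```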
#[derive(Debug, Default)]
struct LatestAccountsIndexRootsStats {
roots_len: AtomicUsize,
uncleaned_roots_len: AtomicUsize,
previous_uncleaned_roots_len: AtomicUsize,
}
impl LatestAccountsIndexRootsStats {
fn update(&self, accounts_index_roots_stats: &AccountsIndexRootsStats) {
self.roots_len
.store(accounts_index_roots_stats.roots_len, Ordering::Relaxed);
self.uncleaned_roots_len.store(
accounts_index_roots_stats.uncleaned_roots_len,
Ordering::Relaxed,
);
self.previous_uncleaned_roots_len.store(
accounts_index_roots_stats.previous_uncleaned_roots_len,
Ordering::Relaxed,
);
}
fn report(&self) {
datapoint_info!(
"accounts_index_roots_len",
(
"roots_len",
self.roots_len.load(Ordering::Relaxed) as i64,
i64
),
(
"uncleaned_roots_len",
self.uncleaned_roots_len.load(Ordering::Relaxed) as i64,
i64
),
(
"previous_uncleaned_roots_len",
self.previous_uncleaned_roots_len.load(Ordering::Relaxed) as i64,
i64
),
);
// Don't need to reset since this tracks the latest updates, not a cumulative total
}
}
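Note the two reporting styles side by side: `PurgeStats` fields are counters, drained with `swap(0, ...)` so consecutive datapoints never double-count, while `LatestAccountsIndexRootsStats` fields are gauges, read with `load(...)` and never reset because they track the latest snapshot rather than a cumulative total. A short sketch of the distinction:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Counter-style metric (PurgeStats, ShrinkStats): swap(0) drains the
// value accumulated since the previous report.
fn drain_counter(counter: &AtomicU64) -> u64 {
    counter.swap(0, Ordering::Relaxed)
}

// Gauge-style metric (LatestAccountsIndexRootsStats): load() reports
// the most recent snapshot and leaves it in place.
fn read_gauge(gauge: &AtomicU64) -> u64 {
    gauge.load(Ordering::Relaxed)
}
```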
#[derive(Debug, Default)]
struct CleanAccountsStats {
purge_stats: PurgeStats,
latest_accounts_index_roots_stats: LatestAccountsIndexRootsStats,
}
impl CleanAccountsStats {
fn report(&self) {
self.purge_stats.report("clean_purge_slots_stats", None);
self.latest_accounts_index_roots_stats.report();
}
}
#[derive(Debug, Default)]
struct ShrinkStats {
last_report: AtomicU64,
num_slots_shrunk: AtomicUsize,
storage_read_elapsed: AtomicU64,
index_read_elapsed: AtomicU64,
find_alive_elapsed: AtomicU64,
create_and_insert_store_elapsed: AtomicU64,
store_accounts_elapsed: AtomicU64,
update_index_elapsed: AtomicU64,
handle_reclaims_elapsed: AtomicU64,
write_storage_elapsed: AtomicU64,
rewrite_elapsed: AtomicU64,
drop_storage_entries_elapsed: AtomicU64,
recycle_stores_write_elapsed: AtomicU64,
accounts_removed: AtomicUsize,
bytes_removed: AtomicU64,
}
impl ShrinkStats {
fn report(&self) {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
let should_report = now.saturating_sub(last) > 1000
&& self
.last_report
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
== Ok(last);
if should_report {
datapoint_info!(
"shrink_stats",
(
"num_slots_shrunk",
self.num_slots_shrunk.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"storage_read_elapsed",
self.storage_read_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"index_read_elapsed",
self.index_read_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"find_alive_elapsed",
self.find_alive_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"create_and_insert_store_elapsed",
self.create_and_insert_store_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"store_accounts_elapsed",
self.store_accounts_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"update_index_elapsed",
self.update_index_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"handle_reclaims_elapsed",
self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"write_storage_elapsed",
self.write_storage_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"rewrite_elapsed",
self.rewrite_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"drop_storage_entries_elapsed",
self.drop_storage_entries_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"recycle_stores_write_time",
self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"accounts_removed",
self.accounts_removed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"bytes_removed",
self.bytes_removed.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
}
fn make_min_priority_thread_pool() -> ThreadPool {
// Use lower thread count to reduce priority.
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
@@ -732,6 +990,9 @@ impl Default for AccountsDB {
min_num_stores: num_threads,
bank_hashes: RwLock::new(bank_hashes),
frozen_accounts: HashMap::new(),
external_purge_slots_stats: PurgeStats::default(),
clean_accounts_stats: CleanAccountsStats::default(),
shrink_stats: ShrinkStats::default(),
stats: AccountsStats::default(),
cluster_type: None,
account_indexes: HashSet::new(),
@@ -1231,6 +1492,8 @@ impl AccountsDB {
self.handle_reclaims(&reclaims, None, false, None);
reclaims_time.stop();
self.clean_accounts_stats.report();
datapoint_info!(
"clean_accounts",
(
@@ -1319,7 +1582,7 @@ impl AccountsDB {
clean_dead_slots.stop();
let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
self.purge_removed_slots_from_store(&dead_slots);
self.purge_storage_slots(&dead_slots);
purge_removed_slots.stop();
// If the slot is dead, remove the need to shrink the storages as
@@ -1343,8 +1606,10 @@
{
debug!("do_shrink_slot_stores: slot: {}", slot);
let mut stored_accounts = vec![];
let mut original_bytes = 0;
for store in stores {
let mut start = 0;
original_bytes += store.total_bytes();
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
@@ -1483,9 +1748,9 @@ impl AccountsDB {
}
rewrite_elapsed.stop();
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time");
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop();
recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES {
@@ -1500,49 +1765,51 @@
}
drop_storage_entries_elapsed.stop();
datapoint_info!(
"do_shrink_slot_stores_time",
("index_read_elapsed", index_read_elapsed.as_us(), i64),
("find_alive_elapsed", find_alive_elapsed, i64),
(
"create_and_insert_store_elapsed",
create_and_insert_store_elapsed,
i64
),
(
"store_accounts_elapsed",
store_accounts_timing.store_accounts_elapsed,
i64
),
(
"update_index_elapsed",
store_accounts_timing.update_index_elapsed,
i64
),
(
"handle_reclaims_elapsed",
store_accounts_timing.handle_reclaims_elapsed,
i64
),
("write_storage_elapsed", write_storage_elapsed, i64),
("rewrite_elapsed", rewrite_elapsed.as_us(), i64),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
(
"recycle_stores_write_time",
recycle_stores_write_time.as_us(),
i64
),
("total_starting_accounts", total_starting_accounts, i64),
(
"total_accounts_after_shrink",
total_accounts_after_shrink,
i64
)
self.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
self.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_stats.store_accounts_elapsed.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.accounts_removed.fetch_add(
total_starting_accounts - total_accounts_after_shrink,
Ordering::Relaxed,
);
self.shrink_stats.bytes_removed.fetch_add(
original_bytes.saturating_sub(aligned_total),
Ordering::Relaxed,
);
self.shrink_stats.report();
}
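This block is the heart of the change: instead of emitting a `do_shrink_slot_stores_time` datapoint on every shrink, each call now folds its measurements into the shared `ShrinkStats` via `fetch_add`, and `report()` periodically drains the totals into a single aggregated datapoint. A trimmed-down sketch of the record/drain split (`SlotWorkStats` is a hypothetical stand-in with only two fields):

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

#[derive(Default)]
struct SlotWorkStats {
    slots_processed: AtomicUsize,
    elapsed_us: AtomicU64,
}

impl SlotWorkStats {
    // Hot path, called once per shrunk slot: two relaxed atomic adds
    // instead of a full metrics submission per call.
    fn record(&self, elapsed_us: u64) {
        self.slots_processed.fetch_add(1, Ordering::Relaxed);
        self.elapsed_us.fetch_add(elapsed_us, Ordering::Relaxed);
    }

    // Reporting path, called at most once per interval: drain the
    // accumulated totals so the next report starts from zero.
    fn drain(&self) -> (usize, u64) {
        (
            self.slots_processed.swap(0, Ordering::Relaxed),
            self.elapsed_us.swap(0, Ordering::Relaxed),
        )
    }
}
```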
// Reads all accounts in given slot's AppendVecs and filter only to alive,
@@ -1750,9 +2017,9 @@ impl AccountsDB {
}
rewrite_elapsed.stop();
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time");
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop();
recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES {
@@ -1767,44 +2034,47 @@
}
drop_storage_entries_elapsed.stop();
datapoint_info!(
"do_shrink_slot_time",
("storage_read_elapsed", storage_read_elapsed.as_us(), i64),
("index_read_elapsed", index_read_elapsed.as_us(), i64),
("find_alive_elapsed", find_alive_elapsed, i64),
(
"create_and_insert_store_elapsed",
create_and_insert_store_elapsed,
i64
),
(
"store_accounts_elapsed",
store_accounts_timing.store_accounts_elapsed,
i64
),
(
"update_index_elapsed",
store_accounts_timing.update_index_elapsed,
i64
),
(
"handle_reclaims_elapsed",
store_accounts_timing.handle_reclaims_elapsed,
i64
),
("write_storage_elapsed", write_storage_elapsed, i64),
("rewrite_elapsed", rewrite_elapsed.as_us(), i64),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
(
"recycle_stores_write_time",
recycle_stores_write_time.as_us(),
i64
),
self.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
self.shrink_stats
.storage_read_elapsed
.fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_stats.store_accounts_elapsed.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts.len()
}
@@ -2452,9 +2722,9 @@ impl AccountsDB {
) -> u64 {
let mut recycled_count = 0;
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time");
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop();
recycle_stores_write_elapsed.stop();
for slot_entries in slot_stores {
let entry = slot_entries.read().unwrap();
@@ -2464,50 +2734,71 @@
self.stats
.dropped_stores
.fetch_add(dropped_count as u64, Ordering::Relaxed);
return recycle_stores_write_time.as_us();
return recycle_stores_write_elapsed.as_us();
}
recycle_stores.push(stores.clone());
recycled_count += 1;
}
}
recycle_stores_write_time.as_us()
recycle_stores_write_elapsed.as_us()
}
/// # Arguments
/// * `removed_slots` - Slots that were previously rooted but just removed
fn purge_removed_slots_from_store(&self, removed_slots: &HashSet<Slot>) {
// Check that all slots in `removed_slots` are no longer rooted
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
for slot in removed_slots.iter() {
assert!(!self.accounts_index.is_root(*slot))
}
safety_checks_elapsed.stop();
// Purge the storage entries of the removed slots
fn do_purge_slots_from_cache_and_store<'a>(
&'a self,
can_exist_in_cache: bool,
removed_slots: impl Iterator<Item = &'a Slot>,
purge_stats: &PurgeStats,
) {
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed");
let mut all_removed_slot_storages = vec![];
let mut num_cached_slots_removed = 0;
let mut total_removed_cached_bytes = 0;
let mut total_removed_storage_entries = 0;
let mut total_removed_bytes = 0;
for slot in removed_slots {
// The removed slot must already have been flushed from the cache
assert!(self.accounts_cache.slot_cache(*slot).is_none());
if let Some((_, slot_removed_storages)) = self.storage.0.remove(&slot) {
let mut total_removed_stored_bytes = 0;
for remove_slot in removed_slots {
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
// If the slot is still in the cache, remove its backing storages
// and its entries from the accounts index
if !can_exist_in_cache {
panic!("The removed slot must alrady have been flushed from the cache");
}
num_cached_slots_removed += 1;
total_removed_cached_bytes += slot_cache.total_bytes();
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
// function call), we don't need to worry about an overlapping cache flush
// with this function call. This means, if we get into this case, we can be
// confident that the entire state for this slot has been flushed to the storage
// already.
// Note this only cleans up the storage entries. The accounts index cleaning
// (removing from the slot list, decrementing the account ref count) is handled in
// clean_accounts() -> purge_older_root_entries()
{
let r_slot_removed_storages = slot_removed_storages.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_bytes += r_slot_removed_storages
total_removed_stored_bytes += r_slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
all_removed_slot_storages.push(slot_removed_storages.clone());
}
// It should not be possible for a slot to be in neither the cache nor storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
// Remove any delta pubkey set if existing.
self.uncleaned_pubkeys.remove(remove_slot);
}
remove_storages_elapsed.stop();
let num_slots_removed = all_removed_slot_storages.len();
let num_stored_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_time =
let recycle_stores_write_elapsed =
self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages);
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
@@ -2516,31 +2807,47 @@ impl AccountsDB {
drop(all_removed_slot_storages);
drop_storage_entries_elapsed.stop();
datapoint_info!(
"purge_slots_time",
("safety_checks_elapsed", safety_checks_elapsed.as_us(), i64),
(
"remove_storages_elapsed",
remove_storages_elapsed.as_us(),
i64
),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
("num_slots_removed", num_slots_removed, i64),
(
"total_removed_storage_entries",
total_removed_storage_entries,
i64
),
("total_removed_bytes", total_removed_bytes, i64),
(
"recycle_stores_write_elapsed",
recycle_stores_write_time,
i64
),
purge_stats
.remove_storages_elapsed
.fetch_add(remove_storages_elapsed.as_us(), Ordering::Relaxed);
purge_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
purge_stats
.num_cached_slots_removed
.fetch_add(num_cached_slots_removed, Ordering::Relaxed);
purge_stats
.total_removed_cached_bytes
.fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
purge_stats
.num_stored_slots_removed
.fetch_add(num_stored_slots_removed, Ordering::Relaxed);
purge_stats
.total_removed_storage_entries
.fetch_add(total_removed_storage_entries, Ordering::Relaxed);
purge_stats
.total_removed_stored_bytes
.fetch_add(total_removed_stored_bytes, Ordering::Relaxed);
purge_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed, Ordering::Relaxed);
}
fn purge_storage_slots(&self, removed_slots: &HashSet<Slot>) {
// Check that all slots in `removed_slots` are no longer rooted
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
for slot in removed_slots.iter() {
assert!(!self.accounts_index.is_root(*slot))
}
safety_checks_elapsed.stop();
self.clean_accounts_stats
.purge_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.do_purge_slots_from_cache_and_store(
false,
removed_slots.iter(),
&self.clean_accounts_stats.purge_stats,
);
}
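Both purge entry points now funnel into the shared `do_purge_slots_from_cache_and_store()`, differing only in whether cached slots are tolerated and in which `PurgeStats` bucket they charge; that split is what separates "clean" purges and "external" purges into distinct metric streams. A minimal sketch of that shape (all names here, `Db`, `PurgeCounters`, and the methods, are hypothetical):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
struct PurgeCounters {
    slots_removed: AtomicUsize,
}

#[derive(Default)]
struct Db {
    clean_purge_stats: PurgeCounters,
    external_purge_stats: PurgeCounters,
}

impl Db {
    // Shared worker: the caller picks which stats bucket to charge.
    fn do_purge<'a>(&self, slots: impl Iterator<Item = &'a u64>, stats: &PurgeCounters) {
        stats
            .slots_removed
            .fetch_add(slots.count(), Ordering::Relaxed);
    }

    // Purges driven by clean_accounts(): one metric stream.
    fn purge_from_clean(&self, slots: &[u64]) {
        self.do_purge(slots.iter(), &self.clean_purge_stats);
    }

    // Purges from everywhere else (e.g. dropped banks): another stream.
    fn purge_external(&self, slots: &[u64]) {
        self.do_purge(slots.iter(), &self.external_purge_stats);
    }
}
```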
@@ -2578,90 +2885,23 @@
}
fn purge_slots(&self, slots: &HashSet<Slot>) {
//add_root should be called first
let non_roots: Vec<_> = slots
// `add_root()` should be called first
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
let non_roots: Vec<&Slot> = slots
.iter()
.filter(|slot| !self.accounts_index.is_root(**slot))
.collect();
let mut all_removed_slot_storages = vec![];
let mut total_removed_storage_entries = 0;
let mut total_removed_bytes = 0;
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed");
for remove_slot in non_roots {
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
// If the slot is still in the cache, remove the backing storages for
// the slot. The accounts index cleaning (removing from the slot list,
// decrementing the account ref count) is handled in
// clean_accounts() -> purge_older_root_entries()
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
// function call), we don't need to worry about an overlapping cache flush
// with this function call. This means, if we get into this case, we can be
// confident that the entire state for this slot has been flushed to the storage
// already.
// Note this only cleans up the storage entries. The accounts index cleaning
// (removing from the slot list, decrementing the account ref count) is handled in
// clean_accounts() -> purge_older_root_entries()
{
let r_slot_removed_storages = slot_removed_storages.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_bytes += r_slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
all_removed_slot_storages.push(slot_removed_storages.clone());
}
// It should not be possible for a slot to be in neither the cache nor storage. Even in
// a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars
// on bank creation.
// Remove any delta pubkey set if existing.
self.uncleaned_pubkeys.remove(remove_slot);
}
remove_storages_elapsed.stop();
let num_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_time =
self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages);
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
// Backing mmaps for removed storages entries explicitly dropped here outside
// of any locks
drop(all_removed_slot_storages);
drop_storage_entries_elapsed.stop();
datapoint_info!(
"purge_slots_time",
(
"remove_storages_elapsed",
remove_storages_elapsed.as_us(),
i64
),
(
"drop_storage_entries_elapsed",
drop_storage_entries_elapsed.as_us(),
i64
),
("num_slots_removed", num_slots_removed, i64),
(
"total_removed_storage_entries",
total_removed_storage_entries,
i64
),
("total_removed_bytes", total_removed_bytes, i64),
(
"recycle_stores_write_elapsed",
recycle_stores_write_time,
i64
),
safety_checks_elapsed.stop();
self.external_purge_slots_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.do_purge_slots_from_cache_and_store(
true,
non_roots.into_iter(),
&self.external_purge_slots_stats,
);
self.external_purge_slots_stats
.report("external_purge_slots_stats", Some(1000));
}
// TODO: This is currently:
@@ -3867,9 +4107,17 @@ impl AccountsDB {
self.accounts_index.unref_from_storage(&pubkey);
}
let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
for slot in dead_slots_iter.clone() {
self.accounts_index.clean_dead_slot(*slot);
if let Some(latest) = self.accounts_index.clean_dead_slot(*slot) {
accounts_index_root_stats = latest;
}
}
self.clean_accounts_stats
.latest_accounts_index_roots_stats
.update(&accounts_index_root_stats);
{
let mut bank_hashes = self.bank_hashes.write().unwrap();
for slot in dead_slots_iter {
@@ -4033,13 +4281,13 @@ impl AccountsDB {
let last = self.stats.last_store_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& self
.stats
.last_store_report
.compare_and_swap(last, now, Ordering::Relaxed)
== last
&& self.stats.last_store_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
{
datapoint_info!(
"accounts_db_store_timings",

runtime/src/accounts_index.rs

@@ -164,6 +164,13 @@ pub struct RootsTracker {
previous_uncleaned_roots: HashSet<Slot>,
}
#[derive(Debug, Default)]
pub struct AccountsIndexRootsStats {
pub roots_len: usize,
pub uncleaned_roots_len: usize,
pub previous_uncleaned_roots_len: usize,
}
pub struct AccountsIndexIterator<'a, T> {
account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
start_bound: Bound<Pubkey>,
@@ -964,7 +971,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
/// Remove the slot when the storage for the slot is freed
/// Accounts no longer reference this slot.
pub fn clean_dead_slot(&self, slot: Slot) {
pub fn clean_dead_slot(&self, slot: Slot) -> Option<AccountsIndexRootsStats> {
if self.is_root(slot) {
let (roots_len, uncleaned_roots_len, previous_uncleaned_roots_len) = {
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
@@ -977,16 +984,13 @@
w_roots_tracker.previous_uncleaned_roots.len(),
)
};
datapoint_info!(
"accounts_index_roots_len",
("roots_len", roots_len as i64, i64),
("uncleaned_roots_len", uncleaned_roots_len as i64, i64),
(
"previous_uncleaned_roots_len",
previous_uncleaned_roots_len as i64,
i64
),
);
Some(AccountsIndexRootsStats {
roots_len,
uncleaned_roots_len,
previous_uncleaned_roots_len,
})
} else {
None
}
}
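With `clean_dead_slot()` returning `Option<AccountsIndexRootsStats>` instead of logging directly, the caller in `AccountsDB` keeps only the last `Some` across its dead-slot loop and hands that snapshot to `LatestAccountsIndexRootsStats::update()`. The same "latest wins" fold can be written compactly; a hedged sketch (`Stats` and `latest_stats` are stand-ins, not names from the diff):

```rust
#[derive(Debug, Default, Clone)]
struct Stats {
    roots_len: usize,
}

// Equivalent to the caller's loop in the diff: keep the most recent
// Some(stats), falling back to the default when no slot was rooted.
fn latest_stats(results: impl Iterator<Item = Option<Stats>>) -> Stats {
    results.flatten().last().unwrap_or_default()
}
```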