log write cache contents from purge_slots_from_cache_and_store (#24948)

This commit is contained in:
Jeff Washington (jwash) 2022-05-04 14:28:47 -05:00 committed by GitHub
parent 0ca54db524
commit 40986daddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 11 deletions

View File

@ -4104,12 +4104,21 @@ impl AccountsDb {
/// entries in the accounts index, cache entries, and any backing storage entries. /// entries in the accounts index, cache entries, and any backing storage entries.
fn purge_slots_from_cache_and_store<'a>( fn purge_slots_from_cache_and_store<'a>(
&self, &self,
removed_slots: impl Iterator<Item = &'a Slot>, removed_slots: impl Iterator<Item = &'a Slot> + Clone,
purge_stats: &PurgeStats, purge_stats: &PurgeStats,
log_accounts: bool,
) { ) {
let mut remove_cache_elapsed_across_slots = 0; let mut remove_cache_elapsed_across_slots = 0;
let mut num_cached_slots_removed = 0; let mut num_cached_slots_removed = 0;
let mut total_removed_cached_bytes = 0; let mut total_removed_cached_bytes = 0;
if log_accounts {
if let Some(min) = removed_slots.clone().min() {
info!(
"purge_slots_from_cache_and_store: {:?}",
self.get_pubkey_hash_for_slot(*min).0
);
}
}
for remove_slot in removed_slots { for remove_slot in removed_slots {
// This function is only currently safe with respect to `flush_slot_cache()` because // This function is only currently safe with respect to `flush_slot_cache()` because
// both functions run serially in AccountsBackgroundService. // both functions run serially in AccountsBackgroundService.
@ -4314,7 +4323,7 @@ impl AccountsDb {
} }
#[allow(clippy::needless_collect)] #[allow(clippy::needless_collect)]
fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot>) { fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot> + Clone) {
// `add_root()` should be called first // `add_root()` should be called first
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
let non_roots = slots let non_roots = slots
@ -4332,7 +4341,7 @@ impl AccountsDb {
self.external_purge_slots_stats self.external_purge_slots_stats
.safety_checks_elapsed .safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats); self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats, false);
self.external_purge_slots_stats self.external_purge_slots_stats
.report("external_purge_slots_stats", Some(1000)); .report("external_purge_slots_stats", Some(1000));
} }
@ -4415,6 +4424,7 @@ impl AccountsDb {
self.purge_slots_from_cache_and_store( self.purge_slots_from_cache_and_store(
remove_slots.iter().map(|(slot, _)| slot), remove_slots.iter().map(|(slot, _)| slot),
&remove_unrooted_purge_stats, &remove_unrooted_purge_stats,
true,
); );
remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0)); remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0));
@ -6061,11 +6071,12 @@ impl AccountsDb {
pub fn get_accounts_delta_hash(&self, slot: Slot) -> Hash { pub fn get_accounts_delta_hash(&self, slot: Slot) -> Hash {
self.get_accounts_delta_hash_with_rewrites(slot, &Rewrites::default()) self.get_accounts_delta_hash_with_rewrites(slot, &Rewrites::default())
} }
pub fn get_accounts_delta_hash_with_rewrites(
&self, /// helper to return
slot: Slot, /// 1. pubkey, hash pairs for the slot
skipped_rewrites: &Rewrites, /// 2. us spent scanning
) -> Hash { /// 3. Measure started when we began accumulating
fn get_pubkey_hash_for_slot(&self, slot: Slot) -> (Vec<(Pubkey, Hash)>, u64, Measure) {
let mut scan = Measure::start("scan"); let mut scan = Measure::start("scan");
let scan_result: ScanStorageResult<(Pubkey, Hash), DashMapVersionHash> = self let scan_result: ScanStorageResult<(Pubkey, Hash), DashMapVersionHash> = self
@ -6094,14 +6105,23 @@ impl AccountsDb {
); );
scan.stop(); scan.stop();
let mut accumulate = Measure::start("accumulate"); let accumulate = Measure::start("accumulate");
let mut hashes: Vec<_> = match scan_result { let hashes: Vec<_> = match scan_result {
ScanStorageResult::Cached(cached_result) => cached_result, ScanStorageResult::Cached(cached_result) => cached_result,
ScanStorageResult::Stored(stored_result) => stored_result ScanStorageResult::Stored(stored_result) => stored_result
.into_iter() .into_iter()
.map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash)) .map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash))
.collect(), .collect(),
}; };
(hashes, scan.as_us(), accumulate)
}
pub fn get_accounts_delta_hash_with_rewrites(
&self,
slot: Slot,
skipped_rewrites: &Rewrites,
) -> Hash {
let (mut hashes, scan_us, mut accumulate) = self.get_pubkey_hash_for_slot(slot);
let dirty_keys = hashes.iter().map(|(pubkey, _hash)| *pubkey).collect(); let dirty_keys = hashes.iter().map(|(pubkey, _hash)| *pubkey).collect();
if self.filler_accounts_enabled() { if self.filler_accounts_enabled() {
@ -6122,7 +6142,7 @@ impl AccountsDb {
self.stats self.stats
.delta_hash_scan_time_total_us .delta_hash_scan_time_total_us
.fetch_add(scan.as_us(), Ordering::Relaxed); .fetch_add(scan_us, Ordering::Relaxed);
self.stats self.stats
.delta_hash_accumulate_time_total_us .delta_hash_accumulate_time_total_us
.fetch_add(accumulate.as_us(), Ordering::Relaxed); .fetch_add(accumulate.as_us(), Ordering::Relaxed);