From ef30083319ad18a284070174336eb06bd356ccc5 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Mon, 16 Jan 2023 14:01:25 -0600 Subject: [PATCH] iterate contents of AccountStorage without exposing internals (#29719) * iterate contents of AccountStorage without exposing internals * Update runtime/src/accounts_db.rs Co-authored-by: Brooks * Update runtime/src/accounts_db.rs Co-authored-by: Brooks * Update runtime/src/accounts_db.rs Co-authored-by: Brooks * compile errors Co-authored-by: Brooks --- runtime/src/account_storage.rs | 37 +++++++++-- runtime/src/accounts_db.rs | 111 ++++++++++++--------------------- 2 files changed, 73 insertions(+), 75 deletions(-) diff --git a/runtime/src/account_storage.rs b/runtime/src/account_storage.rs index 8d48c7615..a9b2bd0ef 100644 --- a/runtime/src/account_storage.rs +++ b/runtime/src/account_storage.rs @@ -1,7 +1,7 @@ //! Manage the map of slot -> append vecs use { - crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores}, + crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores, SnapshotStorageOne}, dashmap::DashMap, solana_sdk::clock::Slot, std::{ @@ -115,10 +115,10 @@ impl AccountStorage { }) } - /// iterate through all (slot, append-vecs) - pub(crate) fn iter(&self) -> dashmap::iter::Iter { + /// iterate through all (slot, append-vec) + pub(crate) fn iter(&self) -> AccountStorageIter<'_> { assert!(self.shrink_in_progress_map.is_empty()); - self.map.iter() + AccountStorageIter::new(self) } pub(crate) fn insert(&self, slot: Slot, store: Arc) { @@ -211,6 +211,35 @@ impl AccountStorage { } } +/// iterate contents of AccountStorage without exposing internals +pub struct AccountStorageIter<'a> { + iter: dashmap::iter::Iter<'a, Slot, SlotStores>, +} + +impl<'a> AccountStorageIter<'a> { + pub fn new(storage: &'a AccountStorage) -> Self { + Self { + iter: storage.map.iter(), + } + } +} + +impl<'a> Iterator for AccountStorageIter<'a> { + type Item = (Slot, SnapshotStorageOne); + + fn next(&mut self) -> Option { + for entry in self.iter.by_ref() { + // if no stores for a slot, then don't return the item at all, loops to try next slot + let slot = entry.key(); + let stores = entry.value(); + if let Some((_, store)) = stores.read().unwrap().iter().next() { + return Some((*slot, Arc::clone(store))); + } + } + None + } +} + /// exists while there is a shrink in progress /// keeps track of the 'new_store' being created and the 'old_store' being replaced. pub(crate) struct ShrinkInProgress<'a> { diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 38851293b..071ed4d6b 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -6526,42 +6526,22 @@ impl AccountsDb { fn report_store_stats(&self) { let mut total_count = 0; - let mut min = std::usize::MAX; - let mut min_slot = 0; - let mut max = 0; - let mut max_slot = 0; let mut newest_slot = 0; let mut oldest_slot = std::u64::MAX; let mut total_bytes = 0; let mut total_alive_bytes = 0; - for iter_item in self.storage.iter() { - let slot = iter_item.key(); - let slot_stores = iter_item.value().read().unwrap(); - total_count += slot_stores.len(); - if slot_stores.len() < min { - min = slot_stores.len(); - min_slot = *slot; - } + for (slot, store) in self.storage.iter() { + total_count += 1; + newest_slot = std::cmp::max(newest_slot, slot); - if slot_stores.len() > max { - max = slot_stores.len(); - max_slot = *slot; - } - if *slot > newest_slot { - newest_slot = *slot; - } + oldest_slot = std::cmp::min(oldest_slot, slot); - if *slot < oldest_slot { - oldest_slot = *slot; - } - - for store in slot_stores.values() { - total_alive_bytes += Self::page_align(store.alive_bytes() as u64); - total_bytes += store.total_bytes(); - } + total_alive_bytes += Self::page_align(store.alive_bytes() as u64); + total_bytes += store.total_bytes(); } - info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})", - total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min); + info!( + "total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}" + ); let total_alive_ratio = if total_bytes > 0 { total_alive_bytes as f64 / total_bytes as f64 @@ -8266,14 +8246,13 @@ impl AccountsDb { ancestors: Option<&Ancestors>, ) -> (SnapshotStoragesOne, Vec) { let mut m = Measure::start("get slots"); - let slots_and_storages = self + let mut slots_and_storages = self .storage .iter() - .filter_map(|entry| { - let slot = *entry.key() as Slot; + .filter_map(|(slot, store)| { requested_slots .contains(&slot) - .then_some((slot, Arc::clone(entry.value()))) + .then_some((slot, Some(store))) }) .collect::>(); m.stop(); @@ -8282,24 +8261,19 @@ impl AccountsDb { let chunk_size = 5_000; let wide = self.thread_pool_clean.install(|| { slots_and_storages - .par_chunks(chunk_size) + .par_chunks_mut(chunk_size) .map(|slots_and_storages| { slots_and_storages - .iter() + .iter_mut() .filter(|(slot, _)| { self.accounts_index.is_alive_root(*slot) || ancestors .map(|ancestors| ancestors.contains_key(slot)) .unwrap_or_default() }) - .filter_map(|(slot, storages)| { - storages - .read() - .unwrap() - .values() - .next() - .filter(|x| x.has_accounts()) - .map(|storage| (Arc::clone(storage), *slot)) + .filter_map(|(slot, store)| { + let store = std::mem::take(store).unwrap(); + store.has_accounts().then_some((store, *slot)) }) .collect::>() }) @@ -8925,23 +8899,22 @@ impl AccountsDb { ) { // store count and size for each storage let mut storage_size_storages_time = Measure::start("storage_size_storages"); - for slot_stores in self.storage.iter() { - for (id, store) in slot_stores.value().read().unwrap().iter() { - // Should be default at this point - assert_eq!(store.alive_bytes(), 0); - if let Some(entry) = stored_sizes_and_counts.get(id) { - trace!( - "id: {} setting count: {} cur: {}", - id, - entry.count, - store.count(), - ); - store.count_and_status.write().unwrap().0 = entry.count; - store.alive_bytes.store(entry.stored_size, Ordering::SeqCst); - } else { - trace!("id: {} clearing count", id); - store.count_and_status.write().unwrap().0 = 0; - } + for (_slot, store) in self.storage.iter() { + let id = store.append_vec_id(); + // Should be default at this point + assert_eq!(store.alive_bytes(), 0); + if let Some(entry) = stored_sizes_and_counts.get(&id) { + trace!( + "id: {} setting count: {} cur: {}", + id, + entry.count, + store.count(), + ); + store.count_and_status.write().unwrap().0 = entry.count; + store.alive_bytes.store(entry.stored_size, Ordering::SeqCst); + } else { + trace!("id: {} clearing count", id); + store.count_and_status.write().unwrap().0 = 0; } } storage_size_storages_time.stop(); @@ -10761,7 +10734,7 @@ pub mod tests { let mut append_vec_histogram = HashMap::new(); let mut all_storages = vec![]; for slot_storage in accounts.storage.iter() { - all_storages.extend(slot_storage.read().unwrap().values().cloned()) + all_storages.push(slot_storage.1) } for storage in all_storages { *append_vec_histogram.entry(storage.slot()).or_insert(0) += 1; @@ -15433,10 +15406,8 @@ pub mod tests { accounts.add_root_and_flush_write_cache(slot0); // fake out the store count to avoid the assert - for slot_stores in accounts.storage.iter() { - for (_id, store) in slot_stores.value().read().unwrap().iter() { - store.alive_bytes.store(0, Ordering::Release); - } + for (_, store) in accounts.storage.iter() { + store.alive_bytes.store(0, Ordering::Release); } // populate based on made up hash data @@ -15450,12 +15421,10 @@ pub mod tests { ); accounts.set_storage_count_and_alive_bytes(dashmap, &mut GenerateIndexTimings::default()); assert_eq!(accounts.storage.len(), 1); - for slot_stores in accounts.storage.iter() { - for (id, store) in slot_stores.value().read().unwrap().iter() { - assert_eq!(id, &0); - assert_eq!(store.count_and_status.read().unwrap().0, 3); - assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2); - } + for (_, store) in accounts.storage.iter() { + assert_eq!(store.append_vec_id(), 0); + assert_eq!(store.count_and_status.read().unwrap().0, 3); + assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2); } }