iterate contents of AccountStorage without exposing internals (#29719)
* iterate contents of AccountStorage without exposing internals * Update runtime/src/accounts_db.rs Co-authored-by: Brooks <brooks@prumo.org> * Update runtime/src/accounts_db.rs Co-authored-by: Brooks <brooks@prumo.org> * Update runtime/src/accounts_db.rs Co-authored-by: Brooks <brooks@prumo.org> * compile errors Co-authored-by: Brooks <brooks@prumo.org>
This commit is contained in:
parent
da39c4837f
commit
ef30083319
|
@ -1,7 +1,7 @@
|
|||
//! Manage the map of slot -> append vecs
|
||||
|
||||
use {
|
||||
crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores},
|
||||
crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores, SnapshotStorageOne},
|
||||
dashmap::DashMap,
|
||||
solana_sdk::clock::Slot,
|
||||
std::{
|
||||
|
@ -115,10 +115,10 @@ impl AccountStorage {
|
|||
})
|
||||
}
|
||||
|
||||
/// iterate through all (slot, append-vecs)
|
||||
pub(crate) fn iter(&self) -> dashmap::iter::Iter<Slot, SlotStores> {
|
||||
/// iterate through all (slot, append-vec)
|
||||
pub(crate) fn iter(&self) -> AccountStorageIter<'_> {
|
||||
assert!(self.shrink_in_progress_map.is_empty());
|
||||
self.map.iter()
|
||||
AccountStorageIter::new(self)
|
||||
}
|
||||
|
||||
pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
|
||||
|
@ -211,6 +211,35 @@ impl AccountStorage {
|
|||
}
|
||||
}
|
||||
|
||||
/// iterate contents of AccountStorage without exposing internals
|
||||
pub struct AccountStorageIter<'a> {
|
||||
iter: dashmap::iter::Iter<'a, Slot, SlotStores>,
|
||||
}
|
||||
|
||||
impl<'a> AccountStorageIter<'a> {
|
||||
pub fn new(storage: &'a AccountStorage) -> Self {
|
||||
Self {
|
||||
iter: storage.map.iter(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for AccountStorageIter<'a> {
|
||||
type Item = (Slot, SnapshotStorageOne);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
for entry in self.iter.by_ref() {
|
||||
// if no stores for a slot, then don't return the item at all, loops to try next slot
|
||||
let slot = entry.key();
|
||||
let stores = entry.value();
|
||||
if let Some((_, store)) = stores.read().unwrap().iter().next() {
|
||||
return Some((*slot, Arc::clone(store)));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// exists while there is a shrink in progress
|
||||
/// keeps track of the 'new_store' being created and the 'old_store' being replaced.
|
||||
pub(crate) struct ShrinkInProgress<'a> {
|
||||
|
|
|
@ -6526,42 +6526,22 @@ impl AccountsDb {
|
|||
|
||||
fn report_store_stats(&self) {
|
||||
let mut total_count = 0;
|
||||
let mut min = std::usize::MAX;
|
||||
let mut min_slot = 0;
|
||||
let mut max = 0;
|
||||
let mut max_slot = 0;
|
||||
let mut newest_slot = 0;
|
||||
let mut oldest_slot = std::u64::MAX;
|
||||
let mut total_bytes = 0;
|
||||
let mut total_alive_bytes = 0;
|
||||
for iter_item in self.storage.iter() {
|
||||
let slot = iter_item.key();
|
||||
let slot_stores = iter_item.value().read().unwrap();
|
||||
total_count += slot_stores.len();
|
||||
if slot_stores.len() < min {
|
||||
min = slot_stores.len();
|
||||
min_slot = *slot;
|
||||
}
|
||||
for (slot, store) in self.storage.iter() {
|
||||
total_count += 1;
|
||||
newest_slot = std::cmp::max(newest_slot, slot);
|
||||
|
||||
if slot_stores.len() > max {
|
||||
max = slot_stores.len();
|
||||
max_slot = *slot;
|
||||
}
|
||||
if *slot > newest_slot {
|
||||
newest_slot = *slot;
|
||||
}
|
||||
oldest_slot = std::cmp::min(oldest_slot, slot);
|
||||
|
||||
if *slot < oldest_slot {
|
||||
oldest_slot = *slot;
|
||||
}
|
||||
|
||||
for store in slot_stores.values() {
|
||||
total_alive_bytes += Self::page_align(store.alive_bytes() as u64);
|
||||
total_bytes += store.total_bytes();
|
||||
}
|
||||
total_alive_bytes += Self::page_align(store.alive_bytes() as u64);
|
||||
total_bytes += store.total_bytes();
|
||||
}
|
||||
info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})",
|
||||
total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
|
||||
info!(
|
||||
"total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}"
|
||||
);
|
||||
|
||||
let total_alive_ratio = if total_bytes > 0 {
|
||||
total_alive_bytes as f64 / total_bytes as f64
|
||||
|
@ -8266,14 +8246,13 @@ impl AccountsDb {
|
|||
ancestors: Option<&Ancestors>,
|
||||
) -> (SnapshotStoragesOne, Vec<Slot>) {
|
||||
let mut m = Measure::start("get slots");
|
||||
let slots_and_storages = self
|
||||
let mut slots_and_storages = self
|
||||
.storage
|
||||
.iter()
|
||||
.filter_map(|entry| {
|
||||
let slot = *entry.key() as Slot;
|
||||
.filter_map(|(slot, store)| {
|
||||
requested_slots
|
||||
.contains(&slot)
|
||||
.then_some((slot, Arc::clone(entry.value())))
|
||||
.then_some((slot, Some(store)))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
m.stop();
|
||||
|
@ -8282,24 +8261,19 @@ impl AccountsDb {
|
|||
let chunk_size = 5_000;
|
||||
let wide = self.thread_pool_clean.install(|| {
|
||||
slots_and_storages
|
||||
.par_chunks(chunk_size)
|
||||
.par_chunks_mut(chunk_size)
|
||||
.map(|slots_and_storages| {
|
||||
slots_and_storages
|
||||
.iter()
|
||||
.iter_mut()
|
||||
.filter(|(slot, _)| {
|
||||
self.accounts_index.is_alive_root(*slot)
|
||||
|| ancestors
|
||||
.map(|ancestors| ancestors.contains_key(slot))
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.filter_map(|(slot, storages)| {
|
||||
storages
|
||||
.read()
|
||||
.unwrap()
|
||||
.values()
|
||||
.next()
|
||||
.filter(|x| x.has_accounts())
|
||||
.map(|storage| (Arc::clone(storage), *slot))
|
||||
.filter_map(|(slot, store)| {
|
||||
let store = std::mem::take(store).unwrap();
|
||||
store.has_accounts().then_some((store, *slot))
|
||||
})
|
||||
.collect::<Vec<(SnapshotStorageOne, Slot)>>()
|
||||
})
|
||||
|
@ -8925,23 +8899,22 @@ impl AccountsDb {
|
|||
) {
|
||||
// store count and size for each storage
|
||||
let mut storage_size_storages_time = Measure::start("storage_size_storages");
|
||||
for slot_stores in self.storage.iter() {
|
||||
for (id, store) in slot_stores.value().read().unwrap().iter() {
|
||||
// Should be default at this point
|
||||
assert_eq!(store.alive_bytes(), 0);
|
||||
if let Some(entry) = stored_sizes_and_counts.get(id) {
|
||||
trace!(
|
||||
"id: {} setting count: {} cur: {}",
|
||||
id,
|
||||
entry.count,
|
||||
store.count(),
|
||||
);
|
||||
store.count_and_status.write().unwrap().0 = entry.count;
|
||||
store.alive_bytes.store(entry.stored_size, Ordering::SeqCst);
|
||||
} else {
|
||||
trace!("id: {} clearing count", id);
|
||||
store.count_and_status.write().unwrap().0 = 0;
|
||||
}
|
||||
for (_slot, store) in self.storage.iter() {
|
||||
let id = store.append_vec_id();
|
||||
// Should be default at this point
|
||||
assert_eq!(store.alive_bytes(), 0);
|
||||
if let Some(entry) = stored_sizes_and_counts.get(&id) {
|
||||
trace!(
|
||||
"id: {} setting count: {} cur: {}",
|
||||
id,
|
||||
entry.count,
|
||||
store.count(),
|
||||
);
|
||||
store.count_and_status.write().unwrap().0 = entry.count;
|
||||
store.alive_bytes.store(entry.stored_size, Ordering::SeqCst);
|
||||
} else {
|
||||
trace!("id: {} clearing count", id);
|
||||
store.count_and_status.write().unwrap().0 = 0;
|
||||
}
|
||||
}
|
||||
storage_size_storages_time.stop();
|
||||
|
@ -10761,7 +10734,7 @@ pub mod tests {
|
|||
let mut append_vec_histogram = HashMap::new();
|
||||
let mut all_storages = vec![];
|
||||
for slot_storage in accounts.storage.iter() {
|
||||
all_storages.extend(slot_storage.read().unwrap().values().cloned())
|
||||
all_storages.push(slot_storage.1)
|
||||
}
|
||||
for storage in all_storages {
|
||||
*append_vec_histogram.entry(storage.slot()).or_insert(0) += 1;
|
||||
|
@ -15433,10 +15406,8 @@ pub mod tests {
|
|||
accounts.add_root_and_flush_write_cache(slot0);
|
||||
|
||||
// fake out the store count to avoid the assert
|
||||
for slot_stores in accounts.storage.iter() {
|
||||
for (_id, store) in slot_stores.value().read().unwrap().iter() {
|
||||
store.alive_bytes.store(0, Ordering::Release);
|
||||
}
|
||||
for (_, store) in accounts.storage.iter() {
|
||||
store.alive_bytes.store(0, Ordering::Release);
|
||||
}
|
||||
|
||||
// populate based on made up hash data
|
||||
|
@ -15450,12 +15421,10 @@ pub mod tests {
|
|||
);
|
||||
accounts.set_storage_count_and_alive_bytes(dashmap, &mut GenerateIndexTimings::default());
|
||||
assert_eq!(accounts.storage.len(), 1);
|
||||
for slot_stores in accounts.storage.iter() {
|
||||
for (id, store) in slot_stores.value().read().unwrap().iter() {
|
||||
assert_eq!(id, &0);
|
||||
assert_eq!(store.count_and_status.read().unwrap().0, 3);
|
||||
assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2);
|
||||
}
|
||||
for (_, store) in accounts.storage.iter() {
|
||||
assert_eq!(store.append_vec_id(), 0);
|
||||
assert_eq!(store.count_and_status.read().unwrap().0, 3);
|
||||
assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue