From 461dafb88777ead77cc3d192769d63fd8990f6c7 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Mon, 16 Jan 2023 18:22:30 -0600 Subject: [PATCH] AccountStorage only holds a single append vec (#29723) * AccountStorageReference * tests build * pr feedback --- runtime/src/account_storage.rs | 191 +++++++++--------- runtime/src/accounts_db.rs | 19 +- runtime/src/serde_snapshot.rs | 4 - runtime/src/serde_snapshot/tests.rs | 18 +- .../snapshot_storage_rebuilder.rs | 11 +- 5 files changed, 112 insertions(+), 131 deletions(-) diff --git a/runtime/src/account_storage.rs b/runtime/src/account_storage.rs index a9b2bd0ef..226626118 100644 --- a/runtime/src/account_storage.rs +++ b/runtime/src/account_storage.rs @@ -1,19 +1,26 @@ -//! Manage the map of slot -> append vecs +//! Manage the map of slot -> append vec use { - crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores, SnapshotStorageOne}, + crate::accounts_db::{AccountStorageEntry, AppendVecId, SnapshotStorageOne}, dashmap::DashMap, solana_sdk::clock::Slot, - std::{ - collections::HashMap, - sync::{Arc, RwLock}, - }, + std::sync::Arc, }; -pub type AccountStorageMap = DashMap; +#[derive(Clone, Debug)] +pub struct AccountStorageReference { + /// the single storage for a given slot + pub(crate) storage: SnapshotStorageOne, + /// id can be read from 'storage', but it is an atomic read. + /// id will never change while a storage is held, so we store it separately here for faster runtime lookup in 'get_account_storage_entry' + pub(crate) id: AppendVecId, +} -#[derive(Clone, Default, Debug)] +pub type AccountStorageMap = DashMap; + +#[derive(Default, Debug)] pub struct AccountStorage { + /// map from Slot -> the single append vec for the slot map: AccountStorageMap, /// while shrink is operating on a slot, there can be 2 append vecs active for that slot /// Once the index has been updated to only refer to the new append vec, the single entry for the slot in 'map' can be updated. @@ -40,7 +47,7 @@ impl AccountStorage { store_id: AppendVecId, ) -> Option> { self.get_slot_stores_shrinking_in_progress_ok(slot) - .and_then(|storage_map| storage_map.read().unwrap().get(&store_id).cloned()) + .and_then(|AccountStorageReference { id, storage }| (id == store_id).then_some(storage)) .or_else(|| { self.shrink_in_progress_map.get(&slot).and_then(|entry| { (entry.value().append_vec_id() == store_id).then(|| Arc::clone(entry.value())) @@ -49,7 +56,7 @@ impl AccountStorage { } /// public api, should only be called when shrinking is not in progress - pub fn get_slot_stores(&self, slot: Slot) -> Option { + pub fn get_slot_stores(&self, slot: Slot) -> Option { assert!(self.shrink_in_progress_map.is_empty()); self.get_slot_stores_shrinking_in_progress_ok(slot) } @@ -58,7 +65,7 @@ impl AccountStorage { pub(crate) fn get_slot_stores_shrinking_in_progress_ok( &self, slot: Slot, - ) -> Option { + ) -> Option { self.map.get(&slot).map(|result| result.value().clone()) } @@ -75,11 +82,7 @@ impl AccountStorage { slot: Slot, ) -> Option> { self.get_slot_stores_shrinking_in_progress_ok(slot) - .and_then(|res| { - let read = res.read().unwrap(); - assert!(read.len() <= 1); - read.values().next().cloned() - }) + .map(|entry| Arc::clone(&entry.storage)) } pub(crate) fn all_slots(&self) -> Vec { @@ -87,13 +90,11 @@ impl AccountStorage { self.map.iter().map(|iter_item| *iter_item.key()).collect() } - /// returns true if there is an entry in the map for 'slot', but it contains no append vec + /// returns true if there is no entry for 'slot' #[cfg(test)] pub(crate) fn is_empty_entry(&self, slot: Slot) -> bool { assert!(self.shrink_in_progress_map.is_empty()); - self.get_slot_stores(slot) - .map(|storages| storages.read().unwrap().is_empty()) - .unwrap_or(false) + self.get_slot_stores(slot).is_none() } /// initialize the storage map to 'all_storages' @@ -103,16 +104,11 @@ impl AccountStorage { self.map.extend(all_storages.into_iter()) } - /// remove all append vecs at 'slot' + /// remove the append vec at 'slot' /// returns the current contents - pub(crate) fn remove(&self, slot: &Slot) -> Option> { + pub(crate) fn remove(&self, slot: &Slot) -> Option { assert!(self.shrink_in_progress_map.is_empty()); - self.map.remove(slot).and_then(|(_slot, slot_stores)| { - let mut write = slot_stores.write().unwrap(); - assert!(write.len() <= 1, "{}", write.len()); - let mut drain = write.drain(); - drain.next().map(|(_, store)| store) - }) + self.map.remove(slot).map(|(_, entry)| entry.storage) } /// iterate through all (slot, append-vec) @@ -123,19 +119,16 @@ impl AccountStorage { pub(crate) fn insert(&self, slot: Slot, store: Arc) { assert!(self.shrink_in_progress_map.is_empty()); - let slot_storages: SlotStores = self.get_slot_stores(slot).unwrap_or_else(|| - // DashMap entry.or_insert() returns a RefMut, essentially a write lock, - // which is dropped after this block ends, minimizing time held by the lock. - // However, we still want to persist the reference to the `SlotStores` behind - // the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone). - self - .map - .entry(slot) - .or_insert(Arc::new(RwLock::new(HashMap::new()))) - .clone()); - - let mut write = slot_storages.write().unwrap(); - assert!(write.insert(store.append_vec_id(), store).is_none()); + assert!(self + .map + .insert( + slot, + AccountStorageReference { + id: store.append_vec_id(), + storage: store, + } + ) + .is_none()); } /// called when shrinking begins on a slot and append vec. @@ -159,17 +152,8 @@ impl AccountStorage { let slot_storages = self .get_slot_stores_shrinking_in_progress_ok(slot) .expect("no pre-existing storages for shrinking slot"); - let shrinking_store = Arc::clone( - slot_storages - .read() - .unwrap() - .iter() - .next() - .expect("no pre-existing storages for shrinking slot") - .1, - ); + let shrinking_store = Arc::clone(&slot_storages.storage); - let previous_id = shrinking_store.append_vec_id(); let new_id = new_store.append_vec_id(); // 1. insert 'shrinking_store' into 'shrink_in_progress_map' assert!( @@ -179,15 +163,16 @@ impl AccountStorage { "duplicate call" ); - { - // write lock held for this atomic operation - let mut storages = slot_storages.write().unwrap(); - // 2. remove 'shrinking_store' from 'map' - assert!(storages.remove(&previous_id).is_some()); - // 3. insert 'new_store' into 'map' (atomic with #2) - // should be empty prior to this call - assert!(storages.insert(new_id, Arc::clone(&new_store)).is_none()); - } + assert!(self + .map + .insert( + slot, + AccountStorageReference { + storage: Arc::clone(&new_store), + id: new_id + } + ) + .is_some()); ShrinkInProgress { storage: self, @@ -197,14 +182,6 @@ impl AccountStorage { } } - #[cfg(test)] - pub(crate) fn insert_empty_at_slot(&self, slot: Slot) { - assert!(self.shrink_in_progress_map.is_empty()); - self.map - .entry(slot) - .or_insert(Arc::new(RwLock::new(HashMap::new()))); - } - #[cfg(test)] pub(crate) fn len(&self) -> usize { self.map.len() @@ -213,7 +190,7 @@ impl AccountStorage { /// iterate contents of AccountStorage without exposing internals pub struct AccountStorageIter<'a> { - iter: dashmap::iter::Iter<'a, Slot, SlotStores>, + iter: dashmap::iter::Iter<'a, Slot, AccountStorageReference>, } impl<'a> AccountStorageIter<'a> { @@ -228,13 +205,10 @@ impl<'a> Iterator for AccountStorageIter<'a> { type Item = (Slot, SnapshotStorageOne); fn next(&mut self) -> Option { - for entry in self.iter.by_ref() { - // if no stores for a slot, then don't return the item at all, loops to try next slot + if let Some(entry) = self.iter.next() { let slot = entry.key(); - let stores = entry.value(); - if let Some((_, store)) = stores.read().unwrap().iter().next() { - return Some((*slot, Arc::clone(store))); - } + let store = entry.value(); + return Some((*slot, Arc::clone(&store.storage))); } None } @@ -316,9 +290,10 @@ pub(crate) mod tests { id, store_file_size2, )); - let slot_stores = SlotStores::default(); - slot_stores.write().unwrap().insert(id, entry); - storage.map.insert(slot, slot_stores); + storage + .map + .insert(slot, AccountStorageReference { id, storage: entry }); + // look in map assert_eq!( store_file_size, @@ -446,9 +421,13 @@ pub(crate) mod tests { // already entry in shrink_in_progress_map let storage = AccountStorage::default(); let sample = storage.get_test_storage(); - let slot_stores = SlotStores::default(); - slot_stores.write().unwrap().insert(0, sample.clone()); - storage.map.insert(0, slot_stores); + storage.map.insert( + 0, + AccountStorageReference { + id: 0, + storage: sample.clone(), + }, + ); storage.shrink_in_progress_map.insert(0, sample.clone()); storage.shrinking_in_progress(0, sample); } @@ -460,9 +439,13 @@ pub(crate) mod tests { let storage = AccountStorage::default(); let sample_to_shrink = storage.get_test_storage(); let sample = storage.get_test_storage(); - let slot_stores = SlotStores::default(); - slot_stores.write().unwrap().insert(0, sample_to_shrink); - storage.map.insert(0, slot_stores); + storage.map.insert( + 0, + AccountStorageReference { + id: 0, + storage: sample_to_shrink, + }, + ); let _shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone()); storage.shrinking_in_progress(0, sample); } @@ -476,15 +459,19 @@ pub(crate) mod tests { let id_shrunk = 0; let sample_to_shrink = storage.get_test_storage_with_id(id_to_shrink); let sample = storage.get_test_storage(); - let slot_stores = SlotStores::default(); - slot_stores - .write() - .unwrap() - .insert(id_to_shrink, sample_to_shrink); - storage.map.insert(0, slot_stores.clone()); + storage.map.insert( + 0, + AccountStorageReference { + id: id_to_shrink, + storage: sample_to_shrink, + }, + ); let shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone()); - assert_eq!(slot_stores.read().unwrap().len(), 1); - assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id()); + assert!(storage.map.contains_key(&0)); + assert_eq!( + id_shrunk, + storage.map.get(&0).unwrap().storage.append_vec_id() + ); assert_eq!( (0, id_to_shrink), storage @@ -495,8 +482,11 @@ pub(crate) mod tests { .unwrap() ); drop(shrinking_in_progress); - assert_eq!(slot_stores.read().unwrap().len(), 1); - assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id()); + assert!(storage.map.contains_key(&0)); + assert_eq!( + id_shrunk, + storage.map.get(&0).unwrap().storage.append_vec_id() + ); assert!(storage.shrink_in_progress_map.is_empty()); storage.shrinking_in_progress(0, sample); } @@ -515,7 +505,6 @@ pub(crate) mod tests { fn test_shrinking_in_progress_fail2() { // nothing in slot currently, but there is an empty map entry let storage = AccountStorage::default(); - storage.map.insert(0, Arc::default()); let sample = storage.get_test_storage(); storage.shrinking_in_progress(0, sample); } @@ -527,8 +516,6 @@ pub(crate) mod tests { let storage = AccountStorage::default(); let sample = storage.get_test_storage(); let id = sample.append_vec_id(); - let slot_stores = SlotStores::default(); - slot_stores.write().unwrap().insert(id, sample.clone()); let missing_id = 9999; let slot = sample.slot(); // id is missing since not in maps at all @@ -537,7 +524,13 @@ pub(crate) mod tests { assert!(storage .get_account_storage_entry(slot, missing_id) .is_none()); - storage.map.insert(slot, slot_stores.clone()); + storage.map.insert( + slot, + AccountStorageReference { + id, + storage: sample.clone(), + }, + ); // id is found in map assert!(storage.get_account_storage_entry(slot, id).is_some()); assert!(storage @@ -551,7 +544,7 @@ pub(crate) mod tests { .get_account_storage_entry(slot, missing_id) .is_none()); assert!(storage.get_account_storage_entry(slot, id).is_some()); - slot_stores.write().unwrap().clear(); + storage.map.remove(&slot); // id is found in shrink_in_progress_map assert!(storage .get_account_storage_entry(slot, missing_id) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a57fbdf89..847cf3ac0 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -605,9 +605,6 @@ pub type SnapshotStorages = Vec>; /// exactly 1 append vec per slot pub type SnapshotStoragesOne = Vec; -// Each slot has a set of storage entries. -pub(crate) type SlotStores = Arc>>>; - type AccountSlots = HashMap>; type AppendVecOffsets = HashMap>; type ReclaimResult = (AccountSlots, AppendVecOffsets); @@ -3915,12 +3912,9 @@ impl AccountsDb { // shrink is in progress, so 1 new append vec to keep, 1 old one to throw away not_retaining_store(shrink_in_progress.old_storage()); // dropping 'shrink_in_progress' removes the old append vec that was being shrunk from db's storage - } else if let Some(slot_stores) = self.storage.get_slot_stores(slot) { + } else if let Some(store) = self.storage.remove(&slot) { // no shrink in progress, so all append vecs in this slot are dead - let mut list = slot_stores.write().unwrap(); - list.drain().for_each(|(_key, store)| { - not_retaining_store(&store); - }); + not_retaining_store(&store); } dead_storages @@ -10730,12 +10724,12 @@ pub mod tests { } let mut append_vec_histogram = HashMap::new(); - let mut all_storages = vec![]; + let mut all_slots = vec![]; for slot_storage in accounts.storage.iter() { - all_storages.push(slot_storage.1) + all_slots.push(slot_storage.0) } - for storage in all_storages { - *append_vec_histogram.entry(storage.slot()).or_insert(0) += 1; + for slot in all_slots { + *append_vec_histogram.entry(slot).or_insert(0) += 1; } for count in append_vec_histogram.values() { assert!(*count >= 2); @@ -17190,7 +17184,6 @@ pub mod tests { .write() .unwrap() .insert(slot0, BankHashInfo::default()); - db.storage.insert_empty_at_slot(slot0); assert!(!db.bank_hashes.read().unwrap().is_empty()); db.accounts_index.add_root(slot0); db.accounts_index.add_uncleaned_roots([slot0].into_iter()); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 262c4d5df..2c75790bc 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -711,10 +711,6 @@ where next_append_vec_id, } = storage_and_next_append_vec_id; - // discard any slots with no storage entries - // this can happen if a non-root slot was serialized - // but non-root stores should not be included in the snapshot - storage.retain(|_slot, stores| !stores.read().unwrap().is_empty()); assert!( !storage.is_empty(), "At least one storage entry must exist from deserializing stream" diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 0c5ac2934..d91d2607a 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -3,7 +3,7 @@ use { super::*, crate::{ - account_storage::AccountStorageMap, + account_storage::{AccountStorageMap, AccountStorageReference}, accounts::Accounts, accounts_db::{ get_temp_accounts_paths, test_utils::create_test_accounts, AccountShrinkThreshold, @@ -60,15 +60,13 @@ fn copy_append_vecs>( num_accounts, ); next_append_vec_id = next_append_vec_id.max(new_storage_entry.append_vec_id()); - storage - .entry(new_storage_entry.slot()) - .or_default() - .write() - .unwrap() - .insert( - new_storage_entry.append_vec_id(), - Arc::new(new_storage_entry), - ); + storage.insert( + new_storage_entry.slot(), + AccountStorageReference { + id: new_storage_entry.append_vec_id(), + storage: Arc::new(new_storage_entry), + }, + ); } Ok(StorageAndNextAppendVecId { diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 3d87b56d5..ab7387d93 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -3,7 +3,7 @@ use { super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion}, crate::{ - account_storage::AccountStorageMap, + account_storage::{AccountStorageMap, AccountStorageReference}, accounts_db::{AccountStorageEntry, AppendVecId, AtomicAppendVecId}, serde_snapshot::{ self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields, @@ -292,7 +292,7 @@ impl SnapshotStorageRebuilder { let slot_storage_paths = self.storage_paths.get(&slot).unwrap(); let lock = slot_storage_paths.lock().unwrap(); - let slot_stores = lock + let mut slot_stores = lock .iter() .map(|path| { let filename = path.file_name().unwrap().to_str().unwrap(); @@ -317,9 +317,10 @@ impl SnapshotStorageRebuilder { }) .collect::>, std::io::Error>>()?; - let slot_entry = self.storage.entry(slot).or_default(); - let mut storage_lock = slot_entry.write().unwrap(); - *storage_lock = slot_stores; + assert_eq!(slot_stores.len(), 1); + let (id, storage) = slot_stores.drain().next().unwrap(); + self.storage + .insert(slot, AccountStorageReference { id, storage }); Ok(()) }