From 1faf7d516b99702550835e0caab06a04a6d65edc Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Tue, 10 Jan 2023 13:52:01 -0600
Subject: [PATCH] while shrinking, old append vec is held outside of storage.map (#29323)

* while shrinking, old append vec is held outside of storage.map
* update comment
* rework ShrinkInProgress for clarity
* add tests and comment
---
 runtime/src/account_storage.rs | 360 ++++++++++++++++++++++++++++++---
 runtime/src/accounts_db.rs     |  13 +-
 2 files changed, 345 insertions(+), 28 deletions(-)

diff --git a/runtime/src/account_storage.rs b/runtime/src/account_storage.rs
index e7e1369b8..80035719c 100644
--- a/runtime/src/account_storage.rs
+++ b/runtime/src/account_storage.rs
@@ -15,40 +15,82 @@ pub type AccountStorageMap = DashMap<Slot, SlotStores>;
 #[derive(Clone, Default, Debug)]
 pub struct AccountStorage {
     map: AccountStorageMap,
+    /// while shrink is operating on a slot, there can be 2 append vecs active for that slot
+    /// Once the index has been updated to only refer to the new append vec, the single entry for the slot in 'map' can be updated.
+    /// Entries in 'shrink_in_progress_map' can be found by 'get_account_storage_entry'
+    shrink_in_progress_map: DashMap<Slot, Arc<AccountStorageEntry>>,
 }
 
 impl AccountStorage {
     /// Return the append vec in 'slot' and with id='store_id'.
+    /// This fn can look in both 'map' and 'shrink_in_progress_map' to find the specified append vec.
+    /// When shrinking begins, 'shrinking_in_progress' is called.
+    /// This fn looks in 'map' first, then in 'shrink_in_progress_map', because
+    /// 'shrinking_in_progress' first inserts the old append vec into 'shrink_in_progress_map'
+    /// and then removes the old append vec from 'map'.
+    /// Then, the index is updated for all entries to refer to the new id.
+    /// Callers of this function have 2 choices:
+    /// 1. hold the account index read lock for the pubkey so that the account index entry cannot be changed prior to or during this call (scans do this)
+    /// 2. expect to be ready to start over and read the index again if this function returns None
+    /// Operations like shrinking or write cache flushing may have updated the index between when the caller read the index and called this function to
+    /// load from the append vec specified in the index.
     pub(crate) fn get_account_storage_entry(
         &self,
         slot: Slot,
         store_id: AppendVecId,
     ) -> Option<Arc<AccountStorageEntry>> {
-        self.get_slot_stores(slot)
+        self.get_slot_stores_shrinking_in_progress_ok(slot)
             .and_then(|storage_map| storage_map.read().unwrap().get(&store_id).cloned())
+            .or_else(|| {
+                self.shrink_in_progress_map
+                    .get(&slot)
+                    .map(|entry| Arc::clone(entry.value()))
+            })
     }
 
+    /// public api, should only be called when shrinking is not in progress
     pub fn get_slot_stores(&self, slot: Slot) -> Option<SlotStores> {
+        assert!(self.shrink_in_progress_map.is_empty());
+        self.get_slot_stores_shrinking_in_progress_ok(slot)
+    }
+
+    /// safe to call while shrinking is in progress
+    pub(crate) fn get_slot_stores_shrinking_in_progress_ok(
+        &self,
+        slot: Slot,
+    ) -> Option<SlotStores> {
         self.map.get(&slot).map(|result| result.value().clone())
     }
 
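// Illustration (not part of this patch): a caller that takes choice 2 above ends up
// with a retry loop roughly like this sketch. `lookup_index` is a hypothetical
// stand-in for reading the (slot, store_id) pair out of the accounts index; the
// rest is the API shown in this file.
fn load_with_retry(storage: &AccountStorage, pubkey: &Pubkey) -> Arc<AccountStorageEntry> {
    loop {
        // Re-read the index on every attempt; shrink or a write cache flush may have
        // moved the account between the index read and this storage lookup.
        let (slot, store_id) = lookup_index(pubkey);
        if let Some(store) = storage.get_account_storage_entry(slot, store_id) {
            return store;
        }
    }
}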
     /// return the append vec for 'slot' if it exists
     /// This is only ever called when shrink cannot possibly be running and there is a max of 1 append vec per slot.
     pub(crate) fn get_slot_storage_entry(&self, slot: Slot) -> Option<Arc<AccountStorageEntry>> {
-        self.get_slot_stores(slot).and_then(|res| {
-            let read = res.read().unwrap();
-            assert!(read.len() <= 1);
-            read.values().next().cloned()
-        })
+        assert!(self.shrink_in_progress_map.is_empty());
+        self.get_slot_storage_entry_shrinking_in_progress_ok(slot)
+    }
+
+    /// return the append vec for 'slot' if it exists
+    pub(crate) fn get_slot_storage_entry_shrinking_in_progress_ok(
+        &self,
+        slot: Slot,
+    ) -> Option<Arc<AccountStorageEntry>> {
+        self.get_slot_stores_shrinking_in_progress_ok(slot)
+            .and_then(|res| {
+                let read = res.read().unwrap();
+                assert!(read.len() <= 1);
+                read.values().next().cloned()
+            })
     }
 
     pub(crate) fn all_slots(&self) -> Vec<Slot> {
+        assert!(self.shrink_in_progress_map.is_empty());
         self.map.iter().map(|iter_item| *iter_item.key()).collect()
     }
 
     /// returns true if there is an entry in the map for 'slot', but it contains no append vec
     #[cfg(test)]
     pub(crate) fn is_empty_entry(&self, slot: Slot) -> bool {
+        assert!(self.shrink_in_progress_map.is_empty());
         self.get_slot_stores(slot)
             .map(|storages| storages.read().unwrap().is_empty())
             .unwrap_or(false)
@@ -57,21 +99,25 @@ impl AccountStorage {
     /// initialize the storage map to 'all_storages'
     pub(crate) fn initialize(&mut self, all_storages: AccountStorageMap) {
         assert!(self.map.is_empty());
+        assert!(self.shrink_in_progress_map.is_empty());
         self.map.extend(all_storages.into_iter())
     }
 
     /// remove all append vecs at 'slot'
     /// returns the current contents
     pub(crate) fn remove(&self, slot: &Slot) -> Option<(Slot, SlotStores)> {
+        assert!(self.shrink_in_progress_map.is_empty());
         self.map.remove(slot)
     }
 
     /// iterate through all (slot, append-vecs)
     pub(crate) fn iter(&self) -> dashmap::iter::Iter<Slot, SlotStores> {
+        assert!(self.shrink_in_progress_map.is_empty());
         self.map.iter()
     }
 
     pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
+        assert!(self.shrink_in_progress_map.is_empty());
         let slot_storages: SlotStores = self.get_slot_stores(slot).unwrap_or_else(||
             // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
             // which is dropped after this block ends, minimizing time held by the lock.
@@ -91,18 +137,52 @@ impl AccountStorage {
     /// When 'ShrinkInProgress' is dropped by caller, the old store will be removed from the storage map.
     /// Fails if there are no existing stores at the slot.
     /// 'new_store' will be replacing the current store at 'slot' in 'map'
+    /// 1. insert 'shrinking_store' into 'shrink_in_progress_map'
+    /// 2. remove 'shrinking_store' from 'map'
+    /// 3. insert 'new_store' into 'map' (atomic with #2)
+    /// #1 allows tx processing loads to find the item in 'shrink_in_progress_map' even when it is removed from 'map'
+    /// #3 allows tx processing loads to find the item in 'map' after the index is updated and it is now located in 'new_store'
+    /// loading for tx must check
+    /// a. 'map', because it is usually there
+    /// b. 'shrink_in_progress_map' because it may have moved there (#1) before it was removed from 'map' (#2)
+    /// Note that if both step a and step b fail, then the retry code in accounts_db will look in the index again and should find the updated index entry to 'new_store'
     pub(crate) fn shrinking_in_progress(
         &self,
         slot: Slot,
         new_store: Arc<AccountStorageEntry>,
     ) -> ShrinkInProgress<'_> {
-        let slot_storages = self.get_slot_stores(slot).unwrap();
-        let shrinking_store = Arc::clone(slot_storages.read().unwrap().iter().next().unwrap().1);
+        let slot_storages = self
+            .get_slot_stores_shrinking_in_progress_ok(slot)
+            .expect("no pre-existing storages for shrinking slot");
+        let shrinking_store = Arc::clone(
+            slot_storages
+                .read()
+                .unwrap()
+                .iter()
+                .next()
+                .expect("no pre-existing storages for shrinking slot")
+                .1,
+        );
+        let previous_id = shrinking_store.append_vec_id();
         let new_id = new_store.append_vec_id();
-        let mut storages = slot_storages.write().unwrap();
-        // insert 'new_store' into 'map'
-        assert!(storages.insert(new_id, Arc::clone(&new_store)).is_none());
+        // 1. insert 'shrinking_store' into 'shrink_in_progress_map'
+        assert!(
+            self.shrink_in_progress_map
+                .insert(slot, Arc::clone(&shrinking_store))
+                .is_none(),
+            "duplicate call"
+        );
+
+        {
+            // write lock held for this atomic operation
+            let mut storages = slot_storages.write().unwrap();
+            // 2. remove 'shrinking_store' from 'map'
+            assert!(storages.remove(&previous_id).is_some());
+            // 3. insert 'new_store' into 'map' (atomic with #2)
+            // should be empty prior to this call
+            assert!(storages.insert(new_id, Arc::clone(&new_store)).is_none());
+        }
 
         ShrinkInProgress {
             storage: self,
@@ -114,6 +194,7 @@ impl AccountStorage {
 
     #[cfg(test)]
     pub(crate) fn insert_empty_at_slot(&self, slot: Slot) {
+        assert!(self.shrink_in_progress_map.is_empty());
         self.map
             .entry(slot)
             .or_insert(Arc::new(RwLock::new(HashMap::new())));
@@ -129,27 +210,23 @@ impl AccountStorage {
 /// keeps track of the 'new_store' being created and the 'old_store' being replaced.
 pub(crate) struct ShrinkInProgress<'a> {
     storage: &'a AccountStorage,
-    /// newly shrunk store with a subset of contents from 'old_store'
-    new_store: Arc<AccountStorageEntry>,
     /// old store which will be shrunk and replaced
     old_store: Arc<AccountStorageEntry>,
+    /// newly shrunk store with a subset of contents from 'old_store'
+    new_store: Arc<AccountStorageEntry>,
     slot: Slot,
 }
 
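// Illustration (not part of this patch): how a shrink driver in accounts_db is expected
// to use the guard returned by 'shrinking_in_progress'. `copy_alive_accounts` and
// `update_index_to_new_store` are hypothetical placeholders for the real shrink work;
// only `shrinking_in_progress`, `old_storage`, and the Drop behavior come from this change.
fn shrink_slot_sketch(storage: &AccountStorage, slot: Slot, new_store: Arc<AccountStorageEntry>) {
    // Steps 1-3 above: the old append vec moves into 'shrink_in_progress_map' and
    // 'map' now holds only 'new_store', so loads keep succeeding either way.
    let shrink_in_progress = storage.shrinking_in_progress(slot, Arc::clone(&new_store));

    // Copy the alive accounts into the new append vec and repoint the index at it
    // (placeholders for the work done in accounts_db).
    copy_alive_accounts(shrink_in_progress.old_storage(), &new_store);
    update_index_to_new_store(slot, &new_store);

    // Dropping the guard removes the old append vec from 'shrink_in_progress_map',
    // leaving exactly one append vec for the slot.
    drop(shrink_in_progress);
}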
 /// called when the shrink is no longer in progress. This means we can
 /// release the old append vec and update the map of slot -> append vec
 impl<'a> Drop for ShrinkInProgress<'a> {
     fn drop(&mut self) {
-        // the slot must be in the map
-        let slot_storages: SlotStores = self.storage.get_slot_stores(self.slot).unwrap();
-
-        let mut storages = slot_storages.write().unwrap();
-        // the id must be in the hashmap
-        assert!(
-            storages.remove(&self.old_store.append_vec_id()).is_some(),
-            "slot: {}, len: {}",
-            self.slot,
-            storages.len()
-        );
+        // The old append vec referenced in 'self' for `slot`
+        // can be removed from 'shrink_in_progress_map'
+        assert!(self
+            .storage
+            .shrink_in_progress_map
+            .remove(&self.slot)
+            .is_some());
     }
 }
 
@@ -174,3 +251,238 @@ impl Default for AccountStorageStatus {
         Self::Available
     }
 }
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use {super::*, std::path::Path};
+
+    #[test]
+    fn test_shrink_in_progress() {
+        // test that we check 'map' first, then 'shrink_in_progress_map'
+        let storage = AccountStorage::default();
+        let slot = 0;
+        let id = 0;
+        // empty everything
+        assert!(storage.get_account_storage_entry(slot, id).is_none());
+
+        // add a map store
+        let common_store_path = Path::new("");
+        let store_file_size = 4000;
+        let store_file_size2 = store_file_size * 2;
+        // 2 append vecs with same id, but different sizes
+        let entry = Arc::new(AccountStorageEntry::new(
+            common_store_path,
+            slot,
+            id,
+            store_file_size,
+        ));
+        let entry2 = Arc::new(AccountStorageEntry::new(
+            common_store_path,
+            slot,
+            id,
+            store_file_size2,
+        ));
+        let slot_stores = SlotStores::default();
+        slot_stores.write().unwrap().insert(id, entry);
+        storage.map.insert(slot, slot_stores);
+        // found in 'map'
+        assert_eq!(
+            store_file_size,
+            storage
+                .get_account_storage_entry(slot, id)
+                .map(|entry| entry.accounts.capacity())
+                .unwrap_or_default()
+        );
+
+        // add an entry to 'shrink_in_progress_map'
+        storage.shrink_in_progress_map.insert(slot, entry2);
+
+        // still found in 'map' first
+        assert_eq!(
+            store_file_size,
+            storage
+                .get_account_storage_entry(slot, id)
+                .map(|entry| entry.accounts.capacity())
+                .unwrap_or_default()
+        );
+
+        // remove from 'map'
+        storage.map.remove(&slot).unwrap();
+
+        // now found in 'shrink_in_progress_map'
+        assert_eq!(
+            store_file_size2,
+            storage
+                .get_account_storage_entry(slot, id)
+                .map(|entry| entry.accounts.capacity())
+                .unwrap_or_default()
+        );
+    }
+
+    impl AccountStorage {
+        fn get_test_storage_with_id(&self, id: AppendVecId) -> Arc<AccountStorageEntry> {
+            let slot = 0;
+            // create a test append vec (not added to any map)
+            let common_store_path = Path::new("");
+            let store_file_size = 4000;
+            Arc::new(AccountStorageEntry::new(
+                common_store_path,
+                slot,
+                id,
+                store_file_size,
+            ))
+        }
+        fn get_test_storage(&self) -> Arc<AccountStorageEntry> {
+            self.get_test_storage_with_id(0)
+        }
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_get_slot_stores_fail() {
+        let storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.get_slot_stores(0);
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_get_slot_storage_entry_fail() {
+        let storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.get_slot_storage_entry(0);
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_all_slots_fail() {
+        let storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.all_slots();
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_initialize_fail() {
+        let mut storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.initialize(AccountStorageMap::default());
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_remove_fail() {
+        let storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.remove(&0);
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_iter_fail() {
+        let storage = AccountStorage::default();
+        storage
+            .shrink_in_progress_map
+            .insert(0, storage.get_test_storage());
+        storage.iter();
+    }
+
+    #[test]
+    #[should_panic(expected = "self.shrink_in_progress_map.is_empty()")]
+    fn test_insert_fail() {
+        let storage = AccountStorage::default();
+        let sample = storage.get_test_storage();
+        storage.shrink_in_progress_map.insert(0, sample.clone());
+        storage.insert(0, sample);
+    }
+
+    #[test]
+    #[should_panic(expected = "duplicate call")]
+    fn test_shrinking_in_progress_fail3() {
+        // already an entry in 'shrink_in_progress_map'
+        let storage = AccountStorage::default();
+        let sample = storage.get_test_storage();
+        let slot_stores = SlotStores::default();
+        slot_stores.write().unwrap().insert(0, sample.clone());
+        storage.map.insert(0, slot_stores);
+        storage.shrink_in_progress_map.insert(0, sample.clone());
+        storage.shrinking_in_progress(0, sample);
+    }
+
+    #[test]
+    #[should_panic(expected = "duplicate call")]
+    fn test_shrinking_in_progress_fail4() {
+        // already called 'shrinking_in_progress' on this slot and it is still active
+        let storage = AccountStorage::default();
+        let sample_to_shrink = storage.get_test_storage();
+        let sample = storage.get_test_storage();
+        let slot_stores = SlotStores::default();
+        slot_stores.write().unwrap().insert(0, sample_to_shrink);
+        storage.map.insert(0, slot_stores);
+        let _shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
+        storage.shrinking_in_progress(0, sample);
+    }
+
+    #[test]
+    fn test_shrinking_in_progress_second_call() {
+        // already called 'shrinking_in_progress' on this slot, but it finished, so we succeed
+        // verify data structures during and after shrink and then with a subsequent shrink call
+        let storage = AccountStorage::default();
+        let id_to_shrink = 1;
+        let id_shrunk = 0;
+        let sample_to_shrink = storage.get_test_storage_with_id(id_to_shrink);
+        let sample = storage.get_test_storage();
+        let slot_stores = SlotStores::default();
+        slot_stores
+            .write()
+            .unwrap()
+            .insert(id_to_shrink, sample_to_shrink);
+        storage.map.insert(0, slot_stores.clone());
+        let shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
+        assert_eq!(slot_stores.read().unwrap().len(), 1);
+        assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id());
+        assert_eq!(
+            (0, id_to_shrink),
+            storage
+                .shrink_in_progress_map
+                .iter()
+                .next()
+                .map(|r| (*r.key(), r.value().append_vec_id()))
+                .unwrap()
+        );
+        drop(shrinking_in_progress);
+        assert_eq!(slot_stores.read().unwrap().len(), 1);
+        assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id());
+        assert!(storage.shrink_in_progress_map.is_empty());
+        storage.shrinking_in_progress(0, sample);
+    }
+
+    #[test]
+    #[should_panic(expected = "no pre-existing storages for shrinking slot")]
+    fn test_shrinking_in_progress_fail1() {
+        // nothing in slot currently
+        let storage = AccountStorage::default();
+        let sample = storage.get_test_storage();
+        storage.shrinking_in_progress(0, sample);
+    }
+
+    #[test]
+    #[should_panic(expected = "no pre-existing storages for shrinking slot")]
+    fn test_shrinking_in_progress_fail2() {
+        // nothing in slot currently, but there is an empty map entry
+        let storage = AccountStorage::default();
+        storage.map.insert(0, Arc::default());
+        let sample = storage.get_test_storage();
+        storage.shrinking_in_progress(0, sample);
+    }
+}
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 8572f6d9a..65984c601 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -3962,8 +3962,7 @@ impl AccountsDb {
         if let Some(shrink_in_progress) = shrink_in_progress {
             // shrink is in progress, so 1 new append vec to keep, 1 old one to throw away
-            let store = shrink_in_progress.old_storage();
-            not_retaining_store(store);
+            not_retaining_store(shrink_in_progress.old_storage());
             // dropping 'shrink_in_progress' removes the old append vec that was being shrunk from db's storage
         } else if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
             // no shrink in progress, so all append vecs in this slot are dead
@@ -4029,7 +4028,10 @@ impl AccountsDb {
     fn shrink_slot_forced(&self, slot: Slot) {
         debug!("shrink_slot_forced: slot: {}", slot);
-        if let Some(store) = self.storage.get_slot_storage_entry(slot) {
+        if let Some(store) = self
+            .storage
+            .get_slot_storage_entry_shrinking_in_progress_ok(slot)
+        {
             if !Self::is_shrinking_productive(slot, &store) {
                 return;
             }
@@ -8392,7 +8394,10 @@ impl AccountsDb {
         self.accounts_cache.add_root(slot);
         cache_time.stop();
         let mut store_time = Measure::start("store_add_root");
-        if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
+        // We would not expect this slot to be shrinking right now.
+        // But, even if it was, we would just mark a store id as dirty unnecessarily and that is ok.
+        // So, allow shrinking to be in progress.
+        if let Some(slot_stores) = self.storage.get_slot_stores_shrinking_in_progress_ok(slot) {
             for (store_id, store) in slot_stores.read().unwrap().iter() {
                 self.dirty_stores.insert((slot, *store_id), store.clone());
             }
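The net effect of this change on the two maps during one shrink of a slot S, using hypothetical append vec ids 5 (old) and 9 (new) purely for illustration, is:

    // before shrinking_in_progress(S, new_store):
    //   map:                    S -> { 5 -> old_store }
    //   shrink_in_progress_map: (empty)
    //
    // after shrinking_in_progress(S, new_store) returns its guard:
    //   map:                    S -> { 9 -> new_store }  (steps 2 and 3, under the slot's write lock)
    //   shrink_in_progress_map: S -> old_store           (step 1, so loads can still find the old append vec)
    //
    // after the index refers only to id 9 and the ShrinkInProgress guard is dropped:
    //   map:                    S -> { 9 -> new_store }
    //   shrink_in_progress_map: (empty)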