AccountStorage only holds a single append vec (#29723)

* AccountStorageReference

* tests build

* pr feedback
This commit is contained in:
Jeff Washington (jwash) 2023-01-16 18:22:30 -06:00 committed by GitHub
parent 0db14ad39c
commit 461dafb887
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 112 additions and 131 deletions

View File

@ -1,19 +1,26 @@
//! Manage the map of slot -> append vecs
//! Manage the map of slot -> append vec
use {
crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores, SnapshotStorageOne},
crate::accounts_db::{AccountStorageEntry, AppendVecId, SnapshotStorageOne},
dashmap::DashMap,
solana_sdk::clock::Slot,
std::{
collections::HashMap,
sync::{Arc, RwLock},
},
std::sync::Arc,
};
pub type AccountStorageMap = DashMap<Slot, SlotStores>;
#[derive(Clone, Debug)]
pub struct AccountStorageReference {
/// the single storage for a given slot
pub(crate) storage: SnapshotStorageOne,
/// id can be read from 'storage', but it is an atomic read.
/// id will never change while a storage is held, so we store it separately here for faster runtime lookup in 'get_account_storage_entry'
pub(crate) id: AppendVecId,
}
#[derive(Clone, Default, Debug)]
pub type AccountStorageMap = DashMap<Slot, AccountStorageReference>;
#[derive(Default, Debug)]
pub struct AccountStorage {
/// map from Slot -> the single append vec for the slot
map: AccountStorageMap,
/// while shrink is operating on a slot, there can be 2 append vecs active for that slot
/// Once the index has been updated to only refer to the new append vec, the single entry for the slot in 'map' can be updated.
@ -40,7 +47,7 @@ impl AccountStorage {
store_id: AppendVecId,
) -> Option<Arc<AccountStorageEntry>> {
self.get_slot_stores_shrinking_in_progress_ok(slot)
.and_then(|storage_map| storage_map.read().unwrap().get(&store_id).cloned())
.and_then(|AccountStorageReference { id, storage }| (id == store_id).then_some(storage))
.or_else(|| {
self.shrink_in_progress_map.get(&slot).and_then(|entry| {
(entry.value().append_vec_id() == store_id).then(|| Arc::clone(entry.value()))
@ -49,7 +56,7 @@ impl AccountStorage {
}
/// public api, should only be called when shrinking is not in progress
pub fn get_slot_stores(&self, slot: Slot) -> Option<SlotStores> {
pub fn get_slot_stores(&self, slot: Slot) -> Option<AccountStorageReference> {
assert!(self.shrink_in_progress_map.is_empty());
self.get_slot_stores_shrinking_in_progress_ok(slot)
}
@ -58,7 +65,7 @@ impl AccountStorage {
pub(crate) fn get_slot_stores_shrinking_in_progress_ok(
&self,
slot: Slot,
) -> Option<SlotStores> {
) -> Option<AccountStorageReference> {
self.map.get(&slot).map(|result| result.value().clone())
}
@ -75,11 +82,7 @@ impl AccountStorage {
slot: Slot,
) -> Option<Arc<AccountStorageEntry>> {
self.get_slot_stores_shrinking_in_progress_ok(slot)
.and_then(|res| {
let read = res.read().unwrap();
assert!(read.len() <= 1);
read.values().next().cloned()
})
.map(|entry| Arc::clone(&entry.storage))
}
pub(crate) fn all_slots(&self) -> Vec<Slot> {
@ -87,13 +90,11 @@ impl AccountStorage {
self.map.iter().map(|iter_item| *iter_item.key()).collect()
}
/// returns true if there is an entry in the map for 'slot', but it contains no append vec
/// returns true if there is no entry for 'slot'
#[cfg(test)]
pub(crate) fn is_empty_entry(&self, slot: Slot) -> bool {
assert!(self.shrink_in_progress_map.is_empty());
self.get_slot_stores(slot)
.map(|storages| storages.read().unwrap().is_empty())
.unwrap_or(false)
self.get_slot_stores(slot).is_none()
}
/// initialize the storage map to 'all_storages'
@ -103,16 +104,11 @@ impl AccountStorage {
self.map.extend(all_storages.into_iter())
}
/// remove all append vecs at 'slot'
/// remove the append vec at 'slot'
/// returns the current contents
pub(crate) fn remove(&self, slot: &Slot) -> Option<Arc<AccountStorageEntry>> {
pub(crate) fn remove(&self, slot: &Slot) -> Option<SnapshotStorageOne> {
assert!(self.shrink_in_progress_map.is_empty());
self.map.remove(slot).and_then(|(_slot, slot_stores)| {
let mut write = slot_stores.write().unwrap();
assert!(write.len() <= 1, "{}", write.len());
let mut drain = write.drain();
drain.next().map(|(_, store)| store)
})
self.map.remove(slot).map(|(_, entry)| entry.storage)
}
/// iterate through all (slot, append-vec)
@ -123,19 +119,16 @@ impl AccountStorage {
pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
assert!(self.shrink_in_progress_map.is_empty());
let slot_storages: SlotStores = self.get_slot_stores(slot).unwrap_or_else(||
// DashMap entry.or_insert() returns a RefMut, essentially a write lock,
// which is dropped after this block ends, minimizing time held by the lock.
// However, we still want to persist the reference to the `SlotStores` behind
// the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone).
self
.map
.entry(slot)
.or_insert(Arc::new(RwLock::new(HashMap::new())))
.clone());
let mut write = slot_storages.write().unwrap();
assert!(write.insert(store.append_vec_id(), store).is_none());
assert!(self
.map
.insert(
slot,
AccountStorageReference {
id: store.append_vec_id(),
storage: store,
}
)
.is_none());
}
/// called when shrinking begins on a slot and append vec.
@ -159,17 +152,8 @@ impl AccountStorage {
let slot_storages = self
.get_slot_stores_shrinking_in_progress_ok(slot)
.expect("no pre-existing storages for shrinking slot");
let shrinking_store = Arc::clone(
slot_storages
.read()
.unwrap()
.iter()
.next()
.expect("no pre-existing storages for shrinking slot")
.1,
);
let shrinking_store = Arc::clone(&slot_storages.storage);
let previous_id = shrinking_store.append_vec_id();
let new_id = new_store.append_vec_id();
// 1. insert 'shrinking_store' into 'shrink_in_progress_map'
assert!(
@ -179,15 +163,16 @@ impl AccountStorage {
"duplicate call"
);
{
// write lock held for this atomic operation
let mut storages = slot_storages.write().unwrap();
// 2. remove 'shrinking_store' from 'map'
assert!(storages.remove(&previous_id).is_some());
// 3. insert 'new_store' into 'map' (atomic with #2)
// should be empty prior to this call
assert!(storages.insert(new_id, Arc::clone(&new_store)).is_none());
}
assert!(self
.map
.insert(
slot,
AccountStorageReference {
storage: Arc::clone(&new_store),
id: new_id
}
)
.is_some());
ShrinkInProgress {
storage: self,
@ -197,14 +182,6 @@ impl AccountStorage {
}
}
#[cfg(test)]
pub(crate) fn insert_empty_at_slot(&self, slot: Slot) {
assert!(self.shrink_in_progress_map.is_empty());
self.map
.entry(slot)
.or_insert(Arc::new(RwLock::new(HashMap::new())));
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.map.len()
@ -213,7 +190,7 @@ impl AccountStorage {
/// iterate contents of AccountStorage without exposing internals
pub struct AccountStorageIter<'a> {
iter: dashmap::iter::Iter<'a, Slot, SlotStores>,
iter: dashmap::iter::Iter<'a, Slot, AccountStorageReference>,
}
impl<'a> AccountStorageIter<'a> {
@ -228,13 +205,10 @@ impl<'a> Iterator for AccountStorageIter<'a> {
type Item = (Slot, SnapshotStorageOne);
fn next(&mut self) -> Option<Self::Item> {
for entry in self.iter.by_ref() {
// if no stores for a slot, then don't return the item at all, loops to try next slot
if let Some(entry) = self.iter.next() {
let slot = entry.key();
let stores = entry.value();
if let Some((_, store)) = stores.read().unwrap().iter().next() {
return Some((*slot, Arc::clone(store)));
}
let store = entry.value();
return Some((*slot, Arc::clone(&store.storage)));
}
None
}
@ -316,9 +290,10 @@ pub(crate) mod tests {
id,
store_file_size2,
));
let slot_stores = SlotStores::default();
slot_stores.write().unwrap().insert(id, entry);
storage.map.insert(slot, slot_stores);
storage
.map
.insert(slot, AccountStorageReference { id, storage: entry });
// look in map
assert_eq!(
store_file_size,
@ -446,9 +421,13 @@ pub(crate) mod tests {
// already entry in shrink_in_progress_map
let storage = AccountStorage::default();
let sample = storage.get_test_storage();
let slot_stores = SlotStores::default();
slot_stores.write().unwrap().insert(0, sample.clone());
storage.map.insert(0, slot_stores);
storage.map.insert(
0,
AccountStorageReference {
id: 0,
storage: sample.clone(),
},
);
storage.shrink_in_progress_map.insert(0, sample.clone());
storage.shrinking_in_progress(0, sample);
}
@ -460,9 +439,13 @@ pub(crate) mod tests {
let storage = AccountStorage::default();
let sample_to_shrink = storage.get_test_storage();
let sample = storage.get_test_storage();
let slot_stores = SlotStores::default();
slot_stores.write().unwrap().insert(0, sample_to_shrink);
storage.map.insert(0, slot_stores);
storage.map.insert(
0,
AccountStorageReference {
id: 0,
storage: sample_to_shrink,
},
);
let _shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
storage.shrinking_in_progress(0, sample);
}
@ -476,15 +459,19 @@ pub(crate) mod tests {
let id_shrunk = 0;
let sample_to_shrink = storage.get_test_storage_with_id(id_to_shrink);
let sample = storage.get_test_storage();
let slot_stores = SlotStores::default();
slot_stores
.write()
.unwrap()
.insert(id_to_shrink, sample_to_shrink);
storage.map.insert(0, slot_stores.clone());
storage.map.insert(
0,
AccountStorageReference {
id: id_to_shrink,
storage: sample_to_shrink,
},
);
let shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
assert_eq!(slot_stores.read().unwrap().len(), 1);
assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id());
assert!(storage.map.contains_key(&0));
assert_eq!(
id_shrunk,
storage.map.get(&0).unwrap().storage.append_vec_id()
);
assert_eq!(
(0, id_to_shrink),
storage
@ -495,8 +482,11 @@ pub(crate) mod tests {
.unwrap()
);
drop(shrinking_in_progress);
assert_eq!(slot_stores.read().unwrap().len(), 1);
assert_eq!(id_shrunk, slot_stores.read().unwrap()[&0].append_vec_id());
assert!(storage.map.contains_key(&0));
assert_eq!(
id_shrunk,
storage.map.get(&0).unwrap().storage.append_vec_id()
);
assert!(storage.shrink_in_progress_map.is_empty());
storage.shrinking_in_progress(0, sample);
}
@ -515,7 +505,6 @@ pub(crate) mod tests {
fn test_shrinking_in_progress_fail2() {
// nothing in slot currently, but there is an empty map entry
let storage = AccountStorage::default();
storage.map.insert(0, Arc::default());
let sample = storage.get_test_storage();
storage.shrinking_in_progress(0, sample);
}
@ -527,8 +516,6 @@ pub(crate) mod tests {
let storage = AccountStorage::default();
let sample = storage.get_test_storage();
let id = sample.append_vec_id();
let slot_stores = SlotStores::default();
slot_stores.write().unwrap().insert(id, sample.clone());
let missing_id = 9999;
let slot = sample.slot();
// id is missing since not in maps at all
@ -537,7 +524,13 @@ pub(crate) mod tests {
assert!(storage
.get_account_storage_entry(slot, missing_id)
.is_none());
storage.map.insert(slot, slot_stores.clone());
storage.map.insert(
slot,
AccountStorageReference {
id,
storage: sample.clone(),
},
);
// id is found in map
assert!(storage.get_account_storage_entry(slot, id).is_some());
assert!(storage
@ -551,7 +544,7 @@ pub(crate) mod tests {
.get_account_storage_entry(slot, missing_id)
.is_none());
assert!(storage.get_account_storage_entry(slot, id).is_some());
slot_stores.write().unwrap().clear();
storage.map.remove(&slot);
// id is found in shrink_in_progress_map
assert!(storage
.get_account_storage_entry(slot, missing_id)

View File

@ -605,9 +605,6 @@ pub type SnapshotStorages = Vec<Vec<SnapshotStorageOne>>;
/// exactly 1 append vec per slot
pub type SnapshotStoragesOne = Vec<SnapshotStorageOne>;
// Each slot has a set of storage entries.
pub(crate) type SlotStores = Arc<RwLock<HashMap<AppendVecId, Arc<AccountStorageEntry>>>>;
type AccountSlots = HashMap<Pubkey, HashSet<Slot>>;
type AppendVecOffsets = HashMap<AppendVecId, HashSet<usize>>;
type ReclaimResult = (AccountSlots, AppendVecOffsets);
@ -3915,12 +3912,9 @@ impl AccountsDb {
// shrink is in progress, so 1 new append vec to keep, 1 old one to throw away
not_retaining_store(shrink_in_progress.old_storage());
// dropping 'shrink_in_progress' removes the old append vec that was being shrunk from db's storage
} else if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
} else if let Some(store) = self.storage.remove(&slot) {
// no shrink in progress, so all append vecs in this slot are dead
let mut list = slot_stores.write().unwrap();
list.drain().for_each(|(_key, store)| {
not_retaining_store(&store);
});
not_retaining_store(&store);
}
dead_storages
@ -10730,12 +10724,12 @@ pub mod tests {
}
let mut append_vec_histogram = HashMap::new();
let mut all_storages = vec![];
let mut all_slots = vec![];
for slot_storage in accounts.storage.iter() {
all_storages.push(slot_storage.1)
all_slots.push(slot_storage.0)
}
for storage in all_storages {
*append_vec_histogram.entry(storage.slot()).or_insert(0) += 1;
for slot in all_slots {
*append_vec_histogram.entry(slot).or_insert(0) += 1;
}
for count in append_vec_histogram.values() {
assert!(*count >= 2);
@ -17190,7 +17184,6 @@ pub mod tests {
.write()
.unwrap()
.insert(slot0, BankHashInfo::default());
db.storage.insert_empty_at_slot(slot0);
assert!(!db.bank_hashes.read().unwrap().is_empty());
db.accounts_index.add_root(slot0);
db.accounts_index.add_uncleaned_roots([slot0].into_iter());

View File

@ -711,10 +711,6 @@ where
next_append_vec_id,
} = storage_and_next_append_vec_id;
// discard any slots with no storage entries
// this can happen if a non-root slot was serialized
// but non-root stores should not be included in the snapshot
storage.retain(|_slot, stores| !stores.read().unwrap().is_empty());
assert!(
!storage.is_empty(),
"At least one storage entry must exist from deserializing stream"

View File

@ -3,7 +3,7 @@
use {
super::*,
crate::{
account_storage::AccountStorageMap,
account_storage::{AccountStorageMap, AccountStorageReference},
accounts::Accounts,
accounts_db::{
get_temp_accounts_paths, test_utils::create_test_accounts, AccountShrinkThreshold,
@ -60,15 +60,13 @@ fn copy_append_vecs<P: AsRef<Path>>(
num_accounts,
);
next_append_vec_id = next_append_vec_id.max(new_storage_entry.append_vec_id());
storage
.entry(new_storage_entry.slot())
.or_default()
.write()
.unwrap()
.insert(
new_storage_entry.append_vec_id(),
Arc::new(new_storage_entry),
);
storage.insert(
new_storage_entry.slot(),
AccountStorageReference {
id: new_storage_entry.append_vec_id(),
storage: Arc::new(new_storage_entry),
},
);
}
Ok(StorageAndNextAppendVecId {

View File

@ -3,7 +3,7 @@
use {
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
crate::{
account_storage::AccountStorageMap,
account_storage::{AccountStorageMap, AccountStorageReference},
accounts_db::{AccountStorageEntry, AppendVecId, AtomicAppendVecId},
serde_snapshot::{
self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields,
@ -292,7 +292,7 @@ impl SnapshotStorageRebuilder {
let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
let lock = slot_storage_paths.lock().unwrap();
let slot_stores = lock
let mut slot_stores = lock
.iter()
.map(|path| {
let filename = path.file_name().unwrap().to_str().unwrap();
@ -317,9 +317,10 @@ impl SnapshotStorageRebuilder {
})
.collect::<Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, std::io::Error>>()?;
let slot_entry = self.storage.entry(slot).or_default();
let mut storage_lock = slot_entry.write().unwrap();
*storage_lock = slot_stores;
assert_eq!(slot_stores.len(), 1);
let (id, storage) = slot_stores.drain().next().unwrap();
self.storage
.insert(slot, AccountStorageReference { id, storage });
Ok(())
}