introduce ShrinkInProgress (#29329)

* introduce ShrinkInProgress

* remove redundant check

* add comments
This commit is contained in:
Jeff Washington (jwash) 2022-12-21 11:03:29 -06:00 committed by GitHub
parent 4a64f6d421
commit 0f7ef085a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 224 additions and 82 deletions

View File

@ -80,6 +80,40 @@ impl AccountStorage {
.insert(store.append_vec_id(), store) .insert(store.append_vec_id(), store)
.is_none()); .is_none());
} }
/// called when shrinking begins on a slot and append vec.
/// When 'ShrinkInProgress' is dropped by caller, the old store will be removed from the storage map.
pub(crate) fn shrinking_in_progress(
&self,
slot: Slot,
new_store: Arc<AccountStorageEntry>,
) -> ShrinkInProgress<'_> {
let slot_storages: SlotStores = self.get_slot_stores(slot).unwrap_or_else(||
// DashMap entry.or_insert() returns a RefMut, essentially a write lock,
// which is dropped after this block ends, minimizing time held by the lock.
// However, we still want to persist the reference to the `SlotStores` behind
// the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone).
self
.map
.entry(slot)
.or_insert(Arc::new(RwLock::new(HashMap::new())))
.clone());
let shrinking_store = Arc::clone(slot_storages.read().unwrap().iter().next().unwrap().1);
let new_id = new_store.append_vec_id();
let mut storages = slot_storages.write().unwrap();
// insert 'new_store' into 'map'
assert!(storages.insert(new_id, Arc::clone(&new_store)).is_none());
ShrinkInProgress {
storage: self,
slot,
new_store,
old_store: shrinking_store,
}
}
#[cfg(test)] #[cfg(test)]
pub(crate) fn insert_empty_at_slot(&self, slot: Slot) { pub(crate) fn insert_empty_at_slot(&self, slot: Slot) {
self.map self.map
@ -92,6 +126,43 @@ impl AccountStorage {
} }
} }
/// exists while there is a shrink in progress
/// keeps track of the 'new_store' being created and the 'old_store' being replaced.
pub(crate) struct ShrinkInProgress<'a> {
storage: &'a AccountStorage,
/// newly shrunk store with a subset of contents from 'old_store'
new_store: Arc<AccountStorageEntry>,
/// old store which will be shrunk and replaced
old_store: Arc<AccountStorageEntry>,
slot: Slot,
}
/// called when the shrink is no longer in progress. This means we can release the old append vec and update the map of slot -> append vec
impl<'a> Drop for ShrinkInProgress<'a> {
fn drop(&mut self) {
// the slot must be in the map
let slot_storages: SlotStores = self.storage.get_slot_stores(self.slot).unwrap();
let mut storages = slot_storages.write().unwrap();
// the id must be in the hashmap
assert!(
storages.remove(&self.old_store.append_vec_id()).is_some(),
"slot: {}, len: {}",
self.slot,
storages.len()
);
}
}
impl<'a> ShrinkInProgress<'a> {
pub(crate) fn new_storage(&self) -> &Arc<AccountStorageEntry> {
&self.new_store
}
pub(crate) fn old_storage(&self) -> &Arc<AccountStorageEntry> {
&self.old_store
}
}
#[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize, AbiExample, AbiEnumVisitor)] #[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize, AbiExample, AbiEnumVisitor)]
pub enum AccountStorageStatus { pub enum AccountStorageStatus {
Available = 0, Available = 0,

View File

@ -21,7 +21,7 @@
use { use {
crate::{ crate::{
account_info::{AccountInfo, Offset, StorageLocation, StoredSize}, account_info::{AccountInfo, Offset, StorageLocation, StoredSize},
account_storage::{AccountStorage, AccountStorageStatus}, account_storage::{AccountStorage, AccountStorageStatus, ShrinkInProgress},
accounts_background_service::{DroppedSlotsSender, SendDroppedBankCallback}, accounts_background_service::{DroppedSlotsSender, SendDroppedBankCallback},
accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{ accounts_hash::{
@ -205,12 +205,27 @@ impl CurrentAncientAppendVec {
slot_and_append_vec: Some((slot, append_vec)), slot_and_append_vec: Some((slot, append_vec)),
} }
} }
fn create_ancient_append_vec(&mut self, slot: Slot, db: &AccountsDb) {
*self = Self::new(slot, db.create_ancient_append_vec(slot)); #[must_use]
fn create_ancient_append_vec<'a>(
&mut self,
slot: Slot,
db: &'a AccountsDb,
) -> ShrinkInProgress<'a> {
let shrink_in_progress = db.create_ancient_append_vec(slot);
*self = Self::new(slot, Arc::clone(shrink_in_progress.new_storage()));
shrink_in_progress
} }
fn create_if_necessary(&mut self, slot: Slot, db: &AccountsDb) { #[must_use]
fn create_if_necessary<'a>(
&mut self,
slot: Slot,
db: &'a AccountsDb,
) -> Option<ShrinkInProgress<'a>> {
if self.slot_and_append_vec.is_none() { if self.slot_and_append_vec.is_none() {
self.create_ancient_append_vec(slot, db); Some(self.create_ancient_append_vec(slot, db))
} else {
None
} }
} }
@ -3827,6 +3842,7 @@ impl AccountsDb {
shrink_collect: &ShrinkCollect, shrink_collect: &ShrinkCollect,
slot: Slot, slot: Slot,
stats: &ShrinkStats, stats: &ShrinkStats,
shrink_in_progress: Option<ShrinkInProgress>,
) -> usize { ) -> usize {
// Purge old, overwritten storage entries // Purge old, overwritten storage entries
let (remaining_stores, dead_storages) = self.mark_dirty_dead_stores( let (remaining_stores, dead_storages) = self.mark_dirty_dead_stores(
@ -3835,6 +3851,7 @@ impl AccountsDb {
// If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty. // If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty.
// otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below. // otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below.
shrink_collect.all_are_zero_lamports, shrink_collect.all_are_zero_lamports,
shrink_in_progress,
); );
if !shrink_collect.all_are_zero_lamports { if !shrink_collect.all_are_zero_lamports {
@ -3889,7 +3906,7 @@ impl AccountsDb {
let mut remove_old_stores_shrink_us = 0; let mut remove_old_stores_shrink_us = 0;
let mut store_accounts_timing = StoreAccountsTiming::default(); let mut store_accounts_timing = StoreAccountsTiming::default();
if shrink_collect.aligned_total_bytes > 0 { if shrink_collect.aligned_total_bytes > 0 {
let (shrunken_store, time) = let (shrink_in_progress, time) =
measure!(self.get_store_for_shrink(slot, shrink_collect.aligned_total_bytes)); measure!(self.get_store_for_shrink(slot, shrink_collect.aligned_total_bytes));
create_and_insert_store_elapsed_us = time.as_us(); create_and_insert_store_elapsed_us = time.as_us();
@ -3903,7 +3920,7 @@ impl AccountsDb {
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
), ),
None::<Vec<&Hash>>, None::<Vec<&Hash>>,
Some(&shrunken_store), Some(shrink_in_progress.new_storage()),
None, None,
StoreReclaims::Ignore, StoreReclaims::Ignore,
); );
@ -3917,8 +3934,13 @@ impl AccountsDb {
// those here // those here
self.shrink_candidate_slots.lock().unwrap().remove(&slot); self.shrink_candidate_slots.lock().unwrap().remove(&slot);
let (remaining_stores, remove_old_stores_shrink) = let (remaining_stores, remove_old_stores_shrink) = measure!(self
measure!(self.remove_old_stores_shrink(&shrink_collect, slot, &self.shrink_stats)); .remove_old_stores_shrink(
&shrink_collect,
slot,
&self.shrink_stats,
Some(shrink_in_progress)
));
remove_old_stores_shrink_us = remove_old_stores_shrink.as_us(); remove_old_stores_shrink_us = remove_old_stores_shrink.as_us();
if remaining_stores > 1 { if remaining_stores > 1 {
inc_new_counter_info!("accounts_db_shrink_extra_stores", 1); inc_new_counter_info!("accounts_db_shrink_extra_stores", 1);
@ -3983,29 +4005,44 @@ impl AccountsDb {
/// get stores for 'slot' /// get stores for 'slot'
/// retain only the stores where 'should_retain(store)' == true /// retain only the stores where 'should_retain(store)' == true
/// for stores not retained, insert in 'dead_storages' and optionally 'dirty_stores' /// for stores not retained, insert in 'dead_storages' and optionally 'dirty_stores'
/// This is the end of the life cycle of `shrink_in_progress`.
/// Dropping 'shrink_in_progress' here will cause the old store to be removed from the storage map.
/// returns: (# of remaining stores for this slot, dead storages) /// returns: (# of remaining stores for this slot, dead storages)
pub(crate) fn mark_dirty_dead_stores( pub(crate) fn mark_dirty_dead_stores(
&self, &self,
slot: Slot, slot: Slot,
should_retain: impl Fn(&AccountStorageEntry) -> bool, should_retain: impl Fn(&AccountStorageEntry) -> bool,
add_dirty_stores: bool, add_dirty_stores: bool,
shrink_in_progress: Option<ShrinkInProgress>,
) -> (usize, SnapshotStorage) { ) -> (usize, SnapshotStorage) {
let mut dead_storages = Vec::default(); let mut dead_storages = Vec::default();
let mut not_retaining_store = |store: &Arc<AccountStorageEntry>| {
if add_dirty_stores {
self.dirty_stores
.insert((slot, store.append_vec_id()), store.clone());
}
dead_storages.push(store.clone());
};
let remaining_stores = if let Some(slot_stores) = self.storage.get_slot_stores(slot) { let remaining_stores = if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
let mut list = slot_stores.write().unwrap(); if let Some(shrink_in_progress) = shrink_in_progress {
list.retain(|_key, store| { let store = shrink_in_progress.old_storage().clone();
if !should_retain(store) { not_retaining_store(&store);
if add_dirty_stores { drop(shrink_in_progress);
self.dirty_stores slot_stores.read().unwrap().len()
.insert((slot, store.append_vec_id()), store.clone()); } else {
let mut list = slot_stores.write().unwrap();
list.retain(|_key, store| {
if !should_retain(store) {
not_retaining_store(store);
false
} else {
true
} }
dead_storages.push(store.clone()); });
false list.len()
} else { }
true
}
});
list.len()
} else { } else {
0 0
}; };
@ -4046,7 +4083,7 @@ impl AccountsDb {
&self, &self,
slot: Slot, slot: Slot,
aligned_total: u64, aligned_total: u64,
) -> Arc<AccountStorageEntry> { ) -> ShrinkInProgress<'_> {
let shrunken_store = self let shrunken_store = self
.try_recycle_store(slot, aligned_total, aligned_total + 1024) .try_recycle_store(slot, aligned_total, aligned_total + 1024)
.unwrap_or_else(|| { .unwrap_or_else(|| {
@ -4057,8 +4094,7 @@ impl AccountsDb {
.unwrap_or_else(|| (&self.paths, "shrink")); .unwrap_or_else(|| (&self.paths, "shrink"));
self.create_store(slot, aligned_total, from, shrink_paths) self.create_store(slot, aligned_total, from, shrink_paths)
}); });
self.insert_store(slot, Arc::clone(&shrunken_store)); self.storage.shrinking_in_progress(slot, shrunken_store)
shrunken_store
} }
// Reads all accounts in given slot's AppendVecs and filter only to alive, // Reads all accounts in given slot's AppendVecs and filter only to alive,
@ -4220,16 +4256,15 @@ impl AccountsDb {
} }
/// create and return new ancient append vec /// create and return new ancient append vec
fn create_ancient_append_vec(&self, slot: Slot) -> Arc<AccountStorageEntry> { fn create_ancient_append_vec(&self, slot: Slot) -> ShrinkInProgress<'_> {
let new_ancient_storage = let shrink_in_progress = self.get_store_for_shrink(slot, get_ancient_append_vec_capacity());
self.get_store_for_shrink(slot, get_ancient_append_vec_capacity());
info!( info!(
"ancient_append_vec: creating initial ancient append vec: {}, size: {}, id: {}", "ancient_append_vec: creating initial ancient append vec: {}, size: {}, id: {}",
slot, slot,
get_ancient_append_vec_capacity(), get_ancient_append_vec_capacity(),
new_ancient_storage.append_vec_id(), shrink_in_progress.new_storage().append_vec_id(),
); );
new_ancient_storage shrink_in_progress
} }
#[cfg(test)] #[cfg(test)]
@ -4471,7 +4506,7 @@ impl AccountsDb {
return; // skipping slot with no useful accounts to write return; // skipping slot with no useful accounts to write
} }
let (_, time) = measure!(current_ancient.create_if_necessary(slot, self)); let (shrink_in_progress, time) = measure!(current_ancient.create_if_necessary(slot, self));
let mut create_and_insert_store_elapsed_us = time.as_us(); let mut create_and_insert_store_elapsed_us = time.as_us();
let available_bytes = current_ancient.append_vec().accounts.remaining_bytes(); let available_bytes = current_ancient.append_vec().accounts.remaining_bytes();
// split accounts in 'slot' into: // split accounts in 'slot' into:
@ -4521,7 +4556,8 @@ impl AccountsDb {
.remove_old_stores_shrink( .remove_old_stores_shrink(
&shrink_collect, &shrink_collect,
slot, slot,
&self.shrink_ancient_stats.shrink_stats &self.shrink_ancient_stats.shrink_stats,
shrink_in_progress,
)); ));
// we should not try to shrink any of the stores from this slot anymore. All shrinking for this slot is now handled by ancient append vec code. // we should not try to shrink any of the stores from this slot anymore. All shrinking for this slot is now handled by ancient append vec code.
@ -9674,7 +9710,9 @@ pub mod tests {
let to_store = AccountsToStore::new(available_bytes, &map, alive_total_bytes, slot0); let to_store = AccountsToStore::new(available_bytes, &map, alive_total_bytes, slot0);
// Done: setup 'to_store' // Done: setup 'to_store'
current_ancient.create_ancient_append_vec(slot0, &db); // there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot0, 1000, "test");
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot0, &db);
let mut ancient_slot_pubkeys = AncientSlotPubkeys::default(); let mut ancient_slot_pubkeys = AncientSlotPubkeys::default();
assert!(ancient_slot_pubkeys.inner.is_none()); assert!(ancient_slot_pubkeys.inner.is_none());
// same slot as current_ancient, so no-op // same slot as current_ancient, so no-op
@ -9686,7 +9724,9 @@ pub mod tests {
); );
assert!(ancient_slot_pubkeys.inner.is_none()); assert!(ancient_slot_pubkeys.inner.is_none());
// different slot than current_ancient, so update 'ancient_slot_pubkeys' // different slot than current_ancient, so update 'ancient_slot_pubkeys'
current_ancient.create_ancient_append_vec(slot1, &db); // there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot1, 1000, "test");
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot1, &db);
let slot2 = 2; let slot2 = 2;
ancient_slot_pubkeys.maybe_unref_accounts_already_in_ancient( ancient_slot_pubkeys.maybe_unref_accounts_already_in_ancient(
slot2, slot2,
@ -16468,6 +16508,7 @@ pub mod tests {
false false
}, },
add_dirty_stores, add_dirty_stores,
None,
); );
assert_eq!(0, called.load(Ordering::Relaxed)); assert_eq!(0, called.load(Ordering::Relaxed));
assert_eq!(0, remaining_stores); assert_eq!(0, remaining_stores);
@ -16482,6 +16523,7 @@ pub mod tests {
true // retain true // retain
}, },
add_dirty_stores, add_dirty_stores,
None,
); );
assert_eq!(1, called.load(Ordering::Relaxed)); assert_eq!(1, called.load(Ordering::Relaxed));
assert_eq!(1, remaining_stores); assert_eq!(1, remaining_stores);
@ -16495,6 +16537,7 @@ pub mod tests {
false // don't retain false // don't retain
}, },
add_dirty_stores, add_dirty_stores,
None,
); );
assert_eq!(1, called.load(Ordering::Relaxed)); assert_eq!(1, called.load(Ordering::Relaxed));
assert!(db assert!(db
@ -16804,8 +16847,13 @@ pub mod tests {
}; };
let db = AccountsDb::new_single_for_tests(); let db = AccountsDb::new_single_for_tests();
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot1_ancient, 1000, "test");
let ancient = db.create_ancient_append_vec(slot1_ancient); let ancient = db.create_ancient_append_vec(slot1_ancient);
let _existing_append_vec = db.create_and_insert_store(slot1_plus_ancient, 1000, "test");
let ancient_1_plus = db.create_ancient_append_vec(slot1_plus_ancient); let ancient_1_plus = db.create_ancient_append_vec(slot1_plus_ancient);
let _existing_append_vec = db.create_and_insert_store(slot3_ancient, 1000, "test");
let ancient3 = db.create_ancient_append_vec(slot3_ancient); let ancient3 = db.create_ancient_append_vec(slot3_ancient);
let temp_dir = TempDir::new().unwrap(); let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path(); let path = temp_dir.path();
@ -16825,7 +16873,7 @@ pub mod tests {
assert_eq!(Vec::<Slot>::default(), ancient_slots); assert_eq!(Vec::<Slot>::default(), ancient_slots);
// now test with an ancient append vec // now test with an ancient append vec
let raw_storages = vec![vec![ancient.clone()]]; let raw_storages = vec![vec![ancient.new_storage().clone()]];
let snapshot_storages = SortedStorages::new(&raw_storages); let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0; let one_epoch_old_slot = 0;
let ancient_slots = let ancient_slots =
@ -16837,7 +16885,10 @@ pub mod tests {
assert_eq!(vec![slot1_ancient], ancient_slots); assert_eq!(vec![slot1_ancient], ancient_slots);
// now test with an ancient append vec and then a non-ancient append vec // now test with an ancient append vec and then a non-ancient append vec
let raw_storages = vec![vec![ancient.clone()], vec![non_ancient_storage.clone()]]; let raw_storages = vec![
vec![ancient.new_storage().clone()],
vec![non_ancient_storage.clone()],
];
let snapshot_storages = SortedStorages::new(&raw_storages); let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0; let one_epoch_old_slot = 0;
let ancient_slots = let ancient_slots =
@ -16850,9 +16901,9 @@ pub mod tests {
// ancient, non-ancient, ancient // ancient, non-ancient, ancient
let raw_storages = vec![ let raw_storages = vec![
vec![ancient.clone()], vec![ancient.new_storage().clone()],
vec![non_ancient_storage.clone()], vec![non_ancient_storage.clone()],
vec![ancient3.clone()], vec![ancient3.new_storage().clone()],
]; ];
let snapshot_storages = SortedStorages::new(&raw_storages); let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0; let one_epoch_old_slot = 0;
@ -16867,12 +16918,12 @@ pub mod tests {
if sparse { if sparse {
// ancient, ancient, non-ancient, ancient // ancient, ancient, non-ancient, ancient
let raw_storages = vec![ let raw_storages = vec![
vec![ancient], vec![Arc::clone(ancient.new_storage())],
vec![ancient_1_plus], vec![Arc::clone(ancient_1_plus.new_storage())],
vec![non_ancient_storage], vec![non_ancient_storage],
vec![ancient3], vec![Arc::clone(ancient3.new_storage())],
]; ];
let snapshot_storages = SortedStorages::new(&raw_storages); let snapshot_storages = SortedStorages::new(&raw_storages[..]);
let one_epoch_old_slot = 0; let one_epoch_old_slot = 0;
let ancient_slots = let ancient_slots =
SplitAncientStorages::get_ancient_slots(one_epoch_old_slot, &snapshot_storages); SplitAncientStorages::get_ancient_slots(one_epoch_old_slot, &snapshot_storages);
@ -17007,7 +17058,7 @@ pub mod tests {
append_vec.append_vec_id() append_vec.append_vec_id()
); );
current_ancient.create_if_necessary(slot2, &db); let _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db);
assert_eq!(current_ancient.slot(), slot); assert_eq!(current_ancient.slot(), slot);
assert_eq!(current_ancient.append_vec_id(), append_vec.append_vec_id()); assert_eq!(current_ancient.append_vec_id(), append_vec.append_vec_id());
} }
@ -17015,14 +17066,17 @@ pub mod tests {
{ {
// create_if_necessary // create_if_necessary
let db = AccountsDb::new_single_for_tests(); let db = AccountsDb::new_single_for_tests();
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot2, 1000, "test");
let mut current_ancient = CurrentAncientAppendVec::default(); let mut current_ancient = CurrentAncientAppendVec::default();
current_ancient.create_if_necessary(slot2, &db); let mut _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db);
let id = current_ancient.append_vec_id(); let id = current_ancient.append_vec_id();
assert_eq!(current_ancient.slot(), slot2); assert_eq!(current_ancient.slot(), slot2);
assert!(is_ancient(&current_ancient.append_vec().accounts)); assert!(is_ancient(&current_ancient.append_vec().accounts));
let slot3 = 3; let slot3 = 3;
// should do nothing // should do nothing
current_ancient.create_if_necessary(slot3, &db); let _shrink_in_progress = current_ancient.create_if_necessary(slot3, &db);
assert_eq!(current_ancient.slot(), slot2); assert_eq!(current_ancient.slot(), slot2);
assert_eq!(current_ancient.append_vec_id(), id); assert_eq!(current_ancient.append_vec_id(), id);
assert!(is_ancient(&current_ancient.append_vec().accounts)); assert!(is_ancient(&current_ancient.append_vec().accounts));
@ -17032,11 +17086,18 @@ pub mod tests {
// create_ancient_append_vec // create_ancient_append_vec
let db = AccountsDb::new_single_for_tests(); let db = AccountsDb::new_single_for_tests();
let mut current_ancient = CurrentAncientAppendVec::default(); let mut current_ancient = CurrentAncientAppendVec::default();
current_ancient.create_ancient_append_vec(slot2, &db); // there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot2, 1000, "test");
let mut _shrink_in_progress = current_ancient.create_ancient_append_vec(slot2, &db);
let id = current_ancient.append_vec_id(); let id = current_ancient.append_vec_id();
assert_eq!(current_ancient.slot(), slot2); assert_eq!(current_ancient.slot(), slot2);
assert!(is_ancient(&current_ancient.append_vec().accounts)); assert!(is_ancient(&current_ancient.append_vec().accounts));
current_ancient.create_ancient_append_vec(slot3, &db);
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot3, 1000, "test");
let mut _shrink_in_progress = current_ancient.create_ancient_append_vec(slot3, &db);
assert_eq!(current_ancient.slot(), slot3); assert_eq!(current_ancient.slot(), slot3);
assert!(is_ancient(&current_ancient.append_vec().accounts)); assert!(is_ancient(&current_ancient.append_vec().accounts));
assert_ne!(current_ancient.append_vec_id(), id); assert_ne!(current_ancient.append_vec_id(), id);
@ -17074,12 +17135,6 @@ pub mod tests {
assert_eq!(db.get_sorted_potential_ancient_slots(), vec![root1]); assert_eq!(db.get_sorted_potential_ancient_slots(), vec![root1]);
} }
impl CurrentAncientAppendVec {
fn is_none(&self) -> bool {
self.slot_and_append_vec.is_none()
}
}
#[test] #[test]
fn test_shrink_collect_simple() { fn test_shrink_collect_simple() {
solana_logger::setup(); solana_logger::setup();
@ -17712,10 +17767,9 @@ pub mod tests {
slot5, slot5,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(current_ancient.is_none()); assert!(current_ancient.slot_and_append_vec.is_none());
// slot is not ancient, so it is good to move // slot is not ancient, so it is good to move
assert!(should_move); assert!(should_move);
// try 2 storages in 1 slot, should not be able to move // try 2 storages in 1 slot, should not be able to move
current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter
let two_storages = vec![storages[0].clone(), storages[0].clone()]; let two_storages = vec![storages[0].clone(), storages[0].clone()];
@ -17725,7 +17779,7 @@ pub mod tests {
slot5, slot5,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(current_ancient.is_none()); assert!(current_ancient.slot_and_append_vec.is_none());
assert!(!should_move); assert!(!should_move);
current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter
@ -17746,40 +17800,53 @@ pub mod tests {
// now, create an ancient slot and make sure that it does NOT think it needs to be moved and that it becomes the ancient append vec to use // now, create an ancient slot and make sure that it does NOT think it needs to be moved and that it becomes the ancient append vec to use
let mut current_ancient = CurrentAncientAppendVec::default(); let mut current_ancient = CurrentAncientAppendVec::default();
let slot1_ancient = 1; let slot1_ancient = 1;
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot1_ancient, 1000, "test");
let ancient1 = db.create_ancient_append_vec(slot1_ancient); let ancient1 = db.create_ancient_append_vec(slot1_ancient);
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![ancient1.clone()], &vec![ancient1.new_storage().clone()],
&mut current_ancient, &mut current_ancient,
slot1_ancient, slot1_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(!should_move); assert!(!should_move);
assert_eq!(current_ancient.append_vec_id(), ancient1.append_vec_id()); assert_eq!(
current_ancient.append_vec_id(),
ancient1.new_storage().append_vec_id()
);
assert_eq!(current_ancient.slot(), slot1_ancient); assert_eq!(current_ancient.slot(), slot1_ancient);
// current is ancient1 // current is ancient1
// try to move ancient2 // try to move ancient2
// current should become ancient2 // current should become ancient2
let slot2_ancient = 2; let slot2_ancient = 2;
let mut current_ancient = CurrentAncientAppendVec::new(slot1_ancient, ancient1.clone()); let mut current_ancient =
CurrentAncientAppendVec::new(slot1_ancient, ancient1.new_storage().clone());
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot2_ancient, 1000, "test");
let ancient2 = db.create_ancient_append_vec(slot2_ancient); let ancient2 = db.create_ancient_append_vec(slot2_ancient);
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![ancient2.clone()], &vec![ancient2.new_storage().clone()],
&mut current_ancient, &mut current_ancient,
slot2_ancient, slot2_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(!should_move); assert!(!should_move);
assert_eq!(current_ancient.append_vec_id(), ancient2.append_vec_id()); assert_eq!(
current_ancient.append_vec_id(),
ancient2.new_storage().append_vec_id()
);
assert_eq!(current_ancient.slot(), slot2_ancient); assert_eq!(current_ancient.slot(), slot2_ancient);
// now try a full ancient append vec // now try a full ancient append vec
// current is None // current is None
let slot3_full_ancient = 3; let slot3_full_ancient = 3;
let mut current_ancient = CurrentAncientAppendVec::default(); let mut current_ancient = CurrentAncientAppendVec::default();
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot3_full_ancient, 1000, "test");
let full_ancient_3 = make_full_ancient_append_vec(&db, slot3_full_ancient); let full_ancient_3 = make_full_ancient_append_vec(&db, slot3_full_ancient);
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![full_ancient_3.clone()], &vec![full_ancient_3.new_storage().clone()],
&mut current_ancient, &mut current_ancient,
slot3_full_ancient, slot3_full_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
@ -17787,14 +17854,15 @@ pub mod tests {
assert!(!should_move); assert!(!should_move);
assert_eq!( assert_eq!(
current_ancient.append_vec_id(), current_ancient.append_vec_id(),
full_ancient_3.append_vec_id() full_ancient_3.new_storage().append_vec_id()
); );
assert_eq!(current_ancient.slot(), slot3_full_ancient); assert_eq!(current_ancient.slot(), slot3_full_ancient);
// now set current_ancient to something // now set current_ancient to something
let mut current_ancient = CurrentAncientAppendVec::new(slot1_ancient, ancient1.clone()); let mut current_ancient =
CurrentAncientAppendVec::new(slot1_ancient, ancient1.new_storage().clone());
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![full_ancient_3.clone()], &vec![full_ancient_3.new_storage().clone()],
&mut current_ancient, &mut current_ancient,
slot3_full_ancient, slot3_full_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
@ -17802,35 +17870,39 @@ pub mod tests {
assert!(!should_move); assert!(!should_move);
assert_eq!( assert_eq!(
current_ancient.append_vec_id(), current_ancient.append_vec_id(),
full_ancient_3.append_vec_id() full_ancient_3.new_storage().append_vec_id()
); );
assert_eq!(current_ancient.slot(), slot3_full_ancient); assert_eq!(current_ancient.slot(), slot3_full_ancient);
// now mark the full ancient as candidate for shrink // now mark the full ancient as candidate for shrink
adjust_alive_bytes(&full_ancient_3, 0); adjust_alive_bytes(full_ancient_3.new_storage(), 0);
// should shrink here, returning none for current // should shrink here, returning none for current
let mut current_ancient = CurrentAncientAppendVec::default(); let mut current_ancient = CurrentAncientAppendVec::default();
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![full_ancient_3.clone()], &vec![full_ancient_3.new_storage().clone()],
&mut current_ancient, &mut current_ancient,
slot3_full_ancient, slot3_full_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(should_move); assert!(should_move);
assert!(current_ancient.is_none()); assert!(current_ancient.slot_and_append_vec.is_none());
// should return true here, returning current from prior // should return true here, returning current from prior
// now set current_ancient to something and see if it still goes to None // now set current_ancient to something and see if it still goes to None
let mut current_ancient = CurrentAncientAppendVec::new(slot1_ancient, ancient1.clone()); let mut current_ancient =
CurrentAncientAppendVec::new(slot1_ancient, ancient1.new_storage().clone());
let should_move = db.should_move_to_ancient_append_vec( let should_move = db.should_move_to_ancient_append_vec(
&vec![full_ancient_3], &vec![Arc::clone(full_ancient_3.new_storage())],
&mut current_ancient, &mut current_ancient,
slot3_full_ancient, slot3_full_ancient,
CAN_RANDOMLY_SHRINK_FALSE, CAN_RANDOMLY_SHRINK_FALSE,
); );
assert!(should_move); assert!(should_move);
assert_eq!(current_ancient.append_vec_id(), ancient1.append_vec_id()); assert_eq!(
current_ancient.append_vec_id(),
ancient1.new_storage().append_vec_id()
);
assert_eq!(current_ancient.slot(), slot1_ancient); assert_eq!(current_ancient.slot(), slot1_ancient);
} }
@ -17846,9 +17918,9 @@ pub mod tests {
adjust_alive_bytes(ancient, ancient.total_bytes() as usize); adjust_alive_bytes(ancient, ancient.total_bytes() as usize);
} }
fn make_full_ancient_append_vec(db: &AccountsDb, slot: Slot) -> Arc<AccountStorageEntry> { fn make_full_ancient_append_vec(db: &AccountsDb, slot: Slot) -> ShrinkInProgress<'_> {
let full = db.create_ancient_append_vec(slot); let full = db.create_ancient_append_vec(slot);
make_ancient_append_vec_full(&full); make_ancient_append_vec_full(full.new_storage());
full full
} }
} }

View File

@ -351,6 +351,7 @@ impl<'a> SnapshotMinimizer<'a> {
let _ = self.accounts_db().purge_keys_exact(purge_pubkeys.iter()); let _ = self.accounts_db().purge_keys_exact(purge_pubkeys.iter());
let aligned_total: u64 = AccountsDb::page_align(total_bytes as u64); let aligned_total: u64 = AccountsDb::page_align(total_bytes as u64);
let mut shrink_in_progress = None;
if aligned_total > 0 { if aligned_total > 0 {
let mut accounts = Vec::with_capacity(keep_accounts.len()); let mut accounts = Vec::with_capacity(keep_accounts.len());
let mut hashes = Vec::with_capacity(keep_accounts.len()); let mut hashes = Vec::with_capacity(keep_accounts.len());
@ -362,7 +363,8 @@ impl<'a> SnapshotMinimizer<'a> {
write_versions.push(alive_account.account.meta.write_version_obsolete); write_versions.push(alive_account.account.meta.write_version_obsolete);
} }
let new_storage = self.accounts_db().get_store_for_shrink(slot, aligned_total); shrink_in_progress = Some(self.accounts_db().get_store_for_shrink(slot, aligned_total));
let new_storage = shrink_in_progress.as_ref().unwrap().new_storage();
self.accounts_db().store_accounts_frozen( self.accounts_db().store_accounts_frozen(
( (
slot, slot,
@ -370,7 +372,7 @@ impl<'a> SnapshotMinimizer<'a> {
crate::accounts_db::INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, crate::accounts_db::INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
), ),
Some(hashes), Some(hashes),
Some(&new_storage), Some(new_storage),
Some(Box::new(write_versions.into_iter())), Some(Box::new(write_versions.into_iter())),
StoreReclaims::Default, StoreReclaims::Default,
); );
@ -378,14 +380,11 @@ impl<'a> SnapshotMinimizer<'a> {
new_storage.flush().unwrap(); new_storage.flush().unwrap();
} }
let append_vec_set: HashSet<_> = storages
.iter()
.map(|storage| storage.append_vec_id())
.collect();
let (_, mut dead_storages_this_time) = self.accounts_db().mark_dirty_dead_stores( let (_, mut dead_storages_this_time) = self.accounts_db().mark_dirty_dead_stores(
slot, slot,
|store| !append_vec_set.contains(&store.append_vec_id()), |_store| true, /* ignored if shrink_in_progress is passed, otherwise retain all */
true, // add_dirty_stores true, // add_dirty_stores
shrink_in_progress,
); );
dead_storages dead_storages
.lock() .lock()