remove process_stale_slot_v1 and other fns (#29244)

This commit is contained in:
Jeff Washington (jwash) 2022-12-13 18:32:24 -06:00 committed by GitHub
parent 4ae12bf5fa
commit 62be54a75e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 281 deletions

View File

@ -91,7 +91,7 @@ use {
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
Arc, Condvar, Mutex, MutexGuard, RwLock,
Arc, Condvar, Mutex, RwLock,
},
thread::{sleep, Builder},
time::{Duration, Instant},
@ -4128,10 +4128,6 @@ impl AccountsDb {
self.storage.all_slots()
}
fn all_alive_roots_in_index(&self) -> Vec<Slot> {
self.accounts_index.all_alive_roots()
}
/// Given the input `ShrinkCandidates`, this function sorts the stores by their alive ratio
/// in increasing order with the most sparse entries in the front. It will then simulate the
/// shrinking by working on the most sparse entries first and if the overall alive ratio is
@ -9517,140 +9513,6 @@ impl AccountsDb {
}
}
/// Legacy shrink functions to support non-cached path.
/// Should be able to be deleted after cache path is the only path.
impl AccountsDb {
// Reads all accounts in given slot's AppendVecs and filter only to alive,
// then create a minimum AppendVec filled with the alive.
// v1 path shrinks all stores in the slot
//
// Requires all stores in the slot to be re-written otherwise the accounts_index
// store ref count could become incorrect.
//
// Returns the number of accounts rewritten, or 0 when the slot is skipped:
// no stores for the slot, already compact (one store, all alive), or — when
// `forced` is false — not sparse enough / too small to be worth shrinking.
fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
trace!("shrink_stale_slot: slot: {}", slot);
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
// Snapshot the stores under the read lock, then release it before shrinking.
let stores: Vec<_> = stores_lock.read().unwrap().values().cloned().collect();
let mut alive_count = 0;
let mut stored_count = 0;
let mut written_bytes = 0;
let mut total_bytes = 0;
// Aggregate liveness/size stats across all stores in the slot.
for store in &stores {
alive_count += store.count();
stored_count += store.approx_stored_count();
written_bytes += store.written_bytes();
total_bytes += store.total_bytes();
}
if alive_count == stored_count && stores.len() == 1 {
// Single store with every account alive: shrinking would be a no-op.
trace!(
"shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
slot,
alive_count,
stored_count,
if forced { " (forced)" } else { "" },
);
return 0;
} else if !forced {
// Heuristics: only shrink when <= 80% of accounts or bytes are alive,
// and the slot is bigger than one page.
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let not_sparse = !sparse_by_count && !sparse_by_bytes;
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
if not_sparse || too_small_to_shrink {
return 0;
}
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
}
self.do_shrink_slot_stores(slot, stores.iter())
} else {
0
}
}
// Convenience wrapper: shrink `slot` with the sparseness heuristics enabled.
fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize {
self.do_shrink_slot_v1(slot, false)
}
// Shrinks stale slots from `candidates` for up to ~100ms or ~10% of the
// alive roots, whichever comes first. The caller must already hold the
// candidates mutex (passed in as the guard) to keep clean_accounts() from
// mutating the list concurrently.
fn shrink_stale_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
let mut shrunken_account_total = 0;
let mut shrunk_slot_count = 0;
let start = Instant::now();
let num_roots = self.accounts_index.num_alive_roots();
loop {
if let Some(slot) = self.do_next_shrink_slot_v1(candidates) {
shrunken_account_total += self.do_shrink_stale_slot_v1(slot);
} else {
// NOTE(review): this discards any `shrunken_account_total`
// accumulated in earlier iterations and reports 0 — confirm
// whether callers rely on the count being best-effort only.
return 0;
}
// Time/volume budget: stop after 100ms of wall time or after
// touching roughly a tenth of the alive roots.
if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
debug!(
"do_shrink_stale_slot_v1: {} {} {}us",
shrunk_slot_count,
candidates.len(),
start.elapsed().as_micros()
);
break;
}
shrunk_slot_count += 1;
}
shrunken_account_total
}
// Infinitely returns rooted roots in cyclic order
fn do_next_shrink_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
// At this point, a lock (= candidates) is ensured to be held to keep
// do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates.
// Also, candidates in the lock may be swapped here if it's empty.
let next = candidates.pop();
if next.is_some() {
next
} else {
// Exhausted: refill the cycle from the index's alive roots.
let mut new_all_slots = self.all_alive_roots_in_index();
let next = new_all_slots.pop();
// refresh candidates for later calls!
// (double deref: through the &mut, then through the MutexGuard
// to the Vec<Slot> it protects)
**candidates = new_all_slots;
next
}
}
// Test-only wrapper that acquires the candidates lock itself.
#[cfg(test)]
fn next_shrink_slot_v1(&self) -> Option<Slot> {
let mut candidates = self.shrink_candidate_slots_v1.lock().unwrap();
self.do_next_shrink_slot_v1(&mut candidates)
}
// Public background entry point: attempt one bounded round of v1 stale-slot
// shrinking. Returns the number of accounts shrunk, or 0 if the candidates
// lock is currently held by clean_accounts().
pub fn process_stale_slot_v1(&self) -> usize {
let mut measure = Measure::start("stale_slot_shrink-ms");
// try_lock (not lock): never block the background thread behind cleaning.
let candidates = self.shrink_candidate_slots_v1.try_lock();
if candidates.is_err() {
// skip and return immediately if locked by clean_accounts()
// the calling background thread will just retry later.
return 0;
}
// hold this lock as long as this shrinking process is running to avoid conflicts
// with clean_accounts().
let mut candidates = candidates.unwrap();
let count = self.shrink_stale_slot_v1(&mut candidates);
measure.stop();
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
count
}
// Test-only: run the heuristic (non-forced) shrink over every slot in storage.
#[cfg(test)]
fn shrink_all_stale_slots_v1(&self) {
for slot in self.all_slots_in_storage() {
self.do_shrink_stale_slot_v1(slot);
}
}
}
#[cfg(test)]
pub mod tests {
use {
@ -13458,82 +13320,6 @@ pub mod tests {
}
}
#[test]
#[ignore]
fn test_shrink_next_slots() {
    let accounts = AccountsDb::new_single_for_tests();
    // accounts.caching_enabled = false;

    // Poll the shrink-slot cycler `count` times and collect what it hands out.
    let poll = |count: usize| -> Vec<_> {
        (0..count).map(|_| accounts.next_shrink_slot_v1()).collect()
    };

    let mut current_slot = 7;
    // No roots exist yet, so every poll comes back empty.
    assert_eq!(poll(3), vec![None, None, None]);

    accounts.get_accounts_delta_hash(current_slot);
    accounts.add_root(current_slot);
    // With exactly one root, the cycler returns it on every poll.
    assert_eq!(poll(3), vec![Some(7), Some(7), Some(7)]);

    current_slot += 1;
    accounts.get_accounts_delta_hash(current_slot);
    accounts.add_root(current_slot);
    let slots = poll(6);
    // Because the origin of this data is HashMap (not BTreeMap), key order is arbitrary per cycle.
    assert!(
        slots == vec![Some(7), Some(8), Some(7), Some(8), Some(7), Some(8)]
            || slots == vec![Some(8), Some(7), Some(8), Some(7), Some(8), Some(7)]
    );
}
#[test]
#[ignore] // this test only works with write cache off
fn test_shrink_reset_uncleaned_roots() {
let accounts = AccountsDb::new_single_for_tests();
// accounts.caching_enabled = false;
// accounts.reset_uncleaned_roots_v1();
// Candidates start empty before any roots exist.
assert_eq!(
*accounts.shrink_candidate_slots_v1.lock().unwrap(),
vec![] as Vec<Slot>
);
accounts.get_accounts_delta_hash(0);
accounts.add_root(0);
accounts.get_accounts_delta_hash(1);
accounts.add_root(1);
accounts.get_accounts_delta_hash(2);
accounts.add_root(2);
// accounts.reset_uncleaned_roots_v1();
// Rooting alone does not populate the v1 candidate list.
let actual_slots = accounts.shrink_candidate_slots_v1.lock().unwrap().clone();
assert_eq!(actual_slots, vec![] as Vec<Slot>);
// accounts.reset_uncleaned_roots_v1();
let mut actual_slots = accounts.shrink_candidate_slots_v1.lock().unwrap().clone();
actual_slots.sort_unstable();
// NOTE(review): the reset_uncleaned_roots_v1() calls above are commented
// out, so nothing visible here moves roots 0..=2 into the candidate list;
// as written this assertion looks unsatisfiable (test is #[ignore]d) —
// confirm against the pre-removal history.
assert_eq!(actual_slots, vec![0, 1, 2]);
accounts.accounts_index.clear_roots();
// After clearing roots, the cycler drains leftover candidates then yields None.
let mut actual_slots = (0..5)
.map(|_| accounts.next_shrink_slot_v1())
.collect::<Vec<_>>();
actual_slots.sort();
assert_eq!(actual_slots, vec![None, None, Some(0), Some(1), Some(2)],);
}
#[test]
fn test_shrink_stale_slots_processed() {
solana_logger::setup();
@ -13905,70 +13691,6 @@ pub mod tests {
assert_eq!(0, next_candidates.len());
}
#[test]
#[ignore]
fn test_shrink_stale_slots_skipped() {
    solana_logger::setup();
    let accounts = AccountsDb::new_single_for_tests();
    // this test tests v1 code, which can not use the write cache.
    // accounts.caching_enabled = false;

    let total_keys = 30000;
    let surviving_keys = 25000;
    let keys: Vec<_> = std::iter::repeat_with(solana_sdk::pubkey::new_rand)
        .take(total_keys)
        .collect();

    let lamports = 223;
    let data_len = 0;
    let default_owner = *AccountSharedData::default().owner();
    let sample_account = AccountSharedData::new(lamports, data_len, &default_owner);

    // Slot 1: store every account once.
    let mut slot = 1;
    for key in &keys {
        accounts.store_for_tests(slot, &[(key, &sample_account)]);
    }
    let shrink_slot = slot;
    accounts.get_accounts_delta_hash(slot);
    accounts.add_root(slot);

    // Slot 2: overwrite the first (total - surviving) accounts so that only
    // `surviving_keys` entries stay alive in the shrink slot.
    slot += 1;
    for key in &keys[..total_keys - surviving_keys] {
        accounts.store_for_tests(slot, &[(key, &sample_account)]);
    }
    accounts.get_accounts_delta_hash(slot);
    accounts.add_root(slot);
    accounts.clean_accounts_for_tests();

    // Cleaning alone leaves the append vec at full size.
    assert_eq!(
        total_keys,
        accounts.all_account_count_in_append_vec(shrink_slot)
    );

    // Only, try to shrink stale slots.
    // This is the point of this v1 test, which does not use the write cache:
    // the stale-slot pass must skip the slot and leave it untouched.
    accounts.shrink_all_stale_slots_v1();
    assert_eq!(
        total_keys,
        accounts.all_account_count_in_append_vec(shrink_slot)
    );

    // Now, do full-shrink.
    accounts.shrink_all_slots(false, None);
    assert_eq!(
        surviving_keys,
        accounts.all_account_count_in_append_vec(shrink_slot)
    );
}
const UPSERT_POPULATE_RECLAIMS: UpsertReclaim = UpsertReclaim::PopulateReclaims;
// returns the rooted entries and the storage ref count

View File

@ -38,7 +38,7 @@ fn test_shrink_and_clean() {
if exit_for_shrink.load(Ordering::Relaxed) {
break;
}
accounts_for_shrink.process_stale_slot_v1();
accounts_for_shrink.shrink_all_slots(false, None);
});
let mut alive_accounts = vec![];
@ -58,9 +58,18 @@ fn test_shrink_and_clean() {
for (pubkey, account) in alive_accounts.iter_mut() {
account.checked_sub_lamports(1).unwrap();
accounts.store_uncached(current_slot, &[(pubkey, account)]);
accounts.store_cached(
(
current_slot,
&[(&*pubkey, &*account)][..],
INCLUDE_SLOT_IN_HASH_TESTS,
),
None,
);
}
accounts.add_root(current_slot);
accounts.flush_accounts_cache(true, Some(current_slot));
}
// let's dance.