Optimize stale slot shrinking for previously cleaned roots (#10099)

* Prioritize shrinking of previously cleaned roots

* measure time of stale slot shrinking

* Disable shrink for test

* shrink: budgeting, store count, force for snapshot

* Polish implementation and fix tests

* Fix ci..

* Clean up a bit

* Further polish implementation and fix/add tests

* Rebase fixes

* Remove unneeded Default for AccountStorageEntry

* Address review comments

* More cleanup

* More cleanup
This commit is contained in:
Ryo Onodera 2020-06-12 14:51:43 +09:00 committed by GitHub
parent 4f761395d2
commit dfe72d5242
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 51 deletions

View File

@ -15,11 +15,15 @@ pub struct AccountsBackgroundService {
}
const INTERVAL_MS: u64 = 100;
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
impl AccountsBackgroundService {
pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
info!("AccountsBackgroundService active");
let exit = exit.clone();
let mut consumed_budget = 0;
let t_background = Builder::new()
.name("solana-accounts-background".to_string())
.spawn(move || loop {
@ -30,8 +34,8 @@ impl AccountsBackgroundService {
bank.process_dead_slots();
// Currently, given INTERVAL_MS, we process 1 slot/100 ms
bank.process_stale_slot();
consumed_budget = bank
.process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL);
sleep(Duration::from_millis(INTERVAL_MS));
})

View File

@ -168,17 +168,14 @@ pub struct AccountStorageEntry {
/// status corresponding to the storage, lets us know that
/// the append_vec, once maxed out, then emptied, can be reclaimed
count_and_status: RwLock<(usize, AccountStorageStatus)>,
}
impl Default for AccountStorageEntry {
fn default() -> Self {
Self {
id: 0,
slot: 0,
accounts: AppendVec::new_empty_map(0),
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
}
}
/// This is the total number of accounts stored ever since initialized to keep
/// track of lifetime count of all store operations. And this differs from
/// count_and_status in that this field won't be decremented.
///
/// This is used as a rough estimate for slot shrinking. As such a relaxed
/// use case, this value ARE NOT strictly synchronized with count_and_status!
approx_store_count: AtomicUsize,
}
impl AccountStorageEntry {
@ -192,6 +189,7 @@ impl AccountStorageEntry {
slot,
accounts,
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
}
}
@ -201,6 +199,7 @@ impl AccountStorageEntry {
slot: 0,
accounts: AppendVec::new_empty_map(accounts_current_len),
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
}
}
@ -233,6 +232,10 @@ impl AccountStorageEntry {
self.count_and_status.read().unwrap().0
}
pub fn approx_stored_count(&self) -> usize {
self.approx_store_count.load(Ordering::Relaxed)
}
pub fn has_accounts(&self) -> bool {
self.count() > 0
}
@ -252,6 +255,7 @@ impl AccountStorageEntry {
fn add_account(&self) {
let mut count_and_status = self.count_and_status.write().unwrap();
*count_and_status = (count_and_status.0 + 1, count_and_status.1);
self.approx_store_count.fetch_add(1, Ordering::Relaxed);
}
fn try_available(&self) -> bool {
@ -537,9 +541,12 @@ impl AccountsDB {
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
}
fn clear_uncleaned_roots(&self) {
let mut accounts_index = self.accounts_index.write().unwrap();
accounts_index.uncleaned_roots.clear();
fn reset_uncleaned_roots(&self) {
let previous_roots = self.accounts_index.write().unwrap().reset_uncleaned_roots();
self.shrink_candidate_slots
.lock()
.unwrap()
.extend(previous_roots);
}
fn inc_store_counts(
@ -655,7 +662,7 @@ impl AccountsDB {
if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root);
}
self.clear_uncleaned_roots();
self.reset_uncleaned_roots();
clean_old_rooted.stop();
let mut store_counts_time = Measure::start("store_counts");
@ -763,9 +770,17 @@ impl AccountsDB {
);
}
fn shrink_stale_slot(&self, slot: Slot) -> usize {
self.do_shrink_slot(slot, false)
}
fn shrink_slot_forced(&self, slot: Slot) {
self.do_shrink_slot(slot, true);
}
// Reads all accounts in given slot's AppendVecs and filter only to alive,
// then create a minimum AppendVed filled with the alive.
fn shrink_stale_slot(&self, slot: Slot) {
// then create a minimum AppendVec filled with the alive.
fn do_shrink_slot(&self, slot: Slot, forced: bool) -> usize {
trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts = vec![];
@ -773,8 +788,20 @@ impl AccountsDB {
let storage = self.storage.read().unwrap();
if let Some(stores) = storage.0.get(&slot) {
let mut alive_count = 0;
let mut stored_count = 0;
for store in stores.values() {
alive_count += store.count();
stored_count += store.approx_stored_count();
}
if (alive_count as f32 / stored_count as f32) >= 0.80 && !forced {
trace!(
"shrink_stale_slot: not enough space to shrink: {} / {}",
alive_count,
stored_count,
);
return 0;
}
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
@ -788,14 +815,6 @@ impl AccountsDB {
start = next;
}
}
if (alive_count as f32 / stored_accounts.len() as f32) >= 0.80 {
trace!(
"shrink_stale_slot: not enough space to shrink: {} / {}",
alive_count,
stored_accounts.len()
);
return;
}
}
}
@ -847,7 +866,8 @@ impl AccountsDB {
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (pubkey, account, account_hash, _size, _location, write_version) in alive_accounts {
for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
{
accounts.push((pubkey, account));
hashes.push(*account_hash);
write_versions.push(*write_version);
@ -874,22 +894,23 @@ impl AccountsDB {
slot_storage.retain(|_key, store| store.count() > 0);
}
}
alive_accounts.len()
}
// Infinitely returns rooted roots in cyclic order
fn next_shrink_slot(&self) -> Option<Slot> {
let next = {
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
candidates.pop()
};
// hold a lock to keep reset_uncleaned_roots() from updating candidates;
// we might update it in this fn if it's empty
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
let next = candidates.pop();
if next.is_some() {
next
} else {
let mut new_all_slots = self.all_root_slots_in_index();
let next = new_all_slots.pop();
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
// update candidates under the lock finally!
*candidates = new_all_slots;
next
@ -906,15 +927,29 @@ impl AccountsDB {
storage.0.keys().cloned().collect()
}
pub fn process_stale_slot(&self) {
if let Some(slot) = self.next_shrink_slot() {
pub fn process_stale_slot(&self) -> usize {
let mut measure = Measure::start("stale_slot_shrink-ms");
let count = if let Some(slot) = self.next_shrink_slot() {
self.shrink_stale_slot(slot)
} else {
0
};
measure.stop();
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
count
}
#[cfg(test)]
fn shrink_all_stale_slots(&self) {
for slot in self.all_slots_in_storage() {
self.shrink_stale_slot(slot);
}
}
pub fn shrink_all_stale_slots(&self) {
pub fn shrink_all_slots(&self) {
for slot in self.all_slots_in_storage() {
self.shrink_stale_slot(slot);
self.shrink_slot_forced(slot);
}
}
@ -1896,6 +1931,9 @@ impl AccountsDB {
trace!("id: {} clearing count", id);
store.count_and_status.write().unwrap().0 = 0;
}
store
.approx_store_count
.store(store.accounts.accounts(0).len(), Ordering::Relaxed);
}
}
}
@ -2073,7 +2111,11 @@ pub mod tests {
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 2);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
}
// adding root doesn't change anything
db.add_root(1);
{
let stores = db.storage.read().unwrap();
@ -2083,6 +2125,23 @@ pub mod tests {
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 2);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
}
// overwrite old rooted account version; only the slot_0_stores.count() should be
// decremented
db.store(2, &[(&pubkeys[0], &account)]);
{
let stores = db.storage.read().unwrap();
let slot_0_stores = &stores.0.get(&0).unwrap();
let slot_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(slot_0_stores.len(), 1);
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 1);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
}
}
@ -2236,6 +2295,14 @@ pub mod tests {
total_count += store.count();
}
assert_eq!(total_count, count);
let (expected_store_count, actual_store_count): (usize, usize) = (
slot_storage.values().map(|s| s.approx_stored_count()).sum(),
slot_storage
.values()
.map(|s| s.accounts.accounts(0).len())
.sum(),
);
assert_eq!(expected_store_count, actual_store_count);
total_count == count
}
@ -2479,10 +2546,16 @@ pub mod tests {
let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage
let count = slot_storage
.values()
.map(|store| store.accounts.accounts(0).len())
.sum()
.sum();
let stored_count: usize = slot_storage
.values()
.map(|store| store.approx_stored_count())
.sum();
assert_eq!(stored_count, count);
count
} else {
0
}
@ -3739,14 +3812,14 @@ pub mod tests {
}
#[test]
fn test_shrink_stale_slots_none() {
fn test_shrink_all_slots_none() {
let accounts = AccountsDB::new_single();
for _ in 0..10 {
accounts.process_stale_slot();
assert_eq!(0, accounts.process_stale_slot());
}
accounts.shrink_all_stale_slots();
accounts.shrink_all_slots();
}
#[test]
@ -3785,6 +3858,37 @@ pub mod tests {
);
}
#[test]
fn test_shrink_reset_uncleaned_roots() {
let accounts = AccountsDB::new_single();
accounts.reset_uncleaned_roots();
assert_eq!(
*accounts.shrink_candidate_slots.lock().unwrap(),
vec![] as Vec<Slot>
);
accounts.add_root(0);
accounts.add_root(1);
accounts.add_root(2);
accounts.reset_uncleaned_roots();
let actual_slots = accounts.shrink_candidate_slots.lock().unwrap().clone();
assert_eq!(actual_slots, vec![] as Vec<Slot>);
accounts.reset_uncleaned_roots();
let mut actual_slots = accounts.shrink_candidate_slots.lock().unwrap().clone();
actual_slots.sort();
assert_eq!(actual_slots, vec![0, 1, 2]);
accounts.accounts_index.write().unwrap().roots.clear();
let mut actual_slots = (0..5)
.map(|_| accounts.next_shrink_slot())
.collect::<Vec<_>>();
actual_slots.sort();
assert_eq!(actual_slots, vec![None, None, Some(0), Some(1), Some(2)],);
}
#[test]
fn test_shrink_stale_slots_processed() {
solana_logger::setup();
@ -3824,7 +3928,7 @@ pub mod tests {
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
accounts.shrink_all_stale_slots();
accounts.shrink_all_slots();
assert_eq!(
pubkey_count_after_shrink,
accounts.all_account_count_in_append_vec(shrink_slot)
@ -3842,7 +3946,7 @@ pub mod tests {
.unwrap();
// repeating should be no-op
accounts.shrink_all_stale_slots();
accounts.shrink_all_slots();
assert_eq!(
pubkey_count_after_shrink,
accounts.all_account_count_in_append_vec(shrink_slot)
@ -3888,11 +3992,20 @@ pub mod tests {
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
// Only, try to shrink stale slots.
accounts.shrink_all_stale_slots();
assert_eq!(
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
// Now, do full-shrink.
accounts.shrink_all_slots();
assert_eq!(
pubkey_count_after_shrink,
accounts.all_account_count_in_append_vec(shrink_slot)
);
}
#[test]

View File

@ -19,6 +19,7 @@ pub struct AccountsIndex<T> {
pub roots: HashSet<Slot>,
pub uncleaned_roots: HashSet<Slot>,
pub previous_uncleaned_roots: HashSet<Slot>,
}
impl<'a, T: 'a + Clone> AccountsIndex<T> {
@ -235,6 +236,13 @@ impl<'a, T: 'a + Clone> AccountsIndex<T> {
pub fn clean_dead_slot(&mut self, slot: Slot) {
self.roots.remove(&slot);
self.uncleaned_roots.remove(&slot);
self.previous_uncleaned_roots.remove(&slot);
}
pub fn reset_uncleaned_roots(&mut self) -> HashSet<Slot> {
let empty = HashSet::new();
let new_previous = std::mem::replace(&mut self.uncleaned_roots, empty);
std::mem::replace(&mut self.previous_uncleaned_roots, new_previous)
}
}
@ -359,10 +367,31 @@ mod tests {
fn test_clean_and_unclean_slot() {
let mut index = AccountsIndex::<bool>::default();
assert_eq!(0, index.uncleaned_roots.len());
index.add_root(0);
index.add_root(1);
assert_eq!(1, index.uncleaned_roots.len());
index.clean_dead_slot(1);
assert_eq!(2, index.uncleaned_roots.len());
assert_eq!(0, index.previous_uncleaned_roots.len());
index.reset_uncleaned_roots();
assert_eq!(2, index.roots.len());
assert_eq!(0, index.uncleaned_roots.len());
assert_eq!(2, index.previous_uncleaned_roots.len());
index.add_root(2);
index.add_root(3);
assert_eq!(4, index.roots.len());
assert_eq!(2, index.uncleaned_roots.len());
assert_eq!(2, index.previous_uncleaned_roots.len());
index.clean_dead_slot(1);
assert_eq!(3, index.roots.len());
assert_eq!(2, index.uncleaned_roots.len());
assert_eq!(1, index.previous_uncleaned_roots.len());
index.clean_dead_slot(2);
assert_eq!(2, index.roots.len());
assert_eq!(1, index.uncleaned_roots.len());
assert_eq!(1, index.previous_uncleaned_roots.len());
}
#[test]

View File

@ -2308,7 +2308,7 @@ impl Bank {
/// calculation and could shield other real accounts.
pub fn verify_snapshot_bank(&self) -> bool {
self.clean_accounts();
self.shrink_all_stale_slots();
self.shrink_all_slots();
// Order and short-circuiting is significant; verify_hash requires a valid bank hash
self.verify_bank_hash() && self.verify_hash()
}
@ -2572,12 +2572,26 @@ impl Bank {
self.rc.accounts.accounts_db.process_dead_slots();
}
pub fn process_stale_slot(&self) {
self.rc.accounts.accounts_db.process_stale_slot();
pub fn shrink_all_slots(&self) {
self.rc.accounts.accounts_db.shrink_all_slots();
}
pub fn shrink_all_stale_slots(&self) {
self.rc.accounts.accounts_db.shrink_all_stale_slots();
pub fn process_stale_slot_with_budget(
&self,
mut consumed_budget: usize,
budget_recovery_delta: usize,
) -> usize {
if consumed_budget == 0 {
let shrunken_account_count = self.rc.accounts.accounts_db.process_stale_slot();
if shrunken_account_count > 0 {
datapoint_info!(
"stale_slot_shrink",
("accounts", shrunken_account_count, i64)
);
consumed_budget += shrunken_account_count;
}
}
consumed_budget.saturating_sub(budget_recovery_delta)
}
}
@ -7078,4 +7092,45 @@ mod tests {
assert_eq!(1, bank.get_balance(&program1_pubkey));
assert_eq!(42, bank.get_balance(&program2_pubkey));
}
#[test]
fn test_process_stale_slot_with_budget() {
solana_logger::setup();
let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000_000);
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let bank = Arc::new(Bank::new(&genesis_config));
bank.lazy_rent_collection.store(true, Ordering::Relaxed);
assert_eq!(bank.process_stale_slot_with_budget(0, 0), 0);
assert_eq!(bank.process_stale_slot_with_budget(133, 0), 133);
assert_eq!(bank.process_stale_slot_with_budget(0, 100), 0);
assert_eq!(bank.process_stale_slot_with_budget(33, 100), 0);
assert_eq!(bank.process_stale_slot_with_budget(133, 100), 33);
bank.squash();
let some_lamports = 123;
let bank = Arc::new(new_from_parent(&bank));
bank.deposit(&pubkey1, some_lamports);
bank.deposit(&pubkey2, some_lamports);
let bank = Arc::new(new_from_parent(&bank));
bank.deposit(&pubkey1, some_lamports);
bank.squash();
bank.clean_accounts();
let force_to_return_alive_account = 0;
assert_eq!(
bank.process_stale_slot_with_budget(22, force_to_return_alive_account),
22
);
let mut consumed_budgets = (0..3)
.map(|_| bank.process_stale_slot_with_budget(0, force_to_return_alive_account))
.collect::<Vec<_>>();
consumed_budgets.sort();
assert_eq!(consumed_budgets, vec![0, 1, 8]);
}
}