From 3e4f49f9c976a7a9c3784fc74924f422bf4af6d1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 29 Jul 2020 06:06:14 +0900 Subject: [PATCH] Fix race condition between shrinking and cleaning (#11235) * Fix race condition between shrinking and cleaning * Minor formatting * fix ci * Update comments * More update comment * Adjust fn naming --- runtime/src/accounts_db.rs | 124 ++++++++++++++++++++++++++++++------- 1 file changed, 102 insertions(+), 22 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 42c9ab8c37..0c008bebb3 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -43,7 +43,7 @@ use std::{ ops::RangeBounds, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}, time::Instant, }; use tempfile::TempDir; @@ -558,12 +558,14 @@ impl AccountsDB { inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize); } - fn reset_uncleaned_roots(&self) { + fn do_reset_uncleaned_roots(&self, candidates: &mut MutexGuard<Vec<Slot>>) { let previous_roots = self.accounts_index.write().unwrap().reset_uncleaned_roots(); - self.shrink_candidate_slots - .lock() - .unwrap() - .extend(previous_roots); + candidates.extend(previous_roots); + } + + #[cfg(test)] + fn reset_uncleaned_roots(&self) { + self.do_reset_uncleaned_roots(&mut self.shrink_candidate_slots.lock().unwrap()); } fn calc_delete_dependencies( @@ -640,6 +642,11 @@ impl AccountsDB { // Only remove those accounts where the entire rooted history of the account // can be purged because there are no live append vecs in the ancestors pub fn clean_accounts(&self) { + // hold a lock to prevent slot shrinking from running because it might modify some rooted + // slot storages which can not happen as long as we're cleaning accounts because we're also + // modifying the rooted slot storages! 
+ let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + self.report_store_stats(); let mut accounts_scan = Measure::start("accounts_scan"); @@ -680,7 +687,7 @@ impl AccountsDB { if !purges_in_root.is_empty() { self.clean_old_rooted_accounts(purges_in_root); } - self.reset_uncleaned_roots(); + self.do_reset_uncleaned_roots(&mut candidates); clean_old_rooted.stop(); let mut store_counts_time = Measure::start("store_counts"); @@ -818,14 +825,22 @@ impl AccountsDB { ); } - fn shrink_stale_slot(&self, slot: Slot) -> usize { + fn do_shrink_stale_slot(&self, slot: Slot) -> usize { self.do_shrink_slot(slot, false) } - fn shrink_slot_forced(&self, slot: Slot) { + fn do_shrink_slot_forced(&self, slot: Slot) { self.do_shrink_slot(slot, true); } + fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize { + if let Some(slot) = self.do_next_shrink_slot(candidates) { + self.do_shrink_stale_slot(slot) + } else { + 0 + } + } + // Reads all accounts in given slot's AppendVecs and filter only to alive, // then create a minimum AppendVec filled with the alive. fn do_shrink_slot(&self, slot: Slot, forced: bool) -> usize { @@ -947,10 +962,10 @@ impl AccountsDB { } // Infinitely returns rooted roots in cyclic order - fn next_shrink_slot(&self) -> Option<Slot> { - // hold a lock to keep reset_uncleaned_roots() from updating candidates; - // we might update it in this fn if it's empty - let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + fn do_next_shrink_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> { + // At this point, a lock (= candidates) is ensured to be held to keep + // do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates. + // Also, candidates in the lock may be swapped here if it's empty. let next = candidates.pop(); if next.is_some() { @@ -958,13 +973,19 @@ impl AccountsDB { } else { let mut new_all_slots = self.all_root_slots_in_index(); let next = new_all_slots.pop(); - // update candidates under the lock finally! 
- *candidates = new_all_slots; + // refresh candidates for later calls! + **candidates = new_all_slots; next } } + #[cfg(test)] + fn next_shrink_slot(&self) -> Option<Slot> { + let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + self.do_next_shrink_slot(&mut candidates) + } + fn all_root_slots_in_index(&self) -> Vec<Slot> { let index = self.accounts_index.read().unwrap(); index.roots.iter().cloned().collect() @@ -977,11 +998,17 @@ impl AccountsDB { pub fn process_stale_slot(&self) -> usize { let mut measure = Measure::start("stale_slot_shrink-ms"); - let count = if let Some(slot) = self.next_shrink_slot() { - self.shrink_stale_slot(slot) - } else { - 0 - }; + let candidates = self.shrink_candidate_slots.try_lock(); + if candidates.is_err() { + // skip and return immediately if locked by clean_accounts() + // the calling background thread will just retry later. + return 0; + } + // hold this lock as long as this shrinking process is running to avoid conflicts + // with clean_accounts(). 
+ let mut candidates = candidates.unwrap(); + + let count = self.shrink_stale_slot(&mut candidates); measure.stop(); inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize); @@ -991,13 +1018,13 @@ impl AccountsDB { #[cfg(test)] fn shrink_all_stale_slots(&self) { for slot in self.all_slots_in_storage() { - self.shrink_stale_slot(slot); + self.do_shrink_stale_slot(slot); } } pub fn shrink_all_slots(&self) { for slot in self.all_slots_in_storage() { - self.shrink_slot_forced(slot); + self.do_shrink_slot_forced(slot); } } @@ -4154,4 +4181,57 @@ pub mod tests { assert!(store_counts[&x].0 >= 1); } } + + #[test] + fn test_shrink_and_clean() { + solana_logger::setup(); + + // repeat the whole test scenario + for _ in 0..5 { + let accounts = Arc::new(AccountsDB::new_single()); + let accounts_for_shrink = accounts.clone(); + + // spawn the slot shrinking background thread + let exit = Arc::new(AtomicBool::default()); + let exit_for_shrink = exit.clone(); + let shrink_thread = std::thread::spawn(move || loop { + if exit_for_shrink.load(Ordering::Relaxed) { + break; + } + accounts_for_shrink.process_stale_slot(); + }); + + let mut alive_accounts = vec![]; + let owner = Pubkey::default(); + + // populate the AccountsDB with plenty of food for slot shrinking + // also this simulates realistic some heavy spike account updates in the wild + for current_slot in 0..1000 { + while alive_accounts.len() <= 10 { + alive_accounts.push(( + Pubkey::new_rand(), + Account::new(thread_rng().gen_range(0, 50), 0, &owner), + )); + } + + alive_accounts.retain(|(_pubkey, account)| account.lamports >= 1); + + for (pubkey, account) in alive_accounts.iter_mut() { + account.lamports -= 1; + accounts.store(current_slot, &[(&pubkey, &account)]); + } + accounts.add_root(current_slot); + } + + // let's dance. 
+ for _ in 0..10 { + accounts.clean_accounts(); + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + // cleanup + exit.store(true, Ordering::Relaxed); + shrink_thread.join().unwrap(); + } + } }