Fix race condition between shrinking and cleaning (#11235)

* Fix race condition between shrinking and cleaning

* Minor formatting

* fix ci

* Update comments

* More update comment

* Adjust fn naming
This commit is contained in:
Ryo Onodera 2020-07-29 06:06:14 +09:00 committed by GitHub
parent b5065a4fde
commit 3e4f49f9c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 102 additions and 22 deletions

View File

@ -43,7 +43,7 @@ use std::{
ops::RangeBounds, ops::RangeBounds,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::Instant, time::Instant,
}; };
use tempfile::TempDir; use tempfile::TempDir;
@ -558,12 +558,14 @@ impl AccountsDB {
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize); inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
} }
fn reset_uncleaned_roots(&self) { fn do_reset_uncleaned_roots(&self, candidates: &mut MutexGuard<Vec<Slot>>) {
let previous_roots = self.accounts_index.write().unwrap().reset_uncleaned_roots(); let previous_roots = self.accounts_index.write().unwrap().reset_uncleaned_roots();
self.shrink_candidate_slots candidates.extend(previous_roots);
.lock() }
.unwrap()
.extend(previous_roots); #[cfg(test)]
fn reset_uncleaned_roots(&self) {
self.do_reset_uncleaned_roots(&mut self.shrink_candidate_slots.lock().unwrap());
} }
fn calc_delete_dependencies( fn calc_delete_dependencies(
@ -640,6 +642,11 @@ impl AccountsDB {
// Only remove those accounts where the entire rooted history of the account // Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors // can be purged because there are no live append vecs in the ancestors
pub fn clean_accounts(&self) { pub fn clean_accounts(&self) {
// hold a lock to prevent slot shrinking from running because it might modify some rooted
// slot storages which can not happen as long as we're cleaning accounts because we're also
// modifying the rooted slot storages!
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
self.report_store_stats(); self.report_store_stats();
let mut accounts_scan = Measure::start("accounts_scan"); let mut accounts_scan = Measure::start("accounts_scan");
@ -680,7 +687,7 @@ impl AccountsDB {
if !purges_in_root.is_empty() { if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root); self.clean_old_rooted_accounts(purges_in_root);
} }
self.reset_uncleaned_roots(); self.do_reset_uncleaned_roots(&mut candidates);
clean_old_rooted.stop(); clean_old_rooted.stop();
let mut store_counts_time = Measure::start("store_counts"); let mut store_counts_time = Measure::start("store_counts");
@ -818,14 +825,22 @@ impl AccountsDB {
); );
} }
fn shrink_stale_slot(&self, slot: Slot) -> usize { fn do_shrink_stale_slot(&self, slot: Slot) -> usize {
self.do_shrink_slot(slot, false) self.do_shrink_slot(slot, false)
} }
fn shrink_slot_forced(&self, slot: Slot) { fn do_shrink_slot_forced(&self, slot: Slot) {
self.do_shrink_slot(slot, true); self.do_shrink_slot(slot, true);
} }
fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
if let Some(slot) = self.do_next_shrink_slot(candidates) {
self.do_shrink_stale_slot(slot)
} else {
0
}
}
// Reads all accounts in given slot's AppendVecs and filter only to alive, // Reads all accounts in given slot's AppendVecs and filter only to alive,
// then create a minimum AppendVec filled with the alive. // then create a minimum AppendVec filled with the alive.
fn do_shrink_slot(&self, slot: Slot, forced: bool) -> usize { fn do_shrink_slot(&self, slot: Slot, forced: bool) -> usize {
@ -947,10 +962,10 @@ impl AccountsDB {
} }
// Infinitely returns rooted roots in cyclic order // Infinitely returns rooted roots in cyclic order
fn next_shrink_slot(&self) -> Option<Slot> { fn do_next_shrink_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
// hold a lock to keep reset_uncleaned_roots() from updating candidates; // At this point, a lock (= candidates) is ensured to be held to keep
// we might update it in this fn if it's empty // do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates.
let mut candidates = self.shrink_candidate_slots.lock().unwrap(); // Also, candidates in the lock may be swapped here if it's empty.
let next = candidates.pop(); let next = candidates.pop();
if next.is_some() { if next.is_some() {
@ -958,13 +973,19 @@ impl AccountsDB {
} else { } else {
let mut new_all_slots = self.all_root_slots_in_index(); let mut new_all_slots = self.all_root_slots_in_index();
let next = new_all_slots.pop(); let next = new_all_slots.pop();
// update candidates under the lock finally! // refresh candidates for later calls!
*candidates = new_all_slots; **candidates = new_all_slots;
next next
} }
} }
#[cfg(test)]
fn next_shrink_slot(&self) -> Option<Slot> {
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
self.do_next_shrink_slot(&mut candidates)
}
fn all_root_slots_in_index(&self) -> Vec<Slot> { fn all_root_slots_in_index(&self) -> Vec<Slot> {
let index = self.accounts_index.read().unwrap(); let index = self.accounts_index.read().unwrap();
index.roots.iter().cloned().collect() index.roots.iter().cloned().collect()
@ -977,11 +998,17 @@ impl AccountsDB {
pub fn process_stale_slot(&self) -> usize { pub fn process_stale_slot(&self) -> usize {
let mut measure = Measure::start("stale_slot_shrink-ms"); let mut measure = Measure::start("stale_slot_shrink-ms");
let count = if let Some(slot) = self.next_shrink_slot() { let candidates = self.shrink_candidate_slots.try_lock();
self.shrink_stale_slot(slot) if candidates.is_err() {
} else { // skip and return immediately if locked by clean_accounts()
0 // the calling background thread will just retry later.
}; return 0;
}
// hold this lock as long as this shrinking process is running to avoid conflicts
// with clean_accounts().
let mut candidates = candidates.unwrap();
let count = self.shrink_stale_slot(&mut candidates);
measure.stop(); measure.stop();
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize); inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
@ -991,13 +1018,13 @@ impl AccountsDB {
#[cfg(test)] #[cfg(test)]
fn shrink_all_stale_slots(&self) { fn shrink_all_stale_slots(&self) {
for slot in self.all_slots_in_storage() { for slot in self.all_slots_in_storage() {
self.shrink_stale_slot(slot); self.do_shrink_stale_slot(slot);
} }
} }
pub fn shrink_all_slots(&self) { pub fn shrink_all_slots(&self) {
for slot in self.all_slots_in_storage() { for slot in self.all_slots_in_storage() {
self.shrink_slot_forced(slot); self.do_shrink_slot_forced(slot);
} }
} }
@ -4154,4 +4181,57 @@ pub mod tests {
assert!(store_counts[&x].0 >= 1); assert!(store_counts[&x].0 >= 1);
} }
} }
#[test]
fn test_shrink_and_clean() {
solana_logger::setup();
// repeat the whole test scenario
for _ in 0..5 {
let accounts = Arc::new(AccountsDB::new_single());
let accounts_for_shrink = accounts.clone();
// spawn the slot shrinking background thread
let exit = Arc::new(AtomicBool::default());
let exit_for_shrink = exit.clone();
let shrink_thread = std::thread::spawn(move || loop {
if exit_for_shrink.load(Ordering::Relaxed) {
break;
}
accounts_for_shrink.process_stale_slot();
});
let mut alive_accounts = vec![];
let owner = Pubkey::default();
// populate the AccountsDB with plenty of food for slot shrinking
// also this simulates realistic some heavy spike account updates in the wild
for current_slot in 0..1000 {
while alive_accounts.len() <= 10 {
alive_accounts.push((
Pubkey::new_rand(),
Account::new(thread_rng().gen_range(0, 50), 0, &owner),
));
}
alive_accounts.retain(|(_pubkey, account)| account.lamports >= 1);
for (pubkey, account) in alive_accounts.iter_mut() {
account.lamports -= 1;
accounts.store(current_slot, &[(&pubkey, &account)]);
}
accounts.add_root(current_slot);
}
// let's dance.
for _ in 0..10 {
accounts.clean_accounts();
std::thread::sleep(std::time::Duration::from_millis(100));
}
// cleanup
exit.store(true, Ordering::Relaxed);
shrink_thread.join().unwrap();
}
}
} }