Support out of band dumping of unrooted slots in AccountsDb (#17269)

* Accounts dumping logic

* Add test for interaction between cache flush and remove_unrooted_slot()

* Update comments

* Rename

* renaming

* Add more comments

* Renaming

* Fixup test and bad check
This commit is contained in:
carllin 2021-06-02 02:51:10 -07:00 committed by GitHub
parent 80e5b24b38
commit bbcdf073ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 344 additions and 104 deletions

View File

@ -576,7 +576,7 @@ impl RepairService {
root_bank.clear_slot_signatures(*slot);
// Clear the accounts for this slot
root_bank.remove_unrooted_slot(*slot);
root_bank.remove_unrooted_slots(&[*slot]);
// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted

View File

@ -62,7 +62,7 @@ use std::{
ops::{Range, RangeBounds},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, MutexGuard, RwLock},
sync::{Arc, Condvar, Mutex, MutexGuard, RwLock},
thread::Builder,
time::Instant,
};
@ -754,6 +754,16 @@ impl RecycleStores {
}
}
/// Removing unrooted slots in Accounts Background Service needs to be synchronized with flushing
/// slots from the Accounts Cache. This keeps track of those slots and the Mutex + Condvar for
/// synchronization.
///
/// `flush_slot_cache()` and `remove_unrooted_slots()` each reserve a slot by inserting it
/// into `slots_under_contention` before touching it, and remove it again once done.
#[derive(Debug, Default)]
struct RemoveUnrootedSlotsSynchronization {
// Slots currently reserved by either a cache flush (`flush_slot_cache()`) or a purge
// (`remove_unrooted_slots()`). A slot present here must not be picked up by the other
// operation until it has been removed from the set again.
slots_under_contention: Mutex<HashSet<Slot>>,
// Notified by `flush_slot_cache()` after it releases its reservation; waited on by
// `remove_unrooted_slots()` while a slot it wants to purge is still being flushed.
signal: Condvar,
}
// This structure handles the load/store of the accounts
#[derive(Debug)]
pub struct AccountsDb {
@ -832,6 +842,11 @@ pub struct AccountsDb {
load_limit: AtomicU64,
is_bank_drop_callback_enabled: AtomicBool,
/// Set of slots currently being flushed by `flush_slot_cache()` or removed
/// by `remove_unrooted_slots()`. Used to ensure `remove_unrooted_slots(slots)`
/// can safely clear the set of unrooted slots `slots`.
remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization,
}
#[derive(Debug, Default)]
@ -974,7 +989,6 @@ struct FlushStats {
num_flushed: usize,
num_purged: usize,
total_size: u64,
did_flush: bool,
}
#[derive(Debug, Default)]
@ -1275,6 +1289,7 @@ impl Default for AccountsDb {
#[cfg(test)]
load_limit: AtomicU64::default(),
is_bank_drop_callback_enabled: AtomicBool::default(),
remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(),
}
}
}
@ -3411,30 +3426,94 @@ impl AccountsDb {
.report("external_purge_slots_stats", Some(1000));
}
// TODO: This is currently:
// 1. Unsafe with scan because it can remove a slot in the middle
// of a scan.
// 2. Doesn't handle cache flushes that happen during the slot deletion (see comment below).
pub fn remove_unrooted_slot(&self, remove_slot: Slot) {
if self.accounts_index.is_root(remove_slot) {
panic!("Trying to remove accounts for rooted slot {}", remove_slot);
// TODO: This is currently unsafe with scan because it can remove a slot in the middle
/// Remove the set of slots in `remove_slots` from both the cache and storage. This requires
/// we know the contents of the slot are either:
///
/// 1) Completely in the cache
/// 2) Have been completely flushed from the cache
///
/// in order to guarantee that when this function returns, the contents of the slot have
/// been completely and not only partially removed. Thus synchronization with `flush_slot_cache()`
/// through `self.remove_unrooted_slots_synchronization` is necessary.
pub fn remove_unrooted_slots(&self, remove_slots: &[Slot]) {
let rooted_slots = self
.accounts_index
.get_rooted_from_list(remove_slots.iter());
assert!(
rooted_slots.is_empty(),
"Trying to remove accounts for rooted slots {:?}",
rooted_slots
);
let RemoveUnrootedSlotsSynchronization {
slots_under_contention,
signal,
} = &self.remove_unrooted_slots_synchronization;
{
// Slots that are currently being flushed by flush_slot_cache()
let mut currently_contended_slots = slots_under_contention.lock().unwrap();
// Slots that are currently being flushed by flush_slot_cache() AND
// we want to remove in this function
let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
.iter()
.filter(|remove_slot| {
let is_being_flushed = currently_contended_slots.contains(remove_slot);
if !is_being_flushed {
// Reserve the slots that we want to purge that aren't currently
// being flushed to prevent cache from flushing those slots in
// the future.
//
// Note that the single replay thread has to remove a specific slot `N`
// before another version of the same slot can be replayed. This means
// multiple threads should not call `remove_unrooted_slots()` simultaneously
// with the same slot.
currently_contended_slots.insert(**remove_slot);
}
// If the cache is currently flushing this slot, add it to the list
is_being_flushed
})
.cloned()
.collect();
// Wait for cache flushes to finish
loop {
if !remaining_contended_flush_slots.is_empty() {
// Wait for the signal that the cache has finished flushing a slot
//
// Don't wait if the remaining_contended_flush_slots is empty, otherwise
// we may never get a signal since there's no cache flush thread to
// do the signaling
currently_contended_slots = signal.wait(currently_contended_slots).unwrap();
} else {
// There are no slots being flushed to wait on, so it's safe to continue
// to purging the slots we want to purge!
break;
}
// For each slot the cache flush has finished, mark that we're about to start
// purging these slots by reserving it in `currently_contended_slots`.
remaining_contended_flush_slots.retain(|flush_slot| {
let is_being_flushed = currently_contended_slots.contains(flush_slot);
if !is_being_flushed {
// Mark that we're about to delete this slot now
currently_contended_slots.insert(*flush_slot);
}
is_being_flushed
});
}
}
// TODO: Handle if the slot was flushed to storage while we were removing the cached
// slot above, i.e. it's possible the storage contains partial version of the current
// slot. One way to handle this is to augment slots to contain a "version", That way,
// 1) We clean older versions via the natural clean() pipeline
// without having to call this function out of band.
// 2) This deletion doesn't have to block on scan
// Reads will then always read the latest version of a slot. Scans will also know
// which version their parents because banks will also be augmented with this version,
// which handles cases where a deletion of one version happens in the middle of the scan.
let remove_unrooted_purge_stats = PurgeStats::default();
self.purge_slots_from_cache_and_store(
std::iter::once(&remove_slot),
&remove_unrooted_purge_stats,
);
self.purge_slots_from_cache_and_store(remove_slots.iter(), &remove_unrooted_purge_stats);
remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0));
let mut currently_contended_slots = slots_under_contention.lock().unwrap();
for slot in remove_slots {
assert!(currently_contended_slots.remove(slot));
}
}
pub fn hash_stored_account(slot: Slot, account: &StoredAccountMeta) -> Hash {
@ -3787,7 +3866,7 @@ impl AccountsDb {
should_flush_f.as_mut()
};
if self.flush_slot_cache(root, should_flush_f).did_flush {
if self.flush_slot_cache(root, should_flush_f).is_some() {
num_roots_flushed += 1;
}
@ -3809,101 +3888,134 @@ impl AccountsDb {
(num_new_roots, num_roots_flushed)
}
// `should_flush_f` is an optional closure that determines whether a given
// account should be flushed. Passing `None` will by default flush all
// accounts
fn flush_slot_cache(
fn do_flush_slot_cache(
&self,
slot: Slot,
slot_cache: &SlotCache,
mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
) -> FlushStats {
let mut num_purged = 0;
let mut total_size = 0;
let mut num_flushed = 0;
let did_flush = if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
let iter_items: Vec<_> = slot_cache.iter().collect();
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
.iter()
.filter_map(|iter_item| {
let key = iter_item.key();
let account = &iter_item.value().account;
let should_flush = should_flush_f
.as_mut()
.map(|should_flush_f| should_flush_f(key, account))
.unwrap_or(true);
if should_flush {
let hash = iter_item.value().hash();
total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
num_flushed += 1;
Some(((key, account), hash))
} else {
// If we don't flush, we have to remove the entry from the
// index, since it's equivalent to purging
purged_slot_pubkeys.insert((slot, *key));
pubkey_to_slot_set.push((*key, slot));
num_purged += 1;
None
}
})
.unzip();
let iter_items: Vec<_> = slot_cache.iter().collect();
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
.iter()
.filter_map(|iter_item| {
let key = iter_item.key();
let account = &iter_item.value().account;
let should_flush = should_flush_f
.as_mut()
.map(|should_flush_f| should_flush_f(key, account))
.unwrap_or(true);
if should_flush {
let hash = iter_item.value().hash();
total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
num_flushed += 1;
Some(((key, account), hash))
} else {
// If we don't flush, we have to remove the entry from the
// index, since it's equivalent to purging
purged_slot_pubkeys.insert((slot, *key));
pubkey_to_slot_set.push((*key, slot));
num_purged += 1;
None
}
})
.unzip();
let is_dead_slot = accounts.is_empty();
// Remove the account index entries from earlier roots that are outdated by later roots.
// Safe because queries to the index will be reading updates from later roots.
self.purge_slot_cache_pubkeys(
let is_dead_slot = accounts.is_empty();
// Remove the account index entries from earlier roots that are outdated by later roots.
// Safe because queries to the index will be reading updates from later roots.
self.purge_slot_cache_pubkeys(slot, purged_slot_pubkeys, pubkey_to_slot_set, is_dead_slot);
if !is_dead_slot {
let aligned_total_size = Self::page_align(total_size);
// This ensures that all updates are written to an AppendVec, before any
// updates to the index happen, so anybody that sees a real entry in the index,
// will be able to find the account in storage
let flushed_store =
self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
self.store_accounts_frozen(
slot,
purged_slot_pubkeys,
pubkey_to_slot_set,
is_dead_slot,
&accounts,
Some(&hashes),
Some(Box::new(move |_, _| flushed_store.clone())),
None,
);
// If the above sizing function is correct, just one AppendVec is enough to hold
// all the data for the slot
assert_eq!(
self.storage
.get_slot_stores(slot)
.unwrap()
.read()
.unwrap()
.len(),
1
);
}
if !is_dead_slot {
let aligned_total_size = Self::page_align(total_size);
// This ensures that all updates are written to an AppendVec, before any
// updates to the index happen, so anybody that sees a real entry in the index,
// will be able to find the account in storage
let flushed_store =
self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
self.store_accounts_frozen(
slot,
&accounts,
Some(&hashes),
Some(Box::new(move |_, _| flushed_store.clone())),
None,
);
// If the above sizing function is correct, just one AppendVec is enough to hold
// all the data for the slot
assert_eq!(
self.storage
.get_slot_stores(slot)
.unwrap()
.read()
.unwrap()
.len(),
1
);
}
// Remove this slot from the cache, which will to AccountsDb's new readers should look like an
// atomic switch from the cache to storage.
// There is some racy condition for existing readers who just has read exactly while
// flushing. That case is handled by retry_to_get_account_accessor()
assert!(self.accounts_cache.remove_slot(slot).is_some());
true
} else {
false
};
// Remove this slot from the cache. To AccountsDb's new readers this should look like an
// atomic switch from the cache to storage.
// There is a race for existing readers that happened to read exactly while the flush was
// in progress. That case is handled by retry_to_get_account_accessor()
assert!(self.accounts_cache.remove_slot(slot).is_some());
FlushStats {
slot,
num_flushed,
num_purged,
total_size,
did_flush,
}
}
/// Flushes the cached contents of `slot` to storage, unless the slot is absent from the
/// accounts cache or is currently reserved in
/// `self.remove_unrooted_slots_synchronization` (i.e. it is being purged or flushed by
/// someone else).
///
/// `should_flush_f` is an optional closure that determines whether a given
/// account should be flushed. Passing `None` will by default flush all
/// accounts.
///
/// Returns `Some(FlushStats)` if the flush ran, `None` if the slot was skipped.
fn flush_slot_cache(
    &self,
    slot: Slot,
    should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
) -> Option<FlushStats> {
    // Nothing cached for this slot means nothing to flush.
    let slot_cache = self.accounts_cache.slot_cache(slot)?;
    let synchronization = &self.remove_unrooted_slots_synchronization;

    // Try to reserve the slot. `HashSet::insert` returns `false` when the slot was
    // already present, which means it is being purged, so don't flush it here.
    // The lock guard is a temporary and is released at the end of this statement.
    let reserved_for_flush = synchronization
        .slots_under_contention
        .lock()
        .unwrap()
        .insert(slot);
    if !reserved_for_flush {
        return None;
    }

    let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);

    // Nobody else should have been purging this slot, so our reservation must still be
    // present in `self.remove_unrooted_slots_synchronization`.
    assert!(synchronization
        .slots_under_contention
        .lock()
        .unwrap()
        .remove(&slot));

    // Wake any threads blocked in `remove_unrooted_slots(slot)` waiting for this flush
    // to finish.
    synchronization.signal.notify_all();

    Some(flush_stats)
}
fn write_accounts_to_cache(
&self,
slot: Slot,
@ -6478,7 +6590,7 @@ pub mod tests {
assert_load_account(&db, unrooted_slot, key, 1);
// Purge the slot
db.remove_unrooted_slot(unrooted_slot);
db.remove_unrooted_slots(&[unrooted_slot]);
assert!(db.load_without_fixed_root(&ancestors, &key).is_none());
assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none());
assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none());
@ -6515,7 +6627,7 @@ pub mod tests {
db.store_uncached(unrooted_slot, &[(&key, &account0)]);
// Purge the slot
db.remove_unrooted_slot(unrooted_slot);
db.remove_unrooted_slots(&[unrooted_slot]);
// Add a new root
let key2 = solana_sdk::pubkey::new_rand();
@ -10584,6 +10696,134 @@ pub mod tests {
do_test_load_account_and_shrink_race(false);
}
// Stress test for the interaction between `flush_slot_cache()` and
// `remove_unrooted_slots()` running concurrently on the same set of slots.
//
// Three threads take part:
// * a flush thread that, once per trial, calls `flush_slot_cache()` on every slot,
// * a thread that continually calls `notify_all()` on the synchronization Condvar,
// * this (main) thread, which stores one account per slot, dumps a random half of the
//   slots via `remove_unrooted_slots()` and then checks the results.
#[test]
fn test_cache_flush_remove_unrooted_race() {
let caching_enabled = true;
let db = AccountsDb::new_with_config(
Vec::new(),
&ClusterType::Development,
AccountSecondaryIndexes::default(),
caching_enabled,
);
let db = Arc::new(db);
let num_cached_slots = 100;
let num_trials = 100;
// `new_trial_start_*` tells the flush thread to begin a pass; `flush_done_*` reports
// back that the pass over all slots has completed.
let (new_trial_start_sender, new_trial_start_receiver) = unbounded();
let (flush_done_sender, flush_done_receiver) = unbounded();
// Start up a thread to flush the accounts cache
let t_flush_cache = {
let db = db.clone();
std::thread::Builder::new()
.name("account-cache-flush".to_string())
.spawn(move || loop {
// Wait for the signal to start a trial
// (`recv()` errors once the sender is dropped at the end of the test, which
// terminates this thread)
if new_trial_start_receiver.recv().is_err() {
return;
}
for slot in 0..num_cached_slots {
db.flush_slot_cache(slot, None::<&mut fn(&_, &_) -> bool>);
}
flush_done_sender.send(()).unwrap();
})
.unwrap()
};
let exit = Arc::new(AtomicBool::new(false));
// NOTE(review): this thread reuses the name "account-cache-flush" from the flush thread
// above; looks like a copy-paste, consider giving it a distinct name.
let t_spurious_signal = {
let db = db.clone();
let exit = exit.clone();
std::thread::Builder::new()
.name("account-cache-flush".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
// Simulate spurious wake-up that can happen, but is too rare to
// otherwise depend on in tests.
db.remove_unrooted_slots_synchronization.signal.notify_all();
})
.unwrap()
};
// Run multiple trials. Has the added benefit of rewriting the same slots after we've
// dumped them in previous trials.
for _ in 0..num_trials {
// Store an account
let lamports = 42;
let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
account.set_lamports(lamports);
// Pick random 50% of the slots to pass to `remove_unrooted_slots()`
let mut all_slots: Vec<Slot> = (0..num_cached_slots).collect();
all_slots.shuffle(&mut rand::thread_rng());
let slots_to_dump = &all_slots[0..num_cached_slots as usize / 2];
let slots_to_keep = &all_slots[num_cached_slots as usize / 2..];
// Set up a one account per slot across many different slots, track which
// pubkey was stored in each slot.
let slot_to_pubkey_map: HashMap<Slot, Pubkey> = (0..num_cached_slots)
.map(|slot| {
let pubkey = Pubkey::new_unique();
db.store_cached(slot, &[(&pubkey, &account)]);
(slot, pubkey)
})
.collect();
// Signal the flushing thread to start flushing
new_trial_start_sender.send(()).unwrap();
// Here we want to test both:
// 1) Flush thread starts flushing a slot before we try dumping it.
// 2) Flushing thread trying to flush while/after we're trying to dump the slot,
// in which case flush should ignore/move past the slot to be dumped
//
// Hence, we split into chunks to get the dumping of each chunk to race with the
// flushes. If we were to dump the entire chunk at once, then this lessens the possibility
// of the flush occurring first since the dumping logic reserves all the slots it's about
// to dump immediately.
//
// With 100 cached slots, `slots_to_dump` holds 50 slots, so this dumps two chunks of 25.
for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
db.remove_unrooted_slots(chunks);
}
// Check that all the slots in `slots_to_dump` were completely removed from the
// cache, storage, and index
for slot in slots_to_dump {
assert!(db.storage.get_slot_storage_entries(*slot).is_none());
assert!(db.accounts_cache.slot_cache(*slot).is_none());
let account_in_slot = slot_to_pubkey_map[slot];
assert!(db
.accounts_index
.get_account_read_entry(&account_in_slot)
.is_none());
}
// Wait for flush to finish before starting next trial
flush_done_receiver.recv().unwrap();
// Slots that were not dumped must still be loadable after the flush pass.
for slot in slots_to_keep {
let account_in_slot = slot_to_pubkey_map[slot];
assert!(db
.load(
&Ancestors::from(vec![(*slot, 0)]),
&account_in_slot,
LoadHint::FixedMaxRoot
)
.is_some());
// Clear for next iteration so that `assert!(self.storage.get_slot_stores(purged_slot).is_none());`
// in `purge_slot_pubkeys()` doesn't trigger
db.remove_unrooted_slots(&[*slot]);
}
}
// Shut down both helper threads: the flag stops the notifier loop, and dropping the
// sender makes the flush thread's `recv()` fail so it returns.
exit.store(true, Ordering::Relaxed);
drop(new_trial_start_sender);
t_flush_cache.join().unwrap();
t_spurious_signal.join().unwrap();
}
#[test]
fn test_collect_uncleaned_slots_up_to_slot() {
solana_logger::setup();

View File

@ -2652,8 +2652,8 @@ impl Bank {
}
}
pub fn remove_unrooted_slot(&self, slot: Slot) {
self.rc.accounts.accounts_db.remove_unrooted_slot(slot)
pub fn remove_unrooted_slots(&self, slots: &[Slot]) {
self.rc.accounts.accounts_db.remove_unrooted_slots(slots)
}
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {