Add EvictionsGuard to InMemAccountsIndex (#25847)

This commit is contained in:
Brooks Prumo 2022-06-12 17:31:58 -05:00 committed by GitHub
parent 655b40a2b7
commit a2c180dc0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 96 additions and 29 deletions

View File

@ -6,6 +6,7 @@ use {
},
bucket_map_holder::{Age, BucketMapHolder},
bucket_map_holder_stats::BucketMapHolderStats,
waitable_condvar::WaitableCondvar,
},
rand::{thread_rng, Rng},
solana_bucket_map::bucket_api::BucketApi,
@ -146,10 +147,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
pub fn keys(&self) -> Vec<Pubkey> {
Self::update_stat(&self.stats().keys, 1);
// easiest implementation is to load evrything from disk into cache and return the keys
self.start_stop_evictions(true);
self.put_range_in_cache(&None::<&RangeInclusive<Pubkey>>);
let evictions_guard = EvictionsGuard::lock(self);
self.put_range_in_cache(&None::<&RangeInclusive<Pubkey>>, &evictions_guard);
let keys = self.map().read().unwrap().keys().cloned().collect();
self.start_stop_evictions(false);
keys
}
@ -717,17 +717,30 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
/// being held, then add 'range' to the currently held list AND return true
/// If 'range' is NOT already included in what is being held, then return false
/// withOUT adding 'range' to the list of what is currently held
fn add_hold_range_in_memory_if_already_held<R>(&self, range: &R) -> bool
fn add_hold_range_in_memory_if_already_held<R>(
&self,
range: &R,
evictions_guard: &EvictionsGuard,
) -> bool
where
R: RangeBounds<Pubkey>,
{
let start_holding = true;
let only_add_if_already_held = true;
self.just_set_hold_range_in_memory_internal(range, start_holding, only_add_if_already_held)
self.just_set_hold_range_in_memory_internal(
range,
start_holding,
only_add_if_already_held,
evictions_guard,
)
}
fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
where
fn just_set_hold_range_in_memory<R>(
&self,
range: &R,
start_holding: bool,
evictions_guard: &EvictionsGuard,
) where
R: RangeBounds<Pubkey>,
{
let only_add_if_already_held = false;
@ -735,6 +748,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
range,
start_holding,
only_add_if_already_held,
evictions_guard,
);
}
@ -747,6 +761,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
range: &R,
start_holding: bool,
only_add_if_already_held: bool,
_evictions_guard: &EvictionsGuard,
) -> bool
where
R: RangeBounds<Pubkey>,
@ -797,19 +812,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
already_held
}
/// called with 'stop'=true to stop bg flusher from evicting any entries from in-mem idx
/// called with 'stop'=false to allow bg flusher to evict eligible (not in held ranges) entries from in-mem idx
fn start_stop_evictions(&self, stop: bool) {
if stop {
self.stop_evictions.fetch_add(1, Ordering::Release);
} else if 1 == self.stop_evictions.fetch_sub(1, Ordering::Release) {
// stop_evictions went to 0, so this bucket could now be ready to be aged
self.storage.wait_dirty_or_aged.notify_one();
}
// note that this value has changed
self.stop_evictions_changes.fetch_add(1, Ordering::Release);
}
/// if 'start_holding'=true, then:
/// at the end of this function, cache_ranges_held will be updated to contain 'range'
/// and all pubkeys in that range will be in the in-mem cache
@ -821,21 +823,20 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
where
R: RangeBounds<Pubkey> + Debug,
{
self.start_stop_evictions(true);
let evictions_guard = EvictionsGuard::lock(self);
if !start_holding || !self.add_hold_range_in_memory_if_already_held(range) {
if !start_holding || !self.add_hold_range_in_memory_if_already_held(range, &evictions_guard)
{
if start_holding {
// put everything in the cache and it will be held there
self.put_range_in_cache(&Some(range));
self.put_range_in_cache(&Some(range), &evictions_guard);
}
// do this AFTER items have been put in cache - that way anyone who finds this range can know that the items are already in the cache
self.just_set_hold_range_in_memory(range, start_holding);
self.just_set_hold_range_in_memory(range, start_holding, &evictions_guard);
}
self.start_stop_evictions(false);
}
fn put_range_in_cache<R>(&self, range: &Option<&R>)
fn put_range_in_cache<R>(&self, range: &Option<&R>, _evictions_guard: &EvictionsGuard)
where
R: RangeBounds<Pubkey>,
{
@ -1255,6 +1256,69 @@ impl Drop for FlushGuard<'_> {
}
}
/// Disable (and safely enable) the background flusher from evicting entries from the in-mem
/// accounts index. When disabled, no entries may be evicted. When enabled, only eligible entries
/// may be evicted (i.e. those not in a held range).
///
/// An RAII implementation of a scoped lock for the `stop_evictions` atomic flag/counter in
/// `InMemAccountsIndex`. When this structure is dropped (falls out of scope), the counter will
/// decrement and conditionally notify its storage.
///
/// After successfully locking (calling `EvictionsGuard::lock()`), pass a reference to the
/// `EvictionsGuard` instance to any function/code that requires `stop_evictions` to be
/// incremented/decremented correctly.
#[derive(Debug)]
struct EvictionsGuard<'a> {
/// The number of active callers disabling evictions
stop_evictions: &'a AtomicU64,
/// The number of times that evictions have been disabled or enabled
num_state_changes: &'a AtomicU64,
/// Who will be notified after the evictions are re-enabled
storage_notifier: &'a WaitableCondvar,
}
impl<'a> EvictionsGuard<'a> {
#[must_use = "if unused, this evictions lock will be immediately unlocked"]
fn lock<T: IndexValue>(in_mem_accounts_index: &'a InMemAccountsIndex<T>) -> Self {
Self::lock_with(
&in_mem_accounts_index.stop_evictions,
&in_mem_accounts_index.stop_evictions_changes,
&in_mem_accounts_index.storage.wait_dirty_or_aged,
)
}
#[must_use = "if unused, this evictions lock will be immediately unlocked"]
fn lock_with(
stop_evictions: &'a AtomicU64,
num_state_changes: &'a AtomicU64,
storage_notifier: &'a WaitableCondvar,
) -> Self {
num_state_changes.fetch_add(1, Ordering::Release);
stop_evictions.fetch_add(1, Ordering::Release);
Self {
stop_evictions,
num_state_changes,
storage_notifier,
}
}
}
impl Drop for EvictionsGuard<'_> {
fn drop(&mut self) {
let previous_value = self.stop_evictions.fetch_sub(1, Ordering::AcqRel);
debug_assert!(previous_value > 0);
let should_notify = previous_value == 1;
if should_notify {
// stop_evictions went to 0, so this bucket could now be ready to be aged
self.storage_notifier.notify_one();
}
self.num_state_changes.fetch_add(1, Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use {
@ -1442,7 +1506,8 @@ mod tests {
vec![range.clone()]
);
{
assert!(bucket.add_hold_range_in_memory_if_already_held(&range));
let evictions_guard = EvictionsGuard::lock(&bucket);
assert!(bucket.add_hold_range_in_memory_if_already_held(&range, &evictions_guard));
bucket.hold_range_in_memory(&range, false);
}
bucket.hold_range_in_memory(&range, false);
@ -1478,7 +1543,9 @@ mod tests {
// hold all in mem first
assert!(bucket.cache_ranges_held.read().unwrap().is_empty());
bucket.hold_range_in_memory(&all, true);
assert!(bucket.add_hold_range_in_memory_if_already_held(&range));
let evictions_guard = EvictionsGuard::lock(&bucket);
assert!(bucket.add_hold_range_in_memory_if_already_held(&range, &evictions_guard));
bucket.hold_range_in_memory(&range, false);
bucket.hold_range_in_memory(&all, false);
}