AcctIdx: never retry a bucket flush (#23732)
This commit is contained in:
parent
c9b8977226
commit
998e7d18f9
|
@ -268,11 +268,21 @@ impl<T: IndexValue> AccountMapEntryInner<T> {
|
|||
}
|
||||
|
||||
pub fn age(&self) -> Age {
|
||||
self.meta.age.load(Ordering::Relaxed)
|
||||
self.meta.age.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn set_age(&self, value: Age) {
|
||||
self.meta.age.store(value, Ordering::Relaxed)
|
||||
self.meta.age.store(value, Ordering::Release)
|
||||
}
|
||||
|
||||
/// set age to 'next_age' if 'self.age' is 'expected_age'
|
||||
pub fn try_exchange_age(&self, next_age: Age, expected_age: Age) {
|
||||
let _ = self.meta.age.compare_exchange(
|
||||
expected_age,
|
||||
next_age,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -929,7 +929,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
|
||||
fn flush_internal(&self, _flush_guard: &FlushGuard) {
|
||||
let current_age = self.storage.current_age();
|
||||
let mut iterate_for_age = self.get_should_age(current_age);
|
||||
let iterate_for_age = self.get_should_age(current_age);
|
||||
let startup = self.storage.get_startup();
|
||||
if !iterate_for_age && !startup {
|
||||
// no need to age, so no need to flush this bucket
|
||||
|
@ -1032,11 +1032,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
);
|
||||
|
||||
let m = Measure::start("flush_evict");
|
||||
if !self.evict_from_cache(evictions, current_age, startup, false)
|
||||
|| !self.evict_from_cache(evictions_random, current_age, startup, true)
|
||||
{
|
||||
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
|
||||
}
|
||||
self.evict_from_cache(evictions, current_age, startup, false);
|
||||
self.evict_from_cache(evictions_random, current_age, startup, true);
|
||||
Self::update_time_stat(&self.stats().flush_remove_us, m);
|
||||
|
||||
if iterate_for_age {
|
||||
|
@ -1047,37 +1044,52 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// for each key in 'keys', look up in map, set age to the future
|
||||
fn move_ages_to_future(&self, next_age: Age, current_age: Age, keys: &[Pubkey]) {
|
||||
let map = self.map().read().unwrap();
|
||||
keys.iter().for_each(|key| {
|
||||
if let Some(entry) = map.get(key) {
|
||||
entry.try_exchange_age(next_age, current_age);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// remove keys in 'evictions' from in-mem cache, likely due to age
|
||||
// return true if the removal was completed
|
||||
fn evict_from_cache(
|
||||
&self,
|
||||
mut evictions: Vec<Pubkey>,
|
||||
current_age: Age,
|
||||
startup: bool,
|
||||
randomly_evicted: bool,
|
||||
) -> bool {
|
||||
let mut completed_scan = true;
|
||||
) {
|
||||
if evictions.is_empty() {
|
||||
return completed_scan; // completed, don't need to get lock or do other work
|
||||
return;
|
||||
}
|
||||
|
||||
let stop_evictions_changes_at_start = self.get_stop_evictions_changes();
|
||||
let next_age_on_failure = self.storage.future_age_to_flush();
|
||||
if self.get_stop_evictions() {
|
||||
return false; // did NOT complete, ranges were changed, so have to restart
|
||||
// ranges were changed
|
||||
self.move_ages_to_future(next_age_on_failure, current_age, &evictions);
|
||||
return;
|
||||
}
|
||||
|
||||
// skip any keys that are held in memory because of ranges being held
|
||||
let ranges = self.cache_ranges_held.read().unwrap().clone();
|
||||
if !ranges.is_empty() {
|
||||
let mut move_age = Vec::default();
|
||||
evictions.retain(|k| {
|
||||
if ranges.iter().any(|range| range.contains(k)) {
|
||||
// this item is held in mem by range, so don't remove
|
||||
completed_scan = false;
|
||||
// this item is held in mem by range, so don't evict
|
||||
move_age.push(*k);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
if !move_age.is_empty() {
|
||||
self.move_ages_to_future(next_age_on_failure, current_age, &move_age);
|
||||
}
|
||||
}
|
||||
|
||||
let mut removed = 0;
|
||||
|
@ -1087,8 +1099,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
if let Entry::Occupied(occupied) = map.entry(k) {
|
||||
let v = occupied.get();
|
||||
if Arc::strong_count(v) > 1 {
|
||||
// someone is holding the value arc's ref count and could modify it, so do not remove this from in-mem cache
|
||||
completed_scan = false;
|
||||
// someone is holding the value arc's ref count and could modify it, so do not evict
|
||||
v.try_exchange_age(next_age_on_failure, current_age);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1097,13 +1109,15 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
&& !Self::should_evict_based_on_age(current_age, v, startup))
|
||||
{
|
||||
// marked dirty or bumped in age after we looked above
|
||||
// these flushes will be handled in later passes (at later ages)
|
||||
// these evictions will be handled in later passes (at later ages)
|
||||
// but, at startup, everything is ready to age out if it isn't dirty
|
||||
continue;
|
||||
}
|
||||
|
||||
if stop_evictions_changes_at_start != self.get_stop_evictions_changes() {
|
||||
return false; // did NOT complete, ranges were changed, so have to restart
|
||||
// ranges were changed
|
||||
v.try_exchange_age(next_age_on_failure, current_age);
|
||||
continue;
|
||||
}
|
||||
|
||||
// all conditions for removing succeeded, so really remove item from in-mem cache
|
||||
|
@ -1118,8 +1132,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
self.stats()
|
||||
.insert_or_delete_mem_count(false, self.bin, removed);
|
||||
Self::update_stat(&self.stats().flush_entries_removed_from_mem, removed as u64);
|
||||
|
||||
completed_scan
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> &BucketMapHolderStats {
|
||||
|
|
Loading…
Reference in New Issue