DiskIdx: fix race condition with holding ranges in mem (#23158)
This commit is contained in:
parent
7939fdc3e5
commit
c2435363f3
|
@ -44,6 +44,8 @@ pub struct InMemAccountsIndex<T: IndexValue> {
|
||||||
|
|
||||||
// pubkey ranges that this bin must hold in the cache while the range is present in this vec
|
// pubkey ranges that this bin must hold in the cache while the range is present in this vec
|
||||||
pub(crate) cache_ranges_held: CacheRangesHeld,
|
pub(crate) cache_ranges_held: CacheRangesHeld,
|
||||||
|
// incremented each time stop_flush is changed
|
||||||
|
stop_flush_changes: AtomicU64,
|
||||||
// true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
|
// true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
|
||||||
stop_flush: AtomicU64,
|
stop_flush: AtomicU64,
|
||||||
// set to true when any entry in this bin is marked dirty
|
// set to true when any entry in this bin is marked dirty
|
||||||
|
@ -77,6 +79,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
.map(|disk| disk.get_bucket_from_index(bin))
|
.map(|disk| disk.get_bucket_from_index(bin))
|
||||||
.map(Arc::clone),
|
.map(Arc::clone),
|
||||||
cache_ranges_held: CacheRangesHeld::default(),
|
cache_ranges_held: CacheRangesHeld::default(),
|
||||||
|
stop_flush_changes: AtomicU64::default(),
|
||||||
stop_flush: AtomicU64::default(),
|
stop_flush: AtomicU64::default(),
|
||||||
bin_dirty: AtomicBool::default(),
|
bin_dirty: AtomicBool::default(),
|
||||||
flushing_active: AtomicBool::default(),
|
flushing_active: AtomicBool::default(),
|
||||||
|
@ -717,6 +720,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
already_held
|
already_held
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// called with 'stop'=true to stop bg flusher from removing any entries from in-mem idx
|
||||||
|
/// called with 'stop'=false to allow bg flusher to remove eligible (not in held ranges) entries from in-mem idx
|
||||||
fn start_stop_flush(&self, stop: bool) {
|
fn start_stop_flush(&self, stop: bool) {
|
||||||
if stop {
|
if stop {
|
||||||
self.stop_flush.fetch_add(1, Ordering::Release);
|
self.stop_flush.fetch_add(1, Ordering::Release);
|
||||||
|
@ -724,8 +729,17 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
// stop_flush went to 0, so this bucket could now be ready to be aged
|
// stop_flush went to 0, so this bucket could now be ready to be aged
|
||||||
self.storage.wait_dirty_or_aged.notify_one();
|
self.storage.wait_dirty_or_aged.notify_one();
|
||||||
}
|
}
|
||||||
|
// note that this value has changed
|
||||||
|
self.stop_flush_changes.fetch_add(1, Ordering::Release);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// if 'start_holding'=true, then:
|
||||||
|
/// at the end of this function, cache_ranges_held will be updated to contain 'range'
|
||||||
|
/// and all pubkeys in that range will be in the in-mem cache
|
||||||
|
/// if 'start_holding'=false, then:
|
||||||
|
/// 'range' will be removed from cache_ranges_held
|
||||||
|
/// and all pubkeys will be eligible for being removed from in-mem cache in the bg if no other range is holding them
|
||||||
|
/// Any in-process flush will be aborted when it gets to evicting items from in-mem.
|
||||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
|
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
|
||||||
where
|
where
|
||||||
R: RangeBounds<Pubkey> + Debug,
|
R: RangeBounds<Pubkey> + Debug,
|
||||||
|
@ -778,7 +792,11 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_stop_flush(&self) -> bool {
|
fn get_stop_flush(&self) -> bool {
|
||||||
self.stop_flush.load(Ordering::Relaxed) > 0
|
self.stop_flush.load(Ordering::Acquire) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_stop_flush_changes(&self) -> u64 {
|
||||||
|
self.stop_flush_changes.load(Ordering::Acquire)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn flush(&self) {
|
pub(crate) fn flush(&self) {
|
||||||
|
@ -970,6 +988,10 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
return completed_scan; // completed, don't need to get lock or do other work
|
return completed_scan; // completed, don't need to get lock or do other work
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let stop_flush_changes_at_start = self.get_stop_flush_changes();
|
||||||
|
if self.get_stop_flush() {
|
||||||
|
return false; // did NOT complete, ranges were changed, so have to restart
|
||||||
|
}
|
||||||
let ranges = self.cache_ranges_held.read().unwrap().clone();
|
let ranges = self.cache_ranges_held.read().unwrap().clone();
|
||||||
|
|
||||||
let mut removed = 0;
|
let mut removed = 0;
|
||||||
|
@ -1006,8 +1028,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.get_stop_flush() {
|
if stop_flush_changes_at_start != self.get_stop_flush_changes() {
|
||||||
return false; // did NOT complete, told to stop
|
return false; // did NOT complete, ranges were changed, so have to restart
|
||||||
}
|
}
|
||||||
|
|
||||||
// all conditions for removing succeeded, so really remove item from in-mem cache
|
// all conditions for removing succeeded, so really remove item from in-mem cache
|
||||||
|
|
Loading…
Reference in New Issue