AcctIdx: start calling flush on in mem cache buckets (#19966)
* AcctIdx: start calling flush on in mem cache buckets * fix orderings
This commit is contained in:
parent
4089f8b06b
commit
8df8f4396d
|
@ -52,7 +52,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
let in_mem = (0..bins)
|
||||
.into_iter()
|
||||
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
|
||||
.collect();
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation
|
||||
let threads = config
|
||||
|
@ -69,11 +69,13 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
let storage_ = Arc::clone(&storage);
|
||||
let exit_ = Arc::clone(&exit);
|
||||
let wait_ = Arc::clone(&wait);
|
||||
let in_mem_ = in_mem.clone();
|
||||
|
||||
// note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
|
||||
Builder::new()
|
||||
.name("solana-idx-flusher".to_string())
|
||||
.spawn(move || {
|
||||
Self::background(storage_, exit_, wait_);
|
||||
Self::background(storage_, exit_, wait_, in_mem_);
|
||||
})
|
||||
.unwrap()
|
||||
})
|
||||
|
@ -98,6 +100,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
storage: Arc<BucketMapHolder<T>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
wait: Arc<WaitableCondvar>,
|
||||
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
|
||||
) {
|
||||
loop {
|
||||
// this will transition to waits and thread throttling
|
||||
|
@ -105,6 +108,11 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
for bucket in &in_mem {
|
||||
bucket.flush();
|
||||
}
|
||||
|
||||
storage.stats.report_stats();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use solana_measure::measure::Measure;
|
|||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use std::collections::{hash_map::Entry, HashMap};
|
||||
use std::ops::{Bound, RangeBounds, RangeInclusive};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
@ -26,6 +26,10 @@ pub struct InMemAccountsIndex<T: IndexValue> {
|
|||
pub(crate) cache_ranges_held: CacheRangesHeld,
|
||||
// true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
|
||||
stop_flush: AtomicU64,
|
||||
// set to true when any entry in this bin is marked dirty
|
||||
bin_dirty: AtomicBool,
|
||||
// set to true while this bin is being actively flushed
|
||||
flushing_active: AtomicBool,
|
||||
}
|
||||
|
||||
impl<T: IndexValue> Debug for InMemAccountsIndex<T> {
|
||||
|
@ -42,6 +46,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
bin,
|
||||
cache_ranges_held: CacheRangesHeld::default(),
|
||||
stop_flush: AtomicU64::default(),
|
||||
bin_dirty: AtomicBool::default(),
|
||||
flushing_active: AtomicBool::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -358,6 +364,46 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
self.stop_flush.load(Ordering::Relaxed) > 0
|
||||
}
|
||||
|
||||
pub(crate) fn flush(&self) {
|
||||
let flushing = self.flushing_active.swap(true, Ordering::Acquire);
|
||||
if flushing {
|
||||
// already flushing in another thread
|
||||
return;
|
||||
}
|
||||
|
||||
self.flush_internal();
|
||||
|
||||
self.flushing_active.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
pub fn set_bin_dirty(&self) {
|
||||
self.bin_dirty.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
fn flush_internal(&self) {
|
||||
let was_dirty = self.bin_dirty.swap(false, Ordering::Acquire);
|
||||
if !was_dirty {
|
||||
// wasn't dirty, no need to flush
|
||||
return;
|
||||
}
|
||||
|
||||
let map = self.map().read().unwrap();
|
||||
for (_k, _v) in map.iter() {
|
||||
/*
|
||||
if v.dirty() {
|
||||
// step 1: clear the dirty flag
|
||||
// step 2: perform the update on disk based on the fields in the entry
|
||||
// If a parallel operation dirties the item again - even while this flush is occurring,
|
||||
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
||||
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
||||
v.set_dirty(false);
|
||||
|
||||
// soon, this will update disk from the in-mem contents
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
fn stats(&self) -> &BucketMapHolderStats {
|
||||
&self.storage.stats
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue