AcctIdx: flush inserts to disk (#20074)
This commit is contained in:
parent
804ed825f7
commit
deab944b30
|
@ -22,10 +22,14 @@ pub struct BucketMapHolderStats {
|
||||||
pub inserts: AtomicU64,
|
pub inserts: AtomicU64,
|
||||||
pub count_in_mem: AtomicU64,
|
pub count_in_mem: AtomicU64,
|
||||||
pub per_bucket_count: Vec<AtomicU64>,
|
pub per_bucket_count: Vec<AtomicU64>,
|
||||||
|
pub flush_entries_updated_on_disk: AtomicU64,
|
||||||
pub active_threads: AtomicU64,
|
pub active_threads: AtomicU64,
|
||||||
pub get_range_us: AtomicU64,
|
pub get_range_us: AtomicU64,
|
||||||
last_age: AtomicU8,
|
last_age: AtomicU8,
|
||||||
last_age_time: AtomicU64,
|
last_age_time: AtomicU64,
|
||||||
|
pub flush_scan_us: AtomicU64,
|
||||||
|
pub flush_update_us: AtomicU64,
|
||||||
|
pub flush_remove_us: AtomicU64,
|
||||||
last_time: AtomicInterval,
|
last_time: AtomicInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,6 +167,27 @@ impl BucketMapHolderStats {
|
||||||
("items", self.items.swap(0, Ordering::Relaxed), i64),
|
("items", self.items.swap(0, Ordering::Relaxed), i64),
|
||||||
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
|
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
|
||||||
("ms_per_age", ms_per_age, i64),
|
("ms_per_age", ms_per_age, i64),
|
||||||
|
(
|
||||||
|
"flush_scan_us",
|
||||||
|
self.flush_scan_us.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"flush_update_us",
|
||||||
|
self.flush_update_us.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"flush_remove_us",
|
||||||
|
self.flush_remove_us.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"flush_entries_updated_on_disk",
|
||||||
|
self.flush_entries_updated_on_disk
|
||||||
|
.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -515,6 +515,11 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
self.storage.wait_dirty_or_aged.notify_one();
|
self.storage.wait_dirty_or_aged.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_remove_from_mem(&self, current_age: Age, entry: &AccountMapEntry<T>) -> bool {
|
||||||
|
// this could be tunable dynamically based on memory pressure
|
||||||
|
current_age == entry.age()
|
||||||
|
}
|
||||||
|
|
||||||
fn flush_internal(&self) {
|
fn flush_internal(&self) {
|
||||||
let was_dirty = self.bin_dirty.swap(false, Ordering::Acquire);
|
let was_dirty = self.bin_dirty.swap(false, Ordering::Acquire);
|
||||||
let current_age = self.storage.current_age();
|
let current_age = self.storage.current_age();
|
||||||
|
@ -524,21 +529,52 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let map = self.map().read().unwrap();
|
let mut removes = Vec::default();
|
||||||
for (_k, _v) in map.iter() {
|
let disk = self.storage.disk.as_ref().unwrap();
|
||||||
/*
|
|
||||||
if v.dirty() {
|
|
||||||
// step 1: clear the dirty flag
|
|
||||||
// step 2: perform the update on disk based on the fields in the entry
|
|
||||||
// If a parallel operation dirties the item again - even while this flush is occurring,
|
|
||||||
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
|
||||||
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
|
||||||
v.set_dirty(false);
|
|
||||||
|
|
||||||
// soon, this will update disk from the in-mem contents
|
let mut updates = Vec::default();
|
||||||
|
let m = Measure::start("flush_scan");
|
||||||
|
// scan and update loop
|
||||||
|
{
|
||||||
|
let map = self.map().read().unwrap();
|
||||||
|
for (k, v) in map.iter() {
|
||||||
|
if v.dirty() {
|
||||||
|
// step 1: clear the dirty flag
|
||||||
|
// step 2: perform the update on disk based on the fields in the entry
|
||||||
|
// If a parallel operation dirties the item again - even while this flush is occurring,
|
||||||
|
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
||||||
|
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
||||||
|
v.set_dirty(false);
|
||||||
|
|
||||||
|
updates.push((*k, Arc::clone(v)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.should_remove_from_mem(current_age, v) {
|
||||||
|
removes.push(*k);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
||||||
|
|
||||||
|
let mut flush_entries_updated_on_disk = 0;
|
||||||
|
// happens outside of lock on in-mem cache
|
||||||
|
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
|
||||||
|
let m = Measure::start("flush_update");
|
||||||
|
for (k, v) in updates.into_iter() {
|
||||||
|
if v.dirty() {
|
||||||
|
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
||||||
|
}
|
||||||
|
flush_entries_updated_on_disk += 1;
|
||||||
|
disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
||||||
|
}
|
||||||
|
Self::update_time_stat(&self.stats().flush_update_us, m);
|
||||||
|
Self::update_stat(
|
||||||
|
&self.stats().flush_entries_updated_on_disk,
|
||||||
|
flush_entries_updated_on_disk,
|
||||||
|
);
|
||||||
|
|
||||||
|
// loop that processes 'removes' will go here
|
||||||
|
|
||||||
if iterate_for_age {
|
if iterate_for_age {
|
||||||
// completed iteration of the buckets at the current age
|
// completed iteration of the buckets at the current age
|
||||||
assert_eq!(current_age, self.storage.current_age());
|
assert_eq!(current_age, self.storage.current_age());
|
||||||
|
|
Loading…
Reference in New Issue