AcctIdx: combine scan and update loops (#20546)
This commit is contained in:
parent
bdf8b1da6b
commit
33d8c07364
|
@ -184,6 +184,14 @@ impl<T: IndexValue> AccountMapEntryInner<T> {
|
||||||
self.meta.dirty.store(value, Ordering::Release)
|
self.meta.dirty.store(value, Ordering::Release)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// set dirty to false, return true if was dirty
|
||||||
|
pub fn clear_dirty(&self) -> bool {
|
||||||
|
self.meta
|
||||||
|
.dirty
|
||||||
|
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn age(&self) -> Age {
|
pub fn age(&self) -> Age {
|
||||||
self.meta.age.load(Ordering::Relaxed)
|
self.meta.age.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,8 +36,7 @@ pub struct BucketMapHolderStats {
|
||||||
pub get_range_us: AtomicU64,
|
pub get_range_us: AtomicU64,
|
||||||
last_age: AtomicU8,
|
last_age: AtomicU8,
|
||||||
last_age_time: AtomicU64,
|
last_age_time: AtomicU64,
|
||||||
pub flush_scan_us: AtomicU64,
|
pub flush_scan_update_us: AtomicU64,
|
||||||
pub flush_update_us: AtomicU64,
|
|
||||||
pub flush_remove_us: AtomicU64,
|
pub flush_remove_us: AtomicU64,
|
||||||
pub flush_grow_us: AtomicU64,
|
pub flush_grow_us: AtomicU64,
|
||||||
last_time: AtomicInterval,
|
last_time: AtomicInterval,
|
||||||
|
@ -251,13 +250,8 @@ impl BucketMapHolderStats {
|
||||||
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
|
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
|
||||||
("ms_per_age", ms_per_age, i64),
|
("ms_per_age", ms_per_age, i64),
|
||||||
(
|
(
|
||||||
"flush_scan_us",
|
"flush_scan_update_us",
|
||||||
self.flush_scan_us.swap(0, Ordering::Relaxed),
|
self.flush_scan_update_us.swap(0, Ordering::Relaxed),
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"flush_update_us",
|
|
||||||
self.flush_update_us.swap(0, Ordering::Relaxed),
|
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
|
|
|
@ -675,27 +675,36 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
|
|
||||||
// may have to loop if disk has to grow and we have to restart
|
// may have to loop if disk has to grow and we have to restart
|
||||||
loop {
|
loop {
|
||||||
let mut removes = Vec::default();
|
let mut removes;
|
||||||
let mut removes_random = Vec::default();
|
let mut removes_random = Vec::default();
|
||||||
let disk = self.bucket.as_ref().unwrap();
|
let disk = self.bucket.as_ref().unwrap();
|
||||||
|
|
||||||
let mut updates = Vec::default();
|
|
||||||
let m = Measure::start("flush_scan");
|
|
||||||
let mut flush_entries_updated_on_disk = 0;
|
let mut flush_entries_updated_on_disk = 0;
|
||||||
let mut disk_resize = Ok(());
|
let mut disk_resize = Ok(());
|
||||||
// scan and update loop
|
// scan and update loop
|
||||||
|
// holds read lock
|
||||||
{
|
{
|
||||||
let map = self.map().read().unwrap();
|
let map = self.map().read().unwrap();
|
||||||
|
removes = Vec::with_capacity(map.len());
|
||||||
|
let m = Measure::start("flush_scan_and_update"); // we don't care about lock time in this metric - bg threads can wait
|
||||||
for (k, v) in map.iter() {
|
for (k, v) in map.iter() {
|
||||||
if v.dirty() {
|
if v.clear_dirty() {
|
||||||
// step 1: clear the dirty flag
|
// step 1: clear the dirty flag
|
||||||
// step 2: perform the update on disk based on the fields in the entry
|
// step 2: perform the update on disk based on the fields in the entry
|
||||||
// If a parallel operation dirties the item again - even while this flush is occurring,
|
// If a parallel operation dirties the item again - even while this flush is occurring,
|
||||||
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
||||||
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
||||||
v.set_dirty(false);
|
// happens inside of lock on in-mem cache. This is because of deleting items
|
||||||
|
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
|
||||||
updates.push((*k, Arc::clone(v)));
|
disk_resize =
|
||||||
|
disk.try_write(k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
||||||
|
if disk_resize.is_ok() {
|
||||||
|
flush_entries_updated_on_disk += 1;
|
||||||
|
} else {
|
||||||
|
// disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
|
||||||
|
v.set_dirty(true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.should_remove_from_mem(current_age, v, startup) {
|
if self.should_remove_from_mem(current_age, v, startup) {
|
||||||
|
@ -704,33 +713,12 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
removes_random.push(*k);
|
removes_random.push(*k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
Self::update_time_stat(&self.stats().flush_scan_update_us, m);
|
||||||
|
|
||||||
// happens inside of lock on in-mem cache. This is because of deleting items
|
|
||||||
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
|
|
||||||
let m = Measure::start("flush_update");
|
|
||||||
for (k, v) in updates.into_iter() {
|
|
||||||
if v.dirty() {
|
|
||||||
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
|
||||||
}
|
|
||||||
if disk_resize.is_ok() {
|
|
||||||
disk_resize =
|
|
||||||
disk.try_write(&k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
|
||||||
if disk_resize.is_ok() {
|
|
||||||
flush_entries_updated_on_disk += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if disk_resize.is_err() {
|
|
||||||
// disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
|
|
||||||
v.set_dirty(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Self::update_time_stat(&self.stats().flush_update_us, m);
|
|
||||||
Self::update_stat(
|
|
||||||
&self.stats().flush_entries_updated_on_disk,
|
|
||||||
flush_entries_updated_on_disk,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
Self::update_stat(
|
||||||
|
&self.stats().flush_entries_updated_on_disk,
|
||||||
|
flush_entries_updated_on_disk,
|
||||||
|
);
|
||||||
|
|
||||||
let m = Measure::start("flush_remove_or_grow");
|
let m = Measure::start("flush_remove_or_grow");
|
||||||
match disk_resize {
|
match disk_resize {
|
||||||
|
|
Loading…
Reference in New Issue