AcctIdx: when disk index insert needs to grow, drop locks, grow, then restart flush (#20333)
This commit is contained in:
parent
acfe76b622
commit
e31c065544
|
@ -664,66 +664,90 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
return;
|
||||
}
|
||||
|
||||
let mut removes = Vec::default();
|
||||
let mut removes_random = Vec::default();
|
||||
let disk = self.storage.disk.as_ref().unwrap();
|
||||
// may have to loop if disk has to grow and we have to restart
|
||||
loop {
|
||||
let mut removes = Vec::default();
|
||||
let mut removes_random = Vec::default();
|
||||
let disk = self.storage.disk.as_ref().unwrap();
|
||||
|
||||
let mut updates = Vec::default();
|
||||
let m = Measure::start("flush_scan");
|
||||
let mut flush_entries_updated_on_disk = 0;
|
||||
// scan and update loop
|
||||
{
|
||||
let map = self.map().read().unwrap();
|
||||
for (k, v) in map.iter() {
|
||||
if v.dirty() {
|
||||
// step 1: clear the dirty flag
|
||||
// step 2: perform the update on disk based on the fields in the entry
|
||||
// If a parallel operation dirties the item again - even while this flush is occurring,
|
||||
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
||||
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
||||
v.set_dirty(false);
|
||||
let mut updates = Vec::default();
|
||||
let m = Measure::start("flush_scan");
|
||||
let mut flush_entries_updated_on_disk = 0;
|
||||
let mut disk_resize = Ok(());
|
||||
// scan and update loop
|
||||
{
|
||||
let map = self.map().read().unwrap();
|
||||
for (k, v) in map.iter() {
|
||||
if v.dirty() {
|
||||
// step 1: clear the dirty flag
|
||||
// step 2: perform the update on disk based on the fields in the entry
|
||||
// If a parallel operation dirties the item again - even while this flush is occurring,
|
||||
// the last thing the writer will do, after updating contents, is set_dirty(true)
|
||||
// That prevents dropping an item from cache before disk is updated to latest in mem.
|
||||
v.set_dirty(false);
|
||||
|
||||
updates.push((*k, Arc::clone(v)));
|
||||
updates.push((*k, Arc::clone(v)));
|
||||
}
|
||||
|
||||
if self.should_remove_from_mem(current_age, v, startup) {
|
||||
removes.push(*k);
|
||||
} else if Self::random_chance_of_eviction() {
|
||||
removes_random.push(*k);
|
||||
}
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
||||
|
||||
if self.should_remove_from_mem(current_age, v, startup) {
|
||||
removes.push(*k);
|
||||
} else if Self::random_chance_of_eviction() {
|
||||
removes_random.push(*k);
|
||||
// happens inside of lock on in-mem cache. This is because of deleting items
|
||||
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
|
||||
let m = Measure::start("flush_update");
|
||||
for (k, v) in updates.into_iter() {
|
||||
if disk_resize.is_ok() {
|
||||
if v.dirty() {
|
||||
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
||||
}
|
||||
flush_entries_updated_on_disk += 1;
|
||||
disk_resize = disk.try_insert(
|
||||
self.bin,
|
||||
&k,
|
||||
(&v.slot_list.read().unwrap(), v.ref_count()),
|
||||
);
|
||||
}
|
||||
if disk_resize.is_err() {
|
||||
// disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
|
||||
v.set_dirty(true);
|
||||
}
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_update_us, m);
|
||||
Self::update_stat(
|
||||
&self.stats().flush_entries_updated_on_disk,
|
||||
flush_entries_updated_on_disk,
|
||||
);
|
||||
}
|
||||
|
||||
let m = Measure::start("flush_remove");
|
||||
match disk_resize {
|
||||
Ok(_) => {
|
||||
if !self.flush_remove_from_cache(removes, current_age, startup, false) {
|
||||
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
|
||||
}
|
||||
if !self.flush_remove_from_cache(removes_random, current_age, startup, true) {
|
||||
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_remove_us, m);
|
||||
|
||||
if iterate_for_age {
|
||||
// completed iteration of the buckets at the current age
|
||||
assert_eq!(current_age, self.storage.current_age());
|
||||
self.set_has_aged(current_age);
|
||||
}
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
// grow the bucket, outside of all in-mem locks.
|
||||
// then, loop to try again
|
||||
disk.grow(self.bin, err);
|
||||
}
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
||||
|
||||
// happens inside of lock on in-mem cache. This is because of deleting items
|
||||
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
|
||||
let m = Measure::start("flush_update");
|
||||
for (k, v) in updates.into_iter() {
|
||||
if v.dirty() {
|
||||
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
||||
}
|
||||
flush_entries_updated_on_disk += 1;
|
||||
disk.insert(self.bin, &k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_update_us, m);
|
||||
Self::update_stat(
|
||||
&self.stats().flush_entries_updated_on_disk,
|
||||
flush_entries_updated_on_disk,
|
||||
);
|
||||
}
|
||||
|
||||
let m = Measure::start("flush_remove");
|
||||
if !self.flush_remove_from_cache(removes, current_age, startup, false) {
|
||||
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
|
||||
}
|
||||
if !self.flush_remove_from_cache(removes_random, current_age, startup, true) {
|
||||
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
|
||||
}
|
||||
Self::update_time_stat(&self.stats().flush_remove_us, m);
|
||||
|
||||
if iterate_for_age {
|
||||
// completed iteration of the buckets at the current age
|
||||
assert_eq!(current_age, self.storage.current_age());
|
||||
self.set_has_aged(current_age);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue