diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index ffa3e5baa..d003a034d 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -664,66 +664,90 @@ impl InMemAccountsIndex { return; } - let mut removes = Vec::default(); - let mut removes_random = Vec::default(); - let disk = self.storage.disk.as_ref().unwrap(); + // may have to loop if disk has to grow and we have to restart + loop { + let mut removes = Vec::default(); + let mut removes_random = Vec::default(); + let disk = self.storage.disk.as_ref().unwrap(); - let mut updates = Vec::default(); - let m = Measure::start("flush_scan"); - let mut flush_entries_updated_on_disk = 0; - // scan and update loop - { - let map = self.map().read().unwrap(); - for (k, v) in map.iter() { - if v.dirty() { - // step 1: clear the dirty flag - // step 2: perform the update on disk based on the fields in the entry - // If a parallel operation dirties the item again - even while this flush is occurring, - // the last thing the writer will do, after updating contents, is set_dirty(true) - // That prevents dropping an item from cache before disk is updated to latest in mem. - v.set_dirty(false); + let mut updates = Vec::default(); + let m = Measure::start("flush_scan"); + let mut flush_entries_updated_on_disk = 0; + let mut disk_resize = Ok(()); + // scan and update loop + { + let map = self.map().read().unwrap(); + for (k, v) in map.iter() { + if v.dirty() { + // step 1: clear the dirty flag + // step 2: perform the update on disk based on the fields in the entry + // If a parallel operation dirties the item again - even while this flush is occurring, + // the last thing the writer will do, after updating contents, is set_dirty(true) + // That prevents dropping an item from cache before disk is updated to latest in mem. + v.set_dirty(false); - updates.push((*k, Arc::clone(v))); + updates.push((*k, Arc::clone(v))); + } + + if self.should_remove_from_mem(current_age, v, startup) { + removes.push(*k); + } else if Self::random_chance_of_eviction() { + removes_random.push(*k); + } } + Self::update_time_stat(&self.stats().flush_scan_us, m); - if self.should_remove_from_mem(current_age, v, startup) { - removes.push(*k); - } else if Self::random_chance_of_eviction() { - removes_random.push(*k); + // happens inside of lock on in-mem cache. This is because of deleting items + // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. + let m = Measure::start("flush_update"); + for (k, v) in updates.into_iter() { + if disk_resize.is_ok() { + if v.dirty() { + continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed + } + flush_entries_updated_on_disk += 1; + disk_resize = disk.try_insert( + self.bin, + &k, + (&v.slot_list.read().unwrap(), v.ref_count()), + ); + } + if disk_resize.is_err() { + // disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize + v.set_dirty(true); + } + } + Self::update_time_stat(&self.stats().flush_update_us, m); + Self::update_stat( + &self.stats().flush_entries_updated_on_disk, + flush_entries_updated_on_disk, + ); + } + + let m = Measure::start("flush_remove"); + match disk_resize { + Ok(_) => { + if !self.flush_remove_from_cache(removes, current_age, startup, false) { + iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely + } + if !self.flush_remove_from_cache(removes_random, current_age, startup, true) { + iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely + } + Self::update_time_stat(&self.stats().flush_remove_us, m); + + if iterate_for_age { + // completed iteration of the buckets at the current age + assert_eq!(current_age, self.storage.current_age()); + self.set_has_aged(current_age); + } + return; + } + Err(err) => { + // grow the bucket, outside of all in-mem locks. + // then, loop to try again + disk.grow(self.bin, err); } } - Self::update_time_stat(&self.stats().flush_scan_us, m); - - // happens inside of lock on in-mem cache. This is because of deleting items - // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. - let m = Measure::start("flush_update"); - for (k, v) in updates.into_iter() { - if v.dirty() { - continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed - } - flush_entries_updated_on_disk += 1; - disk.insert(self.bin, &k, (&v.slot_list.read().unwrap(), v.ref_count())); - } - Self::update_time_stat(&self.stats().flush_update_us, m); - Self::update_stat( - &self.stats().flush_entries_updated_on_disk, - flush_entries_updated_on_disk, - ); - } - - let m = Measure::start("flush_remove"); - if !self.flush_remove_from_cache(removes, current_age, startup, false) { - iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely - } - if !self.flush_remove_from_cache(removes_random, current_age, startup, true) { - iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely - } - Self::update_time_stat(&self.stats().flush_remove_us, m); - - if iterate_for_age { - // completed iteration of the buckets at the current age - assert_eq!(current_age, self.storage.current_age()); - self.set_has_aged(current_age); } }