diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs index 0da823c63f..16fe3b5c0c 100644 --- a/runtime/src/accounts_index_storage.rs +++ b/runtime/src/accounts_index_storage.rs @@ -56,13 +56,16 @@ impl BgThreads { storage: &Arc>, in_mem: &[Arc>], threads: usize, + can_advance_age: bool, ) -> Self { // stop signal used for THIS batch of bg threads let exit = Arc::new(AtomicBool::default()); let handles = Some( (0..threads) .into_iter() - .map(|_| { + .map(|idx| { + // the first thread we start is special + let can_advance_age = can_advance_age && idx == 0; let storage_ = Arc::clone(storage); let exit_ = Arc::clone(&exit); let in_mem_ = in_mem.to_vec(); @@ -71,7 +74,7 @@ impl BgThreads { Builder::new() .name("solana-idx-flusher".to_string()) .spawn(move || { - storage_.background(exit_, in_mem_); + storage_.background(exit_, in_mem_, can_advance_age); }) .unwrap() }) @@ -113,6 +116,7 @@ impl AccountsIndexStorage { &self.storage, &self.in_mem, Self::num_threads(), + false, // cannot advance age from any of these threads )); } self.storage.set_startup(value); @@ -157,7 +161,7 @@ impl AccountsIndexStorage { .collect::>(); Self { - _bg_threads: BgThreads::new(&storage, &in_mem, threads), + _bg_threads: BgThreads::new(&storage, &in_mem, threads, true), storage, in_mem, startup_worker_threads: Mutex::default(), diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index b6125c370c..f7e4a9c05e 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -132,11 +132,13 @@ impl BucketMapHolder { self.age.load(Ordering::Acquire) } - pub fn bucket_flushed_at_current_age(&self) { + pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) { let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel); - self.maybe_advance_age_internal( - self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed), - ); + if can_advance_age { + self.maybe_advance_age_internal( + self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed), + ); + } } /// have all buckets been flushed at the current age? @@ -297,7 +299,12 @@ impl BucketMapHolder { } // intended to execute in a bg thread - pub fn background(&self, exit: Arc, in_mem: Vec>>) { + pub fn background( + &self, + exit: Arc, + in_mem: Vec>>, + can_advance_age: bool, + ) { let bins = in_mem.len(); let flush = self.disk.is_some(); let mut throttling_wait_ms = None; @@ -312,6 +319,10 @@ impl BucketMapHolder { .remaining_until_next_interval(self.age_interval_ms()), self.stats.remaining_until_next_interval(), ); + if !can_advance_age { + // if this thread cannot advance age, then make sure we don't sleep 0 + wait = wait.max(1); + } if let Some(throttling_wait_ms) = throttling_wait_ms { self.stats .bg_throttling_wait_us @@ -327,7 +338,9 @@ impl BucketMapHolder { .bg_waiting_us .fetch_add(m.as_us(), Ordering::Relaxed); // likely some time has elapsed. May have been waiting for age time interval to elapse. - self.maybe_advance_age(); + if can_advance_age { + self.maybe_advance_age(); + } } throttling_wait_ms = None; @@ -339,7 +352,7 @@ impl BucketMapHolder { for _ in 0..bins { if flush { let index = self.next_bucket_to_flush(); - in_mem[index].flush(); + in_mem[index].flush(can_advance_age); } self.stats.report_stats(self); if self.all_buckets_flushed_at_current_age() { @@ -453,11 +466,11 @@ pub mod tests { let time = AGE_MS * 8 / 3; let expected = (time / AGE_MS) as Age; let now = Instant::now(); - test.bucket_flushed_at_current_age(); // done with age 0 + test.bucket_flushed_at_current_age(true); // done with age 0 (0..threads).into_par_iter().for_each(|_| { while now.elapsed().as_millis() < (time as u128) { if test.maybe_advance_age() { - test.bucket_flushed_at_current_age(); + test.bucket_flushed_at_current_age(true); } } }); @@ -472,7 +485,7 @@ pub mod tests { assert_eq!(test.current_age(), 0); for _ in 0..bins { assert!(!test.all_buckets_flushed_at_current_age()); - test.bucket_flushed_at_current_age(); + test.bucket_flushed_at_current_age(true); } std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2)); test.maybe_advance_age(); diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 66c42f01c7..d1ab2b6b72 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -99,9 +99,9 @@ impl InMemAccountsIndex { } /// called after flush scans this bucket at the current age - fn set_has_aged(&self, age: Age) { + fn set_has_aged(&self, age: Age, can_advance_age: bool) { self.last_age_flushed.store(age, Ordering::Release); - self.storage.bucket_flushed_at_current_age(); + self.storage.bucket_flushed_at_current_age(can_advance_age); } fn last_age_flushed(&self) -> Age { @@ -876,9 +876,9 @@ impl InMemAccountsIndex { self.stop_evictions_changes.load(Ordering::Acquire) } - pub(crate) fn flush(&self) { + pub(crate) fn flush(&self, can_advance_age: bool) { if let Some(flush_guard) = FlushGuard::lock(&self.flushing_active) { - self.flush_internal(&flush_guard) + self.flush_internal(&flush_guard, can_advance_age) } } @@ -983,7 +983,7 @@ impl InMemAccountsIndex { } /// synchronize the in-mem index with the disk index - fn flush_internal(&self, flush_guard: &FlushGuard) { + fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) { let current_age = self.storage.current_age(); let iterate_for_age = self.get_should_age(current_age); let startup = self.storage.get_startup(); @@ -1093,7 +1093,7 @@ impl InMemAccountsIndex { if iterate_for_age { // completed iteration of the buckets at the current age assert_eq!(current_age, self.storage.current_age()); - self.set_has_aged(current_age); + self.set_has_aged(current_age, can_advance_age); } } } @@ -1555,13 +1555,13 @@ mod tests { let test = new_for_test::(); assert!(test.get_should_age(test.storage.current_age())); assert_eq!(test.storage.count_buckets_flushed(), 0); - test.set_has_aged(0); + test.set_has_aged(0, true); assert!(!test.get_should_age(test.storage.current_age())); assert_eq!(test.storage.count_buckets_flushed(), 1); // simulate rest of buckets aging for _ in 1..BINS_FOR_TESTING { assert!(!test.storage.all_buckets_flushed_at_current_age()); - test.storage.bucket_flushed_at_current_age(); + test.storage.bucket_flushed_at_current_age(true); } assert!(test.storage.all_buckets_flushed_at_current_age()); // advance age