AcctIdx: sleep thread when enough threads are running to complete the remaining work (#20446)
This commit is contained in:
parent
0d98a91511
commit
4267419349
|
@ -208,6 +208,19 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
|
||||
}
|
||||
|
||||
/// true if this thread can sleep
|
||||
fn should_thread_sleep(&self) -> bool {
|
||||
let bins_flushed = self.count_ages_flushed();
|
||||
if bins_flushed >= self.bins {
|
||||
// all bins flushed, so this thread can sleep
|
||||
true
|
||||
} else {
|
||||
// at least 1 thread running for each bin that still needs to be flushed, so this thread can sleep
|
||||
let active = self.stats.active_threads.load(Ordering::Relaxed);
|
||||
bins_flushed.saturating_add(active as usize) >= self.bins
|
||||
}
|
||||
}
|
||||
|
||||
// intended to execute in a bg thread
|
||||
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
|
||||
let bins = in_mem.len();
|
||||
|
@ -218,7 +231,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
|
||||
self.stats.remaining_until_next_interval(),
|
||||
));
|
||||
} else if self.all_buckets_flushed_at_current_age() || throttling_wait_ms.is_some() {
|
||||
} else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
|
||||
let mut wait = std::cmp::min(
|
||||
self.age_timer
|
||||
.remaining_until_next_interval(self.age_interval_ms()),
|
||||
|
|
Loading…
Reference in New Issue