From 426741934939f0b2798192b680561a41605c91c0 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Tue, 5 Oct 2021 16:48:23 -0500 Subject: [PATCH] AcctIdx: sleep thread when enough threads are running to complete the remaining work (#20446) --- runtime/src/bucket_map_holder.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index b06bdc39c3..a28e79e70b 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -208,6 +208,19 @@ impl BucketMapHolder { self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed) } + /// true if this thread can sleep + fn should_thread_sleep(&self) -> bool { + let bins_flushed = self.count_ages_flushed(); + if bins_flushed >= self.bins { + // all bins flushed, so this thread can sleep + true + } else { + // at least 1 thread running for each bin that still needs to be flushed, so this thread can sleep + let active = self.stats.active_threads.load(Ordering::Relaxed); + bins_flushed.saturating_add(active as usize) >= self.bins + } + } + // intended to execute in a bg thread pub fn background(&self, exit: Arc, in_mem: Vec>>) { let bins = in_mem.len(); @@ -218,7 +231,7 @@ impl BucketMapHolder { self.wait_dirty_or_aged.wait_timeout(Duration::from_millis( self.stats.remaining_until_next_interval(), )); - } else if self.all_buckets_flushed_at_current_age() || throttling_wait_ms.is_some() { + } else if self.should_thread_sleep() || throttling_wait_ms.is_some() { let mut wait = std::cmp::min( self.age_timer .remaining_until_next_interval(self.age_interval_ms()),