AcctIdx: throttling bg flushing to spread out work over interval (#20176)
* AcctIdx: throttling bg flushing to spread out work over interval * prevent hang
This commit is contained in:
parent
8fee9a2e1a
commit
bd9b2f6f39
|
@ -172,21 +172,64 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
||||||
AGE_MS
|
AGE_MS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// return an amount of ms to sleep
|
||||||
|
fn throttling_wait_ms_internal(
|
||||||
|
&self,
|
||||||
|
interval_ms: u64,
|
||||||
|
elapsed_ms: u64,
|
||||||
|
bins_flushed: u64,
|
||||||
|
) -> Option<u64> {
|
||||||
|
let target_percent = 90; // aim to finish in 90% of the allocated time
|
||||||
|
let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
|
||||||
|
let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
|
||||||
|
if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
|
||||||
|
// any of these conditions result in 'do not wait due to progress'
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let ms_per_s = 1_000;
|
||||||
|
let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
|
||||||
|
let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
|
||||||
|
if expected_bins_processed_in_remaining_time > remaining_bins {
|
||||||
|
// wait because we predict will finish prior to target
|
||||||
|
Some(1)
|
||||||
|
} else {
|
||||||
|
// do not wait because we predict will finish after target
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check progress this age.
|
||||||
|
/// Return ms to wait to get closer to the wait target and spread out work over the entire age interval.
|
||||||
|
/// Goal is to avoid cpu spikes at beginning of age interval.
|
||||||
|
fn throttling_wait_ms(&self) -> Option<u64> {
|
||||||
|
let interval_ms = self.age_interval_ms();
|
||||||
|
let elapsed_ms = self.age_timer.elapsed_ms();
|
||||||
|
let bins_flushed = self.count_ages_flushed() as u64;
|
||||||
|
self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
|
||||||
|
}
|
||||||
|
|
||||||
// intended to execute in a bg thread
|
// intended to execute in a bg thread
|
||||||
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
|
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
|
||||||
let bins = in_mem.len();
|
let bins = in_mem.len();
|
||||||
let flush = self.disk.is_some();
|
let flush = self.disk.is_some();
|
||||||
|
let mut throttling_wait_ms = None;
|
||||||
loop {
|
loop {
|
||||||
if !flush {
|
if !flush {
|
||||||
self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
|
self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
|
||||||
self.stats.remaining_until_next_interval(),
|
self.stats.remaining_until_next_interval(),
|
||||||
));
|
));
|
||||||
} else if self.all_buckets_flushed_at_current_age() {
|
} else if self.all_buckets_flushed_at_current_age() || throttling_wait_ms.is_some() {
|
||||||
let wait = std::cmp::min(
|
let mut wait = std::cmp::min(
|
||||||
self.age_timer
|
self.age_timer
|
||||||
.remaining_until_next_interval(self.age_interval_ms()),
|
.remaining_until_next_interval(self.age_interval_ms()),
|
||||||
self.stats.remaining_until_next_interval(),
|
self.stats.remaining_until_next_interval(),
|
||||||
);
|
);
|
||||||
|
if let Some(throttling_wait_ms) = throttling_wait_ms {
|
||||||
|
self.stats
|
||||||
|
.bg_throttling_wait_us
|
||||||
|
.fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
|
||||||
|
wait = std::cmp::min(throttling_wait_ms, wait);
|
||||||
|
}
|
||||||
|
|
||||||
let mut m = Measure::start("wait");
|
let mut m = Measure::start("wait");
|
||||||
self.wait_dirty_or_aged
|
self.wait_dirty_or_aged
|
||||||
|
@ -198,6 +241,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
||||||
// likely some time has elapsed. May have been waiting for age time interval to elapse.
|
// likely some time has elapsed. May have been waiting for age time interval to elapse.
|
||||||
self.maybe_advance_age();
|
self.maybe_advance_age();
|
||||||
}
|
}
|
||||||
|
throttling_wait_ms = None;
|
||||||
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
|
@ -213,6 +257,10 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
||||||
if self.all_buckets_flushed_at_current_age() {
|
if self.all_buckets_flushed_at_current_age() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
throttling_wait_ms = self.throttling_wait_ms();
|
||||||
|
if throttling_wait_ms.is_some() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
@ -270,6 +318,35 @@ pub mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_throttle() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let bins = 100;
|
||||||
|
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
|
||||||
|
let bins = test.bins as u64;
|
||||||
|
let interval_ms = test.age_interval_ms();
|
||||||
|
// 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
|
||||||
|
let elapsed_ms = interval_ms * 89 / 100;
|
||||||
|
let bins_flushed = bins - 1;
|
||||||
|
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
|
||||||
|
assert_eq!(result, None);
|
||||||
|
// 10% of time, all bins but 1, should wait
|
||||||
|
let elapsed_ms = interval_ms / 10;
|
||||||
|
let bins_flushed = bins - 1;
|
||||||
|
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
|
||||||
|
assert_eq!(result, Some(1));
|
||||||
|
// 5% of time, 8% of bins, should wait. target is 90%. These #s roughly work
|
||||||
|
let elapsed_ms = interval_ms * 5 / 100;
|
||||||
|
let bins_flushed = bins * 8 / 100;
|
||||||
|
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
|
||||||
|
assert_eq!(result, Some(1));
|
||||||
|
// 11% of time, 12% of bins, should NOT wait. target is 90%. These #s roughly work
|
||||||
|
let elapsed_ms = interval_ms * 11 / 100;
|
||||||
|
let bins_flushed = bins * 12 / 100;
|
||||||
|
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
|
||||||
|
assert_eq!(result, None);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_age_time() {
|
fn test_age_time() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
|
|
@ -28,6 +28,7 @@ pub struct BucketMapHolderStats {
|
||||||
pub inserts: AtomicU64,
|
pub inserts: AtomicU64,
|
||||||
pub count: AtomicU64,
|
pub count: AtomicU64,
|
||||||
pub bg_waiting_us: AtomicU64,
|
pub bg_waiting_us: AtomicU64,
|
||||||
|
pub bg_throttling_wait_us: AtomicU64,
|
||||||
pub count_in_mem: AtomicU64,
|
pub count_in_mem: AtomicU64,
|
||||||
pub per_bucket_count: Vec<AtomicU64>,
|
pub per_bucket_count: Vec<AtomicU64>,
|
||||||
pub flush_entries_updated_on_disk: AtomicU64,
|
pub flush_entries_updated_on_disk: AtomicU64,
|
||||||
|
@ -134,6 +135,11 @@ impl BucketMapHolderStats {
|
||||||
self.bg_waiting_us.swap(0, Ordering::Relaxed),
|
self.bg_waiting_us.swap(0, Ordering::Relaxed),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"bg_throttling_wait_us",
|
||||||
|
self.bg_throttling_wait_us.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
("min_in_bin", min, i64),
|
("min_in_bin", min, i64),
|
||||||
("max_in_bin", max, i64),
|
("max_in_bin", max, i64),
|
||||||
("count_from_bins", ct, i64),
|
("count_from_bins", ct, i64),
|
||||||
|
|
Loading…
Reference in New Issue