From dd69f3baf537a4ee3190e4eda596fc0d44fef3da Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Sun, 20 Mar 2022 19:56:20 -0500 Subject: [PATCH] throttle index adding to allow disk flushing to keep up and reduce startup ram usage (#23773) --- runtime/src/accounts_db.rs | 33 +++++++++++++++++++++----- runtime/src/accounts_index.rs | 4 ++++ runtime/src/accounts_index_storage.rs | 9 +++++++ runtime/src/bucket_map_holder_stats.rs | 10 ++++++++ 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 90399df5f3..71c91e607a 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -18,8 +18,6 @@ //! tracks the number of commits to the entire data store. So the latest //! commit for each slot entry would be indexed. -#[cfg(test)] -use std::{thread::sleep, time::Duration}; use { crate::{ account_info::{AccountInfo, Offset, StorageLocation, StoredSize}, @@ -80,8 +78,8 @@ use { atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}, Arc, Condvar, Mutex, MutexGuard, RwLock, }, - thread::Builder, - time::Instant, + thread::{sleep, Builder}, + time::{Duration, Instant}, }, tempfile::TempDir, }; @@ -6928,6 +6926,7 @@ impl AccountsDb { .map(|key| (key, &account)) .collect::>(); let hashes = (0..filler_entries).map(|_| hash).collect::>(); + self.maybe_throttle_index_generation(); self.store_accounts_frozen((*slot, &add[..]), Some(&hashes[..]), None, None); }); self.accounts_index.set_startup(false); @@ -7009,6 +7008,7 @@ impl AccountsDb { let insert_us = if pass == 0 { // generate index + self.maybe_throttle_index_generation(); let SlotIndexGenerationInfo { insert_time_us: insert_us, num_accounts: total_this_slot, @@ -7139,6 +7139,28 @@ impl AccountsDb { } } + /// Startup processes can consume large amounts of memory while inserting accounts into the index as fast as possible. + /// Calling this can slow down the insertion process to allow flushing to disk to keep pace. 
+ fn maybe_throttle_index_generation(&self) { + // This number is chosen to keep the initial ram usage sufficiently small + // The process of generating the index is governed entirely by how fast the disk index can be populated. + // 10M accounts is sufficiently small that it will never have memory usage problems. It seems sufficiently large that it will provide sufficient performance. + // Performance is measured by total time to generate the index. + // Just estimating - 150M accounts can easily be held in memory in the accounts index on a 256G machine. 2-300M are also likely 'fine' during startup. + // 550M was straining a 384G machine at startup. + // This is a tunable parameter that just needs to be small enough to keep the generation threads from overwhelming RAM and oom at startup. + const LIMIT: usize = 10_000_000; + while self + .accounts_index + .get_startup_remaining_items_to_flush_estimate() + > LIMIT + { + // 10 ms is long enough to allow some flushing to occur before insertion is resumed. + // callers of this are typically run in parallel, so many threads will be sleeping at different starting intervals, waiting to resume insertion. + sleep(Duration::from_millis(10)); + } + } + /// Used during generate_index() to get the _duplicate_ accounts data len from the given pubkeys /// Note this should only be used when ALL entries in the accounts index are roots. 
fn pubkeys_to_duplicate_accounts_data_len(&self, pubkeys: &[Pubkey]) -> u64 { @@ -7526,8 +7548,7 @@ pub mod tests { std::{ iter::FromIterator, str::FromStr, - thread::{self, sleep, Builder, JoinHandle}, - time::Duration, + thread::{self, Builder, JoinHandle}, }, }; diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 1a417e8697..bbcb1f47bf 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1509,6 +1509,10 @@ impl AccountsIndex { self.storage.set_startup(value); } + pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize { + self.storage.get_startup_remaining_items_to_flush_estimate() + } + /// For each pubkey, find the latest account that appears in `roots` and <= `max_root` /// call `callback` pub(crate) fn scan(&self, pubkeys: &[Pubkey], max_root: Option, mut callback: F) diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs index 4462c386c5..b4c8d809ed 100644 --- a/runtime/src/accounts_index_storage.rs +++ b/runtime/src/accounts_index_storage.rs @@ -110,6 +110,15 @@ impl AccountsIndexStorage { } } + /// estimate how many items are still needing to be flushed to the disk cache. + pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize { + self.storage + .disk + .as_ref() + .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate()) + .unwrap_or_default() + } + fn shrink_to_fit(&self) { self.in_mem.iter().for_each(|mem| mem.shrink_to_fit()) } diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index 47da35ae7c..1a0cd118eb 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -154,6 +154,16 @@ impl BucketMapHolderStats { } } + /// This is an estimate of the # of items in mem that are awaiting flushing to disk. 
+ /// returns (# items in mem) - (# items we intend to hold in mem for performance heuristics) + /// The result is also an estimate because 'held_in_mem' is based on a stat that is swapped out when stats are reported. + pub fn get_remaining_items_to_flush_estimate(&self) -> usize { + let in_mem = self.count_in_mem.load(Ordering::Relaxed) as u64; + let held_in_mem = self.held_in_mem_slot_list_cached.load(Ordering::Relaxed) + + self.held_in_mem_slot_list_len.load(Ordering::Relaxed); + in_mem.saturating_sub(held_in_mem) as usize + } + pub fn report_stats(&self, storage: &BucketMapHolder) { let elapsed_ms = self.last_time.elapsed_ms(); if elapsed_ms < STATS_INTERVAL_MS {