throttle index adding to allow disk flushing to keep up and reduce startup ram usage (#23773)
parent 335c4b668b
commit dd69f3baf5
@@ -18,8 +18,6 @@
 //! tracks the number of commits to the entire data store. So the latest
 //! commit for each slot entry would be indexed.
 
-#[cfg(test)]
-use std::{thread::sleep, time::Duration};
 use {
     crate::{
         account_info::{AccountInfo, Offset, StorageLocation, StoredSize},
@@ -80,8 +78,8 @@ use {
             atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
             Arc, Condvar, Mutex, MutexGuard, RwLock,
         },
-        thread::Builder,
-        time::Instant,
+        thread::{sleep, Builder},
+        time::{Duration, Instant},
     },
     tempfile::TempDir,
 };
@@ -6928,6 +6926,7 @@ impl AccountsDb {
                 .map(|key| (key, &account))
                 .collect::<Vec<_>>();
             let hashes = (0..filler_entries).map(|_| hash).collect::<Vec<_>>();
+            self.maybe_throttle_index_generation();
             self.store_accounts_frozen((*slot, &add[..]), Some(&hashes[..]), None, None);
         });
         self.accounts_index.set_startup(false);
@@ -7009,6 +7008,7 @@ impl AccountsDb {
 
             let insert_us = if pass == 0 {
                 // generate index
+                self.maybe_throttle_index_generation();
                 let SlotIndexGenerationInfo {
                     insert_time_us: insert_us,
                     num_accounts: total_this_slot,
@@ -7139,6 +7139,28 @@ impl AccountsDb {
         }
     }
 
+    /// Startup processes can consume large amounts of memory while inserting accounts into the index as fast as possible.
+    /// Calling this can slow down the insertion process to allow flushing to disk to keep pace.
+    fn maybe_throttle_index_generation(&self) {
+        // This number is chosen to keep the initial RAM usage sufficiently small.
+        // The process of generating the index is governed entirely by how fast the disk index can be populated.
+        // 10M accounts is sufficiently small that it will never cause significant memory usage. It seems sufficiently large that it will provide sufficient performance.
+        // Performance is measured by total time to generate the index.
+        // Just estimating - 150M accounts can easily be held in memory in the accounts index on a 256G machine. 200-300M are also likely 'fine' during startup.
+        // 550M was straining a 384G machine at startup.
+        // This is a tunable parameter that just needs to be small enough to keep the generation threads from overwhelming RAM and causing OOM at startup.
+        const LIMIT: usize = 10_000_000;
+        while self
+            .accounts_index
+            .get_startup_remaining_items_to_flush_estimate()
+            > LIMIT
+        {
+            // 10 ms is long enough to allow some flushing to occur before insertion is resumed.
+            // Callers of this are typically run in parallel, so many threads will be sleeping at different starting intervals, waiting to resume insertion.
+            sleep(Duration::from_millis(10));
+        }
+    }
+
     /// Used during generate_index() to get the _duplicate_ accounts data len from the given pubkeys
     /// Note this should only be used when ALL entries in the accounts index are roots.
     fn pubkeys_to_duplicate_accounts_data_len(&self, pubkeys: &[Pubkey]) -> u64 {
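The throttle added above is plain backpressure: each index-generation thread asks the accounts index for an estimate of how many items are still waiting to be flushed to the disk index, and sleeps in 10 ms steps until that backlog drops back under LIMIT. Below is a minimal, self-contained sketch of the same pattern; it is not the crate's API, and the names (pending, LIMIT) and the scaled-down numbers are illustrative only.

    // Standalone sketch (not Solana code): producer threads back off with short
    // sleeps whenever the unflushed backlog grows past a limit, while a
    // background flusher drains it.
    use std::{
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc,
        },
        thread::{self, sleep},
        time::Duration,
    };

    const LIMIT: usize = 1_000; // stands in for the 10M-account LIMIT above

    fn main() {
        let pending = Arc::new(AtomicUsize::new(0)); // items inserted but not yet flushed
        let done = Arc::new(AtomicBool::new(false));

        // background "flusher": drains up to 100 items every millisecond
        let flusher = {
            let pending = Arc::clone(&pending);
            let done = Arc::clone(&done);
            thread::spawn(move || {
                while !done.load(Ordering::Relaxed) || pending.load(Ordering::Relaxed) > 0 {
                    let backlog = pending.load(Ordering::Relaxed);
                    pending.fetch_sub(backlog.min(100), Ordering::Relaxed);
                    sleep(Duration::from_millis(1));
                }
            })
        };

        // parallel "index generation": insert fast, but throttle when the backlog is large
        let producers: Vec<_> = (0..4)
            .map(|_| {
                let pending = Arc::clone(&pending);
                thread::spawn(move || {
                    for _ in 0..10_000 {
                        // the throttle: sleep while the flush estimate exceeds LIMIT
                        while pending.load(Ordering::Relaxed) > LIMIT {
                            sleep(Duration::from_millis(10));
                        }
                        pending.fetch_add(1, Ordering::Relaxed); // "insert" one item
                    }
                })
            })
            .collect();

        producers.into_iter().for_each(|p| p.join().unwrap());
        done.store(true, Ordering::Relaxed);
        flusher.join().unwrap();
        println!("all items flushed; the backlog stayed bounded by the throttle");
    }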
@@ -7526,8 +7548,7 @@ pub mod tests {
         std::{
             iter::FromIterator,
             str::FromStr,
-            thread::{self, sleep, Builder, JoinHandle},
-            time::Duration,
+            thread::{self, Builder, JoinHandle},
         },
     };
 
@@ -1509,6 +1509,10 @@ impl<T: IndexValue> AccountsIndex<T> {
         self.storage.set_startup(value);
     }
 
+    pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
+        self.storage.get_startup_remaining_items_to_flush_estimate()
+    }
+
     /// For each pubkey, find the latest account that appears in `roots` and <= `max_root`
     /// call `callback`
     pub(crate) fn scan<F>(&self, pubkeys: &[Pubkey], max_root: Option<Slot>, mut callback: F)
 
@@ -110,6 +110,15 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
         }
     }
 
+    /// Estimate how many items still need to be flushed to the disk cache.
+    pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
+        self.storage
+            .disk
+            .as_ref()
+            .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
+            .unwrap_or_default()
+    }
+
     fn shrink_to_fit(&self) {
         self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
     }
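Note the design choice in this wrapper: a backlog is only reported when a disk-backed index is actually configured; with an in-memory-only index the estimate defaults to 0, so maybe_throttle_index_generation never sleeps. A toy sketch of that delegation, using stand-in types rather than the real AccountsIndexStorage fields:

    // Illustrative stand-ins; only the Option::map(..).unwrap_or_default() shape
    // mirrors the change above.
    struct Stats { pending: usize }
    impl Stats {
        fn get_remaining_items_to_flush_estimate(&self) -> usize { self.pending }
    }
    struct Storage { disk: Option<()>, stats: Stats }

    fn flush_estimate(storage: &Storage) -> usize {
        // With no disk index the estimate is 0 (usize::default()), so callers never throttle.
        storage
            .disk
            .as_ref()
            .map(|_| storage.stats.get_remaining_items_to_flush_estimate())
            .unwrap_or_default()
    }

    fn main() {
        let in_mem_only = Storage { disk: None, stats: Stats { pending: 50_000_000 } };
        let disk_backed = Storage { disk: Some(()), stats: Stats { pending: 50_000_000 } };
        assert_eq!(flush_estimate(&in_mem_only), 0);
        assert_eq!(flush_estimate(&disk_backed), 50_000_000);
    }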
@@ -154,6 +154,16 @@ impl BucketMapHolderStats {
         }
     }
 
+    /// This is an estimate of the # of items in mem that are awaiting flushing to disk.
+    /// returns (# items in mem) - (# items we intend to hold in mem for performance heuristics)
+    /// The result is also an estimate because 'held_in_mem' is based on a stat that is swapped out when stats are reported.
+    pub fn get_remaining_items_to_flush_estimate(&self) -> usize {
+        let in_mem = self.count_in_mem.load(Ordering::Relaxed) as u64;
+        let held_in_mem = self.held_in_mem_slot_list_cached.load(Ordering::Relaxed)
+            + self.held_in_mem_slot_list_len.load(Ordering::Relaxed);
+        in_mem.saturating_sub(held_in_mem) as usize
+    }
+
     pub fn report_stats<T: IndexValue>(&self, storage: &BucketMapHolder<T>) {
         let elapsed_ms = self.last_time.elapsed_ms();
         if elapsed_ms < STATS_INTERVAL_MS {
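The estimate itself is just "items currently in memory" minus "items we intend to keep in memory anyway", clamped at zero because the two counters are sampled independently and can momentarily disagree. A worked sketch of the arithmetic, with plain integers standing in for the atomic counters:

    // Sketch of the estimate arithmetic (illustrative; the real code reads these
    // values from atomics with Ordering::Relaxed).
    fn remaining_items_to_flush_estimate(
        count_in_mem: usize,
        held_in_mem_slot_list_cached: u64,
        held_in_mem_slot_list_len: u64,
    ) -> usize {
        let in_mem = count_in_mem as u64;
        let held_in_mem = held_in_mem_slot_list_cached + held_in_mem_slot_list_len;
        in_mem.saturating_sub(held_in_mem) as usize
    }

    fn main() {
        // Normal case: 12M items in mem, 3M intentionally held -> 9M awaiting flush.
        assert_eq!(remaining_items_to_flush_estimate(12_000_000, 2_000_000, 1_000_000), 9_000_000);
        // The counters are sampled independently, so "held" can momentarily exceed
        // "in mem"; saturating_sub pins the estimate at 0 instead of wrapping.
        assert_eq!(remaining_items_to_flush_estimate(5, 4, 3), 0);
    }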