2021-12-03 09:00:31 -08:00
|
|
|
use {
|
|
|
|
crate::{
|
2022-04-12 07:38:09 -07:00
|
|
|
accounts_index::{AccountsIndexConfig, IndexLimitMb, IndexValue},
|
2021-12-03 09:00:31 -08:00
|
|
|
bucket_map_holder_stats::BucketMapHolderStats,
|
|
|
|
in_mem_accounts_index::{InMemAccountsIndex, SlotT},
|
|
|
|
waitable_condvar::WaitableCondvar,
|
|
|
|
},
|
|
|
|
solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig},
|
|
|
|
solana_measure::measure::Measure,
|
|
|
|
solana_sdk::{clock::SLOT_MS, timing::AtomicInterval},
|
|
|
|
std::{
|
|
|
|
fmt::Debug,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
|
2022-01-03 08:35:35 -08:00
|
|
|
Arc,
|
2021-12-03 09:00:31 -08:00
|
|
|
},
|
|
|
|
time::Duration,
|
|
|
|
},
|
|
|
|
};
|
2021-09-17 13:11:07 -07:00
|
|
|
/// Age of a bucket. The age advances by one each age interval (once every
/// bucket has been flushed at the current age) and wraps at `u8::MAX`.
pub type Age = u8;

const AGE_MS: u64 = SLOT_MS; // match one age per slot time

// 10 GB limit for in-mem idx. In practice, we don't get this high. This tunes how aggressively to save items we expect to use soon.
// Value is in MB (field is `mem_budget_mb`); `Some` means the disk index is enabled by default.
pub const DEFAULT_DISK_INDEX: Option<usize> = Some(10_000);
|
|
|
|
|
2021-09-14 15:51:07 -07:00
|
|
|
/// Owner of the disk-backed portion of the accounts index.
/// Coordinates background flushing of per-bin in-mem indexes to disk buckets,
/// tracking progress with an `Age` that advances only after every bin has
/// flushed at the current age AND an age time interval has elapsed.
pub struct BucketMapHolder<T: IndexValue> {
    /// disk-backed buckets; `None` when the index runs in-mem only (no mem budget configured)
    pub disk: Option<BucketMap<SlotT<T>>>,

    /// number of buckets (bins) that have been flushed at the current age; reset on age change
    pub count_ages_flushed: AtomicUsize,
    /// current age; wraps 0..=255 (see `increment_age`)
    pub age: AtomicU8,
    /// metrics for bg flushing activity
    pub stats: BucketMapHolderStats,

    /// timer gating how often the age may advance (see `has_age_interval_elapsed`)
    age_timer: AtomicInterval,

    // used by bg processing to know when any bucket has become dirty
    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
    /// shared round-robin cursor handed out to flush threads (see `next_bucket_to_flush`)
    next_bucket_to_flush: AtomicUsize,
    /// number of bins the index is partitioned into
    bins: usize,

    /// thread count passed in at construction; NOTE(review): not read within this
    /// file — presumably consumed by callers sizing bg work; confirm at call sites
    pub threads: usize,

    // how much mb are we allowed to keep in the in-mem index?
    // Rest goes to disk.
    pub mem_budget_mb: Option<usize>,
    /// how many ages an item stays in the in-mem cache before becoming eligible
    /// to flush (see `future_age_to_flush`)
    ages_to_stay_in_cache: Age,

    /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
    /// with less thread count limitations. LRU and access patterns are not important. Freeing memory
    /// and writing to disk in parallel are.
    /// Note startup is an optimization and is not required for correctness.
    startup: AtomicBool,
}
|
|
|
|
|
2021-09-15 07:54:16 -07:00
|
|
|
// Deliberately empty Debug impl: presumably present to satisfy `Debug` bounds on
// containing types without dumping the large holder state — confirm if ever extended.
impl<T: IndexValue> Debug for BucketMapHolder<T> {
    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // intentionally writes nothing
        Ok(())
    }
}
|
|
|
|
|
2021-09-17 08:41:30 -07:00
|
|
|
#[allow(clippy::mutex_atomic)]
|
2021-09-14 15:51:07 -07:00
|
|
|
impl<T: IndexValue> BucketMapHolder<T> {
|
2021-12-08 14:52:22 -08:00
|
|
|
    /// is the accounts index using disk as a backing store
    pub fn is_disk_index_enabled(&self) -> bool {
        // `disk` is only populated in `new` when a mem budget was configured
        self.disk.is_some()
    }
|
|
|
|
|
2021-09-17 13:11:07 -07:00
|
|
|
    /// Advance the age by one (wrapping) and reset the per-age flush count.
    /// Panics if called before every bin has been flushed at the current age.
    /// Wakes all bg waiters so they begin scanning the new age.
    pub fn increment_age(&self) {
        // since we are about to change age, there are now 0 buckets that have been flushed at this age
        // this should happen before the age.fetch_add
        let previous = self.count_ages_flushed.swap(0, Ordering::Acquire);
        // fetch_add is defined to wrap.
        // That's what we want. 0..255, then back to 0.
        self.age.fetch_add(1, Ordering::Release);
        assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
        self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
    }
|
|
|
|
|
2021-09-19 18:22:09 -07:00
|
|
|
pub fn future_age_to_flush(&self) -> Age {
|
|
|
|
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
|
|
|
|
}
|
2021-09-20 12:29:34 -07:00
|
|
|
|
|
|
|
    /// true if at least one age interval has passed since the timer last fired
    fn has_age_interval_elapsed(&self) -> bool {
        // note that when this returns true, state of age_timer is modified
        self.age_timer.should_update(self.age_interval_ms())
    }
|
|
|
|
|
2021-09-18 10:55:57 -07:00
|
|
|
    /// used by bg processes to determine # active threads and how aggressively to flush
    /// (see `set_startup` for the meaning of the flag)
    pub fn get_startup(&self) -> bool {
        self.startup.load(Ordering::Relaxed)
    }
|
|
|
|
|
2021-10-15 14:15:11 -07:00
|
|
|
/// startup=true causes:
|
|
|
|
/// in mem to act in a way that flushes to disk asap
|
|
|
|
/// startup=false is 'normal' operation
|
|
|
|
pub fn set_startup(&self, value: bool) {
|
|
|
|
if !value {
|
2021-09-18 20:08:58 -07:00
|
|
|
self.wait_for_idle();
|
|
|
|
}
|
2021-09-18 10:55:57 -07:00
|
|
|
self.startup.store(value, Ordering::Relaxed)
|
|
|
|
}
|
|
|
|
|
2021-10-15 14:15:11 -07:00
|
|
|
    /// return when the bg threads have reached an 'idle' state
    /// Only valid while startup is set (asserted); no-op when there is no disk index.
    pub(crate) fn wait_for_idle(&self) {
        assert!(self.get_startup());
        // no disk index means no bg flushing to wait for
        if self.disk.is_none() {
            return;
        }

        // when age has incremented twice, we know that we have made it through scanning all bins since we started waiting,
        // so we are then 'idle'
        let end_age = self.current_age().wrapping_add(2);
        loop {
            // sleep at most one age interval per check; the condvar may wake us sooner
            self.wait_dirty_or_aged
                .wait_timeout(Duration::from_millis(self.age_interval_ms()));
            if end_age == self.current_age() {
                break;
            }
        }
    }
|
|
|
|
|
2021-09-17 13:11:07 -07:00
|
|
|
    /// current age; Acquire pairs with the Release increment in `increment_age`
    pub fn current_age(&self) -> Age {
        self.age.load(Ordering::Acquire)
    }
|
|
|
|
|
|
|
|
    /// Record that one bucket finished flushing at the current age, then try to
    /// advance the age (which succeeds once all bins have reported and the
    /// age interval has elapsed).
    pub fn bucket_flushed_at_current_age(&self) {
        self.count_ages_flushed.fetch_add(1, Ordering::Release);
        self.maybe_advance_age();
    }
|
|
|
|
|
2021-10-15 14:15:11 -07:00
|
|
|
    /// have all buckets been flushed at the current age?
    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
        // NOTE(review): `>=` (not `==`) presumably tolerates a bucket being counted
        // more than once within an age — confirm against flush callers
        self.count_ages_flushed() >= self.bins
    }
|
|
|
|
|
|
|
|
    /// number of buckets flushed so far at the current age
    pub fn count_ages_flushed(&self) -> usize {
        self.count_ages_flushed.load(Ordering::Acquire)
    }
|
|
|
|
|
2021-09-20 12:29:34 -07:00
|
|
|
pub fn maybe_advance_age(&self) -> bool {
|
|
|
|
// check has_age_interval_elapsed last as calling it modifies state on success
|
|
|
|
if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() {
|
|
|
|
self.increment_age();
|
|
|
|
true
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-26 13:09:17 -07:00
|
|
|
    /// Construct a holder with `bins` bins.
    /// Decides whether to allocate disk buckets from `config.index_limit_mb`:
    /// an explicit limit or `InMemOnly` is honored; `Unspecified` defaults to
    /// disk-on (`DEFAULT_DISK_INDEX`) unless a test env var opts out.
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
        let ages_to_stay_in_cache = config
            .as_ref()
            .and_then(|config| config.ages_to_stay_in_cache)
            .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);

        let mut bucket_config = BucketMapConfig::new(bins);
        bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
        let mem_budget_mb = match config
            .as_ref()
            .map(|config| &config.index_limit_mb)
            .unwrap_or(&IndexLimitMb::Unspecified)
        {
            // creator said to use disk idx with a specific limit
            IndexLimitMb::Limit(mb) => Some(*mb),
            // creator said InMemOnly, so no disk index
            IndexLimitMb::InMemOnly => None,
            // whatever started us didn't specify whether to use the acct idx
            IndexLimitMb::Unspecified => {
                // check env var if we were not started from a validator
                let mut use_default = true;
                if !config
                    .as_ref()
                    .map(|config| config.started_from_validator)
                    .unwrap_or_default()
                {
                    if let Ok(_limit) = std::env::var("SOLANA_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB")
                    {
                        // Note this env var means the opposite of the default. The default now is disk index is on.
                        // So, if this env var is set, DO NOT allocate with disk buckets if mem budget was not set, we were NOT started from validator, and env var was set
                        // we do not want the env var to have an effect when running the validator (only tests, benches, etc.)
                        use_default = false;
                    }
                }
                if use_default {
                    // if validator does not specify disk index limit or specify in mem only, then this is the default
                    DEFAULT_DISK_INDEX
                } else {
                    None
                }
            }
        };

        // only allocate if mem_budget_mb is Some
        let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
        Self {
            disk,
            ages_to_stay_in_cache,
            count_ages_flushed: AtomicUsize::default(),
            age: AtomicU8::default(),
            stats: BucketMapHolderStats::new(bins),
            wait_dirty_or_aged: Arc::default(),
            next_bucket_to_flush: AtomicUsize::new(0),
            age_timer: AtomicInterval::default(),
            bins,
            startup: AtomicBool::default(),
            mem_budget_mb,
            threads,
        }
    }
|
2021-09-17 08:41:30 -07:00
|
|
|
|
|
|
|
    // get the next bucket to flush, with the idea that the previous bucket
    // is perhaps being flushed by another thread already.
    // Atomically advances a shared round-robin cursor modulo `bins` and
    // returns the cursor's value PRIOR to the advance.
    pub fn next_bucket_to_flush(&self) -> usize {
        self.next_bucket_to_flush
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
                Some((bucket + 1) % self.bins)
            })
            // the closure always returns Some, so fetch_update cannot fail
            .unwrap()
    }
|
2021-09-22 10:40:30 -07:00
|
|
|
|
2021-09-27 18:22:07 -07:00
|
|
|
    /// prepare for this to be dynamic if necessary
    /// For example, maybe startup has a shorter age interval.
    fn age_interval_ms(&self) -> u64 {
        // currently fixed: one age per slot time
        AGE_MS
    }
|
|
|
|
|
2021-09-29 05:42:54 -07:00
|
|
|
/// return an amount of ms to sleep
|
|
|
|
fn throttling_wait_ms_internal(
|
|
|
|
&self,
|
|
|
|
interval_ms: u64,
|
|
|
|
elapsed_ms: u64,
|
|
|
|
bins_flushed: u64,
|
|
|
|
) -> Option<u64> {
|
|
|
|
let target_percent = 90; // aim to finish in 90% of the allocated time
|
|
|
|
let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
|
|
|
|
let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
|
|
|
|
if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
|
|
|
|
// any of these conditions result in 'do not wait due to progress'
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
let ms_per_s = 1_000;
|
|
|
|
let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
|
|
|
|
let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
|
|
|
|
if expected_bins_processed_in_remaining_time > remaining_bins {
|
|
|
|
// wait because we predict will finish prior to target
|
|
|
|
Some(1)
|
|
|
|
} else {
|
|
|
|
// do not wait because we predict will finish after target
|
|
|
|
None
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
    /// Check progress this age.
    /// Return ms to wait to get closer to the wait target and spread out work over the entire age interval.
    /// Goal is to avoid cpu spikes at beginning of age interval.
    fn throttling_wait_ms(&self) -> Option<u64> {
        let interval_ms = self.age_interval_ms();
        let elapsed_ms = self.age_timer.elapsed_ms();
        let bins_flushed = self.count_ages_flushed() as u64;
        self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
    }
|
|
|
|
|
2021-10-05 14:48:23 -07:00
|
|
|
/// true if this thread can sleep
|
|
|
|
fn should_thread_sleep(&self) -> bool {
|
|
|
|
let bins_flushed = self.count_ages_flushed();
|
|
|
|
if bins_flushed >= self.bins {
|
|
|
|
// all bins flushed, so this thread can sleep
|
|
|
|
true
|
|
|
|
} else {
|
|
|
|
// at least 1 thread running for each bin that still needs to be flushed, so this thread can sleep
|
|
|
|
let active = self.stats.active_threads.load(Ordering::Relaxed);
|
|
|
|
bins_flushed.saturating_add(active as usize) >= self.bins
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-22 10:40:30 -07:00
|
|
|
    // intended to execute in a bg thread
    /// Main loop of a background flush thread: alternately sleeps (when work is
    /// covered or throttled) and round-robins through bins flushing them to disk,
    /// until `exit` is set. With no disk index, only waits and reports stats.
    pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
        let bins = in_mem.len();
        let flush = self.disk.is_some();
        let mut throttling_wait_ms = None;
        loop {
            if !flush {
                // nothing to flush; just wake periodically for stats reporting
                self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
                    self.stats.remaining_until_next_interval(),
                ));
            } else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
                // sleep until the sooner of the next age interval or next stats interval
                let mut wait = std::cmp::min(
                    self.age_timer
                        .remaining_until_next_interval(self.age_interval_ms()),
                    self.stats.remaining_until_next_interval(),
                );
                if let Some(throttling_wait_ms) = throttling_wait_ms {
                    self.stats
                        .bg_throttling_wait_us
                        .fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
                    wait = std::cmp::min(throttling_wait_ms, wait);
                }

                let mut m = Measure::start("wait");
                self.wait_dirty_or_aged
                    .wait_timeout(Duration::from_millis(wait));
                m.stop();
                self.stats
                    .bg_waiting_us
                    .fetch_add(m.as_us(), Ordering::Relaxed);
                // likely some time has elapsed. May have been waiting for age time interval to elapse.
                self.maybe_advance_age();
            }
            // throttling is recomputed each pass through the flush loop below
            throttling_wait_ms = None;

            if exit.load(Ordering::Relaxed) {
                break;
            }

            self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
            // visit up to `bins` buckets, stopping early if the age completes
            // or throttling says we are ahead of schedule
            for _ in 0..bins {
                if flush {
                    let index = self.next_bucket_to_flush();
                    in_mem[index].flush();
                }
                self.stats.report_stats(self);
                if self.all_buckets_flushed_at_current_age() {
                    break;
                }
                throttling_wait_ms = self.throttling_wait_ms();
                if throttling_wait_ms.is_some() {
                    break;
                }
            }
            self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
        }
    }
|
2021-09-17 08:41:30 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
pub mod tests {
    use {super::*, rayon::prelude::*, std::time::Instant};

    /// Many threads pulling from the round-robin cursor must visit every bin
    /// the same number of times.
    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited = (0..bins)
            .into_iter()
            .map(|_| AtomicUsize::default())
            .collect::<Vec<_>>();
        let iterations = bins * 30;
        let threads = bins * 4;
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            (0..iterations).into_iter().for_each(|_| {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            });
        });
        visited.iter().enumerate().for_each(|(bin, visited)| {
            assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin)
        });
    }

    /// Age wraps at 256; verify > 2 full cycles of manual increments.
    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        for age in 0..513 {
            assert_eq!(test.current_age(), (age % 256) as Age);

            // inc all
            for _ in 0..bins {
                assert!(!test.all_buckets_flushed_at_current_age());
                // cannot call this because based on timing, it may fire: test.bucket_flushed_at_current_age();
            }

            // this would normally happen once time went off and all buckets had been flushed at the previous age
            test.count_ages_flushed.fetch_add(bins, Ordering::Release);
            test.increment_age();
        }
    }

    /// Exercise the pure throttling predictor with hand-picked rate scenarios.
    #[test]
    fn test_throttle() {
        solana_logger::setup();
        let bins = 128;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let bins = test.bins as u64;
        let interval_ms = test.age_interval_ms();
        // 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
        let elapsed_ms = interval_ms * 89 / 100;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
        // 10% of time, all bins but 1, should wait
        let elapsed_ms = interval_ms / 10;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 5% of time, 8% of bins, should wait. target is 90%. These #s roughly work
        let elapsed_ms = interval_ms * 5 / 100;
        let bins_flushed = bins * 8 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 11% of time, 12% of bins, should NOT wait. target is 90%. These #s roughly work
        let elapsed_ms = interval_ms * 11 / 100;
        let bins_flushed = bins * 12 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
    }

    /// An explicit limit — even 0 — still allocates the disk index.
    #[test]
    fn test_disk_index_enabled() {
        let bins = 1;
        let config = AccountsIndexConfig {
            index_limit_mb: IndexLimitMb::Limit(0),
            ..AccountsIndexConfig::default()
        };
        let test = BucketMapHolder::<u64>::new(bins, &Some(config), 1);
        assert!(test.is_disk_index_enabled());
    }

    /// Timing-based: with one bin always flushed, the age should advance once
    /// per AGE_MS over the measured window.
    #[test]
    fn test_age_time() {
        solana_logger::setup();
        let bins = 1;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 5 / 2;
        let expected = (time / AGE_MS) as Age;
        let now = Instant::now();
        test.bucket_flushed_at_current_age(); // done with age 0
        (0..threads).into_par_iter().for_each(|_| {
            while now.elapsed().as_millis() < (time as u128) {
                if test.maybe_advance_age() {
                    test.bucket_flushed_at_current_age();
                }
            }
        });
        assert_eq!(test.current_age(), expected);
    }

    /// End-to-end: flush every bin, let an interval pass, and see one age tick.
    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(test.current_age(), 0);
        for _ in 0..bins {
            assert!(!test.all_buckets_flushed_at_current_age());
            test.bucket_flushed_at_current_age();
        }
        std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
        test.maybe_advance_age();
        assert_eq!(test.current_age(), 1);
        assert!(!test.all_buckets_flushed_at_current_age());
    }
}
|