2021-12-03 09:00:31 -08:00
use {
crate ::{
2022-04-12 07:38:09 -07:00
accounts_index ::{ AccountsIndexConfig , IndexLimitMb , IndexValue } ,
2021-12-03 09:00:31 -08:00
bucket_map_holder_stats ::BucketMapHolderStats ,
2022-06-10 20:04:56 -07:00
in_mem_accounts_index ::InMemAccountsIndex ,
2021-12-03 09:00:31 -08:00
waitable_condvar ::WaitableCondvar ,
} ,
solana_bucket_map ::bucket_map ::{ BucketMap , BucketMapConfig } ,
solana_measure ::measure ::Measure ,
2022-06-10 20:04:56 -07:00
solana_sdk ::{
2022-12-16 08:05:09 -08:00
clock ::{ Slot , DEFAULT_MS_PER_SLOT } ,
2022-06-10 20:04:56 -07:00
timing ::AtomicInterval ,
} ,
2021-12-03 09:00:31 -08:00
std ::{
fmt ::Debug ,
sync ::{
atomic ::{ AtomicBool , AtomicU8 , AtomicUsize , Ordering } ,
2022-01-03 08:35:35 -08:00
Arc ,
2021-12-03 09:00:31 -08:00
} ,
time ::Duration ,
} ,
} ;
2021-09-17 13:11:07 -07:00
/// Rolling age value used by `BucketMapHolder`.
/// It is a `u8`, so incrementing it wraps from 255 back to 0.
pub type Age = u8 ;
2021-09-12 15:14:59 -07:00
2022-12-16 08:05:09 -08:00
/// Length of one age interval in milliseconds; set to the default slot duration.
const AGE_MS : u64 = DEFAULT_MS_PER_SLOT ; // match one age per slot time
2021-09-20 12:29:34 -07:00
2022-04-13 07:24:50 -07:00
/// Default in-mem index budget, in MB, used when the disk index is enabled by default (10_000 MB, i.e. 10 GB).
/// In practice, we don't get this high. This tunes how aggressively to save items we expect to use soon.
pub const DEFAULT_DISK_INDEX : Option < usize > = Some ( 10_000 ) ;
2021-09-14 15:51:07 -07:00
pub struct BucketMapHolder < T : IndexValue > {
2022-06-10 20:04:56 -07:00
pub disk : Option < BucketMap < ( Slot , T ) > > ,
2021-09-20 06:40:10 -07:00
2022-06-02 10:16:01 -07:00
pub count_buckets_flushed : AtomicUsize ,
2022-08-31 06:56:26 -07:00
/// These three ages are individual atomics because their values are read many times from code during runtime.
/// Instead of accessing the single age and doing math each time, each value is incremented each time the age occurs, which is ~400ms.
/// Callers can ask for the precomputed value they already want.
2022-08-19 10:40:42 -07:00
/// rolling 'current' age
2021-09-17 13:11:07 -07:00
pub age : AtomicU8 ,
2022-08-31 06:56:26 -07:00
/// rolling age that is 'ages_to_stay_in_cache' + 'age'
pub future_age_to_flush : AtomicU8 ,
/// rolling age that is effectively 'age' - 1
/// these items are expected to be flushed from the accounts write cache or otherwise modified before this age occurs
pub future_age_to_flush_cached : AtomicU8 ,
2021-09-12 15:14:59 -07:00
pub stats : BucketMapHolderStats ,
2021-09-17 08:41:30 -07:00
2021-09-20 12:29:34 -07:00
age_timer : AtomicInterval ,
2021-09-17 08:41:30 -07:00
// used by bg processing to know when any bucket has become dirty
2021-10-15 14:15:11 -07:00
pub wait_dirty_or_aged : Arc < WaitableCondvar > ,
2022-01-03 08:35:35 -08:00
next_bucket_to_flush : AtomicUsize ,
2021-09-17 08:41:30 -07:00
bins : usize ,
2021-09-18 10:55:57 -07:00
2021-10-26 13:09:17 -07:00
pub threads : usize ,
2021-09-19 16:00:15 -07:00
// how much mb are we allowed to keep in the in-mem index?
// Rest goes to disk.
pub mem_budget_mb : Option < usize > ,
2022-08-01 12:25:19 -07:00
/// how many ages should elapse from the last time an item is used where the item will remain in the cache
pub ages_to_stay_in_cache : Age ,
2021-09-19 16:00:15 -07:00
2021-09-18 10:55:57 -07:00
/// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
/// with less thread count limitations. LRU and access patterns are not important. Freeing memory
/// and writing to disk in parallel are.
/// Note startup is an optimization and is not required for correctness.
startup : AtomicBool ,
2021-09-12 15:14:59 -07:00
}
2021-09-15 07:54:16 -07:00
/// Empty `Debug` implementation: nothing is written and formatting always succeeds.
impl<T: IndexValue> Debug for BucketMapHolder<T> {
    fn fmt(&self, _formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // no fields are printed
        std::fmt::Result::Ok(())
    }
}
2021-09-17 08:41:30 -07:00
#[ allow(clippy::mutex_atomic) ]
2021-09-14 15:51:07 -07:00
impl < T : IndexValue > BucketMapHolder < T > {
2021-12-08 14:52:22 -08:00
/// Returns `true` when the accounts index has a disk bucket map as its backing store.
pub fn is_disk_index_enabled(&self) -> bool {
    let disk_index = self.disk.as_ref();
    disk_index.is_some()
}
2021-09-17 13:11:07 -07:00
pub fn increment_age ( & self ) {
2021-09-21 08:52:39 -07:00
// since we are about to change age, there are now 0 buckets that have been flushed at this age
// this should happen before the age.fetch_add
2022-06-13 15:43:29 -07:00
// Otherwise, as soon as we increment the age, a thread could race us and flush before we swap this out since it detects the age has moved forward and a bucket will be eligible for flushing.
2022-06-02 16:26:08 -07:00
let previous = self . count_buckets_flushed . swap ( 0 , Ordering ::AcqRel ) ;
2021-09-17 13:11:07 -07:00
// fetch_add is defined to wrap.
// That's what we want. 0..255, then back to 0.
2021-09-21 08:52:39 -07:00
self . age . fetch_add ( 1 , Ordering ::Release ) ;
2022-08-31 06:56:26 -07:00
self . future_age_to_flush . fetch_add ( 1 , Ordering ::Release ) ;
self . future_age_to_flush_cached
. fetch_add ( 1 , Ordering ::Release ) ;
2022-06-03 07:32:43 -07:00
assert! (
previous > = self . bins ,
" previous: {}, bins: {} " ,
previous ,
self . bins
) ; // we should not have increased age before previous age was fully flushed
2021-09-20 07:58:20 -07:00
self . wait_dirty_or_aged . notify_all ( ) ; // notify all because we can age scan in parallel
2021-09-17 13:11:07 -07:00
}
2022-08-31 06:56:26 -07:00
pub fn future_age_to_flush ( & self , is_cached : bool ) -> Age {
if is_cached {
& self . future_age_to_flush_cached
} else {
& self . future_age_to_flush
}
. load ( Ordering ::Acquire )
2021-09-19 18:22:09 -07:00
}
2021-09-20 12:29:34 -07:00
fn has_age_interval_elapsed ( & self ) -> bool {
// note that when this returns true, state of age_timer is modified
2021-09-27 18:22:07 -07:00
self . age_timer . should_update ( self . age_interval_ms ( ) )
2021-09-20 12:29:34 -07:00
}
2021-09-18 10:55:57 -07:00
/// used by bg processes to determine # active threads and how aggressively to flush
pub fn get_startup ( & self ) -> bool {
self . startup . load ( Ordering ::Relaxed )
}
2021-10-15 14:15:11 -07:00
/// startup=true causes:
/// in mem to act in a way that flushes to disk asap
/// startup=false is 'normal' operation
pub fn set_startup ( & self , value : bool ) {
if ! value {
2021-09-18 20:08:58 -07:00
self . wait_for_idle ( ) ;
}
2021-09-18 10:55:57 -07:00
self . startup . store ( value , Ordering ::Relaxed )
}
2021-10-15 14:15:11 -07:00
/// return when the bg threads have reached an 'idle' state
2021-09-18 20:08:58 -07:00
pub ( crate ) fn wait_for_idle ( & self ) {
assert! ( self . get_startup ( ) ) ;
2021-09-28 09:07:19 -07:00
if self . disk . is_none ( ) {
return ;
}
2021-10-15 14:15:11 -07:00
// when age has incremented twice, we know that we have made it through scanning all bins since we started waiting,
// so we are then 'idle'
2021-09-28 09:07:19 -07:00
let end_age = self . current_age ( ) . wrapping_add ( 2 ) ;
loop {
self . wait_dirty_or_aged
. wait_timeout ( Duration ::from_millis ( self . age_interval_ms ( ) ) ) ;
if end_age = = self . current_age ( ) {
break ;
}
}
2021-09-18 20:08:58 -07:00
}
2021-09-17 13:11:07 -07:00
pub fn current_age ( & self ) -> Age {
2021-09-21 08:52:39 -07:00
self . age . load ( Ordering ::Acquire )
2021-09-17 13:11:07 -07:00
}
2022-06-14 19:43:42 -07:00
pub fn bucket_flushed_at_current_age ( & self , can_advance_age : bool ) {
2022-06-02 16:26:08 -07:00
let count_buckets_flushed = 1 + self . count_buckets_flushed . fetch_add ( 1 , Ordering ::AcqRel ) ;
2022-06-14 19:43:42 -07:00
if can_advance_age {
self . maybe_advance_age_internal (
self . all_buckets_flushed_at_current_age_internal ( count_buckets_flushed ) ,
) ;
}
2021-09-17 13:11:07 -07:00
}
2021-10-15 14:15:11 -07:00
/// have all buckets been flushed at the current age?
2021-09-17 13:11:07 -07:00
pub fn all_buckets_flushed_at_current_age ( & self ) -> bool {
2022-06-02 16:26:08 -07:00
self . all_buckets_flushed_at_current_age_internal ( self . count_buckets_flushed ( ) )
}
/// have all buckets been flushed at the current age?
fn all_buckets_flushed_at_current_age_internal ( & self , count_buckets_flushed : usize ) -> bool {
count_buckets_flushed > = self . bins
2021-09-21 06:40:55 -07:00
}
2022-06-02 10:16:01 -07:00
pub fn count_buckets_flushed ( & self ) -> usize {
self . count_buckets_flushed . load ( Ordering ::Acquire )
2021-09-17 13:11:07 -07:00
}
2022-06-28 07:13:01 -07:00
/// if all buckets are flushed at the current age and time has elapsed, then advance age
2021-09-20 12:29:34 -07:00
pub fn maybe_advance_age ( & self ) -> bool {
2022-06-02 16:26:08 -07:00
self . maybe_advance_age_internal ( self . all_buckets_flushed_at_current_age ( ) )
}
2022-06-28 07:13:01 -07:00
/// if all buckets are flushed at the current age and time has elapsed, then advance age
2022-06-02 16:26:08 -07:00
fn maybe_advance_age_internal ( & self , all_buckets_flushed_at_current_age : bool ) -> bool {
// call has_age_interval_elapsed last since calling it modifies state on success
if all_buckets_flushed_at_current_age & & self . has_age_interval_elapsed ( ) {
2021-09-20 12:29:34 -07:00
self . increment_age ( ) ;
true
} else {
false
}
}
2021-10-26 13:09:17 -07:00
pub fn new ( bins : usize , config : & Option < AccountsIndexConfig > , threads : usize ) -> Self {
2021-09-19 18:22:09 -07:00
const DEFAULT_AGE_TO_STAY_IN_CACHE : Age = 5 ;
let ages_to_stay_in_cache = config
. as_ref ( )
. and_then ( | config | config . ages_to_stay_in_cache )
. unwrap_or ( DEFAULT_AGE_TO_STAY_IN_CACHE ) ;
2021-09-20 06:40:10 -07:00
let mut bucket_config = BucketMapConfig ::new ( bins ) ;
bucket_config . drives = config . as_ref ( ) . and_then ( | config | config . drives . clone ( ) ) ;
2022-04-13 07:24:50 -07:00
let mem_budget_mb = match config
. as_ref ( )
. map ( | config | & config . index_limit_mb )
. unwrap_or ( & IndexLimitMb ::Unspecified )
{
// creator said to use disk idx with a specific limit
IndexLimitMb ::Limit ( mb ) = > Some ( * mb ) ,
// creator said InMemOnly, so no disk index
IndexLimitMb ::InMemOnly = > None ,
// whatever started us didn't specify whether to use the acct idx
IndexLimitMb ::Unspecified = > {
// check env var if we were not started from a validator
let mut use_default = true ;
if ! config
. as_ref ( )
. map ( | config | config . started_from_validator )
. unwrap_or_default ( )
{
if let Ok ( _limit ) = std ::env ::var ( " SOLANA_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB " )
{
// Note this env var means the opposite of the default. The default now is disk index is on.
// So, if this env var is set, DO NOT allocate with disk buckets if mem budget was not set, we were NOT started from validator, and env var was set
// we do not want the env var to have an effect when running the validator (only tests, benches, etc.)
use_default = false ;
}
}
if use_default {
// if validator does not specify disk index limit or specify in mem only, then this is the default
DEFAULT_DISK_INDEX
} else {
None
}
2022-02-22 07:40:12 -08:00
}
2022-04-13 07:24:50 -07:00
} ;
2022-02-22 07:40:12 -08:00
2021-09-20 06:40:10 -07:00
// only allocate if mem_budget_mb is Some
let disk = mem_budget_mb . map ( | _ | BucketMap ::new ( bucket_config ) ) ;
2021-09-15 11:07:53 -07:00
Self {
2021-09-20 06:40:10 -07:00
disk ,
2021-09-19 18:22:09 -07:00
ages_to_stay_in_cache ,
2022-06-02 10:16:01 -07:00
count_buckets_flushed : AtomicUsize ::default ( ) ,
2022-08-31 06:56:26 -07:00
// age = 0
2021-09-17 13:11:07 -07:00
age : AtomicU8 ::default ( ) ,
2022-08-31 06:56:26 -07:00
// future age = age (=0) + ages_to_stay_in_cache
future_age_to_flush : AtomicU8 ::new ( ages_to_stay_in_cache ) ,
// effectively age (0) - 1. So, the oldest possible age from 'now'
future_age_to_flush_cached : AtomicU8 ::new ( 0_ u8 . wrapping_sub ( 1 ) ) ,
2021-09-22 06:55:07 -07:00
stats : BucketMapHolderStats ::new ( bins ) ,
2021-10-15 14:15:11 -07:00
wait_dirty_or_aged : Arc ::default ( ) ,
2022-01-03 08:35:35 -08:00
next_bucket_to_flush : AtomicUsize ::new ( 0 ) ,
2021-09-20 12:29:34 -07:00
age_timer : AtomicInterval ::default ( ) ,
2021-09-17 08:41:30 -07:00
bins ,
2021-09-18 10:55:57 -07:00
startup : AtomicBool ::default ( ) ,
2021-09-20 06:40:10 -07:00
mem_budget_mb ,
2021-10-26 13:09:17 -07:00
threads ,
2021-09-15 11:07:53 -07:00
}
2021-09-12 15:14:59 -07:00
}
2021-09-17 08:41:30 -07:00
// get the next bucket to flush, with the idea that the previous bucket
// is perhaps being flushed by another thread already.
pub fn next_bucket_to_flush ( & self ) -> usize {
2022-01-03 08:35:35 -08:00
self . next_bucket_to_flush
. fetch_update ( Ordering ::AcqRel , Ordering ::Acquire , | bucket | {
Some ( ( bucket + 1 ) % self . bins )
} )
. unwrap ( )
2021-09-17 08:41:30 -07:00
}
2021-09-22 10:40:30 -07:00
2021-09-27 18:22:07 -07:00
/// Length of one age interval in ms.
/// This is a method so it can become dynamic if necessary.
/// For example, maybe startup has a shorter age interval.
fn age_interval_ms(&self) -> u64 {
    let interval_ms: u64 = AGE_MS;
    interval_ms
}
2021-09-29 05:42:54 -07:00
/// return an amount of ms to sleep
fn throttling_wait_ms_internal (
& self ,
interval_ms : u64 ,
elapsed_ms : u64 ,
bins_flushed : u64 ,
) -> Option < u64 > {
let target_percent = 90 ; // aim to finish in 90% of the allocated time
let remaining_ms = ( interval_ms * target_percent / 100 ) . saturating_sub ( elapsed_ms ) ;
let remaining_bins = ( self . bins as u64 ) . saturating_sub ( bins_flushed ) ;
if remaining_bins = = 0 | | remaining_ms = = 0 | | elapsed_ms = = 0 | | bins_flushed = = 0 {
// any of these conditions result in 'do not wait due to progress'
return None ;
}
let ms_per_s = 1_000 ;
let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms ;
let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s ;
if expected_bins_processed_in_remaining_time > remaining_bins {
// wait because we predict will finish prior to target
Some ( 1 )
} else {
// do not wait because we predict will finish after target
None
}
}
/// Check progress this age.
/// Return ms to wait to get closer to the wait target and spread out work over the entire age interval.
/// Goal is to avoid cpu spikes at beginning of age interval.
fn throttling_wait_ms ( & self ) -> Option < u64 > {
let interval_ms = self . age_interval_ms ( ) ;
let elapsed_ms = self . age_timer . elapsed_ms ( ) ;
2022-06-02 10:16:01 -07:00
let bins_flushed = self . count_buckets_flushed ( ) as u64 ;
2021-09-29 05:42:54 -07:00
self . throttling_wait_ms_internal ( interval_ms , elapsed_ms , bins_flushed )
}
2021-10-05 14:48:23 -07:00
/// true if this thread can sleep
fn should_thread_sleep ( & self ) -> bool {
2022-06-02 10:16:01 -07:00
let bins_flushed = self . count_buckets_flushed ( ) ;
2021-10-05 14:48:23 -07:00
if bins_flushed > = self . bins {
// all bins flushed, so this thread can sleep
true
} else {
// at least 1 thread running for each bin that still needs to be flushed, so this thread can sleep
let active = self . stats . active_threads . load ( Ordering ::Relaxed ) ;
bins_flushed . saturating_add ( active as usize ) > = self . bins
}
}
2021-09-22 10:40:30 -07:00
/// Background loop: intended to execute in a bg thread.
///
/// Repeatedly waits (when there is nothing to flush, when this thread may sleep, or when
/// throttling asked for a pause) and then flushes in-mem bins, until any flag in `exit` is set.
///
/// * `exit` - the loop ends as soon as any of these flags is observed to be true.
/// * `in_mem` - the in-mem bins; `flush` is called on the bin selected by `next_bucket_to_flush`.
/// * `can_advance_age` - whether this thread is allowed to advance the age; it is also passed
///   through to each bin's `flush`.
// intended to execute in a bg thread
2022-06-14 19:43:42 -07:00
pub fn background (
& self ,
2022-09-12 11:51:12 -07:00
exit : Vec < Arc < AtomicBool > > ,
2022-06-14 19:43:42 -07:00
in_mem : Vec < Arc < InMemAccountsIndex < T > > > ,
can_advance_age : bool ,
) {
2021-09-22 10:40:30 -07:00
// upper bound on the number of flush attempts per pass of the outer loop
let bins = in_mem . len ( ) ;
// flushing only happens when there is a disk index
let flush = self . disk . is_some ( ) ;
2021-09-29 05:42:54 -07:00
// throttling request carried over from the previous pass, if any
let mut throttling_wait_ms = None ;
2021-09-22 10:40:30 -07:00
loop {
2021-09-27 09:24:36 -07:00
if ! flush {
// no disk index: just wait until the next stats interval (or until notified)
self . wait_dirty_or_aged . wait_timeout ( Duration ::from_millis (
self . stats . remaining_until_next_interval ( ) ,
) ) ;
2021-10-05 14:48:23 -07:00
} else if self . should_thread_sleep ( ) | | throttling_wait_ms . is_some ( ) {
2021-09-29 05:42:54 -07:00
// wait no longer than the sooner of: next age interval, next stats interval
let mut wait = std ::cmp ::min (
2021-09-27 18:22:07 -07:00
self . age_timer
. remaining_until_next_interval ( self . age_interval_ms ( ) ) ,
2021-09-24 14:00:41 -07:00
self . stats . remaining_until_next_interval ( ) ,
) ;
2022-06-14 19:43:42 -07:00
if ! can_advance_age {
// if this thread cannot advance age, then make sure we don't sleep 0
wait = wait . max ( 1 ) ;
}
2021-09-29 05:42:54 -07:00
if let Some ( throttling_wait_ms ) = throttling_wait_ms {
// the requested throttling time is recorded in us, and it caps the wait
self . stats
. bg_throttling_wait_us
. fetch_add ( throttling_wait_ms * 1000 , Ordering ::Relaxed ) ;
wait = std ::cmp ::min ( throttling_wait_ms , wait ) ;
}
2021-09-24 14:00:41 -07:00
2021-09-24 11:19:06 -07:00
// measure how long this thread actually waited
let mut m = Measure ::start ( " wait " ) ;
self . wait_dirty_or_aged
2021-09-24 14:00:41 -07:00
. wait_timeout ( Duration ::from_millis ( wait ) ) ;
2021-09-24 11:19:06 -07:00
m . stop ( ) ;
self . stats
. bg_waiting_us
. fetch_add ( m . as_us ( ) , Ordering ::Relaxed ) ;
2021-09-27 11:39:41 -07:00
// likely some time has elapsed. May have been waiting for age time interval to elapse.
2022-06-14 19:43:42 -07:00
if can_advance_age {
self . maybe_advance_age ( ) ;
}
2021-09-24 11:19:06 -07:00
}
2021-09-29 05:42:54 -07:00
// the throttling request has been consumed; it is recomputed during the flush pass below
throttling_wait_ms = None ;
2021-09-23 11:37:14 -07:00
2022-09-12 11:51:12 -07:00
if exit . iter ( ) . any ( | exit | exit . load ( Ordering ::Relaxed ) ) {
2021-09-22 10:40:30 -07:00
break ;
}
// count this thread as active for the duration of the flush pass
self . stats . active_threads . fetch_add ( 1 , Ordering ::Relaxed ) ;
for _ in 0 .. bins {
if flush {
// NOTE(review): index comes from next_bucket_to_flush(), which wraps at self.bins;
// this assumes in_mem.len() == self.bins - TODO confirm against the caller
let index = self . next_bucket_to_flush ( ) ;
2022-06-14 19:43:42 -07:00
in_mem [ index ] . flush ( can_advance_age ) ;
2021-09-22 10:40:30 -07:00
}
self . stats . report_stats ( self ) ;
2021-09-24 11:19:06 -07:00
// stop this pass early once every bucket has been flushed at the current age
if self . all_buckets_flushed_at_current_age ( ) {
break ;
}
2021-09-29 05:42:54 -07:00
// stop this pass early when throttling asks for a wait; the wait happens at the top of the outer loop
throttling_wait_ms = self . throttling_wait_ms ( ) ;
if throttling_wait_ms . is_some ( ) {
break ;
}
2021-09-22 10:40:30 -07:00
}
self . stats . active_threads . fetch_sub ( 1 , Ordering ::Relaxed ) ;
}
}
2021-09-17 08:41:30 -07:00
}
#[ cfg(test) ]
pub mod tests {
2022-01-03 08:35:35 -08:00
use { super ::* , rayon ::prelude ::* , std ::time ::Instant } ;
2021-09-17 08:41:30 -07:00
/// Many threads pulling bucket indexes concurrently must visit every bin equally often.
#[test]
fn test_next_bucket_to_flush() {
    solana_logger::setup();
    let bins = 4;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
    let visit_counts: Vec<AtomicUsize> = (0..bins).map(|_| AtomicUsize::default()).collect();
    let iterations = bins * 30;
    let threads = bins * 4;
    let expected = threads * iterations / bins;

    (0..threads).into_par_iter().for_each(|_| {
        for _ in 0..iterations {
            let bin = holder.next_bucket_to_flush();
            visit_counts[bin].fetch_add(1, Ordering::Relaxed);
        }
    });

    for (bin, visited) in visit_counts.iter().enumerate() {
        assert_eq!(visited.load(Ordering::Relaxed), expected, " bin: {bin} ")
    }
}
2021-09-17 13:11:07 -07:00
2022-08-31 06:56:26 -07:00
/// The three rolling ages start at their documented offsets and all move together on increment.
#[test]
fn test_ages() {
    solana_logger::setup();
    let bins = 4;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);

    // initial state: age 0, future age = ages_to_stay_in_cache, cached future age = 0 - 1 (wrapped)
    assert_eq!(0, holder.current_age());
    assert_eq!(
        holder.ages_to_stay_in_cache,
        holder.future_age_to_flush(false)
    );
    assert_eq!(u8::MAX, holder.future_age_to_flush(true));

    // report every bin as flushed so that increment_age's assertion holds
    for _ in 0..bins {
        holder.bucket_flushed_at_current_age(false);
    }
    holder.increment_age();

    assert_eq!(1, holder.current_age());
    assert_eq!(
        holder.ages_to_stay_in_cache + 1,
        holder.future_age_to_flush(false)
    );
    assert_eq!(0, holder.future_age_to_flush(true));
}
2021-09-17 13:11:07 -07:00
/// The age wraps modulo 256 as it is incremented more than two full cycles.
#[test]
fn test_age_increment() {
    solana_logger::setup();
    let bins = 4;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
    (0..513).for_each(|age| {
        assert_eq!(holder.current_age(), (age % 256) as Age);

        // inc all
        (0..bins).for_each(|_| {
            assert!(!holder.all_buckets_flushed_at_current_age());
            // cannot call this because based on timing, it may fire: test.bucket_flushed_at_current_age();
        });

        // this would normally happen once time went off and all buckets had been flushed at the previous age
        holder
            .count_buckets_flushed
            .fetch_add(bins, Ordering::Release);
        holder.increment_age();
    });
}
2021-09-29 05:42:54 -07:00
/// Table-driven check of the throttling decision for several time/progress combinations.
#[test]
fn test_throttle() {
    solana_logger::setup();
    let bins = 128;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
    let bins = holder.bins as u64;
    let interval_ms = holder.age_interval_ms();

    // (elapsed_ms, bins_flushed, expected result)
    let cases = [
        // 89% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
        (interval_ms * 89 / 100, bins - 1, None),
        // 10% of time, all bins but 1, should wait
        (interval_ms / 10, bins - 1, Some(1)),
        // 5% of time, 8% of bins, should wait. target is 90%. These #s roughly work
        (interval_ms * 5 / 100, bins * 8 / 100, Some(1)),
        // 11% of time, 12% of bins, should NOT wait. target is 90%. These #s roughly work
        (interval_ms * 11 / 100, bins * 12 / 100, None),
    ];

    for (elapsed_ms, bins_flushed, expected) in cases {
        let result = holder.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, expected);
    }
}
2021-12-08 14:52:22 -08:00
/// An explicit `IndexLimitMb::Limit` (even of 0) enables the disk index.
#[test]
fn test_disk_index_enabled() {
    let bins = 1;
    let config = Some(AccountsIndexConfig {
        index_limit_mb: IndexLimitMb::Limit(0),
        ..AccountsIndexConfig::default()
    });
    let holder = BucketMapHolder::<u64>::new(bins, &config, 1);
    assert!(holder.is_disk_index_enabled());
}
2021-09-20 12:29:34 -07:00
/// With a single bin that is always reported flushed, the age advances with wall-clock time.
#[test]
fn test_age_time() {
    solana_logger::setup();
    let bins = 1;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
    let threads = 2;
    let time = AGE_MS * 8 / 3;
    let expected = (time / AGE_MS) as Age;
    let now = Instant::now();
    holder.bucket_flushed_at_current_age(true); // done with age 0

    // This test used to be more strict with time, but in a parallel, multi test environment,
    // sometimes threads starve and this test intermittently fails. So, give it more time than it should require.
    // This may be aggravated by the strategy of only allowing thread 0 to advance the age.
    let time_limit_ms = (time as u128) * 100;
    (0..threads).into_par_iter().for_each(|_| {
        while now.elapsed().as_millis() < time_limit_ms {
            if holder.maybe_advance_age() {
                holder.bucket_flushed_at_current_age(true);
            }

            if holder.current_age() >= expected {
                break;
            }
        }
    });

    assert!(
        holder.current_age() >= expected,
        " {}, {} ",
        holder.current_age(),
        expected
    );
}
2021-09-17 13:11:07 -07:00
/// After all bins are flushed and the age interval has passed, the age advances once and the
/// flushed-bucket count starts over.
#[test]
fn test_age_broad() {
    solana_logger::setup();
    let bins = 4;
    let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
    assert_eq!(holder.current_age(), 0);

    (0..bins).for_each(|_| {
        assert!(!holder.all_buckets_flushed_at_current_age());
        holder.bucket_flushed_at_current_age(true);
    });

    // let the age interval elapse so that the age is allowed to advance
    let pause = std::time::Duration::from_millis(AGE_MS * 2);
    std::thread::sleep(pause);
    holder.maybe_advance_age();

    assert_eq!(holder.current_age(), 1);
    assert!(!holder.all_buckets_flushed_at_current_age());
}
2021-09-12 15:14:59 -07:00
}