Adds AtomicAge to bucket map holder (#33841)
This commit is contained in:
parent
af7fd32f4c
commit
bd1080b26f
|
@ -3,7 +3,7 @@ use {
|
|||
accounts_index_storage::{AccountsIndexStorage, Startup},
|
||||
accounts_partition::RentPayingAccountsByPartition,
|
||||
ancestors::Ancestors,
|
||||
bucket_map_holder::{Age, BucketMapHolder},
|
||||
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
|
||||
contains::Contains,
|
||||
in_mem_accounts_index::{InMemAccountsIndex, InsertNewEntryResults, StartupStats},
|
||||
inline_spl_token::{self, GenericTokenAccount},
|
||||
|
@ -36,7 +36,7 @@ use {
|
|||
},
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard,
|
||||
},
|
||||
},
|
||||
|
@ -238,7 +238,7 @@ pub struct AccountMapEntryMeta {
|
|||
/// true if entry in in-mem idx has changes and needs to be written to disk
|
||||
pub dirty: AtomicBool,
|
||||
/// 'age' at which this entry should be purged from the cache (implements lru)
|
||||
pub age: AtomicU8,
|
||||
pub age: AtomicAge,
|
||||
}
|
||||
|
||||
impl AccountMapEntryMeta {
|
||||
|
@ -248,7 +248,7 @@ impl AccountMapEntryMeta {
|
|||
) -> Self {
|
||||
AccountMapEntryMeta {
|
||||
dirty: AtomicBool::new(true),
|
||||
age: AtomicU8::new(storage.future_age_to_flush(is_cached)),
|
||||
age: AtomicAge::new(storage.future_age_to_flush(is_cached)),
|
||||
}
|
||||
}
|
||||
pub fn new_clean<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
|
||||
|
@ -256,7 +256,7 @@ impl AccountMapEntryMeta {
|
|||
) -> Self {
|
||||
AccountMapEntryMeta {
|
||||
dirty: AtomicBool::new(false),
|
||||
age: AtomicU8::new(storage.future_age_to_flush(false)),
|
||||
age: AtomicAge::new(storage.future_age_to_flush(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2113,7 +2113,7 @@ pub mod tests {
|
|||
let (slot, account_info) = entry.slot_list.read().unwrap()[0];
|
||||
let meta = AccountMapEntryMeta {
|
||||
dirty: AtomicBool::new(entry.dirty()),
|
||||
age: AtomicU8::new(entry.age()),
|
||||
age: AtomicAge::new(entry.age()),
|
||||
};
|
||||
PreAllocatedAccountMapEntry::Entry(Arc::new(AccountMapEntryInner::new(
|
||||
vec![(slot, account_info)],
|
||||
|
|
|
@ -22,6 +22,8 @@ use {
|
|||
},
|
||||
};
|
||||
pub type Age = u8;
|
||||
pub type AtomicAge = AtomicU8;
|
||||
const _: () = assert!(std::mem::size_of::<Age>() == std::mem::size_of::<AtomicAge>());
|
||||
|
||||
const AGE_MS: u64 = DEFAULT_MS_PER_SLOT; // match one age per slot time
|
||||
|
||||
|
@ -37,12 +39,12 @@ pub struct BucketMapHolder<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
|
|||
/// Instead of accessing the single age and doing math each time, each value is incremented each time the age occurs, which is ~400ms.
|
||||
/// Callers can ask for the precomputed value they already want.
|
||||
/// rolling 'current' age
|
||||
pub age: AtomicU8,
|
||||
pub age: AtomicAge,
|
||||
/// rolling age that is 'ages_to_stay_in_cache' + 'age'
|
||||
pub future_age_to_flush: AtomicU8,
|
||||
pub future_age_to_flush: AtomicAge,
|
||||
/// rolling age that is effectively 'age' - 1
|
||||
/// these items are expected to be flushed from the accounts write cache or otherwise modified before this age occurs
|
||||
pub future_age_to_flush_cached: AtomicU8,
|
||||
pub future_age_to_flush_cached: AtomicAge,
|
||||
|
||||
pub stats: BucketMapHolderStats,
|
||||
|
||||
|
@ -255,11 +257,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> BucketMapHolder<T, U>
|
|||
ages_to_stay_in_cache,
|
||||
count_buckets_flushed: AtomicUsize::default(),
|
||||
// age = 0
|
||||
age: AtomicU8::default(),
|
||||
age: AtomicAge::default(),
|
||||
// future age = age (=0) + ages_to_stay_in_cache
|
||||
future_age_to_flush: AtomicU8::new(ages_to_stay_in_cache),
|
||||
future_age_to_flush: AtomicAge::new(ages_to_stay_in_cache),
|
||||
// effectively age (0) - 1. So, the oldest possible age from 'now'
|
||||
future_age_to_flush_cached: AtomicU8::new(0_u8.wrapping_sub(1)),
|
||||
future_age_to_flush_cached: AtomicAge::new(Age::MAX),
|
||||
stats: BucketMapHolderStats::new(bins),
|
||||
wait_dirty_or_aged: Arc::default(),
|
||||
next_bucket_to_flush: AtomicUsize::new(0),
|
||||
|
@ -442,7 +444,7 @@ pub mod tests {
|
|||
let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
|
||||
assert_eq!(0, test.current_age());
|
||||
assert_eq!(test.ages_to_stay_in_cache, test.future_age_to_flush(false));
|
||||
assert_eq!(u8::MAX, test.future_age_to_flush(true));
|
||||
assert_eq!(Age::MAX, test.future_age_to_flush(true));
|
||||
(0..bins).for_each(|_| {
|
||||
test.bucket_flushed_at_current_age(false);
|
||||
});
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use {
|
||||
crate::{
|
||||
accounts_index::{DiskIndexValue, IndexValue},
|
||||
bucket_map_holder::BucketMapHolder,
|
||||
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
|
||||
},
|
||||
solana_sdk::timing::AtomicInterval,
|
||||
std::{
|
||||
fmt::Debug,
|
||||
sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -52,7 +52,7 @@ pub struct BucketMapHolderStats {
|
|||
pub flush_entries_evicted_from_mem: AtomicU64,
|
||||
pub active_threads: AtomicU64,
|
||||
pub get_range_us: AtomicU64,
|
||||
last_age: AtomicU8,
|
||||
last_age: AtomicAge,
|
||||
last_ages_flushed: AtomicU64,
|
||||
pub flush_scan_us: AtomicU64,
|
||||
pub flush_update_us: AtomicU64,
|
||||
|
@ -120,7 +120,7 @@ impl BucketMapHolderStats {
|
|||
let mut age_now = age_now as u64;
|
||||
if last_age > age_now {
|
||||
// age wrapped
|
||||
age_now += u8::MAX as u64 + 1;
|
||||
age_now += Age::MAX as u64 + 1;
|
||||
}
|
||||
let age_delta = age_now.saturating_sub(last_age);
|
||||
if age_delta > 0 {
|
||||
|
|
|
@ -4,7 +4,7 @@ use {
|
|||
AccountMapEntry, AccountMapEntryInner, AccountMapEntryMeta, DiskIndexValue, IndexValue,
|
||||
PreAllocatedAccountMapEntry, RefCount, SlotList, UpsertReclaim, ZeroLamport,
|
||||
},
|
||||
bucket_map_holder::{Age, BucketMapHolder},
|
||||
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
|
||||
bucket_map_holder_stats::BucketMapHolderStats,
|
||||
waitable_condvar::WaitableCondvar,
|
||||
},
|
||||
|
@ -17,7 +17,7 @@ use {
|
|||
fmt::Debug,
|
||||
ops::{Bound, RangeBounds, RangeInclusive},
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
Arc, Mutex, RwLock, RwLockWriteGuard,
|
||||
},
|
||||
},
|
||||
|
@ -89,7 +89,7 @@ impl<T: IndexValue> PossibleEvictions<T> {
|
|||
|
||||
// one instance of this represents one bin of the accounts index.
|
||||
pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
|
||||
last_age_flushed: AtomicU8,
|
||||
last_age_flushed: AtomicAge,
|
||||
|
||||
// backing store
|
||||
map_internal: RwLock<InMemMap<T>>,
|
||||
|
@ -115,7 +115,7 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<
|
|||
|
||||
/// how many more ages to skip before this bucket is flushed (as opposed to being skipped).
|
||||
/// When this reaches 0, this bucket is flushed.
|
||||
remaining_ages_to_skip_flushing: AtomicU8,
|
||||
remaining_ages_to_skip_flushing: AtomicAge,
|
||||
|
||||
/// an individual bucket will evict its entries and write to disk every 1/NUM_AGES_TO_DISTRIBUTE_FLUSHES ages
|
||||
/// Higher numbers mean we flush less buckets/s
|
||||
|
@ -181,12 +181,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
stop_evictions: AtomicU64::default(),
|
||||
flushing_active: AtomicBool::default(),
|
||||
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
|
||||
last_age_flushed: AtomicU8::new(Age::MAX),
|
||||
last_age_flushed: AtomicAge::new(Age::MAX),
|
||||
startup_info: StartupInfo::default(),
|
||||
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
|
||||
// Spread out the scanning across all ages within the window.
|
||||
// This causes us to scan 1/N of the bins each 'Age'
|
||||
remaining_ages_to_skip_flushing: AtomicU8::new(
|
||||
remaining_ages_to_skip_flushing: AtomicAge::new(
|
||||
thread_rng().gen_range(0..num_ages_to_distribute_flushes),
|
||||
),
|
||||
num_ages_to_distribute_flushes,
|
||||
|
|
Loading…
Reference in New Issue