2021-09-18 07:54:00 -07:00
|
|
|
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
|
2021-09-12 15:14:59 -07:00
|
|
|
use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
2021-09-20 06:40:10 -07:00
|
|
|
use crate::in_mem_accounts_index::SlotT;
|
2021-09-17 08:41:30 -07:00
|
|
|
use crate::waitable_condvar::WaitableCondvar;
|
2021-09-20 06:40:10 -07:00
|
|
|
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
|
2021-09-12 15:14:59 -07:00
|
|
|
use std::fmt::Debug;
|
2021-09-18 10:55:57 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
2021-09-17 08:41:30 -07:00
|
|
|
use std::sync::Mutex;
|
2021-09-17 13:11:07 -07:00
|
|
|
/// Age counter used by the in-mem accounts index. It is a `u8`, so wrapping arithmetic on it
/// cycles through 0..=255 and then back to 0.
pub type Age = u8;
|
2021-09-12 15:14:59 -07:00
|
|
|
|
2021-09-14 15:51:07 -07:00
|
|
|
/// State shared by the in-memory accounts index bins: the optional on-disk bucket map, the
/// global age used to decide when to flush, stats, and flush/startup coordination.
pub struct BucketMapHolder<T: IndexValue> {
    /// On-disk bucket map. `None` unless the index config sets `index_limit_mb`; only then
    /// does `new` allocate it.
    pub disk: Option<BucketMap<SlotT<T>>>,

    /// How many buckets have been reported flushed at the current age. Reset to 0 whenever
    /// the age is advanced.
    pub count_ages_flushed: AtomicUsize,
    /// Current age. A wrapping `u8` counter: 0..=255, then back to 0.
    pub age: AtomicU8,
    pub stats: BucketMapHolderStats,

    // used by bg processing to know when any bucket has become dirty
    pub wait_dirty_bucket: WaitableCondvar,
    /// Round-robin cursor used by `next_bucket_to_flush`. It is advanced modulo `bins`.
    next_bucket_to_flush: Mutex<usize>,
    /// Number of bins (buckets) this holder coordinates.
    bins: usize,

    // how much mb are we allowed to keep in the in-mem index?
    // Rest goes to disk.
    pub mem_budget_mb: Option<usize>,
    /// Number of ages added to the current age by `future_age_to_flush`. Defaults to 5 when
    /// the config does not set it. Presumably the number of ages an entry stays in the
    /// in-mem cache before it is due for flushing — confirm against callers.
    ages_to_stay_in_cache: Age,

    /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
    /// with less thread count limitations. LRU and access patterns are not important. Freeing memory
    /// and writing to disk in parallel are.
    /// Note startup is an optimization and is not required for correctness.
    startup: AtomicBool,
    _phantom: std::marker::PhantomData<T>,
}
|
|
|
|
|
2021-09-15 07:54:16 -07:00
|
|
|
impl<T: IndexValue> Debug for BucketMapHolder<T> {
    /// Produces no output: none of the holder's fields appear in `{:?}` formatting.
    fn fmt(&self, _formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Nothing is written to the formatter, so this can never report a formatting error.
        std::fmt::Result::Ok(())
    }
}
|
|
|
|
|
2021-09-17 08:41:30 -07:00
|
|
|
#[allow(clippy::mutex_atomic)]
|
2021-09-14 15:51:07 -07:00
|
|
|
impl<T: IndexValue> BucketMapHolder<T> {
|
2021-09-17 13:11:07 -07:00
|
|
|
pub fn increment_age(&self) {
|
|
|
|
// fetch_add is defined to wrap.
|
|
|
|
// That's what we want. 0..255, then back to 0.
|
|
|
|
self.age.fetch_add(1, Ordering::Relaxed);
|
|
|
|
// since we changed age, there are now 0 buckets that have been flushed at this age
|
|
|
|
let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed);
|
|
|
|
assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
|
|
|
|
}
|
|
|
|
|
2021-09-19 18:22:09 -07:00
|
|
|
pub fn future_age_to_flush(&self) -> Age {
|
|
|
|
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
|
|
|
|
}
|
2021-09-18 10:55:57 -07:00
|
|
|
/// used by bg processes to determine # active threads and how aggressively to flush
|
|
|
|
pub fn get_startup(&self) -> bool {
|
|
|
|
self.startup.load(Ordering::Relaxed)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_startup(&self, value: bool) {
|
2021-09-18 20:08:58 -07:00
|
|
|
if !value {
|
|
|
|
self.wait_for_idle();
|
|
|
|
}
|
2021-09-18 10:55:57 -07:00
|
|
|
self.startup.store(value, Ordering::Relaxed)
|
|
|
|
}
|
|
|
|
|
2021-09-18 20:08:58 -07:00
|
|
|
pub(crate) fn wait_for_idle(&self) {
|
|
|
|
assert!(self.get_startup());
|
|
|
|
}
|
|
|
|
|
2021-09-17 13:11:07 -07:00
|
|
|
pub fn current_age(&self) -> Age {
|
|
|
|
self.age.load(Ordering::Relaxed)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn bucket_flushed_at_current_age(&self) {
|
|
|
|
self.count_ages_flushed.fetch_add(1, Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
|
|
|
|
// have all buckets been flushed at the current age?
|
|
|
|
pub fn all_buckets_flushed_at_current_age(&self) -> bool {
|
|
|
|
self.count_ages_flushed.load(Ordering::Relaxed) >= self.bins
|
|
|
|
}
|
|
|
|
|
2021-09-19 16:00:15 -07:00
|
|
|
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
|
2021-09-19 18:22:09 -07:00
|
|
|
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
|
|
|
|
let ages_to_stay_in_cache = config
|
|
|
|
.as_ref()
|
|
|
|
.and_then(|config| config.ages_to_stay_in_cache)
|
|
|
|
.unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
|
2021-09-20 06:40:10 -07:00
|
|
|
|
|
|
|
let mut bucket_config = BucketMapConfig::new(bins);
|
|
|
|
bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
|
|
|
|
let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb);
|
|
|
|
// only allocate if mem_budget_mb is Some
|
|
|
|
let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
|
|
|
|
|
2021-09-15 11:07:53 -07:00
|
|
|
Self {
|
2021-09-20 06:40:10 -07:00
|
|
|
disk,
|
2021-09-19 18:22:09 -07:00
|
|
|
ages_to_stay_in_cache,
|
2021-09-17 13:11:07 -07:00
|
|
|
count_ages_flushed: AtomicUsize::default(),
|
|
|
|
age: AtomicU8::default(),
|
2021-09-15 11:07:53 -07:00
|
|
|
stats: BucketMapHolderStats::default(),
|
2021-09-17 08:41:30 -07:00
|
|
|
wait_dirty_bucket: WaitableCondvar::default(),
|
|
|
|
next_bucket_to_flush: Mutex::new(0),
|
|
|
|
bins,
|
2021-09-18 10:55:57 -07:00
|
|
|
startup: AtomicBool::default(),
|
2021-09-20 06:40:10 -07:00
|
|
|
mem_budget_mb,
|
2021-09-15 11:07:53 -07:00
|
|
|
_phantom: std::marker::PhantomData::<T>::default(),
|
|
|
|
}
|
2021-09-12 15:14:59 -07:00
|
|
|
}
|
2021-09-17 08:41:30 -07:00
|
|
|
|
|
|
|
// get the next bucket to flush, with the idea that the previous bucket
|
|
|
|
// is perhaps being flushed by another thread already.
|
|
|
|
pub fn next_bucket_to_flush(&self) -> usize {
|
|
|
|
// could be lock-free as an optimization
|
|
|
|
// wrapping is tricky
|
|
|
|
let mut lock = self.next_bucket_to_flush.lock().unwrap();
|
|
|
|
let result = *lock;
|
|
|
|
*lock = (result + 1) % self.bins;
|
|
|
|
result
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
pub mod tests {
    use super::*;
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Many threads pulling bucket indices concurrently must see every bin equally often,
    /// because the cursor hands indices out strictly round-robin.
    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
        let hits: Vec<AtomicUsize> = (0..bins).map(|_| AtomicUsize::default()).collect();
        let per_thread = bins * 30;
        let thread_count = bins * 4;
        let expected = thread_count * per_thread / bins;

        (0..thread_count).into_par_iter().for_each(|_| {
            for _ in 0..per_thread {
                let bin = holder.next_bucket_to_flush();
                hits[bin].fetch_add(1, Ordering::Relaxed);
            }
        });

        for (bin, hit_count) in hits.iter().enumerate() {
            assert_eq!(hit_count.load(Ordering::Relaxed), expected, "bin: {}", bin)
        }
    }

    /// Drives the age through more than two full wraps, flushing every bin at each age.
    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
        for age in 0..513 {
            assert_eq!(holder.current_age(), (age % 256) as Age);

            // report each bin flushed; the age is only complete after the last one
            (0..bins).for_each(|_| {
                assert!(!holder.all_buckets_flushed_at_current_age());
                holder.bucket_flushed_at_current_age();
            });

            holder.increment_age();
        }
    }

    /// The age counts as fully flushed only once the final bin has been reported, and a
    /// fresh age starts out not flushed.
    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
        assert_eq!(holder.current_age(), 0);
        assert!(!holder.all_buckets_flushed_at_current_age());

        // report all but the last bin
        let mut remaining = bins - 1;
        while remaining > 0 {
            holder.bucket_flushed_at_current_age();
            assert!(!holder.all_buckets_flushed_at_current_age());
            remaining -= 1;
        }

        holder.bucket_flushed_at_current_age();
        assert!(holder.all_buckets_flushed_at_current_age());
        holder.increment_age();

        assert_eq!(holder.current_age(), 1);
        assert!(!holder.all_buckets_flushed_at_current_age());
    }
}
|