AcctIdx: stats for buckets on disk, add median (#20528)
This commit is contained in:
parent
2c3d52b4cc
commit
4f6a0b2650
|
@ -7,6 +7,7 @@ use solana_sdk::pubkey::Pubkey;
|
|||
use std::ops::RangeBounds;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
|
@ -18,6 +19,7 @@ pub struct BucketApi<T: Clone + Copy> {
|
|||
pub stats: Arc<BucketMapStats>,
|
||||
|
||||
bucket: LockedBucket<T>,
|
||||
count: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl<T: Clone + Copy> BucketApi<T> {
|
||||
|
@ -25,12 +27,14 @@ impl<T: Clone + Copy> BucketApi<T> {
|
|||
drives: Arc<Vec<PathBuf>>,
|
||||
max_search: MaxSearch,
|
||||
stats: Arc<BucketMapStats>,
|
||||
count: Arc<AtomicU64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
drives,
|
||||
max_search,
|
||||
stats,
|
||||
bucket: RwLock::default(),
|
||||
count,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,7 +94,9 @@ impl<T: Clone + Copy> BucketApi<T> {
|
|||
Arc::clone(&self.stats),
|
||||
));
|
||||
} else {
|
||||
bucket.as_mut().unwrap().handle_delayed_grows();
|
||||
let write = bucket.as_mut().unwrap();
|
||||
write.handle_delayed_grows();
|
||||
self.count.store(write.bucket_len(), Ordering::Relaxed);
|
||||
}
|
||||
bucket
|
||||
}
|
||||
|
|
|
@ -69,7 +69,6 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
|
|||
config.max_buckets.is_power_of_two(),
|
||||
"Max number of buckets must be a power of two"
|
||||
);
|
||||
let stats = Arc::new(BucketMapStats::default());
|
||||
// this should be <= 1 << DEFAULT_CAPACITY or we end up searching the same items over and over - probably not a big deal since it is so small anyway
|
||||
const MAX_SEARCH: MaxSearch = 32;
|
||||
let max_search = config.max_search.unwrap_or(MAX_SEARCH);
|
||||
|
@ -84,14 +83,24 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
|
|||
});
|
||||
let drives = Arc::new(drives);
|
||||
|
||||
let mut buckets = Vec::with_capacity(config.max_buckets);
|
||||
buckets.resize_with(config.max_buckets, || {
|
||||
Arc::new(BucketApi::new(
|
||||
Arc::clone(&drives),
|
||||
max_search,
|
||||
Arc::clone(&stats),
|
||||
))
|
||||
let mut per_bucket_count = Vec::with_capacity(config.max_buckets);
|
||||
per_bucket_count.resize_with(config.max_buckets, Arc::default);
|
||||
let stats = Arc::new(BucketMapStats {
|
||||
per_bucket_count,
|
||||
..BucketMapStats::default()
|
||||
});
|
||||
let buckets = stats
|
||||
.per_bucket_count
|
||||
.iter()
|
||||
.map(|per_bucket_count| {
|
||||
Arc::new(BucketApi::new(
|
||||
Arc::clone(&drives),
|
||||
max_search,
|
||||
Arc::clone(&stats),
|
||||
Arc::clone(per_bucket_count),
|
||||
))
|
||||
})
|
||||
.collect();
|
||||
|
||||
// A simple log2 function that is correct if x is a power of two
|
||||
let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1;
|
||||
|
|
|
@ -11,8 +11,9 @@ pub struct BucketStats {
|
|||
pub mmap_us: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BucketMapStats {
|
||||
pub index: Arc<BucketStats>,
|
||||
pub data: Arc<BucketStats>,
|
||||
pub per_bucket_count: Vec<Arc<AtomicU64>>,
|
||||
}
|
||||
|
|
|
@ -104,6 +104,21 @@ impl BucketMapHolderStats {
|
|||
.remaining_until_next_interval(STATS_INTERVAL_MS)
|
||||
}
|
||||
|
||||
/// return min, max, sum, median of data
|
||||
fn get_stats(mut data: Vec<u64>) -> (u64, u64, u64, u64) {
|
||||
if data.is_empty() {
|
||||
(0, 0, 0, 0)
|
||||
} else {
|
||||
data.sort_unstable();
|
||||
(
|
||||
*data.first().unwrap(),
|
||||
*data.last().unwrap(),
|
||||
data.iter().sum(),
|
||||
data[data.len() / 2],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn report_stats<T: IndexValue>(&self, storage: &BucketMapHolder<T>) {
|
||||
if !self.last_time.should_update(STATS_INTERVAL_MS) {
|
||||
return;
|
||||
|
@ -111,17 +126,23 @@ impl BucketMapHolderStats {
|
|||
|
||||
let ms_per_age = self.ms_per_age(storage);
|
||||
|
||||
let mut ct = 0;
|
||||
let mut min = usize::MAX;
|
||||
let mut max = 0;
|
||||
for d in &self.per_bucket_count {
|
||||
let d = d.load(Ordering::Relaxed) as usize;
|
||||
ct += d;
|
||||
min = std::cmp::min(min, d);
|
||||
max = std::cmp::max(max, d);
|
||||
}
|
||||
|
||||
let in_mem_per_bucket_counts = self
|
||||
.per_bucket_count
|
||||
.iter()
|
||||
.map(|count| count.load(Ordering::Relaxed))
|
||||
.collect::<Vec<_>>();
|
||||
let disk = storage.disk.as_ref();
|
||||
let disk_per_bucket_counts = disk
|
||||
.map(|disk| {
|
||||
disk.stats
|
||||
.per_bucket_count
|
||||
.iter()
|
||||
.map(|count| count.load(Ordering::Relaxed))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let in_mem_stats = Self::get_stats(in_mem_per_bucket_counts);
|
||||
let disk_stats = Self::get_stats(disk_per_bucket_counts);
|
||||
|
||||
datapoint_info!(
|
||||
"accounts_index",
|
||||
|
@ -141,9 +162,14 @@ impl BucketMapHolderStats {
|
|||
self.bg_throttling_wait_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
("min_in_bin", min, i64),
|
||||
("max_in_bin", max, i64),
|
||||
("count_from_bins", ct, i64),
|
||||
("min_in_bin_mem", in_mem_stats.0, i64),
|
||||
("max_in_bin_mem", in_mem_stats.1, i64),
|
||||
("count_from_bins_mem", in_mem_stats.2, i64),
|
||||
("median_from_bins_mem", in_mem_stats.3, i64),
|
||||
("min_in_bin_disk", disk_stats.0, i64),
|
||||
("max_in_bin_disk", disk_stats.1, i64),
|
||||
("count_from_bins_disk", disk_stats.2, i64),
|
||||
("median_from_bins_disk", disk_stats.3, i64),
|
||||
(
|
||||
"gets_from_mem",
|
||||
self.gets_from_mem.swap(0, Ordering::Relaxed),
|
||||
|
|
Loading…
Reference in New Issue