accounts index stats (#19797)
This commit is contained in:
parent
ab152f1319
commit
3617d43e76
|
@ -1,4 +1,5 @@
|
|||
use crate::{
|
||||
accounts_index_storage::AccountsIndexStorage,
|
||||
ancestors::Ancestors,
|
||||
contains::Contains,
|
||||
in_mem_accounts_index::InMemAccountsIndex,
|
||||
|
@ -799,6 +800,8 @@ pub struct AccountsIndex<T: IsCached> {
|
|||
// on any of these slots fails. This is safe to purge once the associated Bank is dropped and
|
||||
// scanning the fork with that Bank at the tip is no longer possible.
|
||||
pub removed_bank_ids: Mutex<HashSet<BankId>>,
|
||||
|
||||
storage: AccountsIndexStorage,
|
||||
}
|
||||
|
||||
impl<T: IsCached> AccountsIndex<T> {
|
||||
|
@ -807,7 +810,7 @@ impl<T: IsCached> AccountsIndex<T> {
|
|||
}
|
||||
|
||||
pub fn new(config: Option<AccountsIndexConfig>) -> Self {
|
||||
let (account_maps, bin_calculator) = Self::allocate_accounts_index(config);
|
||||
let (account_maps, bin_calculator, storage) = Self::allocate_accounts_index(config);
|
||||
Self {
|
||||
account_maps,
|
||||
bin_calculator,
|
||||
|
@ -823,22 +826,24 @@ impl<T: IsCached> AccountsIndex<T> {
|
|||
roots_tracker: RwLock::<RootsTracker>::default(),
|
||||
ongoing_scan_roots: RwLock::<BTreeMap<Slot, u64>>::default(),
|
||||
removed_bank_ids: Mutex::<HashSet<BankId>>::default(),
|
||||
storage,
|
||||
}
|
||||
}
|
||||
|
||||
fn allocate_accounts_index(
|
||||
config: Option<AccountsIndexConfig>,
|
||||
) -> (LockMapType<T>, PubkeyBinCalculator16) {
|
||||
) -> (LockMapType<T>, PubkeyBinCalculator16, AccountsIndexStorage) {
|
||||
let bins = config
|
||||
.and_then(|config| config.bins)
|
||||
.unwrap_or(BINS_DEFAULT);
|
||||
// create bin_calculator early to verify # bins is reasonable
|
||||
let bin_calculator = PubkeyBinCalculator16::new(bins);
|
||||
let storage = AccountsIndexStorage::new();
|
||||
let account_maps = (0..bins)
|
||||
.into_iter()
|
||||
.map(|_bin| RwLock::new(AccountMap::new()))
|
||||
.map(|_bin| RwLock::new(AccountMap::new(&storage)))
|
||||
.collect::<Vec<_>>();
|
||||
(account_maps, bin_calculator)
|
||||
(account_maps, bin_calculator, storage)
|
||||
}
|
||||
|
||||
fn iter<R>(&self, range: Option<&R>, collect_all_unsorted: bool) -> AccountsIndexIterator<T>
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
use crate::bucket_map_holder::BucketMapHolder;
|
||||
use crate::waitable_condvar::WaitableCondvar;
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread::{Builder, JoinHandle},
|
||||
};
|
||||
|
||||
// eventually hold the bucket map
|
||||
// Also manages the lifetime of the background processing threads.
|
||||
// When this instance is dropped, it will drop the bucket map and cleanup
|
||||
// and it will stop all the background threads and join them.
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AccountsIndexStorage {
|
||||
// for managing the bg threads
|
||||
exit: Arc<AtomicBool>,
|
||||
wait: Arc<WaitableCondvar>,
|
||||
handle: Option<JoinHandle<()>>,
|
||||
|
||||
// eventually the backing storage
|
||||
storage: Arc<BucketMapHolder>,
|
||||
}
|
||||
|
||||
impl Drop for AccountsIndexStorage {
|
||||
fn drop(&mut self) {
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
self.wait.notify_all();
|
||||
if let Some(x) = self.handle.take() {
|
||||
x.join().unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AccountsIndexStorage {
|
||||
pub fn new() -> AccountsIndexStorage {
|
||||
let storage = Arc::new(BucketMapHolder::new());
|
||||
let storage_ = storage.clone();
|
||||
let exit = Arc::new(AtomicBool::default());
|
||||
let exit_ = exit.clone();
|
||||
let wait = Arc::new(WaitableCondvar::default());
|
||||
let wait_ = wait.clone();
|
||||
let handle = Some(
|
||||
Builder::new()
|
||||
.name("solana-index-flusher".to_string())
|
||||
.spawn(move || {
|
||||
storage_.background(exit_, wait_);
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
Self {
|
||||
exit,
|
||||
wait,
|
||||
handle,
|
||||
storage,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn storage(&self) -> &Arc<BucketMapHolder> {
|
||||
&self.storage
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
||||
use crate::waitable_condvar::WaitableCondvar;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
// will eventually hold the bucket map
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BucketMapHolder {
|
||||
pub stats: BucketMapHolderStats,
|
||||
}
|
||||
|
||||
impl BucketMapHolder {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
stats: BucketMapHolderStats::default(),
|
||||
}
|
||||
}
|
||||
|
||||
// intended to execute in a bg thread
|
||||
pub fn background(&self, exit: Arc<AtomicBool>, wait: Arc<WaitableCondvar>) {
|
||||
loop {
|
||||
wait.wait_timeout(Duration::from_millis(10000)); // account index stats every 10 s
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
self.stats.report_stats();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
use std::fmt::Debug;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BucketMapHolderStats {
|
||||
pub get_mem_us: AtomicU64,
|
||||
pub gets_from_mem: AtomicU64,
|
||||
pub get_missing_us: AtomicU64,
|
||||
pub gets_missing: AtomicU64,
|
||||
pub items: AtomicU64,
|
||||
pub keys: AtomicU64,
|
||||
pub deletes: AtomicU64,
|
||||
}
|
||||
|
||||
impl BucketMapHolderStats {
|
||||
pub fn report_stats(&self) {
|
||||
datapoint_info!(
|
||||
"accounts_index",
|
||||
(
|
||||
"gets_from_mem",
|
||||
self.gets_from_mem.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"get_mem_us",
|
||||
self.get_mem_us.swap(0, Ordering::Relaxed) / 1000,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"gets_missing",
|
||||
self.gets_missing.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"get_missing_us",
|
||||
self.get_missing_us.swap(0, Ordering::Relaxed) / 1000,
|
||||
i64
|
||||
),
|
||||
("deletes", self.deletes.swap(0, Ordering::Relaxed), i64),
|
||||
("items", self.items.swap(0, Ordering::Relaxed), i64),
|
||||
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
|
||||
);
|
||||
}
|
||||
}
|
|
@ -1,44 +1,80 @@
|
|||
use crate::accounts_index::{AccountMapEntry, IsCached, WriteAccountMapEntry};
|
||||
use crate::accounts_index_storage::AccountsIndexStorage;
|
||||
use crate::bucket_map_holder::BucketMapHolder;
|
||||
use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::collections::{
|
||||
hash_map::{Entry, Keys},
|
||||
HashMap,
|
||||
};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
type K = Pubkey;
|
||||
|
||||
// one instance of this represents one bin of the accounts index.
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
pub struct InMemAccountsIndex<T: IsCached> {
|
||||
// backing store
|
||||
map: HashMap<Pubkey, AccountMapEntry<T>>,
|
||||
storage: Arc<BucketMapHolder>,
|
||||
}
|
||||
|
||||
impl<T: IsCached> InMemAccountsIndex<T> {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(storage: &AccountsIndexStorage) -> Self {
|
||||
Self {
|
||||
map: HashMap::new(),
|
||||
storage: storage.storage().clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_bucket_map_holder() -> Arc<BucketMapHolder> {
|
||||
Arc::new(BucketMapHolder::new())
|
||||
}
|
||||
|
||||
pub fn entry(&mut self, pubkey: Pubkey) -> Entry<K, AccountMapEntry<T>> {
|
||||
self.map.entry(pubkey)
|
||||
let m = Measure::start("entry");
|
||||
let result = self.map.entry(pubkey);
|
||||
let stats = &self.storage.stats;
|
||||
let (count, time) = if matches!(result, Entry::Occupied(_)) {
|
||||
(&stats.gets_from_mem, &stats.get_mem_us)
|
||||
} else {
|
||||
(&stats.gets_missing, &stats.get_missing_us)
|
||||
};
|
||||
Self::update_time_stat(time, m);
|
||||
Self::update_stat(count, 1);
|
||||
result
|
||||
}
|
||||
|
||||
pub fn items(&self) -> Vec<(K, AccountMapEntry<T>)> {
|
||||
Self::update_stat(&self.stats().items, 1);
|
||||
self.map.iter().map(|(k, v)| (*k, v.clone())).collect()
|
||||
}
|
||||
|
||||
pub fn keys(&self) -> Keys<K, AccountMapEntry<T>> {
|
||||
Self::update_stat(&self.stats().keys, 1);
|
||||
self.map.keys()
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &K) -> Option<AccountMapEntry<T>> {
|
||||
self.map.get(key).cloned()
|
||||
let m = Measure::start("get");
|
||||
let result = self.map.get(key).cloned();
|
||||
let stats = self.stats();
|
||||
let (count, time) = if result.is_some() {
|
||||
(&stats.gets_from_mem, &stats.get_mem_us)
|
||||
} else {
|
||||
(&stats.gets_missing, &stats.get_missing_us)
|
||||
};
|
||||
Self::update_time_stat(time, m);
|
||||
Self::update_stat(count, 1);
|
||||
result
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, key: &K) {
|
||||
Self::update_stat(&self.stats().deletes, 1);
|
||||
self.map.remove(key);
|
||||
}
|
||||
|
||||
|
@ -83,4 +119,20 @@ impl<T: IsCached> InMemAccountsIndex<T> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn stats(&self) -> &BucketMapHolderStats {
|
||||
&self.storage.stats
|
||||
}
|
||||
|
||||
fn update_stat(stat: &AtomicU64, value: u64) {
|
||||
if value != 0 {
|
||||
stat.fetch_add(value, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_time_stat(stat: &AtomicU64, mut m: Measure) {
|
||||
m.stop();
|
||||
let value = m.as_us();
|
||||
Self::update_stat(stat, value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ pub mod accounts_cache;
|
|||
pub mod accounts_db;
|
||||
pub mod accounts_hash;
|
||||
pub mod accounts_index;
|
||||
pub mod accounts_index_storage;
|
||||
pub mod ancestors;
|
||||
pub mod append_vec;
|
||||
pub mod bank;
|
||||
|
@ -14,6 +15,8 @@ pub mod bank_forks;
|
|||
pub mod bank_utils;
|
||||
pub mod blockhash_queue;
|
||||
pub mod bloom;
|
||||
pub mod bucket_map_holder;
|
||||
pub mod bucket_map_holder_stats;
|
||||
pub mod builtins;
|
||||
pub mod commitment;
|
||||
pub mod contains;
|
||||
|
|
Loading…
Reference in New Issue