diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 724e25c1f5..7909259137 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,4 +1,5 @@ use crate::{ + accounts_index_storage::AccountsIndexStorage, ancestors::Ancestors, contains::Contains, in_mem_accounts_index::InMemAccountsIndex, @@ -799,6 +800,8 @@ pub struct AccountsIndex { // on any of these slots fails. This is safe to purge once the associated Bank is dropped and // scanning the fork with that Bank at the tip is no longer possible. pub removed_bank_ids: Mutex>, + + storage: AccountsIndexStorage, } impl AccountsIndex { @@ -807,7 +810,7 @@ impl AccountsIndex { } pub fn new(config: Option) -> Self { - let (account_maps, bin_calculator) = Self::allocate_accounts_index(config); + let (account_maps, bin_calculator, storage) = Self::allocate_accounts_index(config); Self { account_maps, bin_calculator, @@ -823,22 +826,24 @@ impl AccountsIndex { roots_tracker: RwLock::::default(), ongoing_scan_roots: RwLock::>::default(), removed_bank_ids: Mutex::>::default(), + storage, } } fn allocate_accounts_index( config: Option, - ) -> (LockMapType, PubkeyBinCalculator16) { + ) -> (LockMapType, PubkeyBinCalculator16, AccountsIndexStorage) { let bins = config .and_then(|config| config.bins) .unwrap_or(BINS_DEFAULT); // create bin_calculator early to verify # bins is reasonable let bin_calculator = PubkeyBinCalculator16::new(bins); + let storage = AccountsIndexStorage::new(); let account_maps = (0..bins) .into_iter() - .map(|_bin| RwLock::new(AccountMap::new())) + .map(|_bin| RwLock::new(AccountMap::new(&storage))) .collect::>(); - (account_maps, bin_calculator) + (account_maps, bin_calculator, storage) } fn iter(&self, range: Option<&R>, collect_all_unsorted: bool) -> AccountsIndexIterator diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs new file mode 100644 index 0000000000..d2fda9b24e --- /dev/null +++ b/runtime/src/accounts_index_storage.rs @@ -0,0 +1,65 @@ +use crate::bucket_map_holder::BucketMapHolder; +use crate::waitable_condvar::WaitableCondvar; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{Builder, JoinHandle}, +}; + +// eventually hold the bucket map +// Also manages the lifetime of the background processing threads. +// When this instance is dropped, it will drop the bucket map and cleanup +// and it will stop all the background threads and join them. + +#[derive(Debug, Default)] +pub struct AccountsIndexStorage { + // for managing the bg threads + exit: Arc, + wait: Arc, + handle: Option>, + + // eventually the backing storage + storage: Arc, +} + +impl Drop for AccountsIndexStorage { + fn drop(&mut self) { + self.exit.store(true, Ordering::Relaxed); + self.wait.notify_all(); + if let Some(x) = self.handle.take() { + x.join().unwrap() + } + } +} + +impl AccountsIndexStorage { + pub fn new() -> AccountsIndexStorage { + let storage = Arc::new(BucketMapHolder::new()); + let storage_ = storage.clone(); + let exit = Arc::new(AtomicBool::default()); + let exit_ = exit.clone(); + let wait = Arc::new(WaitableCondvar::default()); + let wait_ = wait.clone(); + let handle = Some( + Builder::new() + .name("solana-index-flusher".to_string()) + .spawn(move || { + storage_.background(exit_, wait_); + }) + .unwrap(), + ); + + Self { + exit, + wait, + handle, + storage, + } + } + + pub fn storage(&self) -> &Arc { + &self.storage + } +} diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs new file mode 100644 index 0000000000..29de87ea29 --- /dev/null +++ b/runtime/src/bucket_map_holder.rs @@ -0,0 +1,31 @@ +use crate::bucket_map_holder_stats::BucketMapHolderStats; +use crate::waitable_condvar::WaitableCondvar; +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +// will eventually hold the bucket map +#[derive(Debug, Default)] +pub struct BucketMapHolder { + pub stats: BucketMapHolderStats, +} + +impl BucketMapHolder { + pub fn new() -> Self { + Self { + stats: BucketMapHolderStats::default(), + } + } + + // intended to execute in a bg thread + pub fn background(&self, exit: Arc, wait: Arc) { + loop { + wait.wait_timeout(Duration::from_millis(10000)); // account index stats every 10 s + if exit.load(Ordering::Relaxed) { + break; + } + self.stats.report_stats(); + } + } +} diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs new file mode 100644 index 0000000000..72e840d691 --- /dev/null +++ b/runtime/src/bucket_map_holder_stats.rs @@ -0,0 +1,44 @@ +use std::fmt::Debug; +use std::sync::atomic::{AtomicU64, Ordering}; + +#[derive(Debug, Default)] +pub struct BucketMapHolderStats { + pub get_mem_us: AtomicU64, + pub gets_from_mem: AtomicU64, + pub get_missing_us: AtomicU64, + pub gets_missing: AtomicU64, + pub items: AtomicU64, + pub keys: AtomicU64, + pub deletes: AtomicU64, +} + +impl BucketMapHolderStats { + pub fn report_stats(&self) { + datapoint_info!( + "accounts_index", + ( + "gets_from_mem", + self.gets_from_mem.swap(0, Ordering::Relaxed), + i64 + ), + ( + "get_mem_us", + self.get_mem_us.swap(0, Ordering::Relaxed) / 1000, + i64 + ), + ( + "gets_missing", + self.gets_missing.swap(0, Ordering::Relaxed), + i64 + ), + ( + "get_missing_us", + self.get_missing_us.swap(0, Ordering::Relaxed) / 1000, + i64 + ), + ("deletes", self.deletes.swap(0, Ordering::Relaxed), i64), + ("items", self.items.swap(0, Ordering::Relaxed), i64), + ("keys", self.keys.swap(0, Ordering::Relaxed), i64), + ); + } +} diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 6b1f320d61..95ce7e6174 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -1,44 +1,80 @@ use crate::accounts_index::{AccountMapEntry, IsCached, WriteAccountMapEntry}; +use crate::accounts_index_storage::AccountsIndexStorage; +use crate::bucket_map_holder::BucketMapHolder; +use crate::bucket_map_holder_stats::BucketMapHolderStats; +use solana_measure::measure::Measure; use solana_sdk::pubkey::Pubkey; use std::collections::{ hash_map::{Entry, Keys}, HashMap, }; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + use std::fmt::Debug; type K = Pubkey; // one instance of this represents one bin of the accounts index. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct InMemAccountsIndex { // backing store map: HashMap>, + storage: Arc, } impl InMemAccountsIndex { - pub fn new() -> Self { + pub fn new(storage: &AccountsIndexStorage) -> Self { Self { map: HashMap::new(), + storage: storage.storage().clone(), } } + pub fn new_bucket_map_holder() -> Arc { + Arc::new(BucketMapHolder::new()) + } + pub fn entry(&mut self, pubkey: Pubkey) -> Entry> { - self.map.entry(pubkey) + let m = Measure::start("entry"); + let result = self.map.entry(pubkey); + let stats = &self.storage.stats; + let (count, time) = if matches!(result, Entry::Occupied(_)) { + (&stats.gets_from_mem, &stats.get_mem_us) + } else { + (&stats.gets_missing, &stats.get_missing_us) + }; + Self::update_time_stat(time, m); + Self::update_stat(count, 1); + result } pub fn items(&self) -> Vec<(K, AccountMapEntry)> { + Self::update_stat(&self.stats().items, 1); self.map.iter().map(|(k, v)| (*k, v.clone())).collect() } pub fn keys(&self) -> Keys> { + Self::update_stat(&self.stats().keys, 1); self.map.keys() } pub fn get(&self, key: &K) -> Option> { - self.map.get(key).cloned() + let m = Measure::start("get"); + let result = self.map.get(key).cloned(); + let stats = self.stats(); + let (count, time) = if result.is_some() { + (&stats.gets_from_mem, &stats.get_mem_us) + } else { + (&stats.gets_missing, &stats.get_missing_us) + }; + Self::update_time_stat(time, m); + Self::update_stat(count, 1); + result } pub fn remove(&mut self, key: &K) { + Self::update_stat(&self.stats().deletes, 1); self.map.remove(key); } @@ -83,4 +119,20 @@ impl InMemAccountsIndex { } } } + + fn stats(&self) -> &BucketMapHolderStats { + &self.storage.stats + } + + fn update_stat(stat: &AtomicU64, value: u64) { + if value != 0 { + stat.fetch_add(value, Ordering::Relaxed); + } + } + + pub fn update_time_stat(stat: &AtomicU64, mut m: Measure) { + m.stop(); + let value = m.as_us(); + Self::update_stat(stat, value); + } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 47c4c7cb11..11c70a2293 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -6,6 +6,7 @@ pub mod accounts_cache; pub mod accounts_db; pub mod accounts_hash; pub mod accounts_index; +pub mod accounts_index_storage; pub mod ancestors; pub mod append_vec; pub mod bank; @@ -14,6 +15,8 @@ pub mod bank_forks; pub mod bank_utils; pub mod blockhash_queue; pub mod bloom; +pub mod bucket_map_holder; +pub mod bucket_map_holder_stats; pub mod builtins; pub mod commitment; pub mod contains;