// solana/runtime/src/accounts_index_storage.rs (132 lines, 4.0 KiB, Rust)

use crate::accounts_index::{AccountsIndexConfig, IndexValue};
use crate::bucket_map_holder::BucketMapHolder;
use crate::in_mem_accounts_index::InMemAccountsIndex;
2021-10-15 14:15:11 -07:00
use crate::waitable_condvar::WaitableCondvar;
use std::fmt::Debug;
2021-09-12 15:14:59 -07:00
use std::{
sync::{
atomic::{AtomicBool, Ordering},
2021-10-15 14:15:11 -07:00
Arc, Mutex,
2021-09-12 15:14:59 -07:00
},
thread::{Builder, JoinHandle},
};
2021-10-15 14:15:11 -07:00
/// Manages the lifetime of the background processing threads.
pub struct AccountsIndexStorage<T: IndexValue> {
2021-10-15 14:15:11 -07:00
_bg_threads: BgThreads,
2021-09-12 15:14:59 -07:00
pub storage: Arc<BucketMapHolder<T>>,
pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
2021-10-15 14:15:11 -07:00
/// set_startup(true) creates bg threads which are kept alive until set_startup(false)
startup_worker_threads: Mutex<Option<BgThreads>>,
2021-09-12 15:14:59 -07:00
}
/// Minimal `Debug` output: the type name followed by `{ .. }`.
///
/// No field is printed (the struct holds shared storage, per-bin indexes and thread
/// handles), but the output is no longer empty, so a containing type that derives `Debug`
/// still renders a readable value for this field.
impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AccountsIndexStorage")
            .finish_non_exhaustive()
    }
}
2021-10-15 14:15:11 -07:00
/// low-level managing the bg threads
struct BgThreads {
exit: Arc<AtomicBool>,
handles: Option<Vec<JoinHandle<()>>>,
wait: Arc<WaitableCondvar>,
}
impl Drop for BgThreads {
2021-09-12 15:14:59 -07:00
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
2021-10-15 14:15:11 -07:00
self.wait.notify_all();
if let Some(handles) = self.handles.take() {
handles
.into_iter()
.for_each(|handle| handle.join().unwrap());
2021-09-12 15:14:59 -07:00
}
}
}
2021-10-15 14:15:11 -07:00
impl BgThreads {
fn new<T: IndexValue>(
storage: &Arc<BucketMapHolder<T>>,
in_mem: &[Arc<InMemAccountsIndex<T>>],
threads: usize,
) -> Self {
2021-10-15 14:15:11 -07:00
// stop signal used for THIS batch of bg threads
2021-09-12 15:14:59 -07:00
let exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
.into_iter()
.map(|_| {
2021-10-15 14:15:11 -07:00
let storage_ = Arc::clone(storage);
let exit_ = Arc::clone(&exit);
2021-10-15 14:15:11 -07:00
let in_mem_ = in_mem.to_vec();
2021-10-15 14:15:11 -07:00
// note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
Builder::new()
.name("solana-idx-flusher".to_string())
.spawn(move || {
storage_.background(exit_, in_mem_);
})
.unwrap()
2021-09-12 15:14:59 -07:00
})
.collect(),
2021-09-12 15:14:59 -07:00
);
2021-10-15 14:15:11 -07:00
BgThreads {
2021-09-12 15:14:59 -07:00
exit,
handles,
2021-10-15 14:15:11 -07:00
wait: Arc::clone(&storage.wait_dirty_or_aged),
}
}
}
impl<T: IndexValue> AccountsIndexStorage<T> {
/// startup=true causes:
/// in mem to act in a way that flushes to disk asap
/// also creates some additional bg threads to facilitate flushing to disk asap
/// startup=false is 'normal' operation
pub fn set_startup(&self, value: bool) {
if value {
// create some additional bg threads to help get things to the disk index asap
*self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
&self.storage,
&self.in_mem,
Self::num_threads(),
));
2021-09-12 15:14:59 -07:00
}
2021-10-15 14:15:11 -07:00
self.storage.set_startup(value);
if !value {
// shutdown the bg threads
*self.startup_worker_threads.lock().unwrap() = None;
}
}
fn num_threads() -> usize {
std::cmp::max(2, num_cpus::get() / 4)
2021-09-12 15:14:59 -07:00
}
2021-10-15 14:15:11 -07:00
/// allocate BucketMapHolder and InMemAccountsIndex[]
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
let threads = config
.as_ref()
.and_then(|config| config.flush_threads)
2021-10-15 14:15:11 -07:00
.unwrap_or_else(Self::num_threads);
2021-10-15 14:15:11 -07:00
let storage = Arc::new(BucketMapHolder::new(bins, config));
let in_mem = (0..bins)
.into_iter()
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();
2021-10-15 14:15:11 -07:00
Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads),
storage,
in_mem,
startup_worker_threads: Mutex::default(),
}
}
2021-09-12 15:14:59 -07:00
}