AcctIdx: consolidate to correct CondVar (#20017)

This commit is contained in:
Jeff Washington (jwash) 2021-09-20 09:58:20 -05:00 committed by GitHub
parent 55d9ff9899
commit e6934e7247
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 11 deletions

View File

@ -1,7 +1,6 @@
use crate::accounts_index::{AccountsIndexConfig, IndexValue}; use crate::accounts_index::{AccountsIndexConfig, IndexValue};
use crate::bucket_map_holder::BucketMapHolder; use crate::bucket_map_holder::BucketMapHolder;
use crate::in_mem_accounts_index::InMemAccountsIndex; use crate::in_mem_accounts_index::InMemAccountsIndex;
use crate::waitable_condvar::WaitableCondvar;
use std::fmt::Debug; use std::fmt::Debug;
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
@ -19,7 +18,6 @@ use std::{
pub struct AccountsIndexStorage<T: IndexValue> { pub struct AccountsIndexStorage<T: IndexValue> {
// for managing the bg threads // for managing the bg threads
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
wait: Arc<WaitableCondvar>,
handles: Option<Vec<JoinHandle<()>>>, handles: Option<Vec<JoinHandle<()>>>,
// eventually the backing storage // eventually the backing storage
@ -36,7 +34,7 @@ impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
impl<T: IndexValue> Drop for AccountsIndexStorage<T> { impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
self.wait.notify_all(); self.storage.wait_dirty_or_aged.notify_all();
if let Some(handles) = self.handles.take() { if let Some(handles) = self.handles.take() {
handles handles
.into_iter() .into_iter()
@ -61,21 +59,19 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
.unwrap_or(DEFAULT_THREADS); .unwrap_or(DEFAULT_THREADS);
let exit = Arc::new(AtomicBool::default()); let exit = Arc::new(AtomicBool::default());
let wait = Arc::new(WaitableCondvar::default());
let handles = Some( let handles = Some(
(0..threads) (0..threads)
.into_iter() .into_iter()
.map(|_| { .map(|_| {
let storage_ = Arc::clone(&storage); let storage_ = Arc::clone(&storage);
let exit_ = Arc::clone(&exit); let exit_ = Arc::clone(&exit);
let wait_ = Arc::clone(&wait);
let in_mem_ = in_mem.clone(); let in_mem_ = in_mem.clone();
// note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock // note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
Builder::new() Builder::new()
.name("solana-idx-flusher".to_string()) .name("solana-idx-flusher".to_string())
.spawn(move || { .spawn(move || {
Self::background(storage_, exit_, wait_, in_mem_); Self::background(storage_, exit_, in_mem_);
}) })
.unwrap() .unwrap()
}) })
@ -84,7 +80,6 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
Self { Self {
exit, exit,
wait,
handles, handles,
storage, storage,
in_mem, in_mem,
@ -99,12 +94,13 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
pub fn background( pub fn background(
storage: Arc<BucketMapHolder<T>>, storage: Arc<BucketMapHolder<T>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
wait: Arc<WaitableCondvar>,
in_mem: Vec<Arc<InMemAccountsIndex<T>>>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
) { ) {
loop { loop {
// this will transition to waits and thread throttling // this will transition to waits and thread throttling
wait.wait_timeout(Duration::from_millis(10000)); storage
.wait_dirty_or_aged
.wait_timeout(Duration::from_millis(10000));
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }

View File

@ -16,7 +16,7 @@ pub struct BucketMapHolder<T: IndexValue> {
pub stats: BucketMapHolderStats, pub stats: BucketMapHolderStats,
// used by bg processing to know when any bucket has become dirty // used by bg processing to know when any bucket has become dirty
pub wait_dirty_bucket: WaitableCondvar, pub wait_dirty_or_aged: WaitableCondvar,
next_bucket_to_flush: Mutex<usize>, next_bucket_to_flush: Mutex<usize>,
bins: usize, bins: usize,
@ -48,6 +48,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
// since we changed age, there are now 0 buckets that have been flushed at this age // since we changed age, there are now 0 buckets that have been flushed at this age
let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed); let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed);
assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
} }
pub fn future_age_to_flush(&self) -> Age { pub fn future_age_to_flush(&self) -> Age {
@ -101,7 +102,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
count_ages_flushed: AtomicUsize::default(), count_ages_flushed: AtomicUsize::default(),
age: AtomicU8::default(), age: AtomicU8::default(),
stats: BucketMapHolderStats::default(), stats: BucketMapHolderStats::default(),
wait_dirty_bucket: WaitableCondvar::default(), wait_dirty_or_aged: WaitableCondvar::default(),
next_bucket_to_flush: Mutex::new(0), next_bucket_to_flush: Mutex::new(0),
bins, bins,
startup: AtomicBool::default(), startup: AtomicBool::default(),

View File

@ -397,6 +397,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
pub fn set_bin_dirty(&self) { pub fn set_bin_dirty(&self) {
self.bin_dirty.store(true, Ordering::Release); self.bin_dirty.store(true, Ordering::Release);
// 1 bin dirty, so only need 1 thread to wake up if many could be waiting
self.storage.wait_dirty_or_aged.notify_one();
} }
fn flush_internal(&self) { fn flush_internal(&self) {