AcctIdx: prepare to spin up multiple bg threads (#19969)

This commit is contained in:
Jeff Washington (jwash) 2021-09-17 11:53:25 -05:00 committed by GitHub
parent 7b0bf64404
commit 9998e16df3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 13 deletions

View File

@ -20,7 +20,7 @@ pub struct AccountsIndexStorage<T: IndexValue> {
// for managing the bg threads // for managing the bg threads
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
wait: Arc<WaitableCondvar>, wait: Arc<WaitableCondvar>,
handle: Option<JoinHandle<()>>, handles: Option<Vec<JoinHandle<()>>>,
// eventually the backing storage // eventually the backing storage
storage: Arc<BucketMapHolder<T>>, storage: Arc<BucketMapHolder<T>>,
@ -37,8 +37,10 @@ impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
self.wait.notify_all(); self.wait.notify_all();
if let Some(x) = self.handle.take() { if let Some(handles) = self.handles.take() {
x.join().unwrap() handles
.into_iter()
.for_each(|handle| handle.join().unwrap());
} }
} }
} }
@ -52,24 +54,32 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin))) .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect(); .collect();
let storage_ = Arc::clone(&storage); const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation
let threads = DEFAULT_THREADS;
let exit = Arc::new(AtomicBool::default()); let exit = Arc::new(AtomicBool::default());
let exit_ = Arc::clone(&exit);
let wait = Arc::new(WaitableCondvar::default()); let wait = Arc::new(WaitableCondvar::default());
let wait_ = Arc::clone(&wait); let handles = Some(
let handle = Some( (0..threads)
Builder::new() .into_iter()
.name("solana-index-flusher".to_string()) .map(|_| {
.spawn(move || { let storage_ = Arc::clone(&storage);
Self::background(storage_, exit_, wait_); let exit_ = Arc::clone(&exit);
let wait_ = Arc::clone(&wait);
// note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
Builder::new()
.name("solana-idx-flusher".to_string())
.spawn(move || {
Self::background(storage_, exit_, wait_);
})
.unwrap()
}) })
.unwrap(), .collect(),
); );
Self { Self {
exit, exit,
wait, wait,
handle, handles,
storage, storage,
in_mem, in_mem,
} }