diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs
index c0f386b62e..31de2ab55d 100644
--- a/runtime/src/accounts_index_storage.rs
+++ b/runtime/src/accounts_index_storage.rs
@@ -52,7 +52,7 @@ impl AccountsIndexStorage {
         let in_mem = (0..bins)
             .into_iter()
             .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
-            .collect();
+            .collect::<Vec<_>>();
 
         const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation
         let threads = config
@@ -69,11 +69,13 @@ impl AccountsIndexStorage {
                     let storage_ = Arc::clone(&storage);
                     let exit_ = Arc::clone(&exit);
                     let wait_ = Arc::clone(&wait);
+                    let in_mem_ = in_mem.clone();
+                    // note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
                     Builder::new()
                         .name("solana-idx-flusher".to_string())
                         .spawn(move || {
-                            Self::background(storage_, exit_, wait_);
+                            Self::background(storage_, exit_, wait_, in_mem_);
                         })
                         .unwrap()
                 })
@@ -98,6 +100,7 @@ impl AccountsIndexStorage {
         storage: Arc<BucketMapHolder>,
         exit: Arc<AtomicBool>,
         wait: Arc<WaitableCondvar>,
+        in_mem: Vec<Arc<InMemAccountsIndex>>,
     ) {
         loop {
             // this will transition to waits and thread throttling
@@ -105,6 +108,11 @@
             if exit.load(Ordering::Relaxed) {
                 break;
             }
+
+            for bucket in &in_mem {
+                bucket.flush();
+            }
+
             storage.stats.report_stats();
         }
     }
diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index 57b016f1fb..87bba1c371 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -7,7 +7,7 @@ use solana_measure::measure::Measure;
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
 use std::collections::{hash_map::Entry, HashMap};
 use std::ops::{Bound, RangeBounds, RangeInclusive};
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, RwLock};
 
 use std::fmt::Debug;
@@ -26,6 +26,10 @@ pub struct InMemAccountsIndex {
     pub(crate) cache_ranges_held: CacheRangesHeld,
     // true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
     stop_flush: AtomicU64,
+    // set to true when any entry in this bin is marked dirty
+    bin_dirty: AtomicBool,
+    // set to true while this bin is being actively flushed
+    flushing_active: AtomicBool,
 }
 
 impl Debug for InMemAccountsIndex {
@@ -42,6 +46,8 @@ impl InMemAccountsIndex {
             bin,
             cache_ranges_held: CacheRangesHeld::default(),
             stop_flush: AtomicU64::default(),
+            bin_dirty: AtomicBool::default(),
+            flushing_active: AtomicBool::default(),
         }
     }
 
@@ -358,6 +364,46 @@ impl InMemAccountsIndex {
         self.stop_flush.load(Ordering::Relaxed) > 0
     }
 
+    pub(crate) fn flush(&self) {
+        let flushing = self.flushing_active.swap(true, Ordering::Acquire);
+        if flushing {
+            // already flushing in another thread
+            return;
+        }
+
+        self.flush_internal();
+
+        self.flushing_active.store(false, Ordering::Release);
+    }
+
+    pub fn set_bin_dirty(&self) {
+        self.bin_dirty.store(true, Ordering::Release);
+    }
+
+    fn flush_internal(&self) {
+        let was_dirty = self.bin_dirty.swap(false, Ordering::Acquire);
+        if !was_dirty {
+            // wasn't dirty, no need to flush
+            return;
+        }
+
+        let map = self.map().read().unwrap();
+        for (_k, _v) in map.iter() {
+            /*
+            if v.dirty() {
+                // step 1: clear the dirty flag
+                // step 2: perform the update on disk based on the fields in the entry
+                // If a parallel operation dirties the item again - even while this flush is occurring,
+                // the last thing the writer will do, after updating contents, is set_dirty(true)
+                // That prevents dropping an item from cache before disk is updated to latest in mem.
+                v.set_dirty(false);
+
+                // soon, this will update disk from the in-mem contents
+            }
+            */
+        }
+    }
+
     fn stats(&self) -> &BucketMapHolderStats {
         &self.storage.stats
     }
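
Note (not part of the patch): below is a minimal, self-contained sketch of the two-flag scheme the patch introduces, under simplified assumptions; the `Bin` type and `write_entry` helper are hypothetical stand-ins for InMemAccountsIndex, and the per-entry disk work is elided. `flushing_active` guarantees at most one flusher per bin, while `bin_dirty` is cleared by the flusher before it reads entries and set by writers only after mutating, so a write that races with a flush always leaves the bin marked dirty for the next pass.

use std::sync::atomic::{AtomicBool, Ordering};

struct Bin {
    // set when any entry in this bin has been modified
    bin_dirty: AtomicBool,
    // set while some thread is flushing this bin
    flushing_active: AtomicBool,
}

impl Bin {
    // Writer side: mutate the entry first, then mark the bin dirty, so a
    // concurrent flush that already consumed the flag still sees this write
    // on its next pass.
    fn write_entry(&self) {
        // ... update the in-mem entry here ...
        self.bin_dirty.store(true, Ordering::Release);
    }

    // Flusher side, mirroring flush()/flush_internal() above.
    fn flush(&self) {
        // whoever swaps false -> true proceeds; everyone else backs off
        if self.flushing_active.swap(true, Ordering::Acquire) {
            return; // already flushing in another thread
        }
        // consume the dirty flag *before* doing the work; a racing writer
        // re-dirties the bin after its update, so no update is dropped
        if self.bin_dirty.swap(false, Ordering::Acquire) {
            // ... write dirty entries to disk here ...
        }
        self.flushing_active.store(false, Ordering::Release);
    }
}

fn main() {
    let bin = Bin {
        bin_dirty: AtomicBool::new(false),
        flushing_active: AtomicBool::new(false),
    };
    bin.write_entry();
    bin.flush(); // does work: the bin was dirty
    bin.flush(); // no-op: the first flush consumed the dirty flag
}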