diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 35a10039f..f7428e773 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -3691,6 +3691,7 @@ impl AccountsDb {
         {
             // Slots that are currently being flushed by flush_slot_cache()
+            let mut currently_contended_slots = slots_under_contention.lock().unwrap();
 
             // Slots that are currently being flushed by flush_slot_cache() AND
@@ -4229,42 +4230,53 @@ impl AccountsDb {
         slot: Slot,
         should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
     ) -> Option<FlushStats> {
-        self.accounts_cache.slot_cache(slot).and_then(|slot_cache| {
-            let is_being_purged = {
-                let mut slots_under_contention = self
-                    .remove_unrooted_slots_synchronization
-                    .slots_under_contention
-                    .lock()
-                    .unwrap();
-                // If we're purging this slot, don't flush it here
-                if slots_under_contention.contains(&slot) {
-                    true
-                } else {
-                    slots_under_contention.insert(slot);
-                    false
-                }
-            };
-            if !is_being_purged {
-                let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);
-                // Nobody else should have been purging this slot, so should not have been removed
-                // from `self.remove_unrooted_slots_synchronization`.
-                assert!(self
-                    .remove_unrooted_slots_synchronization
-                    .slots_under_contention
-                    .lock()
-                    .unwrap()
-                    .remove(&slot));
-
-                // Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
-                // flushing
-                self.remove_unrooted_slots_synchronization
-                    .signal
-                    .notify_all();
-                Some(flush_stats)
+        let is_being_purged = {
+            let mut slots_under_contention = self
+                .remove_unrooted_slots_synchronization
+                .slots_under_contention
+                .lock()
+                .unwrap();
+            // If we're purging this slot, don't flush it here
+            if slots_under_contention.contains(&slot) {
+                true
             } else {
-                None
+                slots_under_contention.insert(slot);
+                false
             }
-        })
+        };
+
+        if !is_being_purged {
+            let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
+                #[cfg(test)]
+                {
+                    // Give some time for cache flushing to occur here for unit tests
+                    sleep(Duration::from_millis(self.load_delay));
+                }
+                // Since we added the slot to `slots_under_contention` AND this slot
+                // still exists in the cache, we know the slot cannot be removed
+                // by any other threads past this point. We are now responsible for
+                // flushing this slot.
+                self.do_flush_slot_cache(slot, &slot_cache, should_flush_f)
+            });
+
+            // Nobody else should have been purging this slot, so should not have been removed
+            // from `self.remove_unrooted_slots_synchronization`.
+            assert!(self
+                .remove_unrooted_slots_synchronization
+                .slots_under_contention
+                .lock()
+                .unwrap()
+                .remove(&slot));
+
+            // Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
+            // flushing
+            self.remove_unrooted_slots_synchronization
+                .signal
+                .notify_all();
+            flush_stats
+        } else {
+            None
+        }
     }
 
     fn write_accounts_to_cache(
@@ -11516,7 +11528,79 @@ pub mod tests {
     }
 
     #[test]
-    fn test_cache_flush_remove_unrooted_race() {
+    fn test_cache_flush_delayed_remove_unrooted_race() {
+        let caching_enabled = true;
+        let mut db = AccountsDb::new_with_config(
+            Vec::new(),
+            &ClusterType::Development,
+            AccountSecondaryIndexes::default(),
+            caching_enabled,
+            AccountShrinkThreshold::default(),
+        );
+        db.load_delay = RACY_SLEEP_MS;
+        let db = Arc::new(db);
+        let slot = 10;
+        let bank_id = 10;
+
+        let lamports = 42;
+        let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
+        account.set_lamports(lamports);
+
+        // Start up a thread to flush the accounts cache
+        let (flush_trial_start_sender, flush_trial_start_receiver) = unbounded();
+        let (flush_done_sender, flush_done_receiver) = unbounded();
+        let t_flush_cache = {
+            let db = db.clone();
+            std::thread::Builder::new()
+                .name("account-cache-flush".to_string())
+                .spawn(move || loop {
+                    // Wait for the signal to start a trial
+                    if flush_trial_start_receiver.recv().is_err() {
+                        return;
+                    }
+                    db.flush_slot_cache(10, None::<&mut fn(&_, &_) -> bool>);
+                    flush_done_sender.send(()).unwrap();
+                })
+                .unwrap()
+        };
+
+        // Start up a thread to remove the slot
+        let (remove_trial_start_sender, remove_trial_start_receiver) = unbounded();
+        let (remove_done_sender, remove_done_receiver) = unbounded();
+        let t_remove = {
+            let db = db.clone();
+            std::thread::Builder::new()
+                .name("account-remove".to_string())
+                .spawn(move || loop {
+                    // Wait for the signal to start a trial
+                    if remove_trial_start_receiver.recv().is_err() {
+                        return;
+                    }
+                    db.remove_unrooted_slots(&[(slot, bank_id)]);
+                    remove_done_sender.send(()).unwrap();
+                })
+                .unwrap()
+        };
+
+        let num_trials = 10;
+        for _ in 0..num_trials {
+            let pubkey = Pubkey::new_unique();
+            db.store_cached(slot, &[(&pubkey, &account)]);
+            // Kick off both threads for this trial, then wait for both to finish
+            flush_trial_start_sender.send(()).unwrap();
+            remove_trial_start_sender.send(()).unwrap();
+            let _ = flush_done_receiver.recv();
+            let _ = remove_done_receiver.recv();
+        }
+
+        drop(flush_trial_start_sender);
+        drop(remove_trial_start_sender);
+        t_flush_cache.join().unwrap();
+        t_remove.join().unwrap();
+    }
+
+    #[test]
+    fn test_cache_flush_remove_unrooted_race_multiple_slots() {
         let caching_enabled = true;
         let db = AccountsDb::new_with_config(
             Vec::new(),
             &ClusterType::Development,
             AccountSecondaryIndexes::default(),
             caching_enabled,
             AccountShrinkThreshold::default(),
         );
@@ -11606,15 +11690,17 @@ pub mod tests {
             // in which case flush should ignore/move past the slot to be dumped
             //
             // Hence, we split into chunks to get the dumping of each chunk to race with the
-            // flushes. If we were to dump the entire chunk at once, then this lessens the possibility
+            // flushes. If we were to dump all the slots at once, this would reduce the possibility
             // of the flush occurring first since the dumping logic reserves all the slots it's about
             // to dump immediately.
+
             for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
                 db.remove_unrooted_slots(chunks);
             }
 
             // Check that all the slots in `slots_to_dump` were completely removed from the
             // cache, storage, and index
+
             for (slot, _) in slots_to_dump {
                 assert!(db.storage.get_slot_storage_entries(*slot).is_none());
                 assert!(db.accounts_cache.slot_cache(*slot).is_none());
@@ -11626,6 +11712,7 @@ pub mod tests {
             }
 
             // Wait for flush to finish before starting next trial
+            flush_done_receiver.recv().unwrap();
 
             for (slot, bank_id) in slots_to_keep {
@@ -11646,6 +11733,7 @@ pub mod tests {
         exit.store(true, Ordering::Relaxed);
         drop(new_trial_start_sender);
         t_flush_cache.join().unwrap();
+        t_spurious_signal.join().unwrap();
     }
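
Note on the synchronization pattern: the heart of this fix is the ordering in `flush_slot_cache()` — the flusher now reserves the slot in `slots_under_contention` *before* consulting the cache, closing the window in which `remove_unrooted_slots()` could dump a slot mid-flush. Below is a minimal standalone sketch of that reserve-then-check protocol. This is not Solana's API: `SlotSync`, `Db`, `flush_slot`, and `remove_slots` are illustrative stand-ins, and the cache is a toy map keyed by slot.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Condvar, Mutex};

/// Illustrative stand-in for `RemoveUnrootedSlotsSynchronization`.
struct SlotSync {
    /// Slots currently reserved by a flusher or a remover.
    slots_under_contention: Mutex<HashSet<u64>>,
    /// Signaled when a reservation is released, waking blocked removers.
    signal: Condvar,
}

struct Db {
    /// Toy slot cache: slot -> payload.
    cache: Mutex<HashMap<u64, String>>,
    sync: SlotSync,
}

impl Db {
    /// Flush one slot, unless a remover has already reserved it.
    fn flush_slot(&self, slot: u64) -> Option<String> {
        // Reserve the slot BEFORE touching the cache; this is the ordering
        // the patch establishes. If the insert fails, a remover owns the slot.
        if !self.sync.slots_under_contention.lock().unwrap().insert(slot) {
            return None;
        }
        // Safe to read the cache now: no remover can dump this slot while it
        // stays in `slots_under_contention`.
        let flushed = self.cache.lock().unwrap().remove(&slot);

        // Release the reservation and wake any blocked removers.
        assert!(self.sync.slots_under_contention.lock().unwrap().remove(&slot));
        self.sync.signal.notify_all();
        flushed
    }

    /// Remove slots, waiting out any in-progress flushes first.
    fn remove_slots(&self, slots: &[u64]) {
        let mut contended = self.sync.slots_under_contention.lock().unwrap();
        // Condvar wait loop; it tolerates spurious wakeups, which is exactly
        // what the `t_spurious_signal` thread in the test hammers on.
        while slots.iter().any(|s| contended.contains(s)) {
            contended = self.sync.signal.wait(contended).unwrap();
        }
        // Reserve everything up front so a concurrent flush skips these slots
        // ("the dumping logic reserves all the slots it's about to dump").
        for s in slots {
            contended.insert(*s);
        }
        drop(contended);

        self.cache.lock().unwrap().retain(|k, _| !slots.contains(k));

        let mut contended = self.sync.slots_under_contention.lock().unwrap();
        for s in slots {
            contended.remove(s);
        }
        // Wake any other removers waiting on these slots.
        self.sync.signal.notify_all();
    }
}
```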
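A hypothetical driver for the sketch above, mirroring the single-slot trial in `test_cache_flush_delayed_remove_unrooted_race`: one thread flushes slot 10 while another removes it.

```rust
fn main() {
    use std::sync::Arc;

    let db = Arc::new(Db {
        cache: Mutex::new(HashMap::from([(10u64, "account data".to_string())])),
        sync: SlotSync {
            slots_under_contention: Mutex::new(HashSet::new()),
            signal: Condvar::new(),
        },
    });

    // Race one flush against one removal of the same slot.
    let flusher = {
        let db = Arc::clone(&db);
        std::thread::spawn(move || db.flush_slot(10))
    };
    let remover = {
        let db = Arc::clone(&db);
        std::thread::spawn(move || db.remove_slots(&[10]))
    };

    let flushed = flusher.join().unwrap();
    remover.join().unwrap();

    // Whichever side won the reservation, the slot is gone exactly once:
    // either the flush observed it (Some) or the remover dumped it (None).
    assert!(db.cache.lock().unwrap().is_empty());
    println!("flush saw: {:?}", flushed);
}
```

Either interleaving leaves the toy database in a consistent state, which is the property the patch restores: the `#[cfg(test)]` sleep on `load_delay` in the real change exists only to widen the race window so the test's ten trials actually exercise both orderings.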