From 4dc2f0819847efa535aa9ed8163f80c410f12813 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Fri, 17 Sep 2021 17:19:29 -0500 Subject: [PATCH] AcctIdx: hold_range_in_memory (#19955) --- runtime/src/accounts.rs | 65 ++++++++++ runtime/src/accounts_index.rs | 24 ++++ runtime/src/bank.rs | 6 +- runtime/src/bucket_map_holder_stats.rs | 6 + runtime/src/in_mem_accounts_index.rs | 158 ++++++++++++++++++++++++- 5 files changed, 257 insertions(+), 2 deletions(-) diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 8051361f5b..6ff4c92696 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -813,6 +813,15 @@ impl Accounts { ) } + pub fn hold_range_in_memory(&self, range: &R, start_holding: bool) + where + R: RangeBounds + std::fmt::Debug, + { + self.accounts_db + .accounts_index + .hold_range_in_memory(range, start_holding) + } + pub fn load_to_collect_rent_eagerly + std::fmt::Debug>( &self, ancestors: &Ancestors, @@ -1240,6 +1249,62 @@ mod tests { load_accounts_with_fee(tx, ka, &fee_calculator, error_counters) } + #[test] + fn test_hold_range_in_memory() { + let accts = Accounts::default_for_tests(); + let range = Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]); + accts.hold_range_in_memory(&range, true); + accts.hold_range_in_memory(&range, false); + accts.hold_range_in_memory(&range, true); + accts.hold_range_in_memory(&range, true); + accts.hold_range_in_memory(&range, false); + accts.hold_range_in_memory(&range, false); + } + + #[test] + fn test_hold_range_in_memory2() { + let accts = Accounts::default_for_tests(); + let range = Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]); + let idx = &accts.accounts_db.accounts_index; + let bins = idx.account_maps.len(); + // use bins * 2 to get the first half of the range within bin 0 + let bins_2 = bins * 2; + let binner = crate::pubkey_bins::PubkeyBinCalculator16::new(bins_2); + let range2 = + binner.lowest_pubkey_from_bin(0, bins_2)..binner.lowest_pubkey_from_bin(1, bins_2); + let range2_inclusive = range2.start..=range2.end; + assert_eq!(0, idx.bin_calculator.bin_from_pubkey(&range2.start)); + assert_eq!(0, idx.bin_calculator.bin_from_pubkey(&range2.end)); + accts.hold_range_in_memory(&range, true); + error!("{}{}, bins: {}", file!(), line!(), bins); + idx.account_maps.iter().enumerate().for_each(|(_bin, map)| { + let map = map.read().unwrap(); + assert_eq!( + map.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(range.clone())] + ); + }); + error!("{}{}", file!(), line!()); + accts.hold_range_in_memory(&range2, true); + error!("{}{}", file!(), line!()); + idx.account_maps.iter().enumerate().for_each(|(bin, map)| { + let map = map.read().unwrap(); + let expected = if bin == 0 { + vec![Some(range.clone()), Some(range2_inclusive.clone())] + } else { + vec![Some(range.clone())] + }; + assert_eq!( + map.cache_ranges_held.read().unwrap().to_vec(), + expected, + "bin: {}", + bin + ); + }); + accts.hold_range_in_memory(&range, false); + accts.hold_range_in_memory(&range2, false); + } + #[test] fn test_load_accounts_no_account_0_exists() { let accounts: Vec<(Pubkey, AccountSharedData)> = Vec::new(); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index c52e43efe7..8d41f08f83 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -648,6 +648,23 @@ impl<'a, T: IndexValue> AccountsIndexIterator<'a, T> { collect_all_unsorted, } } + + pub fn hold_range_in_memory(&self, range: &R, start_holding: bool) + where + R: RangeBounds + Debug, + { + // forward this hold request ONLY to the bins which contain keys in the specified range + let (start_bin, bin_range) = self.bin_start_and_range(); + self.account_maps + .iter() + .skip(start_bin) + .take(bin_range) + .for_each(|map| { + map.read() + .unwrap() + .hold_range_in_memory(range, start_holding); + }); + } } impl<'a, T: IndexValue> Iterator for AccountsIndexIterator<'a, T> { @@ -1346,6 +1363,13 @@ impl AccountsIndex { rv.map(|index| slice.len() - 1 - index) } + pub fn hold_range_in_memory(&self, range: &R, start_holding: bool) + where + R: RangeBounds + Debug, + { + let iter = self.iter(Some(range), true); + iter.hold_range_in_memory(range, start_holding); + } /// Get an account /// The latest account that appears in `ancestors` or `roots` is returned. pub(crate) fn get( diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 1557c8f658..7607e1ffbb 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3927,10 +3927,12 @@ impl Bank { fn collect_rent_in_partition(&self, partition: Partition) -> usize { let subrange = Self::pubkey_range_from_partition(partition); + self.rc.accounts.hold_range_in_memory(&subrange, true); + let accounts = self .rc .accounts - .load_to_collect_rent_eagerly(&self.ancestors, subrange); + .load_to_collect_rent_eagerly(&self.ancestors, subrange.clone()); let account_count = accounts.len(); // parallelize? @@ -3954,6 +3956,8 @@ impl Bank { } self.collected_rent.fetch_add(total_rent, Relaxed); self.rewards.write().unwrap().append(&mut rent_debits.0); + + self.rc.accounts.hold_range_in_memory(&subrange, false); account_count } diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index 2eb7b61873..dc7734fa45 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -18,6 +18,7 @@ pub struct BucketMapHolderStats { pub inserts: AtomicU64, pub count_in_mem: AtomicU64, pub per_bucket_count: Vec, + pub get_range_us: AtomicU64, } impl BucketMapHolderStats { @@ -110,6 +111,11 @@ impl BucketMapHolderStats { self.updates_in_mem.swap(0, Ordering::Relaxed), i64 ), + ( + "get_range_us", + self.get_range_us.swap(0, Ordering::Relaxed), + i64 + ), ("inserts", self.inserts.swap(0, Ordering::Relaxed), i64), ("deletes", self.deletes.swap(0, Ordering::Relaxed), i64), ("items", self.items.swap(0, Ordering::Relaxed), i64), diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index b2c2229b16..655bb63217 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -6,12 +6,14 @@ use crate::bucket_map_holder_stats::BucketMapHolderStats; use solana_measure::measure::Measure; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::collections::{hash_map::Entry, HashMap}; +use std::ops::{Bound, RangeBounds, RangeInclusive}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; use std::fmt::Debug; -use std::ops::RangeBounds; type K = Pubkey; +type CacheRangesHeld = RwLock>>>; +pub type SlotT = (Slot, T); // one instance of this represents one bin of the accounts index. pub struct InMemAccountsIndex { @@ -19,6 +21,11 @@ pub struct InMemAccountsIndex { map_internal: RwLock>>, storage: Arc>, bin: usize, + + // pubkey ranges that this bin must hold in the cache while the range is present in this vec + pub(crate) cache_ranges_held: CacheRangesHeld, + // true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held. + stop_flush: AtomicU64, } impl Debug for InMemAccountsIndex { @@ -33,6 +40,8 @@ impl InMemAccountsIndex { map_internal: RwLock::default(), storage: Arc::clone(storage), bin, + cache_ranges_held: CacheRangesHeld::default(), + stop_flush: AtomicU64::default(), } } @@ -264,6 +273,91 @@ impl InMemAccountsIndex { result } + pub fn just_set_hold_range_in_memory(&self, range: &R, start_holding: bool) + where + R: RangeBounds, + { + let start = match range.start_bound() { + Bound::Included(bound) | Bound::Excluded(bound) => *bound, + Bound::Unbounded => Pubkey::new(&[0; 32]), + }; + + let end = match range.end_bound() { + Bound::Included(bound) | Bound::Excluded(bound) => *bound, + Bound::Unbounded => Pubkey::new(&[0xff; 32]), + }; + + // this becomes inclusive - that is ok - we are just roughly holding a range of items. + // inclusive is bigger than exclusive so we may hold 1 extra item worst case + let inclusive_range = Some(start..=end); + let mut ranges = self.cache_ranges_held.write().unwrap(); + if start_holding { + ranges.push(inclusive_range); + } else { + // find the matching range and delete it since we don't want to hold it anymore + let none = inclusive_range.is_none(); + for (i, r) in ranges.iter().enumerate() { + if r.is_none() != none { + continue; + } + if !none { + // neither are none, so check values + if let (Bound::Included(start_found), Bound::Included(end_found)) = r + .as_ref() + .map(|r| (r.start_bound(), r.end_bound())) + .unwrap() + { + if start_found != &start || end_found != &end { + continue; + } + } + } + // found a match. There may be dups, that's ok, we expect another call to remove the dup. + ranges.remove(i); + break; + } + } + } + + fn start_stop_flush(&self, stop: bool) { + if stop { + self.stop_flush.fetch_add(1, Ordering::Acquire); + } else { + self.stop_flush.fetch_sub(1, Ordering::Release); + } + } + + pub fn hold_range_in_memory(&self, range: &R, start_holding: bool) + where + R: RangeBounds + Debug, + { + self.start_stop_flush(true); + + if start_holding { + // put everything in the cache and it will be held there + self.put_range_in_cache(Some(range)); + } + // do this AFTER items have been put in cache - that way anyone who finds this range can know that the items are already in the cache + self.just_set_hold_range_in_memory(range, start_holding); + + self.start_stop_flush(false); + } + + fn put_range_in_cache(&self, _range: Option<&R>) + where + R: RangeBounds, + { + assert!(self.get_stop_flush()); // caller should be controlling the lifetime of how long this needs to be present + let m = Measure::start("range"); + + // load from disk here + Self::update_time_stat(&self.stats().get_range_us, m); + } + + fn get_stop_flush(&self) -> bool { + self.stop_flush.load(Ordering::Relaxed) > 0 + } + fn stats(&self) -> &BucketMapHolderStats { &self.storage.stats } @@ -280,3 +374,65 @@ impl InMemAccountsIndex { Self::update_stat(stat, value); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::accounts_index::BINS_FOR_TESTING; + + fn new_for_test() -> InMemAccountsIndex { + let holder = Arc::new(BucketMapHolder::new(BINS_FOR_TESTING)); + InMemAccountsIndex::new(&holder, BINS_FOR_TESTING) + } + + #[test] + fn test_hold_range_in_memory() { + let accts = new_for_test::(); + // 0x81 is just some other range + let ranges = [ + Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]), + Pubkey::new(&[0x81; 32])..=Pubkey::new(&[0xff; 32]), + ]; + for range in ranges.clone() { + assert!(accts.cache_ranges_held.read().unwrap().is_empty()); + accts.hold_range_in_memory(&range, true); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(range.clone())] + ); + accts.hold_range_in_memory(&range, false); + assert!(accts.cache_ranges_held.read().unwrap().is_empty()); + accts.hold_range_in_memory(&range, true); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(range.clone())] + ); + accts.hold_range_in_memory(&range, true); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(range.clone()), Some(range.clone())] + ); + accts.hold_range_in_memory(&ranges[0], true); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![ + Some(range.clone()), + Some(range.clone()), + Some(ranges[0].clone()) + ] + ); + accts.hold_range_in_memory(&range, false); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(range.clone()), Some(ranges[0].clone())] + ); + accts.hold_range_in_memory(&range, false); + assert_eq!( + accts.cache_ranges_held.read().unwrap().to_vec(), + vec![Some(ranges[0].clone())] + ); + accts.hold_range_in_memory(&ranges[0].clone(), false); + assert!(accts.cache_ranges_held.read().unwrap().is_empty()); + } + } +}