From 8da2eb980a69ed1df005bdf2f0b28f3fb5a02dce Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Mon, 4 Oct 2021 12:48:09 -0400 Subject: [PATCH] AcctIdx: introduce BucketApi for access to a specific bucket (#20359) --- bucket_map/src/bucket_api.rs | 134 +++++++++++++++++++++++++ bucket_map/src/bucket_map.rs | 145 ++++++++------------------- bucket_map/src/lib.rs | 1 + runtime/src/in_mem_accounts_index.rs | 27 +++-- 4 files changed, 194 insertions(+), 113 deletions(-) create mode 100644 bucket_map/src/bucket_api.rs diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs new file mode 100644 index 0000000000..8dbe08bc92 --- /dev/null +++ b/bucket_map/src/bucket_api.rs @@ -0,0 +1,134 @@ +use crate::bucket::Bucket; +use crate::bucket_item::BucketItem; +use crate::bucket_map::BucketMapError; +use crate::bucket_stats::BucketMapStats; +use crate::{MaxSearch, RefCount}; +use solana_sdk::pubkey::Pubkey; +use std::ops::RangeBounds; +use std::path::PathBuf; + +use std::sync::Arc; +use std::sync::{RwLock, RwLockWriteGuard}; + +type LockedBucket = Arc>>>; + +pub struct BucketApi { + drives: Arc>, + max_search: MaxSearch, + pub stats: Arc, + + bucket: LockedBucket, +} + +impl BucketApi { + pub fn new( + drives: Arc>, + max_search: MaxSearch, + stats: Arc, + ) -> Self { + Self { + drives, + max_search, + stats, + bucket: Arc::default(), + } + } + + /// Get the items for bucket + pub fn items_in_range(&self, range: &Option<&R>) -> Vec> + where + R: RangeBounds, + { + self.bucket + .read() + .unwrap() + .as_ref() + .map(|bucket| bucket.items_in_range(range)) + .unwrap_or_default() + } + + /// Get the Pubkeys + pub fn keys(&self) -> Vec { + self.bucket + .read() + .unwrap() + .as_ref() + .map_or_else(Vec::default, |bucket| bucket.keys()) + } + + /// Get the values for Pubkey `key` + pub fn read_value(&self, key: &Pubkey) -> Option<(Vec, RefCount)> { + self.bucket.read().unwrap().as_ref().and_then(|bucket| { + bucket + .read_value(key) + .map(|(value, ref_count)| (value.to_vec(), ref_count)) + }) + } + + pub fn bucket_len(&self) -> u64 { + self.bucket + .read() + .unwrap() + .as_ref() + .map(|bucket| bucket.bucket_len()) + .unwrap_or_default() + } + + pub fn delete_key(&self, key: &Pubkey) { + let mut bucket = self.get_write_bucket(); + if let Some(bucket) = bucket.as_mut() { + bucket.delete_key(key) + } + } + + fn get_write_bucket(&self) -> RwLockWriteGuard>> { + let mut bucket = self.bucket.write().unwrap(); + if bucket.is_none() { + *bucket = Some(Bucket::new( + Arc::clone(&self.drives), + self.max_search, + Arc::clone(&self.stats), + )); + } + bucket + } + + pub fn addref(&self, key: &Pubkey) -> Option { + self.get_write_bucket() + .as_mut() + .and_then(|bucket| bucket.addref(key)) + } + + pub fn unref(&self, key: &Pubkey) -> Option { + self.get_write_bucket() + .as_mut() + .and_then(|bucket| bucket.unref(key)) + } + + pub fn insert(&self, pubkey: &Pubkey, value: (&[T], RefCount)) { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().insert(pubkey, value) + } + + pub fn grow(&self, err: BucketMapError) { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().grow(err) + } + + pub fn update(&self, key: &Pubkey, updatefn: F) + where + F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().update(key, updatefn) + } + + pub fn try_write( + &self, + pubkey: &Pubkey, + value: (&[T], RefCount), + ) -> Result<(), BucketMapError> { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().try_write(pubkey, value.0, value.1) + } +} diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 14a585c9eb..bbc5a86b7a 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -1,17 +1,14 @@ //! BucketMap is a mostly contention free concurrent map backed by MmapMut -use crate::bucket::Bucket; -use crate::bucket_item::BucketItem; +use crate::bucket_api::BucketApi; use crate::bucket_stats::BucketMapStats; use crate::{MaxSearch, RefCount}; use solana_sdk::pubkey::Pubkey; use std::convert::TryInto; use std::fmt::Debug; use std::fs; -use std::ops::RangeBounds; use std::path::PathBuf; use std::sync::Arc; -use std::sync::{RwLock, RwLockWriteGuard}; use tempfile::TempDir; #[derive(Debug, Default, Clone)] @@ -33,10 +30,9 @@ impl BucketMapConfig { } pub struct BucketMap { - buckets: Vec>>>, + buckets: Vec>>, drives: Arc>, max_buckets_pow2: u8, - max_search: MaxSearch, pub stats: Arc, pub temp_dir: Option, } @@ -71,8 +67,6 @@ impl BucketMap { config.max_buckets.is_power_of_two(), "Max number of buckets must be a power of two" ); - let mut buckets = Vec::with_capacity(config.max_buckets); - buckets.resize_with(config.max_buckets, || RwLock::new(None)); let stats = Arc::new(BucketMapStats::default()); // this should be <= 1 << DEFAULT_CAPACITY or we end up searching the same items over and over - probably not a big deal since it is so small anyway const MAX_SEARCH: MaxSearch = 32; @@ -88,6 +82,15 @@ impl BucketMap { }); let drives = Arc::new(drives); + let mut buckets = Vec::with_capacity(config.max_buckets); + buckets.resize_with(config.max_buckets, || { + Arc::new(BucketApi::new( + Arc::clone(&drives), + max_search, + Arc::clone(&stats), + )) + }); + // A simple log2 function that is correct if x is a power of two let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1; @@ -96,7 +99,6 @@ impl BucketMap { drives, max_buckets_pow2: log2(config.max_buckets) as u8, stats, - max_search, temp_dir, } } @@ -112,92 +114,24 @@ impl BucketMap { self.buckets.len() } - pub fn bucket_len(&self, ix: usize) -> u64 { - self.buckets[ix] - .read() - .unwrap() - .as_ref() - .map(|bucket| bucket.bucket_len()) - .unwrap_or_default() - } - - /// Get the items for bucket `ix` in `range` - pub fn items_in_range(&self, ix: usize, range: &Option<&R>) -> Vec> - where - R: RangeBounds, - { - self.buckets[ix] - .read() - .unwrap() - .as_ref() - .map(|bucket| bucket.items_in_range(range)) - .unwrap_or_default() - } - - /// Get the Pubkeys for bucket `ix` - pub fn keys(&self, ix: usize) -> Vec { - self.buckets[ix] - .read() - .unwrap() - .as_ref() - .map_or_else(Vec::default, |bucket| bucket.keys()) - } - /// Get the values for Pubkey `key` pub fn read_value(&self, key: &Pubkey) -> Option<(Vec, RefCount)> { - let ix = self.bucket_ix(key); - self.buckets[ix] - .read() - .unwrap() - .as_ref() - .and_then(|bucket| { - bucket - .read_value(key) - .map(|(value, ref_count)| (value.to_vec(), ref_count)) - }) + self.get_bucket(key).read_value(key) } /// Delete the Pubkey `key` pub fn delete_key(&self, key: &Pubkey) { - let ix = self.bucket_ix(key); - if let Some(bucket) = self.buckets[ix].write().unwrap().as_mut() { - bucket.delete_key(key); - } + self.get_bucket(key).delete_key(key); } /// Update Pubkey `key`'s value with 'value' - pub fn insert(&self, ix: usize, key: &Pubkey, value: (&[T], RefCount)) { - let mut bucket = self.get_bucket(ix); - bucket.as_mut().unwrap().insert(key, value) - } - - fn get_bucket(&self, ix: usize) -> RwLockWriteGuard>> { - let mut bucket = self.buckets[ix].write().unwrap(); - if bucket.is_none() { - *bucket = Some(Bucket::new( - Arc::clone(&self.drives), - self.max_search, - Arc::clone(&self.stats), - )); - } - bucket + pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) { + self.get_bucket(key).insert(key, value) } /// Update Pubkey `key`'s value with 'value' - pub fn try_insert( - &self, - ix: usize, - key: &Pubkey, - value: (&[T], RefCount), - ) -> Result<(), BucketMapError> { - let mut bucket = self.get_bucket(ix); - bucket.as_mut().unwrap().try_write(key, value.0, value.1) - } - - /// if err is a grow error, then grow the appropriate piece - pub fn grow(&self, ix: usize, err: BucketMapError) { - let mut bucket = self.get_bucket(ix); - bucket.as_mut().unwrap().grow(err); + pub fn try_insert(&self, key: &Pubkey, value: (&[T], RefCount)) -> Result<(), BucketMapError> { + self.get_bucket(key).try_write(key, value) } /// Update Pubkey `key`'s value with function `updatefn` @@ -205,9 +139,15 @@ impl BucketMap { where F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, { - let ix = self.bucket_ix(key); - let mut bucket = self.get_bucket(ix); - bucket.as_mut().unwrap().update(key, updatefn) + self.get_bucket(key).update(key, updatefn) + } + + pub fn get_bucket(&self, key: &Pubkey) -> &Arc> { + self.get_bucket_from_index(self.bucket_ix(key)) + } + + pub fn get_bucket_from_index(&self, ix: usize) -> &Arc> { + &self.buckets[ix] } /// Get the bucket index for Pubkey `key` @@ -223,15 +163,15 @@ impl BucketMap { /// Increment the refcount for Pubkey `key` pub fn addref(&self, key: &Pubkey) -> Option { let ix = self.bucket_ix(key); - let mut bucket = self.buckets[ix].write().unwrap(); - bucket.as_mut()?.addref(key) + let bucket = &self.buckets[ix]; + bucket.addref(key) } /// Decrement the refcount for Pubkey `key` pub fn unref(&self, key: &Pubkey) -> Option { let ix = self.bucket_ix(key); - let mut bucket = self.buckets[ix].write().unwrap(); - bucket.as_mut()?.unref(key) + let bucket = &self.buckets[ix]; + bucket.unref(key) } } @@ -247,6 +187,7 @@ mod tests { use rand::thread_rng; use rand::Rng; use std::collections::HashMap; + use std::sync::RwLock; #[test] fn bucket_map_test_insert() { @@ -263,21 +204,21 @@ mod tests { let key = Pubkey::new_unique(); let config = BucketMapConfig::new(1 << 1); let index = BucketMap::new(config); - let ix = index.bucket_ix(&key); + let bucket = index.get_bucket(&key); if pass == 0 { - index.insert(ix, &key, (&[0], 0)); + index.insert(&key, (&[0], 0)); } else { - let result = index.try_insert(ix, &key, (&[0], 0)); + let result = index.try_insert(&key, (&[0], 0)); assert!(result.is_err()); assert_eq!(index.read_value(&key), None); if pass == 2 { // another call to try insert again - should still return an error - let result = index.try_insert(ix, &key, (&[0], 0)); + let result = index.try_insert(&key, (&[0], 0)); assert!(result.is_err()); assert_eq!(index.read_value(&key), None); } - index.grow(ix, result.unwrap_err()); - let result = index.try_insert(ix, &key, (&[0], 0)); + bucket.grow(result.unwrap_err()); + let result = index.try_insert(&key, (&[0], 0)); assert!(result.is_ok()); } assert_eq!(index.read_value(&key), Some((vec![0], 0))); @@ -289,9 +230,9 @@ mod tests { let key = Pubkey::new_unique(); let config = BucketMapConfig::new(1 << 1); let index = BucketMap::new(config); - index.insert(index.bucket_ix(&key), &key, (&[0], 0)); + index.insert(&key, (&[0], 0)); assert_eq!(index.read_value(&key), Some((vec![0], 0))); - index.insert(index.bucket_ix(&key), &key, (&[1], 0)); + index.insert(&key, (&[1], 0)); assert_eq!(index.read_value(&key), Some((vec![1], 0))); } @@ -465,8 +406,8 @@ mod tests { let mut r = vec![]; for bin in 0..map.num_buckets() { r.append( - &mut map - .items_in_range(bin, &None::<&std::ops::RangeInclusive>), + &mut map.buckets[bin] + .items_in_range(&None::<&std::ops::RangeInclusive>), ); } r @@ -505,7 +446,7 @@ mod tests { let insert = thread_rng().gen_range(0, 2) == 0; maps.iter().for_each(|map| { if insert { - map.insert(map.bucket_ix(&k), &k, (&v.0, v.1)) + map.insert(&k, (&v.0, v.1)) } else { map.update(&k, |current| { assert!(current.is_none()); @@ -524,7 +465,7 @@ mod tests { let insert = thread_rng().gen_range(0, 2) == 0; maps.iter().for_each(|map| { if insert { - map.insert(map.bucket_ix(&k), &k, (&v, rc)) + map.insert(&k, (&v, rc)) } else { map.update(&k, |current| { assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k); diff --git a/bucket_map/src/lib.rs b/bucket_map/src/lib.rs index 4d0da4582b..a61cae4e36 100644 --- a/bucket_map/src/lib.rs +++ b/bucket_map/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] mod bucket; +pub mod bucket_api; mod bucket_item; pub mod bucket_map; mod bucket_stats; diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index d003a034d3..665c6e3665 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -6,6 +6,7 @@ use crate::bucket_map_holder::{Age, BucketMapHolder}; use crate::bucket_map_holder_stats::BucketMapHolderStats; use rand::thread_rng; use rand::Rng; +use solana_bucket_map::bucket_api::BucketApi; use solana_measure::measure::Measure; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::collections::{hash_map::Entry, HashMap}; @@ -28,6 +29,8 @@ pub struct InMemAccountsIndex { storage: Arc>, bin: usize, + bucket: Option>>>, + // pubkey ranges that this bin must hold in the cache while the range is present in this vec pub(crate) cache_ranges_held: CacheRangesHeld, // true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held. @@ -51,6 +54,11 @@ impl InMemAccountsIndex { map_internal: RwLock::default(), storage: Arc::clone(storage), bin, + bucket: storage + .disk + .as_ref() + .map(|disk| disk.get_bucket_from_index(bin)) + .map(Arc::clone), cache_ranges_held: CacheRangesHeld::default(), stop_flush: AtomicU64::default(), bin_dirty: AtomicBool::default(), @@ -111,7 +119,7 @@ impl InMemAccountsIndex { } fn load_from_disk(&self, pubkey: &Pubkey) -> Option<(SlotList, RefCount)> { - self.storage.disk.as_ref().and_then(|disk| { + self.bucket.as_ref().and_then(|disk| { let m = Measure::start("load_disk_found_count"); let entry_disk = disk.read_value(pubkey); match &entry_disk { @@ -209,7 +217,7 @@ impl InMemAccountsIndex { } fn delete_disk_key(&self, pubkey: &Pubkey) { - if let Some(disk) = self.storage.disk.as_ref() { + if let Some(disk) = self.bucket.as_ref() { disk.delete_key(pubkey) } } @@ -579,8 +587,8 @@ impl InMemAccountsIndex { let m = Measure::start("range"); // load from disk - if let Some(disk) = self.storage.disk.as_ref() { - let items = disk.items_in_range(self.bin, range); + if let Some(disk) = self.bucket.as_ref() { + let items = disk.items_in_range(range); let mut map = self.map().write().unwrap(); let future_age = self.storage.future_age_to_flush(); for item in items { @@ -668,7 +676,7 @@ impl InMemAccountsIndex { loop { let mut removes = Vec::default(); let mut removes_random = Vec::default(); - let disk = self.storage.disk.as_ref().unwrap(); + let disk = self.bucket.as_ref().unwrap(); let mut updates = Vec::default(); let m = Measure::start("flush_scan"); @@ -706,11 +714,8 @@ impl InMemAccountsIndex { continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed } flush_entries_updated_on_disk += 1; - disk_resize = disk.try_insert( - self.bin, - &k, - (&v.slot_list.read().unwrap(), v.ref_count()), - ); + disk_resize = + disk.try_write(&k, (&v.slot_list.read().unwrap(), v.ref_count())); } if disk_resize.is_err() { // disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize @@ -745,7 +750,7 @@ impl InMemAccountsIndex { Err(err) => { // grow the bucket, outside of all in-mem locks. // then, loop to try again - disk.grow(self.bin, err); + disk.grow(err); } } }