From b2152be3b2870437cec07b07df6a8b81b9849dd0 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Fri, 17 Sep 2021 15:11:27 -0500 Subject: [PATCH] introduce bucket map (#19848) * introduce bucket map * rename BucketMap bits to num_buckets_pow2 * use u64::BITS * Store the number of buckets in BucketMapConfig as a regular number * remove redundant type aliases * use Slot from sdk * use Arc::clone() instead * fixup erase drives * rename num_buckets to max_buckets * add doc to BucketMapConfig::new() * add more documentation * rename to DEFAULT_CAPACITY_POW2 * doc * add more traits while we can * rename capacity to capacity_pow2 * fix a naming for max_buckets_pow2 * remove unused/incorrect DataBucket::bytes * rework benches a bit * fixup bench docs * rename create_bucket_capacity_pow2 to bucket_capacity_when_created_pow2 * rename BucketMapKeyValue to BucketItem * rename to items_in_range * remove values() * remove addref and unref * remove more addref and unref * resurect addref and unref since tests use 'em for now * rename to BucketStorage * move stats in bucket_stats * remove specializations (i don't think they are needed) * move MaxSearch and RefCount into lib.rs * move BucketItem to bucket_item.rs * add doc * keys no longer returns an option * Revert "remove specializations (i don't think they are needed)" This reverts commit b22f78e072cf0f7107851b08e58c2e3fead3f64d. Co-authored-by: Brooks Prumo --- Cargo.lock | 31 +- Cargo.toml | 1 + bucket_map/Cargo.toml | 29 ++ bucket_map/benches/bucket_map.rs | 76 +++++ bucket_map/src/bucket.rs | 397 ++++++++++++++++++++++++ bucket_map/src/bucket_item.rs | 9 + bucket_map/src/bucket_map.rs | 508 +++++++++++++++++++++++++++++++ bucket_map/src/bucket_stats.rs | 18 ++ bucket_map/src/bucket_storage.rs | 334 ++++++++++++++++++++ bucket_map/src/index_entry.rs | 62 ++++ bucket_map/src/lib.rs | 12 + bucket_map/tests/bucket_map.rs | 46 +++ programs/bpf/Cargo.lock | 31 ++ runtime/Cargo.toml | 1 + 14 files changed, 1552 insertions(+), 3 deletions(-) create mode 100644 bucket_map/Cargo.toml create mode 100644 bucket_map/benches/bucket_map.rs create mode 100644 bucket_map/src/bucket.rs create mode 100644 bucket_map/src/bucket_item.rs create mode 100644 bucket_map/src/bucket_map.rs create mode 100644 bucket_map/src/bucket_stats.rs create mode 100644 bucket_map/src/bucket_storage.rs create mode 100644 bucket_map/src/index_entry.rs create mode 100644 bucket_map/src/lib.rs create mode 100644 bucket_map/tests/bucket_map.rs diff --git a/Cargo.lock b/Cargo.lock index 8b71dc3fd..2673d69a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,11 +854,11 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", ] [[package]] @@ -2446,6 +2446,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723e3ebdcdc5c023db1df315364573789f8857c11b631a2fdfad7c00f5c046b4" +dependencies = [ + "libc", +] + [[package]] name = "memmap2" version = "0.4.0" @@ -4308,6 +4317,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-bucket-map" +version = "1.8.0" +dependencies = [ + "fs_extra", + "log 0.4.14", + "memmap2 
0.2.3", + "rand 0.7.3", + "rayon", + "solana-logger 1.8.0", + "solana-measure", + "solana-sdk", + "tempfile", +] + [[package]] name = "solana-cargo-build-bpf" version = "1.8.0" @@ -5407,6 +5431,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "serde_derive", + "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", "solana-ed25519-program", diff --git a/Cargo.toml b/Cargo.toml index 212ed3c9c..7f49ba685 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "banks-client", "banks-interface", "banks-server", + "bucket_map", "clap-utils", "cli-config", "cli-output", diff --git a/bucket_map/Cargo.toml b/bucket_map/Cargo.toml new file mode 100644 index 000000000..5711b0798 --- /dev/null +++ b/bucket_map/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "solana-bucket-map" +version = "1.8.0" +description = "solana-bucket-map" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-bucket-map" +readme = "../README.md" +repository = "https://github.com/solana-labs/solana" +authors = ["Solana Maintainers "] +license = "Apache-2.0" +edition = "2018" + +[dependencies] +rayon = "1.5.0" +solana-logger = { path = "../logger", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } +memmap2 = "0.2.3" +log = { version = "0.4.11" } +solana-measure = { path = "../measure", version = "=1.8.0" } +rand = "0.7.0" +fs_extra = "1.2.0" +tempfile = "3.2.0" + +[lib] +crate-type = ["lib"] +name = "solana_bucket_map" + +[[bench]] +name = "bucket_map" diff --git a/bucket_map/benches/bucket_map.rs b/bucket_map/benches/bucket_map.rs new file mode 100644 index 000000000..8176064b1 --- /dev/null +++ b/bucket_map/benches/bucket_map.rs @@ -0,0 +1,76 @@ +#![feature(test)] + +macro_rules! DEFINE_NxM_BENCH { + ($i:ident, $n:literal, $m:literal) => { + mod $i { + use super::*; + + #[bench] + fn bench_insert_baseline_hashmap(bencher: &mut Bencher) { + do_bench_insert_baseline_hashmap(bencher, $n, $m); + } + + #[bench] + fn bench_insert_bucket_map(bencher: &mut Bencher) { + do_bench_insert_bucket_map(bencher, $n, $m); + } + } + }; +} + +extern crate test; +use rayon::prelude::*; +use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig}; +use solana_sdk::pubkey::Pubkey; +use std::collections::hash_map::HashMap; +use std::sync::RwLock; +use test::Bencher; + +type IndexValue = u64; + +DEFINE_NxM_BENCH!(dim_01x02, 1, 2); +DEFINE_NxM_BENCH!(dim_02x04, 2, 4); +DEFINE_NxM_BENCH!(dim_04x08, 4, 8); +DEFINE_NxM_BENCH!(dim_08x16, 8, 16); +DEFINE_NxM_BENCH!(dim_16x32, 16, 32); +DEFINE_NxM_BENCH!(dim_32x64, 32, 64); + +/// Benchmark insert with Hashmap as baseline for N threads inserting M keys each +fn do_bench_insert_baseline_hashmap(bencher: &mut Bencher, n: usize, m: usize) { + let index = RwLock::new(HashMap::new()); + (0..n).into_iter().into_par_iter().for_each(|i| { + let key = Pubkey::new_unique(); + index + .write() + .unwrap() + .insert(key, vec![(i, IndexValue::default())]); + }); + bencher.iter(|| { + (0..n).into_iter().into_par_iter().for_each(|_| { + for j in 0..m { + let key = Pubkey::new_unique(); + index + .write() + .unwrap() + .insert(key, vec![(j, IndexValue::default())]); + } + }) + }); +} + +/// Benchmark insert with BucketMap with N buckets for N threads inserting M keys each +fn do_bench_insert_bucket_map(bencher: &mut Bencher, n: usize, m: usize) { + let index = BucketMap::new(BucketMapConfig::new(n)); + (0..n).into_iter().into_par_iter().for_each(|i| { + let key = Pubkey::new_unique(); + index.update(&key, |_| Some((vec![(i, 
IndexValue::default())], 0))); + }); + bencher.iter(|| { + (0..n).into_iter().into_par_iter().for_each(|_| { + for j in 0..m { + let key = Pubkey::new_unique(); + index.update(&key, |_| Some((vec![(j, IndexValue::default())], 0))); + } + }) + }); +} diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs new file mode 100644 index 000000000..94f4a7e25 --- /dev/null +++ b/bucket_map/src/bucket.rs @@ -0,0 +1,397 @@ +use crate::bucket_item::BucketItem; +use crate::bucket_map::BucketMapError; +use crate::bucket_stats::BucketMapStats; +use crate::bucket_storage::BucketStorage; +use crate::index_entry::IndexEntry; +use crate::{MaxSearch, RefCount}; +use rand::thread_rng; +use rand::Rng; +use solana_measure::measure::Measure; +use solana_sdk::pubkey::Pubkey; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; +use std::ops::RangeBounds; +use std::path::PathBuf; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data +pub struct Bucket { + drives: Arc>, + //index + index: BucketStorage, + //random offset for the index + random: u64, + //storage buckets to store SlotSlice up to a power of 2 in len + pub data: Vec, + _phantom: PhantomData, + stats: Arc, +} + +impl Bucket { + pub fn new( + drives: Arc>, + max_search: MaxSearch, + stats: Arc, + ) -> Self { + let index = BucketStorage::new( + Arc::clone(&drives), + 1, + std::mem::size_of::() as u64, + max_search, + Arc::clone(&stats.index), + ); + Self { + random: thread_rng().gen(), + drives, + index, + data: vec![], + _phantom: PhantomData::default(), + stats, + } + } + + pub fn bucket_len(&self) -> u64 { + self.index.used.load(Ordering::Relaxed) + } + + pub fn keys(&self) -> Vec { + let mut rv = vec![]; + for i in 0..self.index.num_cells() { + if self.index.uid(i) == 0 { + continue; + } + let ix: &IndexEntry = self.index.get(i); + rv.push(ix.key); + } + rv + } + + pub fn items_in_range(&self, range: Option<&R>) -> Vec> + where + R: RangeBounds, + { + let mut result = Vec::with_capacity(self.index.used.load(Ordering::Relaxed) as usize); + for i in 0..self.index.num_cells() { + let ii = i % self.index.num_cells(); + if self.index.uid(ii) == 0 { + continue; + } + let ix: &IndexEntry = self.index.get(ii); + let key = ix.key; + if range.map(|r| r.contains(&key)).unwrap_or(true) { + let val = ix.read_value(self); + result.push(BucketItem { + pubkey: key, + ref_count: ix.ref_count(), + slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(), + }); + } + } + result + } + + pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> { + Self::bucket_find_entry(&self.index, key, self.random) + } + + fn find_entry_mut(&self, key: &Pubkey) -> Option<(&mut IndexEntry, u64)> { + Self::bucket_find_entry_mut(&self.index, key, self.random) + } + + fn bucket_find_entry_mut<'a>( + index: &'a BucketStorage, + key: &Pubkey, + random: u64, + ) -> Option<(&'a mut IndexEntry, u64)> { + let ix = Self::bucket_index_ix(index, key, random); + for i in ix..ix + index.max_search() { + let ii = i % index.num_cells(); + if index.uid(ii) == 0 { + continue; + } + let elem: &mut IndexEntry = index.get_mut(ii); + if elem.key == *key { + return Some((elem, ii)); + } + } + None + } + + fn bucket_find_entry<'a>( + index: &'a BucketStorage, + key: &Pubkey, + random: u64, + ) -> Option<(&'a IndexEntry, u64)> { + let ix = Self::bucket_index_ix(index, key, random); + for i in ix..ix + index.max_search() { + 
let ii = i % index.num_cells(); + if index.uid(ii) == 0 { + continue; + } + let elem: &IndexEntry = index.get(ii); + if elem.key == *key { + return Some((elem, ii)); + } + } + None + } + + fn bucket_create_key( + index: &BucketStorage, + key: &Pubkey, + elem_uid: u64, + random: u64, + ref_count: u64, + ) -> Result { + let ix = Self::bucket_index_ix(index, key, random); + for i in ix..ix + index.max_search() { + let ii = i as u64 % index.num_cells(); + if index.uid(ii) != 0 { + continue; + } + index.allocate(ii, elem_uid).unwrap(); + let mut elem: &mut IndexEntry = index.get_mut(ii); + elem.key = *key; + elem.ref_count = ref_count; + elem.storage_offset = 0; + elem.storage_capacity_when_created_pow2 = 0; + elem.num_slots = 0; + //debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid ); + return Ok(ii); + } + Err(BucketMapError::IndexNoSpace(index.capacity_pow2)) + } + + pub fn addref(&mut self, key: &Pubkey) -> Option { + let (elem, _) = self.find_entry_mut(key)?; + elem.ref_count += 1; + Some(elem.ref_count) + } + + pub fn unref(&mut self, key: &Pubkey) -> Option { + let (elem, _) = self.find_entry_mut(key)?; + elem.ref_count -= 1; + Some(elem.ref_count) + } + + fn create_key(&self, key: &Pubkey, ref_count: u64) -> Result { + Self::bucket_create_key( + &self.index, + key, + IndexEntry::key_uid(key), + self.random, + ref_count, + ) + } + + pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { + //debug!("READ_VALUE: {:?}", key); + let (elem, _) = self.find_entry(key)?; + elem.read_value(self) + } + + pub fn try_write( + &mut self, + key: &Pubkey, + data: &[T], + ref_count: u64, + ) -> Result<(), BucketMapError> { + let index_entry = self.find_entry_mut(key); + let (elem, elem_ix) = match index_entry { + None => { + let ii = self.create_key(key, ref_count)?; + let elem = self.index.get_mut(ii); // get_mut here is right? 
+ (elem, ii) + } + Some(res) => { + if ref_count != res.0.ref_count { + res.0.ref_count = ref_count; + } + res + } + }; + let elem_uid = self.index.uid(elem_ix); + let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64); + if self.data.get(best_fit_bucket as usize).is_none() { + //error!("resizing because missing bucket"); + return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0))); + } + let bucket_ix = elem.data_bucket_ix(); + let current_bucket = &self.data[bucket_ix as usize]; + if best_fit_bucket == bucket_ix && elem.num_slots > 0 { + //in place update + let elem_loc = elem.data_loc(current_bucket); + let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data.len() as u64); + //let elem: &mut IndexEntry = self.index.get_mut(elem_ix); + assert!(current_bucket.uid(elem_loc) == elem_uid); + elem.num_slots = data.len() as u64; + slice.clone_from_slice(data); + Ok(()) + } else { + //need to move the allocation to a best fit spot + let best_bucket = &self.data[best_fit_bucket as usize]; + let cap_power = best_bucket.capacity_pow2; + let cap = best_bucket.num_cells(); + let pos = thread_rng().gen_range(0, cap); + for i in pos..pos + self.index.max_search() { + let ix = i % cap; + if best_bucket.uid(ix) == 0 { + let elem_loc = elem.data_loc(current_bucket); + if elem.num_slots > 0 { + current_bucket.free(elem_loc, elem_uid).unwrap(); + } + // elem: &mut IndexEntry = self.index.get_mut(elem_ix); + elem.storage_offset = ix; + elem.storage_capacity_when_created_pow2 = best_bucket.capacity_pow2; + elem.num_slots = data.len() as u64; + //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); + if elem.num_slots > 0 { + best_bucket.allocate(ix, elem_uid).unwrap(); + let slice = best_bucket.get_mut_cell_slice(ix, data.len() as u64); + slice.copy_from_slice(data); + } + return Ok(()); + } + } + Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))) + } + } + + pub fn delete_key(&mut self, key: &Pubkey) { + if let Some((elem, elem_ix)) = self.find_entry(key) { + let elem_uid = self.index.uid(elem_ix); + if elem.num_slots > 0 { + let data_bucket = &self.data[elem.data_bucket_ix() as usize]; + let loc = elem.data_loc(data_bucket); + //debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid ); + data_bucket.free(loc, elem_uid).unwrap(); + } + //debug!("INDEX FREE {:?} {}", key, elem_uid); + self.index.free(elem_ix, elem_uid).unwrap(); + } + } + + pub fn grow_index(&mut self, sz: u8) { + if self.index.capacity_pow2 == sz { + let mut m = Measure::start(""); + //debug!("GROW_INDEX: {}", sz); + let increment = 1; + for i in increment.. { + //increasing the capacity by ^4 reduces the + //likelyhood of a re-index collision of 2^(max_search)^2 + //1 in 2^32 + let index = BucketStorage::new_with_capacity( + Arc::clone(&self.drives), + 1, + std::mem::size_of::() as u64, + self.index.capacity_pow2 + i, // * 2, + self.index.max_search, + Arc::clone(&self.stats.index), + ); + let random = thread_rng().gen(); + let mut valid = true; + for ix in 0..self.index.num_cells() { + let uid = self.index.uid(ix); + if 0 != uid { + let elem: &IndexEntry = self.index.get(ix); + let ref_count = 0; // ??? 
TODO + let new_ix = + Self::bucket_create_key(&index, &elem.key, uid, random, ref_count); + if new_ix.is_err() { + valid = false; + break; + } + let new_ix = new_ix.unwrap(); + let new_elem: &mut IndexEntry = index.get_mut(new_ix); + *new_elem = *elem; + /* + let dbg_elem: IndexEntry = *new_elem; + assert_eq!( + Self::bucket_find_entry(&index, &elem.key, random).unwrap(), + (&dbg_elem, new_ix) + ); + */ + } + } + if valid { + self.index = index; + self.random = random; + break; + } + } + m.stop(); + let sz = 1 << self.index.capacity_pow2; + { + let mut max = self.stats.index.max_size.lock().unwrap(); + *max = std::cmp::max(*max, sz); + } + self.stats.index.resizes.fetch_add(1, Ordering::Relaxed); + self.stats + .index + .resize_us + .fetch_add(m.as_us(), Ordering::Relaxed); + } + } + + pub fn grow_data(&mut self, sz: (u64, u8)) { + if self.data.get(sz.0 as usize).is_none() { + for i in self.data.len() as u64..(sz.0 + 1) { + self.data.push(BucketStorage::new( + Arc::clone(&self.drives), + 1 << i, + std::mem::size_of::() as u64, + self.index.max_search, + Arc::clone(&self.stats.data), + )) + } + } + if self.data[sz.0 as usize].capacity_pow2 == sz.1 { + //debug!("GROW_DATA: {} {}", sz.0, sz.1); + self.data[sz.0 as usize].grow(); + } + } + + fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 { + let uid = IndexEntry::key_uid(key); + let mut s = DefaultHasher::new(); + uid.hash(&mut s); + //the locally generated random will make it hard for an attacker + //to deterministically cause all the pubkeys to land in the same + //location in any bucket on all validators + random.hash(&mut s); + let ix = s.finish(); + ix % index.num_cells() + //debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.num_cells() ); + } + + pub fn update(&mut self, key: &Pubkey, updatefn: F) + where + F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + { + let current = self.read_value(key); + let new = updatefn(current); + if new.is_none() { + self.delete_key(key); + return; + } + let (new, refct) = new.unwrap(); + loop { + let rv = self.try_write(key, &new, refct); + match rv { + Err(BucketMapError::DataNoSpace(sz)) => { + //debug!("GROWING SPACE {:?}", sz); + self.grow_data(sz); + continue; + } + Err(BucketMapError::IndexNoSpace(sz)) => { + //debug!("GROWING INDEX {}", sz); + self.grow_index(sz); + continue; + } + Ok(()) => return, + } + } + } +} diff --git a/bucket_map/src/bucket_item.rs b/bucket_map/src/bucket_item.rs new file mode 100644 index 000000000..532ff92bd --- /dev/null +++ b/bucket_map/src/bucket_item.rs @@ -0,0 +1,9 @@ +use crate::RefCount; +use solana_sdk::pubkey::Pubkey; + +#[derive(Debug, Default, Clone)] +pub struct BucketItem { + pub pubkey: Pubkey, + pub ref_count: RefCount, + pub slot_list: Vec, +} diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs new file mode 100644 index 000000000..90edd09ff --- /dev/null +++ b/bucket_map/src/bucket_map.rs @@ -0,0 +1,508 @@ +//! 
BucketMap is a mostly contention-free concurrent map backed by MmapMut
+
+use crate::bucket::Bucket;
+use crate::bucket_item::BucketItem;
+use crate::bucket_stats::BucketMapStats;
+use crate::{MaxSearch, RefCount};
+use solana_sdk::pubkey::Pubkey;
+use std::convert::TryInto;
+use std::fmt::Debug;
+use std::fs;
+use std::ops::RangeBounds;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::RwLock;
+use tempfile::TempDir;
+
+#[derive(Debug, Default, Clone)]
+pub struct BucketMapConfig {
+    pub max_buckets: usize,
+    pub drives: Option<Vec<PathBuf>>,
+    pub max_search: Option<MaxSearch>,
+}
+
+impl BucketMapConfig {
+    /// Create a new BucketMapConfig
+    /// NOTE: BucketMap requires that max_buckets is a power of two
+    pub fn new(max_buckets: usize) -> BucketMapConfig {
+        BucketMapConfig {
+            max_buckets,
+            ..BucketMapConfig::default()
+        }
+    }
+}
+
+pub struct BucketMap<T: Clone + Copy + Debug> {
+    buckets: Vec<RwLock<Option<Bucket<T>>>>,
+    drives: Arc<Vec<PathBuf>>,
+    max_buckets_pow2: u8,
+    max_search: MaxSearch,
+    pub stats: Arc<BucketMapStats>,
+    pub temp_dir: Option<TempDir>,
+}
+
+impl<T: Clone + Copy + Debug> Drop for BucketMap<T> {
+    fn drop(&mut self) {
+        if self.temp_dir.is_none() {
+            BucketMap::<T>::erase_previous_drives(&self.drives);
+        }
+    }
+}
+
+impl<T: Clone + Copy + Debug> std::fmt::Debug for BucketMap<T> {
+    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub enum BucketMapError {
+    DataNoSpace((u64, u8)),
+    IndexNoSpace(u8),
+}
+
+impl<T: Clone + Copy + Debug> BucketMap<T> {
+    pub fn new(config: BucketMapConfig) -> Self {
+        assert_ne!(
+            config.max_buckets, 0,
+            "Max number of buckets must be non-zero"
+        );
+        assert!(
+            config.max_buckets.is_power_of_two(),
+            "Max number of buckets must be a power of two"
+        );
+        let mut buckets = Vec::with_capacity(config.max_buckets);
+        buckets.resize_with(config.max_buckets, || RwLock::new(None));
+        let stats = Arc::new(BucketMapStats::default());
+        // this should be <= 1 << DEFAULT_CAPACITY or we end up searching the same items over and over - probably not a big deal since it is so small anyway
+        const MAX_SEARCH: MaxSearch = 32;
+        let max_search = config.max_search.unwrap_or(MAX_SEARCH);
+
+        if let Some(drives) = config.drives.as_ref() {
+            Self::erase_previous_drives(drives);
+        }
+        let mut temp_dir = None;
+        let drives = config.drives.unwrap_or_else(|| {
+            temp_dir = Some(TempDir::new().unwrap());
+            vec![temp_dir.as_ref().unwrap().path().to_path_buf()]
+        });
+        let drives = Arc::new(drives);
+
+        // A simple log2 function that is correct if x is a power of two
+        let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1;
+
+        Self {
+            buckets,
+            drives,
+            max_buckets_pow2: log2(config.max_buckets) as u8,
+            stats,
+            max_search,
+            temp_dir,
+        }
+    }
+
+    fn erase_previous_drives(drives: &[PathBuf]) {
+        drives.iter().for_each(|folder| {
+            let _ = fs::remove_dir_all(&folder);
+            let _ = fs::create_dir_all(&folder);
+        })
+    }
+
+    pub fn num_buckets(&self) -> usize {
+        self.buckets.len()
+    }
+
+    pub fn bucket_len(&self, ix: usize) -> u64 {
+        self.buckets[ix]
+            .read()
+            .unwrap()
+            .as_ref()
+            .map(|bucket| bucket.bucket_len())
+            .unwrap_or_default()
+    }
+
+    /// Get the items for bucket `ix` in `range`
+    pub fn items_in_range<R>(&self, ix: usize, range: Option<&R>) -> Option<Vec<BucketItem<T>>>
+    where
+        R: RangeBounds<Pubkey>,
+    {
+        Some(
+            self.buckets[ix]
+                .read()
+                .unwrap()
+                .as_ref()?
+ .items_in_range(range), + ) + } + + /// Get the Pubkeys for bucket `ix` + pub fn keys(&self, ix: usize) -> Vec { + self.buckets[ix] + .read() + .unwrap() + .as_ref() + .map_or_else(Vec::default, |bucket| bucket.keys()) + } + + /// Get the values for Pubkey `key` + pub fn read_value(&self, key: &Pubkey) -> Option<(Vec, RefCount)> { + let ix = self.bucket_ix(key); + self.buckets[ix] + .read() + .unwrap() + .as_ref() + .and_then(|bucket| { + bucket + .read_value(key) + .map(|(value, ref_count)| (value.to_vec(), ref_count)) + }) + } + + /// Delete the Pubkey `key` + pub fn delete_key(&self, key: &Pubkey) { + let ix = self.bucket_ix(key); + if let Some(bucket) = self.buckets[ix].write().unwrap().as_mut() { + bucket.delete_key(key); + } + } + + /// Update Pubkey `key`'s value with function `updatefn` + pub fn update(&self, key: &Pubkey, updatefn: F) + where + F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + { + let ix = self.bucket_ix(key); + let mut bucket = self.buckets[ix].write().unwrap(); + if bucket.is_none() { + *bucket = Some(Bucket::new( + Arc::clone(&self.drives), + self.max_search, + Arc::clone(&self.stats), + )); + } + let bucket = bucket.as_mut().unwrap(); + bucket.update(key, updatefn) + } + + /// Get the bucket index for Pubkey `key` + pub fn bucket_ix(&self, key: &Pubkey) -> usize { + if self.max_buckets_pow2 > 0 { + let location = read_be_u64(key.as_ref()); + (location >> (u64::BITS - self.max_buckets_pow2 as u32)) as usize + } else { + 0 + } + } + + /// Increment the refcount for Pubkey `key` + pub fn addref(&self, key: &Pubkey) -> Option { + let ix = self.bucket_ix(key); + let mut bucket = self.buckets[ix].write().unwrap(); + bucket.as_mut()?.addref(key) + } + + /// Decrement the refcount for Pubkey `key` + pub fn unref(&self, key: &Pubkey) -> Option { + let ix = self.bucket_ix(key); + let mut bucket = self.buckets[ix].write().unwrap(); + bucket.as_mut()?.unref(key) + } +} + +/// Look at the first 8 bytes of the input and reinterpret them as a u64 +fn read_be_u64(input: &[u8]) -> u64 { + assert!(input.len() >= std::mem::size_of::()); + u64::from_be_bytes(input[0..std::mem::size_of::()].try_into().unwrap()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::thread_rng; + use rand::Rng; + use std::collections::HashMap; + + #[test] + fn bucket_map_test_insert() { + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + index.update(&key, |_| Some((vec![0], 0))); + assert_eq!(index.read_value(&key), Some((vec![0], 0))); + } + + #[test] + fn bucket_map_test_update() { + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + index.update(&key, |_| Some((vec![0], 0))); + assert_eq!(index.read_value(&key), Some((vec![0], 0))); + index.update(&key, |_| Some((vec![1], 0))); + assert_eq!(index.read_value(&key), Some((vec![1], 0))); + } + + #[test] + fn bucket_map_test_update_to_0_len() { + solana_logger::setup(); + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + index.update(&key, |_| Some((vec![0], 1))); + assert_eq!(index.read_value(&key), Some((vec![0], 1))); + // sets len to 0, updates in place + index.update(&key, |_| Some((vec![], 1))); + assert_eq!(index.read_value(&key), Some((vec![], 1))); + // sets len to 0, doesn't update in place - finds a new place, which causes us to no longer have an allocation in data + index.update(&key, |_| Some((vec![], 2))); + 
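+        // (with num_slots == 0, try_write always takes its relocation path,
+        // since the in-place branch requires an existing data allocation)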
assert_eq!(index.read_value(&key), Some((vec![], 2))); + // sets len to 1, doesn't update in place - finds a new place + index.update(&key, |_| Some((vec![1], 2))); + assert_eq!(index.read_value(&key), Some((vec![1], 2))); + } + + #[test] + fn bucket_map_test_delete() { + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + for i in 0..10 { + let key = Pubkey::new_unique(); + assert_eq!(index.read_value(&key), None); + + index.update(&key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(&key), Some((vec![i], 0))); + + index.delete_key(&key); + assert_eq!(index.read_value(&key), None); + + index.update(&key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(&key), Some((vec![i], 0))); + index.delete_key(&key); + } + } + + #[test] + fn bucket_map_test_delete_2() { + let config = BucketMapConfig::new(1 << 2); + let index = BucketMap::new(config); + for i in 0..100 { + let key = Pubkey::new_unique(); + assert_eq!(index.read_value(&key), None); + + index.update(&key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(&key), Some((vec![i], 0))); + + index.delete_key(&key); + assert_eq!(index.read_value(&key), None); + + index.update(&key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(&key), Some((vec![i], 0))); + index.delete_key(&key); + } + } + + #[test] + fn bucket_map_test_n_drives() { + let config = BucketMapConfig::new(1 << 2); + let index = BucketMap::new(config); + for i in 0..100 { + let key = Pubkey::new_unique(); + index.update(&key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(&key), Some((vec![i], 0))); + } + } + #[test] + fn bucket_map_test_grow_read() { + let config = BucketMapConfig::new(1 << 2); + let index = BucketMap::new(config); + let keys: Vec = (0..100).into_iter().map(|_| Pubkey::new_unique()).collect(); + for k in 0..keys.len() { + let key = &keys[k]; + let i = read_be_u64(key.as_ref()); + index.update(key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(key), Some((vec![i], 0))); + for (ix, key) in keys.iter().enumerate() { + let i = read_be_u64(key.as_ref()); + //debug!("READ: {:?} {}", key, i); + let expected = if ix <= k { Some((vec![i], 0)) } else { None }; + assert_eq!(index.read_value(key), expected); + } + } + } + + #[test] + fn bucket_map_test_n_delete() { + let config = BucketMapConfig::new(1 << 2); + let index = BucketMap::new(config); + let keys: Vec = (0..20).into_iter().map(|_| Pubkey::new_unique()).collect(); + for key in keys.iter() { + let i = read_be_u64(key.as_ref()); + index.update(key, |_| Some((vec![i], 0))); + assert_eq!(index.read_value(key), Some((vec![i], 0))); + } + for key in keys.iter() { + let i = read_be_u64(key.as_ref()); + //debug!("READ: {:?} {}", key, i); + assert_eq!(index.read_value(key), Some((vec![i], 0))); + } + for k in 0..keys.len() { + let key = &keys[k]; + index.delete_key(key); + assert_eq!(index.read_value(key), None); + for key in keys.iter().skip(k + 1) { + let i = read_be_u64(key.as_ref()); + assert_eq!(index.read_value(key), Some((vec![i], 0))); + } + } + } + + #[test] + fn hashmap_compare() { + use std::sync::Mutex; + solana_logger::setup(); + let maps = (0..2) + .into_iter() + .map(|max_buckets_pow2| { + let config = BucketMapConfig::new(1 << max_buckets_pow2); + BucketMap::new(config) + }) + .collect::>(); + let hash_map = RwLock::new(HashMap::, RefCount)>::new()); + let max_slot_list_len = 3; + let all_keys = Mutex::new(vec![]); + + let gen_rand_value = || { + let count = thread_rng().gen_range(0, max_slot_list_len); + let v = (0..count) + 
.into_iter() + .map(|x| (x as usize, x as usize /*thread_rng().gen::()*/)) + .collect::>(); + let rc = thread_rng().gen::(); + (v, rc) + }; + + let get_key = || { + let mut keys = all_keys.lock().unwrap(); + if keys.is_empty() { + return None; + } + let len = keys.len(); + Some(keys.remove(thread_rng().gen_range(0, len))) + }; + let return_key = |key| { + let mut keys = all_keys.lock().unwrap(); + keys.push(key); + }; + + let verify = || { + let mut maps = maps + .iter() + .map(|map| { + let mut r = vec![]; + for bin in 0..map.num_buckets() { + r.append( + &mut map + .items_in_range(bin, None::<&std::ops::RangeInclusive>) + .unwrap_or_default(), + ); + } + r + }) + .collect::>(); + let hm = hash_map.read().unwrap(); + for (k, v) in hm.iter() { + for map in maps.iter_mut() { + for i in 0..map.len() { + if k == &map[i].pubkey { + assert_eq!(map[i].slot_list, v.0); + assert_eq!(map[i].ref_count, v.1); + map.remove(i); + break; + } + } + } + } + for map in maps.iter() { + assert!(map.is_empty()); + } + }; + let mut initial = 100; // put this many items in to start + + // do random operations: insert, update, delete, add/unref in random order + // verify consistency between hashmap and all bucket maps + for i in 0..10000 { + if initial > 0 { + initial -= 1; + } + if initial > 0 || thread_rng().gen_range(0, 5) == 0 { + // insert + let k = solana_sdk::pubkey::new_rand(); + let v = gen_rand_value(); + hash_map.write().unwrap().insert(k, v.clone()); + maps.iter().for_each(|map| { + map.update(&k, |current| { + assert!(current.is_none()); + Some(v.clone()) + }) + }); + return_key(k); + } + if thread_rng().gen_range(0, 10) == 0 { + // update + if let Some(k) = get_key() { + let hm = hash_map.read().unwrap(); + let (v, rc) = gen_rand_value(); + let v_old = hm.get(&k); + maps.iter().for_each(|map| { + map.update(&k, |current| { + assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k); + Some((v.clone(), rc)) + }) + }); + drop(hm); + hash_map.write().unwrap().insert(k, (v, rc)); + return_key(k); + } + } + if thread_rng().gen_range(0, 20) == 0 { + // delete + if let Some(k) = get_key() { + let mut hm = hash_map.write().unwrap(); + hm.remove(&k); + maps.iter().for_each(|map| { + map.delete_key(&k); + }); + } + } + if thread_rng().gen_range(0, 10) == 0 { + // add/unref + if let Some(k) = get_key() { + let mut inc = thread_rng().gen_range(0, 2) == 0; + let mut hm = hash_map.write().unwrap(); + let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap(); + if !inc && rc == 0 { + // can't decrement rc=0 + inc = true; + } + rc = if inc { rc + 1 } else { rc - 1 }; + hm.insert(k, (v.to_vec(), rc)); + maps.iter().for_each(|map| { + if thread_rng().gen_range(0, 2) == 0 { + map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc))) + } else if inc { + map.addref(&k); + } else { + map.unref(&k); + } + }); + + return_key(k); + } + } + if i % 1000 == 0 { + verify(); + } + } + verify(); + } +} diff --git a/bucket_map/src/bucket_stats.rs b/bucket_map/src/bucket_stats.rs new file mode 100644 index 000000000..9de4fbdb3 --- /dev/null +++ b/bucket_map/src/bucket_stats.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; +use std::sync::{atomic::AtomicU64, Mutex}; + +#[derive(Debug, Default)] +pub struct BucketStats { + pub resizes: AtomicU64, + pub max_size: Mutex, + pub resize_us: AtomicU64, + pub new_file_us: AtomicU64, + pub flush_file_us: AtomicU64, + pub mmap_us: AtomicU64, +} + +#[derive(Debug, Default, Clone)] +pub struct BucketMapStats { + pub index: Arc, + pub data: Arc, +} diff --git 
a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs new file mode 100644 index 000000000..3ad58c6ff --- /dev/null +++ b/bucket_map/src/bucket_storage.rs @@ -0,0 +1,334 @@ +use crate::bucket_stats::BucketStats; +use crate::MaxSearch; +use memmap2::MmapMut; +use rand::{thread_rng, Rng}; +use solana_measure::measure::Measure; +use std::fs::{remove_file, OpenOptions}; +use std::io::Seek; +use std::io::SeekFrom; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +/* +1 2 +2 4 +3 8 +4 16 +5 32 +6 64 +7 128 +8 256 +9 512 +10 1,024 +11 2,048 +12 4,096 +13 8,192 +14 16,384 +23 8,388,608 +24 16,777,216 +*/ +const DEFAULT_CAPACITY_POW2: u8 = 5; + +#[repr(C)] +struct Header { + lock: AtomicU64, +} + +impl Header { + fn try_lock(&self, uid: u64) -> bool { + Ok(0) + == self + .lock + .compare_exchange(0, uid, Ordering::Relaxed, Ordering::Relaxed) + } + fn unlock(&self, uid: u64) -> bool { + Ok(uid) + == self + .lock + .compare_exchange(uid, 0, Ordering::Relaxed, Ordering::Relaxed) + } + fn uid(&self) -> u64 { + self.lock.load(Ordering::Relaxed) + } +} + +pub struct BucketStorage { + drives: Arc>, + path: PathBuf, + mmap: MmapMut, + pub cell_size: u64, + pub capacity_pow2: u8, + pub used: AtomicU64, + pub stats: Arc, + pub max_search: MaxSearch, +} + +#[derive(Debug)] +pub enum BucketStorageError { + AlreadyAllocated, + InvalidFree, +} + +impl Drop for BucketStorage { + fn drop(&mut self) { + let _ = remove_file(&self.path); + } +} + +impl BucketStorage { + pub fn new_with_capacity( + drives: Arc>, + num_elems: u64, + elem_size: u64, + capacity_pow2: u8, + max_search: MaxSearch, + mut stats: Arc, + ) -> Self { + let cell_size = elem_size * num_elems + std::mem::size_of::
<Header>() as u64;
+        let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &mut stats);
+        Self {
+            path,
+            mmap,
+            drives,
+            cell_size,
+            used: AtomicU64::new(0),
+            capacity_pow2,
+            stats,
+            max_search,
+        }
+    }
+
+    pub fn max_search(&self) -> u64 {
+        self.max_search as u64
+    }
+
+    pub fn new(
+        drives: Arc<Vec<PathBuf>>,
+        num_elems: u64,
+        elem_size: u64,
+        max_search: MaxSearch,
+        stats: Arc<BucketStats>,
+    ) -> Self {
+        Self::new_with_capacity(
+            drives,
+            num_elems,
+            elem_size,
+            DEFAULT_CAPACITY_POW2,
+            max_search,
+            stats,
+        )
+    }
+
+    pub fn uid(&self, ix: u64) -> u64 {
+        if ix >= self.num_cells() {
+            panic!("bad index size");
+        }
+        let ix = (ix * self.cell_size) as usize;
+        let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
+        unsafe {
+            let hdr = hdr_slice.as_ptr() as *const Header;
+            return hdr.as_ref().unwrap().uid();
+        }
+    }
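+
+    // Each cell is laid out as [Header | element bytes]. uid() reads the
+    // Header's lock word: a nonzero value is the uid of the element that
+    // currently owns the cell, and 0 means the cell is free.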
+
+    pub fn allocate(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
+        if ix >= self.num_cells() {
+            panic!("allocate: bad index size");
+        }
+        if 0 == uid {
+            panic!("allocate: bad uid");
+        }
+        let mut e = Err(BucketStorageError::AlreadyAllocated);
+        let ix = (ix * self.cell_size) as usize;
+        //debug!("ALLOC {} {}", ix, uid);
+        let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
+        unsafe {
+            let hdr = hdr_slice.as_ptr() as *const Header;
+            if hdr.as_ref().unwrap().try_lock(uid) {
+                e = Ok(());
+                self.used.fetch_add(1, Ordering::Relaxed);
+            }
+        };
+        e
+    }
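+
+    // free() is the inverse of allocate(): it compare-exchanges the expected
+    // uid back to 0, so unlocking with the wrong uid (or a never-allocated
+    // cell) yields InvalidFree instead of silently clearing another element.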
+
+    pub fn free(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
+        if ix >= self.num_cells() {
+            panic!("free: bad index size");
+        }
+        if 0 == uid {
+            panic!("free: bad uid");
+        }
+        let ix = (ix * self.cell_size) as usize;
+        //debug!("FREE {} {}", ix, uid);
+        let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
+        let mut e = Err(BucketStorageError::InvalidFree);
+        unsafe {
+            let hdr = hdr_slice.as_ptr() as *const Header;
+            //debug!("FREE uid: {}", hdr.as_ref().unwrap().uid());
+            if hdr.as_ref().unwrap().unlock(uid) {
+                self.used.fetch_sub(1, Ordering::Relaxed);
+                e = Ok(());
+            }
+        };
+        e
+    }
+
+    pub fn get<T: Sized>(&self, ix: u64) -> &T {
+        if ix >= self.num_cells() {
+            panic!("bad index size");
+        }
+        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>();
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *const T;
+            &*item
+        }
+    }
+
+    pub fn get_empty_cell_slice<T: Sized>(&self) -> &[T] {
+        let len = 0;
+        let item_slice: &[u8] = &self.mmap[0..0];
+        unsafe {
+            let item = item_slice.as_ptr() as *const T;
+            std::slice::from_raw_parts(item, len as usize)
+        }
+    }
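+
+    // The slice accessors below read `len` consecutive Ts starting at the
+    // cell's payload. The explicit bounds check covers only `ix`; callers are
+    // expected to keep `len` within the capacity recorded for the element.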
+
+    pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
+        if ix >= self.num_cells() {
+            panic!("bad index size");
+        }
+        let ix = self.cell_size * ix;
+        let start = ix as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>() * len as usize;
+        //debug!("GET slice {} {}", start, end);
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *const T;
+            std::slice::from_raw_parts(item, len as usize)
+        }
+    }
+
+    pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
+        if ix >= self.num_cells() {
+            panic!("bad index size");
+        }
+        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>();
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut T;
+            &mut *item
+        }
+    }
+
+    pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
+        if ix >= self.num_cells() {
+            panic!("bad index size");
+        }
+        let ix = self.cell_size * ix;
+        let start = ix as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>() * len as usize;
+        //debug!("GET mut slice {} {}", start, end);
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut T;
+            std::slice::from_raw_parts_mut(item, len as usize)
+        }
+    }
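+
+    // get_mut/get_mut_cell_slice hand out &mut through &self (lib.rs enables
+    // allow(clippy::mut_from_ref) for this); the crate relies on the Header
+    // uid lock plus BucketMap's per-bucket RwLock to keep writers exclusive.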
+
+    fn new_map(
+        drives: &[PathBuf],
+        cell_size: usize,
+        capacity_pow2: u8,
+        stats: &mut Arc<BucketStats>,
+    ) -> (MmapMut, PathBuf) {
+        let mut m0 = Measure::start("");
+        let capacity = 1u64 << capacity_pow2;
+        let r = thread_rng().gen_range(0, drives.len());
+        let drive = &drives[r];
+        let pos = format!("{}", thread_rng().gen_range(0, u128::MAX),);
+        let file = drive.join(pos);
+        let mut data = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(file.clone())
+            .map_err(|e| {
+                panic!(
+                    "Unable to create data file {} in current dir({:?}): {:?}",
+                    file.display(),
+                    std::env::current_dir(),
+                    e
+                );
+            })
+            .unwrap();
+
+        // Theoretical performance optimization: write a zero to the end of
+        // the file so that we won't have to resize it later, which may be
+        // expensive.
+        //debug!("GROWING file {}", capacity * cell_size as u64);
+        data.seek(SeekFrom::Start(capacity * cell_size as u64 - 1))
+            .unwrap();
+        data.write_all(&[0]).unwrap();
+        data.seek(SeekFrom::Start(0)).unwrap();
+        m0.stop();
+        let mut m1 = Measure::start("");
+        data.flush().unwrap(); // can we skip this?
+        m1.stop();
+        let mut m2 = Measure::start("");
+        let res = (unsafe { MmapMut::map_mut(&data).unwrap() }, file);
+        m2.stop();
+        stats.new_file_us.fetch_add(m0.as_us(), Ordering::Relaxed);
+        stats.flush_file_us.fetch_add(m1.as_us(), Ordering::Relaxed);
+        stats.mmap_us.fetch_add(m2.as_us(), Ordering::Relaxed);
+        res
+    }
+
+    pub fn grow(&mut self) {
+        let mut m = Measure::start("grow");
+        let old_cap = self.num_cells();
+        let old_map = &self.mmap;
+        let old_file = self.path.clone();
+
+        let increment = 1;
+        let index_grow = 1 << increment;
+        let (new_map, new_file) = Self::new_map(
+            &self.drives,
+            self.cell_size as usize,
+            self.capacity_pow2 + increment,
+            &mut self.stats,
+        );
+        (0..old_cap as usize).into_iter().for_each(|i| {
+            let old_ix = i * self.cell_size as usize;
+            let new_ix = old_ix * index_grow;
+            let dst_slice: &[u8] = &new_map[new_ix..new_ix + self.cell_size as usize];
+            let src_slice: &[u8] = &old_map[old_ix..old_ix + self.cell_size as usize];
+
+            unsafe {
+                let dst = dst_slice.as_ptr() as *mut u8;
+                let src = src_slice.as_ptr() as *const u8;
+                std::ptr::copy_nonoverlapping(src, dst, self.cell_size as usize);
+            };
+        });
+        self.mmap = new_map;
+        self.path = new_file;
+        self.capacity_pow2 += increment;
+        remove_file(old_file).unwrap();
+        m.stop();
+        let sz = 1 << self.capacity_pow2;
+        {
+            let mut max = self.stats.max_size.lock().unwrap();
+            *max = std::cmp::max(*max, sz);
+        }
+        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
+        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
+    }
+
+    pub fn num_cells(&self) -> u64 {
+        1 << self.capacity_pow2
+    }
+}
diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs
new file mode 100644
index 000000000..fd9ac23f0
--- /dev/null
+++ b/bucket_map/src/index_entry.rs
@@ -0,0 +1,62 @@
+use crate::bucket::Bucket;
+use crate::bucket_storage::BucketStorage;
+use crate::RefCount;
+use solana_sdk::clock::Slot;
+use solana_sdk::pubkey::Pubkey;
+use std::collections::hash_map::DefaultHasher;
+use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
+
+#[repr(C)]
+#[derive(Debug, Copy, Clone, PartialEq)]
+// one instance of this per item in the index
+// stored in the index
bucket +pub struct IndexEntry { + pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? + pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts? + pub storage_offset: u64, // smaller? since these are variably sized, this could get tricky. well, actually accountinfo is not variable sized... + // if the bucket doubled, the index can be recomputed using create_bucket_capacity_pow2 + pub storage_capacity_when_created_pow2: u8, // see data_location + pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list +} + +impl IndexEntry { + pub fn data_bucket_from_num_slots(num_slots: Slot) -> u64 { + (num_slots as f64).log2().ceil() as u64 // use int log here? + } + + pub fn data_bucket_ix(&self) -> u64 { + Self::data_bucket_from_num_slots(self.num_slots) + } + + pub fn ref_count(&self) -> RefCount { + self.ref_count + } + + // This function maps the original data location into an index in the current bucket storage. + // This is coupled with how we resize bucket storages. + pub fn data_loc(&self, storage: &BucketStorage) -> u64 { + self.storage_offset << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2) + } + + pub fn read_value<'a, T>(&self, bucket: &'a Bucket) -> Option<(&'a [T], RefCount)> { + let data_bucket_ix = self.data_bucket_ix(); + let data_bucket = &bucket.data[data_bucket_ix as usize]; + let slice = if self.num_slots > 0 { + let loc = self.data_loc(data_bucket); + let uid = Self::key_uid(&self.key); + assert_eq!(uid, bucket.data[data_bucket_ix as usize].uid(loc)); + bucket.data[data_bucket_ix as usize].get_cell_slice(loc, self.num_slots) + } else { + // num_slots is 0. This means we don't have an actual allocation. + // can we trust that the data_bucket is even safe? 
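+            // num_slots == 0 means there is no data allocation to point at,
+            // so return the canonical empty slice instead of dereferencing
+            // data_loc.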
+ bucket.data[data_bucket_ix as usize].get_empty_cell_slice() + }; + Some((slice, self.ref_count)) + } + pub fn key_uid(key: &Pubkey) -> u64 { + let mut s = DefaultHasher::new(); + key.hash(&mut s); + s.finish().max(1u64) + } +} diff --git a/bucket_map/src/lib.rs b/bucket_map/src/lib.rs new file mode 100644 index 000000000..80eb8e9ea --- /dev/null +++ b/bucket_map/src/lib.rs @@ -0,0 +1,12 @@ +#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))] +#![allow(clippy::integer_arithmetic)] +#![allow(clippy::mut_from_ref)] +mod bucket; +mod bucket_item; +pub mod bucket_map; +mod bucket_stats; +mod bucket_storage; +mod index_entry; + +pub type MaxSearch = u8; +pub type RefCount = u64; diff --git a/bucket_map/tests/bucket_map.rs b/bucket_map/tests/bucket_map.rs new file mode 100644 index 000000000..e7f29d551 --- /dev/null +++ b/bucket_map/tests/bucket_map.rs @@ -0,0 +1,46 @@ +use rayon::prelude::*; +use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig}; +use solana_measure::measure::Measure; +use solana_sdk::pubkey::Pubkey; +use std::path::PathBuf; +#[test] +#[ignore] +fn bucket_map_test_mt() { + let threads = 4096; + let items = 4096; + let tmpdir1 = std::env::temp_dir().join("bucket_map_test_mt"); + let tmpdir2 = PathBuf::from("/mnt/data/0").join("bucket_map_test_mt"); + let paths: Vec = [tmpdir1, tmpdir2] + .iter() + .filter(|x| std::fs::create_dir_all(x).is_ok()) + .cloned() + .collect(); + assert!(!paths.is_empty()); + let index = BucketMap::new(BucketMapConfig { + max_buckets: 1 << 12, + drives: Some(paths.clone()), + ..BucketMapConfig::default() + }); + (0..threads).into_iter().into_par_iter().for_each(|_| { + let key = Pubkey::new_unique(); + index.update(&key, |_| Some((vec![0u64], 0))); + }); + let mut timer = Measure::start("bucket_map_test_mt"); + (0..threads).into_iter().into_par_iter().for_each(|_| { + for _ in 0..items { + let key = Pubkey::new_unique(); + let ix: u64 = index.bucket_ix(&key) as u64; + index.update(&key, |_| Some((vec![ix], 0))); + assert_eq!(index.read_value(&key), Some((vec![ix], 0))); + } + }); + timer.stop(); + println!("time: {}ns per item", timer.as_ns() / (threads * items)); + let mut total = 0; + for tmpdir in paths.iter() { + let folder_size = fs_extra::dir::get_size(tmpdir).unwrap(); + total += folder_size; + std::fs::remove_dir_all(tmpdir).unwrap(); + } + println!("overhead: {}bytes per item", total / (threads * items)); +} diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 013b86abc..abec636bc 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -873,6 +873,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "futures" version = "0.3.17" @@ -1481,6 +1487,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723e3ebdcdc5c023db1df315364573789f8857c11b631a2fdfad7c00f5c046b4" +dependencies = [ + "libc", +] + [[package]] name = "memmap2" version = "0.4.0" @@ -2784,6 +2799,21 @@ dependencies = [ "solana-program 1.8.0", ] +[[package]] +name = "solana-bucket-map" +version = "1.8.0" +dependencies = [ + "fs_extra", + "log", + "memmap2 0.2.3", + "rand 0.7.3", + "rayon", + "solana-logger 1.8.0", + "solana-measure", + "solana-sdk", + "tempfile", +] + [[package]] name = "solana-clap-utils" version = 
"1.8.0" @@ -3214,6 +3244,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "serde_derive", + "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", "solana-ed25519-program", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 393df57b7..060cd3cb9 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -40,6 +40,7 @@ solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.8.0" } solana-logger = { path = "../logger", version = "=1.8.0" } solana-measure = { path = "../measure", version = "=1.8.0" } solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-bucket-map = { path = "../bucket_map", version = "=1.8.0" } solana-program-runtime = { path = "../program-runtime", version = "=1.8.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" }