introduce bucket map (#19848)

* introduce bucket map

* rename BucketMap bits to num_buckets_pow2

* use u64::BITS

* Store the number of buckets in BucketMapConfig as a regular number

* remove redundant type aliases

* use Slot from sdk

* use Arc::clone() instead

* fixup erase drives

* rename num_buckets to max_buckets

* add doc to BucketMapConfig::new()

* add more documentation

* rename to DEFAULT_CAPACITY_POW2

* doc

* add more traits while we can

* rename capacity to capacity_pow2

* fix a naming for max_buckets_pow2

* remove unused/incorrect DataBucket::bytes

* rework benches a bit

* fixup bench docs

* rename create_bucket_capacity_pow2 to bucket_capacity_when_created_pow2

* rename BucketMapKeyValue to BucketItem

* rename to items_in_range

* remove values()

* remove addref and unref

* remove more addref and unref

* resurrect addref and unref since tests use 'em for now

* rename to BucketStorage

* move stats in bucket_stats

* remove specializations (i don't think they are needed)

* move MaxSearch and RefCount into lib.rs

* move BucketItem to bucket_item.rs

* add doc

* keys no longer returns an option

* Revert "remove specializations (i don't think they are needed)"

This reverts commit b22f78e072cf0f7107851b08e58c2e3fead3f64d.

Co-authored-by: Brooks Prumo <brooks@solana.com>
Jeff Washington (jwash), 2021-09-17 15:11:27 -05:00
parent cddb9da4f0
commit b2152be3b2
14 changed files with 1552 additions and 3 deletions
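
Before the file-by-file diff, a minimal sketch of the API this commit introduces, distilled from the crate's own tests further down (illustrative only, not part of the commit; the tuple passed through update() is the (slot_list, ref_count) pair stored per key):

use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
use solana_sdk::pubkey::Pubkey;

fn main() {
    // max_buckets must be a non-zero power of two
    let index: BucketMap<u64> = BucketMap::new(BucketMapConfig::new(1 << 2));
    let key = Pubkey::new_unique();
    // updatefn receives the current value (if any) and returns the new one
    index.update(&key, |_| Some((vec![0], 0)));
    assert_eq!(index.read_value(&key), Some((vec![0], 0)));
    // returning None from updatefn, or calling delete_key, removes the entry
    index.delete_key(&key);
    assert_eq!(index.read_value(&key), None);
}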

Cargo.lock generated

@@ -854,11 +854,11 @@ dependencies = [
 [[package]]
 name = "crc32fast"
-version = "1.2.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
+checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
 dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
 ]
 [[package]]
@@ -2446,6 +2446,15 @@ dependencies = [
  "libc",
 ]
+
+[[package]]
+name = "memmap2"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "723e3ebdcdc5c023db1df315364573789f8857c11b631a2fdfad7c00f5c046b4"
+dependencies = [
+ "libc",
+]
 [[package]]
 name = "memmap2"
 version = "0.4.0"
@@ -4308,6 +4317,21 @@ dependencies = [
  "thiserror",
 ]
+
+[[package]]
+name = "solana-bucket-map"
+version = "1.8.0"
+dependencies = [
+ "fs_extra",
+ "log 0.4.14",
+ "memmap2 0.2.3",
+ "rand 0.7.3",
+ "rayon",
+ "solana-logger 1.8.0",
+ "solana-measure",
+ "solana-sdk",
+ "tempfile",
+]
 [[package]]
 name = "solana-cargo-build-bpf"
 version = "1.8.0"
@@ -5407,6 +5431,7 @@ dependencies = [
  "rustc_version 0.4.0",
  "serde",
  "serde_derive",
+ "solana-bucket-map",
  "solana-compute-budget-program",
  "solana-config-program",
  "solana-ed25519-program",

Cargo.toml

@@ -8,6 +8,7 @@ members = [
     "banks-client",
     "banks-interface",
     "banks-server",
+    "bucket_map",
     "clap-utils",
     "cli-config",
     "cli-output",

bucket_map/Cargo.toml Normal file

@@ -0,0 +1,29 @@
[package]
name = "solana-bucket-map"
version = "1.8.0"
description = "solana-bucket-map"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-bucket-map"
readme = "../README.md"
repository = "https://github.com/solana-labs/solana"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
license = "Apache-2.0"
edition = "2018"
[dependencies]
rayon = "1.5.0"
solana-logger = { path = "../logger", version = "=1.8.0" }
solana-sdk = { path = "../sdk", version = "=1.8.0" }
memmap2 = "0.2.3"
log = { version = "0.4.11" }
solana-measure = { path = "../measure", version = "=1.8.0" }
rand = "0.7.0"
fs_extra = "1.2.0"
tempfile = "3.2.0"
[lib]
crate-type = ["lib"]
name = "solana_bucket_map"
[[bench]]
name = "bucket_map"

bucket_map/benches/bucket_map.rs Normal file

@@ -0,0 +1,76 @@
#![feature(test)]
macro_rules! DEFINE_NxM_BENCH {
($i:ident, $n:literal, $m:literal) => {
mod $i {
use super::*;
#[bench]
fn bench_insert_baseline_hashmap(bencher: &mut Bencher) {
do_bench_insert_baseline_hashmap(bencher, $n, $m);
}
#[bench]
fn bench_insert_bucket_map(bencher: &mut Bencher) {
do_bench_insert_bucket_map(bencher, $n, $m);
}
}
};
}
extern crate test;
use rayon::prelude::*;
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
use solana_sdk::pubkey::Pubkey;
use std::collections::hash_map::HashMap;
use std::sync::RwLock;
use test::Bencher;
type IndexValue = u64;
DEFINE_NxM_BENCH!(dim_01x02, 1, 2);
DEFINE_NxM_BENCH!(dim_02x04, 2, 4);
DEFINE_NxM_BENCH!(dim_04x08, 4, 8);
DEFINE_NxM_BENCH!(dim_08x16, 8, 16);
DEFINE_NxM_BENCH!(dim_16x32, 16, 32);
DEFINE_NxM_BENCH!(dim_32x64, 32, 64);
/// Benchmark insert with Hashmap as baseline for N threads inserting M keys each
fn do_bench_insert_baseline_hashmap(bencher: &mut Bencher, n: usize, m: usize) {
let index = RwLock::new(HashMap::new());
(0..n).into_iter().into_par_iter().for_each(|i| {
let key = Pubkey::new_unique();
index
.write()
.unwrap()
.insert(key, vec![(i, IndexValue::default())]);
});
bencher.iter(|| {
(0..n).into_iter().into_par_iter().for_each(|_| {
for j in 0..m {
let key = Pubkey::new_unique();
index
.write()
.unwrap()
.insert(key, vec![(j, IndexValue::default())]);
}
})
});
}
/// Benchmark insert with BucketMap with N buckets for N threads inserting M keys each
fn do_bench_insert_bucket_map(bencher: &mut Bencher, n: usize, m: usize) {
let index = BucketMap::new(BucketMapConfig::new(n));
(0..n).into_iter().into_par_iter().for_each(|i| {
let key = Pubkey::new_unique();
index.update(&key, |_| Some((vec![(i, IndexValue::default())], 0)));
});
bencher.iter(|| {
(0..n).into_iter().into_par_iter().for_each(|_| {
for j in 0..m {
let key = Pubkey::new_unique();
index.update(&key, |_| Some((vec![(j, IndexValue::default())], 0)));
}
})
});
}
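
For readers unfamiliar with macro_rules!, the first DEFINE_NxM_BENCH invocation above expands to roughly the following module (expansion written out by hand, so treat it as a sketch):

mod dim_01x02 {
    use super::*;
    #[bench]
    fn bench_insert_baseline_hashmap(bencher: &mut Bencher) {
        do_bench_insert_baseline_hashmap(bencher, 1, 2);
    }
    #[bench]
    fn bench_insert_bucket_map(bencher: &mut Bencher) {
        do_bench_insert_bucket_map(bencher, 1, 2);
    }
}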

bucket_map/src/bucket.rs Normal file

@@ -0,0 +1,397 @@
use crate::bucket_item::BucketItem;
use crate::bucket_map::BucketMapError;
use crate::bucket_stats::BucketMapStats;
use crate::bucket_storage::BucketStorage;
use crate::index_entry::IndexEntry;
use crate::{MaxSearch, RefCount};
use rand::thread_rng;
use rand::Rng;
use solana_measure::measure::Measure;
use solana_sdk::pubkey::Pubkey;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T> {
drives: Arc<Vec<PathBuf>>,
//index
index: BucketStorage,
//random offset for the index
random: u64,
//storage buckets to store SlotSlice up to a power of 2 in len
pub data: Vec<BucketStorage>,
_phantom: PhantomData<T>,
stats: Arc<BucketMapStats>,
}
impl<T: Clone + Copy> Bucket<T> {
pub fn new(
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
) -> Self {
let index = BucketStorage::new(
Arc::clone(&drives),
1,
std::mem::size_of::<IndexEntry>() as u64,
max_search,
Arc::clone(&stats.index),
);
Self {
random: thread_rng().gen(),
drives,
index,
data: vec![],
_phantom: PhantomData::default(),
stats,
}
}
pub fn bucket_len(&self) -> u64 {
self.index.used.load(Ordering::Relaxed)
}
pub fn keys(&self) -> Vec<Pubkey> {
let mut rv = vec![];
for i in 0..self.index.num_cells() {
if self.index.uid(i) == 0 {
continue;
}
let ix: &IndexEntry = self.index.get(i);
rv.push(ix.key);
}
rv
}
pub fn items_in_range<R>(&self, range: Option<&R>) -> Vec<BucketItem<T>>
where
R: RangeBounds<Pubkey>,
{
let mut result = Vec::with_capacity(self.index.used.load(Ordering::Relaxed) as usize);
for i in 0..self.index.num_cells() {
let ii = i % self.index.num_cells();
if self.index.uid(ii) == 0 {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
let val = ix.read_value(self);
result.push(BucketItem {
pubkey: key,
ref_count: ix.ref_count(),
slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(),
});
}
}
result
}
pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> {
Self::bucket_find_entry(&self.index, key, self.random)
}
fn find_entry_mut(&self, key: &Pubkey) -> Option<(&mut IndexEntry, u64)> {
Self::bucket_find_entry_mut(&self.index, key, self.random)
}
fn bucket_find_entry_mut<'a>(
index: &'a BucketStorage,
key: &Pubkey,
random: u64,
) -> Option<(&'a mut IndexEntry, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.num_cells();
if index.uid(ii) == 0 {
continue;
}
let elem: &mut IndexEntry = index.get_mut(ii);
if elem.key == *key {
return Some((elem, ii));
}
}
None
}
fn bucket_find_entry<'a>(
index: &'a BucketStorage,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntry, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.num_cells();
if index.uid(ii) == 0 {
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
return Some((elem, ii));
}
}
None
}
fn bucket_create_key(
index: &BucketStorage,
key: &Pubkey,
elem_uid: u64,
random: u64,
ref_count: u64,
) -> Result<u64, BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i as u64 % index.num_cells();
if index.uid(ii) != 0 {
continue;
}
index.allocate(ii, elem_uid).unwrap();
let mut elem: &mut IndexEntry = index.get_mut(ii);
elem.key = *key;
elem.ref_count = ref_count;
elem.storage_offset = 0;
elem.storage_capacity_when_created_pow2 = 0;
elem.num_slots = 0;
//debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
return Ok(ii);
}
Err(BucketMapError::IndexNoSpace(index.capacity_pow2))
}
pub fn addref(&mut self, key: &Pubkey) -> Option<RefCount> {
let (elem, _) = self.find_entry_mut(key)?;
elem.ref_count += 1;
Some(elem.ref_count)
}
pub fn unref(&mut self, key: &Pubkey) -> Option<RefCount> {
let (elem, _) = self.find_entry_mut(key)?;
elem.ref_count -= 1;
Some(elem.ref_count)
}
fn create_key(&self, key: &Pubkey, ref_count: u64) -> Result<u64, BucketMapError> {
Self::bucket_create_key(
&self.index,
key,
IndexEntry::key_uid(key),
self.random,
ref_count,
)
}
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_entry(key)?;
elem.read_value(self)
}
pub fn try_write(
&mut self,
key: &Pubkey,
data: &[T],
ref_count: u64,
) -> Result<(), BucketMapError> {
let index_entry = self.find_entry_mut(key);
let (elem, elem_ix) = match index_entry {
None => {
let ii = self.create_key(key, ref_count)?;
let elem = self.index.get_mut(ii); // get_mut here is right?
(elem, ii)
}
Some(res) => {
if ref_count != res.0.ref_count {
res.0.ref_count = ref_count;
}
res
}
};
let elem_uid = self.index.uid(elem_ix);
let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64);
if self.data.get(best_fit_bucket as usize).is_none() {
//error!("resizing because missing bucket");
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
}
let bucket_ix = elem.data_bucket_ix();
let current_bucket = &self.data[bucket_ix as usize];
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
//in place update
let elem_loc = elem.data_loc(current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data.len() as u64);
//let elem: &mut IndexEntry = self.index.get_mut(elem_ix);
assert!(current_bucket.uid(elem_loc) == elem_uid);
elem.num_slots = data.len() as u64;
slice.clone_from_slice(data);
Ok(())
} else {
//need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.num_cells();
let pos = thread_rng().gen_range(0, cap);
for i in pos..pos + self.index.max_search() {
let ix = i % cap;
if best_bucket.uid(ix) == 0 {
let elem_loc = elem.data_loc(current_bucket);
if elem.num_slots > 0 {
current_bucket.free(elem_loc, elem_uid).unwrap();
}
// elem: &mut IndexEntry = self.index.get_mut(elem_ix);
elem.storage_offset = ix;
elem.storage_capacity_when_created_pow2 = best_bucket.capacity_pow2;
elem.num_slots = data.len() as u64;
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if elem.num_slots > 0 {
best_bucket.allocate(ix, elem_uid).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, data.len() as u64);
slice.copy_from_slice(data);
}
return Ok(());
}
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
}
}
pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_entry(key) {
let elem_uid = self.index.uid(elem_ix);
if elem.num_slots > 0 {
let data_bucket = &self.data[elem.data_bucket_ix() as usize];
let loc = elem.data_loc(data_bucket);
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc, elem_uid).unwrap();
}
//debug!("INDEX FREE {:?} {}", key, elem_uid);
self.index.free(elem_ix, elem_uid).unwrap();
}
}
pub fn grow_index(&mut self, sz: u8) {
if self.index.capacity_pow2 == sz {
let mut m = Measure::start("");
//debug!("GROW_INDEX: {}", sz);
let increment = 1;
for i in increment.. {
//increasing the capacity by ^4 reduces the
//likelihood of a re-index collision of 2^(max_search)^2
//1 in 2^32
let index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry>() as u64,
self.index.capacity_pow2 + i, // * 2,
self.index.max_search,
Arc::clone(&self.stats.index),
);
let random = thread_rng().gen();
let mut valid = true;
for ix in 0..self.index.num_cells() {
let uid = self.index.uid(ix);
if 0 != uid {
let elem: &IndexEntry = self.index.get(ix);
let ref_count = 0; // ??? TODO
let new_ix =
Self::bucket_create_key(&index, &elem.key, uid, random, ref_count);
if new_ix.is_err() {
valid = false;
break;
}
let new_ix = new_ix.unwrap();
let new_elem: &mut IndexEntry = index.get_mut(new_ix);
*new_elem = *elem;
/*
let dbg_elem: IndexEntry = *new_elem;
assert_eq!(
Self::bucket_find_entry(&index, &elem.key, random).unwrap(),
(&dbg_elem, new_ix)
);
*/
}
}
if valid {
self.index = index;
self.random = random;
break;
}
}
m.stop();
let sz = 1 << self.index.capacity_pow2;
{
let mut max = self.stats.index.max_size.lock().unwrap();
*max = std::cmp::max(*max, sz);
}
self.stats.index.resizes.fetch_add(1, Ordering::Relaxed);
self.stats
.index
.resize_us
.fetch_add(m.as_us(), Ordering::Relaxed);
}
}
pub fn grow_data(&mut self, sz: (u64, u8)) {
if self.data.get(sz.0 as usize).is_none() {
for i in self.data.len() as u64..(sz.0 + 1) {
self.data.push(BucketStorage::new(
Arc::clone(&self.drives),
1 << i,
std::mem::size_of::<T>() as u64,
self.index.max_search,
Arc::clone(&self.stats.data),
))
}
}
if self.data[sz.0 as usize].capacity_pow2 == sz.1 {
//debug!("GROW_DATA: {} {}", sz.0, sz.1);
self.data[sz.0 as usize].grow();
}
}
fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
random.hash(&mut s);
let ix = s.finish();
ix % index.num_cells()
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.num_cells() );
}
pub fn update<F>(&mut self, key: &Pubkey, updatefn: F)
where
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
{
let current = self.read_value(key);
let new = updatefn(current);
if new.is_none() {
self.delete_key(key);
return;
}
let (new, refct) = new.unwrap();
loop {
let rv = self.try_write(key, &new, refct);
match rv {
Err(BucketMapError::DataNoSpace(sz)) => {
//debug!("GROWING SPACE {:?}", sz);
self.grow_data(sz);
continue;
}
Err(BucketMapError::IndexNoSpace(sz)) => {
//debug!("GROWING INDEX {}", sz);
self.grow_index(sz);
continue;
}
Ok(()) => return,
}
}
}
}

bucket_map/src/bucket_item.rs Normal file

@@ -0,0 +1,9 @@
use crate::RefCount;
use solana_sdk::pubkey::Pubkey;
#[derive(Debug, Default, Clone)]
pub struct BucketItem<T> {
pub pubkey: Pubkey,
pub ref_count: RefCount,
pub slot_list: Vec<T>,
}

bucket_map/src/bucket_map.rs Normal file

@@ -0,0 +1,508 @@
//! BucketMap is a mostly contention-free concurrent map backed by MmapMut
use crate::bucket::Bucket;
use crate::bucket_item::BucketItem;
use crate::bucket_stats::BucketMapStats;
use crate::{MaxSearch, RefCount};
use solana_sdk::pubkey::Pubkey;
use std::convert::TryInto;
use std::fmt::Debug;
use std::fs;
use std::ops::RangeBounds;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use tempfile::TempDir;
#[derive(Debug, Default, Clone)]
pub struct BucketMapConfig {
pub max_buckets: usize,
pub drives: Option<Vec<PathBuf>>,
pub max_search: Option<MaxSearch>,
}
impl BucketMapConfig {
/// Create a new BucketMapConfig
/// NOTE: BucketMap requires that max_buckets is a power of two
pub fn new(max_buckets: usize) -> BucketMapConfig {
BucketMapConfig {
max_buckets,
..BucketMapConfig::default()
}
}
}
pub struct BucketMap<T: Clone + Copy + Debug> {
buckets: Vec<RwLock<Option<Bucket<T>>>>,
drives: Arc<Vec<PathBuf>>,
max_buckets_pow2: u8,
max_search: MaxSearch,
pub stats: Arc<BucketMapStats>,
pub temp_dir: Option<TempDir>,
}
impl<T: Clone + Copy + Debug> Drop for BucketMap<T> {
fn drop(&mut self) {
if self.temp_dir.is_none() {
BucketMap::<T>::erase_previous_drives(&self.drives);
}
}
}
impl<T: Clone + Copy + Debug> std::fmt::Debug for BucketMap<T> {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
#[derive(Debug)]
pub enum BucketMapError {
DataNoSpace((u64, u8)),
IndexNoSpace(u8),
}
impl<T: Clone + Copy + Debug> BucketMap<T> {
pub fn new(config: BucketMapConfig) -> Self {
assert_ne!(
config.max_buckets, 0,
"Max number of buckets must be non-zero"
);
assert!(
config.max_buckets.is_power_of_two(),
"Max number of buckets must be a power of two"
);
let mut buckets = Vec::with_capacity(config.max_buckets);
buckets.resize_with(config.max_buckets, || RwLock::new(None));
let stats = Arc::new(BucketMapStats::default());
// this should be <= 1 << DEFAULT_CAPACITY_POW2 or we end up searching the same items over and over - probably not a big deal since it is so small anyway
const MAX_SEARCH: MaxSearch = 32;
let max_search = config.max_search.unwrap_or(MAX_SEARCH);
if let Some(drives) = config.drives.as_ref() {
Self::erase_previous_drives(drives);
}
let mut temp_dir = None;
let drives = config.drives.unwrap_or_else(|| {
temp_dir = Some(TempDir::new().unwrap());
vec![temp_dir.as_ref().unwrap().path().to_path_buf()]
});
let drives = Arc::new(drives);
// A simple log2 function that is correct if x is a power of two
let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1;
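// e.g. log2(4) == 64 - 61 - 1 == 2 on a 64-bit target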
Self {
buckets,
drives,
max_buckets_pow2: log2(config.max_buckets) as u8,
stats,
max_search,
temp_dir,
}
}
fn erase_previous_drives(drives: &[PathBuf]) {
drives.iter().for_each(|folder| {
let _ = fs::remove_dir_all(&folder);
let _ = fs::create_dir_all(&folder);
})
}
pub fn num_buckets(&self) -> usize {
self.buckets.len()
}
pub fn bucket_len(&self, ix: usize) -> u64 {
self.buckets[ix]
.read()
.unwrap()
.as_ref()
.map(|bucket| bucket.bucket_len())
.unwrap_or_default()
}
/// Get the items for bucket `ix` in `range`
pub fn items_in_range<R>(&self, ix: usize, range: Option<&R>) -> Option<Vec<BucketItem<T>>>
where
R: RangeBounds<Pubkey>,
{
Some(
self.buckets[ix]
.read()
.unwrap()
.as_ref()?
.items_in_range(range),
)
}
/// Get the Pubkeys for bucket `ix`
pub fn keys(&self, ix: usize) -> Vec<Pubkey> {
self.buckets[ix]
.read()
.unwrap()
.as_ref()
.map_or_else(Vec::default, |bucket| bucket.keys())
}
/// Get the values for Pubkey `key`
pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
let ix = self.bucket_ix(key);
self.buckets[ix]
.read()
.unwrap()
.as_ref()
.and_then(|bucket| {
bucket
.read_value(key)
.map(|(value, ref_count)| (value.to_vec(), ref_count))
})
}
/// Delete the Pubkey `key`
pub fn delete_key(&self, key: &Pubkey) {
let ix = self.bucket_ix(key);
if let Some(bucket) = self.buckets[ix].write().unwrap().as_mut() {
bucket.delete_key(key);
}
}
/// Update Pubkey `key`'s value with function `updatefn`
pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
{
let ix = self.bucket_ix(key);
let mut bucket = self.buckets[ix].write().unwrap();
if bucket.is_none() {
*bucket = Some(Bucket::new(
Arc::clone(&self.drives),
self.max_search,
Arc::clone(&self.stats),
));
}
let bucket = bucket.as_mut().unwrap();
bucket.update(key, updatefn)
}
/// Get the bucket index for Pubkey `key`
pub fn bucket_ix(&self, key: &Pubkey) -> usize {
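// Worked example (comment added for illustration): with max_buckets = 4,
// max_buckets_pow2 == 2, so the top two bits of the key's first 8 bytes
// select the bucket: a key starting with 0xff.. maps to bucket 3, one
// starting with 0x00.. to bucket 0.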
if self.max_buckets_pow2 > 0 {
let location = read_be_u64(key.as_ref());
(location >> (u64::BITS - self.max_buckets_pow2 as u32)) as usize
} else {
0
}
}
/// Increment the refcount for Pubkey `key`
pub fn addref(&self, key: &Pubkey) -> Option<RefCount> {
let ix = self.bucket_ix(key);
let mut bucket = self.buckets[ix].write().unwrap();
bucket.as_mut()?.addref(key)
}
/// Decrement the refcount for Pubkey `key`
pub fn unref(&self, key: &Pubkey) -> Option<RefCount> {
let ix = self.bucket_ix(key);
let mut bucket = self.buckets[ix].write().unwrap();
bucket.as_mut()?.unref(key)
}
}
/// Look at the first 8 bytes of the input and reinterpret them as a u64
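/// e.g. read_be_u64(&[1, 0, 0, 0, 0, 0, 0, 0]) == 0x0100_0000_0000_0000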
fn read_be_u64(input: &[u8]) -> u64 {
assert!(input.len() >= std::mem::size_of::<u64>());
u64::from_be_bytes(input[0..std::mem::size_of::<u64>()].try_into().unwrap())
}
#[cfg(test)]
mod tests {
use super::*;
use rand::thread_rng;
use rand::Rng;
use std::collections::HashMap;
#[test]
fn bucket_map_test_insert() {
let key = Pubkey::new_unique();
let config = BucketMapConfig::new(1 << 1);
let index = BucketMap::new(config);
index.update(&key, |_| Some((vec![0], 0)));
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
}
#[test]
fn bucket_map_test_update() {
let key = Pubkey::new_unique();
let config = BucketMapConfig::new(1 << 1);
let index = BucketMap::new(config);
index.update(&key, |_| Some((vec![0], 0)));
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
index.update(&key, |_| Some((vec![1], 0)));
assert_eq!(index.read_value(&key), Some((vec![1], 0)));
}
#[test]
fn bucket_map_test_update_to_0_len() {
solana_logger::setup();
let key = Pubkey::new_unique();
let config = BucketMapConfig::new(1 << 1);
let index = BucketMap::new(config);
index.update(&key, |_| Some((vec![0], 1)));
assert_eq!(index.read_value(&key), Some((vec![0], 1)));
// sets len to 0, updates in place
index.update(&key, |_| Some((vec![], 1)));
assert_eq!(index.read_value(&key), Some((vec![], 1)));
// sets len to 0, doesn't update in place - finds a new place, which causes us to no longer have an allocation in data
index.update(&key, |_| Some((vec![], 2)));
assert_eq!(index.read_value(&key), Some((vec![], 2)));
// sets len to 1, doesn't update in place - finds a new place
index.update(&key, |_| Some((vec![1], 2)));
assert_eq!(index.read_value(&key), Some((vec![1], 2)));
}
#[test]
fn bucket_map_test_delete() {
let config = BucketMapConfig::new(1 << 1);
let index = BucketMap::new(config);
for i in 0..10 {
let key = Pubkey::new_unique();
assert_eq!(index.read_value(&key), None);
index.update(&key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(&key), Some((vec![i], 0)));
index.delete_key(&key);
assert_eq!(index.read_value(&key), None);
index.update(&key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(&key), Some((vec![i], 0)));
index.delete_key(&key);
}
}
#[test]
fn bucket_map_test_delete_2() {
let config = BucketMapConfig::new(1 << 2);
let index = BucketMap::new(config);
for i in 0..100 {
let key = Pubkey::new_unique();
assert_eq!(index.read_value(&key), None);
index.update(&key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(&key), Some((vec![i], 0)));
index.delete_key(&key);
assert_eq!(index.read_value(&key), None);
index.update(&key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(&key), Some((vec![i], 0)));
index.delete_key(&key);
}
}
#[test]
fn bucket_map_test_n_drives() {
let config = BucketMapConfig::new(1 << 2);
let index = BucketMap::new(config);
for i in 0..100 {
let key = Pubkey::new_unique();
index.update(&key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(&key), Some((vec![i], 0)));
}
}
#[test]
fn bucket_map_test_grow_read() {
let config = BucketMapConfig::new(1 << 2);
let index = BucketMap::new(config);
let keys: Vec<Pubkey> = (0..100).into_iter().map(|_| Pubkey::new_unique()).collect();
for k in 0..keys.len() {
let key = &keys[k];
let i = read_be_u64(key.as_ref());
index.update(key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(key), Some((vec![i], 0)));
for (ix, key) in keys.iter().enumerate() {
let i = read_be_u64(key.as_ref());
//debug!("READ: {:?} {}", key, i);
let expected = if ix <= k { Some((vec![i], 0)) } else { None };
assert_eq!(index.read_value(key), expected);
}
}
}
#[test]
fn bucket_map_test_n_delete() {
let config = BucketMapConfig::new(1 << 2);
let index = BucketMap::new(config);
let keys: Vec<Pubkey> = (0..20).into_iter().map(|_| Pubkey::new_unique()).collect();
for key in keys.iter() {
let i = read_be_u64(key.as_ref());
index.update(key, |_| Some((vec![i], 0)));
assert_eq!(index.read_value(key), Some((vec![i], 0)));
}
for key in keys.iter() {
let i = read_be_u64(key.as_ref());
//debug!("READ: {:?} {}", key, i);
assert_eq!(index.read_value(key), Some((vec![i], 0)));
}
for k in 0..keys.len() {
let key = &keys[k];
index.delete_key(key);
assert_eq!(index.read_value(key), None);
for key in keys.iter().skip(k + 1) {
let i = read_be_u64(key.as_ref());
assert_eq!(index.read_value(key), Some((vec![i], 0)));
}
}
}
#[test]
fn hashmap_compare() {
use std::sync::Mutex;
solana_logger::setup();
let maps = (0..2)
.into_iter()
.map(|max_buckets_pow2| {
let config = BucketMapConfig::new(1 << max_buckets_pow2);
BucketMap::new(config)
})
.collect::<Vec<_>>();
let hash_map = RwLock::new(HashMap::<Pubkey, (Vec<(usize, usize)>, RefCount)>::new());
let max_slot_list_len = 3;
let all_keys = Mutex::new(vec![]);
let gen_rand_value = || {
let count = thread_rng().gen_range(0, max_slot_list_len);
let v = (0..count)
.into_iter()
.map(|x| (x as usize, x as usize /*thread_rng().gen::<usize>()*/))
.collect::<Vec<_>>();
let rc = thread_rng().gen::<RefCount>();
(v, rc)
};
let get_key = || {
let mut keys = all_keys.lock().unwrap();
if keys.is_empty() {
return None;
}
let len = keys.len();
Some(keys.remove(thread_rng().gen_range(0, len)))
};
let return_key = |key| {
let mut keys = all_keys.lock().unwrap();
keys.push(key);
};
let verify = || {
let mut maps = maps
.iter()
.map(|map| {
let mut r = vec![];
for bin in 0..map.num_buckets() {
r.append(
&mut map
.items_in_range(bin, None::<&std::ops::RangeInclusive<Pubkey>>)
.unwrap_or_default(),
);
}
r
})
.collect::<Vec<_>>();
let hm = hash_map.read().unwrap();
for (k, v) in hm.iter() {
for map in maps.iter_mut() {
for i in 0..map.len() {
if k == &map[i].pubkey {
assert_eq!(map[i].slot_list, v.0);
assert_eq!(map[i].ref_count, v.1);
map.remove(i);
break;
}
}
}
}
for map in maps.iter() {
assert!(map.is_empty());
}
};
let mut initial = 100; // put this many items in to start
// do random operations: insert, update, delete, add/unref in random order
// verify consistency between hashmap and all bucket maps
for i in 0..10000 {
if initial > 0 {
initial -= 1;
}
if initial > 0 || thread_rng().gen_range(0, 5) == 0 {
// insert
let k = solana_sdk::pubkey::new_rand();
let v = gen_rand_value();
hash_map.write().unwrap().insert(k, v.clone());
maps.iter().for_each(|map| {
map.update(&k, |current| {
assert!(current.is_none());
Some(v.clone())
})
});
return_key(k);
}
if thread_rng().gen_range(0, 10) == 0 {
// update
if let Some(k) = get_key() {
let hm = hash_map.read().unwrap();
let (v, rc) = gen_rand_value();
let v_old = hm.get(&k);
maps.iter().for_each(|map| {
map.update(&k, |current| {
assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k);
Some((v.clone(), rc))
})
});
drop(hm);
hash_map.write().unwrap().insert(k, (v, rc));
return_key(k);
}
}
if thread_rng().gen_range(0, 20) == 0 {
// delete
if let Some(k) = get_key() {
let mut hm = hash_map.write().unwrap();
hm.remove(&k);
maps.iter().for_each(|map| {
map.delete_key(&k);
});
}
}
if thread_rng().gen_range(0, 10) == 0 {
// add/unref
if let Some(k) = get_key() {
let mut inc = thread_rng().gen_range(0, 2) == 0;
let mut hm = hash_map.write().unwrap();
let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap();
if !inc && rc == 0 {
// can't decrement rc=0
inc = true;
}
rc = if inc { rc + 1 } else { rc - 1 };
hm.insert(k, (v.to_vec(), rc));
maps.iter().for_each(|map| {
if thread_rng().gen_range(0, 2) == 0 {
map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc)))
} else if inc {
map.addref(&k);
} else {
map.unref(&k);
}
});
return_key(k);
}
}
if i % 1000 == 0 {
verify();
}
}
verify();
}
}

bucket_map/src/bucket_stats.rs Normal file

@@ -0,0 +1,18 @@
use std::sync::Arc;
use std::sync::{atomic::AtomicU64, Mutex};
#[derive(Debug, Default)]
pub struct BucketStats {
pub resizes: AtomicU64,
pub max_size: Mutex<u64>,
pub resize_us: AtomicU64,
pub new_file_us: AtomicU64,
pub flush_file_us: AtomicU64,
pub mmap_us: AtomicU64,
}
#[derive(Debug, Default, Clone)]
pub struct BucketMapStats {
pub index: Arc<BucketStats>,
pub data: Arc<BucketStats>,
}

bucket_map/src/bucket_storage.rs Normal file

@@ -0,0 +1,334 @@
use crate::bucket_stats::BucketStats;
use crate::MaxSearch;
use memmap2::MmapMut;
use rand::{thread_rng, Rng};
use solana_measure::measure::Measure;
use std::fs::{remove_file, OpenOptions};
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/*
capacity_pow2 -> number of cells (1 << capacity_pow2):
 1          2
 2          4
 3          8
 4         16
 5         32
 6         64
 7        128
 8        256
 9        512
10      1,024
11      2,048
12      4,096
13      8,192
14     16,384
23  8,388,608
24 16,777,216
*/
const DEFAULT_CAPACITY_POW2: u8 = 5;
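// i.e. a freshly created BucketStorage holds 1 << 5 = 32 cells

// Each cell is prefixed by this header; `lock` holds the uid of the element
// occupying the cell, or 0 when the cell is free (see try_lock/unlock below)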
#[repr(C)]
struct Header {
lock: AtomicU64,
}
impl Header {
fn try_lock(&self, uid: u64) -> bool {
Ok(0)
== self
.lock
.compare_exchange(0, uid, Ordering::Relaxed, Ordering::Relaxed)
}
fn unlock(&self, uid: u64) -> bool {
Ok(uid)
== self
.lock
.compare_exchange(uid, 0, Ordering::Relaxed, Ordering::Relaxed)
}
fn uid(&self) -> u64 {
self.lock.load(Ordering::Relaxed)
}
}
pub struct BucketStorage {
drives: Arc<Vec<PathBuf>>,
path: PathBuf,
mmap: MmapMut,
pub cell_size: u64,
pub capacity_pow2: u8,
pub used: AtomicU64,
pub stats: Arc<BucketStats>,
pub max_search: MaxSearch,
}
#[derive(Debug)]
pub enum BucketStorageError {
AlreadyAllocated,
InvalidFree,
}
impl Drop for BucketStorage {
fn drop(&mut self) {
let _ = remove_file(&self.path);
}
}
impl BucketStorage {
pub fn new_with_capacity(
drives: Arc<Vec<PathBuf>>,
num_elems: u64,
elem_size: u64,
capacity_pow2: u8,
max_search: MaxSearch,
mut stats: Arc<BucketStats>,
) -> Self {
let cell_size = elem_size * num_elems + std::mem::size_of::<Header>() as u64;
let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &mut stats);
Self {
path,
mmap,
drives,
cell_size,
used: AtomicU64::new(0),
capacity_pow2,
stats,
max_search,
}
}
pub fn max_search(&self) -> u64 {
self.max_search as u64
}
pub fn new(
drives: Arc<Vec<PathBuf>>,
num_elems: u64,
elem_size: u64,
max_search: MaxSearch,
stats: Arc<BucketStats>,
) -> Self {
Self::new_with_capacity(
drives,
num_elems,
elem_size,
DEFAULT_CAPACITY_POW2,
max_search,
stats,
)
}
pub fn uid(&self, ix: u64) -> u64 {
if ix >= self.num_cells() {
panic!("bad index size");
}
let ix = (ix * self.cell_size) as usize;
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
let hdr = hdr_slice.as_ptr() as *const Header;
return hdr.as_ref().unwrap().uid();
}
}
pub fn allocate(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
if ix >= self.num_cells() {
panic!("allocate: bad index size");
}
if 0 == uid {
panic!("allocate: bad uid");
}
let mut e = Err(BucketStorageError::AlreadyAllocated);
let ix = (ix * self.cell_size) as usize;
//debug!("ALLOC {} {}", ix, uid);
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
let hdr = hdr_slice.as_ptr() as *const Header;
if hdr.as_ref().unwrap().try_lock(uid) {
e = Ok(());
self.used.fetch_add(1, Ordering::Relaxed);
}
};
e
}
pub fn free(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
if ix >= self.num_cells() {
panic!("free: bad index size");
}
if 0 == uid {
panic!("free: bad uid");
}
let ix = (ix * self.cell_size) as usize;
//debug!("FREE {} {}", ix, uid);
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
let mut e = Err(BucketStorageError::InvalidFree);
unsafe {
let hdr = hdr_slice.as_ptr() as *const Header;
//debug!("FREE uid: {}", hdr.as_ref().unwrap().uid());
if hdr.as_ref().unwrap().unlock(uid) {
self.used.fetch_sub(1, Ordering::Relaxed);
e = Ok(());
}
};
e
}
pub fn get<T: Sized>(&self, ix: u64) -> &T {
if ix >= self.num_cells() {
panic!("bad index size");
}
let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
let item = item_slice.as_ptr() as *const T;
&*item
}
}
pub fn get_empty_cell_slice<T: Sized>(&self) -> &[T] {
let len = 0;
let item_slice: &[u8] = &self.mmap[0..0];
unsafe {
let item = item_slice.as_ptr() as *const T;
std::slice::from_raw_parts(item, len as usize)
}
}
pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
if ix >= self.num_cells() {
panic!("bad index size");
}
let ix = self.cell_size * ix;
let start = ix as usize + std::mem::size_of::<Header>();
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
let item = item_slice.as_ptr() as *const T;
std::slice::from_raw_parts(item, len as usize)
}
}
pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
if ix >= self.num_cells() {
panic!("bad index size");
}
let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
let item = item_slice.as_ptr() as *mut T;
&mut *item
}
}
pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
if ix >= self.num_cells() {
panic!("bad index size");
}
let ix = self.cell_size * ix;
let start = ix as usize + std::mem::size_of::<Header>();
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET mut slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
let item = item_slice.as_ptr() as *mut T;
std::slice::from_raw_parts_mut(item, len as usize)
}
}
fn new_map(
drives: &[PathBuf],
cell_size: usize,
capacity_pow2: u8,
stats: &mut Arc<BucketStats>,
) -> (MmapMut, PathBuf) {
let mut m0 = Measure::start("");
let capacity = 1u64 << capacity_pow2;
let r = thread_rng().gen_range(0, drives.len());
let drive = &drives[r];
let pos = format!("{}", thread_rng().gen_range(0, u128::MAX),);
let file = drive.join(pos);
let mut data = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file.clone())
.map_err(|e| {
panic!(
"Unable to create data file {} in current dir({:?}): {:?}",
file.display(),
std::env::current_dir(),
e
);
})
.unwrap();
// Theoretical performance optimization: write a zero to the end of
// the file so that we won't have to resize it later, which may be
// expensive.
//debug!("GROWING file {}", capacity * cell_size as u64);
data.seek(SeekFrom::Start(capacity * cell_size as u64 - 1))
.unwrap();
data.write_all(&[0]).unwrap();
data.seek(SeekFrom::Start(0)).unwrap();
m0.stop();
let mut m1 = Measure::start("");
data.flush().unwrap(); // can we skip this?
m1.stop();
let mut m2 = Measure::start("");
let res = (unsafe { MmapMut::map_mut(&data).unwrap() }, file);
m2.stop();
stats.new_file_us.fetch_add(m0.as_us(), Ordering::Relaxed);
stats.flush_file_us.fetch_add(m1.as_us(), Ordering::Relaxed);
stats.mmap_us.fetch_add(m2.as_us(), Ordering::Relaxed);
res
}
pub fn grow(&mut self) {
let mut m = Measure::start("grow");
let old_cap = self.num_cells();
let old_map = &self.mmap;
let old_file = self.path.clone();
let increment = 1;
let index_grow = 1 << increment;
let (new_map, new_file) = Self::new_map(
&self.drives,
self.cell_size as usize,
self.capacity_pow2 + increment,
&mut self.stats,
);
(0..old_cap as usize).into_iter().for_each(|i| {
let old_ix = i * self.cell_size as usize;
let new_ix = old_ix * index_grow;
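// with increment == 1 the capacity doubles, so cell i moves to cell 2 * i;
// IndexEntry::data_loc compensates by shifting storage_offset left by the
// accumulated capacity_pow2 delta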
let dst_slice: &[u8] = &new_map[new_ix..new_ix + self.cell_size as usize];
let src_slice: &[u8] = &old_map[old_ix..old_ix + self.cell_size as usize];
unsafe {
let dst = dst_slice.as_ptr() as *mut u8;
let src = src_slice.as_ptr() as *const u8;
std::ptr::copy_nonoverlapping(src, dst, self.cell_size as usize);
};
});
self.mmap = new_map;
self.path = new_file;
self.capacity_pow2 += increment;
remove_file(old_file).unwrap();
m.stop();
let sz = 1 << self.capacity_pow2;
{
let mut max = self.stats.max_size.lock().unwrap();
*max = std::cmp::max(*max, sz);
}
self.stats.resizes.fetch_add(1, Ordering::Relaxed);
self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
}
pub fn num_cells(&self) -> u64 {
1 << self.capacity_pow2
}
}

bucket_map/src/index_entry.rs Normal file

@@ -0,0 +1,62 @@
use crate::bucket::Bucket;
use crate::bucket_storage::BucketStorage;
use crate::RefCount;
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq)]
// one instance of this per item in the index
// stored in the index bucket
pub struct IndexEntry {
pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts?
pub storage_offset: u64, // smaller? since these are variably sized, this could get tricky. well, actually accountinfo is not variable sized...
// if the bucket doubled, the index can be recomputed using storage_capacity_when_created_pow2
pub storage_capacity_when_created_pow2: u8, // see data_location
pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list
}
impl IndexEntry {
pub fn data_bucket_from_num_slots(num_slots: Slot) -> u64 {
(num_slots as f64).log2().ceil() as u64 // use int log here?
}
pub fn data_bucket_ix(&self) -> u64 {
Self::data_bucket_from_num_slots(self.num_slots)
}
pub fn ref_count(&self) -> RefCount {
self.ref_count
}
// This function maps the original data location into an index in the current bucket storage.
// This is coupled with how we resize bucket storages.
pub fn data_loc(&self, storage: &BucketStorage) -> u64 {
self.storage_offset << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2)
}
pub fn read_value<'a, T>(&self, bucket: &'a Bucket<T>) -> Option<(&'a [T], RefCount)> {
let data_bucket_ix = self.data_bucket_ix();
let data_bucket = &bucket.data[data_bucket_ix as usize];
let slice = if self.num_slots > 0 {
let loc = self.data_loc(data_bucket);
let uid = Self::key_uid(&self.key);
assert_eq!(uid, bucket.data[data_bucket_ix as usize].uid(loc));
bucket.data[data_bucket_ix as usize].get_cell_slice(loc, self.num_slots)
} else {
// num_slots is 0. This means we don't have an actual allocation.
// can we trust that the data_bucket is even safe?
bucket.data[data_bucket_ix as usize].get_empty_cell_slice()
};
Some((slice, self.ref_count))
}
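// uid 0 marks a free cell in BucketStorage, so key_uid must never return 0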
pub fn key_uid(key: &Pubkey) -> u64 {
let mut s = DefaultHasher::new();
key.hash(&mut s);
s.finish().max(1u64)
}
}
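
To make the two persisted location fields concrete, here is a small standalone sketch (not part of the commit) that mirrors the formulas in IndexEntry above: data bucket i holds slot lists of up to 2^i entries, and data_loc rescales an entry's original offset by the number of doublings the storage has undergone since the entry was written:

fn data_bucket_from_num_slots(num_slots: u64) -> u64 {
    (num_slots as f64).log2().ceil() as u64
}

fn data_loc(storage_offset: u64, created_pow2: u8, current_pow2: u8) -> u64 {
    storage_offset << (current_pow2 - created_pow2)
}

fn main() {
    assert_eq!(data_bucket_from_num_slots(1), 0); // bucket 0 holds 1-slot lists
    assert_eq!(data_bucket_from_num_slots(2), 1); // bucket 1 holds up to 2
    assert_eq!(data_bucket_from_num_slots(3), 2); // 3 rounds up to 4 => bucket 2
    assert_eq!(data_bucket_from_num_slots(8), 3); // bucket 3 holds up to 8
    // written at offset 5 when capacity_pow2 was 7; after two doublings
    // (capacity_pow2 now 9) the entry lives at offset 5 << 2 == 20
    assert_eq!(data_loc(5, 7, 9), 20);
}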

bucket_map/src/lib.rs Normal file

@@ -0,0 +1,12 @@
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
#![allow(clippy::integer_arithmetic)]
#![allow(clippy::mut_from_ref)]
mod bucket;
mod bucket_item;
pub mod bucket_map;
mod bucket_stats;
mod bucket_storage;
mod index_entry;
pub type MaxSearch = u8;
pub type RefCount = u64;

bucket_map/tests/bucket_map.rs Normal file

@@ -0,0 +1,46 @@
use rayon::prelude::*;
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
use solana_measure::measure::Measure;
use solana_sdk::pubkey::Pubkey;
use std::path::PathBuf;
#[test]
#[ignore]
fn bucket_map_test_mt() {
let threads = 4096;
let items = 4096;
let tmpdir1 = std::env::temp_dir().join("bucket_map_test_mt");
let tmpdir2 = PathBuf::from("/mnt/data/0").join("bucket_map_test_mt");
let paths: Vec<PathBuf> = [tmpdir1, tmpdir2]
.iter()
.filter(|x| std::fs::create_dir_all(x).is_ok())
.cloned()
.collect();
assert!(!paths.is_empty());
let index = BucketMap::new(BucketMapConfig {
max_buckets: 1 << 12,
drives: Some(paths.clone()),
..BucketMapConfig::default()
});
(0..threads).into_iter().into_par_iter().for_each(|_| {
let key = Pubkey::new_unique();
index.update(&key, |_| Some((vec![0u64], 0)));
});
let mut timer = Measure::start("bucket_map_test_mt");
(0..threads).into_iter().into_par_iter().for_each(|_| {
for _ in 0..items {
let key = Pubkey::new_unique();
let ix: u64 = index.bucket_ix(&key) as u64;
index.update(&key, |_| Some((vec![ix], 0)));
assert_eq!(index.read_value(&key), Some((vec![ix], 0)));
}
});
timer.stop();
println!("time: {}ns per item", timer.as_ns() / (threads * items));
let mut total = 0;
for tmpdir in paths.iter() {
let folder_size = fs_extra::dir::get_size(tmpdir).unwrap();
total += folder_size;
std::fs::remove_dir_all(tmpdir).unwrap();
}
println!("overhead: {}bytes per item", total / (threads * items));
}

programs/bpf/Cargo.lock generated

@@ -873,6 +873,12 @@ dependencies = [
  "percent-encoding",
 ]
+
+[[package]]
+name = "fs_extra"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
 [[package]]
 name = "futures"
 version = "0.3.17"
@@ -1481,6 +1487,15 @@ dependencies = [
  "libc",
 ]
+
+[[package]]
+name = "memmap2"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "723e3ebdcdc5c023db1df315364573789f8857c11b631a2fdfad7c00f5c046b4"
+dependencies = [
+ "libc",
+]
 [[package]]
 name = "memmap2"
 version = "0.4.0"
@@ -2784,6 +2799,21 @@ dependencies = [
  "solana-program 1.8.0",
 ]
+
+[[package]]
+name = "solana-bucket-map"
+version = "1.8.0"
+dependencies = [
+ "fs_extra",
+ "log",
+ "memmap2 0.2.3",
+ "rand 0.7.3",
+ "rayon",
+ "solana-logger 1.8.0",
+ "solana-measure",
+ "solana-sdk",
+ "tempfile",
+]
 [[package]]
 name = "solana-clap-utils"
 version = "1.8.0"
@@ -3214,6 +3244,7 @@ dependencies = [
  "rustc_version 0.4.0",
  "serde",
  "serde_derive",
+ "solana-bucket-map",
  "solana-compute-budget-program",
  "solana-config-program",
  "solana-ed25519-program",

runtime/Cargo.toml

@@ -40,6 +40,7 @@ solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.8.0" }
 solana-logger = { path = "../logger", version = "=1.8.0" }
 solana-measure = { path = "../measure", version = "=1.8.0" }
 solana-metrics = { path = "../metrics", version = "=1.8.0" }
+solana-bucket-map = { path = "../bucket_map", version = "=1.8.0" }
 solana-program-runtime = { path = "../program-runtime", version = "=1.8.0" }
 solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.8.0" }
 solana-sdk = { path = "../sdk", version = "=1.8.0" }