AcctIdx: share bucket map size for perf (#21935)

Jeff Washington (jwash) 2021-12-16 21:25:54 -06:00 committed by GitHub
parent ba777f4f56
commit 6374995522
6 changed files with 46 additions and 44 deletions
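
In short: BucketStorage previously tracked occupancy in its own private used: AtomicU64, so reporting a bucket's length from outside meant either taking the bucket's read lock or maintaining the separate per_bucket_count vector in BucketMapStats. After this commit each BucketApi owns an Arc<AtomicU64> count that it hands down to its index BucketStorage, which updates it in allocate/free, and bucket_len() becomes a single relaxed atomic load. The sketch below shows only that sharing pattern; ApiHandle, Storage, and their methods are hypothetical stand-ins, not the real solana-bucket-map types.

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, RwLock,
};

// Hypothetical stand-ins for BucketApi and BucketStorage, showing only the
// shared-counter pattern the commit introduces.
struct Storage {
    // The storage side updates the count as entries are allocated/freed...
    count: Arc<AtomicU64>,
}

struct ApiHandle {
    // ...while the API handle holds a clone of the same Arc and can report
    // the length without touching the lock around the storage.
    count: Arc<AtomicU64>,
    storage: RwLock<Option<Storage>>,
}

impl ApiHandle {
    fn new() -> Self {
        Self {
            count: Arc::default(),
            storage: RwLock::new(None),
        }
    }

    // Lazily create the storage, handing it the shared counter.
    fn get_or_create_storage(&self) {
        let mut guard = self.storage.write().unwrap();
        if guard.is_none() {
            *guard = Some(Storage {
                count: Arc::clone(&self.count),
            });
        }
    }

    // Analogue of the new BucketApi::bucket_len: one relaxed load, no lock.
    fn len(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}

fn main() {
    let api = ApiHandle::new();
    api.get_or_create_storage();
    // Simulate the storage allocating two entries.
    if let Some(storage) = api.storage.read().unwrap().as_ref() {
        storage.count.fetch_add(2, Ordering::Relaxed);
    }
    assert_eq!(api.len(), 2);
}
```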

View File

@ -17,7 +17,7 @@ use {
ops::RangeBounds,
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
},
@ -81,6 +81,7 @@ impl<T: Clone + Copy> Bucket<T> {
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
) -> Self {
let index = BucketStorage::new(
Arc::clone(&drives),
@ -88,6 +89,7 @@ impl<T: Clone + Copy> Bucket<T> {
std::mem::size_of::<IndexEntry>() as u64,
max_search,
Arc::clone(&stats.index),
count,
);
Self {
random: thread_rng().gen(),
@ -100,10 +102,6 @@ impl<T: Clone + Copy> Bucket<T> {
}
}
pub fn bucket_len(&self) -> u64 {
self.index.used.load(Ordering::Relaxed)
}
pub fn keys(&self) -> Vec<Pubkey> {
let mut rv = vec![];
for i in 0..self.index.capacity() {
@ -120,7 +118,7 @@ impl<T: Clone + Copy> Bucket<T> {
where
R: RangeBounds<Pubkey>,
{
let mut result = Vec::with_capacity(self.index.used.load(Ordering::Relaxed) as usize);
let mut result = Vec::with_capacity(self.index.count.load(Ordering::Relaxed) as usize);
for i in 0..self.index.capacity() {
let ii = i % self.index.capacity();
if self.index.uid(ii) == UID_UNLOCKED {
@ -191,6 +189,7 @@ impl<T: Clone + Copy> Bucket<T> {
key: &Pubkey,
elem_uid: Uid,
random: u64,
is_resizing: bool,
) -> Result<u64, BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
@ -198,7 +197,7 @@ impl<T: Clone + Copy> Bucket<T> {
if index.uid(ii) != UID_UNLOCKED {
continue;
}
index.allocate(ii, elem_uid).unwrap();
index.allocate(ii, elem_uid, is_resizing).unwrap();
let mut elem: &mut IndexEntry = index.get_mut(ii);
elem.key = *key;
// These will be overwritten after allocation by callers.
@ -226,7 +225,13 @@ impl<T: Clone + Copy> Bucket<T> {
}
fn create_key(&self, key: &Pubkey) -> Result<u64, BucketMapError> {
Self::bucket_create_key(&self.index, key, IndexEntry::key_uid(key), self.random)
Self::bucket_create_key(
&self.index,
key,
IndexEntry::key_uid(key),
self.random,
false,
)
}
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
@ -285,7 +290,7 @@ impl<T: Clone + Copy> Bucket<T> {
elem.num_slots = data.len() as u64;
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if elem.num_slots > 0 {
best_bucket.allocate(ix, elem_uid).unwrap();
best_bucket.allocate(ix, elem_uid, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, data.len() as u64);
slice.copy_from_slice(data);
}
@ -327,6 +332,7 @@ impl<T: Clone + Copy> Bucket<T> {
self.index.capacity_pow2 + i, // * 2,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
);
let random = thread_rng().gen();
let mut valid = true;
@ -334,7 +340,7 @@ impl<T: Clone + Copy> Bucket<T> {
let uid = self.index.uid(ix);
if UID_UNLOCKED != uid {
let elem: &IndexEntry = self.index.get(ix);
let new_ix = Self::bucket_create_key(&index, &elem.key, uid, random);
let new_ix = Self::bucket_create_key(&index, &elem.key, uid, random, true);
if new_ix.is_err() {
valid = false;
break;
@ -391,6 +397,7 @@ impl<T: Clone + Copy> Bucket<T> {
Self::elem_size(),
self.index.max_search,
Arc::clone(&self.stats.data),
Arc::default(),
))
}
self.data.push(bucket);
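
In the hunks above, the shared counter appears as the new count: Arc<AtomicU64> argument to Bucket::new, which forwards it to the index's BucketStorage::new; grow_index clones self.index.count into the replacement index so the counter survives a resize, and newly created data buckets get Arc::default() because only index entries are counted. A minimal sketch of that wiring follows; Storage, add_data_bucket, and the reduced signatures are illustrative assumptions, not the real constructors (which also take drives, max_search, and stats as shown above).

```rust
use std::sync::{atomic::AtomicU64, Arc};

// Simplified stand-ins; the real constructors also take drives, max_search,
// and stats, as the diff shows.
struct Storage {
    count: Arc<AtomicU64>,
}

impl Storage {
    fn new(count: Arc<AtomicU64>) -> Self {
        Self { count }
    }
}

struct Bucket {
    index: Storage,
    data: Vec<Storage>,
}

impl Bucket {
    // Analogue of the new Bucket::new(.., count): the caller-owned counter is
    // handed straight to the index storage.
    fn new(count: Arc<AtomicU64>) -> Self {
        Self {
            index: Storage::new(count),
            data: vec![],
        }
    }

    // Analogue of grow_index: the replacement index shares the same counter,
    // so re-inserting the existing entries (done with is_resizing = true in
    // the real code) must not change the reported length.
    fn grow_index(&mut self) {
        let new_index = Storage::new(Arc::clone(&self.index.count));
        // ...entries would be copied from the old index into new_index here...
        self.index = new_index;
    }

    // Analogue of adding a data bucket: data storages are not entry-counted,
    // so they get a fresh default counter.
    fn add_data_bucket(&mut self) {
        self.data.push(Storage::new(Arc::default()));
    }
}

fn main() {
    let count = Arc::new(AtomicU64::new(0));
    let mut bucket = Bucket::new(Arc::clone(&count));
    bucket.add_data_bucket();
    bucket.grow_index();
    // The index still reports through the counter the caller handed in.
    assert!(Arc::ptr_eq(&count, &bucket.index.count));
    assert_eq!(bucket.data.len(), 1);
}
```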

View File

@ -30,14 +30,13 @@ impl<T: Clone + Copy> BucketApi<T> {
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
) -> Self {
Self {
drives,
max_search,
stats,
bucket: RwLock::default(),
count,
count: Arc::default(),
}
}
@ -73,12 +72,7 @@ impl<T: Clone + Copy> BucketApi<T> {
}
pub fn bucket_len(&self) -> u64 {
self.bucket
.read()
.unwrap()
.as_ref()
.map(|bucket| bucket.bucket_len())
.unwrap_or_default()
self.count.load(Ordering::Relaxed)
}
pub fn delete_key(&self, key: &Pubkey) {
@ -95,11 +89,11 @@ impl<T: Clone + Copy> BucketApi<T> {
Arc::clone(&self.drives),
self.max_search,
Arc::clone(&self.stats),
Arc::clone(&self.count),
));
} else {
let write = bucket.as_mut().unwrap();
write.handle_delayed_grows();
self.count.store(write.bucket_len(), Ordering::Relaxed);
}
bucket
}
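
The read-path payoff is in the hunks above: bucket_len no longer takes the RwLock read guard and asks the inner bucket for its length, and get_write_bucket no longer copies the length back into a stats counter; the shared count is simply loaded. A before/after sketch, assuming the bucket field is an RwLock<Option<...>> as in the surrounding code (InnerBucket and the method names are placeholders):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, RwLock,
};

// Placeholder for the bucket held behind the RwLock.
struct InnerBucket {
    len: u64,
}

impl InnerBucket {
    fn bucket_len(&self) -> u64 {
        self.len
    }
}

struct BucketApiSketch {
    bucket: RwLock<Option<InnerBucket>>,
    count: Arc<AtomicU64>,
}

impl BucketApiSketch {
    // Before: take the read lock, then ask the bucket (zero if never created).
    fn bucket_len_old(&self) -> u64 {
        self.bucket
            .read()
            .unwrap()
            .as_ref()
            .map(|bucket| bucket.bucket_len())
            .unwrap_or_default()
    }

    // After: one relaxed atomic load, no lock traffic on the read path.
    fn bucket_len_new(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}

fn main() {
    let api = BucketApiSketch {
        bucket: RwLock::new(Some(InnerBucket { len: 3 })),
        count: Arc::new(AtomicU64::new(3)),
    };
    assert_eq!(api.bucket_len_old(), api.bucket_len_new());
}
```

Avoiding that read lock matters because the length is polled by the stats reporter (last file below) while other threads may hold the write lock for inserts and grows, which appears to be the perf motivation in the commit title.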

View File

@ -79,21 +79,14 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
});
let drives = Arc::new(drives);
let mut per_bucket_count = Vec::with_capacity(config.max_buckets);
per_bucket_count.resize_with(config.max_buckets, Arc::default);
let stats = Arc::new(BucketMapStats {
per_bucket_count,
..BucketMapStats::default()
});
let buckets = stats
.per_bucket_count
.iter()
.map(|per_bucket_count| {
let stats = Arc::default();
let buckets = (0..config.max_buckets)
.into_iter()
.map(|_| {
Arc::new(BucketApi::new(
Arc::clone(&drives),
max_search,
Arc::clone(&stats),
Arc::clone(per_bucket_count),
))
})
.collect();
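
With each BucketApi now creating its own counter internally, BucketMap::new no longer pre-builds a per_bucket_count vector inside BucketMapStats and threads one Arc per bucket through construction; it simply maps over 0..config.max_buckets. A condensed sketch of the new construction loop, with the BucketApi::new arguments trimmed to the ones relevant here (the pared-down types are placeholders):

```rust
use std::{path::PathBuf, sync::Arc};

// Hypothetical, pared-down stand-ins for the real types.
#[derive(Default)]
struct BucketMapStats;

struct BucketApiSketch;

impl BucketApiSketch {
    // The real BucketApi::new also takes max_search; the count parameter is
    // gone because the counter is now created internally with Arc::default().
    fn new(_drives: Arc<Vec<PathBuf>>, _stats: Arc<BucketMapStats>) -> Self {
        Self
    }
}

fn build_buckets(max_buckets: usize, drives: Arc<Vec<PathBuf>>) -> Vec<Arc<BucketApiSketch>> {
    // One shared stats object; no per_bucket_count vector to pre-build.
    let stats = Arc::new(BucketMapStats::default());
    (0..max_buckets)
        .map(|_| {
            Arc::new(BucketApiSketch::new(
                Arc::clone(&drives),
                Arc::clone(&stats),
            ))
        })
        .collect()
}

fn main() {
    let drives = Arc::new(vec![PathBuf::from("bucket_map_dir")]);
    assert_eq!(build_buckets(4, drives).len(), 4);
}
```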

View File

@ -14,5 +14,4 @@ pub struct BucketStats {
pub struct BucketMapStats {
pub index: Arc<BucketStats>,
pub data: Arc<BucketStats>,
pub per_bucket_count: Vec<Arc<AtomicU64>>,
}

View File

@ -64,7 +64,7 @@ pub struct BucketStorage {
mmap: MmapMut,
pub cell_size: u64,
pub capacity_pow2: u8,
pub used: AtomicU64,
pub count: Arc<AtomicU64>,
pub stats: Arc<BucketStats>,
pub max_search: MaxSearch,
}
@ -88,6 +88,7 @@ impl BucketStorage {
capacity_pow2: u8,
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
) -> Self {
let cell_size = elem_size * num_elems + std::mem::size_of::<Header>() as u64;
let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &stats);
@ -95,7 +96,7 @@ impl BucketStorage {
path,
mmap,
cell_size,
used: AtomicU64::new(0),
count,
capacity_pow2,
stats,
max_search,
@ -112,6 +113,7 @@ impl BucketStorage {
elem_size: u64,
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
) -> Self {
Self::new_with_capacity(
drives,
@ -120,6 +122,7 @@ impl BucketStorage {
DEFAULT_CAPACITY_POW2,
max_search,
stats,
count,
)
}
@ -133,20 +136,24 @@ impl BucketStorage {
}
}
pub fn allocate(&self, ix: u64, uid: Uid) -> Result<(), BucketStorageError> {
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "allocate: bad index size");
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
let mut e = Err(BucketStorageError::AlreadyAllocated);
let ix = (ix * self.cell_size) as usize;
//debug!("ALLOC {} {}", ix, uid);
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
if unsafe {
let hdr = hdr_slice.as_ptr() as *const Header;
if hdr.as_ref().unwrap().try_lock(uid) {
e = Ok(());
self.used.fetch_add(1, Ordering::Relaxed);
hdr.as_ref().unwrap().try_lock(uid)
} {
e = Ok(());
if !is_resizing {
self.count.fetch_add(1, Ordering::Relaxed);
}
};
}
e
}
@ -165,7 +172,7 @@ impl BucketStorage {
"free: unlocked a header with a differet uid: {}",
previous_uid
);
self.used.fetch_sub(1, Ordering::Relaxed);
self.count.fetch_sub(1, Ordering::Relaxed);
}
}
@ -324,6 +331,9 @@ impl BucketStorage {
capacity_pow_2,
max_search,
Arc::clone(stats),
bucket
.map(|bucket| Arc::clone(&bucket.count))
.unwrap_or_default(),
);
if let Some(bucket) = bucket {
new_bucket.copy_contents(bucket);
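
BucketStorage is where the shared counter is kept honest: allocate gains the is_resizing flag so that re-inserting existing entries while growing the index (whose replacement shares the same Arc) does not inflate the count, and free decrements count exactly as it previously decremented used. Since the old and new bodies of allocate sit interleaved in the hunk above, here is a condensed sketch of the new logic, with the mmap'd Header::try_lock replaced by a plain atomic compare-exchange so the example stays self-contained (all names are placeholders):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

#[derive(Debug)]
enum BucketStorageError {
    AlreadyAllocated,
}

const UID_UNLOCKED: u64 = 0;

struct StorageSketch {
    // Stand-in for the per-cell headers backed by the mmap in the real code.
    cells: Vec<AtomicU64>,
    count: Arc<AtomicU64>,
}

impl StorageSketch {
    /// is_resizing == true: the caller is rebuilding the index and the entry
    /// was already counted, so leave the shared count alone.
    /// is_resizing == false: a genuinely new entry, so count it.
    fn allocate(&self, ix: usize, uid: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
        assert_ne!(uid, UID_UNLOCKED, "allocate: bad uid");
        // Analogue of Header::try_lock on the mmap'd header.
        let locked = self.cells[ix]
            .compare_exchange(UID_UNLOCKED, uid, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if locked {
            if !is_resizing {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
            Ok(())
        } else {
            Err(BucketStorageError::AlreadyAllocated)
        }
    }

    // free always decrements, mirroring the count.fetch_sub in the diff.
    fn free(&self, ix: usize, uid: u64) {
        let previous = self.cells[ix].swap(UID_UNLOCKED, Ordering::AcqRel);
        assert_eq!(previous, uid, "free: unlocked a header with a different uid");
        self.count.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let storage = StorageSketch {
        cells: (0..4).map(|_| AtomicU64::new(UID_UNLOCKED)).collect(),
        count: Arc::default(),
    };
    storage.allocate(0, 7, false).unwrap(); // new entry: counted
    storage.allocate(1, 7, true).unwrap(); // re-insert during a grow: not counted
    assert!(storage.allocate(0, 9, false).is_err()); // cell already taken
    storage.free(0, 7);
    assert_eq!(storage.count.load(Ordering::Relaxed), 0);
}
```

The flag matters because grow_index re-creates every live entry in the larger index via bucket_create_key(..., true); both indexes share one Arc<AtomicU64>, so counting those re-insertions would double the reported length.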

View File

@ -170,10 +170,9 @@ impl BucketMapHolderStats {
let disk = storage.disk.as_ref();
let disk_per_bucket_counts = disk
.map(|disk| {
disk.stats
.per_bucket_count
.iter()
.map(|count| count.load(Ordering::Relaxed) as usize)
(0..self.bins)
.into_iter()
.map(|i| disk.get_bucket_from_index(i as usize).bucket_len() as usize)
.collect::<Vec<_>>()
})
.unwrap_or_default();
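
Finally, the stats reporter stops reading the removed per_bucket_count vector and instead asks each bucket for its length, which is now the cheap atomic load rather than a lock acquisition. A small sketch of that collection step, assuming a get_bucket_from_index-style accessor and a bins count as in the code above (the types are stand-ins):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

// Stand-in for a BucketApi with the new lock-free length.
struct BucketSketch {
    count: Arc<AtomicU64>,
}

impl BucketSketch {
    fn bucket_len(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}

struct DiskIndexSketch {
    buckets: Vec<BucketSketch>,
}

impl DiskIndexSketch {
    fn get_bucket_from_index(&self, i: usize) -> &BucketSketch {
        &self.buckets[i]
    }
}

// Analogue of the stats code above: per-bucket counts are gathered by asking
// each bucket directly instead of reading stats.per_bucket_count.
fn disk_per_bucket_counts(disk: &DiskIndexSketch, bins: usize) -> Vec<usize> {
    (0..bins)
        .map(|i| disk.get_bucket_from_index(i).bucket_len() as usize)
        .collect()
}

fn main() {
    let disk = DiskIndexSketch {
        buckets: (0..3)
            .map(|i| BucketSketch {
                count: Arc::new(AtomicU64::new(i)),
            })
            .collect(),
    };
    assert_eq!(disk_per_bucket_counts(&disk, 3), vec![0, 1, 2]);
}
```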