get rid of rwlock per bin in accounts index (#26675)

This commit is contained in:
Jeff Washington (jwash) 2022-07-19 17:48:40 -05:00 committed by GitHub
parent e7cd6daebe
commit 2d689ac53a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 38 deletions

View File

@ -1550,7 +1550,6 @@ mod tests {
assert_eq!(0, idx.bin_calculator.bin_from_pubkey(&range2.end));
accts.hold_range_in_memory(&range, true, &test_thread_pool());
idx.account_maps.iter().enumerate().for_each(|(_bin, map)| {
let map = map.read().unwrap();
assert_eq!(
map.cache_ranges_held.read().unwrap().to_vec(),
vec![range.clone()]
@ -1558,7 +1557,6 @@ mod tests {
});
accts.hold_range_in_memory(&range2, true, &test_thread_pool());
idx.account_maps.iter().enumerate().for_each(|(bin, map)| {
let map = map.read().unwrap();
let expected = if bin == 0 {
vec![range.clone(), range2_inclusive.clone()]
} else {

View File

@ -6134,7 +6134,7 @@ impl AccountsDb {
.account_maps
.iter()
.flat_map(|map| {
let mut keys = map.read().unwrap().keys();
let mut keys = map.keys();
keys.sort_unstable(); // hashmap is not ordered, but bins are relative to each other
keys
})
@ -8428,7 +8428,7 @@ impl AccountsDb {
.account_maps
.iter()
.map(|map_bin| {
let len = map_bin.read().unwrap().len_for_stats();
let len = map_bin.len_for_stats();
min_bin_size = std::cmp::min(min_bin_size, len);
max_bin_size = std::cmp::max(max_bin_size, len);
len as usize
@ -8715,7 +8715,7 @@ impl AccountsDb {
let full_pubkey_range = Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]);
self.accounts_index.account_maps.iter().for_each(|map| {
for (pubkey, account_entry) in map.read().unwrap().items(&full_pubkey_range) {
for (pubkey, account_entry) in map.items(&full_pubkey_range) {
info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),);
info!(
" slots: {:?}",

View File

@ -586,9 +586,7 @@ impl<'a, T: IndexValue> AccountsIndexIterator<'a, T> {
thread_pool.install(|| {
(0..bin_range).into_par_iter().for_each(|idx| {
let map = &self.account_maps[idx + start_bin];
map.read()
.unwrap()
.hold_range_in_memory(range, start_holding);
map.hold_range_in_memory(range, start_holding);
});
});
}
@ -604,7 +602,7 @@ impl<'a, T: IndexValue> Iterator for AccountsIndexIterator<'a, T> {
let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE);
'outer: for i in self.account_maps.iter().skip(start_bin).take(bin_range) {
for (pubkey, account_map_entry) in Self::range(
&i.read().unwrap(),
&i,
(self.start_bound, self.end_bound),
self.collect_all_unsorted,
) {
@ -633,9 +631,9 @@ pub trait ZeroLamport {
}
type MapType<T> = AccountMap<T>;
type LockMapType<T> = Vec<RwLock<MapType<T>>>;
type LockMapTypeSlice<T> = [RwLock<MapType<T>>];
type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, MapType<T>>;
type LockMapType<T> = Vec<MapType<T>>;
type LockMapTypeSlice<T> = [MapType<T>];
type AccountMapsReadLock<'a, T> = &'a MapType<T>;
#[derive(Debug, Default)]
pub struct ScanSlotTracker {
@ -751,7 +749,7 @@ impl<T: IndexValue> AccountsIndex<T> {
let storage = AccountsIndexStorage::new(bins, &config);
let account_maps = (0..bins)
.into_iter()
.map(|bin| RwLock::new(Arc::clone(&storage.in_mem[bin])))
.map(|bin| Arc::clone(&storage.in_mem[bin]))
.collect::<Vec<_>>();
(account_maps, bin_calculator, storage)
}
@ -1129,9 +1127,7 @@ impl<T: IndexValue> AccountsIndex<T> {
pubkey: &Pubkey,
user: impl for<'a> FnOnce(&mut RwLockWriteGuard<'a, SlotList<T>>) -> RT,
) -> Option<RT> {
let read_lock = self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)]
.read()
.unwrap();
let read_lock = self.get_account_maps_read_lock(pubkey);
read_lock.slot_list_mut(pubkey, user)
}
@ -1359,7 +1355,7 @@ impl<T: IndexValue> AccountsIndex<T> {
let bin = self.bin_calculator.bin_from_pubkey(pubkey);
if bin != last_bin {
// cannot re-use lock since next pubkey is in a different bin than previous one
lock = Some(self.account_maps[bin].read().unwrap());
lock = Some(&self.account_maps[bin]);
last_bin = bin;
}
lock.as_ref().unwrap().get_internal(pubkey, |entry| {
@ -1394,16 +1390,13 @@ impl<T: IndexValue> AccountsIndex<T> {
ancestors: Option<&Ancestors>,
max_root: Option<Slot>,
) -> AccountIndexGetResult<T> {
let read_lock = self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)]
.read()
.unwrap();
let read_lock = self.get_account_maps_read_lock(pubkey);
let account = read_lock
.get(pubkey)
.map(ReadAccountMapEntry::from_account_map_entry);
match account {
Some(locked_entry) => {
drop(read_lock);
let slot_list = locked_entry.slot_list();
let found_index = self.latest_slot(ancestors, slot_list, max_root);
match found_index {
@ -1527,9 +1520,7 @@ impl<T: IndexValue> AccountsIndex<T> {
}
pub(crate) fn get_account_maps_read_lock(&self, pubkey: &Pubkey) -> AccountMapsReadLock<T> {
self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)]
.read()
.unwrap()
&self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)]
}
pub fn bins(&self) -> usize {
@ -1586,10 +1577,10 @@ impl<T: IndexValue> AccountsIndex<T> {
let insertion_time = AtomicU64::new(0);
binned.into_iter().for_each(|(pubkey_bin, items)| {
let w_account_maps = self.account_maps[pubkey_bin].read().unwrap();
let r_account_maps = &self.account_maps[pubkey_bin];
let mut insert_time = Measure::start("insert_into_primary_index");
if use_disk {
w_account_maps.startup_insert_only(slot, items.into_iter());
r_account_maps.startup_insert_only(slot, items.into_iter());
} else {
// not using disk buckets, so just write to in-mem
// this is no longer the default case
@ -1600,7 +1591,7 @@ impl<T: IndexValue> AccountsIndex<T> {
&self.storage.storage,
use_disk,
);
w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_entry);
r_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_entry);
});
}
insert_time.stop();
@ -1615,7 +1606,7 @@ impl<T: IndexValue> AccountsIndex<T> {
(0..self.bins())
.into_iter()
.map(|pubkey_bin| {
let r_account_maps = self.account_maps[pubkey_bin].read().unwrap();
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.retrieve_duplicate_keys_from_startup()
})
.collect()
@ -1654,18 +1645,15 @@ impl<T: IndexValue> AccountsIndex<T> {
&self.storage.storage,
store_raw,
);
let map = &self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)];
let map = self.get_account_maps_read_lock(pubkey);
{
let r_account_maps = map.read().unwrap();
r_account_maps.upsert(pubkey, new_item, Some(old_slot), reclaims, reclaim);
}
map.upsert(pubkey, new_item, Some(old_slot), reclaims, reclaim);
self.update_secondary_indexes(pubkey, account, account_indexes);
}
pub fn unref_from_storage(&self, pubkey: &Pubkey) {
let map = &self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)];
map.read().unwrap().unref(pubkey)
let map = self.get_account_maps_read_lock(pubkey);
map.unref(pubkey)
}
pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount {
@ -2682,15 +2670,14 @@ pub mod tests {
assert_eq!((slot, account_info), new_entry.clone().into());
assert_eq!(0, account_maps_stats_len(&index));
let w_account_maps = index.get_account_maps_read_lock(&key.pubkey());
w_account_maps.upsert(
let r_account_maps = index.get_account_maps_read_lock(&key.pubkey());
r_account_maps.upsert(
&key.pubkey(),
new_entry,
None,
&mut SlotList::default(),
UPSERT_PREVIOUS_SLOT_ENTRY_WAS_CACHED_FALSE,
);
drop(w_account_maps);
assert_eq!(1, account_maps_stats_len(&index));
let mut ancestors = Ancestors::default();