Switch accounts storage lock to DashMap (#12126)

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
carllin 2020-10-13 18:29:50 -07:00 committed by GitHub
parent 1f1eb9f26e
commit f8d338c9cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 416 additions and 241 deletions

41
Cargo.lock generated
View File

@ -25,6 +25,15 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
[[package]]
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"const-random",
]
[[package]]
name = "aho-corasick"
version = "0.7.10"
@ -539,6 +548,26 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -737,6 +766,17 @@ dependencies = [
"zeroize",
]
[[package]]
name = "dashmap"
version = "3.11.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
dependencies = [
"ahash",
"cfg-if",
"num_cpus",
]
[[package]]
name = "derivative"
version = "2.1.1"
@ -4293,6 +4333,7 @@ dependencies = [
"byteorder",
"bzip2",
"crossbeam-channel",
"dashmap",
"dir-diff",
"flate2",
"fnv",

View File

@ -15,6 +15,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d"
[[package]]
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"const-random",
]
[[package]]
name = "aho-corasick"
version = "0.7.10"
@ -278,6 +287,26 @@ dependencies = [
"byteorder 1.3.4",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -403,6 +432,17 @@ dependencies = [
"zeroize",
]
[[package]]
name = "dashmap"
version = "3.11.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
dependencies = [
"ahash",
"cfg-if",
"num_cpus",
]
[[package]]
name = "digest"
version = "0.8.1"
@ -2027,6 +2067,7 @@ dependencies = [
"byteorder 1.3.4",
"bzip2",
"crossbeam-channel",
"dashmap",
"dir-diff",
"flate2",
"fnv",

View File

@ -14,6 +14,7 @@ blake3 = "0.3.6"
bv = { version = "0.11.1", features = ["serde"] }
byteorder = "1.3.4"
bzip2 = "0.3.3"
dashmap = "3.11.10"
crossbeam-channel = "0.4"
dir-diff = "0.3.2"
flate2 = "1.0.14"

View File

@ -2,6 +2,7 @@
extern crate test;
use rand::Rng;
use solana_runtime::{
accounts::{create_test_accounts, Accounts},
bank::*,
@ -11,7 +12,7 @@ use solana_sdk::{
genesis_config::{create_genesis_config, ClusterType},
pubkey::Pubkey,
};
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf, sync::Arc, thread::Builder};
use test::Bencher;
fn deposit_many(bank: &Bank, pubkeys: &mut Vec<Pubkey>, num: usize) {
@ -141,3 +142,57 @@ fn bench_delete_dependencies(bencher: &mut Bencher) {
accounts.accounts_db.clean_accounts(None);
});
}
#[bench]
#[ignore]
fn bench_concurrent_read_write(bencher: &mut Bencher) {
let num_readers = 5;
let accounts = Arc::new(Accounts::new(
vec![
PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()))
.join("concurrent_read_write"),
],
&ClusterType::Development,
));
let num_keys = 1000;
let slot = 0;
accounts.add_root(slot);
let pubkeys: Arc<Vec<_>> = Arc::new(
(0..num_keys)
.map(|_| {
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
accounts.store_slow(slot, &pubkey, &account);
pubkey
})
.collect(),
);
for _ in 0..num_readers {
let accounts = accounts.clone();
let pubkeys = pubkeys.clone();
Builder::new()
.name("readers".to_string())
.spawn(move || {
let mut rng = rand::thread_rng();
loop {
let i = rng.gen_range(0, num_keys);
test::black_box(accounts.load_slow(&HashMap::new(), &pubkeys[i]).unwrap());
}
})
.unwrap();
}
let num_new_keys = 1000;
let new_accounts: Vec<_> = (0..num_new_keys)
.map(|_| Account::new(1, 0, &Account::default().owner))
.collect();
bencher.iter(|| {
for account in &new_accounts {
// Write to a different slot than the one being read from. Because
// there's a new account pubkey being written to every time, will
// compete for the accounts index lock on every store
accounts.store_slow(slot + 1, &Pubkey::new_rand(), &account);
}
})
}

View File

@ -303,7 +303,6 @@ impl Accounts {
//PERF: hold the lock to scan for the references, but not to clone the accounts
//TODO: two locks usually leads to deadlocks, should this be one structure?
let accounts_index = self.accounts_db.accounts_index.read().unwrap();
let storage = self.accounts_db.storage.read().unwrap();
let fee_config = FeeConfig {
secp256k1_program_enabled: feature_set
@ -328,7 +327,7 @@ impl Accounts {
};
let load_res = self.load_tx_accounts(
&storage,
&self.accounts_db.storage,
ancestors,
&accounts_index,
tx,
@ -343,7 +342,7 @@ impl Accounts {
};
let load_res = Self::load_loaders(
&storage,
&self.accounts_db.storage,
ancestors,
&accounts_index,
tx,
@ -1507,10 +1506,9 @@ mod tests {
let ancestors = vec![(0, 0)].into_iter().collect();
let accounts_index = accounts.accounts_db.accounts_index.read().unwrap();
let storage = accounts.accounts_db.storage.read().unwrap();
assert_eq!(
Accounts::load_executable_accounts(
&storage,
&accounts.accounts_db.storage,
&ancestors,
&accounts_index,
&Pubkey::new_rand(),

View File

@ -23,6 +23,7 @@ use crate::{
append_vec::{AppendVec, StoredAccount, StoredMeta},
};
use blake3::traits::digest::Digest;
use dashmap::DashMap;
use lazy_static::lazy_static;
use log::*;
use rand::{thread_rng, Rng};
@ -46,7 +47,7 @@ use std::{
ops::RangeBounds,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard},
sync::{Arc, Mutex, MutexGuard, RwLock},
time::Instant,
};
use tempfile::TempDir;
@ -98,7 +99,7 @@ pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
pub type SnapshotStorages = Vec<SnapshotStorage>;
// Each slot has a set of storage entries.
pub(crate) type SlotStores = HashMap<usize, Arc<AccountStorageEntry>>;
pub(crate) type SlotStores = Arc<RwLock<HashMap<usize, Arc<AccountStorageEntry>>>>;
type AccountSlots = HashMap<Pubkey, HashSet<Slot>>;
type AppendVecOffsets = HashMap<AppendVecId, HashSet<usize>>;
@ -121,31 +122,31 @@ impl Versioned for (u64, AccountInfo) {
}
#[derive(Clone, Default, Debug)]
pub struct AccountStorage(pub HashMap<Slot, SlotStores>);
pub struct AccountStorage(pub DashMap<Slot, SlotStores>);
impl AccountStorage {
fn scan_accounts(&self, account_info: &AccountInfo, slot: Slot) -> Option<(Account, Slot)> {
fn get_account_storage_entry(
&self,
slot: Slot,
store_id: AppendVecId,
) -> Option<Arc<AccountStorageEntry>> {
self.0
.get(&slot)
.and_then(|storage_map| storage_map.get(&account_info.store_id))
.and_then(|store| {
Some(
store
.accounts
.get_account(account_info.offset)?
.0
.clone_account(),
)
})
.map(|account| (account, slot))
.and_then(|storage_map| storage_map.value().read().unwrap().get(&store_id).cloned())
}
fn get_slot_stores(&self, slot: Slot) -> Option<SlotStores> {
self.0.get(&slot).map(|result| result.value().clone())
}
fn slot_store_count(&self, slot: Slot, store_id: AppendVecId) -> Option<usize> {
self.0
.get(&slot)
.and_then(|slot_storages| slot_storages.get(&store_id))
self.get_account_storage_entry(slot, store_id)
.map(|store| store.count_and_status.read().unwrap().0)
}
fn all_slots(&self) -> Vec<Slot> {
self.0.iter().map(|iter_item| *iter_item.key()).collect()
}
}
#[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize, AbiExample, AbiEnumVisitor)]
@ -269,6 +270,15 @@ impl AccountStorageEntry {
self.accounts.flush()
}
fn get_account(&self, account_info: &AccountInfo) -> Option<Account> {
Some(
self.accounts
.get_account(account_info.offset)?
.0
.clone_account(),
)
}
fn add_account(&self) {
let mut count_and_status = self.count_and_status.write().unwrap();
*count_and_status = (count_and_status.0 + 1, count_and_status.1);
@ -394,7 +404,7 @@ pub struct AccountsDB {
/// Keeps tracks of index into AppendVec on a per slot basis
pub accounts_index: RwLock<AccountsIndex<AccountInfo>>,
pub storage: RwLock<AccountStorage>,
pub storage: AccountStorage,
/// distribute the accounts across storage lists
pub next_id: AtomicUsize,
@ -471,7 +481,7 @@ impl Default for AccountsDB {
bank_hashes.insert(0, BankHashInfo::default());
AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(AccountStorage(HashMap::new())),
storage: AccountStorage(DashMap::new()),
next_id: AtomicUsize::new(0),
shrink_candidate_slots: Mutex::new(Vec::new()),
write_version: AtomicU64::new(0),
@ -792,8 +802,6 @@ impl AccountsDB {
key_set.insert(*key);
let count = self
.storage
.read()
.unwrap()
.slot_store_count(*slot, account_info.store_id)
.unwrap()
- 1;
@ -965,8 +973,8 @@ impl AccountsDB {
let mut stored_accounts = vec![];
let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
{
let slot_storages = self.storage.read().unwrap().0.get(&slot).cloned();
if let Some(stores) = slot_storages {
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
let stores = stores_lock.read().unwrap();
let mut alive_count = 0;
let mut stored_count = 0;
for store in stores.values() {
@ -1099,15 +1107,14 @@ impl AccountsDB {
start.stop();
update_index_elapsed = start.as_us();
let mut start = Measure::start("update_index_elapsed");
let mut start = Measure::start("handle_reclaims_elapsed");
self.handle_reclaims(&reclaims, Some(slot), true, None);
start.stop();
handle_reclaims_elapsed = start.as_us();
let mut start = Measure::start("write_storage_elapsed");
let mut storage = self.storage.write().unwrap();
if let Some(slot_storage) = storage.0.get_mut(&slot) {
slot_storage.retain(|_key, store| {
if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
slot_stores.write().unwrap().retain(|_key, store| {
if store.count() == 0 {
dead_storages.push(store.clone());
}
@ -1178,8 +1185,7 @@ impl AccountsDB {
}
fn all_slots_in_storage(&self) -> Vec<Slot> {
let storage = self.storage.read().unwrap();
storage.0.keys().cloned().collect()
self.storage.all_slots()
}
pub fn process_stale_slot(&self) -> usize {
@ -1221,14 +1227,11 @@ impl AccountsDB {
{
let mut collector = A::default();
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
scan_func(
&mut collector,
storage
.scan_accounts(account_info, slot)
.map(|(account, slot)| (pubkey, account, slot)),
)
let account_slot = self
.get_account_from_storage(slot, account_info)
.map(|account| (pubkey, account, slot));
scan_func(&mut collector, account_slot)
});
collector
}
@ -1241,46 +1244,34 @@ impl AccountsDB {
{
let mut collector = A::default();
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
accounts_index.range_scan_accounts(ancestors, range, |pubkey, (account_info, slot)| {
scan_func(
&mut collector,
storage
.scan_accounts(account_info, slot)
.map(|(account, slot)| (pubkey, account, slot)),
)
let account_slot = self
.get_account_from_storage(slot, account_info)
.map(|account| (pubkey, account, slot));
scan_func(&mut collector, account_slot)
});
collector
}
/// Scan a specific slot through all the account storage in parallel with sequential read
// PERF: Sequentially read each storage entry in parallel
/// Scan a specific slot through all the account storage in parallel
pub fn scan_account_storage<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
where
F: Fn(&StoredAccount, AppendVecId, &mut B) + Send + Sync,
B: Send + Default,
{
self.scan_account_storage_inner(slot, scan_func, &self.storage.read().unwrap())
self.scan_account_storage_inner(slot, scan_func)
}
// The input storage must come from self.storage.read().unwrap()
fn scan_account_storage_inner<F, B>(
&self,
slot: Slot,
scan_func: F,
storage: &RwLockReadGuard<AccountStorage>,
) -> Vec<B>
fn scan_account_storage_inner<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
where
F: Fn(&StoredAccount, AppendVecId, &mut B) + Send + Sync,
B: Send + Default,
{
let storage_maps: Vec<Arc<AccountStorageEntry>> = storage
.0
.get(&slot)
.unwrap_or(&HashMap::new())
.values()
.cloned()
.collect();
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_stores(slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.unwrap_or_default();
self.thread_pool.install(|| {
storage_maps
.into_par_iter()
@ -1323,15 +1314,15 @@ impl AccountsDB {
let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None)?;
let slot = lock[index].0;
//TODO: thread this as a ref
if let Some(slot_storage) = storage.0.get(&slot) {
let info = &lock[index].1;
slot_storage
.get(&info.store_id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
.map(|account| (account, slot))
} else {
None
}
storage
.get_account_storage_entry(slot, lock[index].1.store_id)
.and_then(|store| {
let info = &lock[index].1;
store
.accounts
.get_account(info.offset)
.map(|account| (account.0.clone_account(), slot))
})
}
#[cfg(test)]
@ -1339,25 +1330,33 @@ impl AccountsDB {
let accounts_index = self.accounts_index.read().unwrap();
let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None).unwrap();
let slot = lock[index].0;
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot).unwrap();
let info = &lock[index].1;
let entry = slot_storage.get(&info.store_id).unwrap();
let entry = self
.storage
.get_account_storage_entry(slot, info.store_id)
.unwrap();
let account = entry.accounts.get_account(info.offset);
*account.as_ref().unwrap().0.hash
}
pub fn load_slow(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Option<(Account, Slot)> {
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
Self::load(&storage, ancestors, &accounts_index, pubkey)
Self::load(&self.storage, ancestors, &accounts_index, pubkey)
}
fn get_account_from_storage(&self, slot: Slot, account_info: &AccountInfo) -> Option<Account> {
let account_storage_entry = self
.storage
.get_account_storage_entry(slot, account_info.store_id);
account_storage_entry
.and_then(|account_storage_entry| account_storage_entry.get_account(account_info))
}
fn find_storage_candidate(&self, slot: Slot) -> Arc<AccountStorageEntry> {
let mut create_extra = false;
let stores = self.storage.read().unwrap();
if let Some(slot_stores) = stores.0.get(&slot) {
let slot_stores_lock = self.storage.get_slot_stores(slot);
if let Some(slot_stores_lock) = slot_stores_lock {
let slot_stores = slot_stores_lock.read().unwrap();
if !slot_stores.is_empty() {
if slot_stores.len() <= self.min_num_stores {
let mut total_accounts = 0;
@ -1377,7 +1376,7 @@ impl AccountsDB {
for (i, store) in slot_stores.values().cycle().skip(to_skip).enumerate() {
if store.try_available() {
let ret = store.clone();
drop(stores);
drop(slot_stores);
if create_extra {
self.create_and_insert_store(slot, self.file_size);
}
@ -1391,8 +1390,6 @@ impl AccountsDB {
}
}
drop(stores);
let store = self.create_and_insert_store(slot, self.file_size);
store.try_available();
store
@ -1403,10 +1400,15 @@ impl AccountsDB {
let store =
Arc::new(self.new_storage_entry(slot, &Path::new(&self.paths[path_index]), size));
let store_for_index = store.clone();
let mut stores = self.storage.write().unwrap();
let slot_storage = stores.0.entry(slot).or_insert_with(HashMap::new);
slot_storage.insert(store.id, store_for_index);
let slot_storage = self
.storage
.0
.entry(slot)
.or_insert(Arc::new(RwLock::new(HashMap::new())));
slot_storage
.write()
.unwrap()
.insert(store.id, store_for_index);
store
}
@ -1424,27 +1426,25 @@ impl AccountsDB {
.filter(|slot| !accounts_index.is_root(**slot))
.collect();
drop(accounts_index);
let mut storage_lock_elapsed = Measure::start("storage_lock_elapsed");
let mut storage = self.storage.write().unwrap();
storage_lock_elapsed.stop();
let mut all_removed_slot_storages = vec![];
let mut total_removed_storage_entries = 0;
let mut total_removed_bytes = 0;
let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed");
for slot in non_roots {
if let Some(slot_removed_storages) = storage.0.remove(&slot) {
total_removed_storage_entries += slot_removed_storages.len();
total_removed_bytes += slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
if let Some((_, slot_removed_storages)) = self.storage.0.remove(&slot) {
{
let r_slot_removed_storages = slot_removed_storages.read().unwrap();
total_removed_storage_entries += r_slot_removed_storages.len();
total_removed_bytes += r_slot_removed_storages
.values()
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
all_removed_slot_storages.push(slot_removed_storages);
}
}
remove_storages_elapsed.stop();
drop(storage);
let num_slots_removed = all_removed_slot_storages.len();
@ -1456,7 +1456,6 @@ impl AccountsDB {
datapoint_info!(
"purge_slots_time",
("storage_lock_elapsed", storage_lock_elapsed.as_us(), i64),
(
"remove_storages_elapsed",
remove_storages_elapsed.as_us(),
@ -1503,7 +1502,7 @@ impl AccountsDB {
// 1) Remove old bank hash from self.bank_hashes
// 2) Purge this slot's storage entries from self.storage
self.handle_reclaims(&reclaims, Some(remove_slot), false, None);
assert!(self.storage.read().unwrap().0.get(&remove_slot).is_none());
assert!(self.storage.get_slot_stores(remove_slot).is_none());
}
fn include_owner(cluster_type: &ClusterType, slot: Slot) -> bool {
@ -1777,8 +1776,9 @@ impl AccountsDB {
let mut max_slot = 0;
let mut newest_slot = 0;
let mut oldest_slot = std::u64::MAX;
let stores = self.storage.read().unwrap();
for (slot, slot_stores) in &stores.0 {
for iter_item in self.storage.0.iter() {
let slot = iter_item.key();
let slot_stores = iter_item.value().read().unwrap();
total_count += slot_stores.len();
if slot_stores.len() < min {
min = slot_stores.len();
@ -1797,7 +1797,6 @@ impl AccountsDB {
oldest_slot = *slot;
}
}
drop(stores);
info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})",
total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
datapoint_info!("accounts_db-stores", ("total_count", total_count, i64));
@ -1935,7 +1934,6 @@ impl AccountsDB {
use BankHashVerificationError::*;
let mut scan = Measure::start("scan");
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
let keys: Vec<_> = accounts_index.account_maps.keys().collect();
let mismatch_found = AtomicU64::new(0);
let hashes: Vec<_> = keys
@ -1944,10 +1942,8 @@ impl AccountsDB {
if let Some((list, index)) = accounts_index.get(pubkey, Some(ancestors), None) {
let (slot, account_info) = &list[index];
if account_info.lamports != 0 {
storage
.0
.get(&slot)
.and_then(|storage_map| storage_map.get(&account_info.store_id))
self.storage
.get_account_storage_entry(*slot, account_info.store_id)
.and_then(|store| {
let account = store.accounts.get_account(account_info.offset)?.0;
let balance = Self::account_balance_for_capitalization(
@ -2128,7 +2124,6 @@ impl AccountsDB {
expected_slot: Option<Slot>,
mut reclaimed_offsets: Option<&mut AppendVecOffsets>,
) -> HashSet<Slot> {
let storage = self.storage.read().unwrap();
let mut dead_slots = HashSet::new();
for (slot, account_info) in reclaims {
if let Some(ref mut reclaimed_offsets) = reclaimed_offsets {
@ -2140,23 +2135,25 @@ impl AccountsDB {
if let Some(expected_slot) = expected_slot {
assert_eq!(*slot, expected_slot);
}
if let Some(slot_storage) = storage.0.get(slot) {
if let Some(store) = slot_storage.get(&account_info.store_id) {
assert_eq!(
*slot, store.slot,
"AccountDB::accounts_index corrupted. Storage should only point to one slot"
);
let count = store.remove_account();
if count == 0 {
dead_slots.insert(*slot);
}
if let Some(store) = self
.storage
.get_account_storage_entry(*slot, account_info.store_id)
{
assert_eq!(
*slot, store.slot,
"AccountDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, should only point to one slot",
store.slot, *slot
);
let count = store.remove_account();
if count == 0 {
dead_slots.insert(*slot);
}
}
}
dead_slots.retain(|slot| {
if let Some(slot_storage) = storage.0.get(&slot) {
for x in slot_storage.values() {
if let Some(slot_stores) = self.storage.get_slot_stores(*slot) {
for x in slot_stores.read().unwrap().values() {
if x.count() != 0 {
return false;
}
@ -2175,16 +2172,14 @@ impl AccountsDB {
) {
{
let mut measure = Measure::start("clean_dead_slots-ms");
let storage = self.storage.read().unwrap();
let mut stores: Vec<Arc<AccountStorageEntry>> = vec![];
for slot in dead_slots.iter() {
if let Some(slot_storage) = storage.0.get(slot) {
for store in slot_storage.values() {
if let Some(slot_storage) = self.storage.get_slot_stores(*slot) {
for store in slot_storage.read().unwrap().values() {
stores.push(store.clone());
}
}
}
drop(storage);
datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64));
let slot_pubkeys: HashSet<(Slot, Pubkey)> = {
self.thread_pool_clean.install(|| {
@ -2334,15 +2329,18 @@ impl AccountsDB {
pub fn get_snapshot_storages(&self, snapshot_slot: Slot) -> SnapshotStorages {
let accounts_index = self.accounts_index.read().unwrap();
let r_storage = self.storage.read().unwrap();
r_storage
self.storage
.0
.iter()
.filter(|(slot, _slot_stores)| {
**slot <= snapshot_slot && accounts_index.is_root(**slot)
.filter(|iter_item| {
let slot = *iter_item.key();
slot <= snapshot_slot && accounts_index.is_root(slot)
})
.map(|(_slot, slot_stores)| {
slot_stores
.map(|iter_item| {
iter_item
.value()
.read()
.unwrap()
.values()
.filter(|x| x.has_accounts())
.cloned()
@ -2368,8 +2366,7 @@ impl AccountsDB {
pub fn generate_index(&self) {
let mut accounts_index = self.accounts_index.write().unwrap();
let storage = self.storage.read().unwrap();
let mut slots: Vec<Slot> = storage.0.keys().cloned().collect();
let mut slots = self.storage.all_slots();
slots.sort();
let mut last_log_update = Instant::now();
@ -2396,7 +2393,6 @@ impl AccountsDB {
.or_insert_with(Vec::new);
entry.push((stored_account.meta.write_version, account_info));
},
&storage,
);
let mut accounts_map: HashMap<Pubkey, Vec<(u64, AccountInfo)>> = HashMap::new();
@ -2432,8 +2428,8 @@ impl AccountsDB {
*counts.entry(account_entry.store_id).or_insert(0) += 1;
}
}
for slot_stores in storage.0.values() {
for (id, store) in slot_stores {
for slot_stores in self.storage.0.iter() {
for (id, store) in slot_stores.value().read().unwrap().iter() {
if let Some(count) = counts.get(&id) {
trace!(
"id: {} setting count: {} cur: {}",
@ -2476,17 +2472,16 @@ impl AccountsDB {
}
fn print_count_and_status(&self, label: &'static str) {
let storage = self.storage.read().unwrap();
let mut slots: Vec<_> = storage.0.keys().cloned().collect();
let mut slots: Vec<_> = self.storage.all_slots();
slots.sort();
info!("{}: count_and status for {} slots:", label, slots.len());
for slot in &slots {
let slot_stores = storage.0.get(slot).unwrap();
let mut ids: Vec<_> = slot_stores.keys().cloned().collect();
let slot_stores = self.storage.get_slot_stores(*slot).unwrap();
let r_slot_stores = slot_stores.read().unwrap();
let mut ids: Vec<_> = r_slot_stores.keys().cloned().collect();
ids.sort();
for id in &ids {
let entry = slot_stores.get(id).unwrap();
let entry = r_slot_stores.get(id).unwrap();
info!(
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
slot,
@ -2674,45 +2669,48 @@ pub mod tests {
db.store(1, &[(&pubkey, &account)]);
db.store(1, &[(&pubkeys[0], &account)]);
{
let stores = db.storage.read().unwrap();
let slot_0_stores = &stores.0.get(&0).unwrap();
let slot_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(slot_0_stores.len(), 1);
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 2);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
let slot_0_stores = &db.storage.get_slot_stores(0).unwrap();
let slot_1_stores = &db.storage.get_slot_stores(1).unwrap();
let r_slot_0_stores = slot_0_stores.read().unwrap();
let r_slot_1_stores = slot_1_stores.read().unwrap();
assert_eq!(r_slot_0_stores.len(), 1);
assert_eq!(r_slot_1_stores.len(), 1);
assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 2);
assert_eq!(r_slot_1_stores[&1].count(), 2);
assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2);
assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2);
}
// adding root doesn't change anything
db.add_root(1);
{
let stores = db.storage.read().unwrap();
let slot_0_stores = &stores.0.get(&0).unwrap();
let slot_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(slot_0_stores.len(), 1);
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 2);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
let slot_0_stores = &db.storage.get_slot_stores(0).unwrap();
let slot_1_stores = &db.storage.get_slot_stores(1).unwrap();
let r_slot_0_stores = slot_0_stores.read().unwrap();
let r_slot_1_stores = slot_1_stores.read().unwrap();
assert_eq!(r_slot_0_stores.len(), 1);
assert_eq!(r_slot_1_stores.len(), 1);
assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 2);
assert_eq!(r_slot_1_stores[&1].count(), 2);
assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2);
assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2);
}
// overwrite old rooted account version; only the slot_0_stores.count() should be
// overwrite old rooted account version; only the r_slot_0_stores.count() should be
// decremented
db.store(2, &[(&pubkeys[0], &account)]);
db.clean_accounts(None);
{
let stores = db.storage.read().unwrap();
let slot_0_stores = &stores.0.get(&0).unwrap();
let slot_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(slot_0_stores.len(), 1);
assert_eq!(slot_1_stores.len(), 1);
assert_eq!(slot_0_stores[&0].count(), 1);
assert_eq!(slot_1_stores[&1].count(), 2);
assert_eq!(slot_0_stores[&0].approx_stored_count(), 2);
assert_eq!(slot_1_stores[&1].approx_stored_count(), 2);
let slot_0_stores = &db.storage.get_slot_stores(0).unwrap();
let slot_1_stores = &db.storage.get_slot_stores(1).unwrap();
let r_slot_0_stores = slot_0_stores.read().unwrap();
let r_slot_1_stores = slot_1_stores.read().unwrap();
assert_eq!(r_slot_0_stores.len(), 1);
assert_eq!(r_slot_1_stores.len(), 1);
assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 1);
assert_eq!(r_slot_1_stores[&1].count(), 2);
assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2);
assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2);
}
}
@ -2761,7 +2759,7 @@ pub mod tests {
db.remove_unrooted_slot(unrooted_slot);
assert!(db.load_slow(&ancestors, &key).is_none());
assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none());
assert!(db.storage.read().unwrap().0.get(&unrooted_slot).is_none());
assert!(db.storage.0.get(&unrooted_slot).is_none());
assert!(db
.accounts_index
.read()
@ -2857,18 +2855,30 @@ pub mod tests {
}
fn check_storage(accounts: &AccountsDB, slot: Slot, count: usize) -> bool {
let storage = accounts.storage.read().unwrap();
assert_eq!(storage.0[&slot].len(), 1);
let slot_storage = storage.0.get(&slot).unwrap();
assert_eq!(
accounts
.storage
.get_slot_stores(slot)
.unwrap()
.read()
.unwrap()
.len(),
1
);
let slot_storages = accounts.storage.get_slot_stores(slot).unwrap();
let mut total_count: usize = 0;
for store in slot_storage.values() {
let r_slot_storages = slot_storages.read().unwrap();
for store in r_slot_storages.values() {
assert_eq!(store.status(), AccountStorageStatus::Available);
total_count += store.count();
}
assert_eq!(total_count, count);
let (expected_store_count, actual_store_count): (usize, usize) = (
slot_storage.values().map(|s| s.approx_stored_count()).sum(),
slot_storage
r_slot_storages
.values()
.map(|s| s.approx_stored_count())
.sum(),
r_slot_storages
.values()
.map(|s| s.accounts.accounts(0).len())
.sum(),
@ -2962,14 +2972,11 @@ pub mod tests {
}
let mut append_vec_histogram = HashMap::new();
for storage in accounts
.storage
.read()
.unwrap()
.0
.values()
.flat_map(|x| x.values())
{
let mut all_storages = vec![];
for slot_storage in accounts.storage.0.iter() {
all_storages.extend(slot_storage.read().unwrap().values().cloned())
}
for storage in all_storages {
*append_vec_histogram.entry(storage.slot).or_insert(0) += 1;
}
for count in append_vec_histogram.values() {
@ -2987,23 +2994,25 @@ pub mod tests {
let account1 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey1);
accounts.store(0, &[(&pubkey1, &account1)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available);
let stores = &accounts.storage.get_slot_stores(0).unwrap();
let r_stores = stores.read().unwrap();
assert_eq!(r_stores.len(), 1);
assert_eq!(r_stores[&0].count(), 1);
assert_eq!(r_stores[&0].status(), AccountStorageStatus::Available);
}
let pubkey2 = Pubkey::new_rand();
let account2 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey2);
accounts.store(0, &[(&pubkey2, &account2)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].len(), 2);
assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Full);
assert_eq!(stores.0[&0][&1].count(), 1);
assert_eq!(stores.0[&0][&1].status(), AccountStorageStatus::Available);
assert_eq!(accounts.storage.0.len(), 1);
let stores = &accounts.storage.get_slot_stores(0).unwrap();
let r_stores = stores.read().unwrap();
assert_eq!(r_stores.len(), 2);
assert_eq!(r_stores[&0].count(), 1);
assert_eq!(r_stores[&0].status(), AccountStorageStatus::Full);
assert_eq!(r_stores[&1].count(), 1);
assert_eq!(r_stores[&1].status(), AccountStorageStatus::Available);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -3020,15 +3029,16 @@ pub mod tests {
let index = i % 2;
accounts.store(0, &[(&pubkey1, &account1)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].len(), 3);
assert_eq!(stores.0[&0][&0].count(), count[index]);
assert_eq!(stores.0[&0][&0].status(), status[0]);
assert_eq!(stores.0[&0][&1].count(), 1);
assert_eq!(stores.0[&0][&1].status(), status[1]);
assert_eq!(stores.0[&0][&2].count(), count[index ^ 1]);
assert_eq!(stores.0[&0][&2].status(), status[0]);
assert_eq!(accounts.storage.0.len(), 1);
let stores = &accounts.storage.get_slot_stores(0).unwrap();
let r_stores = stores.read().unwrap();
assert_eq!(r_stores.len(), 3);
assert_eq!(r_stores[&0].count(), count[index]);
assert_eq!(r_stores[&0].status(), status[0]);
assert_eq!(r_stores[&1].count(), 1);
assert_eq!(r_stores[&1].status(), status[1]);
assert_eq!(r_stores[&2].count(), count[index ^ 1]);
assert_eq!(r_stores[&2].status(), status[0]);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -3084,7 +3094,14 @@ pub mod tests {
accounts.add_root(1);
//slot is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some());
assert!(accounts
.storage
.get_slot_stores(0)
.unwrap()
.read()
.unwrap()
.get(&id)
.is_some());
//store causes clean
accounts.store(1, &[(&pubkey, &account)]);
@ -3092,7 +3109,7 @@ pub mod tests {
//slot is gone
accounts.print_accounts_stats("pre-clean");
accounts.clean_accounts(None);
assert!(accounts.storage.read().unwrap().0.get(&0).is_none());
assert!(accounts.storage.0.get(&0).is_none());
//new value is there
let ancestors = vec![(1, 1)].into_iter().collect();
@ -3101,26 +3118,28 @@ pub mod tests {
impl AccountsDB {
fn alive_account_count_in_store(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot);
let slot_storage = self.storage.get_slot_stores(slot);
if let Some(slot_storage) = slot_storage {
slot_storage.values().map(|store| store.count()).sum()
slot_storage
.read()
.unwrap()
.values()
.map(|store| store.count())
.sum()
} else {
0
}
}
fn all_account_count_in_append_vec(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot);
let slot_storage = self.storage.get_slot_stores(slot);
if let Some(slot_storage) = slot_storage {
let count = slot_storage
let r_slot_storage = slot_storage.read().unwrap();
let count = r_slot_storage
.values()
.map(|store| store.accounts.accounts(0).len())
.sum();
let stored_count: usize = slot_storage
let stored_count: usize = r_slot_storage
.values()
.map(|store| store.approx_stored_count())
.sum();
@ -3194,8 +3213,8 @@ pub mod tests {
// Slot 1 should be removed, slot 0 cannot be removed because it still has
// the latest update for pubkey 2
accounts.clean_accounts(None);
assert!(accounts.storage.read().unwrap().0.get(&0).is_some());
assert!(accounts.storage.read().unwrap().0.get(&1).is_none());
assert!(accounts.storage.get_slot_stores(0).is_some());
assert!(accounts.storage.get_slot_stores(1).is_none());
// Slot 1 should be cleaned because all it's accounts are
// zero lamports, and are not present in any other slot's
@ -3225,8 +3244,8 @@ pub mod tests {
// zero-lamport account should be cleaned
accounts.clean_accounts(None);
assert!(accounts.storage.read().unwrap().0.get(&0).is_none());
assert!(accounts.storage.read().unwrap().0.get(&1).is_none());
assert!(accounts.storage.get_slot_stores(0).is_none());
assert!(accounts.storage.get_slot_stores(1).is_none());
// Slot 0 should be cleaned because all it's accounts have been
// updated in the rooted slot 1
@ -3559,9 +3578,13 @@ pub mod tests {
}
fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
let stores = accounts.storage.read().unwrap();
info!("{:?}", stores.0.get(&slot));
assert!(stores.0.get(&slot).is_none() || stores.0.get(&slot).unwrap().is_empty());
let slot_stores = accounts.storage.get_slot_stores(slot);
let r_slot_stores = slot_stores.as_ref().map(|slot_stores| {
let r_slot_stores = slot_stores.read().unwrap();
info!("{:?}", *r_slot_stores);
r_slot_stores
});
assert!(r_slot_stores.is_none() || r_slot_stores.unwrap().is_empty());
}
#[test]
@ -3739,10 +3762,9 @@ pub mod tests {
// Store enough accounts such that an additional store for slot 2 is created.
while accounts
.storage
.read()
.get_slot_stores(current_slot)
.unwrap()
.0
.get(&current_slot)
.read()
.unwrap()
.len()
< 2
@ -4400,10 +4422,9 @@ pub mod tests {
db.store(base_slot, &[(&key, &account)]);
db.storage
.write()
.get_slot_stores(base_slot)
.unwrap()
.0
.get_mut(&base_slot)
.write()
.unwrap()
.clear();
db.add_root(base_slot);
@ -4442,8 +4463,15 @@ pub mod tests {
db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
let storage = db.storage.read().unwrap();
storage.0[&0].values().next().unwrap().remove_account();
db.storage
.get_slot_stores(0)
.unwrap()
.read()
.unwrap()
.values()
.next()
.unwrap()
.remove_account();
assert!(db.get_snapshot_storages(after_slot).is_empty());
}
@ -4454,8 +4482,16 @@ pub mod tests {
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
accounts.store(0, &[(&pubkey, &account)]);
let storage = accounts.storage.read().unwrap();
let storage_entry = storage.0[&0].values().next().unwrap();
let storage_entry = accounts
.storage
.get_slot_stores(0)
.unwrap()
.read()
.unwrap()
.values()
.next()
.unwrap()
.clone();
storage_entry.remove_account();
storage_entry.remove_account();
}

View File

@ -33,7 +33,7 @@ use {
io::{BufReader, BufWriter, Read, Write},
path::{Path, PathBuf},
result::Result,
sync::{atomic::Ordering, Arc},
sync::{atomic::Ordering, Arc, RwLock},
time::Instant,
},
};
@ -265,7 +265,7 @@ where
E: Into<AccountStorageEntry>,
P: AsRef<Path>,
{
let accounts_db = AccountsDB::new(account_paths.to_vec(), cluster_type);
let mut accounts_db = AccountsDB::new(account_paths.to_vec(), cluster_type);
let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields;
@ -348,8 +348,11 @@ where
.expect("At least one storage entry must exist from deserializing stream");
{
let mut stores = accounts_db.storage.write().unwrap();
stores.0.extend(storage);
accounts_db.storage.0.extend(
storage.into_iter().map(|(slot, slot_storage_entry)| {
(slot, Arc::new(RwLock::new(slot_storage_entry)))
}),
);
}
accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);