diff --git a/Cargo.lock b/Cargo.lock index 4718ed985a..f73dc53205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" +[[package]] +name = "ahash" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" +dependencies = [ + "const-random", +] + [[package]] name = "aho-corasick" version = "0.7.10" @@ -539,6 +548,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "const-random" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" +dependencies = [ + "const-random-macro", + "proc-macro-hack", +] + +[[package]] +name = "const-random-macro" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" +dependencies = [ + "getrandom", + "proc-macro-hack", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -737,6 +766,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dashmap" +version = "3.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +dependencies = [ + "ahash", + "cfg-if", + "num_cpus", +] + [[package]] name = "derivative" version = "2.1.1" @@ -4293,6 +4333,7 @@ dependencies = [ "byteorder", "bzip2", "crossbeam-channel", + "dashmap", "dir-diff", "flate2", "fnv", diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 44084dbd98..394bfdd967 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -15,6 +15,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" +[[package]] +name = "ahash" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" +dependencies = [ + "const-random", +] + [[package]] name = "aho-corasick" version = "0.7.10" @@ -278,6 +287,26 @@ dependencies = [ "byteorder 1.3.4", ] +[[package]] +name = "const-random" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" +dependencies = [ + "const-random-macro", + "proc-macro-hack", +] + +[[package]] +name = "const-random-macro" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" +dependencies = [ + "getrandom", + "proc-macro-hack", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -403,6 +432,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dashmap" +version = "3.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +dependencies = [ + "ahash", + "cfg-if", + "num_cpus", +] + [[package]] name = "digest" version = "0.8.1" @@ -2027,6 +2067,7 @@ dependencies = [ "byteorder 1.3.4", "bzip2", "crossbeam-channel", + "dashmap", "dir-diff", "flate2", "fnv", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 9ce32dc3ac..c42715d845 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,6 +14,7 @@ blake3 = "0.3.6" bv = { version = "0.11.1", features = ["serde"] } byteorder = "1.3.4" bzip2 = "0.3.3" +dashmap = "3.11.10" crossbeam-channel = "0.4" dir-diff = "0.3.2" flate2 = "1.0.14" diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index da31247aab..7ad58ff886 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -2,6 +2,7 @@ extern crate test; +use rand::Rng; use solana_runtime::{ accounts::{create_test_accounts, Accounts}, bank::*, @@ -11,7 +12,7 @@ use solana_sdk::{ genesis_config::{create_genesis_config, ClusterType}, pubkey::Pubkey, }; -use std::{path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc, thread::Builder}; use test::Bencher; fn deposit_many(bank: &Bank, pubkeys: &mut Vec, num: usize) { @@ -141,3 +142,57 @@ fn bench_delete_dependencies(bencher: &mut Bencher) { accounts.accounts_db.clean_accounts(None); }); } + +#[bench] +#[ignore] +fn bench_concurrent_read_write(bencher: &mut Bencher) { + let num_readers = 5; + let accounts = Arc::new(Accounts::new( + vec![ + PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string())) + .join("concurrent_read_write"), + ], + &ClusterType::Development, + )); + let num_keys = 1000; + let slot = 0; + accounts.add_root(slot); + let pubkeys: Arc> = Arc::new( + (0..num_keys) + .map(|_| { + let pubkey = Pubkey::new_rand(); + let account = Account::new(1, 0, &Account::default().owner); + accounts.store_slow(slot, &pubkey, &account); + pubkey + }) + .collect(), + ); + + for _ in 0..num_readers { + let accounts = accounts.clone(); + let pubkeys = pubkeys.clone(); + Builder::new() + .name("readers".to_string()) + .spawn(move || { + let mut rng = rand::thread_rng(); + loop { + let i = rng.gen_range(0, num_keys); + test::black_box(accounts.load_slow(&HashMap::new(), &pubkeys[i]).unwrap()); + } + }) + .unwrap(); + } + + let num_new_keys = 1000; + let new_accounts: Vec<_> = (0..num_new_keys) + .map(|_| Account::new(1, 0, &Account::default().owner)) + .collect(); + bencher.iter(|| { + for account in &new_accounts { + // Write to a different slot than the one being read from. Because + // there's a new account pubkey being written to every time, will + // compete for the accounts index lock on every store + accounts.store_slow(slot + 1, &Pubkey::new_rand(), &account); + } + }) +} diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 2f6b5c4fd8..04a5aff8c8 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -303,7 +303,6 @@ impl Accounts { //PERF: hold the lock to scan for the references, but not to clone the accounts //TODO: two locks usually leads to deadlocks, should this be one structure? let accounts_index = self.accounts_db.accounts_index.read().unwrap(); - let storage = self.accounts_db.storage.read().unwrap(); let fee_config = FeeConfig { secp256k1_program_enabled: feature_set @@ -328,7 +327,7 @@ impl Accounts { }; let load_res = self.load_tx_accounts( - &storage, + &self.accounts_db.storage, ancestors, &accounts_index, tx, @@ -343,7 +342,7 @@ impl Accounts { }; let load_res = Self::load_loaders( - &storage, + &self.accounts_db.storage, ancestors, &accounts_index, tx, @@ -1507,10 +1506,9 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); let accounts_index = accounts.accounts_db.accounts_index.read().unwrap(); - let storage = accounts.accounts_db.storage.read().unwrap(); assert_eq!( Accounts::load_executable_accounts( - &storage, + &accounts.accounts_db.storage, &ancestors, &accounts_index, &Pubkey::new_rand(), diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index e701b2c75b..afd43fa0ac 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -23,6 +23,7 @@ use crate::{ append_vec::{AppendVec, StoredAccount, StoredMeta}, }; use blake3::traits::digest::Digest; +use dashmap::DashMap; use lazy_static::lazy_static; use log::*; use rand::{thread_rng, Rng}; @@ -46,7 +47,7 @@ use std::{ ops::RangeBounds, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}, + sync::{Arc, Mutex, MutexGuard, RwLock}, time::Instant, }; use tempfile::TempDir; @@ -98,7 +99,7 @@ pub type SnapshotStorage = Vec>; pub type SnapshotStorages = Vec; // Each slot has a set of storage entries. -pub(crate) type SlotStores = HashMap>; +pub(crate) type SlotStores = Arc>>>; type AccountSlots = HashMap>; type AppendVecOffsets = HashMap>; @@ -121,31 +122,31 @@ impl Versioned for (u64, AccountInfo) { } #[derive(Clone, Default, Debug)] -pub struct AccountStorage(pub HashMap); +pub struct AccountStorage(pub DashMap); impl AccountStorage { - fn scan_accounts(&self, account_info: &AccountInfo, slot: Slot) -> Option<(Account, Slot)> { + fn get_account_storage_entry( + &self, + slot: Slot, + store_id: AppendVecId, + ) -> Option> { self.0 .get(&slot) - .and_then(|storage_map| storage_map.get(&account_info.store_id)) - .and_then(|store| { - Some( - store - .accounts - .get_account(account_info.offset)? - .0 - .clone_account(), - ) - }) - .map(|account| (account, slot)) + .and_then(|storage_map| storage_map.value().read().unwrap().get(&store_id).cloned()) + } + + fn get_slot_stores(&self, slot: Slot) -> Option { + self.0.get(&slot).map(|result| result.value().clone()) } fn slot_store_count(&self, slot: Slot, store_id: AppendVecId) -> Option { - self.0 - .get(&slot) - .and_then(|slot_storages| slot_storages.get(&store_id)) + self.get_account_storage_entry(slot, store_id) .map(|store| store.count_and_status.read().unwrap().0) } + + fn all_slots(&self) -> Vec { + self.0.iter().map(|iter_item| *iter_item.key()).collect() + } } #[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize, AbiExample, AbiEnumVisitor)] @@ -269,6 +270,15 @@ impl AccountStorageEntry { self.accounts.flush() } + fn get_account(&self, account_info: &AccountInfo) -> Option { + Some( + self.accounts + .get_account(account_info.offset)? + .0 + .clone_account(), + ) + } + fn add_account(&self) { let mut count_and_status = self.count_and_status.write().unwrap(); *count_and_status = (count_and_status.0 + 1, count_and_status.1); @@ -394,7 +404,7 @@ pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per slot basis pub accounts_index: RwLock>, - pub storage: RwLock, + pub storage: AccountStorage, /// distribute the accounts across storage lists pub next_id: AtomicUsize, @@ -471,7 +481,7 @@ impl Default for AccountsDB { bank_hashes.insert(0, BankHashInfo::default()); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(AccountStorage(HashMap::new())), + storage: AccountStorage(DashMap::new()), next_id: AtomicUsize::new(0), shrink_candidate_slots: Mutex::new(Vec::new()), write_version: AtomicU64::new(0), @@ -792,8 +802,6 @@ impl AccountsDB { key_set.insert(*key); let count = self .storage - .read() - .unwrap() .slot_store_count(*slot, account_info.store_id) .unwrap() - 1; @@ -965,8 +973,8 @@ impl AccountsDB { let mut stored_accounts = vec![]; let mut storage_read_elapsed = Measure::start("storage_read_elapsed"); { - let slot_storages = self.storage.read().unwrap().0.get(&slot).cloned(); - if let Some(stores) = slot_storages { + if let Some(stores_lock) = self.storage.get_slot_stores(slot) { + let stores = stores_lock.read().unwrap(); let mut alive_count = 0; let mut stored_count = 0; for store in stores.values() { @@ -1099,15 +1107,14 @@ impl AccountsDB { start.stop(); update_index_elapsed = start.as_us(); - let mut start = Measure::start("update_index_elapsed"); + let mut start = Measure::start("handle_reclaims_elapsed"); self.handle_reclaims(&reclaims, Some(slot), true, None); start.stop(); handle_reclaims_elapsed = start.as_us(); let mut start = Measure::start("write_storage_elapsed"); - let mut storage = self.storage.write().unwrap(); - if let Some(slot_storage) = storage.0.get_mut(&slot) { - slot_storage.retain(|_key, store| { + if let Some(slot_stores) = self.storage.get_slot_stores(slot) { + slot_stores.write().unwrap().retain(|_key, store| { if store.count() == 0 { dead_storages.push(store.clone()); } @@ -1178,8 +1185,7 @@ impl AccountsDB { } fn all_slots_in_storage(&self) -> Vec { - let storage = self.storage.read().unwrap(); - storage.0.keys().cloned().collect() + self.storage.all_slots() } pub fn process_stale_slot(&self) -> usize { @@ -1221,14 +1227,11 @@ impl AccountsDB { { let mut collector = A::default(); let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| { - scan_func( - &mut collector, - storage - .scan_accounts(account_info, slot) - .map(|(account, slot)| (pubkey, account, slot)), - ) + let account_slot = self + .get_account_from_storage(slot, account_info) + .map(|account| (pubkey, account, slot)); + scan_func(&mut collector, account_slot) }); collector } @@ -1241,46 +1244,34 @@ impl AccountsDB { { let mut collector = A::default(); let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); accounts_index.range_scan_accounts(ancestors, range, |pubkey, (account_info, slot)| { - scan_func( - &mut collector, - storage - .scan_accounts(account_info, slot) - .map(|(account, slot)| (pubkey, account, slot)), - ) + let account_slot = self + .get_account_from_storage(slot, account_info) + .map(|account| (pubkey, account, slot)); + scan_func(&mut collector, account_slot) }); collector } - /// Scan a specific slot through all the account storage in parallel with sequential read - // PERF: Sequentially read each storage entry in parallel + /// Scan a specific slot through all the account storage in parallel pub fn scan_account_storage(&self, slot: Slot, scan_func: F) -> Vec where F: Fn(&StoredAccount, AppendVecId, &mut B) + Send + Sync, B: Send + Default, { - self.scan_account_storage_inner(slot, scan_func, &self.storage.read().unwrap()) + self.scan_account_storage_inner(slot, scan_func) } - // The input storage must come from self.storage.read().unwrap() - fn scan_account_storage_inner( - &self, - slot: Slot, - scan_func: F, - storage: &RwLockReadGuard, - ) -> Vec + fn scan_account_storage_inner(&self, slot: Slot, scan_func: F) -> Vec where F: Fn(&StoredAccount, AppendVecId, &mut B) + Send + Sync, B: Send + Default, { - let storage_maps: Vec> = storage - .0 - .get(&slot) - .unwrap_or(&HashMap::new()) - .values() - .cloned() - .collect(); + let storage_maps: Vec> = self + .storage + .get_slot_stores(slot) + .map(|res| res.read().unwrap().values().cloned().collect()) + .unwrap_or_default(); self.thread_pool.install(|| { storage_maps .into_par_iter() @@ -1323,15 +1314,15 @@ impl AccountsDB { let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None)?; let slot = lock[index].0; //TODO: thread this as a ref - if let Some(slot_storage) = storage.0.get(&slot) { - let info = &lock[index].1; - slot_storage - .get(&info.store_id) - .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) - .map(|account| (account, slot)) - } else { - None - } + storage + .get_account_storage_entry(slot, lock[index].1.store_id) + .and_then(|store| { + let info = &lock[index].1; + store + .accounts + .get_account(info.offset) + .map(|account| (account.0.clone_account(), slot)) + }) } #[cfg(test)] @@ -1339,25 +1330,33 @@ impl AccountsDB { let accounts_index = self.accounts_index.read().unwrap(); let (lock, index) = accounts_index.get(pubkey, Some(ancestors), None).unwrap(); let slot = lock[index].0; - let storage = self.storage.read().unwrap(); - let slot_storage = storage.0.get(&slot).unwrap(); let info = &lock[index].1; - let entry = slot_storage.get(&info.store_id).unwrap(); + let entry = self + .storage + .get_account_storage_entry(slot, info.store_id) + .unwrap(); let account = entry.accounts.get_account(info.offset); *account.as_ref().unwrap().0.hash } pub fn load_slow(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Option<(Account, Slot)> { let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); - Self::load(&storage, ancestors, &accounts_index, pubkey) + Self::load(&self.storage, ancestors, &accounts_index, pubkey) + } + + fn get_account_from_storage(&self, slot: Slot, account_info: &AccountInfo) -> Option { + let account_storage_entry = self + .storage + .get_account_storage_entry(slot, account_info.store_id); + account_storage_entry + .and_then(|account_storage_entry| account_storage_entry.get_account(account_info)) } fn find_storage_candidate(&self, slot: Slot) -> Arc { let mut create_extra = false; - let stores = self.storage.read().unwrap(); - - if let Some(slot_stores) = stores.0.get(&slot) { + let slot_stores_lock = self.storage.get_slot_stores(slot); + if let Some(slot_stores_lock) = slot_stores_lock { + let slot_stores = slot_stores_lock.read().unwrap(); if !slot_stores.is_empty() { if slot_stores.len() <= self.min_num_stores { let mut total_accounts = 0; @@ -1377,7 +1376,7 @@ impl AccountsDB { for (i, store) in slot_stores.values().cycle().skip(to_skip).enumerate() { if store.try_available() { let ret = store.clone(); - drop(stores); + drop(slot_stores); if create_extra { self.create_and_insert_store(slot, self.file_size); } @@ -1391,8 +1390,6 @@ impl AccountsDB { } } - drop(stores); - let store = self.create_and_insert_store(slot, self.file_size); store.try_available(); store @@ -1403,10 +1400,15 @@ impl AccountsDB { let store = Arc::new(self.new_storage_entry(slot, &Path::new(&self.paths[path_index]), size)); let store_for_index = store.clone(); - - let mut stores = self.storage.write().unwrap(); - let slot_storage = stores.0.entry(slot).or_insert_with(HashMap::new); - slot_storage.insert(store.id, store_for_index); + let slot_storage = self + .storage + .0 + .entry(slot) + .or_insert(Arc::new(RwLock::new(HashMap::new()))); + slot_storage + .write() + .unwrap() + .insert(store.id, store_for_index); store } @@ -1424,27 +1426,25 @@ impl AccountsDB { .filter(|slot| !accounts_index.is_root(**slot)) .collect(); drop(accounts_index); - let mut storage_lock_elapsed = Measure::start("storage_lock_elapsed"); - let mut storage = self.storage.write().unwrap(); - storage_lock_elapsed.stop(); - let mut all_removed_slot_storages = vec![]; let mut total_removed_storage_entries = 0; let mut total_removed_bytes = 0; let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed"); for slot in non_roots { - if let Some(slot_removed_storages) = storage.0.remove(&slot) { - total_removed_storage_entries += slot_removed_storages.len(); - total_removed_bytes += slot_removed_storages - .values() - .map(|i| i.accounts.capacity()) - .sum::(); + if let Some((_, slot_removed_storages)) = self.storage.0.remove(&slot) { + { + let r_slot_removed_storages = slot_removed_storages.read().unwrap(); + total_removed_storage_entries += r_slot_removed_storages.len(); + total_removed_bytes += r_slot_removed_storages + .values() + .map(|i| i.accounts.capacity()) + .sum::(); + } all_removed_slot_storages.push(slot_removed_storages); } } remove_storages_elapsed.stop(); - drop(storage); let num_slots_removed = all_removed_slot_storages.len(); @@ -1456,7 +1456,6 @@ impl AccountsDB { datapoint_info!( "purge_slots_time", - ("storage_lock_elapsed", storage_lock_elapsed.as_us(), i64), ( "remove_storages_elapsed", remove_storages_elapsed.as_us(), @@ -1503,7 +1502,7 @@ impl AccountsDB { // 1) Remove old bank hash from self.bank_hashes // 2) Purge this slot's storage entries from self.storage self.handle_reclaims(&reclaims, Some(remove_slot), false, None); - assert!(self.storage.read().unwrap().0.get(&remove_slot).is_none()); + assert!(self.storage.get_slot_stores(remove_slot).is_none()); } fn include_owner(cluster_type: &ClusterType, slot: Slot) -> bool { @@ -1777,8 +1776,9 @@ impl AccountsDB { let mut max_slot = 0; let mut newest_slot = 0; let mut oldest_slot = std::u64::MAX; - let stores = self.storage.read().unwrap(); - for (slot, slot_stores) in &stores.0 { + for iter_item in self.storage.0.iter() { + let slot = iter_item.key(); + let slot_stores = iter_item.value().read().unwrap(); total_count += slot_stores.len(); if slot_stores.len() < min { min = slot_stores.len(); @@ -1797,7 +1797,6 @@ impl AccountsDB { oldest_slot = *slot; } } - drop(stores); info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})", total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min); datapoint_info!("accounts_db-stores", ("total_count", total_count, i64)); @@ -1935,7 +1934,6 @@ impl AccountsDB { use BankHashVerificationError::*; let mut scan = Measure::start("scan"); let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); let keys: Vec<_> = accounts_index.account_maps.keys().collect(); let mismatch_found = AtomicU64::new(0); let hashes: Vec<_> = keys @@ -1944,10 +1942,8 @@ impl AccountsDB { if let Some((list, index)) = accounts_index.get(pubkey, Some(ancestors), None) { let (slot, account_info) = &list[index]; if account_info.lamports != 0 { - storage - .0 - .get(&slot) - .and_then(|storage_map| storage_map.get(&account_info.store_id)) + self.storage + .get_account_storage_entry(*slot, account_info.store_id) .and_then(|store| { let account = store.accounts.get_account(account_info.offset)?.0; let balance = Self::account_balance_for_capitalization( @@ -2128,7 +2124,6 @@ impl AccountsDB { expected_slot: Option, mut reclaimed_offsets: Option<&mut AppendVecOffsets>, ) -> HashSet { - let storage = self.storage.read().unwrap(); let mut dead_slots = HashSet::new(); for (slot, account_info) in reclaims { if let Some(ref mut reclaimed_offsets) = reclaimed_offsets { @@ -2140,23 +2135,25 @@ impl AccountsDB { if let Some(expected_slot) = expected_slot { assert_eq!(*slot, expected_slot); } - if let Some(slot_storage) = storage.0.get(slot) { - if let Some(store) = slot_storage.get(&account_info.store_id) { - assert_eq!( - *slot, store.slot, - "AccountDB::accounts_index corrupted. Storage should only point to one slot" - ); - let count = store.remove_account(); - if count == 0 { - dead_slots.insert(*slot); - } + if let Some(store) = self + .storage + .get_account_storage_entry(*slot, account_info.store_id) + { + assert_eq!( + *slot, store.slot, + "AccountDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, should only point to one slot", + store.slot, *slot + ); + let count = store.remove_account(); + if count == 0 { + dead_slots.insert(*slot); } } } dead_slots.retain(|slot| { - if let Some(slot_storage) = storage.0.get(&slot) { - for x in slot_storage.values() { + if let Some(slot_stores) = self.storage.get_slot_stores(*slot) { + for x in slot_stores.read().unwrap().values() { if x.count() != 0 { return false; } @@ -2175,16 +2172,14 @@ impl AccountsDB { ) { { let mut measure = Measure::start("clean_dead_slots-ms"); - let storage = self.storage.read().unwrap(); let mut stores: Vec> = vec![]; for slot in dead_slots.iter() { - if let Some(slot_storage) = storage.0.get(slot) { - for store in slot_storage.values() { + if let Some(slot_storage) = self.storage.get_slot_stores(*slot) { + for store in slot_storage.read().unwrap().values() { stores.push(store.clone()); } } } - drop(storage); datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64)); let slot_pubkeys: HashSet<(Slot, Pubkey)> = { self.thread_pool_clean.install(|| { @@ -2334,15 +2329,18 @@ impl AccountsDB { pub fn get_snapshot_storages(&self, snapshot_slot: Slot) -> SnapshotStorages { let accounts_index = self.accounts_index.read().unwrap(); - let r_storage = self.storage.read().unwrap(); - r_storage + self.storage .0 .iter() - .filter(|(slot, _slot_stores)| { - **slot <= snapshot_slot && accounts_index.is_root(**slot) + .filter(|iter_item| { + let slot = *iter_item.key(); + slot <= snapshot_slot && accounts_index.is_root(slot) }) - .map(|(_slot, slot_stores)| { - slot_stores + .map(|iter_item| { + iter_item + .value() + .read() + .unwrap() .values() .filter(|x| x.has_accounts()) .cloned() @@ -2368,8 +2366,7 @@ impl AccountsDB { pub fn generate_index(&self) { let mut accounts_index = self.accounts_index.write().unwrap(); - let storage = self.storage.read().unwrap(); - let mut slots: Vec = storage.0.keys().cloned().collect(); + let mut slots = self.storage.all_slots(); slots.sort(); let mut last_log_update = Instant::now(); @@ -2396,7 +2393,6 @@ impl AccountsDB { .or_insert_with(Vec::new); entry.push((stored_account.meta.write_version, account_info)); }, - &storage, ); let mut accounts_map: HashMap> = HashMap::new(); @@ -2432,8 +2428,8 @@ impl AccountsDB { *counts.entry(account_entry.store_id).or_insert(0) += 1; } } - for slot_stores in storage.0.values() { - for (id, store) in slot_stores { + for slot_stores in self.storage.0.iter() { + for (id, store) in slot_stores.value().read().unwrap().iter() { if let Some(count) = counts.get(&id) { trace!( "id: {} setting count: {} cur: {}", @@ -2476,17 +2472,16 @@ impl AccountsDB { } fn print_count_and_status(&self, label: &'static str) { - let storage = self.storage.read().unwrap(); - let mut slots: Vec<_> = storage.0.keys().cloned().collect(); + let mut slots: Vec<_> = self.storage.all_slots(); slots.sort(); info!("{}: count_and status for {} slots:", label, slots.len()); for slot in &slots { - let slot_stores = storage.0.get(slot).unwrap(); - - let mut ids: Vec<_> = slot_stores.keys().cloned().collect(); + let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); + let r_slot_stores = slot_stores.read().unwrap(); + let mut ids: Vec<_> = r_slot_stores.keys().cloned().collect(); ids.sort(); for id in &ids { - let entry = slot_stores.get(id).unwrap(); + let entry = r_slot_stores.get(id).unwrap(); info!( " slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}", slot, @@ -2674,45 +2669,48 @@ pub mod tests { db.store(1, &[(&pubkey, &account)]); db.store(1, &[(&pubkeys[0], &account)]); { - let stores = db.storage.read().unwrap(); - let slot_0_stores = &stores.0.get(&0).unwrap(); - let slot_1_stores = &stores.0.get(&1).unwrap(); - assert_eq!(slot_0_stores.len(), 1); - assert_eq!(slot_1_stores.len(), 1); - assert_eq!(slot_0_stores[&0].count(), 2); - assert_eq!(slot_1_stores[&1].count(), 2); - assert_eq!(slot_0_stores[&0].approx_stored_count(), 2); - assert_eq!(slot_1_stores[&1].approx_stored_count(), 2); + let slot_0_stores = &db.storage.get_slot_stores(0).unwrap(); + let slot_1_stores = &db.storage.get_slot_stores(1).unwrap(); + let r_slot_0_stores = slot_0_stores.read().unwrap(); + let r_slot_1_stores = slot_1_stores.read().unwrap(); + assert_eq!(r_slot_0_stores.len(), 1); + assert_eq!(r_slot_1_stores.len(), 1); + assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 2); + assert_eq!(r_slot_1_stores[&1].count(), 2); + assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2); + assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2); } // adding root doesn't change anything db.add_root(1); { - let stores = db.storage.read().unwrap(); - let slot_0_stores = &stores.0.get(&0).unwrap(); - let slot_1_stores = &stores.0.get(&1).unwrap(); - assert_eq!(slot_0_stores.len(), 1); - assert_eq!(slot_1_stores.len(), 1); - assert_eq!(slot_0_stores[&0].count(), 2); - assert_eq!(slot_1_stores[&1].count(), 2); - assert_eq!(slot_0_stores[&0].approx_stored_count(), 2); - assert_eq!(slot_1_stores[&1].approx_stored_count(), 2); + let slot_0_stores = &db.storage.get_slot_stores(0).unwrap(); + let slot_1_stores = &db.storage.get_slot_stores(1).unwrap(); + let r_slot_0_stores = slot_0_stores.read().unwrap(); + let r_slot_1_stores = slot_1_stores.read().unwrap(); + assert_eq!(r_slot_0_stores.len(), 1); + assert_eq!(r_slot_1_stores.len(), 1); + assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 2); + assert_eq!(r_slot_1_stores[&1].count(), 2); + assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2); + assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2); } - // overwrite old rooted account version; only the slot_0_stores.count() should be + // overwrite old rooted account version; only the r_slot_0_stores.count() should be // decremented db.store(2, &[(&pubkeys[0], &account)]); db.clean_accounts(None); { - let stores = db.storage.read().unwrap(); - let slot_0_stores = &stores.0.get(&0).unwrap(); - let slot_1_stores = &stores.0.get(&1).unwrap(); - assert_eq!(slot_0_stores.len(), 1); - assert_eq!(slot_1_stores.len(), 1); - assert_eq!(slot_0_stores[&0].count(), 1); - assert_eq!(slot_1_stores[&1].count(), 2); - assert_eq!(slot_0_stores[&0].approx_stored_count(), 2); - assert_eq!(slot_1_stores[&1].approx_stored_count(), 2); + let slot_0_stores = &db.storage.get_slot_stores(0).unwrap(); + let slot_1_stores = &db.storage.get_slot_stores(1).unwrap(); + let r_slot_0_stores = slot_0_stores.read().unwrap(); + let r_slot_1_stores = slot_1_stores.read().unwrap(); + assert_eq!(r_slot_0_stores.len(), 1); + assert_eq!(r_slot_1_stores.len(), 1); + assert_eq!(r_slot_0_stores.get(&0).unwrap().count(), 1); + assert_eq!(r_slot_1_stores[&1].count(), 2); + assert_eq!(r_slot_0_stores.get(&0).unwrap().approx_stored_count(), 2); + assert_eq!(r_slot_1_stores[&1].approx_stored_count(), 2); } } @@ -2761,7 +2759,7 @@ pub mod tests { db.remove_unrooted_slot(unrooted_slot); assert!(db.load_slow(&ancestors, &key).is_none()); assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); - assert!(db.storage.read().unwrap().0.get(&unrooted_slot).is_none()); + assert!(db.storage.0.get(&unrooted_slot).is_none()); assert!(db .accounts_index .read() @@ -2857,18 +2855,30 @@ pub mod tests { } fn check_storage(accounts: &AccountsDB, slot: Slot, count: usize) -> bool { - let storage = accounts.storage.read().unwrap(); - assert_eq!(storage.0[&slot].len(), 1); - let slot_storage = storage.0.get(&slot).unwrap(); + assert_eq!( + accounts + .storage + .get_slot_stores(slot) + .unwrap() + .read() + .unwrap() + .len(), + 1 + ); + let slot_storages = accounts.storage.get_slot_stores(slot).unwrap(); let mut total_count: usize = 0; - for store in slot_storage.values() { + let r_slot_storages = slot_storages.read().unwrap(); + for store in r_slot_storages.values() { assert_eq!(store.status(), AccountStorageStatus::Available); total_count += store.count(); } assert_eq!(total_count, count); let (expected_store_count, actual_store_count): (usize, usize) = ( - slot_storage.values().map(|s| s.approx_stored_count()).sum(), - slot_storage + r_slot_storages + .values() + .map(|s| s.approx_stored_count()) + .sum(), + r_slot_storages .values() .map(|s| s.accounts.accounts(0).len()) .sum(), @@ -2962,14 +2972,11 @@ pub mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts - .storage - .read() - .unwrap() - .0 - .values() - .flat_map(|x| x.values()) - { + let mut all_storages = vec![]; + for slot_storage in accounts.storage.0.iter() { + all_storages.extend(slot_storage.read().unwrap().values().cloned()) + } + for storage in all_storages { *append_vec_histogram.entry(storage.slot).or_insert(0) += 1; } for count in append_vec_histogram.values() { @@ -2987,23 +2994,25 @@ pub mod tests { let account1 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey1); accounts.store(0, &[(&pubkey1, &account1)]); { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 1); - assert_eq!(stores.0[&0][&0].count(), 1); - assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available); + let stores = &accounts.storage.get_slot_stores(0).unwrap(); + let r_stores = stores.read().unwrap(); + assert_eq!(r_stores.len(), 1); + assert_eq!(r_stores[&0].count(), 1); + assert_eq!(r_stores[&0].status(), AccountStorageStatus::Available); } let pubkey2 = Pubkey::new_rand(); let account2 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey2); accounts.store(0, &[(&pubkey2, &account2)]); { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 1); - assert_eq!(stores.0[&0].len(), 2); - assert_eq!(stores.0[&0][&0].count(), 1); - assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Full); - assert_eq!(stores.0[&0][&1].count(), 1); - assert_eq!(stores.0[&0][&1].status(), AccountStorageStatus::Available); + assert_eq!(accounts.storage.0.len(), 1); + let stores = &accounts.storage.get_slot_stores(0).unwrap(); + let r_stores = stores.read().unwrap(); + assert_eq!(r_stores.len(), 2); + assert_eq!(r_stores[&0].count(), 1); + assert_eq!(r_stores[&0].status(), AccountStorageStatus::Full); + assert_eq!(r_stores[&1].count(), 1); + assert_eq!(r_stores[&1].status(), AccountStorageStatus::Available); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -3020,15 +3029,16 @@ pub mod tests { let index = i % 2; accounts.store(0, &[(&pubkey1, &account1)]); { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 1); - assert_eq!(stores.0[&0].len(), 3); - assert_eq!(stores.0[&0][&0].count(), count[index]); - assert_eq!(stores.0[&0][&0].status(), status[0]); - assert_eq!(stores.0[&0][&1].count(), 1); - assert_eq!(stores.0[&0][&1].status(), status[1]); - assert_eq!(stores.0[&0][&2].count(), count[index ^ 1]); - assert_eq!(stores.0[&0][&2].status(), status[0]); + assert_eq!(accounts.storage.0.len(), 1); + let stores = &accounts.storage.get_slot_stores(0).unwrap(); + let r_stores = stores.read().unwrap(); + assert_eq!(r_stores.len(), 3); + assert_eq!(r_stores[&0].count(), count[index]); + assert_eq!(r_stores[&0].status(), status[0]); + assert_eq!(r_stores[&1].count(), 1); + assert_eq!(r_stores[&1].status(), status[1]); + assert_eq!(r_stores[&2].count(), count[index ^ 1]); + assert_eq!(r_stores[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -3084,7 +3094,14 @@ pub mod tests { accounts.add_root(1); //slot is still there, since gc is lazy - assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some()); + assert!(accounts + .storage + .get_slot_stores(0) + .unwrap() + .read() + .unwrap() + .get(&id) + .is_some()); //store causes clean accounts.store(1, &[(&pubkey, &account)]); @@ -3092,7 +3109,7 @@ pub mod tests { //slot is gone accounts.print_accounts_stats("pre-clean"); accounts.clean_accounts(None); - assert!(accounts.storage.read().unwrap().0.get(&0).is_none()); + assert!(accounts.storage.0.get(&0).is_none()); //new value is there let ancestors = vec![(1, 1)].into_iter().collect(); @@ -3101,26 +3118,28 @@ pub mod tests { impl AccountsDB { fn alive_account_count_in_store(&self, slot: Slot) -> usize { - let storage = self.storage.read().unwrap(); - - let slot_storage = storage.0.get(&slot); + let slot_storage = self.storage.get_slot_stores(slot); if let Some(slot_storage) = slot_storage { - slot_storage.values().map(|store| store.count()).sum() + slot_storage + .read() + .unwrap() + .values() + .map(|store| store.count()) + .sum() } else { 0 } } fn all_account_count_in_append_vec(&self, slot: Slot) -> usize { - let storage = self.storage.read().unwrap(); - - let slot_storage = storage.0.get(&slot); + let slot_storage = self.storage.get_slot_stores(slot); if let Some(slot_storage) = slot_storage { - let count = slot_storage + let r_slot_storage = slot_storage.read().unwrap(); + let count = r_slot_storage .values() .map(|store| store.accounts.accounts(0).len()) .sum(); - let stored_count: usize = slot_storage + let stored_count: usize = r_slot_storage .values() .map(|store| store.approx_stored_count()) .sum(); @@ -3194,8 +3213,8 @@ pub mod tests { // Slot 1 should be removed, slot 0 cannot be removed because it still has // the latest update for pubkey 2 accounts.clean_accounts(None); - assert!(accounts.storage.read().unwrap().0.get(&0).is_some()); - assert!(accounts.storage.read().unwrap().0.get(&1).is_none()); + assert!(accounts.storage.get_slot_stores(0).is_some()); + assert!(accounts.storage.get_slot_stores(1).is_none()); // Slot 1 should be cleaned because all it's accounts are // zero lamports, and are not present in any other slot's @@ -3225,8 +3244,8 @@ pub mod tests { // zero-lamport account should be cleaned accounts.clean_accounts(None); - assert!(accounts.storage.read().unwrap().0.get(&0).is_none()); - assert!(accounts.storage.read().unwrap().0.get(&1).is_none()); + assert!(accounts.storage.get_slot_stores(0).is_none()); + assert!(accounts.storage.get_slot_stores(1).is_none()); // Slot 0 should be cleaned because all it's accounts have been // updated in the rooted slot 1 @@ -3559,9 +3578,13 @@ pub mod tests { } fn assert_no_stores(accounts: &AccountsDB, slot: Slot) { - let stores = accounts.storage.read().unwrap(); - info!("{:?}", stores.0.get(&slot)); - assert!(stores.0.get(&slot).is_none() || stores.0.get(&slot).unwrap().is_empty()); + let slot_stores = accounts.storage.get_slot_stores(slot); + let r_slot_stores = slot_stores.as_ref().map(|slot_stores| { + let r_slot_stores = slot_stores.read().unwrap(); + info!("{:?}", *r_slot_stores); + r_slot_stores + }); + assert!(r_slot_stores.is_none() || r_slot_stores.unwrap().is_empty()); } #[test] @@ -3739,10 +3762,9 @@ pub mod tests { // Store enough accounts such that an additional store for slot 2 is created. while accounts .storage - .read() + .get_slot_stores(current_slot) .unwrap() - .0 - .get(¤t_slot) + .read() .unwrap() .len() < 2 @@ -4400,10 +4422,9 @@ pub mod tests { db.store(base_slot, &[(&key, &account)]); db.storage - .write() + .get_slot_stores(base_slot) .unwrap() - .0 - .get_mut(&base_slot) + .write() .unwrap() .clear(); db.add_root(base_slot); @@ -4442,8 +4463,15 @@ pub mod tests { db.add_root(base_slot); assert_eq!(1, db.get_snapshot_storages(after_slot).len()); - let storage = db.storage.read().unwrap(); - storage.0[&0].values().next().unwrap().remove_account(); + db.storage + .get_slot_stores(0) + .unwrap() + .read() + .unwrap() + .values() + .next() + .unwrap() + .remove_account(); assert!(db.get_snapshot_storages(after_slot).is_empty()); } @@ -4454,8 +4482,16 @@ pub mod tests { let pubkey = Pubkey::new_rand(); let account = Account::new(1, 0, &Account::default().owner); accounts.store(0, &[(&pubkey, &account)]); - let storage = accounts.storage.read().unwrap(); - let storage_entry = storage.0[&0].values().next().unwrap(); + let storage_entry = accounts + .storage + .get_slot_stores(0) + .unwrap() + .read() + .unwrap() + .values() + .next() + .unwrap() + .clone(); storage_entry.remove_account(); storage_entry.remove_account(); } diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index ad1c691be9..f914e25749 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -33,7 +33,7 @@ use { io::{BufReader, BufWriter, Read, Write}, path::{Path, PathBuf}, result::Result, - sync::{atomic::Ordering, Arc}, + sync::{atomic::Ordering, Arc, RwLock}, time::Instant, }, }; @@ -265,7 +265,7 @@ where E: Into, P: AsRef, { - let accounts_db = AccountsDB::new(account_paths.to_vec(), cluster_type); + let mut accounts_db = AccountsDB::new(account_paths.to_vec(), cluster_type); let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields; @@ -348,8 +348,11 @@ where .expect("At least one storage entry must exist from deserializing stream"); { - let mut stores = accounts_db.storage.write().unwrap(); - stores.0.extend(storage); + accounts_db.storage.0.extend( + storage.into_iter().map(|(slot, slot_storage_entry)| { + (slot, Arc::new(RwLock::new(slot_storage_entry))) + }), + ); } accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);