Parallel cache scan (#14544)

* Parallel cache scan

* PR comments

* PR comments

Co-authored-by: Carl Lin <carl@solana.com>
Authored by carllin on 2021-01-20 00:50:17 -08:00; committed by GitHub
parent a480b63234
commit 2745b79b74
6 changed files with 226 additions and 182 deletions

Cargo.lock (generated)

@@ -25,15 +25,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
[[package]]
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"const-random",
]
[[package]]
name = "ahash"
version = "0.4.6"
@@ -611,26 +602,6 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom 0.1.14",
"proc-macro-hack",
]
[[package]]
name = "const_fn"
version = "0.4.5"
@@ -883,13 +854,13 @@ dependencies = [
[[package]]
name = "dashmap"
version = "3.11.10"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"ahash 0.3.8",
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"num_cpus",
"rayon",
]
[[package]]

programs/bpf/Cargo.lock (generated)

@@ -25,15 +25,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d"
[[package]]
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"const-random",
]
[[package]]
name = "aho-corasick"
version = "0.7.10"
@@ -389,26 +380,6 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom 0.1.14",
"proc-macro-hack",
]
[[package]]
name = "const_fn"
version = "0.4.5"
@@ -604,13 +575,13 @@ dependencies = [
[[package]]
name = "dashmap"
version = "3.11.10"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"ahash",
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"num_cpus",
"rayon",
]
[[package]]

runtime/Cargo.toml

@@ -14,7 +14,7 @@ blake3 = "0.3.6"
bv = { version = "0.11.1", features = ["serde"] }
byteorder = "1.3.4"
bzip2 = "0.3.3"
dashmap = "3.11.10"
dashmap = { version = "4.0.2", features = ["rayon"] }
crossbeam-channel = "0.4"
dir-diff = "0.3.2"
flate2 = "1.0.14"
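
For context: the bump to dashmap 4.0.2 with the "rayon" feature is what gives DashMap parallel iterators. Below is a minimal sketch, not part of the commit, of what that feature gate enables; the bench added later in this commit exercises the same par_iter call.

use dashmap::DashMap;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

fn main() {
    // Populate a concurrent map, then scan it on rayon's global thread
    // pool; each of the map's internal shards is traversed independently.
    let map: DashMap<u64, u64> = DashMap::new();
    for i in 0..10_000u64 {
        map.insert(i, i * 2);
    }
    let sum: u64 = map.par_iter().map(|entry| *entry.value()).sum();
    assert_eq!(sum, 2 * (0..10_000u64).sum::<u64>());
}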

runtime/benches/accounts.rs

@@ -4,6 +4,7 @@ extern crate test;
use dashmap::DashMap;
use rand::Rng;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use solana_runtime::{
accounts::{create_test_accounts, Accounts},
bank::*,
@@ -11,6 +12,7 @@ use solana_runtime::{
use solana_sdk::{
account::Account,
genesis_config::{create_genesis_config, ClusterType},
hash::Hash,
pubkey::Pubkey,
};
use std::{
@@ -297,3 +299,59 @@ fn bench_rwlock_hashmap_single_reader_with_n_writers(bencher: &mut Bencher) {
}
})
}
fn setup_bench_dashmap_iter() -> (Arc<Accounts>, DashMap<Pubkey, (Account, Hash)>) {
let accounts = Arc::new(Accounts::new_with_config(
vec![
PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()))
.join("bench_dashmap_par_iter"),
],
&ClusterType::Development,
HashSet::new(),
false,
));
let dashmap = DashMap::new();
let num_keys = std::env::var("NUM_BENCH_KEYS")
.map(|num_keys| num_keys.parse::<usize>().unwrap())
.unwrap_or_else(|_| 10000);
for _ in 0..num_keys {
dashmap.insert(
Pubkey::new_unique(),
(
Account::new(1, 0, &Account::default().owner),
Hash::new_unique(),
),
);
}
(accounts, dashmap)
}
#[bench]
fn bench_dashmap_par_iter(bencher: &mut Bencher) {
let (accounts, dashmap) = setup_bench_dashmap_iter();
bencher.iter(|| {
test::black_box(accounts.accounts_db.thread_pool.install(|| {
dashmap
.par_iter()
.map(|cached_account| (*cached_account.key(), cached_account.value().1))
.collect::<Vec<(Pubkey, Hash)>>()
}));
});
}
#[bench]
fn bench_dashmap_iter(bencher: &mut Bencher) {
let (_accounts, dashmap) = setup_bench_dashmap_iter();
bencher.iter(|| {
test::black_box(
dashmap
.iter()
.map(|cached_account| (*cached_account.key(), cached_account.value().1))
.collect::<Vec<(Pubkey, Hash)>>(),
);
});
}
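
A note on the benches above: bench_dashmap_par_iter wraps the parallel scan in accounts.accounts_db.thread_pool.install. A minimal standalone sketch of that rayon mechanism, with an arbitrary pool size and workload:

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // install() runs the closure inside this pool, so any parallel
    // iterator spawned within it uses these 4 threads instead of the
    // global rayon pool.
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let total: u64 = pool.install(|| (0..1_000_000u64).into_par_iter().sum());
    assert_eq!(total, 499_999_500_000);
}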

runtime/src/accounts.rs

@@ -1,5 +1,5 @@
use crate::{
accounts_db::{AccountsDB, AppendVecId, BankHashInfo, ErrorCounters, LoadedAccount},
accounts_db::{AccountsDB, BankHashInfo, ErrorCounters, LoadedAccount, ScanStorageResult},
accounts_index::{AccountIndex, Ancestors, IndexKey},
bank::{
NonceRollbackFull, NonceRollbackInfo, TransactionCheckResult, TransactionExecutionResult,
@@ -9,9 +9,12 @@ use crate::{
system_instruction_processor::{get_system_account_kind, SystemAccountKind},
transaction_utils::OrderedIterator,
};
use dashmap::{
mapref::entry::Entry::{Occupied, Vacant},
DashMap,
};
use log::*;
use rand::{thread_rng, Rng};
use rayon::slice::ParallelSliceMut;
use solana_sdk::{
account::Account,
account_utils::StateMut,
@@ -453,28 +456,48 @@ impl Accounts {
pub fn scan_slot<F, B>(&self, slot: Slot, func: F) -> Vec<B>
where
F: Fn(LoadedAccount) -> Option<B> + Send + Sync,
B: Send + Default,
B: Sync + Send + Default + std::cmp::Eq,
{
let accumulator: Vec<Vec<(Pubkey, u64, B)>> = self.accounts_db.scan_account_storage(
let scan_result = self.accounts_db.scan_account_storage(
slot,
|loaded_account: LoadedAccount, _id: AppendVecId, accum: &mut Vec<(Pubkey, u64, B)>| {
let pubkey = *loaded_account.pubkey();
let write_version = loaded_account.write_version();
if let Some(val) = func(loaded_account) {
accum.push((pubkey, std::u64::MAX - write_version, val));
|loaded_account: LoadedAccount| {
// Cache only has one version per key, so no need to worry about versioning
func(loaded_account)
},
|accum: &DashMap<Pubkey, (u64, B)>, loaded_account: LoadedAccount| {
let loaded_account_pubkey = *loaded_account.pubkey();
let loaded_write_version = loaded_account.write_version();
let should_insert = accum
.get(&loaded_account_pubkey)
.map(|existing_entry| loaded_write_version > existing_entry.value().0)
.unwrap_or(true);
if should_insert {
if let Some(val) = func(loaded_account) {
// Detected insertion is necessary; grab the write lock to commit the write.
match accum.entry(loaded_account_pubkey) {
// Double check in case another thread interleaved a write between the read + write.
Occupied(mut occupied_entry) => {
if loaded_write_version > occupied_entry.get().0 {
occupied_entry.insert((loaded_write_version, val));
}
}
Vacant(vacant_entry) => {
vacant_entry.insert((loaded_write_version, val));
}
}
}
}
},
);
let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flatten().collect();
self.accounts_db.thread_pool.install(|| {
versions.par_sort_by_key(|s| (s.0, s.1));
});
versions.dedup_by_key(|s| s.0);
versions
.into_iter()
.map(|(_pubkey, _version, val)| val)
.collect()
match scan_result {
ScanStorageResult::Cached(cached_result) => cached_result,
ScanStorageResult::Stored(stored_result) => stored_result
.into_iter()
.map(|(_pubkey, (_latest_write_version, val))| val)
.collect(),
}
}
pub fn load_by_program_slot(
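
The rewritten scan_slot above keeps only the highest write version per pubkey via an optimistic read followed by a locked double check. Here is a minimal sketch of that pattern in isolation; the key/value types and the upsert_if_newer helper are illustrative, not from the commit:

use dashmap::mapref::entry::Entry::{Occupied, Vacant};
use dashmap::DashMap;

fn upsert_if_newer(
    accum: &DashMap<u64, (u64, &'static str)>,
    key: u64,
    version: u64,
    val: &'static str,
) {
    // Optimistic read: skip the shard write lock entirely when the
    // existing entry is already newer.
    let should_insert = accum
        .get(&key)
        .map(|existing| version > existing.value().0)
        .unwrap_or(true);
    if should_insert {
        match accum.entry(key) {
            // Double check: a racing writer may have inserted a newer
            // version between the read above and taking this write lock.
            Occupied(mut occupied) => {
                if version > occupied.get().0 {
                    occupied.insert((version, val));
                }
            }
            Vacant(vacant) => {
                vacant.insert((version, val));
            }
        }
    }
}

fn main() {
    let accum = DashMap::new();
    upsert_if_newer(&accum, 7, 1, "v1");
    upsert_if_newer(&accum, 7, 3, "v3");
    upsert_if_newer(&accum, 7, 2, "stale"); // ignored: 2 < 3
    assert_eq!(*accum.get(&7).unwrap().value(), (3, "v3"));
}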

runtime/src/accounts_db.rs

@@ -27,7 +27,10 @@ use crate::{
contains::Contains,
};
use blake3::traits::digest::Digest;
use dashmap::DashMap;
use dashmap::{
mapref::entry::Entry::{Occupied, Vacant},
DashMap, DashSet,
};
use lazy_static::lazy_static;
use log::*;
use rand::{prelude::SliceRandom, thread_rng, Rng};
@@ -62,6 +65,7 @@ const MAX_RECYCLE_STORES: usize = 1000;
const STORE_META_OVERHEAD: usize = 256;
const MAX_CACHE_SLOTS: usize = 200;
const FLUSH_CACHE_RANDOM_THRESHOLD: usize = MAX_LOCKOUT_HISTORY;
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
pub const DEFAULT_NUM_THREADS: u32 = 8;
@@ -87,12 +91,19 @@ const CACHE_VIRTUAL_WRITE_VERSION: u64 = 0;
const CACHE_VIRTUAL_OFFSET: usize = 0;
const CACHE_VIRTUAL_STORED_SIZE: usize = 0;
type DashMapVersionHash = DashMap<Pubkey, (u64, Hash)>;
lazy_static! {
// FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDB panic has occurred,
// as |cargo test| cannot observe panics in other threads
pub static ref FROZEN_ACCOUNT_PANIC: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
}
pub enum ScanStorageResult<R, B> {
Cached(Vec<R>),
Stored(B),
}
#[derive(Debug, Default)]
pub struct ErrorCounters {
pub total: usize,
@@ -623,7 +634,6 @@ pub struct AccountsDB {
struct AccountsStats {
delta_hash_scan_time_total_us: AtomicU64,
delta_hash_accumulate_time_total_us: AtomicU64,
delta_hash_merge_time_total_us: AtomicU64,
delta_hash_num: AtomicU64,
last_store_report: AtomicU64,
@@ -1894,35 +1904,46 @@ impl AccountsDB {
}
/// Scan a specific slot through all the account storage in parallel
pub fn scan_account_storage<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
pub fn scan_account_storage<R, B>(
&self,
slot: Slot,
cache_map_func: impl Fn(LoadedAccount) -> Option<R> + Sync,
storage_scan_func: impl Fn(&B, LoadedAccount) + Sync,
) -> ScanStorageResult<R, B>
where
F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync,
B: Send + Default,
{
self.scan_account_storage_inner(slot, scan_func)
}
fn scan_account_storage_inner<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
where
F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync,
B: Send + Default,
R: Send,
B: Send + Default + Sync,
{
if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
// If we see the slot in the cache, then all the account information
// is in this cached slot
let mut retval = B::default();
for cached_account in slot_cache.iter() {
scan_func(
LoadedAccount::Cached((
*cached_account.key(),
Cow::Borrowed(cached_account.value()),
)),
CACHE_VIRTUAL_STORAGE_ID,
&mut retval,
);
if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD {
ScanStorageResult::Cached(self.thread_pool.install(|| {
slot_cache
.par_iter()
.filter_map(|cached_account| {
cache_map_func(LoadedAccount::Cached((
*cached_account.key(),
Cow::Borrowed(cached_account.value()),
)))
})
.collect()
}))
} else {
ScanStorageResult::Cached(
slot_cache
.iter()
.filter_map(|cached_account| {
cache_map_func(LoadedAccount::Cached((
*cached_account.key(),
Cow::Borrowed(cached_account.value()),
)))
})
.collect(),
)
}
vec![retval]
} else {
let retval = B::default();
// If the slot is not in the cache, then all the account information must have
// been flushed. This is guaranteed because we only remove the rooted slot from
// the cache *after* we've finished flushing in `flush_slot_cache`.
@@ -1933,21 +1954,12 @@ impl AccountsDB {
.unwrap_or_default();
self.thread_pool.install(|| {
storage_maps
.into_par_iter()
.map(|storage| {
let accounts = storage.accounts.accounts(0);
let mut retval = B::default();
accounts.into_iter().for_each(|stored_account| {
scan_func(
LoadedAccount::Stored(stored_account),
storage.append_vec_id(),
&mut retval,
)
});
retval
})
.collect()
})
.par_iter()
.flat_map(|storage| storage.accounts.accounts(0))
.for_each(|account| storage_scan_func(&retval, LoadedAccount::Stored(account)));
});
ScanStorageResult::Stored(retval)
}
}
@@ -2518,9 +2530,10 @@ impl AccountsDB {
// Reads will then always read the latest version of a slot. Scans will also know
// which version their parents are using, because banks will also be augmented with this version,
// which handles cases where a deletion of one version happens in the middle of the scan.
let pubkey_sets: Vec<HashSet<Pubkey>> = self.scan_account_storage(
let scan_result: ScanStorageResult<Pubkey, DashSet<Pubkey>> = self.scan_account_storage(
remove_slot,
|loaded_account: LoadedAccount, _, accum: &mut HashSet<Pubkey>| {
|loaded_account: LoadedAccount| Some(*loaded_account.pubkey()),
|accum: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
accum.insert(*loaded_account.pubkey());
},
);
@@ -2528,15 +2541,26 @@ impl AccountsDB {
// Purge this slot from the accounts index
let purge_slot: HashSet<Slot> = vec![remove_slot].into_iter().collect();
let mut reclaims = vec![];
{
let pubkeys = pubkey_sets.iter().flatten();
for pubkey in pubkeys {
self.accounts_index.purge_exact(
pubkey,
&purge_slot,
&mut reclaims,
&self.account_indexes,
);
match scan_result {
ScanStorageResult::Cached(cached_keys) => {
for pubkey in cached_keys.iter() {
self.accounts_index.purge_exact(
pubkey,
&purge_slot,
&mut reclaims,
&self.account_indexes,
);
}
}
ScanStorageResult::Stored(stored_keys) => {
for set_ref in stored_keys.iter() {
self.accounts_index.purge_exact(
set_ref.key(),
&purge_slot,
&mut reclaims,
&self.account_indexes,
);
}
}
}
@@ -3061,13 +3085,6 @@ impl AccountsDB {
.swap(0, Ordering::Relaxed),
i64
),
(
"delta_hash_merge_us",
self.stats
.delta_hash_merge_time_total_us
.swap(0, Ordering::Relaxed),
i64
),
(
"delta_hash_accumulate_us",
self.stats
@@ -3345,41 +3362,59 @@ impl AccountsDB {
pub fn get_accounts_delta_hash(&self, slot: Slot) -> Hash {
let mut scan = Measure::start("scan");
let mut accumulator: Vec<HashMap<Pubkey, (u64, Hash)>> = self.scan_account_storage(
slot,
|loaded_account: LoadedAccount,
_store_id: AppendVecId,
accum: &mut HashMap<Pubkey, (u64, Hash)>| {
accum.insert(
*loaded_account.pubkey(),
(
loaded_account.write_version(),
let scan_result: ScanStorageResult<(Pubkey, Hash, u64), DashMapVersionHash> = self
.scan_account_storage(
slot,
|loaded_account: LoadedAccount| {
// Cache only has one version per key, so no need to worry about versioning
Some((
*loaded_account.pubkey(),
*loaded_account.loaded_hash(),
),
);
},
);
CACHE_VIRTUAL_WRITE_VERSION,
))
},
|accum: &DashMap<Pubkey, (u64, Hash)>, loaded_account: LoadedAccount| {
let loaded_write_version = loaded_account.write_version();
let loaded_hash = *loaded_account.loaded_hash();
let should_insert =
if let Some(existing_entry) = accum.get(loaded_account.pubkey()) {
loaded_write_version > existing_entry.value().version()
} else {
true
};
if should_insert {
// Detected insertion is necessary; grab the write lock to commit the write.
match accum.entry(*loaded_account.pubkey()) {
// Double check in case another thread interleaved a write between the read + write.
Occupied(mut occupied_entry) => {
if loaded_write_version > occupied_entry.get().version() {
occupied_entry.insert((loaded_write_version, loaded_hash));
}
}
Vacant(vacant_entry) => {
vacant_entry.insert((loaded_write_version, loaded_hash));
}
}
}
},
);
scan.stop();
let mut merge = Measure::start("merge");
let mut account_maps = HashMap::new();
while let Some(maps) = accumulator.pop() {
AccountsDB::merge(&mut account_maps, &maps);
}
merge.stop();
let mut accumulate = Measure::start("accumulate");
let hashes: Vec<_> = account_maps
.into_iter()
.map(|(pubkey, (_, hash))| (pubkey, hash, 0))
.collect();
let hashes: Vec<_> = match scan_result {
ScanStorageResult::Cached(cached_result) => cached_result,
ScanStorageResult::Stored(stored_result) => stored_result
.into_iter()
.map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash, 0))
.collect(),
};
let ret = Self::accumulate_account_hashes(hashes, slot, false);
accumulate.stop();
self.stats
.delta_hash_scan_time_total_us
.fetch_add(scan.as_us(), Ordering::Relaxed);
self.stats
.delta_hash_merge_time_total_us
.fetch_add(merge.as_us(), Ordering::Relaxed);
self.stats
.delta_hash_accumulate_time_total_us
.fetch_add(accumulate.as_us(), Ordering::Relaxed);
@@ -3880,20 +3915,6 @@ impl AccountsDB {
.collect()
}
fn merge<X>(dest: &mut HashMap<Pubkey, X>, source: &HashMap<Pubkey, X>)
where
X: Versioned + Clone,
{
for (key, source_item) in source.iter() {
if let Some(dest_item) = dest.get(key) {
if dest_item.version() > source_item.version() {
continue;
}
}
dest.insert(*key, source_item.clone());
}
}
pub fn generate_index(&self) {
type AccountsMap<'a> =
DashMap<Pubkey, Mutex<BTreeMap<u64, (AppendVecId, StoredAccountMeta<'a>)>>>;
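
To tie the pieces together, a hedged sketch of a caller driving the new two-closure scan_account_storage and unwrapping ScanStorageResult, modeled on the purge path above. The collect_slot_pubkeys wrapper is hypothetical, and it assumes AccountsDB, LoadedAccount, and ScanStorageResult are importable as shown; note that the callee, not the caller, decides whether the cached branch runs in parallel (via SCAN_SLOT_PAR_ITER_THRESHOLD).

use dashmap::DashSet;
use solana_runtime::accounts_db::{AccountsDB, LoadedAccount, ScanStorageResult};
use solana_sdk::{clock::Slot, pubkey::Pubkey};

fn collect_slot_pubkeys(db: &AccountsDB, slot: Slot) -> Vec<Pubkey> {
    let scan_result: ScanStorageResult<Pubkey, DashSet<Pubkey>> = db.scan_account_storage(
        slot,
        // Cache branch: map each cached account to the value to collect.
        |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()),
        // Storage branch: fold stored accounts into a shared DashSet.
        |accum: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
            accum.insert(*loaded_account.pubkey());
        },
    );
    match scan_result {
        // The cached-slot path already produced a Vec.
        ScanStorageResult::Cached(cached) => cached,
        // The storage path produced a concurrent set; drain it.
        ScanStorageResult::Stored(stored) => stored.into_iter().collect(),
    }
}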