From 2745b79b74f4c07042560a3b1f271becd8f35da0 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 20 Jan 2021 00:50:17 -0800 Subject: [PATCH] Parallel cache scan (#14544) * Parallel cache scan * PR comments * PR comments Co-authored-by: Carl Lin --- Cargo.lock | 37 +------ programs/bpf/Cargo.lock | 37 +------ runtime/Cargo.toml | 2 +- runtime/benches/accounts.rs | 58 ++++++++++ runtime/src/accounts.rs | 59 +++++++--- runtime/src/accounts_db.rs | 215 ++++++++++++++++++++---------------- 6 files changed, 226 insertions(+), 182 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1bc4364889..9c96ea7601 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,15 +25,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" -[[package]] -name = "ahash" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" -dependencies = [ - "const-random", -] - [[package]] name = "ahash" version = "0.4.6" @@ -611,26 +602,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "const-random" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" -dependencies = [ - "const-random-macro", - "proc-macro-hack", -] - -[[package]] -name = "const-random-macro" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" -dependencies = [ - "getrandom 0.1.14", - "proc-macro-hack", -] - [[package]] name = "const_fn" version = "0.4.5" @@ -883,13 +854,13 @@ dependencies = [ [[package]] name = "dashmap" -version = "3.11.10" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" dependencies = [ - "ahash 0.3.8", - "cfg-if 0.1.10", + "cfg-if 1.0.0", "num_cpus", + "rayon", ] [[package]] diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index c9d949f4d1..4779fe392c 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -25,15 +25,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" -[[package]] -name = "ahash" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" -dependencies = [ - "const-random", -] - [[package]] name = "aho-corasick" version = "0.7.10" @@ -389,26 +380,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "const-random" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" -dependencies = [ - "const-random-macro", - "proc-macro-hack", -] - -[[package]] -name = "const-random-macro" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" -dependencies = [ - "getrandom 0.1.14", - "proc-macro-hack", -] - [[package]] name = "const_fn" version = "0.4.5" @@ -604,13 +575,13 @@ dependencies = [ [[package]] name = "dashmap" -version = "3.11.10" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" dependencies = [ - "ahash", - "cfg-if 0.1.10", + "cfg-if 1.0.0", "num_cpus", + "rayon", ] [[package]] diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 15b58f5d6d..64c13e737d 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,7 +14,7 @@ blake3 = "0.3.6" bv = { version = "0.11.1", features = ["serde"] } byteorder = "1.3.4" bzip2 = "0.3.3" -dashmap = "3.11.10" +dashmap = { version = "4.0.2", features = ["rayon"] } crossbeam-channel = "0.4" dir-diff = "0.3.2" flate2 = "1.0.14" diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index 217d2f669e..1c3b1d30a5 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -4,6 +4,7 @@ extern crate test; use dashmap::DashMap; use rand::Rng; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use solana_runtime::{ accounts::{create_test_accounts, Accounts}, bank::*, @@ -11,6 +12,7 @@ use solana_runtime::{ use solana_sdk::{ account::Account, genesis_config::{create_genesis_config, ClusterType}, + hash::Hash, pubkey::Pubkey, }; use std::{ @@ -297,3 +299,59 @@ fn bench_rwlock_hashmap_single_reader_with_n_writers(bencher: &mut Bencher) { } }) } + +fn setup_bench_dashmap_iter() -> (Arc, DashMap) { + let accounts = Arc::new(Accounts::new_with_config( + vec![ + PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string())) + .join("bench_dashmap_par_iter"), + ], + &ClusterType::Development, + HashSet::new(), + false, + )); + + let dashmap = DashMap::new(); + let num_keys = std::env::var("NUM_BENCH_KEYS") + .map(|num_keys| num_keys.parse::().unwrap()) + .unwrap_or_else(|_| 10000); + for _ in 0..num_keys { + dashmap.insert( + Pubkey::new_unique(), + ( + Account::new(1, 0, &Account::default().owner), + Hash::new_unique(), + ), + ); + } + + (accounts, dashmap) +} + +#[bench] +fn bench_dashmap_par_iter(bencher: &mut Bencher) { + let (accounts, dashmap) = setup_bench_dashmap_iter(); + + bencher.iter(|| { + test::black_box(accounts.accounts_db.thread_pool.install(|| { + dashmap + .par_iter() + .map(|cached_account| (*cached_account.key(), cached_account.value().1)) + .collect::>() + })); + }); +} + +#[bench] +fn bench_dashmap_iter(bencher: &mut Bencher) { + let (_accounts, dashmap) = setup_bench_dashmap_iter(); + + bencher.iter(|| { + test::black_box( + dashmap + .iter() + .map(|cached_account| (*cached_account.key(), cached_account.value().1)) + .collect::>(), + ); + }); +} diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 4f1a2a1828..b5957c67e7 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,5 +1,5 @@ use crate::{ - accounts_db::{AccountsDB, AppendVecId, BankHashInfo, ErrorCounters, LoadedAccount}, + accounts_db::{AccountsDB, BankHashInfo, ErrorCounters, LoadedAccount, ScanStorageResult}, accounts_index::{AccountIndex, Ancestors, IndexKey}, bank::{ NonceRollbackFull, NonceRollbackInfo, TransactionCheckResult, TransactionExecutionResult, @@ -9,9 +9,12 @@ use crate::{ system_instruction_processor::{get_system_account_kind, SystemAccountKind}, transaction_utils::OrderedIterator, }; +use dashmap::{ + mapref::entry::Entry::{Occupied, Vacant}, + DashMap, +}; use log::*; use rand::{thread_rng, Rng}; -use rayon::slice::ParallelSliceMut; use solana_sdk::{ account::Account, account_utils::StateMut, @@ -453,28 +456,48 @@ impl Accounts { pub fn scan_slot(&self, slot: Slot, func: F) -> Vec where F: Fn(LoadedAccount) -> Option + Send + Sync, - B: Send + Default, + B: Sync + Send + Default + std::cmp::Eq, { - let accumulator: Vec> = self.accounts_db.scan_account_storage( + let scan_result = self.accounts_db.scan_account_storage( slot, - |loaded_account: LoadedAccount, _id: AppendVecId, accum: &mut Vec<(Pubkey, u64, B)>| { - let pubkey = *loaded_account.pubkey(); - let write_version = loaded_account.write_version(); - if let Some(val) = func(loaded_account) { - accum.push((pubkey, std::u64::MAX - write_version, val)); + |loaded_account: LoadedAccount| { + // Cache only has one version per key, don't need to worry about versioning + func(loaded_account) + }, + |accum: &DashMap, loaded_account: LoadedAccount| { + let loaded_account_pubkey = *loaded_account.pubkey(); + let loaded_write_version = loaded_account.write_version(); + let should_insert = accum + .get(&loaded_account_pubkey) + .map(|existing_entry| loaded_write_version > existing_entry.value().0) + .unwrap_or(true); + if should_insert { + if let Some(val) = func(loaded_account) { + // Detected insertion is necessary, grabs the write lock to commit the write, + match accum.entry(loaded_account_pubkey) { + // Double check in case another thread interleaved a write between the read + write. + Occupied(mut occupied_entry) => { + if loaded_write_version > occupied_entry.get().0 { + occupied_entry.insert((loaded_write_version, val)); + } + } + + Vacant(vacant_entry) => { + vacant_entry.insert((loaded_write_version, val)); + } + } + } } }, ); - let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flatten().collect(); - self.accounts_db.thread_pool.install(|| { - versions.par_sort_by_key(|s| (s.0, s.1)); - }); - versions.dedup_by_key(|s| s.0); - versions - .into_iter() - .map(|(_pubkey, _version, val)| val) - .collect() + match scan_result { + ScanStorageResult::Cached(cached_result) => cached_result, + ScanStorageResult::Stored(stored_result) => stored_result + .into_iter() + .map(|(_pubkey, (_latest_write_version, val))| val) + .collect(), + } } pub fn load_by_program_slot( diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 281cb79136..eac0d0c2f7 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -27,7 +27,10 @@ use crate::{ contains::Contains, }; use blake3::traits::digest::Digest; -use dashmap::DashMap; +use dashmap::{ + mapref::entry::Entry::{Occupied, Vacant}, + DashMap, DashSet, +}; use lazy_static::lazy_static; use log::*; use rand::{prelude::SliceRandom, thread_rng, Rng}; @@ -62,6 +65,7 @@ const MAX_RECYCLE_STORES: usize = 1000; const STORE_META_OVERHEAD: usize = 256; const MAX_CACHE_SLOTS: usize = 200; const FLUSH_CACHE_RANDOM_THRESHOLD: usize = MAX_LOCKOUT_HISTORY; +const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000; pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024; pub const DEFAULT_NUM_THREADS: u32 = 8; @@ -87,12 +91,19 @@ const CACHE_VIRTUAL_WRITE_VERSION: u64 = 0; const CACHE_VIRTUAL_OFFSET: usize = 0; const CACHE_VIRTUAL_STORED_SIZE: usize = 0; +type DashMapVersionHash = DashMap; + lazy_static! { // FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDB panic has occurred, // as |cargo test| cannot observe panics in other threads pub static ref FROZEN_ACCOUNT_PANIC: Arc = Arc::new(AtomicBool::new(false)); } +pub enum ScanStorageResult { + Cached(Vec), + Stored(B), +} + #[derive(Debug, Default)] pub struct ErrorCounters { pub total: usize, @@ -623,7 +634,6 @@ pub struct AccountsDB { struct AccountsStats { delta_hash_scan_time_total_us: AtomicU64, delta_hash_accumulate_time_total_us: AtomicU64, - delta_hash_merge_time_total_us: AtomicU64, delta_hash_num: AtomicU64, last_store_report: AtomicU64, @@ -1894,35 +1904,46 @@ impl AccountsDB { } /// Scan a specific slot through all the account storage in parallel - pub fn scan_account_storage(&self, slot: Slot, scan_func: F) -> Vec + pub fn scan_account_storage( + &self, + slot: Slot, + cache_map_func: impl Fn(LoadedAccount) -> Option + Sync, + storage_scan_func: impl Fn(&B, LoadedAccount) + Sync, + ) -> ScanStorageResult where - F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync, - B: Send + Default, - { - self.scan_account_storage_inner(slot, scan_func) - } - - fn scan_account_storage_inner(&self, slot: Slot, scan_func: F) -> Vec - where - F: Fn(LoadedAccount, AppendVecId, &mut B) + Send + Sync, - B: Send + Default, + R: Send, + B: Send + Default + Sync, { if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) { // If we see the slot in the cache, then all the account information // is in this cached slot - let mut retval = B::default(); - for cached_account in slot_cache.iter() { - scan_func( - LoadedAccount::Cached(( - *cached_account.key(), - Cow::Borrowed(cached_account.value()), - )), - CACHE_VIRTUAL_STORAGE_ID, - &mut retval, - ); + if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD { + ScanStorageResult::Cached(self.thread_pool.install(|| { + slot_cache + .par_iter() + .filter_map(|cached_account| { + cache_map_func(LoadedAccount::Cached(( + *cached_account.key(), + Cow::Borrowed(cached_account.value()), + ))) + }) + .collect() + })) + } else { + ScanStorageResult::Cached( + slot_cache + .iter() + .filter_map(|cached_account| { + cache_map_func(LoadedAccount::Cached(( + *cached_account.key(), + Cow::Borrowed(cached_account.value()), + ))) + }) + .collect(), + ) } - vec![retval] } else { + let retval = B::default(); // If the slot is not in the cache, then all the account information must have // been flushed. This is guaranteed because we only remove the rooted slot from // the cache *after* we've finished flushing in `flush_slot_cache`. @@ -1933,21 +1954,12 @@ impl AccountsDB { .unwrap_or_default(); self.thread_pool.install(|| { storage_maps - .into_par_iter() - .map(|storage| { - let accounts = storage.accounts.accounts(0); - let mut retval = B::default(); - accounts.into_iter().for_each(|stored_account| { - scan_func( - LoadedAccount::Stored(stored_account), - storage.append_vec_id(), - &mut retval, - ) - }); - retval - }) - .collect() - }) + .par_iter() + .flat_map(|storage| storage.accounts.accounts(0)) + .for_each(|account| storage_scan_func(&retval, LoadedAccount::Stored(account))); + }); + + ScanStorageResult::Stored(retval) } } @@ -2518,9 +2530,10 @@ impl AccountsDB { // Reads will then always read the latest version of a slot. Scans will also know // which version their parents because banks will also be augmented with this version, // which handles cases where a deletion of one version happens in the middle of the scan. - let pubkey_sets: Vec> = self.scan_account_storage( + let scan_result: ScanStorageResult> = self.scan_account_storage( remove_slot, - |loaded_account: LoadedAccount, _, accum: &mut HashSet| { + |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()), + |accum: &DashSet, loaded_account: LoadedAccount| { accum.insert(*loaded_account.pubkey()); }, ); @@ -2528,15 +2541,26 @@ impl AccountsDB { // Purge this slot from the accounts index let purge_slot: HashSet = vec![remove_slot].into_iter().collect(); let mut reclaims = vec![]; - { - let pubkeys = pubkey_sets.iter().flatten(); - for pubkey in pubkeys { - self.accounts_index.purge_exact( - pubkey, - &purge_slot, - &mut reclaims, - &self.account_indexes, - ); + match scan_result { + ScanStorageResult::Cached(cached_keys) => { + for pubkey in cached_keys.iter() { + self.accounts_index.purge_exact( + pubkey, + &purge_slot, + &mut reclaims, + &self.account_indexes, + ); + } + } + ScanStorageResult::Stored(stored_keys) => { + for set_ref in stored_keys.iter() { + self.accounts_index.purge_exact( + set_ref.key(), + &purge_slot, + &mut reclaims, + &self.account_indexes, + ); + } } } @@ -3061,13 +3085,6 @@ impl AccountsDB { .swap(0, Ordering::Relaxed), i64 ), - ( - "delta_hash_merge_us", - self.stats - .delta_hash_merge_time_total_us - .swap(0, Ordering::Relaxed), - i64 - ), ( "delta_hash_accumulate_us", self.stats @@ -3345,41 +3362,59 @@ impl AccountsDB { pub fn get_accounts_delta_hash(&self, slot: Slot) -> Hash { let mut scan = Measure::start("scan"); - let mut accumulator: Vec> = self.scan_account_storage( - slot, - |loaded_account: LoadedAccount, - _store_id: AppendVecId, - accum: &mut HashMap| { - accum.insert( - *loaded_account.pubkey(), - ( - loaded_account.write_version(), + + let scan_result: ScanStorageResult<(Pubkey, Hash, u64), DashMapVersionHash> = self + .scan_account_storage( + slot, + |loaded_account: LoadedAccount| { + // Cache only has one version per key, don't need to worry about versioning + Some(( + *loaded_account.pubkey(), *loaded_account.loaded_hash(), - ), - ); - }, - ); + CACHE_VIRTUAL_WRITE_VERSION, + )) + }, + |accum: &DashMap, loaded_account: LoadedAccount| { + let loaded_write_version = loaded_account.write_version(); + let loaded_hash = *loaded_account.loaded_hash(); + let should_insert = + if let Some(existing_entry) = accum.get(loaded_account.pubkey()) { + loaded_write_version > existing_entry.value().version() + } else { + true + }; + if should_insert { + // Detected insertion is necessary, grabs the write lock to commit the write, + match accum.entry(*loaded_account.pubkey()) { + // Double check in case another thread interleaved a write between the read + write. + Occupied(mut occupied_entry) => { + if loaded_write_version > occupied_entry.get().version() { + occupied_entry.insert((loaded_write_version, loaded_hash)); + } + } + + Vacant(vacant_entry) => { + vacant_entry.insert((loaded_write_version, loaded_hash)); + } + } + } + }, + ); scan.stop(); - let mut merge = Measure::start("merge"); - let mut account_maps = HashMap::new(); - while let Some(maps) = accumulator.pop() { - AccountsDB::merge(&mut account_maps, &maps); - } - merge.stop(); let mut accumulate = Measure::start("accumulate"); - let hashes: Vec<_> = account_maps - .into_iter() - .map(|(pubkey, (_, hash))| (pubkey, hash, 0)) - .collect(); + let hashes: Vec<_> = match scan_result { + ScanStorageResult::Cached(cached_result) => cached_result, + ScanStorageResult::Stored(stored_result) => stored_result + .into_iter() + .map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash, 0)) + .collect(), + }; let ret = Self::accumulate_account_hashes(hashes, slot, false); accumulate.stop(); self.stats .delta_hash_scan_time_total_us .fetch_add(scan.as_us(), Ordering::Relaxed); - self.stats - .delta_hash_merge_time_total_us - .fetch_add(merge.as_us(), Ordering::Relaxed); self.stats .delta_hash_accumulate_time_total_us .fetch_add(accumulate.as_us(), Ordering::Relaxed); @@ -3880,20 +3915,6 @@ impl AccountsDB { .collect() } - fn merge(dest: &mut HashMap, source: &HashMap) - where - X: Versioned + Clone, - { - for (key, source_item) in source.iter() { - if let Some(dest_item) = dest.get(key) { - if dest_item.version() > source_item.version() { - continue; - } - } - dest.insert(*key, source_item.clone()); - } - } - pub fn generate_index(&self) { type AccountsMap<'a> = DashMap)>>>;