From fabecdc86cc9ef6365d5cb550b4a8a411c15eb90 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Fri, 5 Feb 2021 13:48:55 -0600 Subject: [PATCH] use thread pool for non-index hash calculations (#15149) --- core/src/accounts_hash_verifier.rs | 23 +++++++++++++++++++---- core/src/validator.rs | 1 + core/tests/snapshots.rs | 9 ++++++++- ledger-tool/src/main.rs | 1 + runtime/src/accounts_db.rs | 16 ++++++++++++---- runtime/src/bank.rs | 5 +++++ runtime/src/snapshot_utils.rs | 10 ++++++++-- 7 files changed, 54 insertions(+), 11 deletions(-) diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index fb3c2ef3c9..7b1870bdb5 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -8,8 +8,10 @@ use crate::{ cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, snapshot_packager_service::PendingSnapshotPackage, }; -use solana_runtime::snapshot_package::{ - AccountsPackage, AccountsPackagePre, AccountsPackageReceiver, +use rayon::ThreadPool; +use solana_runtime::{ + accounts_db, + snapshot_package::{AccountsPackage, AccountsPackagePre, AccountsPackageReceiver}, }; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use std::collections::{HashMap, HashSet}; @@ -44,6 +46,7 @@ impl AccountsHashVerifier { .name("solana-accounts-hash".to_string()) .spawn(move || { let mut hashes = vec![]; + let mut thread_pool_storage = None; loop { if exit.load(Ordering::Relaxed) { break; @@ -51,6 +54,13 @@ impl AccountsHashVerifier { match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) { Ok(accounts_package) => { + if accounts_package.hash_for_testing.is_some() + && thread_pool_storage.is_none() + { + thread_pool_storage = + Some(accounts_db::make_min_priority_thread_pool()); + } + Self::process_accounts_package_pre( accounts_package, &cluster_info, @@ -61,6 +71,7 @@ impl AccountsHashVerifier { &exit, fault_injection_rate_slots, snapshot_interval_slots, + thread_pool_storage.as_ref(), ); } Err(RecvTimeoutError::Disconnected) => break, @@ -74,6 +85,7 @@ impl AccountsHashVerifier { } } + #[allow(clippy::too_many_arguments)] fn process_accounts_package_pre( accounts_package: AccountsPackagePre, cluster_info: &ClusterInfo, @@ -84,9 +96,12 @@ impl AccountsHashVerifier { exit: &Arc, fault_injection_rate_slots: u64, snapshot_interval_slots: u64, + thread_pool: Option<&ThreadPool>, ) { - let accounts_package = - solana_runtime::snapshot_utils::process_accounts_package_pre(accounts_package); + let accounts_package = solana_runtime::snapshot_utils::process_accounts_package_pre( + accounts_package, + thread_pool, + ); Self::process_accounts_package( accounts_package, cluster_info, diff --git a/core/src/validator.rs b/core/src/validator.rs index 5b1c808696..b8f7b477fd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1050,6 +1050,7 @@ fn new_banks_from_ledger( None, &snapshot_config.snapshot_package_output_path, snapshot_config.archive_format, + &bank_forks.root_bank().get_thread_pool(), ) .unwrap_or_else(|err| { error!("Unable to create snapshot: {}", err); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 584f01adec..40ee6b02d2 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -45,6 +45,7 @@ mod tests { }; use solana_runtime::{ accounts_background_service::{ABSRequestSender, SnapshotRequestHandler}, + accounts_db, bank::{Bank, BankSlotDelta}, bank_forks::{ArchiveFormat, BankForks, SnapshotConfig}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -241,7 +242,10 @@ mod tests { None, ) .unwrap(); - let snapshot_package = snapshot_utils::process_accounts_package_pre(snapshot_package); + let snapshot_package = snapshot_utils::process_accounts_package_pre( + snapshot_package, + Some(&last_bank.get_thread_pool()), + ); snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap(); // Restore bank from snapshot @@ -419,6 +423,8 @@ mod tests { &cluster_info, ); + let thread_pool = accounts_db::make_min_priority_thread_pool(); + let _package_receiver = std::thread::Builder::new() .name("package-receiver".to_string()) .spawn(move || { @@ -431,6 +437,7 @@ mod tests { let snapshot_package = solana_runtime::snapshot_utils::process_accounts_package_pre( snapshot_package, + Some(&thread_pool), ); *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 45d3d8cf10..0c8f15075a 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -1933,6 +1933,7 @@ fn main() { Some(snapshot_version), output_directory, ArchiveFormat::TarZstd, + &bank.get_thread_pool(), ) .unwrap_or_else(|err| { eprintln!("Unable to create snapshot: {}", err); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 4bc0f06d9b..e2839901d7 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -983,7 +983,7 @@ impl ShrinkStats { } } -fn make_min_priority_thread_pool() -> ThreadPool { +pub fn make_min_priority_thread_pool() -> ThreadPool { // Use lower thread count to reduce priority. let num_threads = std::cmp::max(2, num_cpus::get() / 4); rayon::ThreadPoolBuilder::new() @@ -3748,6 +3748,7 @@ impl AccountsDB { Self::calculate_accounts_hash_without_index( &combined_maps, simple_capitalization_enabled, + &self.thread_pool_clean, ) } else { self.calculate_accounts_hash(slot, ancestors, false, simple_capitalization_enabled) @@ -3851,10 +3852,13 @@ impl AccountsDB { pub fn calculate_accounts_hash_without_index( storages: &[SnapshotStorage], simple_capitalization_enabled: bool, + thread_pool: &ThreadPool, ) -> (Hash, u64) { - let result = Self::scan_snapshot_stores(storages, simple_capitalization_enabled); + thread_pool.install(|| { + let result = Self::scan_snapshot_stores(storages, simple_capitalization_enabled); - Self::rest_of_hash_calculation(result) + Self::rest_of_hash_calculation(result) + }) } pub fn verify_bank_hash_and_lamports( @@ -5180,7 +5184,11 @@ pub mod tests { solana_logger::setup(); let (storages, _size, _slot_expected) = sample_storage(); - let result = AccountsDB::calculate_accounts_hash_without_index(&storages, true); + let result = AccountsDB::calculate_accounts_hash_without_index( + &storages, + true, + &make_min_priority_thread_pool(), + ); let expected_hash = Hash::from_str("GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn").unwrap(); assert_eq!(result, (expected_hash, 0)); } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f6182a4203..6d0c40b588 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -27,6 +27,7 @@ use crate::{ use byteorder::{ByteOrder, LittleEndian}; use itertools::Itertools; use log::*; +use rayon::ThreadPool; use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_info}; use solana_sdk::{ @@ -4307,6 +4308,10 @@ impl Bank { self.rc.accounts.accounts_db.get_accounts_hash(self.slot) } + pub fn get_thread_pool(&self) -> &ThreadPool { + &self.rc.accounts.accounts_db.thread_pool_clean + } + pub fn update_accounts_hash_with_index_option( &self, do_not_use_index: bool, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index be391a6cc7..db6fe93287 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -15,6 +15,7 @@ use bincode::{config::Options, serialize_into}; use bzip2::bufread::BzDecoder; use flate2::read::GzDecoder; use log::*; +use rayon::ThreadPool; use regex::Regex; use solana_measure::measure::Measure; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}; @@ -926,6 +927,7 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( snapshot_version: Option, snapshot_package_output_path: Q, archive_format: ArchiveFormat, + thread_pool: &ThreadPool, ) -> Result { let snapshot_version = snapshot_version.unwrap_or_default(); @@ -952,13 +954,16 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( None, )?; - let package = process_accounts_package_pre(package); + let package = process_accounts_package_pre(package, Some(&thread_pool)); archive_snapshot_package(&package)?; Ok(package.tar_output_file) } -pub fn process_accounts_package_pre(accounts_package: AccountsPackagePre) -> AccountsPackage { +pub fn process_accounts_package_pre( + accounts_package: AccountsPackagePre, + thread_pool: Option<&ThreadPool>, +) -> AccountsPackage { let mut time = Measure::start("hash"); let hash = accounts_package.hash; // temporarily remaining here @@ -966,6 +971,7 @@ pub fn process_accounts_package_pre(accounts_package: AccountsPackagePre) -> Acc let (hash, lamports) = AccountsDB::calculate_accounts_hash_without_index( &accounts_package.storages, accounts_package.simple_capitalization_testing, + &thread_pool.unwrap(), ); assert_eq!(accounts_package.expected_capitalization, lamports);