diff --git a/Cargo.lock b/Cargo.lock index 1c728d1b2d..e2f94e0dab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2629,6 +2629,7 @@ dependencies = [ "solana-storage-program 0.16.0", "solana-vote-api 0.16.0", "solana-vote-program 0.16.0", + "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index f53b398adc..ec96fd0473 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -3,6 +3,7 @@ use crate::blocktree::Blocktree; use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; use rayon::prelude::*; +use rayon::ThreadPool; use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug}; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; @@ -15,6 +16,14 @@ use std::result; use std::sync::Arc; use std::time::{Duration, Instant}; +pub const NUM_THREADS: u32 = 10; +use std::cell::RefCell; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap())); + fn first_err(results: &[Result<()>]) -> Result<()> { for r in results { if r.is_err() { @@ -29,32 +38,36 @@ fn par_execute_entries( entries: &[(&Entry, LockedAccountsResults)], ) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", entries.len()); - let results: Vec> = entries - .into_par_iter() - .map(|(e, locked_accounts)| { - let results = bank.load_execute_and_commit_transactions( - &e.transactions, - locked_accounts, - MAX_RECENT_BLOCKHASHES, - ); - let mut first_err = None; - for (r, tx) in results.iter().zip(e.transactions.iter()) { - if let Err(ref e) = r { - if first_err.is_none() { - first_err = Some(r.clone()); + let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + entries + .into_par_iter() + .map(|(e, locked_accounts)| { + let results = bank.load_execute_and_commit_transactions( + &e.transactions, + locked_accounts, + MAX_RECENT_BLOCKHASHES, + ); + let mut first_err = None; + for (r, tx) in results.iter().zip(e.transactions.iter()) { + if let Err(ref e) = r { + if first_err.is_none() { + first_err = Some(r.clone()); + } + if !Bank::can_commit(&r) { + warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); + datapoint_error!( + "validator_process_entry_error", + ("error", format!("error: {:?}, tx: {:?}", e, tx), String) + ); + } + } } - if !Bank::can_commit(&r) { - warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); - datapoint_error!( - "validator_process_entry_error", - ("error", format!("error: {:?}, tx: {:?}", e, tx), String) - ); - } - } - } - first_err.unwrap_or(Ok(())) + first_err.unwrap_or(Ok(())) + }) + .collect() }) - .collect(); + }); first_err(&results) } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 28b341abc4..3db0c38cae 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -10,6 +10,7 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; use rayon::prelude::*; +use rayon::ThreadPool; use solana_metrics::{ datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, inc_new_counter_warn, @@ -24,6 +25,8 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; +pub const NUM_THREADS: u32 = 10; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { ChannelDisconnected, @@ -40,6 +43,7 @@ struct Broadcast { id: Pubkey, coding_generator: CodingGenerator, stats: BroadcastStats, + thread_pool: ThreadPool, } impl Broadcast { @@ -96,14 +100,16 @@ impl Broadcast { let to_blobs_start = Instant::now(); - let blobs: Vec<_> = ventries - .into_par_iter() - .map(|p| { - let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); - entries.to_shared_blobs() - }) - .flatten() - .collect(); + let blobs: Vec<_> = self.thread_pool.install(|| { + ventries + .into_par_iter() + .map(|p| { + let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); + entries.to_shared_blobs() + }) + .flatten() + .collect() + }); let blob_index = blocktree .meta(bank.slot()) @@ -218,6 +224,10 @@ impl BroadcastStage { id: me.id, coding_generator, stats: BroadcastStats::default(), + thread_pool: rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap(), }; loop { diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index fc2fd2dbbc..8ed0f0349b 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -7,6 +7,7 @@ use crate::packet::{Packet, Packets}; use crate::result::Result; use bincode::serialized_size; +use rayon::ThreadPool; use solana_metrics::inc_new_counter_debug; use solana_sdk::message::MessageHeader; use solana_sdk::pubkey::Pubkey; @@ -16,6 +17,14 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem::size_of; +pub const NUM_THREADS: u32 = 10; +use std::cell::RefCell; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap())); + type TxOffsets = (Vec, Vec, Vec, Vec, Vec>); #[cfg(feature = "cuda")] @@ -174,10 +183,14 @@ pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); - let rv = batches - .into_par_iter() - .map(|p| p.packets.par_iter().map(verify_packet).collect()) - .collect(); + let rv = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .map(|p| p.packets.par_iter().map(verify_packet).collect()) + .collect() + }) + }); inc_new_counter_debug!("ed25519_verify_cpu", count); rv } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 468f93fe4e..8b409e3aba 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -33,6 +33,7 @@ solana-vote-program = { path = "../programs/vote_program", version = "0.16.0" solana-stake-program = { path = "../programs/stake_program", version = "0.16.0" } solana-storage-program = { path = "../programs/storage_program", version = "0.16.0" } solana-noop-program = { path = "../programs/noop_program", version = "0.16.0" } +sys-info = "0.5.7" [lib] name = "solana_runtime" diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 1994a45858..6e35bd3d88 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -24,15 +24,18 @@ use hashbrown::{HashMap, HashSet}; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; +use rayon::ThreadPool; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; +use sys_info; const ACCOUNT_DATA_FILE_SIZE: u64 = 64 * 1024 * 1024; const ACCOUNT_DATA_FILE: &str = "data"; +pub const NUM_THREADS: u32 = 10; #[derive(Debug, Default)] pub struct ErrorCounters { @@ -166,7 +169,6 @@ impl AccountStorageEntry { } // This structure handles the load/store of the accounts -#[derive(Default)] pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per fork basis pub accounts_index: RwLock>, @@ -185,12 +187,32 @@ pub struct AccountsDB { /// Starting file size of appendvecs file_size: u64, + + /// Thread pool used for par_iter + thread_pool: ThreadPool, } pub fn get_paths_vec(paths: &str) -> Vec { paths.split(',').map(ToString::to_string).collect() } +impl Default for AccountsDB { + fn default() -> Self { + AccountsDB { + accounts_index: RwLock::new(AccountsIndex::default()), + storage: RwLock::new(HashMap::new()), + next_id: AtomicUsize::new(0), + write_version: AtomicUsize::new(0), + paths: Vec::default(), + file_size: u64::default(), + thread_pool: rayon::ThreadPoolBuilder::new() + .num_threads(2) + .build() + .unwrap(), + } + } +} + impl AccountsDB { pub fn new_with_file_size(paths: &str, file_size: u64) -> Self { let paths = get_paths_vec(&paths); @@ -201,6 +223,10 @@ impl AccountsDB { write_version: AtomicUsize::new(0), paths, file_size, + thread_pool: rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap(), } } @@ -242,17 +268,19 @@ impl AccountsDB { .filter(|store| store.fork_id == fork_id) .cloned() .collect(); - storage_maps - .into_par_iter() - .map(|storage| { - let accounts = storage.accounts.accounts(0); - let mut retval = B::default(); - accounts - .iter() - .for_each(|stored_account| scan_func(stored_account, &mut retval)); - retval - }) - .collect() + self.thread_pool.install(|| { + storage_maps + .into_par_iter() + .map(|storage| { + let accounts = storage.accounts.accounts(0); + let mut retval = B::default(); + accounts + .iter() + .for_each(|stored_account| scan_func(stored_account, &mut retval)); + retval + }) + .collect() + }) } pub fn load(