From c1d788880d49d15c6ab9da9603a3c54a3bb1dcd5 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 12 Sep 2019 11:39:39 -0700 Subject: [PATCH] Limit Rayon threadpool threads (#5871) --- Cargo.lock | 11 +++++++++++ Cargo.toml | 1 + ci/buildkite.yml | 2 +- core/Cargo.toml | 1 + core/src/blocktree_processor.rs | 11 ++++++----- core/src/entry.rs | 5 +++-- core/src/sigverify.rs | 9 ++++----- core/src/window_service.rs | 3 ++- local_cluster/Cargo.toml | 1 + local_cluster/src/local_cluster.rs | 3 +++ local_cluster/tests/local_cluster.rs | 18 ++++-------------- rayon-threadlimit/.gitignore | 2 ++ rayon-threadlimit/Cargo.toml | 13 +++++++++++++ rayon-threadlimit/src/lib.rs | 18 ++++++++++++++++++ runtime/Cargo.toml | 1 + runtime/src/accounts_db.rs | 4 ++-- 16 files changed, 73 insertions(+), 30 deletions(-) create mode 100644 rayon-threadlimit/.gitignore create mode 100644 rayon-threadlimit/Cargo.toml create mode 100644 rayon-threadlimit/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 79e7c8550..05e389f53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3084,6 +3084,7 @@ dependencies = [ "solana-merkle-tree 0.19.0-pre0", "solana-metrics 0.19.0-pre0", "solana-netutil 0.19.0-pre0", + "solana-rayon-threadlimit 0.19.0-pre0", "solana-runtime 0.19.0-pre0", "solana-sdk 0.19.0-pre0", "solana-stake-api 0.19.0-pre0", @@ -3332,6 +3333,7 @@ dependencies = [ "solana-client 0.19.0-pre0", "solana-core 0.19.0-pre0", "solana-logger 0.19.0-pre0", + "solana-rayon-threadlimit 0.19.0-pre0", "solana-runtime 0.19.0-pre0", "solana-sdk 0.19.0-pre0", "solana-stake-api 0.19.0-pre0", @@ -3444,6 +3446,14 @@ dependencies = [ "solana-sdk 0.19.0-pre0", ] +[[package]] +name = "solana-rayon-threadlimit" +version = "0.19.0-pre0" +dependencies = [ + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "solana-replicator" version = "0.19.0-pre0" @@ -3482,6 +3492,7 @@ dependencies = [ "solana-measure 0.19.0-pre0", "solana-metrics 0.19.0-pre0", "solana-noop-program 0.19.0-pre0", + "solana-rayon-threadlimit 0.19.0-pre0", "solana-sdk 0.19.0-pre0", "solana-stake-api 0.19.0-pre0", "solana-stake-program 0.19.0-pre0", diff --git a/Cargo.toml b/Cargo.toml index 3392a788c..b41d8a050 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ members = [ "fixed-buf", "vote-signer", "cli", + "rayon-threadlimit", ] exclude = [ "programs/bpf", diff --git a/ci/buildkite.yml b/ci/buildkite.yml index 6a06a34d6..b2709af85 100644 --- a/ci/buildkite.yml +++ b/ci/buildkite.yml @@ -11,7 +11,7 @@ steps: - wait - command: "ci/test-stable-perf.sh" name: "stable-perf" - timeout_in_minutes: 30 + timeout_in_minutes: 40 artifact_paths: "log-*.txt" agents: - "queue=cuda" diff --git a/core/Cargo.toml b/core/Cargo.toml index 06c4fe160..99018d4b2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -75,6 +75,7 @@ tokio-codec = "0.1" tokio-fs = "0.1" tokio-io = "0.1" untrusted = "0.7.0" +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" } # reed-solomon-erasure's simd_c feature fails to build for x86_64-pc-windows-msvc, use pure-rust [target.'cfg(windows)'.dependencies] diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 272fcc2c5..02a71f019 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -2,6 +2,8 @@ use crate::bank_forks::BankForks; use crate::blocktree::{Blocktree, SlotMeta}; use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; +use rand::seq::SliceRandom; +use rand::thread_rng; use rayon::prelude::*; use rayon::ThreadPool; use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug}; @@ -16,16 +18,15 @@ use std::result; use std::sync::Arc; use std::time::{Duration, Instant}; -use rand::seq::SliceRandom; -use rand::thread_rng; - pub const NUM_THREADS: u32 = 10; +use solana_rayon_threadlimit::get_thread_count; use std::cell::RefCell; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .num_threads(get_thread_count()) .build() - .unwrap())); + .unwrap()) +); fn first_err(results: &[Result<()>]) -> Result<()> { for r in results { diff --git a/core/src/entry.rs b/core/src/entry.rs index e156bf450..f5863a186 100644 --- a/core/src/entry.rs +++ b/core/src/entry.rs @@ -14,6 +14,7 @@ use solana_merkle_tree::MerkleTree; use solana_metrics::inc_new_counter_warn; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::timing; use solana_sdk::transaction::Transaction; use std::borrow::Borrow; use std::cell::RefCell; @@ -22,7 +23,7 @@ use std::sync::{Arc, RwLock}; #[cfg(feature = "cuda")] use crate::sigverify::poh_verify_many; -use solana_sdk::timing; +use solana_rayon_threadlimit::get_thread_count; #[cfg(feature = "cuda")] use std::sync::Mutex; #[cfg(feature = "cuda")] @@ -32,7 +33,7 @@ use std::time::Instant; pub const NUM_THREADS: u32 = 10; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .num_threads(get_thread_count()) .build() .unwrap())); diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 6a7a997ba..579f9c830 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -19,17 +19,16 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::Transaction; use std::mem::size_of; -#[cfg(feature = "cuda")] -use std::os::raw::{c_int, c_uint}; - #[cfg(feature = "cuda")] use core::ffi::c_void; - +use solana_rayon_threadlimit::get_thread_count; +#[cfg(feature = "cuda")] +use std::os::raw::{c_int, c_uint}; pub const NUM_THREADS: u32 = 10; use std::cell::RefCell; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .num_threads(get_thread_count()) .build() .unwrap())); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index b5c8bfb33..b6e2a8b21 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -12,6 +12,7 @@ use crate::streamer::{PacketReceiver, PacketSender}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; use rayon::ThreadPool; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; +use solana_rayon_threadlimit::get_thread_count; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; @@ -205,7 +206,7 @@ impl WindowService { trace!("{}: RECV_WINDOW started", id); let mut now = Instant::now(); let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .num_threads(get_thread_count()) .build() .unwrap(); loop { diff --git a/local_cluster/Cargo.toml b/local_cluster/Cargo.toml index f9d364957..7f529de63 100644 --- a/local_cluster/Cargo.toml +++ b/local_cluster/Cargo.toml @@ -22,6 +22,7 @@ solana-storage-program = { path = "../programs/storage_program", version = "0.19 solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" } symlink = "0.1.0" tempfile = "3.1.0" +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" } [dev-dependencies] serial_test = "0.2.0" diff --git a/local_cluster/src/local_cluster.rs b/local_cluster/src/local_cluster.rs index fdd8ddef8..649ad4b39 100644 --- a/local_cluster/src/local_cluster.rs +++ b/local_cluster/src/local_cluster.rs @@ -10,6 +10,7 @@ use solana_core::{ service::Service, validator::{Validator, ValidatorConfig}, }; +use solana_rayon_threadlimit::set_thread_count; use solana_sdk::{ client::SyncClient, clock::DEFAULT_TICKS_PER_SLOT, @@ -116,6 +117,8 @@ impl LocalCluster { } pub fn new(config: &ClusterConfig) -> Self { + set_thread_count(1); + assert_eq!(config.validator_configs.len(), config.node_stakes.len()); let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs index 4033d29e4..727790055 100644 --- a/local_cluster/tests/local_cluster.rs +++ b/local_cluster/tests/local_cluster.rs @@ -27,8 +27,6 @@ use tempfile::TempDir; #[test] #[serial] -#[allow(unused_attributes)] -#[ignore] fn test_ledger_cleanup_service() { solana_logger::setup(); error!("test_ledger_cleanup_service"); @@ -111,9 +109,8 @@ fn test_spend_and_verify_all_nodes_3() { ); } -#[allow(unused_attributes)] #[test] -#[serial] +#[allow(unused_attributes)] #[ignore] fn test_spend_and_verify_all_nodes_env_num_nodes() { solana_logger::setup(); @@ -132,7 +129,6 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() { #[allow(unused_attributes)] #[test] -#[serial] #[should_panic] fn test_fullnode_exit_default_config_should_panic() { solana_logger::setup(); @@ -161,10 +157,8 @@ fn test_fullnode_exit_2() { } // Cluster needs a supermajority to remain, so the minimum size for this test is 4 -#[allow(unused_attributes)] #[test] #[serial] -#[ignore] fn test_leader_failure_4() { solana_logger::setup(); error!("test_leader_failure_4"); @@ -222,14 +216,14 @@ fn test_two_unbalanced_stakes() { } #[test] -#[ignore] +#[serial] fn test_forwarding() { // Set up a cluster where one node is never the leader, so all txs sent to this node // will be have to be forwarded in order to be confirmed let config = ClusterConfig { node_stakes: vec![999_990, 3], cluster_lamports: 2_000_000, - validator_configs: vec![ValidatorConfig::default(); 3], + validator_configs: vec![ValidatorConfig::default(); 2], ..ClusterConfig::default() }; let cluster = LocalCluster::new(&config); @@ -363,7 +357,6 @@ fn test_snapshot_restart_locktower() { ); } -#[allow(unused_attributes)] #[test] #[serial] fn test_snapshots_blocktree_floor() { @@ -524,15 +517,14 @@ fn test_snapshots_restart_validity() { } } -#[allow(unused_attributes)] #[test] #[serial] -#[ignore] fn test_fail_entry_verification_leader() { test_faulty_node(BroadcastStageType::FailEntryVerification); } #[test] +#[allow(unused_attributes)] #[ignore] fn test_fake_blobs_broadcast_leader() { test_faulty_node(BroadcastStageType::BroadcastFakeBlobs); @@ -591,9 +583,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) { ); } -#[allow(unused_attributes)] #[test] -#[serial] #[ignore] fn test_repairman_catchup() { solana_logger::setup(); diff --git a/rayon-threadlimit/.gitignore b/rayon-threadlimit/.gitignore new file mode 100644 index 000000000..5404b132d --- /dev/null +++ b/rayon-threadlimit/.gitignore @@ -0,0 +1,2 @@ +/target/ +/farf/ diff --git a/rayon-threadlimit/Cargo.toml b/rayon-threadlimit/Cargo.toml new file mode 100644 index 000000000..a726940d0 --- /dev/null +++ b/rayon-threadlimit/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "solana-rayon-threadlimit" +version = "0.19.0-pre0" +homepage = "https://solana.com/" +readme = "../README.md" +repository = "https://github.com/solana-labs/solana" +authors = ["Solana Maintainers "] +license = "Apache-2.0" +edition = "2018" + +[dependencies] +lazy_static = "1.4.0" +sys-info = "0.5.7" diff --git a/rayon-threadlimit/src/lib.rs b/rayon-threadlimit/src/lib.rs new file mode 100644 index 000000000..341c46264 --- /dev/null +++ b/rayon-threadlimit/src/lib.rs @@ -0,0 +1,18 @@ +#[macro_use] +extern crate lazy_static; + +use std::sync::RwLock; + +//TODO remove this hack when rayon fixes itself +lazy_static! { + static ref MAX_RAYON_THREADS: RwLock = + RwLock::new(sys_info::cpu_num().unwrap() as usize); +} + +pub fn get_thread_count() -> usize { + *MAX_RAYON_THREADS.read().unwrap() +} + +pub fn set_thread_count(count: usize) { + *MAX_RAYON_THREADS.write().unwrap() = count; +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b30c5c020..bc3a21338 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,6 +37,7 @@ solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" } solana-vote-program = { path = "../programs/vote_program", version = "0.19.0-pre0" } sys-info = "0.5.8" tempfile = "3.1.0" +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" } [lib] crate-type = ["lib"] diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 17094ba74..f25ecee1f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -30,6 +30,7 @@ use serde::de::{MapAccess, Visitor}; use serde::ser::{SerializeMap, Serializer}; use serde::{Deserialize, Serialize}; use solana_measure::measure::Measure; +use solana_rayon_threadlimit::get_thread_count; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; @@ -39,7 +40,6 @@ use std::path::Path; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; -use sys_info; use tempfile::TempDir; pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024; @@ -370,7 +370,7 @@ pub struct AccountsDB { impl Default for AccountsDB { fn default() -> Self { - let num_threads = sys_info::cpu_num().unwrap_or(DEFAULT_NUM_THREADS) as usize; + let num_threads = get_thread_count(); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()),