From 10d06773cd1daf251aeb7a6283da6bd8d75f571a Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 27 Mar 2024 16:33:21 -0500 Subject: [PATCH] Share the threadpool for tx execution and entry verifification (#216) Previously, entry verification had a dedicated threadpool used to verify PoH hashes as well as some basic transaction verification via Bank::verify_transaction(). It should also be noted that the entry verification code provides logic to offload to a GPU if one is present. Regardless of whether a GPU is present or not, some of the verification must be done on a CPU. Moreso, the CPU verification of entries and transaction execution are serial operations; entry verification finishes first before moving onto transaction execution. So, tx execution and entry verification are not competing for CPU cycles at the same time and can use the same pool. One exception to the above statement is that if someone is using the feature to replay forks in parallel, then hypothetically, different forks may end up competing for the same resources at the same time. However, that is already true given that we had pools that were shared between replay of multiple forks. So, this change doesn't really change much for that case, but will reduce overhead in the single fork case which is the vast majority of the time. --- Cargo.lock | 1 + core/src/banking_stage.rs | 6 +- entry/benches/entry_sigverify.rs | 6 +- entry/src/entry.rs | 195 ++++++++++++++++++++--------- ledger/src/blockstore_processor.rs | 40 +++--- local-cluster/src/cluster_tests.rs | 14 ++- poh-bench/Cargo.toml | 1 + poh-bench/src/main.rs | 25 ++-- poh/benches/poh_verify.rs | 10 +- 9 files changed, 200 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b94b288e..a22e44881 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6613,6 +6613,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-perf", + "solana-rayon-threadlimit", "solana-sdk", "solana-version", ] diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 603ff55f0..7be8af137 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -786,7 +786,7 @@ mod tests { crate::banking_trace::{BankingPacketBatch, BankingTracer}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::cluster_info::Node, solana_ledger::{ blockstore::Blockstore, @@ -941,7 +941,7 @@ mod tests { .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); - assert!(entries.verify(&start_hash)); + assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests())); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } @@ -1060,7 +1060,7 @@ mod tests { .map(|(_bank, (entry, _tick_height))| entry) .collect(); - assert!(entries.verify(&blockhash)); + assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests())); if !entries.is_empty() { blockhash = entries.last().unwrap().hash; for entry in entries { diff --git a/entry/benches/entry_sigverify.rs b/entry/benches/entry_sigverify.rs index b3a1b7b5c..09adeb6cf 100644 --- a/entry/benches/entry_sigverify.rs +++ b/entry/benches/entry_sigverify.rs @@ -16,6 +16,7 @@ use { #[bench] fn bench_gpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -53,6 +54,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { let res = entry::start_verify_transactions( entries.clone(), false, + &thread_pool, recycler.clone(), Arc::new(verify_transaction), ); @@ -65,6 +67,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { #[bench] fn bench_cpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -89,6 +92,7 @@ fn bench_cpusigverify(bencher: &mut Bencher) { }; bencher.iter(|| { - let _ans = entry::verify_transactions(entries.clone(), Arc::new(verify_transaction)); + let _ans = + entry::verify_transactions(entries.clone(), &thread_pool, Arc::new(verify_transaction)); }) } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 46aad401d..7497f96d6 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -6,7 +6,6 @@ use { crate::poh::Poh, crossbeam_channel::{Receiver, Sender}, dlopen2::symbor::{Container, SymBorApi, Symbol}, - lazy_static::lazy_static, log::*, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, @@ -41,16 +40,6 @@ use { }, }; -// get_max_thread_count to match number of threads in the old code. -// see: https://github.com/solana-labs/solana/pull/24853 -lazy_static! { - static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) - .thread_name(|i| format!("solEntry{i:02}")) - .build() - .unwrap(); -} - pub type EntrySender = Sender>; pub type EntryReceiver = Receiver>; @@ -359,7 +348,7 @@ impl EntryVerificationState { self.poh_duration_us } - pub fn finish_verify(&mut self) -> bool { + pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool { match &mut self.device_verification_data { DeviceVerificationData::Gpu(verification_state) => { let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap(); @@ -370,7 +359,7 @@ impl EntryVerificationState { .expect("unwrap Arc") .into_inner() .expect("into_inner"); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes .into_par_iter() .cloned() @@ -405,9 +394,10 @@ impl EntryVerificationState { pub fn verify_transactions( entries: Vec, + thread_pool: &ThreadPool, verify: Arc Result + Send + Sync>, ) -> Result> { - PAR_THREAD_POOL.install(|| { + thread_pool.install(|| { entries .into_par_iter() .map(|entry| { @@ -430,6 +420,7 @@ pub fn verify_transactions( pub fn start_verify_transactions( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify_recyclers: VerifyRecyclers, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result @@ -459,15 +450,16 @@ pub fn start_verify_transactions( .is_some(); if use_cpu { - start_verify_transactions_cpu(entries, skip_verification, verify) + start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify) } else { - start_verify_transactions_gpu(entries, verify_recyclers, verify) + start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify) } } fn start_verify_transactions_cpu( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -484,7 +476,7 @@ fn start_verify_transactions_cpu( move |versioned_tx| verify(versioned_tx, mode) }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; Ok(EntrySigVerificationState { verification_status: EntryVerificationStatus::Success, @@ -497,6 +489,7 @@ fn start_verify_transactions_cpu( fn start_verify_transactions_gpu( entries: Vec, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -512,7 +505,7 @@ fn start_verify_transactions_gpu( } }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; let entry_txs: Vec<&SanitizedTransaction> = entries .iter() @@ -618,12 +611,25 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { // an EntrySlice is a slice of Entries pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState; - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) - -> EntryVerificationState; - fn verify(&self, start_hash: &Hash) -> bool; + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn start_verify( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + recyclers: VerifyRecyclers, + ) -> EntryVerificationState; + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count /// for the next entry slice. @@ -633,12 +639,16 @@ pub trait EntrySlice { } impl EntrySlice for [Entry] { - fn verify(&self, start_hash: &Hash) -> bool { - self.start_verify(start_hash, VerifyRecyclers::default()) - .finish_verify() + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool { + self.start_verify(start_hash, thread_pool, VerifyRecyclers::default()) + .finish_verify(thread_pool) } - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -646,7 +656,7 @@ impl EntrySlice for [Entry] { transactions: vec![], }]; let entry_pairs = genesis.par_iter().chain(self).zip(self); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { entry_pairs.all(|(x0, x1)| { let r = x1.verify(&x0.hash); if !r { @@ -672,7 +682,12 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState { + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { use solana_sdk::hash::HASH_BYTES; let now = Instant::now(); let genesis = [Entry { @@ -703,7 +718,7 @@ impl EntrySlice for [Entry] { num_hashes.resize(aligned_len, 0); let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect(); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes_chunked .par_iter_mut() .zip(num_hashes) @@ -753,7 +768,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] let (has_avx2, has_avx512) = ( is_x86_feature_detected!("avx2"), @@ -764,25 +779,26 @@ impl EntrySlice for [Entry] { if api().is_some() { if has_avx512 && self.len() >= 128 { - self.verify_cpu_x86_simd(start_hash, 16) + self.verify_cpu_x86_simd(start_hash, 16, thread_pool) } else if has_avx2 && self.len() >= 48 { - self.verify_cpu_x86_simd(start_hash, 8) + self.verify_cpu_x86_simd(start_hash, 8, thread_pool) } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } fn start_verify( &self, start_hash: &Hash, + thread_pool: &ThreadPool, recyclers: VerifyRecyclers, ) -> EntryVerificationState { let start = Instant::now(); let Some(api) = perf_libs::api() else { - return self.verify_cpu(start_hash); + return self.verify_cpu(start_hash, thread_pool); }; inc_new_counter_info!("entry_verify-num_entries", self.len()); @@ -839,7 +855,7 @@ impl EntrySlice for [Entry] { }) .unwrap(); - let verifications = PAR_THREAD_POOL.install(|| { + let verifications = thread_pool.install(|| { self.into_par_iter() .map(|entry| { let answer = entry.hash; @@ -938,6 +954,26 @@ pub fn next_versioned_entry( } } +pub fn thread_pool_for_tests() -> ThreadPool { + // Allocate fewer threads for unit tests + // Unit tests typically aren't creating massive blocks to verify, and + // multiple tests could be running in parallel so any further parallelism + // will do more harm than good + rayon::ThreadPoolBuilder::new() + .num_threads(4) + .thread_name(|i| format!("solEntryTest{i:02}")) + .build() + .expect("new rayon threadpool") +} + +pub fn thread_pool_for_benches() -> ThreadPool { + rayon::ThreadPoolBuilder::new() + .num_threads(get_max_thread_count()) + .thread_name(|i| format!("solEntryBnch{i:02}")) + .build() + .expect("new rayon threadpool") +} + #[cfg(test)] mod tests { use { @@ -968,6 +1004,7 @@ mod tests { entries: Vec, skip_verification: bool, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn( VersionedTransaction, @@ -989,10 +1026,16 @@ mod tests { } }; - let cpu_verify_result = verify_transactions(entries.clone(), Arc::new(verify_func)); + let cpu_verify_result = + verify_transactions(entries.clone(), thread_pool, Arc::new(verify_func)); let mut gpu_verify_result: EntrySigVerificationState = { - let verify_result = - start_verify_transactions(entries, skip_verification, verify_recyclers, verify); + let verify_result = start_verify_transactions( + entries, + skip_verification, + thread_pool, + verify_recyclers, + verify, + ); match verify_result { Ok(res) => res, _ => EntrySigVerificationState { @@ -1022,6 +1065,8 @@ mod tests { #[test] fn test_entry_gpu_verify() { + let thread_pool = thread_pool_for_tests(); + let verify_transaction = { move |versioned_tx: VersionedTransaction, verification_mode: TransactionVerificationMode| @@ -1067,12 +1112,14 @@ mod tests { entries_invalid, false, recycler.clone(), + &thread_pool, Arc::new(verify_transaction) )); assert!(test_verify_transactions( entries_valid, false, recycler, + &thread_pool, Arc::new(verify_transaction) )); } @@ -1096,6 +1143,8 @@ mod tests { #[test] fn test_transaction_signing() { + let thread_pool = thread_pool_for_tests(); + use solana_sdk::signature::Signature; let zero = Hash::default(); @@ -1105,27 +1154,27 @@ mod tests { // Verify entry with 2 transactions let mut e0 = [Entry::new(&zero, 0, vec![tx0, tx1])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Clear signature of the first transaction, see that it does not verify let orig_sig = e0[0].transactions[0].signatures[0]; e0[0].transactions[0].signatures[0] = Signature::default(); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // restore original signature e0[0].transactions[0].signatures[0] = orig_sig; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Resize signatures and see verification fails. let len = e0[0].transactions[0].signatures.len(); e0[0].transactions[0] .signatures .resize(len - 1, Signature::default()); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // Pass an entry with no transactions let e0 = [Entry::new(&zero, 0, vec![])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); } #[test] @@ -1158,41 +1207,57 @@ mod tests { #[test] fn test_verify_slice1() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); - assert!(vec![][..].verify(&zero)); // base case - assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1 - assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad - assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step + // base case + assert!(vec![][..].verify(&zero, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one, &thread_pool)); + // inductive step + assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero, &thread_pool)); let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2]; bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&zero)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&zero, &thread_pool)); } #[test] fn test_verify_slice_with_hashes1() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![Entry::new_tick(1, &two)][..].verify(&one)); // singleton case 1 - assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(1, &two)][..].verify(&one, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![])]; ticks.push(next_entry(&ticks.last().unwrap().hash, 1, vec![])); - assert!(ticks.verify(&one)); // inductive step + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] fn test_verify_slice_with_hashes_and_transactions() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); @@ -1200,9 +1265,12 @@ mod tests { let bob_keypair = Keypair::new(); let tx0 = system_transaction::transfer(&alice_keypair, &bob_keypair.pubkey(), 1, one); let tx1 = system_transaction::transfer(&bob_keypair, &alice_keypair.pubkey(), 1, one); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one)); // singleton case 1 - assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one, &thread_pool)); + // singleton case 2, bad + assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![tx0.clone()])]; ticks.push(next_entry( @@ -1210,12 +1278,15 @@ mod tests { 1, vec![tx1.clone()], )); - assert!(ticks.verify(&one)); // inductive step + + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![tx0])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![tx1])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] @@ -1354,7 +1425,7 @@ mod tests { info!("done.. {}", time); let mut time = Measure::start("poh"); - let res = entries.verify(&Hash::default()); + let res = entries.verify(&Hash::default(), &thread_pool_for_tests()); assert_eq!(res, !modified); time.stop(); info!("{} {}", time, res); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9eace1e7c..7f419c464 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -519,20 +519,23 @@ pub fn process_entries_for_tests( let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); let mut batch_timing = BatchExecutionTiming::default(); - let mut replay_entries: Vec<_> = - entry::verify_transactions(entries, Arc::new(verify_transaction))? - .into_iter() - .map(|entry| { - let starting_index = entry_starting_index; - if let EntryType::Transactions(ref transactions) = entry { - entry_starting_index = entry_starting_index.saturating_add(transactions.len()); - } - ReplayEntry { - entry, - starting_index, - } - }) - .collect(); + let mut replay_entries: Vec<_> = entry::verify_transactions( + entries, + &replay_tx_thread_pool, + Arc::new(verify_transaction), + )? + .into_iter() + .map(|entry| { + let starting_index = entry_starting_index; + if let EntryType::Transactions(ref transactions) = entry { + entry_starting_index = entry_starting_index.saturating_add(transactions.len()); + } + ReplayEntry { + entry, + starting_index, + } + }) + .collect(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( @@ -1292,7 +1295,11 @@ fn confirm_slot_entries( let last_entry_hash = entries.last().map(|e| e.hash); let verifier = if !skip_verification { datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64)); - let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone()); + let entry_state = entries.start_verify( + &progress.last_entry, + replay_tx_thread_pool, + recyclers.clone(), + ); if entry_state.status() == EntryVerificationStatus::Failure { warn!("Ledger proof of history failed at slot: {}", slot); return Err(BlockError::InvalidEntryHash.into()); @@ -1315,6 +1322,7 @@ fn confirm_slot_entries( let transaction_verification_result = entry::start_verify_transactions( entries, skip_verification, + replay_tx_thread_pool, recyclers.clone(), Arc::new(verify_transaction), ); @@ -1381,7 +1389,7 @@ fn confirm_slot_entries( } if let Some(mut verifier) = verifier { - let verified = verifier.finish_verify(); + let verified = verifier.finish_verify(replay_tx_thread_pool); *poh_verify_elapsed += verifier.poh_duration_us(); if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot()); diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index dffe2a871..aa318f9df 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -5,7 +5,7 @@ use log::*; use { rand::{thread_rng, Rng}, - rayon::prelude::*, + rayon::{prelude::*, ThreadPool}, solana_client::{ connection_cache::{ConnectionCache, Protocol}, thin_client::ThinClient, @@ -14,7 +14,7 @@ use { tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, }, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::{ cluster_info::{self, ClusterInfo}, contact_info::{ContactInfo, LegacyContactInfo}, @@ -180,6 +180,8 @@ pub fn send_many_transactions( pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { let ledger = Blockstore::open(ledger_path).unwrap(); + let thread_pool = entry::thread_pool_for_tests(); + let zeroth_slot = ledger.get_slot_entries(0, 0).unwrap(); let last_id = zeroth_slot.last().unwrap().hash; let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap(); @@ -201,7 +203,7 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { None }; - let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks); + let last_id = verify_slot_ticks(&ledger, &thread_pool, slot, &last_id, should_verify_ticks); pending_slots.extend( next_slots .into_iter() @@ -630,21 +632,23 @@ pub fn start_gossip_voter( fn get_and_verify_slot_entries( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, ) -> Vec { let entries = blockstore.get_slot_entries(slot, 0).unwrap(); - assert!(entries.verify(last_entry)); + assert!(entries.verify(last_entry, thread_pool)); entries } fn verify_slot_ticks( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, expected_num_ticks: Option, ) -> Hash { - let entries = get_and_verify_slot_entries(blockstore, slot, last_entry); + let entries = get_and_verify_slot_entries(blockstore, thread_pool, slot, last_entry); let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum(); if let Some(expected_num_ticks) = expected_num_ticks { assert_eq!(num_ticks, expected_num_ticks); diff --git a/poh-bench/Cargo.toml b/poh-bench/Cargo.toml index fb44c0cb8..8cd3979b1 100644 --- a/poh-bench/Cargo.toml +++ b/poh-bench/Cargo.toml @@ -17,6 +17,7 @@ solana-entry = { workspace = true } solana-logger = { workspace = true } solana-measure = { workspace = true } solana-perf = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-sdk = { workspace = true } solana-version = { workspace = true } diff --git a/poh-bench/src/main.rs b/poh-bench/src/main.rs index d835bac05..941d581a8 100644 --- a/poh-bench/src/main.rs +++ b/poh-bench/src/main.rs @@ -7,6 +7,7 @@ use { clap::{crate_description, crate_name, Arg, Command}, solana_measure::measure::Measure, solana_perf::perf_libs, + solana_rayon_threadlimit::get_max_thread_count, solana_sdk::hash::hash, }; @@ -73,6 +74,14 @@ fn main() { let start_hash = hash(&[1, 2, 3, 4]); let ticks = create_ticks(max_num_entries, hashes_per_tick, start_hash); let mut num_entries = start_num_entries as usize; + let num_threads = matches + .value_of_t("num_threads") + .unwrap_or(get_max_thread_count()); + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("solPohBench{i:02}")) + .build() + .expect("new rayon threadpool"); if matches.is_present("cuda") { perf_libs::init_cuda(); } @@ -81,8 +90,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_generic(&start_hash) - .finish_verify()); + .verify_cpu_generic(&start_hash, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -100,8 +109,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 8) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 8, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -115,8 +124,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 16) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 16, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -132,8 +141,8 @@ fn main() { let recyclers = VerifyRecyclers::default(); for _ in 0..iterations { assert!(ticks[..num_entries] - .start_verify(&start_hash, recyclers.clone()) - .finish_verify()); + .start_verify(&start_hash, &thread_pool, recyclers.clone()) + .finish_verify(&thread_pool)); } time.stop(); println!( diff --git a/poh/benches/poh_verify.rs b/poh/benches/poh_verify.rs index 47f31860c..cd33cdae4 100644 --- a/poh/benches/poh_verify.rs +++ b/poh/benches/poh_verify.rs @@ -2,7 +2,7 @@ extern crate test; use { - solana_entry::entry::{next_entry_mut, Entry, EntrySlice}, + solana_entry::entry::{self, next_entry_mut, Entry, EntrySlice}, solana_sdk::{ hash::{hash, Hash}, signature::{Keypair, Signer}, @@ -17,6 +17,8 @@ const NUM_ENTRIES: usize = 800; #[bench] fn bench_poh_verify_ticks(bencher: &mut Bencher) { solana_logger::setup(); + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -27,12 +29,14 @@ fn bench_poh_verify_ticks(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) } #[bench] fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -47,6 +51,6 @@ fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) }