Share the threadpool for tx execution and entry verifification (#216)

Previously, entry verification had a dedicated threadpool used to verify
PoH hashes as well as some basic transaction verification via
Bank::verify_transaction(). It should also be noted that the entry
verification code provides logic to offload to a GPU if one is present.

Regardless of whether a GPU is present or not, some of the verification
must be done on a CPU. Moreso, the CPU verification of entries and
transaction execution are serial operations; entry verification finishes
first before moving onto transaction execution.

So, tx execution and entry verification are not competing for CPU cycles
at the same time and can use the same pool.

One exception to the above statement is that if someone is using the
feature to replay forks in parallel, then hypothetically, different
forks may end up competing for the same resources at the same time.
However, that is already true given that we had pools that were shared
between replay of multiple forks. So, this change doesn't really change
much for that case, but will reduce overhead in the single fork case
which is the vast majority of the time.
This commit is contained in:
steviez 2024-03-27 16:33:21 -05:00 committed by GitHub
parent e70ff38f35
commit 10d06773cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 200 additions and 98 deletions

1
Cargo.lock generated
View File

@ -6613,6 +6613,7 @@ dependencies = [
"solana-logger",
"solana-measure",
"solana-perf",
"solana-rayon-threadlimit",
"solana-sdk",
"solana-version",
]

View File

@ -786,7 +786,7 @@ mod tests {
crate::banking_trace::{BankingPacketBatch, BankingTracer},
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_entry::entry::{Entry, EntrySlice},
solana_entry::entry::{self, Entry, EntrySlice},
solana_gossip::cluster_info::Node,
solana_ledger::{
blockstore::Blockstore,
@ -941,7 +941,7 @@ mod tests {
.collect();
trace!("done");
assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize);
assert!(entries.verify(&start_hash));
assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests()));
assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash());
banking_stage.join().unwrap();
}
@ -1060,7 +1060,7 @@ mod tests {
.map(|(_bank, (entry, _tick_height))| entry)
.collect();
assert!(entries.verify(&blockhash));
assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests()));
if !entries.is_empty() {
blockhash = entries.last().unwrap().hash;
for entry in entries {

View File

@ -16,6 +16,7 @@ use {
#[bench]
fn bench_gpusigverify(bencher: &mut Bencher) {
let thread_pool = entry::thread_pool_for_benches();
let entries = (0..131072)
.map(|_| {
let transaction = test_tx();
@ -53,6 +54,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) {
let res = entry::start_verify_transactions(
entries.clone(),
false,
&thread_pool,
recycler.clone(),
Arc::new(verify_transaction),
);
@ -65,6 +67,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) {
#[bench]
fn bench_cpusigverify(bencher: &mut Bencher) {
let thread_pool = entry::thread_pool_for_benches();
let entries = (0..131072)
.map(|_| {
let transaction = test_tx();
@ -89,6 +92,7 @@ fn bench_cpusigverify(bencher: &mut Bencher) {
};
bencher.iter(|| {
let _ans = entry::verify_transactions(entries.clone(), Arc::new(verify_transaction));
let _ans =
entry::verify_transactions(entries.clone(), &thread_pool, Arc::new(verify_transaction));
})
}

View File

@ -6,7 +6,6 @@ use {
crate::poh::Poh,
crossbeam_channel::{Receiver, Sender},
dlopen2::symbor::{Container, SymBorApi, Symbol},
lazy_static::lazy_static,
log::*,
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
@ -41,16 +40,6 @@ use {
},
};
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solEntry{i:02}"))
.build()
.unwrap();
}
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
@ -359,7 +348,7 @@ impl EntryVerificationState {
self.poh_duration_us
}
pub fn finish_verify(&mut self) -> bool {
pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool {
match &mut self.device_verification_data {
DeviceVerificationData::Gpu(verification_state) => {
let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap();
@ -370,7 +359,7 @@ impl EntryVerificationState {
.expect("unwrap Arc")
.into_inner()
.expect("into_inner");
let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
hashes
.into_par_iter()
.cloned()
@ -405,9 +394,10 @@ impl EntryVerificationState {
pub fn verify_transactions(
entries: Vec<Entry>,
thread_pool: &ThreadPool,
verify: Arc<dyn Fn(VersionedTransaction) -> Result<SanitizedTransaction> + Send + Sync>,
) -> Result<Vec<EntryType>> {
PAR_THREAD_POOL.install(|| {
thread_pool.install(|| {
entries
.into_par_iter()
.map(|entry| {
@ -430,6 +420,7 @@ pub fn verify_transactions(
pub fn start_verify_transactions(
entries: Vec<Entry>,
skip_verification: bool,
thread_pool: &ThreadPool,
verify_recyclers: VerifyRecyclers,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
@ -459,15 +450,16 @@ pub fn start_verify_transactions(
.is_some();
if use_cpu {
start_verify_transactions_cpu(entries, skip_verification, verify)
start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify)
} else {
start_verify_transactions_gpu(entries, verify_recyclers, verify)
start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify)
}
}
fn start_verify_transactions_cpu(
entries: Vec<Entry>,
skip_verification: bool,
thread_pool: &ThreadPool,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
@ -484,7 +476,7 @@ fn start_verify_transactions_cpu(
move |versioned_tx| verify(versioned_tx, mode)
};
let entries = verify_transactions(entries, Arc::new(verify_func))?;
let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;
Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
@ -497,6 +489,7 @@ fn start_verify_transactions_cpu(
fn start_verify_transactions_gpu(
entries: Vec<Entry>,
verify_recyclers: VerifyRecyclers,
thread_pool: &ThreadPool,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
@ -512,7 +505,7 @@ fn start_verify_transactions_gpu(
}
};
let entries = verify_transactions(entries, Arc::new(verify_func))?;
let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;
let entry_txs: Vec<&SanitizedTransaction> = entries
.iter()
@ -618,12 +611,25 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {
// an EntrySlice is a slice of Entries
pub trait EntrySlice {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState;
fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState;
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState;
fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers)
-> EntryVerificationState;
fn verify(&self, start_hash: &Hash) -> bool;
fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState;
fn verify_cpu_generic(
&self,
start_hash: &Hash,
thread_pool: &ThreadPool,
) -> EntryVerificationState;
fn verify_cpu_x86_simd(
&self,
start_hash: &Hash,
simd_len: usize,
thread_pool: &ThreadPool,
) -> EntryVerificationState;
fn start_verify(
&self,
start_hash: &Hash,
thread_pool: &ThreadPool,
recyclers: VerifyRecyclers,
) -> EntryVerificationState;
fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool;
/// Checks that each entry tick has the correct number of hashes. Entry slices do not
/// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count
/// for the next entry slice.
@ -633,12 +639,16 @@ pub trait EntrySlice {
}
impl EntrySlice for [Entry] {
fn verify(&self, start_hash: &Hash) -> bool {
self.start_verify(start_hash, VerifyRecyclers::default())
.finish_verify()
fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool {
self.start_verify(start_hash, thread_pool, VerifyRecyclers::default())
.finish_verify(thread_pool)
}
fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState {
fn verify_cpu_generic(
&self,
start_hash: &Hash,
thread_pool: &ThreadPool,
) -> EntryVerificationState {
let now = Instant::now();
let genesis = [Entry {
num_hashes: 0,
@ -646,7 +656,7 @@ impl EntrySlice for [Entry] {
transactions: vec![],
}];
let entry_pairs = genesis.par_iter().chain(self).zip(self);
let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
entry_pairs.all(|(x0, x1)| {
let r = x1.verify(&x0.hash);
if !r {
@ -672,7 +682,12 @@ impl EntrySlice for [Entry] {
}
}
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState {
fn verify_cpu_x86_simd(
&self,
start_hash: &Hash,
simd_len: usize,
thread_pool: &ThreadPool,
) -> EntryVerificationState {
use solana_sdk::hash::HASH_BYTES;
let now = Instant::now();
let genesis = [Entry {
@ -703,7 +718,7 @@ impl EntrySlice for [Entry] {
num_hashes.resize(aligned_len, 0);
let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect();
let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
hashes_chunked
.par_iter_mut()
.zip(num_hashes)
@ -753,7 +768,7 @@ impl EntrySlice for [Entry] {
}
}
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState {
fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let (has_avx2, has_avx512) = (
is_x86_feature_detected!("avx2"),
@ -764,25 +779,26 @@ impl EntrySlice for [Entry] {
if api().is_some() {
if has_avx512 && self.len() >= 128 {
self.verify_cpu_x86_simd(start_hash, 16)
self.verify_cpu_x86_simd(start_hash, 16, thread_pool)
} else if has_avx2 && self.len() >= 48 {
self.verify_cpu_x86_simd(start_hash, 8)
self.verify_cpu_x86_simd(start_hash, 8, thread_pool)
} else {
self.verify_cpu_generic(start_hash)
self.verify_cpu_generic(start_hash, thread_pool)
}
} else {
self.verify_cpu_generic(start_hash)
self.verify_cpu_generic(start_hash, thread_pool)
}
}
fn start_verify(
&self,
start_hash: &Hash,
thread_pool: &ThreadPool,
recyclers: VerifyRecyclers,
) -> EntryVerificationState {
let start = Instant::now();
let Some(api) = perf_libs::api() else {
return self.verify_cpu(start_hash);
return self.verify_cpu(start_hash, thread_pool);
};
inc_new_counter_info!("entry_verify-num_entries", self.len());
@ -839,7 +855,7 @@ impl EntrySlice for [Entry] {
})
.unwrap();
let verifications = PAR_THREAD_POOL.install(|| {
let verifications = thread_pool.install(|| {
self.into_par_iter()
.map(|entry| {
let answer = entry.hash;
@ -938,6 +954,26 @@ pub fn next_versioned_entry(
}
}
pub fn thread_pool_for_tests() -> ThreadPool {
// Allocate fewer threads for unit tests
// Unit tests typically aren't creating massive blocks to verify, and
// multiple tests could be running in parallel so any further parallelism
// will do more harm than good
rayon::ThreadPoolBuilder::new()
.num_threads(4)
.thread_name(|i| format!("solEntryTest{i:02}"))
.build()
.expect("new rayon threadpool")
}
pub fn thread_pool_for_benches() -> ThreadPool {
rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solEntryBnch{i:02}"))
.build()
.expect("new rayon threadpool")
}
#[cfg(test)]
mod tests {
use {
@ -968,6 +1004,7 @@ mod tests {
entries: Vec<Entry>,
skip_verification: bool,
verify_recyclers: VerifyRecyclers,
thread_pool: &ThreadPool,
verify: Arc<
dyn Fn(
VersionedTransaction,
@ -989,10 +1026,16 @@ mod tests {
}
};
let cpu_verify_result = verify_transactions(entries.clone(), Arc::new(verify_func));
let cpu_verify_result =
verify_transactions(entries.clone(), thread_pool, Arc::new(verify_func));
let mut gpu_verify_result: EntrySigVerificationState = {
let verify_result =
start_verify_transactions(entries, skip_verification, verify_recyclers, verify);
let verify_result = start_verify_transactions(
entries,
skip_verification,
thread_pool,
verify_recyclers,
verify,
);
match verify_result {
Ok(res) => res,
_ => EntrySigVerificationState {
@ -1022,6 +1065,8 @@ mod tests {
#[test]
fn test_entry_gpu_verify() {
let thread_pool = thread_pool_for_tests();
let verify_transaction = {
move |versioned_tx: VersionedTransaction,
verification_mode: TransactionVerificationMode|
@ -1067,12 +1112,14 @@ mod tests {
entries_invalid,
false,
recycler.clone(),
&thread_pool,
Arc::new(verify_transaction)
));
assert!(test_verify_transactions(
entries_valid,
false,
recycler,
&thread_pool,
Arc::new(verify_transaction)
));
}
@ -1096,6 +1143,8 @@ mod tests {
#[test]
fn test_transaction_signing() {
let thread_pool = thread_pool_for_tests();
use solana_sdk::signature::Signature;
let zero = Hash::default();
@ -1105,27 +1154,27 @@ mod tests {
// Verify entry with 2 transactions
let mut e0 = [Entry::new(&zero, 0, vec![tx0, tx1])];
assert!(e0.verify(&zero));
assert!(e0.verify(&zero, &thread_pool));
// Clear signature of the first transaction, see that it does not verify
let orig_sig = e0[0].transactions[0].signatures[0];
e0[0].transactions[0].signatures[0] = Signature::default();
assert!(!e0.verify(&zero));
assert!(!e0.verify(&zero, &thread_pool));
// restore original signature
e0[0].transactions[0].signatures[0] = orig_sig;
assert!(e0.verify(&zero));
assert!(e0.verify(&zero, &thread_pool));
// Resize signatures and see verification fails.
let len = e0[0].transactions[0].signatures.len();
e0[0].transactions[0]
.signatures
.resize(len - 1, Signature::default());
assert!(!e0.verify(&zero));
assert!(!e0.verify(&zero, &thread_pool));
// Pass an entry with no transactions
let e0 = [Entry::new(&zero, 0, vec![])];
assert!(e0.verify(&zero));
assert!(e0.verify(&zero, &thread_pool));
}
#[test]
@ -1158,41 +1207,57 @@ mod tests {
#[test]
fn test_verify_slice1() {
solana_logger::setup();
let thread_pool = thread_pool_for_tests();
let zero = Hash::default();
let one = hash(zero.as_ref());
assert!(vec![][..].verify(&zero)); // base case
assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1
assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad
assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step
// base case
assert!(vec![][..].verify(&zero, &thread_pool));
// singleton case 1
assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero, &thread_pool));
// singleton case 2, bad
assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one, &thread_pool));
// inductive step
assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero, &thread_pool));
let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2];
bad_ticks[1].hash = one;
assert!(!bad_ticks.verify(&zero)); // inductive step, bad
// inductive step, bad
assert!(!bad_ticks.verify(&zero, &thread_pool));
}
#[test]
fn test_verify_slice_with_hashes1() {
solana_logger::setup();
let thread_pool = thread_pool_for_tests();
let zero = Hash::default();
let one = hash(zero.as_ref());
let two = hash(one.as_ref());
assert!(vec![][..].verify(&one)); // base case
assert!(vec![Entry::new_tick(1, &two)][..].verify(&one)); // singleton case 1
assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two)); // singleton case 2, bad
// base case
assert!(vec![][..].verify(&one, &thread_pool));
// singleton case 1
assert!(vec![Entry::new_tick(1, &two)][..].verify(&one, &thread_pool));
// singleton case 2, bad
assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two, &thread_pool));
let mut ticks = vec![next_entry(&one, 1, vec![])];
ticks.push(next_entry(&ticks.last().unwrap().hash, 1, vec![]));
assert!(ticks.verify(&one)); // inductive step
// inductive step
assert!(ticks.verify(&one, &thread_pool));
let mut bad_ticks = vec![next_entry(&one, 1, vec![])];
bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![]));
bad_ticks[1].hash = one;
assert!(!bad_ticks.verify(&one)); // inductive step, bad
// inductive step, bad
assert!(!bad_ticks.verify(&one, &thread_pool));
}
#[test]
fn test_verify_slice_with_hashes_and_transactions() {
solana_logger::setup();
let thread_pool = thread_pool_for_tests();
let zero = Hash::default();
let one = hash(zero.as_ref());
let two = hash(one.as_ref());
@ -1200,9 +1265,12 @@ mod tests {
let bob_keypair = Keypair::new();
let tx0 = system_transaction::transfer(&alice_keypair, &bob_keypair.pubkey(), 1, one);
let tx1 = system_transaction::transfer(&bob_keypair, &alice_keypair.pubkey(), 1, one);
assert!(vec![][..].verify(&one)); // base case
assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one)); // singleton case 1
assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two)); // singleton case 2, bad
// base case
assert!(vec![][..].verify(&one, &thread_pool));
// singleton case 1
assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one, &thread_pool));
// singleton case 2, bad
assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two, &thread_pool));
let mut ticks = vec![next_entry(&one, 1, vec![tx0.clone()])];
ticks.push(next_entry(
@ -1210,12 +1278,15 @@ mod tests {
1,
vec![tx1.clone()],
));
assert!(ticks.verify(&one)); // inductive step
// inductive step
assert!(ticks.verify(&one, &thread_pool));
let mut bad_ticks = vec![next_entry(&one, 1, vec![tx0])];
bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![tx1]));
bad_ticks[1].hash = one;
assert!(!bad_ticks.verify(&one)); // inductive step, bad
// inductive step, bad
assert!(!bad_ticks.verify(&one, &thread_pool));
}
#[test]
@ -1354,7 +1425,7 @@ mod tests {
info!("done.. {}", time);
let mut time = Measure::start("poh");
let res = entries.verify(&Hash::default());
let res = entries.verify(&Hash::default(), &thread_pool_for_tests());
assert_eq!(res, !modified);
time.stop();
info!("{} {}", time, res);

View File

@ -519,20 +519,23 @@ pub fn process_entries_for_tests(
let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut batch_timing = BatchExecutionTiming::default();
let mut replay_entries: Vec<_> =
entry::verify_transactions(entries, Arc::new(verify_transaction))?
.into_iter()
.map(|entry| {
let starting_index = entry_starting_index;
if let EntryType::Transactions(ref transactions) = entry {
entry_starting_index = entry_starting_index.saturating_add(transactions.len());
}
ReplayEntry {
entry,
starting_index,
}
})
.collect();
let mut replay_entries: Vec<_> = entry::verify_transactions(
entries,
&replay_tx_thread_pool,
Arc::new(verify_transaction),
)?
.into_iter()
.map(|entry| {
let starting_index = entry_starting_index;
if let EntryType::Transactions(ref transactions) = entry {
entry_starting_index = entry_starting_index.saturating_add(transactions.len());
}
ReplayEntry {
entry,
starting_index,
}
})
.collect();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
let result = process_entries(
@ -1292,7 +1295,11 @@ fn confirm_slot_entries(
let last_entry_hash = entries.last().map(|e| e.hash);
let verifier = if !skip_verification {
datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64));
let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone());
let entry_state = entries.start_verify(
&progress.last_entry,
replay_tx_thread_pool,
recyclers.clone(),
);
if entry_state.status() == EntryVerificationStatus::Failure {
warn!("Ledger proof of history failed at slot: {}", slot);
return Err(BlockError::InvalidEntryHash.into());
@ -1315,6 +1322,7 @@ fn confirm_slot_entries(
let transaction_verification_result = entry::start_verify_transactions(
entries,
skip_verification,
replay_tx_thread_pool,
recyclers.clone(),
Arc::new(verify_transaction),
);
@ -1381,7 +1389,7 @@ fn confirm_slot_entries(
}
if let Some(mut verifier) = verifier {
let verified = verifier.finish_verify();
let verified = verifier.finish_verify(replay_tx_thread_pool);
*poh_verify_elapsed += verifier.poh_duration_us();
if !verified {
warn!("Ledger proof of history failed at slot: {}", bank.slot());

View File

@ -5,7 +5,7 @@
use log::*;
use {
rand::{thread_rng, Rng},
rayon::prelude::*,
rayon::{prelude::*, ThreadPool},
solana_client::{
connection_cache::{ConnectionCache, Protocol},
thin_client::ThinClient,
@ -14,7 +14,7 @@ use {
tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage},
VOTE_THRESHOLD_DEPTH,
},
solana_entry::entry::{Entry, EntrySlice},
solana_entry::entry::{self, Entry, EntrySlice},
solana_gossip::{
cluster_info::{self, ClusterInfo},
contact_info::{ContactInfo, LegacyContactInfo},
@ -180,6 +180,8 @@ pub fn send_many_transactions(
pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) {
let ledger = Blockstore::open(ledger_path).unwrap();
let thread_pool = entry::thread_pool_for_tests();
let zeroth_slot = ledger.get_slot_entries(0, 0).unwrap();
let last_id = zeroth_slot.last().unwrap().hash;
let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap();
@ -201,7 +203,7 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) {
None
};
let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks);
let last_id = verify_slot_ticks(&ledger, &thread_pool, slot, &last_id, should_verify_ticks);
pending_slots.extend(
next_slots
.into_iter()
@ -630,21 +632,23 @@ pub fn start_gossip_voter(
fn get_and_verify_slot_entries(
blockstore: &Blockstore,
thread_pool: &ThreadPool,
slot: Slot,
last_entry: &Hash,
) -> Vec<Entry> {
let entries = blockstore.get_slot_entries(slot, 0).unwrap();
assert!(entries.verify(last_entry));
assert!(entries.verify(last_entry, thread_pool));
entries
}
fn verify_slot_ticks(
blockstore: &Blockstore,
thread_pool: &ThreadPool,
slot: Slot,
last_entry: &Hash,
expected_num_ticks: Option<usize>,
) -> Hash {
let entries = get_and_verify_slot_entries(blockstore, slot, last_entry);
let entries = get_and_verify_slot_entries(blockstore, thread_pool, slot, last_entry);
let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum();
if let Some(expected_num_ticks) = expected_num_ticks {
assert_eq!(num_ticks, expected_num_ticks);

View File

@ -17,6 +17,7 @@ solana-entry = { workspace = true }
solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-perf = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-sdk = { workspace = true }
solana-version = { workspace = true }

View File

@ -7,6 +7,7 @@ use {
clap::{crate_description, crate_name, Arg, Command},
solana_measure::measure::Measure,
solana_perf::perf_libs,
solana_rayon_threadlimit::get_max_thread_count,
solana_sdk::hash::hash,
};
@ -73,6 +74,14 @@ fn main() {
let start_hash = hash(&[1, 2, 3, 4]);
let ticks = create_ticks(max_num_entries, hashes_per_tick, start_hash);
let mut num_entries = start_num_entries as usize;
let num_threads = matches
.value_of_t("num_threads")
.unwrap_or(get_max_thread_count());
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("solPohBench{i:02}"))
.build()
.expect("new rayon threadpool");
if matches.is_present("cuda") {
perf_libs::init_cuda();
}
@ -81,8 +90,8 @@ fn main() {
let mut time = Measure::start("time");
for _ in 0..iterations {
assert!(ticks[..num_entries]
.verify_cpu_generic(&start_hash)
.finish_verify());
.verify_cpu_generic(&start_hash, &thread_pool)
.finish_verify(&thread_pool));
}
time.stop();
println!(
@ -100,8 +109,8 @@ fn main() {
let mut time = Measure::start("time");
for _ in 0..iterations {
assert!(ticks[..num_entries]
.verify_cpu_x86_simd(&start_hash, 8)
.finish_verify());
.verify_cpu_x86_simd(&start_hash, 8, &thread_pool)
.finish_verify(&thread_pool));
}
time.stop();
println!(
@ -115,8 +124,8 @@ fn main() {
let mut time = Measure::start("time");
for _ in 0..iterations {
assert!(ticks[..num_entries]
.verify_cpu_x86_simd(&start_hash, 16)
.finish_verify());
.verify_cpu_x86_simd(&start_hash, 16, &thread_pool)
.finish_verify(&thread_pool));
}
time.stop();
println!(
@ -132,8 +141,8 @@ fn main() {
let recyclers = VerifyRecyclers::default();
for _ in 0..iterations {
assert!(ticks[..num_entries]
.start_verify(&start_hash, recyclers.clone())
.finish_verify());
.start_verify(&start_hash, &thread_pool, recyclers.clone())
.finish_verify(&thread_pool));
}
time.stop();
println!(

View File

@ -2,7 +2,7 @@
extern crate test;
use {
solana_entry::entry::{next_entry_mut, Entry, EntrySlice},
solana_entry::entry::{self, next_entry_mut, Entry, EntrySlice},
solana_sdk::{
hash::{hash, Hash},
signature::{Keypair, Signer},
@ -17,6 +17,8 @@ const NUM_ENTRIES: usize = 800;
#[bench]
fn bench_poh_verify_ticks(bencher: &mut Bencher) {
solana_logger::setup();
let thread_pool = entry::thread_pool_for_benches();
let zero = Hash::default();
let start_hash = hash(zero.as_ref());
let mut cur_hash = start_hash;
@ -27,12 +29,14 @@ fn bench_poh_verify_ticks(bencher: &mut Bencher) {
}
bencher.iter(|| {
assert!(ticks.verify(&start_hash));
assert!(ticks.verify(&start_hash, &thread_pool));
})
}
#[bench]
fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) {
let thread_pool = entry::thread_pool_for_benches();
let zero = Hash::default();
let start_hash = hash(zero.as_ref());
let mut cur_hash = start_hash;
@ -47,6 +51,6 @@ fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) {
}
bencher.iter(|| {
assert!(ticks.verify(&start_hash));
assert!(ticks.verify(&start_hash, &thread_pool));
})
}