diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index ea174a7a8..df92403c9 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -1,10 +1,12 @@ use { crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, + rayon::{ThreadPool, ThreadPoolBuilder}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu, }, solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache}, + solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ @@ -36,10 +38,15 @@ pub(crate) fn spawn_shred_sigverify( ) -> JoinHandle<()> { let recycler_cache = RecyclerCache::warmed(); let mut stats = ShredSigVerifyStats::new(Instant::now()); - Builder::new() - .name("solShredVerifr".to_string()) - .spawn(move || loop { + let thread_pool = ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|i| format!("solSvrfyShred{i:02}")) + .build() + .unwrap(); + let run_shred_sigverify = move || { + loop { match run_shred_sigverify( + &thread_pool, // We can't store the pubkey outside the loop // because the identity might be hot swapped. &cluster_info.id(), @@ -58,11 +65,17 @@ pub(crate) fn spawn_shred_sigverify( Err(Error::SendError) => break, } stats.maybe_submit(); - }) + } + }; + Builder::new() + .name("solShredVerifr".to_string()) + .spawn(run_shred_sigverify) .unwrap() } +#[allow(clippy::too_many_arguments)] fn run_shred_sigverify( + thread_pool: &ThreadPool, self_pubkey: &Pubkey, bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, @@ -83,6 +96,7 @@ fn run_shred_sigverify( stats.num_packets += packets.iter().map(PacketBatch::len).sum::(); stats.num_discards_pre += count_discards(&packets); verify_packets( + thread_pool, self_pubkey, bank_forks, leader_schedule_cache, @@ -108,6 +122,7 @@ fn run_shred_sigverify( } fn verify_packets( + thread_pool: &ThreadPool, self_pubkey: &Pubkey, bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, @@ -121,7 +136,7 @@ fn verify_packets( .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes()))) .chain(std::iter::once((Slot::MAX, [0u8; 32]))) .collect(); - let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache); + let out = verify_shreds_gpu(thread_pool, packets, &leader_slots, recycler_cache); solana_perf::sigverify::mark_disabled(packets, &out); } @@ -284,7 +299,9 @@ mod tests { batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); batches[0][1].meta_mut().size = shred.payload().len(); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); verify_packets( + &thread_pool, &Pubkey::new_unique(), // self_pubkey &bank_forks, &leader_schedule_cache, diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 4007291d2..ab69441c1 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -2,6 +2,7 @@ extern crate test; use { + rayon::ThreadPoolBuilder, solana_ledger::{ shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY}, sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair}, @@ -10,6 +11,7 @@ use { packet::{Packet, PacketBatch}, recycler_cache::RecyclerCache, }, + solana_rayon_threadlimit::get_thread_count, solana_sdk::signature::Keypair, std::sync::Arc, test::Bencher, @@ -19,6 +21,10 @@ const NUM_PACKETS: usize = 256; const NUM_BATCHES: usize = 1; #[bench] fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { + let thread_pool = ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .build() + .unwrap(); let recycler_cache = RecyclerCache::default(); let mut packet_batch = PacketBatch::new_pinned_with_capacity(NUM_PACKETS); @@ -43,15 +49,31 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { let pinned_keypair = Some(Arc::new(pinned_keypair)); //warmup for _ in 0..100 { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); + sign_shreds_gpu( + &thread_pool, + &keypair, + &pinned_keypair, + &mut batches, + &recycler_cache, + ); } bencher.iter(|| { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); + sign_shreds_gpu( + &thread_pool, + &keypair, + &pinned_keypair, + &mut batches, + &recycler_cache, + ); }) } #[bench] fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { + let thread_pool = ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .build() + .unwrap(); let mut packet_batch = PacketBatch::default(); let slot = 0xdead_c0de; packet_batch.resize(NUM_PACKETS, Packet::default()); @@ -71,6 +93,6 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { let mut batches = vec![packet_batch; NUM_BATCHES]; let keypair = Keypair::new(); bencher.iter(|| { - sign_shreds_cpu(&keypair, &mut batches); + sign_shreds_cpu(&thread_pool, &keypair, &mut batches); }) } diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index e9370e369..5f315c067 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -12,7 +12,6 @@ use { recycler_cache::RecyclerCache, sigverify::{self, count_packets_in_batches, TxOffset}, }, - solana_rayon_threadlimit::get_thread_count, solana_sdk::{ clock::Slot, hash::Hash, @@ -27,14 +26,6 @@ const SIGN_SHRED_GPU_MIN: usize = 256; const_assert_eq!(SIZE_OF_MERKLE_ROOT, 32); const SIZE_OF_MERKLE_ROOT: usize = std::mem::size_of::(); -lazy_static! { - static ref SIGVERIFY_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .thread_name(|i| format!("solSvrfyShred{i:02}")) - .build() - .unwrap(); -} - #[must_use] pub fn verify_shred_cpu( packet: &Packet, @@ -69,12 +60,13 @@ pub fn verify_shred_cpu( } fn verify_shreds_cpu( + thread_pool: &ThreadPool, batches: &[PacketBatch], slot_leaders: &HashMap, ) -> Vec> { let packet_count = count_packets_in_batches(batches); debug!("CPU SHRED ECDSA for {}", packet_count); - let rv = SIGVERIFY_THREAD_POOL.install(|| { + let rv = thread_pool.install(|| { batches .into_par_iter() .map(|batch| { @@ -90,6 +82,7 @@ fn verify_shreds_cpu( } fn slot_key_data_for_gpu( + thread_pool: &ThreadPool, batches: &[PacketBatch], slot_keys: &HashMap, recycler_cache: &RecyclerCache, @@ -99,7 +92,7 @@ where { //TODO: mark Pubkey::default shreds as failed after the GPU returns assert_eq!(slot_keys.get(&Slot::MAX), Some(&T::default())); - let slots: Vec = SIGVERIFY_THREAD_POOL.install(|| { + let slots: Vec = thread_pool.install(|| { batches .into_par_iter() .flat_map_iter(|batch| { @@ -151,13 +144,14 @@ where // Recovers merkle roots from shreds binary. fn get_merkle_roots( + thread_pool: &ThreadPool, packets: &[PacketBatch], recycler_cache: &RecyclerCache, ) -> ( PinnedVec, // Merkle roots Vec>, // Offsets ) { - let merkle_roots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { + let merkle_roots: Vec> = thread_pool.install(|| { packets .par_iter() .flat_map(|packets| { @@ -256,18 +250,21 @@ fn shred_gpu_offsets( } pub fn verify_shreds_gpu( + thread_pool: &ThreadPool, batches: &[PacketBatch], slot_leaders: &HashMap, recycler_cache: &RecyclerCache, ) -> Vec> { let api = match perf_libs::api() { - None => return verify_shreds_cpu(batches, slot_leaders), + None => return verify_shreds_cpu(thread_pool, batches, slot_leaders), Some(api) => api, }; - let (pubkeys, pubkey_offsets) = slot_key_data_for_gpu(batches, slot_leaders, recycler_cache); + let (pubkeys, pubkey_offsets) = + slot_key_data_for_gpu(thread_pool, batches, slot_leaders, recycler_cache); //HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data - let (merkle_roots, merkle_roots_offsets) = get_merkle_roots(batches, recycler_cache); + let (merkle_roots, merkle_roots_offsets) = + get_merkle_roots(thread_pool, batches, recycler_cache); // Merkle roots are placed after pubkeys; adjust offsets accordingly. let merkle_roots_offsets = { let shift = pubkeys.len(); @@ -338,10 +335,10 @@ fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { packet.buffer_mut()[sig].copy_from_slice(signature.as_ref()); } -pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) { +pub fn sign_shreds_cpu(thread_pool: &ThreadPool, keypair: &Keypair, batches: &mut [PacketBatch]) { let packet_count = count_packets_in_batches(batches); debug!("CPU SHRED ECDSA for {}", packet_count); - SIGVERIFY_THREAD_POOL.install(|| { + thread_pool.install(|| { batches.par_iter_mut().for_each(|batch| { batch[..] .par_iter_mut() @@ -369,6 +366,7 @@ pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) } pub fn sign_shreds_gpu( + thread_pool: &ThreadPool, keypair: &Keypair, pinned_keypair: &Option>>, batches: &mut [PacketBatch], @@ -378,10 +376,10 @@ pub fn sign_shreds_gpu( let pubkey_size = size_of::(); let packet_count = count_packets_in_batches(batches); if packet_count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { - return sign_shreds_cpu(keypair, batches); + return sign_shreds_cpu(thread_pool, keypair, batches); } let api = match perf_libs::api() { - None => return sign_shreds_cpu(keypair, batches), + None => return sign_shreds_cpu(thread_pool, keypair, batches), Some(api) => api, }; let pinned_keypair = pinned_keypair.as_ref().unwrap(); @@ -393,7 +391,8 @@ pub fn sign_shreds_gpu( let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); secret_offsets.resize(packet_count, pubkey_size as u32); - let (merkle_roots, merkle_roots_offsets) = get_merkle_roots(batches, recycler_cache); + let (merkle_roots, merkle_roots_offsets) = + get_merkle_roots(thread_pool, batches, recycler_cache); // Merkle roots are placed after the keypair; adjust offsets accordingly. let merkle_roots_offsets = { let shift = pinned_keypair.len(); @@ -451,7 +450,7 @@ pub fn sign_shreds_gpu( Some(out) }) .collect(); - SIGVERIFY_THREAD_POOL.install(|| { + thread_pool.install(|| { batches .par_iter_mut() .zip(num_packets) @@ -481,6 +480,7 @@ mod tests { }, matches::assert_matches, rand::{seq::SliceRandom, Rng}, + rayon::ThreadPoolBuilder, solana_entry::entry::Entry, solana_sdk::{ hash, @@ -534,7 +534,7 @@ mod tests { run_test_sigverify_shred_cpu(0xdead_c0de); } - fn run_test_sigverify_shreds_cpu(slot: Slot) { + fn run_test_sigverify_shreds_cpu(thread_pool: &ThreadPool, slot: Slot) { solana_logger::setup(); let mut batches = [PacketBatch::default()]; let mut shred = Shred::new_from_data( @@ -557,7 +557,7 @@ mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -565,11 +565,11 @@ mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = HashMap::new(); - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(slot, keypair.pubkey().to_bytes())] @@ -577,16 +577,17 @@ mod tests { .cloned() .collect(); batches[0][0].meta_mut().size = 0; - let rv = verify_shreds_cpu(&batches, &leader_slots); + let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); } #[test] fn test_sigverify_shreds_cpu() { - run_test_sigverify_shreds_cpu(0xdead_c0de); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + run_test_sigverify_shreds_cpu(&thread_pool, 0xdead_c0de); } - fn run_test_sigverify_shreds_gpu(slot: Slot) { + fn run_test_sigverify_shreds_gpu(thread_pool: &ThreadPool, slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); @@ -614,7 +615,7 @@ mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -625,11 +626,11 @@ mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); batches[0][0].meta_mut().size = 0; @@ -640,16 +641,17 @@ mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); } #[test] fn test_sigverify_shreds_gpu() { - run_test_sigverify_shreds_gpu(0xdead_c0de); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + run_test_sigverify_shreds_gpu(&thread_pool, 0xdead_c0de); } - fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { + fn run_test_sigverify_shreds_sign_gpu(thread_pool: &ThreadPool, slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); @@ -683,23 +685,30 @@ mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![0; num_packets]; num_batches]); //signed - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); - let rv = verify_shreds_cpu(&batches, &pubkeys); + sign_shreds_gpu( + thread_pool, + &keypair, + &pinned_keypair, + &mut batches, + &recycler_cache, + ); + let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); - let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(thread_pool, &batches, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); } #[test] fn test_sigverify_shreds_sign_gpu() { - run_test_sigverify_shreds_sign_gpu(0xdead_c0de); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + run_test_sigverify_shreds_sign_gpu(&thread_pool, 0xdead_c0de); } - fn run_test_sigverify_shreds_sign_cpu(slot: Slot) { + fn run_test_sigverify_shreds_sign_cpu(thread_pool: &ThreadPool, slot: Slot) { solana_logger::setup(); let mut batches = [PacketBatch::default()]; @@ -726,17 +735,18 @@ mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_cpu(&batches, &pubkeys); + let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys); assert_eq!(rv, vec![vec![0]]); //signed - sign_shreds_cpu(&keypair, &mut batches); - let rv = verify_shreds_cpu(&batches, &pubkeys); + sign_shreds_cpu(thread_pool, &keypair, &mut batches); + let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys); assert_eq!(rv, vec![vec![1]]); } #[test] fn test_sigverify_shreds_sign_cpu() { - run_test_sigverify_shreds_sign_cpu(0xdead_c0de); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + run_test_sigverify_shreds_sign_cpu(&thread_pool, 0xdead_c0de); } fn make_transaction(rng: &mut R) -> Transaction { @@ -843,6 +853,7 @@ mod tests { #[test] fn test_verify_shreds_fuzz() { let mut rng = rand::thread_rng(); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); let recycler_cache = RecyclerCache::default(); let keypairs = repeat_with(|| rng.gen_range(169_367_809, 169_906_789)) .map(|slot| (slot, Keypair::new())) @@ -856,7 +867,7 @@ mod tests { .collect(); let mut packets = make_packets(&mut rng, &shreds); assert_eq!( - verify_shreds_gpu(&packets, &pubkeys, &recycler_cache), + verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache), packets .iter() .map(|batch| vec![1u8; batch.len()]) @@ -878,12 +889,16 @@ mod tests { .collect::>() }) .collect(); - assert_eq!(verify_shreds_gpu(&packets, &pubkeys, &recycler_cache), out); + assert_eq!( + verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache), + out + ); } #[test] fn test_sign_shreds_gpu() { let mut rng = rand::thread_rng(); + let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); let recycler_cache = RecyclerCache::default(); let shreds = { let keypairs = repeat_with(|| rng.gen_range(169_367_809, 169_906_789)) @@ -905,7 +920,7 @@ mod tests { let mut packets = make_packets(&mut rng, &shreds); // Assert that initially all signatrues are invalid. assert_eq!( - verify_shreds_gpu(&packets, &pubkeys, &recycler_cache), + verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache), packets .iter() .map(|batch| vec![0u8; batch.len()]) @@ -914,9 +929,15 @@ mod tests { let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); let pinned_keypair = Some(Arc::new(pinned_keypair)); // Sign and verify shreds signatures. - sign_shreds_gpu(&keypair, &pinned_keypair, &mut packets, &recycler_cache); + sign_shreds_gpu( + &thread_pool, + &keypair, + &pinned_keypair, + &mut packets, + &recycler_cache, + ); assert_eq!( - verify_shreds_gpu(&packets, &pubkeys, &recycler_cache), + verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache), packets .iter() .map(|batch| vec![1u8; batch.len()])