From 2dd8ab197de2dfc22e656276e61e4f8cdeaba4f4 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Mon, 20 Jan 2020 20:08:19 -0800 Subject: [PATCH] Remove redundant threadpools in sigverify (#7888) * Limit the number of thread pools sigverify creates * Name local threadpools --- Cargo.lock | 1 + ledger/Cargo.toml | 1 + ledger/src/blockstore.rs | 1 + ledger/src/blockstore_processor.rs | 1 + ledger/src/entry.rs | 1 + ledger/src/lib.rs | 3 + ledger/src/shred.rs | 1 + ledger/src/sigverify_shreds.rs | 132 ++++++++++++++--------------- perf/src/sigverify.rs | 25 +++--- 9 files changed, 84 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b646c5bd89..43b10026b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3979,6 +3979,7 @@ dependencies = [ "ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)", "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 6b5cb3fac1..80e23462f6 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -46,6 +46,7 @@ sys-info = "0.5.8" tar = "0.4.26" thiserror = "1.0" tempfile = "3.1.0" +lazy_static = "1.4.0" [dependencies.rocksdb] # Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 406433fe57..48803436ee 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -58,6 +58,7 @@ pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb"; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) + .thread_name(|ix| format!("blockstore_{}", ix)) .build() .unwrap())); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ad6cab09d6..bba50268dd 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -39,6 +39,7 @@ use thiserror::Error; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) + .thread_name(|ix| format!("blockstore_processor_{}", ix)) .build() .unwrap()) ); diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index bd03265c2b..2aa85f6bc5 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -26,6 +26,7 @@ use std::{cmp, thread}; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) + .thread_name(|ix| format!("entry_{}", ix)) .build() .unwrap())); diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index de2c2047ae..dde02ecdb8 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -25,3 +25,6 @@ extern crate solana_metrics; #[macro_use] extern crate log; + +#[macro_use] +extern crate lazy_static; diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 59170d2e04..4cb3087aaa 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -46,6 +46,7 @@ pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SL thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) + .thread_name(|ix| format!("shredder_{}", ix)) .build() .unwrap())); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 73d2599294..741af58e22 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -20,15 +20,17 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::Arc; -use std::{cell::RefCell, collections::HashMap, mem::size_of}; +use std::{collections::HashMap, mem::size_of}; pub const SIGN_SHRED_GPU_MIN: usize = 256; -thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .thread_name(|ix| format!("sigverify_shreds_{}", ix)) - .build() - .unwrap())); +lazy_static! { + pub static ref SIGVERIFY_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("sigverify_shreds_{}", ix)) + .build() + .unwrap(); +} /// Assuming layout is /// signature: Signature @@ -70,18 +72,16 @@ fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) use rayon::prelude::*; let count = batch_size(batches); debug!("CPU SHRED ECDSA for {}", count); - let rv = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| { - p.packets - .par_iter() - .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) - .collect() - }) - .collect() - }) + let rv = SIGVERIFY_THREAD_POOL.install(|| { + batches + .into_par_iter() + .map(|p| { + p.packets + .par_iter() + .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) + .collect() + }) + .collect() }); inc_new_counter_debug!("ed25519_shred_verify_cpu", count); rv @@ -97,30 +97,28 @@ fn slot_key_data_for_gpu< ) -> (PinnedVec, TxOffset, usize) { //TODO: mark Pubkey::default shreds as failed after the GPU returns assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); - let slots: Vec> = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| { - p.packets - .iter() - .map(|packet| { - let slot_start = size_of::() + size_of::(); - let slot_end = slot_start + size_of::(); - if packet.meta.size < slot_end || packet.meta.discard { - return std::u64::MAX; - } - let slot: Option = - limited_deserialize(&packet.data[slot_start..slot_end]).ok(); - match slot { - Some(slot) if slot_keys.get(&slot).is_some() => slot, - _ => std::u64::MAX, - } - }) - .collect() - }) - .collect() - }) + let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { + batches + .into_par_iter() + .map(|p| { + p.packets + .iter() + .map(|packet| { + let slot_start = size_of::() + size_of::(); + let slot_end = slot_start + size_of::(); + if packet.meta.size < slot_end || packet.meta.discard { + return std::u64::MAX; + } + let slot: Option = + limited_deserialize(&packet.data[slot_start..slot_end]).ok(); + match slot { + Some(slot) if slot_keys.get(&slot).is_some() => slot, + _ => std::u64::MAX, + } + }) + .collect() + }) + .collect() }); let mut keys_to_slots: HashMap> = HashMap::new(); for batch in slots.iter() { @@ -312,14 +310,12 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU SHRED ECDSA for {}", count); - PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches.par_iter_mut().for_each(|p| { - p.packets[..] - .par_iter_mut() - .for_each(|mut p| sign_shred_cpu(keypair, &mut p)); - }); - }) + SIGVERIFY_THREAD_POOL.install(|| { + batches.par_iter_mut().for_each(|p| { + p.packets[..] + .par_iter_mut() + .for_each(|mut p| sign_shred_cpu(keypair, &mut p)); + }); }); inc_new_counter_debug!("ed25519_shred_verify_cpu", count); } @@ -425,25 +421,23 @@ pub fn sign_shreds_gpu( } sizes[i] += sizes[i - 1]; } - PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .par_iter_mut() - .enumerate() - .for_each(|(batch_ix, batch)| { - let num_packets = sizes[batch_ix]; - batch.packets[..] - .par_iter_mut() - .enumerate() - .for_each(|(packet_ix, packet)| { - let sig_ix = packet_ix + num_packets; - let sig_start = sig_ix * sig_size; - let sig_end = sig_start + sig_size; - packet.data[0..sig_size] - .copy_from_slice(&signatures_out[sig_start..sig_end]); - }); - }); - }); + SIGVERIFY_THREAD_POOL.install(|| { + batches + .par_iter_mut() + .enumerate() + .for_each(|(batch_ix, batch)| { + let num_packets = sizes[batch_ix]; + batch.packets[..] + .par_iter_mut() + .enumerate() + .for_each(|(packet_ix, packet)| { + let sig_ix = packet_ix + num_packets; + let sig_start = sig_ix * sig_size; + let sig_end = sig_start + sig_size; + packet.data[0..sig_size] + .copy_from_slice(&signatures_out[sig_start..sig_end]); + }); + }); }); inc_new_counter_debug!("ed25519_shred_sign_gpu", count); } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index d262c7cdfb..9b8f0541b5 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -18,14 +18,15 @@ use solana_sdk::short_vec::decode_len; use solana_sdk::signature::Signature; #[cfg(test)] use solana_sdk::transaction::Transaction; -use std::cell::RefCell; use std::mem::size_of; -thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .thread_name(|ix| format!("sigverify_{}", ix)) - .build() - .unwrap())); +lazy_static! { + static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("sigverify_{}", ix)) + .build() + .unwrap(); +} pub type TxOffset = PinnedVec; @@ -247,13 +248,11 @@ pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { use rayon::prelude::*; let count = batch_size(batches); debug!("CPU ECDSA for {}", batch_size(batches)); - let rv = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| p.packets.par_iter().map(verify_packet).collect()) - .collect() - }) + let rv = PAR_THREAD_POOL.install(|| { + batches + .into_par_iter() + .map(|p| p.packets.par_iter().map(verify_packet).collect()) + .collect() }); inc_new_counter_debug!("ed25519_verify_cpu", count); rv