removes lazy-static thread-pool from sigverify-shreds (#30787)

Instead, the thread-pool is passed explicitly from higher in the call
stack so that
https://github.com/solana-labs/solana/pull/30786
can use the same thread-pool for shred deduplication.
behzad nouri 2023-03-20 20:33:22 +00:00 committed by GitHub
parent 5d9aba5548
commit c6e7aaf96c
3 changed files with 117 additions and 57 deletions
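The pattern the commit moves toward can be summarized as: build one rayon ThreadPool near the top of the call stack and lend it by reference to every stage that needs it, instead of each stage owning a lazy_static pool. Below is a minimal hedged sketch of that pattern, not code from this diff; `verify_stage` and `dedup_stage` are hypothetical stand-ins for the sigverify and dedup consumers mentioned in the commit message.

```rust
// Sketch only: one shared rayon pool, borrowed by multiple stages.
use rayon::{prelude::*, ThreadPool, ThreadPoolBuilder};

fn verify_stage(thread_pool: &ThreadPool, data: &[u64]) -> u64 {
    // Each borrower schedules its parallel work on the shared pool via install().
    thread_pool.install(|| data.par_iter().copied().sum())
}

fn dedup_stage(thread_pool: &ThreadPool, data: &mut Vec<u64>) {
    // A second stage reuses the exact same worker threads.
    thread_pool.install(|| {
        data.par_sort_unstable();
        data.dedup();
    });
}

fn main() {
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("solSvrfyShred{i:02}"))
        .build()
        .unwrap();
    let mut data = vec![3, 1, 3, 2];
    dedup_stage(&thread_pool, &mut data); // data becomes [1, 2, 3]
    assert_eq!(verify_stage(&thread_pool, &data), 6);
}
```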

@ -1,10 +1,12 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
@ -36,10 +38,15 @@ pub(crate) fn spawn_shred_sigverify(
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Builder::new()
.name("solShredVerifr".to_string())
.spawn(move || loop {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
let run_shred_sigverify = move || {
loop {
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
// because the identity might be hot swapped.
&cluster_info.id(),
@ -58,11 +65,17 @@ pub(crate) fn spawn_shred_sigverify(
Err(Error::SendError) => break,
}
stats.maybe_submit();
})
}
};
Builder::new()
.name("solShredVerifr".to_string())
.spawn(run_shred_sigverify)
.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
@ -83,6 +96,7 @@ fn run_shred_sigverify(
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
verify_packets(
thread_pool,
self_pubkey,
bank_forks,
leader_schedule_cache,
@ -108,6 +122,7 @@ fn run_shred_sigverify(
}
fn verify_packets(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
@ -121,7 +136,7 @@ fn verify_packets(
.filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
.chain(std::iter::once((Slot::MAX, [0u8; 32])))
.collect();
let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
let out = verify_shreds_gpu(thread_pool, packets, &leader_slots, recycler_cache);
solana_perf::sigverify::mark_disabled(packets, &out);
}
@ -284,7 +299,9 @@ mod tests {
batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
batches[0][1].meta_mut().size = shred.payload().len();
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
verify_packets(
&thread_pool,
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&leader_schedule_cache,

@ -2,6 +2,7 @@
extern crate test;
use {
rayon::ThreadPoolBuilder,
solana_ledger::{
shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair},
@ -10,6 +11,7 @@ use {
packet::{Packet, PacketBatch},
recycler_cache::RecyclerCache,
},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::signature::Keypair,
std::sync::Arc,
test::Bencher,
@ -19,6 +21,10 @@ const NUM_PACKETS: usize = 256;
const NUM_BATCHES: usize = 1;
#[bench]
fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let recycler_cache = RecyclerCache::default();
let mut packet_batch = PacketBatch::new_pinned_with_capacity(NUM_PACKETS);
@ -43,15 +49,31 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let pinned_keypair = Some(Arc::new(pinned_keypair));
//warmup
for _ in 0..100 {
sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache);
sign_shreds_gpu(
&thread_pool,
&keypair,
&pinned_keypair,
&mut batches,
&recycler_cache,
);
}
bencher.iter(|| {
sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache);
sign_shreds_gpu(
&thread_pool,
&keypair,
&pinned_keypair,
&mut batches,
&recycler_cache,
);
})
}
#[bench]
fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let mut packet_batch = PacketBatch::default();
let slot = 0xdead_c0de;
packet_batch.resize(NUM_PACKETS, Packet::default());
@ -71,6 +93,6 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) {
let mut batches = vec![packet_batch; NUM_BATCHES];
let keypair = Keypair::new();
bencher.iter(|| {
sign_shreds_cpu(&keypair, &mut batches);
sign_shreds_cpu(&thread_pool, &keypair, &mut batches);
})
}

@ -12,7 +12,6 @@ use {
recycler_cache::RecyclerCache,
sigverify::{self, count_packets_in_batches, TxOffset},
},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::{
clock::Slot,
hash::Hash,
@ -27,14 +26,6 @@ const SIGN_SHRED_GPU_MIN: usize = 256;
const_assert_eq!(SIZE_OF_MERKLE_ROOT, 32);
const SIZE_OF_MERKLE_ROOT: usize = std::mem::size_of::<Hash>();
lazy_static! {
static ref SIGVERIFY_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
}
#[must_use]
pub fn verify_shred_cpu(
packet: &Packet,
@ -69,12 +60,13 @@ pub fn verify_shred_cpu(
}
fn verify_shreds_cpu(
thread_pool: &ThreadPool,
batches: &[PacketBatch],
slot_leaders: &HashMap<Slot, /*pubkey:*/ [u8; 32]>,
) -> Vec<Vec<u8>> {
let packet_count = count_packets_in_batches(batches);
debug!("CPU SHRED ECDSA for {}", packet_count);
let rv = SIGVERIFY_THREAD_POOL.install(|| {
let rv = thread_pool.install(|| {
batches
.into_par_iter()
.map(|batch| {
@ -90,6 +82,7 @@ fn verify_shreds_cpu(
}
fn slot_key_data_for_gpu<T>(
thread_pool: &ThreadPool,
batches: &[PacketBatch],
slot_keys: &HashMap<Slot, /*pubkey:*/ T>,
recycler_cache: &RecyclerCache,
@ -99,7 +92,7 @@ where
{
//TODO: mark Pubkey::default shreds as failed after the GPU returns
assert_eq!(slot_keys.get(&Slot::MAX), Some(&T::default()));
let slots: Vec<Slot> = SIGVERIFY_THREAD_POOL.install(|| {
let slots: Vec<Slot> = thread_pool.install(|| {
batches
.into_par_iter()
.flat_map_iter(|batch| {
@ -151,13 +144,14 @@ where
// Recovers merkle roots from shreds binary.
fn get_merkle_roots(
thread_pool: &ThreadPool,
packets: &[PacketBatch],
recycler_cache: &RecyclerCache,
) -> (
PinnedVec<u8>, // Merkle roots
Vec<Option<usize>>, // Offsets
) {
let merkle_roots: Vec<Option<Hash>> = SIGVERIFY_THREAD_POOL.install(|| {
let merkle_roots: Vec<Option<Hash>> = thread_pool.install(|| {
packets
.par_iter()
.flat_map(|packets| {
@ -256,18 +250,21 @@ fn shred_gpu_offsets(
}
pub fn verify_shreds_gpu(
thread_pool: &ThreadPool,
batches: &[PacketBatch],
slot_leaders: &HashMap<Slot, /*pubkey:*/ [u8; 32]>,
recycler_cache: &RecyclerCache,
) -> Vec<Vec<u8>> {
let api = match perf_libs::api() {
None => return verify_shreds_cpu(batches, slot_leaders),
None => return verify_shreds_cpu(thread_pool, batches, slot_leaders),
Some(api) => api,
};
let (pubkeys, pubkey_offsets) = slot_key_data_for_gpu(batches, slot_leaders, recycler_cache);
let (pubkeys, pubkey_offsets) =
slot_key_data_for_gpu(thread_pool, batches, slot_leaders, recycler_cache);
//HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU
//TODO: GPU needs a more opaque interface, which can handle variable sized structures for data
let (merkle_roots, merkle_roots_offsets) = get_merkle_roots(batches, recycler_cache);
let (merkle_roots, merkle_roots_offsets) =
get_merkle_roots(thread_pool, batches, recycler_cache);
// Merkle roots are placed after pubkeys; adjust offsets accordingly.
let merkle_roots_offsets = {
let shift = pubkeys.len();
@ -338,10 +335,10 @@ fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) {
packet.buffer_mut()[sig].copy_from_slice(signature.as_ref());
}
pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) {
pub fn sign_shreds_cpu(thread_pool: &ThreadPool, keypair: &Keypair, batches: &mut [PacketBatch]) {
let packet_count = count_packets_in_batches(batches);
debug!("CPU SHRED ECDSA for {}", packet_count);
SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.install(|| {
batches.par_iter_mut().for_each(|batch| {
batch[..]
.par_iter_mut()
@ -369,6 +366,7 @@ pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache)
}
pub fn sign_shreds_gpu(
thread_pool: &ThreadPool,
keypair: &Keypair,
pinned_keypair: &Option<Arc<PinnedVec<u8>>>,
batches: &mut [PacketBatch],
@ -378,10 +376,10 @@ pub fn sign_shreds_gpu(
let pubkey_size = size_of::<Pubkey>();
let packet_count = count_packets_in_batches(batches);
if packet_count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() {
return sign_shreds_cpu(keypair, batches);
return sign_shreds_cpu(thread_pool, keypair, batches);
}
let api = match perf_libs::api() {
None => return sign_shreds_cpu(keypair, batches),
None => return sign_shreds_cpu(thread_pool, keypair, batches),
Some(api) => api,
};
let pinned_keypair = pinned_keypair.as_ref().unwrap();
@ -393,7 +391,8 @@ pub fn sign_shreds_gpu(
let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets");
secret_offsets.resize(packet_count, pubkey_size as u32);
let (merkle_roots, merkle_roots_offsets) = get_merkle_roots(batches, recycler_cache);
let (merkle_roots, merkle_roots_offsets) =
get_merkle_roots(thread_pool, batches, recycler_cache);
// Merkle roots are placed after the keypair; adjust offsets accordingly.
let merkle_roots_offsets = {
let shift = pinned_keypair.len();
@ -451,7 +450,7 @@ pub fn sign_shreds_gpu(
Some(out)
})
.collect();
SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.install(|| {
batches
.par_iter_mut()
.zip(num_packets)
@ -481,6 +480,7 @@ mod tests {
},
matches::assert_matches,
rand::{seq::SliceRandom, Rng},
rayon::ThreadPoolBuilder,
solana_entry::entry::Entry,
solana_sdk::{
hash,
@ -534,7 +534,7 @@ mod tests {
run_test_sigverify_shred_cpu(0xdead_c0de);
}
fn run_test_sigverify_shreds_cpu(slot: Slot) {
fn run_test_sigverify_shreds_cpu(thread_pool: &ThreadPool, slot: Slot) {
solana_logger::setup();
let mut batches = [PacketBatch::default()];
let mut shred = Shred::new_from_data(
@ -557,7 +557,7 @@ mod tests {
.iter()
.cloned()
.collect();
let rv = verify_shreds_cpu(&batches, &leader_slots);
let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots);
assert_eq!(rv, vec![vec![1]]);
let wrong_keypair = Keypair::new();
@ -565,11 +565,11 @@ mod tests {
.iter()
.cloned()
.collect();
let rv = verify_shreds_cpu(&batches, &leader_slots);
let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots);
assert_eq!(rv, vec![vec![0]]);
let leader_slots = HashMap::new();
let rv = verify_shreds_cpu(&batches, &leader_slots);
let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots);
assert_eq!(rv, vec![vec![0]]);
let leader_slots = [(slot, keypair.pubkey().to_bytes())]
@ -577,16 +577,17 @@ mod tests {
.cloned()
.collect();
batches[0][0].meta_mut().size = 0;
let rv = verify_shreds_cpu(&batches, &leader_slots);
let rv = verify_shreds_cpu(thread_pool, &batches, &leader_slots);
assert_eq!(rv, vec![vec![0]]);
}
#[test]
fn test_sigverify_shreds_cpu() {
run_test_sigverify_shreds_cpu(0xdead_c0de);
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
run_test_sigverify_shreds_cpu(&thread_pool, 0xdead_c0de);
}
fn run_test_sigverify_shreds_gpu(slot: Slot) {
fn run_test_sigverify_shreds_gpu(thread_pool: &ThreadPool, slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
@ -614,7 +615,7 @@ mod tests {
.iter()
.cloned()
.collect();
let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache);
assert_eq!(rv, vec![vec![1]]);
let wrong_keypair = Keypair::new();
@ -625,11 +626,11 @@ mod tests {
.iter()
.cloned()
.collect();
let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache);
assert_eq!(rv, vec![vec![0]]);
let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect();
let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache);
assert_eq!(rv, vec![vec![0]]);
batches[0][0].meta_mut().size = 0;
@ -640,16 +641,17 @@ mod tests {
.iter()
.cloned()
.collect();
let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &leader_slots, &recycler_cache);
assert_eq!(rv, vec![vec![0]]);
}
#[test]
fn test_sigverify_shreds_gpu() {
run_test_sigverify_shreds_gpu(0xdead_c0de);
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
run_test_sigverify_shreds_gpu(&thread_pool, 0xdead_c0de);
}
fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
fn run_test_sigverify_shreds_sign_gpu(thread_pool: &ThreadPool, slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
@ -683,23 +685,30 @@ mod tests {
.cloned()
.collect();
//unsigned
let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &pubkeys, &recycler_cache);
assert_eq!(rv, vec![vec![0; num_packets]; num_batches]);
//signed
sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache);
let rv = verify_shreds_cpu(&batches, &pubkeys);
sign_shreds_gpu(
thread_pool,
&keypair,
&pinned_keypair,
&mut batches,
&recycler_cache,
);
let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys);
assert_eq!(rv, vec![vec![1; num_packets]; num_batches]);
let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache);
let rv = verify_shreds_gpu(thread_pool, &batches, &pubkeys, &recycler_cache);
assert_eq!(rv, vec![vec![1; num_packets]; num_batches]);
}
#[test]
fn test_sigverify_shreds_sign_gpu() {
run_test_sigverify_shreds_sign_gpu(0xdead_c0de);
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
run_test_sigverify_shreds_sign_gpu(&thread_pool, 0xdead_c0de);
}
fn run_test_sigverify_shreds_sign_cpu(slot: Slot) {
fn run_test_sigverify_shreds_sign_cpu(thread_pool: &ThreadPool, slot: Slot) {
solana_logger::setup();
let mut batches = [PacketBatch::default()];
@ -726,17 +735,18 @@ mod tests {
.cloned()
.collect();
//unsigned
let rv = verify_shreds_cpu(&batches, &pubkeys);
let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys);
assert_eq!(rv, vec![vec![0]]);
//signed
sign_shreds_cpu(&keypair, &mut batches);
let rv = verify_shreds_cpu(&batches, &pubkeys);
sign_shreds_cpu(thread_pool, &keypair, &mut batches);
let rv = verify_shreds_cpu(thread_pool, &batches, &pubkeys);
assert_eq!(rv, vec![vec![1]]);
}
#[test]
fn test_sigverify_shreds_sign_cpu() {
run_test_sigverify_shreds_sign_cpu(0xdead_c0de);
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
run_test_sigverify_shreds_sign_cpu(&thread_pool, 0xdead_c0de);
}
fn make_transaction<R: Rng>(rng: &mut R) -> Transaction {
@ -843,6 +853,7 @@ mod tests {
#[test]
fn test_verify_shreds_fuzz() {
let mut rng = rand::thread_rng();
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
let recycler_cache = RecyclerCache::default();
let keypairs = repeat_with(|| rng.gen_range(169_367_809, 169_906_789))
.map(|slot| (slot, Keypair::new()))
@ -856,7 +867,7 @@ mod tests {
.collect();
let mut packets = make_packets(&mut rng, &shreds);
assert_eq!(
verify_shreds_gpu(&packets, &pubkeys, &recycler_cache),
verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache),
packets
.iter()
.map(|batch| vec![1u8; batch.len()])
@ -878,12 +889,16 @@ mod tests {
.collect::<Vec<_>>()
})
.collect();
assert_eq!(verify_shreds_gpu(&packets, &pubkeys, &recycler_cache), out);
assert_eq!(
verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache),
out
);
}
#[test]
fn test_sign_shreds_gpu() {
let mut rng = rand::thread_rng();
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
let recycler_cache = RecyclerCache::default();
let shreds = {
let keypairs = repeat_with(|| rng.gen_range(169_367_809, 169_906_789))
@ -905,7 +920,7 @@ mod tests {
let mut packets = make_packets(&mut rng, &shreds);
// Assert that initially all signatures are invalid.
assert_eq!(
verify_shreds_gpu(&packets, &pubkeys, &recycler_cache),
verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache),
packets
.iter()
.map(|batch| vec![0u8; batch.len()])
@ -914,9 +929,15 @@ mod tests {
let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache);
let pinned_keypair = Some(Arc::new(pinned_keypair));
// Sign and verify shreds signatures.
sign_shreds_gpu(&keypair, &pinned_keypair, &mut packets, &recycler_cache);
sign_shreds_gpu(
&thread_pool,
&keypair,
&pinned_keypair,
&mut packets,
&recycler_cache,
);
assert_eq!(
verify_shreds_gpu(&packets, &pubkeys, &recycler_cache),
verify_shreds_gpu(&thread_pool, &packets, &pubkeys, &recycler_cache),
packets
.iter()
.map(|batch| vec![1u8; batch.len()])