Reviewer summary. This is free-form text placed ahead of the first
"diff --git" header; it is not part of any hunk.

perf/benches/sigverify.rs
  * Adds LARGE_BATCH_PACKET_COUNT (128) and a gen_batches() helper that
    builds packet batches either from one repeated test transaction or from
    total_packets freshly generated ones.
  * Adds six #[ignore]d benchmarks of sigverify::ed25519_verify covering
    low / medium / high packet counts (VERIFY_MIN_PACKETS_PER_THREAD - 1,
    * 8 and * 32), each with 1 packet per batch and with
    LARGE_BATCH_PACKET_COUNT packets per batch.

perf/src/sigverify.rs
  * Adds the public constant VERIFY_MIN_PACKETS_PER_THREAD (128).
  * ed25519_verify_cpu now derives
        desired_thread_count = (packet_count + MIN) / MIN   (saturating)
    and picks one of three strategies:
      - desired_thread_count <= 1: plain sequential iteration, no rayon.
      - desired_thread_count < get_thread_count(): flatten every batch into
        a Vec<&mut Packet> and iterate it in parallel with
        with_min_len(packet_count / desired_thread_count).
      - otherwise: the previous nested parallel iteration over batches and
        packets.
    All three arms apply the same per-packet check: a packet that is not
    already discarded and fails verify_packet() is marked discarded.
  * Adds medium/large pass/fail tests that call test_verify_n with
    VERIFY_MIN_PACKETS_PER_THREAD and
    VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count().

diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs
index 748a295ce..65991a950 100644
--- a/perf/benches/sigverify.rs
+++ b/perf/benches/sigverify.rs
@@ -15,6 +15,7 @@ use {
 };
 
 const NUM: usize = 256;
+const LARGE_BATCH_PACKET_COUNT: usize = 128;
 
 #[bench]
 fn bench_sigverify_simple(bencher: &mut Bencher) {
@@ -35,6 +36,95 @@ fn bench_sigverify_simple(bencher: &mut Bencher) {
     })
 }
 
+fn gen_batches(
+    use_same_tx: bool,
+    packets_per_batch: usize,
+    total_packets: usize,
+) -> Vec<PacketBatch> {
+    if use_same_tx {
+        let tx = test_tx();
+        to_packet_batches(&vec![tx; total_packets], packets_per_batch)
+    } else {
+        let txs: Vec<_> = std::iter::repeat_with(test_tx)
+            .take(total_packets)
+            .collect();
+        to_packet_batches(&txs, packets_per_batch)
+    }
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
+    let mut batches = gen_batches(false, 1, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
+    let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
+    let mut batches = gen_batches(false, 1, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
+    let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
+    let mut batches = gen_batches(false, 1, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_high_packets_large_batch(bencher: &mut Bencher) {
+    let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
+    let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    // verify packets
+    bencher.iter(|| {
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
 #[bench]
 #[ignore]
 fn bench_sigverify_uneven(bencher: &mut Bencher) {
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index e859afba2..aee1b310d 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -39,6 +39,8 @@ const TRACER_KEY_BYTES: [u8; 32] = [
 ];
 const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES);
 const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69;
+// Empirically derived to constrain max verify latency to ~8ms at lower packet counts
+pub const VERIFY_MIN_PACKETS_PER_THREAD: usize = 128;
 
 lazy_static! {
     static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
@@ -608,15 +610,46 @@ pub fn shrink_batches(batches: &mut Vec<PacketBatch>) {
 
 pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
     debug!("CPU ECDSA for {}", packet_count);
-    PAR_THREAD_POOL.install(|| {
-        batches.into_par_iter().for_each(|batch| {
-            batch.par_iter_mut().for_each(|packet| {
+    let desired_thread_count = packet_count
+        .saturating_add(VERIFY_MIN_PACKETS_PER_THREAD)
+        .saturating_div(VERIFY_MIN_PACKETS_PER_THREAD);
+    if desired_thread_count <= 1 {
+        // When using single thread, skip rayon overhead.
+        batches.iter_mut().for_each(|batch| {
+            batch.iter_mut().for_each(|packet| {
                 if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
                     packet.meta.set_discard(true);
                 }
             })
         });
-    });
+    } else if desired_thread_count < get_thread_count() {
+        // Dynamically compute minimum packet length to spread the load while minimizing threads.
+        let packets_per_thread = packet_count.saturating_div(desired_thread_count);
+        PAR_THREAD_POOL.install(|| {
+            batches
+                .into_par_iter()
+                .flatten()
+                .collect::<Vec<&mut Packet>>()
+                .into_par_iter()
+                .with_min_len(packets_per_thread)
+                .for_each(|packet: &mut Packet| {
+                    if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
+                        packet.meta.set_discard(true);
+                    }
+                })
+        });
+    } else {
+        // When using all available threads, skip the overhead of flattening, collecting, etc.
+        PAR_THREAD_POOL.install(|| {
+            batches.into_par_iter().for_each(|batch: &mut PacketBatch| {
+                batch.par_iter_mut().for_each(|packet: &mut Packet| {
+                    if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
+                        packet.meta.set_discard(true);
+                    }
+                })
+            });
+        });
+    }
     inc_new_counter_debug!("ed25519_verify_cpu", packet_count);
 }
 
@@ -1216,6 +1249,26 @@ mod tests {
         test_verify_n(71, false);
     }
 
+    #[test]
+    fn test_verify_medium_pass() {
+        test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, false);
+    }
+
+    #[test]
+    fn test_verify_large_pass() {
+        test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), false);
+    }
+
+    #[test]
+    fn test_verify_medium_fail() {
+        test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, true);
+    }
+
+    #[test]
+    fn test_verify_large_fail() {
+        test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), true);
+    }
+
     #[test]
     fn test_verify_multisig() {
         solana_logger::setup();