diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index aad92f5b9..e5e06a3bc 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -32,8 +32,8 @@ use { // 50ms/(300ns/packet) = 166666 packets ~ 1300 batches const MAX_DEDUP_BATCH: usize = 165_000; -// 50ms/(25us/packet) = 2000 packets -const MAX_SIGVERIFY_BATCH: usize = 2_000; +// 50ms/(10us/packet) = 5000 packets +const MAX_SIGVERIFY_BATCH: usize = 5_000; // Packet batch shrinker will reorganize packets into compacted batches if 10% // or more of the packets in a group of packet batches have been discarded. diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index e2cfddb15..5de177995 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -55,7 +55,7 @@ fn gen_batches( #[bench] #[ignore] fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE - 1; let mut batches = gen_batches(false, 1, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -67,7 +67,7 @@ fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE - 1; let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -79,7 +79,7 @@ fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 8; let mut batches = gen_batches(false, 1, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -91,7 +91,7 @@ fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 8; let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -103,7 +103,7 @@ fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 32; let mut batches = gen_batches(false, 1, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -115,7 +115,7 @@ fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_sigverify_high_packets_large_batch(bencher: &mut Bencher) { - let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32; + let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 32; let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets); let recycler = Recycler::default(); let recycler_out = Recycler::default(); diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index aad913d58..d17f97e03 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -31,7 +31,7 @@ const TRACER_KEY_BYTES: [u8; 32] = [ const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES); const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69; // Empirically derived to constrain max verify latency to ~8ms at lower packet counts -pub const VERIFY_MIN_PACKETS_PER_THREAD: usize = 128; +pub const VERIFY_PACKET_CHUNK_SIZE: usize = 128; lazy_static! { static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() @@ -522,46 +522,20 @@ pub fn shrink_batches(batches: &mut Vec) { pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) { debug!("CPU ECDSA for {}", packet_count); - let desired_thread_count = packet_count - .saturating_add(VERIFY_MIN_PACKETS_PER_THREAD) - .saturating_div(VERIFY_MIN_PACKETS_PER_THREAD); - if desired_thread_count <= 1 { - // When using single thread, skip rayon overhead. - batches.iter_mut().for_each(|batch| { - batch.iter_mut().for_each(|packet| { - if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { - packet.meta_mut().set_discard(true); + PAR_THREAD_POOL.install(|| { + batches + .par_iter_mut() + .flatten() + .collect::>() + .par_chunks_mut(VERIFY_PACKET_CHUNK_SIZE) + .for_each(|packets| { + for packet in packets.iter_mut() { + if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { + packet.meta_mut().set_discard(true); + } } - }) - }); - } else if desired_thread_count < get_thread_count() { - // Dynamically compute minimum packet length to spread the load while minimizing threads. - let packets_per_thread = packet_count.saturating_div(desired_thread_count); - PAR_THREAD_POOL.install(|| { - batches - .into_par_iter() - .flatten() - .collect::>() - .into_par_iter() - .with_min_len(packets_per_thread) - .for_each(|packet: &mut Packet| { - if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { - packet.meta_mut().set_discard(true); - } - }) - }); - } else { - // When using all available threads, skip the overhead of flattening, collecting, etc. - PAR_THREAD_POOL.install(|| { - batches.into_par_iter().for_each(|batch: &mut PacketBatch| { - batch.par_iter_mut().for_each(|packet: &mut Packet| { - if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { - packet.meta_mut().set_discard(true); - } - }) }); - }); - } + }); inc_new_counter_debug!("ed25519_verify_cpu", packet_count); } @@ -1190,22 +1164,22 @@ mod tests { #[test] fn test_verify_medium_pass() { - test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, false); + test_verify_n(VERIFY_PACKET_CHUNK_SIZE, false); } #[test] fn test_verify_large_pass() { - test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), false); + test_verify_n(VERIFY_PACKET_CHUNK_SIZE * get_thread_count(), false); } #[test] fn test_verify_medium_fail() { - test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, true); + test_verify_n(VERIFY_PACKET_CHUNK_SIZE, true); } #[test] fn test_verify_large_fail() { - test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), true); + test_verify_n(VERIFY_PACKET_CHUNK_SIZE * get_thread_count(), true); } #[test]