Rework tx sig verify batching (#31355)
* Rework tx sig verify batching to eliminate special casing and increase the packet limit for sigverify from 2k to 5k
This commit is contained in:
parent
d621b48026
commit
e79b84ea70
|
@ -32,8 +32,8 @@ use {
|
|||
// 50ms/(300ns/packet) = 166666 packets ~ 1300 batches
|
||||
const MAX_DEDUP_BATCH: usize = 165_000;
|
||||
|
||||
// 50ms/(25us/packet) = 2000 packets
|
||||
const MAX_SIGVERIFY_BATCH: usize = 2_000;
|
||||
// 50ms/(10us/packet) = 5000 packets
|
||||
const MAX_SIGVERIFY_BATCH: usize = 5_000;
|
||||
|
||||
// Packet batch shrinker will reorganize packets into compacted batches if 10%
|
||||
// or more of the packets in a group of packet batches have been discarded.
|
||||
|
|
|
@ -55,7 +55,7 @@ fn gen_batches(
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE - 1;
|
||||
let mut batches = gen_batches(false, 1, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
@ -67,7 +67,7 @@ fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) {
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE - 1;
|
||||
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
@ -79,7 +79,7 @@ fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) {
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 8;
|
||||
let mut batches = gen_batches(false, 1, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
@ -91,7 +91,7 @@ fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) {
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 8;
|
||||
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
@ -103,7 +103,7 @@ fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) {
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 32;
|
||||
let mut batches = gen_batches(false, 1, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
@ -115,7 +115,7 @@ fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) {
|
|||
#[bench]
|
||||
#[ignore]
|
||||
fn bench_sigverify_high_packets_large_batch(bencher: &mut Bencher) {
|
||||
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
|
||||
let num_packets = sigverify::VERIFY_PACKET_CHUNK_SIZE * 32;
|
||||
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||
let recycler = Recycler::default();
|
||||
let recycler_out = Recycler::default();
|
||||
|
|
|
@ -31,7 +31,7 @@ const TRACER_KEY_BYTES: [u8; 32] = [
|
|||
const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES);
|
||||
const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69;
|
||||
// Empirically derived to constrain max verify latency to ~8ms at lower packet counts
|
||||
pub const VERIFY_MIN_PACKETS_PER_THREAD: usize = 128;
|
||||
pub const VERIFY_PACKET_CHUNK_SIZE: usize = 128;
|
||||
|
||||
lazy_static! {
|
||||
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
|
||||
|
@ -522,46 +522,20 @@ pub fn shrink_batches(batches: &mut Vec<PacketBatch>) {
|
|||
|
||||
pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
|
||||
debug!("CPU ECDSA for {}", packet_count);
|
||||
let desired_thread_count = packet_count
|
||||
.saturating_add(VERIFY_MIN_PACKETS_PER_THREAD)
|
||||
.saturating_div(VERIFY_MIN_PACKETS_PER_THREAD);
|
||||
if desired_thread_count <= 1 {
|
||||
// When using single thread, skip rayon overhead.
|
||||
batches.iter_mut().for_each(|batch| {
|
||||
batch.iter_mut().for_each(|packet| {
|
||||
if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) {
|
||||
packet.meta_mut().set_discard(true);
|
||||
PAR_THREAD_POOL.install(|| {
|
||||
batches
|
||||
.par_iter_mut()
|
||||
.flatten()
|
||||
.collect::<Vec<&mut Packet>>()
|
||||
.par_chunks_mut(VERIFY_PACKET_CHUNK_SIZE)
|
||||
.for_each(|packets| {
|
||||
for packet in packets.iter_mut() {
|
||||
if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) {
|
||||
packet.meta_mut().set_discard(true);
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
} else if desired_thread_count < get_thread_count() {
|
||||
// Dynamically compute minimum packet length to spread the load while minimizing threads.
|
||||
let packets_per_thread = packet_count.saturating_div(desired_thread_count);
|
||||
PAR_THREAD_POOL.install(|| {
|
||||
batches
|
||||
.into_par_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<&mut Packet>>()
|
||||
.into_par_iter()
|
||||
.with_min_len(packets_per_thread)
|
||||
.for_each(|packet: &mut Packet| {
|
||||
if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) {
|
||||
packet.meta_mut().set_discard(true);
|
||||
}
|
||||
})
|
||||
});
|
||||
} else {
|
||||
// When using all available threads, skip the overhead of flattening, collecting, etc.
|
||||
PAR_THREAD_POOL.install(|| {
|
||||
batches.into_par_iter().for_each(|batch: &mut PacketBatch| {
|
||||
batch.par_iter_mut().for_each(|packet: &mut Packet| {
|
||||
if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) {
|
||||
packet.meta_mut().set_discard(true);
|
||||
}
|
||||
})
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
inc_new_counter_debug!("ed25519_verify_cpu", packet_count);
|
||||
}
|
||||
|
||||
|
@ -1190,22 +1164,22 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_verify_medium_pass() {
|
||||
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, false);
|
||||
test_verify_n(VERIFY_PACKET_CHUNK_SIZE, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_large_pass() {
|
||||
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), false);
|
||||
test_verify_n(VERIFY_PACKET_CHUNK_SIZE * get_thread_count(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_medium_fail() {
|
||||
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, true);
|
||||
test_verify_n(VERIFY_PACKET_CHUNK_SIZE, true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_large_fail() {
|
||||
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), true);
|
||||
test_verify_n(VERIFY_PACKET_CHUNK_SIZE * get_thread_count(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue