Add minimum iterator packet count for rayon verification threads (#25883)
* Add minimum iterator packet count of 128 for rayon sig verification threads * Add bench performance and unit verification tests
This commit is contained in:
parent
306faed3a8
commit
fc166daa17
|
@ -15,6 +15,7 @@ use {
|
||||||
};
|
};
|
||||||
|
|
||||||
const NUM: usize = 256;
|
const NUM: usize = 256;
|
||||||
|
const LARGE_BATCH_PACKET_COUNT: usize = 128;
|
||||||
|
|
||||||
#[bench]
|
#[bench]
|
||||||
fn bench_sigverify_simple(bencher: &mut Bencher) {
|
fn bench_sigverify_simple(bencher: &mut Bencher) {
|
||||||
|
@ -35,6 +36,95 @@ fn bench_sigverify_simple(bencher: &mut Bencher) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn gen_batches(
|
||||||
|
use_same_tx: bool,
|
||||||
|
packets_per_batch: usize,
|
||||||
|
total_packets: usize,
|
||||||
|
) -> Vec<PacketBatch> {
|
||||||
|
if use_same_tx {
|
||||||
|
let tx = test_tx();
|
||||||
|
to_packet_batches(&vec![tx; total_packets], packets_per_batch)
|
||||||
|
} else {
|
||||||
|
let txs: Vec<_> = std::iter::repeat_with(test_tx)
|
||||||
|
.take(total_packets)
|
||||||
|
.collect();
|
||||||
|
to_packet_batches(&txs, packets_per_batch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_low_packets_small_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
|
||||||
|
let mut batches = gen_batches(false, 1, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_low_packets_large_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD - 1;
|
||||||
|
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_medium_packets_small_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
|
||||||
|
let mut batches = gen_batches(false, 1, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_medium_packets_large_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 8;
|
||||||
|
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_high_packets_small_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
|
||||||
|
let mut batches = gen_batches(false, 1, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_sigverify_high_packets_large_batch(bencher: &mut Bencher) {
|
||||||
|
let num_packets = sigverify::VERIFY_MIN_PACKETS_PER_THREAD * 32;
|
||||||
|
let mut batches = gen_batches(false, LARGE_BATCH_PACKET_COUNT, num_packets);
|
||||||
|
let recycler = Recycler::default();
|
||||||
|
let recycler_out = Recycler::default();
|
||||||
|
// verify packets
|
||||||
|
bencher.iter(|| {
|
||||||
|
sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[bench]
|
#[bench]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn bench_sigverify_uneven(bencher: &mut Bencher) {
|
fn bench_sigverify_uneven(bencher: &mut Bencher) {
|
||||||
|
|
|
@ -39,6 +39,8 @@ const TRACER_KEY_BYTES: [u8; 32] = [
|
||||||
];
|
];
|
||||||
const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES);
|
const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES);
|
||||||
const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69;
|
const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69;
|
||||||
|
// Empirically derived to constrain max verify latency to ~8ms at lower packet counts
|
||||||
|
pub const VERIFY_MIN_PACKETS_PER_THREAD: usize = 128;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
|
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
|
||||||
|
@ -608,15 +610,46 @@ pub fn shrink_batches(batches: &mut Vec<PacketBatch>) {
|
||||||
|
|
||||||
pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
|
pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
|
||||||
debug!("CPU ECDSA for {}", packet_count);
|
debug!("CPU ECDSA for {}", packet_count);
|
||||||
PAR_THREAD_POOL.install(|| {
|
let desired_thread_count = packet_count
|
||||||
batches.into_par_iter().for_each(|batch| {
|
.saturating_add(VERIFY_MIN_PACKETS_PER_THREAD)
|
||||||
batch.par_iter_mut().for_each(|packet| {
|
.saturating_div(VERIFY_MIN_PACKETS_PER_THREAD);
|
||||||
|
if desired_thread_count <= 1 {
|
||||||
|
// When using single thread, skip rayon overhead.
|
||||||
|
batches.iter_mut().for_each(|batch| {
|
||||||
|
batch.iter_mut().for_each(|packet| {
|
||||||
if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
|
if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
|
||||||
packet.meta.set_discard(true);
|
packet.meta.set_discard(true);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
});
|
} else if desired_thread_count < get_thread_count() {
|
||||||
|
// Dynamically compute minimum packet length to spread the load while minimizing threads.
|
||||||
|
let packets_per_thread = packet_count.saturating_div(desired_thread_count);
|
||||||
|
PAR_THREAD_POOL.install(|| {
|
||||||
|
batches
|
||||||
|
.into_par_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<&mut Packet>>()
|
||||||
|
.into_par_iter()
|
||||||
|
.with_min_len(packets_per_thread)
|
||||||
|
.for_each(|packet: &mut Packet| {
|
||||||
|
if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
|
||||||
|
packet.meta.set_discard(true);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// When using all available threads, skip the overhead of flattening, collecting, etc.
|
||||||
|
PAR_THREAD_POOL.install(|| {
|
||||||
|
batches.into_par_iter().for_each(|batch: &mut PacketBatch| {
|
||||||
|
batch.par_iter_mut().for_each(|packet: &mut Packet| {
|
||||||
|
if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) {
|
||||||
|
packet.meta.set_discard(true);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
inc_new_counter_debug!("ed25519_verify_cpu", packet_count);
|
inc_new_counter_debug!("ed25519_verify_cpu", packet_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1216,6 +1249,26 @@ mod tests {
|
||||||
test_verify_n(71, false);
|
test_verify_n(71, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_verify_medium_pass() {
|
||||||
|
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_verify_large_pass() {
|
||||||
|
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_verify_medium_fail() {
|
||||||
|
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_verify_large_fail() {
|
||||||
|
test_verify_n(VERIFY_MIN_PACKETS_PER_THREAD * get_thread_count(), true);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_verify_multisig() {
|
fn test_verify_multisig() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
|
Loading…
Reference in New Issue