diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs
index cde843a923..e8fadfee44 100644
--- a/core/benches/sigverify_stage.rs
+++ b/core/benches/sigverify_stage.rs
@@ -9,7 +9,7 @@ use {
     log::*,
     rand::{thread_rng, Rng},
     solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage},
-    solana_perf::{packet::to_packet_batches, test_tx::test_tx},
+    solana_perf::{packet::to_packet_batches, packet::PacketBatch, test_tx::test_tx},
     solana_sdk::{
         hash::Hash,
         signature::{Keypair, Signer},
@@ -109,19 +109,10 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
     });
 }
 
-#[bench]
-fn bench_sigverify_stage(bencher: &mut Bencher) {
-    solana_logger::setup();
-    let (packet_s, packet_r) = unbounded();
-    let (verified_s, verified_r) = unbounded();
-    let verifier = TransactionSigVerifier::default();
-    let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
-
-    let now = Instant::now();
+fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
     let len = 4096;
-    let use_same_tx = true;
     let chunk_size = 1024;
-    let mut batches = if use_same_tx {
+    if use_same_tx {
         let tx = test_tx();
         to_packet_batches(&vec![tx; len], chunk_size)
     } else {
@@ -139,14 +130,28 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
             })
             .collect();
         to_packet_batches(&txs, chunk_size)
-    };
+    }
+}
 
-    trace!(
-        "starting... generation took: {} ms batches: {}",
-        duration_as_ms(&now.elapsed()),
-        batches.len()
-    );
+#[bench]
+fn bench_sigverify_stage(bencher: &mut Bencher) {
+    solana_logger::setup();
+    trace!("start");
+    let (packet_s, packet_r) = unbounded();
+    let (verified_s, verified_r) = unbounded();
+    let verifier = TransactionSigVerifier::default();
+    let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
+
+    let use_same_tx = true;
     bencher.iter(move || {
+        let now = Instant::now();
+        let mut batches = gen_batches(use_same_tx);
+        trace!(
+            "starting... generation took: {} ms batches: {}",
+            duration_as_ms(&now.elapsed()),
+            batches.len()
+        );
+
         let mut sent_len = 0;
         for _ in 0..batches.len() {
             if let Some(batch) = batches.pop() {
@@ -162,7 +167,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
                     received += v.packets.len();
                     batches.push(v);
                 }
-                if received >= sent_len {
+                if use_same_tx || received >= sent_len {
                     break;
                 }
             }
diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs
index ec9dc69283..baf4cd37d9 100644
--- a/core/src/sigverify_stage.rs
+++ b/core/src/sigverify_stage.rs
@@ -12,7 +12,7 @@ use {
     itertools::Itertools,
     solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
-    solana_perf::sigverify::Deduper,
+    solana_perf::sigverify::{count_valid_packets, shrink_batches, Deduper},
     solana_sdk::timing,
     solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
     std::{
@@ -58,6 +58,9 @@ struct SigVerifierStats {
     total_packets: usize,
     total_dedup: usize,
     total_excess_fail: usize,
+    total_shrink_time: usize,
+    total_shrinks: usize,
+    total_valid_packets: usize,
 }
 
 impl SigVerifierStats {
@@ -166,6 +169,9 @@ impl SigVerifierStats {
             ("total_packets", self.total_packets, i64),
             ("total_dedup", self.total_dedup, i64),
             ("total_excess_fail", self.total_excess_fail, i64),
+            ("total_shrink_time", self.total_shrink_time, i64),
+            ("total_shrinks", self.total_shrinks, i64),
+            ("total_valid_packets", self.total_valid_packets, i64),
         );
     }
 }
@@ -248,7 +254,19 @@ impl SigVerifyStage {
         discard_time.stop();
 
         let mut verify_batch_time = Measure::start("sigverify_batch_time");
-        let batches = verifier.verify_batches(batches, num_valid_packets);
+        let mut batches = verifier.verify_batches(batches, num_valid_packets);
+        verify_batch_time.stop();
+
+        let mut shrink_time = Measure::start("sigverify_shrink_time");
+        let num_valid_packets = count_valid_packets(&batches);
+        let start_len = batches.len();
+        const MAX_EMPTY_BATCH_RATIO: usize = 4;
+        if num_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
+            let valid = shrink_batches(&mut batches);
+            batches.truncate(valid);
+        }
+        let total_shrinks = start_len.saturating_sub(batches.len());
+        shrink_time.stop();
+
         sendr.send(batches)?;
-        verify_batch_time.stop();
 
@@ -282,7 +300,10 @@ impl SigVerifyStage {
         stats.total_batches += batches_len;
         stats.total_packets += num_packets;
         stats.total_dedup += dedup_fail;
+        stats.total_valid_packets += num_valid_packets;
         stats.total_excess_fail += excess_fail;
+        stats.total_shrink_time += shrink_time.as_us() as usize;
+        stats.total_shrinks += total_shrinks;
 
         Ok(())
     }
@@ -348,6 +369,11 @@ impl SigVerifyStage {
 
 #[cfg(test)]
 mod tests {
+    use crate::sigverify::TransactionSigVerifier;
+    use crate::sigverify_stage::timing::duration_as_ms;
+    use crossbeam_channel::unbounded;
+    use solana_perf::packet::to_packet_batches;
+    use solana_perf::test_tx::test_tx;
     use {super::*, solana_perf::packet::Packet};
 
     fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
@@ -379,4 +405,58 @@ mod tests {
         assert!(batches[0].packets[3].meta.discard());
         assert!(!batches[0].packets[4].meta.discard());
     }
+    fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
+        let len = 4096;
+        let chunk_size = 1024;
+        if use_same_tx {
+            let tx = test_tx();
+            to_packet_batches(&vec![tx; len], chunk_size)
+        } else {
+            let txs: Vec<_> = (0..len).map(|_| test_tx()).collect();
+            to_packet_batches(&txs, chunk_size)
+        }
+    }
+
+    #[test]
+    fn test_sigverify_stage() {
+        solana_logger::setup();
+        trace!("start");
+        let (packet_s, packet_r) = unbounded();
+        let (verified_s, verified_r) = unbounded();
+        let verifier = TransactionSigVerifier::default();
+        let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
+
+        let use_same_tx = true;
+        let now = Instant::now();
+        let mut batches = gen_batches(use_same_tx);
+        trace!(
+            "starting... generation took: {} ms batches: {}",
+            duration_as_ms(&now.elapsed()),
+            batches.len()
+        );
+
+        let mut sent_len = 0;
+        for _ in 0..batches.len() {
+            if let Some(batch) = batches.pop() {
+                sent_len += batch.packets.len();
+                packet_s.send(batch).unwrap();
+            }
+        }
+        let mut received = 0;
+        trace!("sent: {}", sent_len);
+        loop {
+            if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
+                while let Some(v) = verifieds.pop() {
+                    received += v.packets.len();
+                    batches.push(v);
+                }
+                if use_same_tx || received >= sent_len {
+                    break;
+                }
+            }
+        }
+        trace!("received: {}", received);
+        drop(packet_s);
+        stage.join().unwrap();
+    }
 }
diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs
new file mode 100644
index 0000000000..cf60fd90e5
--- /dev/null
+++ b/perf/benches/shrink.rs
@@ -0,0 +1,86 @@
+#![allow(clippy::integer_arithmetic)]
+#![feature(test)]
+
+extern crate test;
+
+use {
+    rand::prelude::*,
+    solana_perf::{
+        packet::{to_packet_batches, PacketBatch, PACKETS_PER_BATCH},
+        sigverify,
+    },
+    test::Bencher,
+};
+
+const NUM_PACKETS: usize = 1024 * 4;
+
+fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
+    // subtract 8 bytes because the length will get serialized as well
+    (0..size.checked_sub(8).unwrap())
+        .map(|_| rng.gen())
+        .collect()
+}
+
+fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
+    // shrink the batches, then re-randomize the discard flags for the next iteration
+    bencher.iter(|| {
+        let _ans = sigverify::shrink_batches(&mut batches);
+        batches.iter_mut().for_each(|b| {
+            b.packets
+                .iter_mut()
+                .for_each(|p| p.meta.set_discard(thread_rng().gen()))
+        });
+    });
+}
+
+#[bench]
+#[ignore]
+fn bench_shrink_diff_small_packets(bencher: &mut Bencher) {
+    let mut rng = rand::thread_rng();
+
+    let batches = to_packet_batches(
+        &(0..NUM_PACKETS)
+            .map(|_| test_packet_with_size(128, &mut rng))
+            .collect::<Vec<_>>(),
+        PACKETS_PER_BATCH,
+    );
+
+    do_bench_shrink_packets(bencher, batches);
+}
+
+#[bench]
+#[ignore]
+fn bench_shrink_diff_big_packets(bencher: &mut Bencher) {
+    let mut rng = rand::thread_rng();
+
+    let batches = to_packet_batches(
+        &(0..NUM_PACKETS)
+            .map(|_| test_packet_with_size(1024, &mut rng))
+            .collect::<Vec<_>>(),
+        PACKETS_PER_BATCH,
+    );
+
+    do_bench_shrink_packets(bencher, batches);
+}
+
+#[bench]
+#[ignore]
+fn bench_shrink_count_packets(bencher: &mut Bencher) {
+    let mut rng = rand::thread_rng();
+
+    let mut batches = to_packet_batches(
+        &(0..NUM_PACKETS)
+            .map(|_| test_packet_with_size(128, &mut rng))
+            .collect::<Vec<_>>(),
+        PACKETS_PER_BATCH,
+    );
+    batches.iter_mut().for_each(|b| {
+        b.packets
+            .iter_mut()
+            .for_each(|p| p.meta.set_discard(thread_rng().gen()))
+    });
+
+    bencher.iter(|| {
+        let _ = sigverify::count_valid_packets(&batches);
+    });
+}
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index 83a895d790..df330da128 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -167,6 +167,13 @@ pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize {
     batches.iter().map(|batch| batch.packets.len()).sum()
 }
 
+pub fn count_valid_packets(batches: &[PacketBatch]) -> usize {
+    batches
+        .iter()
+        .map(|batch| batch.packets.iter().filter(|p| !p.meta.discard()).count())
+        .sum()
+}
+
 // internal function to be unit-tested; should be used only by get_packet_offsets
 fn do_get_packet_offsets(
     packet: &Packet,
@@ -496,6 +503,43 @@ impl Deduper {
     }
 }
 
+/// Moves valid packets into discarded slots of earlier batches; returns the batch count to keep.
+pub fn shrink_batches(batches: &mut Vec<PacketBatch>) -> usize {
+    let mut valid_batch_ix = 0;
+    let mut valid_packet_ix = 0;
+    let mut last_valid_batch = 0;
+    for batch_ix in 0..batches.len() {
+        for packet_ix in 0..batches[batch_ix].packets.len() {
+            if batches[batch_ix].packets[packet_ix].meta.discard() {
+                continue;
+            }
+            last_valid_batch = batch_ix.saturating_add(1);
+            let mut found_spot = false;
+            while valid_batch_ix < batch_ix && !found_spot {
+                while valid_packet_ix < batches[valid_batch_ix].packets.len() {
+                    if batches[valid_batch_ix].packets[valid_packet_ix]
+                        .meta
+                        .discard()
+                    {
+                        batches[valid_batch_ix].packets[valid_packet_ix] =
+                            batches[batch_ix].packets[packet_ix].clone();
+                        batches[batch_ix].packets[packet_ix].meta.set_discard(true);
+                        last_valid_batch = valid_batch_ix.saturating_add(1);
+                        found_spot = true;
+                        break;
+                    }
+                    valid_packet_ix = valid_packet_ix.saturating_add(1);
+                }
+                if valid_packet_ix >= batches[valid_batch_ix].packets.len() {
+                    valid_packet_ix = 0;
+                    valid_batch_ix = valid_batch_ix.saturating_add(1);
+                }
+            }
+        }
+    }
+    last_valid_batch
+}
+
 pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
     use rayon::prelude::*;
     debug!("CPU ECDSA for {}", packet_count);
@@ -681,7 +725,7 @@ mod tests {
     use {
         super::*,
         crate::{
-            packet::{to_packet_batches, Packet, PacketBatch},
+            packet::{to_packet_batches, Packet, PacketBatch, PACKETS_PER_BATCH},
             sigverify::{self, PacketOffsets},
             test_tx::{new_test_vote_tx, test_multisig_tx, test_tx},
         },
@@ -1433,4 +1477,223 @@ mod tests {
         //allow for 1 false positive even if extremely unlikely
         assert!(discard < 2);
     }
+
+    #[test]
+    fn test_shrink_fuzz() {
+        for _ in 0..5 {
+            let mut batches = to_packet_batches(
+                &(0..PACKETS_PER_BATCH * 3)
+                    .map(|_| test_tx())
+                    .collect::<Vec<_>>(),
+                PACKETS_PER_BATCH,
+            );
+            batches.iter_mut().for_each(|b| {
+                b.packets
+                    .iter_mut()
+                    .for_each(|p| p.meta.set_discard(thread_rng().gen()))
+            });
+            //find all the non discarded packets
+            let mut start = vec![];
+            batches.iter_mut().for_each(|b| {
+                b.packets
+                    .iter_mut()
+                    .filter(|p| !p.meta.discard())
+                    .for_each(|p| start.push(p.clone()))
+            });
+            start.sort_by_key(|p| p.data);
+
+            let packet_count = count_valid_packets(&batches);
+            let res = shrink_batches(&mut batches);
+            batches.truncate(res);
+
+            //make sure all the non discarded packets are the same
+            let mut end = vec![];
+            batches.iter_mut().for_each(|b| {
+                b.packets
+                    .iter_mut()
+                    .filter(|p| !p.meta.discard())
+                    .for_each(|p| end.push(p.clone()))
+            });
+            end.sort_by_key(|p| p.data);
+            let packet_count2 = count_valid_packets(&batches);
+            assert_eq!(packet_count, packet_count2);
+            assert_eq!(start, end);
+        }
+    }
+
+    #[test]
+    fn test_shrink_empty() {
+        const PACKET_COUNT: usize = 1024;
+        const BATCH_COUNT: usize = PACKET_COUNT / PACKETS_PER_BATCH;
+
+        // No batches
+        // truncate of 1 on len 0 is a noop
+        assert_eq!(shrink_batches(&mut vec![]), 0);
+        // One empty batch
+        assert_eq!(shrink_batches(&mut vec![PacketBatch::with_capacity(0)]), 0);
+        // Many empty batches
+        let mut batches = (0..BATCH_COUNT)
+            .map(|_| PacketBatch::with_capacity(0))
+            .collect::<Vec<_>>();
+        assert_eq!(shrink_batches(&mut batches), 0);
+    }
+
+    #[test]
+    fn test_shrink_vectors() {
+        const PACKET_COUNT: usize = 1024;
+        const BATCH_COUNT: usize = PACKET_COUNT / PACKETS_PER_BATCH;
+
+        let set_discards = [
+            // contiguous
+            // 0
+            // No discards
+            |_, _| false,
+            // All discards
+            |_, _| true,
+            // single partitions
+            // discard last half of packets
+            |b, p| ((b * PACKETS_PER_BATCH) + p) >= (PACKET_COUNT / 2),
+            // discard first half of packets
+            |b, p| ((b * PACKETS_PER_BATCH) + p) < (PACKET_COUNT / 2),
+            // discard last half of each batch
+            |_, p| p >= (PACKETS_PER_BATCH / 2),
+            // 5
+            // discard first half of each batch
+            |_, p| p < (PACKETS_PER_BATCH / 2),
+            // uniform sparse
+            // discard even packets
+            |b, p| ((b * PACKETS_PER_BATCH) + p) % 2 == 0,
+            // discard odd packets
+            |b, p| ((b * PACKETS_PER_BATCH) + p) % 2 == 1,
+            // discard even batches
+            |b, _| b % 2 == 0,
+            // discard odd batches
+            |b, _| b % 2 == 1,
+            // edges
+            // 10
+            // discard first batch
+            |b, _| b == 0,
+            // discard last batch
+            |b, _| b == BATCH_COUNT - 1,
+            // discard first and last batches
+            |b, _| b == 0 || b == BATCH_COUNT - 1,
+            // discard all but first and last batches
+            |b, _| b != 0 && b != BATCH_COUNT - 1,
+            // discard first packet
+            |b, p| ((b * PACKETS_PER_BATCH) + p) == 0,
+            // 15
+            // discard all but first packet
+            |b, p| ((b * PACKETS_PER_BATCH) + p) != 0,
+            // discard last packet
+            |b, p| ((b * PACKETS_PER_BATCH) + p) == PACKET_COUNT - 1,
+            // discard all but last packet
+            |b, p| ((b * PACKETS_PER_BATCH) + p) != PACKET_COUNT - 1,
+            // discard first packet of each batch
+            |_, p| p == 0,
+            // discard all but first packet of each batch
+            |_, p| p != 0,
+            // 20
+            // discard last packet of each batch
+            |_, p| p == PACKETS_PER_BATCH - 1,
+            // discard all but last packet of each batch
+            |_, p| p != PACKETS_PER_BATCH - 1,
+            // discard first and last packet of each batch
+            |_, p| p == 0 || p == PACKETS_PER_BATCH - 1,
+            // discard all but first and last packet of each batch
+            |_, p| p != 0 && p != PACKETS_PER_BATCH - 1,
+            // discard all after first packet in second to last batch
+            |b, p| (b == BATCH_COUNT - 2 && p > 0) || b == BATCH_COUNT - 1,
+            // 25
+        ];
+
+        let expect_valids = [
+            // (expected_batches, expected_valid_packets)
+            //
+            // contiguous
+            // 0
+            (BATCH_COUNT, PACKET_COUNT),
+            (0, 0),
+            // single partitions
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            // 5
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            // uniform sparse
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            (BATCH_COUNT / 2, PACKET_COUNT / 2),
+            // edges
+            // 10
+            (BATCH_COUNT - 1, PACKET_COUNT - PACKETS_PER_BATCH),
+            (BATCH_COUNT - 1, PACKET_COUNT - PACKETS_PER_BATCH),
+            (BATCH_COUNT - 2, PACKET_COUNT - 2 * PACKETS_PER_BATCH),
+            (2, 2 * PACKETS_PER_BATCH),
+            (BATCH_COUNT, PACKET_COUNT - 1),
+            // 15
+            (1, 1),
+            (BATCH_COUNT, PACKET_COUNT - 1),
+            (1, 1),
+            (
+                (BATCH_COUNT * (PACKETS_PER_BATCH - 1) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                (PACKETS_PER_BATCH - 1) * BATCH_COUNT,
+            ),
+            (
+                (BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                BATCH_COUNT,
+            ),
+            // 20
+            (
+                (BATCH_COUNT * (PACKETS_PER_BATCH - 1) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                (PACKETS_PER_BATCH - 1) * BATCH_COUNT,
+            ),
+            (
+                (BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                BATCH_COUNT,
+            ),
+            (
+                (BATCH_COUNT * (PACKETS_PER_BATCH - 2) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                (PACKETS_PER_BATCH - 2) * BATCH_COUNT,
+            ),
+            (
+                (2 * BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH,
+                PACKET_COUNT - (PACKETS_PER_BATCH - 2) * BATCH_COUNT,
+            ),
+            (BATCH_COUNT - 1, PACKET_COUNT - 2 * PACKETS_PER_BATCH + 1),
+            // 25
+        ];
+
+        let test_cases = set_discards.iter().zip(&expect_valids).enumerate();
+        for (i, (set_discard, (expect_batch_count, expect_valid_packets))) in test_cases {
+            println!("test_shrink case: {}", i);
+            let mut batches = to_packet_batches(
+                &(0..PACKET_COUNT).map(|_| test_tx()).collect::<Vec<_>>(),
+                PACKETS_PER_BATCH,
+            );
+            assert_eq!(batches.len(), BATCH_COUNT);
+            assert_eq!(count_valid_packets(&batches), PACKET_COUNT);
+            batches.iter_mut().enumerate().for_each(|(i, b)| {
+                b.packets
+                    .iter_mut()
+                    .enumerate()
+                    .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j)))
+            });
+            assert_eq!(count_valid_packets(&batches), *expect_valid_packets);
+            println!("show valid packets for case {}", i);
+            batches.iter_mut().enumerate().for_each(|(i, b)| {
+                b.packets.iter_mut().enumerate().for_each(|(j, p)| {
+                    if !p.meta.discard() {
+                        println!("{} {}", i, j)
+                    }
+                })
+            });
+            println!("done show valid packets for case {}", i);
+            let shrunken_batch_count = shrink_batches(&mut batches);
+            println!("shrunk batch test {} count: {}", i, shrunken_batch_count);
+            assert_eq!(shrunken_batch_count, *expect_batch_count);
+            batches.truncate(shrunken_batch_count);
+            assert_eq!(count_valid_packets(&batches), *expect_valid_packets);
+        }
+    }
 }