Add pre-shrink pass before sigverify batch (#25136)

This commit is contained in:
Justin Starry 2022-05-28 01:51:55 +10:00 committed by GitHub
parent ce14c63bf0
commit e4409a87fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 27 deletions

View File

@ -13,7 +13,10 @@ use {
solana_measure::measure::Measure,
solana_perf::{
packet::{Packet, PacketBatch},
sigverify::{count_valid_packets, shrink_batches, Deduper},
sigverify::{
count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches,
Deduper,
},
},
solana_sdk::timing,
solana_streamer::streamer::{self, StreamerError},
@ -32,6 +35,10 @@ const MAX_DEDUP_BATCH: usize = 165_000;
// 50ms/(25us/packet) = 2000 packets
const MAX_SIGVERIFY_BATCH: usize = 2_000;
// Packet batch shrinker will reorganize packets into compacted batches if 10%
// or more of the packets in a group of packet batches have been discarded.
const MAX_DISCARDED_PACKET_RATE: f64 = 0.10;
#[derive(Error, Debug)]
pub enum SigVerifyServiceError<SendType> {
#[error("send packets batch error")]
@ -266,6 +273,21 @@ impl SigVerifyStage {
}
}
/// Compacts `packet_batches` in place when at least `MAX_DISCARDED_PACKET_RATE`
/// of the contained packets are flagged as discarded.
///
/// Returns the elapsed time in microseconds together with the number of
/// batches removed by the shrink pass (zero when the threshold was not met).
fn maybe_shrink_batches(packet_batches: &mut Vec<PacketBatch>) -> (u64, usize) {
    let mut timer = Measure::start("sigverify_shrink_time");
    let batches_before = packet_batches.len();
    let total = count_packets_in_batches(packet_batches) as f64;
    let discarded = count_discarded_packets(packet_batches) as f64;
    // When `total` is zero the ratio is NaN, which compares false against the
    // threshold, so empty input skips the shrink pass entirely.
    if discarded / total >= MAX_DISCARDED_PACKET_RATE {
        shrink_batches(packet_batches);
    }
    // `shrink_batches` truncates the vector, so the length delta is the
    // number of batches reclaimed.
    let removed = batches_before.saturating_sub(packet_batches.len());
    timer.stop();
    (timer.as_us(), removed)
}
fn verifier<T: SigVerifier>(
deduper: &Deduper,
recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
@ -319,24 +341,20 @@ impl SigVerifyStage {
let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH);
discard_time.stop();
// Pre-shrink packet batches if many packets are discarded from dedup / discard
let (pre_shrink_time_us, pre_shrink_total) = Self::maybe_shrink_batches(&mut batches);
let mut verify_time = Measure::start("sigverify_batch_time");
let mut batches = verifier.verify_batches(batches, num_valid_packets);
verify_time.stop();
let mut shrink_time = Measure::start("sigverify_shrink_time");
let num_valid_packets = count_valid_packets(
count_valid_packets(
&batches,
#[inline(always)]
|valid_packet| verifier.process_passed_sigverify_packet(valid_packet),
);
let start_len = batches.len();
const MAX_EMPTY_BATCH_RATIO: usize = 4;
if non_discarded_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
let valid = shrink_batches(&mut batches);
batches.truncate(valid);
}
let total_shrinks = start_len.saturating_sub(batches.len());
shrink_time.stop();
verify_time.stop();
// Post-shrink packet batches if many packets are discarded from sigverify
let (post_shrink_time_us, post_shrink_total) = Self::maybe_shrink_batches(&mut batches);
verifier.send_packets(batches)?;
@ -374,11 +392,11 @@ impl SigVerifyStage {
stats.total_discard_random_time_us += discard_random_time.as_us() as usize;
stats.total_discard_random += num_discarded_randomly;
stats.total_excess_fail += excess_fail;
stats.total_shrinks += total_shrinks;
stats.total_shrinks += pre_shrink_total + post_shrink_total;
stats.total_dedup_time_us += dedup_time.as_us() as usize;
stats.total_discard_time_us += discard_time.as_us() as usize;
stats.total_verify_time_us += verify_time.as_us() as usize;
stats.total_shrink_time_us += shrink_time.as_us() as usize;
stats.total_shrink_time_us += (pre_shrink_time_us + post_shrink_time_us) as usize;
Ok(())
}
@ -606,4 +624,41 @@ mod tests {
drop(packet_s);
stage.join().unwrap();
}
#[test]
fn test_maybe_shrink_batches() {
// Exercises the discard-rate threshold of `maybe_shrink_batches`:
// no discards -> no shrink; just under the rate -> no shrink;
// at/over the rate -> batches are compacted and truncated.
let packets_per_batch = 128;
let total_packets = 4096;
let mut batches = gen_batches(true, packets_per_batch, total_packets);
let num_generated_batches = batches.len();
let num_packets = count_packets_in_batches(&batches);
// Nothing is discarded yet, so the returned shrink count must be zero.
assert_eq!(SigVerifyStage::maybe_shrink_batches(&mut batches).1, 0);
// discard until the threshold is met but not exceeded
{
let mut index = 0;
batches.iter_mut().for_each(|batch| {
batch.iter_mut().for_each(|p| {
// Mark packets discarded only while the running discard rate stays
// strictly below MAX_DISCARDED_PACKET_RATE.
if ((index + 1) as f64 / num_packets as f64) < MAX_DISCARDED_PACKET_RATE {
p.meta.set_discard(true);
}
index += 1;
})
});
}
// Still strictly below the threshold: the shrink pass must not run.
assert_eq!(SigVerifyStage::maybe_shrink_batches(&mut batches).1, 0);
// discard one more to exceed shrink threshold
batches.last_mut().unwrap()[0].meta.set_discard(true);
// With the rate at/above the threshold, the shrinker should reclaim the
// batches the discarded packets occupied — at least one batch.
let expected_num_shrunk_batches =
1.max((num_generated_batches as f64 * MAX_DISCARDED_PACKET_RATE) as usize);
assert_eq!(
SigVerifyStage::maybe_shrink_batches(&mut batches).1,
expected_num_shrunk_batches
);
// The shrink pass truncates the vector, so fewer batches remain afterwards.
let expected_remaining_batches = num_generated_batches - expected_num_shrunk_batches;
assert_eq!(batches.len(), expected_remaining_batches);
}
}

View File

@ -189,6 +189,13 @@ pub fn count_valid_packets(
.sum()
}
/// Counts the packets across all `batches` whose metadata marks them as
/// discarded.
pub fn count_discarded_packets(batches: &[PacketBatch]) -> usize {
    let mut discarded = 0;
    for batch in batches {
        for packet in batch.iter() {
            if packet.meta.discard() {
                discarded += 1;
            }
        }
    }
    discarded
}
// internal function to be unit-tested; should be used only by get_packet_offsets
fn do_get_packet_offsets(
packet: &Packet,
@ -537,7 +544,7 @@ impl Deduper {
}
//inplace shrink a batch of packets
pub fn shrink_batches(batches: &mut [PacketBatch]) -> usize {
pub fn shrink_batches(batches: &mut Vec<PacketBatch>) {
let mut valid_batch_ix = 0;
let mut valid_packet_ix = 0;
let mut last_valid_batch = 0;
@ -567,7 +574,7 @@ pub fn shrink_batches(batches: &mut [PacketBatch]) -> usize {
}
}
}
last_valid_batch
batches.truncate(last_valid_batch);
}
pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
@ -1514,8 +1521,7 @@ mod tests {
start.sort_by(|a, b| a.data().cmp(b.data()));
let packet_count = count_valid_packets(&batches, |_| ());
let res = shrink_batches(&mut batches);
batches.truncate(res);
shrink_batches(&mut batches);
//make sure all the non discarded packets are the same
let mut end = vec![];
@ -1538,14 +1544,21 @@ mod tests {
// No batches
// truncate of 1 on len 0 is a noop
assert_eq!(shrink_batches(&mut []), 0);
shrink_batches(&mut Vec::new());
// One empty batch
assert_eq!(shrink_batches(&mut [PacketBatch::with_capacity(0)]), 0);
{
let mut batches = vec![PacketBatch::with_capacity(0)];
shrink_batches(&mut batches);
assert_eq!(batches.len(), 0);
}
// Many empty batches
let mut batches = (0..BATCH_COUNT)
.map(|_| PacketBatch::with_capacity(0))
.collect::<Vec<_>>();
assert_eq!(shrink_batches(&mut batches), 0);
{
let mut batches = (0..BATCH_COUNT)
.map(|_| PacketBatch::with_capacity(0))
.collect::<Vec<_>>();
shrink_batches(&mut batches);
assert_eq!(batches.len(), 0);
}
}
#[test]
@ -1698,10 +1711,10 @@ mod tests {
})
});
debug!("done show valid packets for case {}", i);
let shrunken_batch_count = shrink_batches(&mut batches);
shrink_batches(&mut batches);
let shrunken_batch_count = batches.len();
debug!("shrunk batch test {} count: {}", i, shrunken_batch_count);
assert_eq!(shrunken_batch_count, *expect_batch_count);
batches.truncate(shrunken_batch_count);
assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets);
}
}