improves sigverify discard_excess_packets performance (#22577)

As shown by the added benchmark, current code does worse if there is a
spam address plus a lot of unique addresses.

on current master:
test bench_packet_discard_many_senders  ... bench:   1,997,960 ns/iter (+/- 103,715)
test bench_packet_discard_mixed_senders ... bench:  14,256,116 ns/iter (+/- 534,865)
test bench_packet_discard_single_sender ... bench:   1,306,809 ns/iter (+/- 61,992)

with this commit:
test bench_packet_discard_many_senders  ... bench:   1,644,025 ns/iter (+/- 83,715)
test bench_packet_discard_mixed_senders ... bench:   1,089,789 ns/iter (+/- 86,324)
test bench_packet_discard_single_sender ... bench:     955,234 ns/iter (+/- 55,953)
This commit is contained in:
behzad nouri 2022-01-19 18:10:02 +00:00 committed by GitHub
parent 650882217c
commit dcf44d2523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 28 deletions

View File

@ -49,11 +49,16 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for p in batch.packets.iter_mut() {
if !p.meta.discard() {
num_packets += 1;
}
p.meta.set_discard(false);
}
}
assert_eq!(num_packets, 10_000);
});
}
@ -67,6 +72,43 @@ fn bench_packet_discard_single_sender(bencher: &mut Bencher) {
run_bench_packet_discard(1, bencher);
}
#[bench]
fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
const SIZE: usize = 30 * 1000;
const CHUNK_SIZE: usize = 1024;
fn new_rand_addr<R: Rng>(rng: &mut R) -> std::net::IpAddr {
let mut addr = [0u16; 8];
rng.fill(&mut addr);
std::net::IpAddr::from(addr)
}
let mut rng = thread_rng();
let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE);
let spam_addr = new_rand_addr(&mut rng);
for batch in batches.iter_mut() {
for packet in batch.packets.iter_mut() {
// One spam address, ~1000 unique addresses.
packet.meta.addr = if rng.gen_ratio(1, 30) {
new_rand_addr(&mut rng)
} else {
spam_addr
}
}
}
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.packets.iter_mut() {
if !packet.meta.discard() {
num_packets += 1;
}
packet.meta.set_discard(false);
}
}
assert_eq!(num_packets, 10_000);
});
}
#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();

View File

@ -8,12 +8,12 @@
use {
crate::sigverify,
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::Itertools,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
collections::{HashMap, VecDeque},
thread::{self, Builder, JoinHandle},
time::Instant,
},
@ -161,34 +161,27 @@ impl SigVerifyStage {
Self { thread_hdl }
}
pub fn discard_excess_packets(batches: &mut Vec<PacketBatch>, max_packets: usize) {
let mut received_ips = HashMap::new();
for (batch_index, batch) in batches.iter().enumerate() {
for (packet_index, packets) in batch.packets.iter().enumerate() {
let e = received_ips
.entry(packets.meta.addr().ip())
.or_insert_with(VecDeque::new);
e.push_back((batch_index, packet_index));
}
pub fn discard_excess_packets(batches: &mut [PacketBatch], mut max_packets: usize) {
// Group packets by their incoming IP address.
let mut addrs = batches
.iter_mut()
.rev()
.flat_map(|batch| batch.packets.iter_mut().rev())
.map(|packet| (packet.meta.addr, packet))
.into_group_map();
// Allocate max_packets evenly across addresses.
while max_packets > 0 && !addrs.is_empty() {
let num_addrs = addrs.len();
addrs.retain(|_, packets| {
let cap = (max_packets + num_addrs - 1) / num_addrs;
max_packets -= packets.len().min(cap);
packets.truncate(packets.len().saturating_sub(cap));
!packets.is_empty()
});
}
let mut batch_len = 0;
while batch_len < max_packets {
for (_ip, indexes) in received_ips.iter_mut() {
if !indexes.is_empty() {
indexes.pop_front();
batch_len += 1;
if batch_len >= max_packets {
break;
}
}
}
}
for (_addr, indexes) in received_ips {
for (batch_index, packet_index) in indexes {
batches[batch_index].packets[packet_index]
.meta
.set_discard(true);
}
// Discard excess packets from each address.
for packet in addrs.into_values().flatten() {
packet.meta.set_discard(true);
}
}