Prune sigverify queue (#20331)

This commit is contained in:
sakridge 2021-09-29 20:41:05 -07:00 committed by GitHub
parent 5952b65932
commit 94668c95c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 57 deletions

View File

@ -18,6 +18,44 @@ use std::sync::mpsc::channel;
use std::time::{Duration, Instant};
use test::Bencher;
#[bench]
fn bench_packet_discard(bencher: &mut Bencher) {
solana_logger::setup();
let len = 30 * 1000;
let chunk_size = 1024;
let tx = test_tx();
let mut batches = to_packets_chunked(&vec![tx; len], chunk_size);
let mut total = 0;
let ips: Vec<_> = (0..10_000)
.into_iter()
.map(|_| {
let mut addr = [0u16; 8];
thread_rng().fill(&mut addr);
addr
})
.collect();
for batch in batches.iter_mut() {
total += batch.packets.len();
for p in batch.packets.iter_mut() {
let ip_index = thread_rng().gen_range(0, ips.len());
p.meta.addr = ips[ip_index];
}
}
info!("total packets: {}", total);
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
for batch in batches.iter_mut() {
for p in batch.packets.iter_mut() {
p.meta.discard = false;
}
}
});
}
#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();

View File

@ -9,21 +9,17 @@ use crate::sigverify;
use crossbeam_channel::{SendError, Sender as CrossbeamSender};
use solana_measure::measure::Measure;
use solana_perf::packet::Packets;
use solana_perf::perf_libs;
use solana_sdk::timing;
use solana_streamer::streamer::{self, PacketReceiver, StreamerError};
use std::{
sync::{
mpsc::{Receiver, RecvTimeoutError},
Arc, Mutex,
},
collections::HashMap,
sync::mpsc::{Receiver, RecvTimeoutError},
thread::{self, Builder, JoinHandle},
time::Instant,
};
use thiserror::Error;
const RECV_BATCH_MAX_CPU: usize = 1_000;
const RECV_BATCH_MAX_GPU: usize = 5_000;
const MAX_SIGVERIFY_BATCH: usize = 10_000;
#[derive(Error, Debug)]
pub enum SigVerifyServiceError {
@ -37,7 +33,7 @@ pub enum SigVerifyServiceError {
type Result<T> = std::result::Result<T, SigVerifyServiceError>;
pub struct SigVerifyStage {
thread_hdls: Vec<JoinHandle<()>>,
thread_hdl: JoinHandle<()>,
}
pub trait SigVerifier {
@ -137,44 +133,66 @@ impl SigVerifyStage {
verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T,
) -> Self {
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier);
Self { thread_hdls }
let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier);
Self { thread_hdl }
}
pub fn discard_excess_packets(batches: &mut Vec<Packets>, max_packets: usize) {
let mut received_ips = HashMap::new();
for (batch_index, batch) in batches.iter().enumerate() {
for (packet_index, packets) in batch.packets.iter().enumerate() {
let e = received_ips
.entry(packets.meta.addr().ip())
.or_insert_with(Vec::new);
e.push((batch_index, packet_index));
}
}
let mut batch_len = 0;
while batch_len < max_packets {
for (_ip, indexes) in received_ips.iter_mut() {
if !indexes.is_empty() {
indexes.remove(0);
batch_len += 1;
if batch_len >= MAX_SIGVERIFY_BATCH {
break;
}
}
}
}
for (_addr, indexes) in received_ips {
for (batch_index, packet_index) in indexes {
batches[batch_index].packets[packet_index].meta.discard = true;
}
}
}
fn verifier<T: SigVerifier>(
recvr: &Arc<Mutex<PacketReceiver>>,
recvr: &PacketReceiver,
sendr: &CrossbeamSender<Vec<Packets>>,
id: usize,
verifier: &T,
stats: &mut SigVerifierStats,
) -> Result<()> {
let (batch, num_packets, recv_duration) = streamer::recv_batch(
&recvr.lock().expect("'recvr' lock in fn verifier"),
if perf_libs::api().is_some() {
RECV_BATCH_MAX_GPU
} else {
RECV_BATCH_MAX_CPU
},
)?;
let (mut batches, num_packets, recv_duration) = streamer::recv_batch(recvr)?;
let batch_len = batch.len();
let batches_len = batches.len();
debug!(
"@{:?} verifier: verifying: {} id: {}",
"@{:?} verifier: verifying: {}",
timing::timestamp(),
num_packets,
id
);
if num_packets > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
}
let mut verify_batch_time = Measure::start("sigverify_batch_time");
sendr.send(verifier.verify_batch(batch))?;
sendr.send(verifier.verify_batch(batches))?;
verify_batch_time.stop();
debug!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
"@{:?} verifier: done. batches: {} total verify time: {:?} verified: {} v/s {}",
timing::timestamp(),
batch_len,
batches_len,
verify_batch_time.as_ms(),
id,
num_packets,
(num_packets as f32 / verify_batch_time.as_s())
);
@ -187,33 +205,28 @@ impl SigVerifyStage {
.verify_batches_pp_us_hist
.increment(verify_batch_time.as_us() / (num_packets as u64))
.unwrap();
stats.batches_hist.increment(batch_len as u64).unwrap();
stats.batches_hist.increment(batches_len as u64).unwrap();
stats.packets_hist.increment(num_packets as u64).unwrap();
stats.total_batches += batch_len;
stats.total_batches += batches_len;
stats.total_packets += num_packets;
Ok(())
}
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: Arc<Mutex<PacketReceiver>>,
packet_receiver: PacketReceiver,
verified_sender: CrossbeamSender<Vec<Packets>>,
id: usize,
verifier: &T,
) -> JoinHandle<()> {
let verifier = verifier.clone();
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
Builder::new()
.name(format!("solana-verifier-{}", id))
.name("solana-verifier".to_string())
.spawn(move || loop {
if let Err(e) = Self::verifier(
&packet_receiver,
&verified_sender,
id,
&verifier,
&mut stats,
) {
if let Err(e) =
Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats)
{
match e {
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Disconnected,
@ -240,19 +253,43 @@ impl SigVerifyStage {
packet_receiver: PacketReceiver,
verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T,
) -> Vec<JoinHandle<()>> {
let receiver = Arc::new(Mutex::new(packet_receiver));
(0..4)
.map(|id| {
Self::verifier_service(receiver.clone(), verified_sender.clone(), id, &verifier)
})
.collect()
) -> JoinHandle<()> {
Self::verifier_service(packet_receiver, verified_sender, &verifier)
}
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use solana_perf::packet::Packet;
fn count_non_discard(packets: &[Packets]) -> usize {
packets
.iter()
.map(|pp| {
pp.packets
.iter()
.map(|p| if p.meta.discard { 0 } else { 1 })
.sum::<usize>()
})
.sum::<usize>()
}
#[test]
fn test_packet_discard() {
solana_logger::setup();
let mut p = Packets::default();
p.packets.resize(10, Packet::default());
p.packets[3].meta.addr = [1u16; 8];
let mut packets = vec![p];
let max = 3;
SigVerifyStage::discard_excess_packets(&mut packets, max);
assert_eq!(count_non_discard(&packets), max);
assert!(!packets[0].packets[0].meta.discard);
assert!(!packets[0].packets[3].meta.discard);
}
}

View File

@ -126,10 +126,7 @@ fn recv_send(
Ok(())
}
pub fn recv_batch(
recvr: &PacketReceiver,
max_batch: usize,
) -> Result<(Vec<Packets>, usize, Duration)> {
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, Duration)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
@ -140,9 +137,6 @@ pub fn recv_batch(
trace!("got more msgs");
len += more.packets.len();
batch.push(more);
if len > max_batch {
break;
}
}
let recv_duration = recv_start.elapsed();
trace!("batch len {}", batch.len());