From 94668c95c21f7f85e4e6a13ad5c955d195f2341c Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 29 Sep 2021 20:41:05 -0700 Subject: [PATCH] Prune sigverify queue (#20331) --- core/benches/sigverify_stage.rs | 38 +++++++++ core/src/sigverify_stage.rs | 137 ++++++++++++++++++++------------ streamer/src/streamer.rs | 8 +- 3 files changed, 126 insertions(+), 57 deletions(-) diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 435a420179..1a14c80835 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -18,6 +18,44 @@ use std::sync::mpsc::channel; use std::time::{Duration, Instant}; use test::Bencher; +#[bench] +fn bench_packet_discard(bencher: &mut Bencher) { + solana_logger::setup(); + let len = 30 * 1000; + let chunk_size = 1024; + let tx = test_tx(); + let mut batches = to_packets_chunked(&vec![tx; len], chunk_size); + + let mut total = 0; + + let ips: Vec<_> = (0..10_000) + .into_iter() + .map(|_| { + let mut addr = [0u16; 8]; + thread_rng().fill(&mut addr); + addr + }) + .collect(); + + for batch in batches.iter_mut() { + total += batch.packets.len(); + for p in batch.packets.iter_mut() { + let ip_index = thread_rng().gen_range(0, ips.len()); + p.meta.addr = ips[ip_index]; + } + } + info!("total packets: {}", total); + + bencher.iter(move || { + SigVerifyStage::discard_excess_packets(&mut batches, 10_000); + for batch in batches.iter_mut() { + for p in batch.packets.iter_mut() { + p.meta.discard = false; + } + } + }); +} + #[bench] fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 8dec1d367b..96fa36f119 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -9,21 +9,17 @@ use crate::sigverify; use crossbeam_channel::{SendError, Sender as CrossbeamSender}; use solana_measure::measure::Measure; use solana_perf::packet::Packets; -use solana_perf::perf_libs; use solana_sdk::timing; use solana_streamer::streamer::{self, PacketReceiver, StreamerError}; use std::{ - sync::{ - mpsc::{Receiver, RecvTimeoutError}, - Arc, Mutex, - }, + collections::HashMap, + sync::mpsc::{Receiver, RecvTimeoutError}, thread::{self, Builder, JoinHandle}, time::Instant, }; use thiserror::Error; -const RECV_BATCH_MAX_CPU: usize = 1_000; -const RECV_BATCH_MAX_GPU: usize = 5_000; +const MAX_SIGVERIFY_BATCH: usize = 10_000; #[derive(Error, Debug)] pub enum SigVerifyServiceError { @@ -37,7 +33,7 @@ pub enum SigVerifyServiceError { type Result = std::result::Result; pub struct SigVerifyStage { - thread_hdls: Vec>, + thread_hdl: JoinHandle<()>, } pub trait SigVerifier { @@ -137,44 +133,66 @@ impl SigVerifyStage { verified_sender: CrossbeamSender>, verifier: T, ) -> Self { - let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier); - Self { thread_hdls } + let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier); + Self { thread_hdl } + } + + pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { + let mut received_ips = HashMap::new(); + for (batch_index, batch) in batches.iter().enumerate() { + for (packet_index, packets) in batch.packets.iter().enumerate() { + let e = received_ips + .entry(packets.meta.addr().ip()) + .or_insert_with(Vec::new); + e.push((batch_index, packet_index)); + } + } + let mut batch_len = 0; + while batch_len < max_packets { + for (_ip, indexes) in received_ips.iter_mut() { + if !indexes.is_empty() { + indexes.remove(0); + batch_len += 1; + if batch_len >= MAX_SIGVERIFY_BATCH { + break; + } + } + } + } + for (_addr, indexes) in received_ips { + for (batch_index, packet_index) in indexes { + batches[batch_index].packets[packet_index].meta.discard = true; + } + } } fn verifier( - recvr: &Arc>, + recvr: &PacketReceiver, sendr: &CrossbeamSender>, - id: usize, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { - let (batch, num_packets, recv_duration) = streamer::recv_batch( - &recvr.lock().expect("'recvr' lock in fn verifier"), - if perf_libs::api().is_some() { - RECV_BATCH_MAX_GPU - } else { - RECV_BATCH_MAX_CPU - }, - )?; + let (mut batches, num_packets, recv_duration) = streamer::recv_batch(recvr)?; - let batch_len = batch.len(); + let batches_len = batches.len(); debug!( - "@{:?} verifier: verifying: {} id: {}", + "@{:?} verifier: verifying: {}", timing::timestamp(), num_packets, - id ); + if num_packets > MAX_SIGVERIFY_BATCH { + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); + } let mut verify_batch_time = Measure::start("sigverify_batch_time"); - sendr.send(verifier.verify_batch(batch))?; + sendr.send(verifier.verify_batch(batches))?; verify_batch_time.stop(); debug!( - "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", + "@{:?} verifier: done. batches: {} total verify time: {:?} verified: {} v/s {}", timing::timestamp(), - batch_len, + batches_len, verify_batch_time.as_ms(), - id, num_packets, (num_packets as f32 / verify_batch_time.as_s()) ); @@ -187,33 +205,28 @@ impl SigVerifyStage { .verify_batches_pp_us_hist .increment(verify_batch_time.as_us() / (num_packets as u64)) .unwrap(); - stats.batches_hist.increment(batch_len as u64).unwrap(); + stats.batches_hist.increment(batches_len as u64).unwrap(); stats.packets_hist.increment(num_packets as u64).unwrap(); - stats.total_batches += batch_len; + stats.total_batches += batches_len; stats.total_packets += num_packets; Ok(()) } fn verifier_service( - packet_receiver: Arc>, + packet_receiver: PacketReceiver, verified_sender: CrossbeamSender>, - id: usize, verifier: &T, ) -> JoinHandle<()> { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); Builder::new() - .name(format!("solana-verifier-{}", id)) + .name("solana-verifier".to_string()) .spawn(move || loop { - if let Err(e) = Self::verifier( - &packet_receiver, - &verified_sender, - id, - &verifier, - &mut stats, - ) { + if let Err(e) = + Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats) + { match e { SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( RecvTimeoutError::Disconnected, @@ -240,19 +253,43 @@ impl SigVerifyStage { packet_receiver: PacketReceiver, verified_sender: CrossbeamSender>, verifier: T, - ) -> Vec> { - let receiver = Arc::new(Mutex::new(packet_receiver)); - (0..4) - .map(|id| { - Self::verifier_service(receiver.clone(), verified_sender.clone(), id, &verifier) - }) - .collect() + ) -> JoinHandle<()> { + Self::verifier_service(packet_receiver, verified_sender, &verifier) } pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_perf::packet::Packet; + + fn count_non_discard(packets: &[Packets]) -> usize { + packets + .iter() + .map(|pp| { + pp.packets + .iter() + .map(|p| if p.meta.discard { 0 } else { 1 }) + .sum::() + }) + .sum::() + } + + #[test] + fn test_packet_discard() { + solana_logger::setup(); + let mut p = Packets::default(); + p.packets.resize(10, Packet::default()); + p.packets[3].meta.addr = [1u16; 8]; + let mut packets = vec![p]; + let max = 3; + SigVerifyStage::discard_excess_packets(&mut packets, max); + assert_eq!(count_non_discard(&packets), max); + assert!(!packets[0].packets[0].meta.discard); + assert!(!packets[0].packets[3].meta.discard); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 698827542f..d345cf1051 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -126,10 +126,7 @@ fn recv_send( Ok(()) } -pub fn recv_batch( - recvr: &PacketReceiver, - max_batch: usize, -) -> Result<(Vec, usize, Duration)> { +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, Duration)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); @@ -140,9 +137,6 @@ pub fn recv_batch( trace!("got more msgs"); len += more.packets.len(); batch.push(more); - if len > max_batch { - break; - } } let recv_duration = recv_start.elapsed(); trace!("batch len {}", batch.len());