diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 2839989c39..85eb48ca6a 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -373,7 +373,9 @@ fn main() { packet_batch_index, timestamp(), ); - verified_sender.send(vec![packet_batch.clone()]).unwrap(); + verified_sender + .send((vec![packet_batch.clone()], None)) + .unwrap(); } for tx in &packets_for_this_iteration.transactions { diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index d92ef025d9..ea0fd0a17e 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -256,7 +256,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { for xv in v { sent += xv.len(); } - verified_sender.send(v.to_vec()).unwrap(); + verified_sender.send((v.to_vec(), None)).unwrap(); } check_txs(&signal_receiver2, txes / CHUNKS); diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index cdbc480552..c029ba02cf 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -51,7 +51,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { info!("total packets: {}", total); bencher.iter(move || { - SigVerifyStage::discard_excess_packets(&mut batches, 10_000); + SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ()); let mut num_packets = 0; for batch in batches.iter_mut() { for p in batch.iter_mut() { @@ -98,7 +98,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { } } bencher.iter(move || { - SigVerifyStage::discard_excess_packets(&mut batches, 10_000); + SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ()); let mut num_packets = 0; for batch in batches.iter_mut() { for packet in batch.iter_mut() { @@ -142,8 +142,8 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { trace!("start"); let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = unbounded(); - let verifier = TransactionSigVerifier::default(); - let stage = SigVerifyStage::new(packet_r, verified_s, verifier, "bench"); + let verifier = TransactionSigVerifier::new(verified_s); + let stage = SigVerifyStage::new(packet_r, verifier, "bench"); let use_same_tx = true; bencher.iter(move || { @@ -165,7 +165,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { let mut received = 0; trace!("sent: {}", sent_len); loop { - if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { + if let Ok((mut verifieds, _)) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { received += v.len(); batches.push(v); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f7dda3ae1a..49510019ef 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,9 +8,12 @@ use { LeaderExecuteAndCommitTimings, RecordTransactionsTimings, }, qos_service::QosService, + sigverify::TransactionTracerPacketStats, unprocessed_packet_batches::{self, *}, }, - crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, + crossbeam_channel::{ + Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, + }, histogram::Histogram, itertools::Itertools, min_max_heap::MinMaxHeap, @@ -87,6 +90,9 @@ const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); +pub type BankingPacketBatch = (Vec, Option); +pub type BankingPacketSender = CrossbeamSender; +pub type BankingPacketReceiver = CrossbeamReceiver; pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model @@ -381,9 +387,9 @@ impl BankingStage { pub fn new( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: BankingPacketReceiver, + tpu_verified_vote_receiver: BankingPacketReceiver, + verified_vote_receiver: BankingPacketReceiver, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, @@ -405,9 +411,9 @@ impl BankingStage { pub fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: BankingPacketReceiver, + tpu_verified_vote_receiver: BankingPacketReceiver, + verified_vote_receiver: BankingPacketReceiver, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -989,7 +995,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_loop( - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &BankingPacketReceiver, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, @@ -1984,14 +1990,15 @@ impl BankingStage { } fn receive_until( - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &BankingPacketReceiver, recv_timeout: Duration, packet_count_upperbound: usize, ) -> Result, RecvTimeoutError> { let start = Instant::now(); - let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?; + let (mut packet_batches, _tracer_packet_stats_option) = + verified_receiver.recv_timeout(recv_timeout)?; let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum(); - while let Ok(packet_batch) = verified_receiver.try_recv() { + while let Ok((packet_batch, _tracer_packet_stats_option)) = verified_receiver.try_recv() { trace!("got more packet batches in banking stage"); let (packets_received, packet_count_overflowed) = num_packets_received .overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum()); @@ -2013,7 +2020,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] /// Receive incoming packets, push into unprocessed buffer with packet indexes fn receive_and_buffer_packets( - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &BankingPacketReceiver, recv_start: &mut Instant, recv_timeout: Duration, id: u32, @@ -2414,7 +2421,7 @@ mod tests { .collect(); let packet_batches = convert_from_old_verified(packet_batches); verified_sender // no_ver, anf, tx - .send(packet_batches) + .send((packet_batches, None)) .unwrap(); drop(verified_sender); @@ -2486,7 +2493,7 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); + verified_sender.send((packet_batches, None)).unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX let tx = @@ -2497,7 +2504,7 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); + verified_sender.send((packet_batches, None)).unwrap(); let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d91e54a007..9fe7c11086 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,5 +1,6 @@ use { crate::{ + banking_stage::BankingPacketSender, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, @@ -18,7 +19,7 @@ use { solana_ledger::blockstore::Blockstore, solana_measure::measure::Measure, solana_metrics::inc_new_counter_debug, - solana_perf::packet::{self, PacketBatch}, + solana_perf::packet, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -190,7 +191,7 @@ impl ClusterInfoVoteListener { pub fn new( exit: Arc, cluster_info: Arc, - verified_packets_sender: Sender>, + verified_packets_sender: BankingPacketSender, poh_recorder: Arc>, vote_tracker: Arc, bank_forks: Arc>, @@ -333,7 +334,7 @@ impl ClusterInfoVoteListener { exit: Arc, verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, poh_recorder: Arc>, - verified_packets_sender: &Sender>, + verified_packets_sender: &BankingPacketSender, ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); let mut time_since_lock = Instant::now(); @@ -382,7 +383,7 @@ impl ClusterInfoVoteListener { fn check_for_leader_bank_and_send_votes( bank_vote_sender_state_option: &mut Option, current_working_bank: Arc, - verified_packets_sender: &Sender>, + verified_packets_sender: &BankingPacketSender, verified_vote_packets: &VerifiedVotePackets, ) -> Result<()> { // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` @@ -423,7 +424,7 @@ impl ClusterInfoVoteListener { for single_validator_votes in gossip_votes_iterator { bank_send_votes_stats.num_votes_sent += single_validator_votes.len(); bank_send_votes_stats.num_batches_sent += 1; - verified_packets_sender.send(single_validator_votes)?; + verified_packets_sender.send((single_validator_votes, None))?; } filter_gossip_votes_timing.stop(); bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us(); diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index a00f06059c..ddf2eab61a 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -8,30 +8,45 @@ pub use solana_perf::sigverify::{ count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, }; use { - crate::sigverify_stage::SigVerifier, + crate::{ + banking_stage::BankingPacketBatch, + sigverify_stage::{SigVerifier, SigVerifyServiceError}, + }, + crossbeam_channel::Sender, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, + solana_sdk::packet::Packet, }; +#[derive(Debug, Default, Clone)] +pub struct TransactionTracerPacketStats { + pub total_removed_before_sigverify_stage: usize, + pub total_tracer_packets_received_in_sigverify_stage: usize, + pub total_tracer_packets_deduped: usize, + pub total_excess_tracer_packets: usize, + pub total_tracker_packets_passed_sigverify: usize, +} + #[derive(Clone)] pub struct TransactionSigVerifier { + packet_sender: Sender<::SendType>, + tracer_packet_stats: TransactionTracerPacketStats, recycler: Recycler, recycler_out: Recycler>, reject_non_vote: bool, } impl TransactionSigVerifier { - pub fn new_reject_non_vote() -> Self { - TransactionSigVerifier { - reject_non_vote: true, - ..TransactionSigVerifier::default() - } + pub fn new_reject_non_vote(packet_sender: Sender<::SendType>) -> Self { + let mut new_self = Self::new(packet_sender); + new_self.reject_non_vote = true; + new_self } -} -impl Default for TransactionSigVerifier { - fn default() -> Self { + pub fn new(packet_sender: Sender<::SendType>) -> Self { init(); Self { + packet_sender, + tracer_packet_stats: TransactionTracerPacketStats::default(), recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), reject_non_vote: false, @@ -40,6 +55,58 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { + type SendType = BankingPacketBatch; + + #[inline(always)] + fn process_received_packet( + &mut self, + packet: &mut Packet, + removed_before_sigverify_stage: bool, + is_dup: bool, + ) { + if packet.meta.is_tracer_packet() { + if removed_before_sigverify_stage { + self.tracer_packet_stats + .total_removed_before_sigverify_stage += 1; + } else { + self.tracer_packet_stats + .total_tracer_packets_received_in_sigverify_stage += 1; + if is_dup { + self.tracer_packet_stats.total_tracer_packets_deduped += 1; + } + } + } + } + + #[inline(always)] + fn process_excess_packet(&mut self, packet: &Packet) { + if packet.meta.is_tracer_packet() { + self.tracer_packet_stats.total_excess_tracer_packets += 1; + } + } + + #[inline(always)] + fn process_passed_sigverify_packet(&mut self, packet: &Packet) { + if packet.meta.is_tracer_packet() { + self.tracer_packet_stats + .total_tracker_packets_passed_sigverify += 1; + } + } + + fn send_packets( + &mut self, + packet_batches: Vec, + ) -> Result<(), SigVerifyServiceError> { + let mut tracer_packet_stats_to_send = TransactionTracerPacketStats::default(); + std::mem::swap( + &mut tracer_packet_stats_to_send, + &mut self.tracer_packet_stats, + ); + self.packet_sender + .send((packet_batches, Some(tracer_packet_stats_to_send)))?; + Ok(()) + } + fn verify_batches( &self, mut batches: Vec, diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 97dba300a0..5c6f503cd7 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -1,6 +1,11 @@ #![allow(clippy::implicit_hasher)] + use { - crate::{sigverify, sigverify_stage::SigVerifier}, + crate::{ + sigverify, + sigverify_stage::{SigVerifier, SigVerifyServiceError}, + }, + crossbeam_channel::Sender, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, shred::Shred, sigverify_shreds::verify_shreds_gpu, @@ -18,18 +23,21 @@ pub struct ShredSigVerifier { bank_forks: Arc>, leader_schedule_cache: Arc, recycler_cache: RecyclerCache, + packet_sender: Sender>, } impl ShredSigVerifier { pub fn new( bank_forks: Arc>, leader_schedule_cache: Arc, + packet_sender: Sender>, ) -> Self { sigverify::init(); Self { bank_forks, leader_schedule_cache, recycler_cache: RecyclerCache::warmed(), + packet_sender, } } fn read_slots(batches: &[PacketBatch]) -> HashSet { @@ -41,6 +49,16 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { + type SendType = Vec; + + fn send_packets( + &mut self, + packet_batches: Vec, + ) -> Result<(), SigVerifyServiceError> { + self.packet_sender.send(packet_batches)?; + Ok(()) + } + fn verify_batches( &self, mut batches: Vec, @@ -69,6 +87,7 @@ impl SigVerifier for ShredSigVerifier { pub mod tests { use { super::*, + crossbeam_channel::unbounded, solana_ledger::{ genesis_utils::create_genesis_config_with_leader, shred::{Shred, ShredFlags}, @@ -131,7 +150,8 @@ pub mod tests { ); let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let bf = Arc::new(RwLock::new(BankForks::new(bank))); - let verifier = ShredSigVerifier::new(bf, cache); + let (sender, receiver) = unbounded(); + let mut verifier = ShredSigVerifier::new(bf, cache, sender); let batch_size = 2; let mut batch = PacketBatch::with_capacity(batch_size); @@ -171,5 +191,16 @@ pub mod tests { let rv = verifier.verify_batches(batches, num_packets); assert!(!rv[0][0].meta.discard()); assert!(rv[0][1].meta.discard()); + + verifier.send_packets(rv.clone()).unwrap(); + let received_packets = receiver.recv().unwrap(); + assert_eq!(received_packets.len(), rv.len()); + for (received_packet_batch, original_packet_batch) in received_packets.iter().zip(rv.iter()) + { + assert_eq!( + received_packet_batch.iter().collect::>(), + original_packet_batch.iter().collect::>() + ); + } } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index c1b1a8b276..8577660d98 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -8,11 +8,11 @@ use { crate::{find_packet_sender_stake_stage, sigverify}, core::time::Duration, - crossbeam_channel::{RecvTimeoutError, SendError, Sender}, + crossbeam_channel::{RecvTimeoutError, SendError}, itertools::Itertools, solana_measure::measure::Measure, solana_perf::{ - packet::PacketBatch, + packet::{Packet, PacketBatch}, sigverify::{count_valid_packets, shrink_batches, Deduper}, }, solana_sdk::timing, @@ -33,22 +33,33 @@ const MAX_DEDUP_BATCH: usize = 165_000; const MAX_SIGVERIFY_BATCH: usize = 2_000; #[derive(Error, Debug)] -pub enum SigVerifyServiceError { +pub enum SigVerifyServiceError { #[error("send packets batch error")] - Send(#[from] SendError>), + Send(#[from] SendError), #[error("streamer error")] Streamer(#[from] StreamerError), } -type Result = std::result::Result; +type Result = std::result::Result>; pub struct SigVerifyStage { thread_hdl: JoinHandle<()>, } pub trait SigVerifier { + type SendType: std::fmt::Debug; fn verify_batches(&self, batches: Vec, valid_packets: usize) -> Vec; + fn process_received_packet( + &mut self, + _packet: &mut Packet, + _removed_before_sigverify_stage: bool, + _is_dup: bool, + ) { + } + fn process_excess_packet(&mut self, _packet: &Packet) {} + fn process_passed_sigverify_packet(&mut self, _packet: &Packet) {} + fn send_packets(&mut self, packet_batches: Vec) -> Result<(), Self::SendType>; } #[derive(Default, Clone)] @@ -199,6 +210,7 @@ impl SigVerifierStats { } impl SigVerifier for DisabledSigVerifier { + type SendType = (); fn verify_batches( &self, mut batches: Vec, @@ -207,21 +219,28 @@ impl SigVerifier for DisabledSigVerifier { sigverify::ed25519_verify_disabled(&mut batches); batches } + + fn send_packets(&mut self, _packet_batches: Vec) -> Result<(), Self::SendType> { + Ok(()) + } } impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, - verified_sender: Sender>, verifier: T, name: &'static str, ) -> Self { - let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier, name); + let thread_hdl = Self::verifier_services(packet_receiver, verifier, name); Self { thread_hdl } } - pub fn discard_excess_packets(batches: &mut [PacketBatch], mut max_packets: usize) { + pub fn discard_excess_packets( + batches: &mut [PacketBatch], + mut max_packets: usize, + mut process_excess_packet: impl FnMut(&Packet), + ) { // Group packets by their incoming IP address. let mut addrs = batches .iter_mut() @@ -242,6 +261,7 @@ impl SigVerifyStage { } // Discard excess packets from each address. for packet in addrs.into_values().flatten() { + process_excess_packet(packet); packet.meta.set_discard(true); } } @@ -249,10 +269,9 @@ impl SigVerifyStage { fn verifier( deduper: &Deduper, recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, - sendr: &Sender>, - verifier: &T, + verifier: &mut T, stats: &mut SigVerifierStats, - ) -> Result<()> { + ) -> Result<(), T::SendType> { let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?; let batches_len = batches.len(); @@ -272,14 +291,29 @@ impl SigVerifyStage { discard_random_time.stop(); let mut dedup_time = Measure::start("sigverify_dedup_time"); - let discard_or_dedup_fail = deduper.dedup_packets_and_count_discards(&mut batches) as usize; + let discard_or_dedup_fail = deduper.dedup_packets_and_count_discards( + &mut batches, + #[inline(always)] + |received_packet, removed_before_sigverify_stage, is_dup| { + verifier.process_received_packet( + received_packet, + removed_before_sigverify_stage, + is_dup, + ); + }, + ) as usize; dedup_time.stop(); let num_unique = non_discarded_packets.saturating_sub(discard_or_dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); let mut num_valid_packets = num_unique; if num_unique > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); + Self::discard_excess_packets( + &mut batches, + MAX_SIGVERIFY_BATCH, + #[inline(always)] + |excess_packet| verifier.process_excess_packet(excess_packet), + ); num_valid_packets = MAX_SIGVERIFY_BATCH; } let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); @@ -290,7 +324,11 @@ impl SigVerifyStage { verify_time.stop(); let mut shrink_time = Measure::start("sigverify_shrink_time"); - let num_valid_packets = count_valid_packets(&batches); + let num_valid_packets = count_valid_packets( + &batches, + #[inline(always)] + |valid_packet| verifier.process_passed_sigverify_packet(valid_packet), + ); let start_len = batches.len(); const MAX_EMPTY_BATCH_RATIO: usize = 4; if non_discarded_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) { @@ -300,7 +338,7 @@ impl SigVerifyStage { let total_shrinks = start_len.saturating_sub(batches.len()); shrink_time.stop(); - sendr.send(batches)?; + verifier.send_packets(batches)?; debug!( "@{:?} verifier: done. batches: {} total verify time: {:?} verified: {} v/s {}", @@ -347,11 +385,9 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, - verified_sender: Sender>, - verifier: &T, + mut verifier: T, name: &'static str, ) -> JoinHandle<()> { - let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2); @@ -362,13 +398,9 @@ impl SigVerifyStage { let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE); loop { deduper.reset(); - if let Err(e) = Self::verifier( - &deduper, - &packet_receiver, - &verified_sender, - &verifier, - &mut stats, - ) { + if let Err(e) = + Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats) + { match e { SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( RecvTimeoutError::Disconnected, @@ -394,11 +426,10 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, - verified_sender: Sender>, verifier: T, name: &'static str, ) -> JoinHandle<()> { - Self::verifier_service(packet_receiver, verified_sender, &verifier, name) + Self::verifier_service(packet_receiver, verifier, name) } pub fn join(self) -> thread::Result<()> { @@ -416,6 +447,7 @@ mod tests { packet::{to_packet_batches, Packet}, test_tx::test_tx, }, + solana_sdk::packet::PacketFlags, }; fn count_non_discard(packet_batches: &[PacketBatch]) -> usize { @@ -435,27 +467,48 @@ mod tests { solana_logger::setup(); let batch_size = 10; let mut batch = PacketBatch::with_capacity(batch_size); - batch.resize(batch_size, Packet::default()); + let mut tracer_packet = Packet::default(); + tracer_packet.meta.flags |= PacketFlags::TRACER_PACKET; + batch.resize(batch_size, tracer_packet); batch[3].meta.addr = std::net::IpAddr::from([1u16; 8]); batch[3].meta.set_discard(true); + let num_discarded_before_filter = 1; batch[4].meta.addr = std::net::IpAddr::from([2u16; 8]); + let total_num_packets = batch.len(); let mut batches = vec![batch]; let max = 3; - SigVerifyStage::discard_excess_packets(&mut batches, max); - assert_eq!(count_non_discard(&batches), max); + let mut total_tracer_packets_discarded = 0; + SigVerifyStage::discard_excess_packets(&mut batches, max, |packet| { + if packet.meta.is_tracer_packet() { + total_tracer_packets_discarded += 1; + } + }); + let total_non_discard = count_non_discard(&batches); + let total_discarded = total_num_packets - total_non_discard; + // Every packet except the packets already marked `discard` before the call + // to `discard_excess_packets()` should count towards the + // `total_tracer_packets_discarded` + assert_eq!( + total_tracer_packets_discarded, + total_discarded - num_discarded_before_filter + ); + assert_eq!(total_non_discard, max); assert!(!batches[0][0].meta.discard()); assert!(batches[0][3].meta.discard()); assert!(!batches[0][4].meta.discard()); } - fn gen_batches(use_same_tx: bool) -> Vec { - let len = 4096; - let chunk_size = 1024; + + fn gen_batches( + use_same_tx: bool, + packets_per_batch: usize, + total_packets: usize, + ) -> Vec { if use_same_tx { let tx = test_tx(); - to_packet_batches(&vec![tx; len], chunk_size) + to_packet_batches(&vec![tx; total_packets], packets_per_batch) } else { - let txs: Vec<_> = (0..len).map(|_| test_tx()).collect(); - to_packet_batches(&txs, chunk_size) + let txs: Vec<_> = (0..total_packets).map(|_| test_tx()).collect(); + to_packet_batches(&txs, packets_per_batch) } } @@ -465,12 +518,17 @@ mod tests { trace!("start"); let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = unbounded(); - let verifier = TransactionSigVerifier::default(); - let stage = SigVerifyStage::new(packet_r, verified_s, verifier, "test"); + let verifier = TransactionSigVerifier::new(verified_s); + let stage = SigVerifyStage::new(packet_r, verifier, "test"); let use_same_tx = true; let now = Instant::now(); - let mut batches = gen_batches(use_same_tx); + let packets_per_batch = 128; + let total_packets = 1920; + // This is important so that we don't discard any packets and fail asserts below about + // `total_excess_tracer_packets` + assert!(total_packets < MAX_SIGVERIFY_BATCH); + let mut batches = gen_batches(use_same_tx, packets_per_batch, total_packets); trace!( "starting... generation took: {} ms batches: {}", duration_as_ms(&now.elapsed()), @@ -479,25 +537,72 @@ mod tests { let mut sent_len = 0; for _ in 0..batches.len() { - if let Some(batch) = batches.pop() { + if let Some(mut batch) = batches.pop() { sent_len += batch.len(); + batch + .iter_mut() + .for_each(|packet| packet.meta.flags |= PacketFlags::TRACER_PACKET); + assert_eq!(batch.len(), packets_per_batch); packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; + let mut total_tracer_packets_received_in_sigverify_stage = 0; trace!("sent: {}", sent_len); loop { - if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { + if let Ok((mut verifieds, tracer_packet_stats_option)) = verified_r.recv() { + let tracer_packet_stats = tracer_packet_stats_option.unwrap(); + total_tracer_packets_received_in_sigverify_stage += + tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage; + assert_eq!( + tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage + % packets_per_batch, + 0, + ); + + if use_same_tx { + // Every transaction other than the very first one in the very first batch + // should be deduped. + + // Also have to account for the fact that deduper could be cleared periodically, + // in which case the first transaction in the next batch won't be deduped + assert!( + (tracer_packet_stats.total_tracer_packets_deduped + == tracer_packet_stats + .total_tracer_packets_received_in_sigverify_stage + - 1) + || (tracer_packet_stats.total_tracer_packets_deduped + == tracer_packet_stats + .total_tracer_packets_received_in_sigverify_stage) + ); + assert!( + (tracer_packet_stats.total_tracker_packets_passed_sigverify == 1) + || (tracer_packet_stats.total_tracker_packets_passed_sigverify == 0) + ); + } else { + assert_eq!(tracer_packet_stats.total_tracer_packets_deduped, 0); + assert!( + (tracer_packet_stats.total_tracker_packets_passed_sigverify + == tracer_packet_stats + .total_tracer_packets_received_in_sigverify_stage) + ); + } + assert_eq!(tracer_packet_stats.total_excess_tracer_packets, 0); while let Some(v) = verifieds.pop() { received += v.len(); batches.push(v); } - if use_same_tx || received >= sent_len { - break; - } + } + + if total_tracer_packets_received_in_sigverify_stage >= sent_len { + break; } } trace!("received: {}", received); + assert_eq!( + total_tracer_packets_received_in_sigverify_stage, + total_packets + ); drop(packet_s); stage.join().unwrap(); } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index ce0940b013..f2d559edb3 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -159,22 +159,17 @@ impl Tpu { .unwrap(); let sigverify_stage = { - let verifier = TransactionSigVerifier::default(); - SigVerifyStage::new( - find_packet_sender_stake_receiver, - verified_sender, - verifier, - "tpu-verifier", - ) + let verifier = TransactionSigVerifier::new(verified_sender); + SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier") }; let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); let vote_sigverify_stage = { - let verifier = TransactionSigVerifier::new_reject_non_vote(); + let verifier = + TransactionSigVerifier::new_reject_non_vote(verified_tpu_vote_packets_sender); SigVerifyStage::new( vote_find_packet_sender_stake_receiver, - verified_tpu_vote_packets_sender, verifier, "tpu-vote-verifier", ) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4e3bad1d3b..838934a149 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -161,8 +161,11 @@ impl Tvu { let (verified_sender, verified_receiver) = unbounded(); let sigverify_stage = SigVerifyStage::new( fetch_receiver, - verified_sender, - ShredSigVerifier::new(bank_forks.clone(), leader_schedule_cache.clone()), + ShredSigVerifier::new( + bank_forks.clone(), + leader_schedule_cache.clone(), + verified_sender, + ), "shred-verifier", ); diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index a6911b8f58..c4f3169979 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -26,7 +26,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) // verify packets let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000)); bencher.iter(|| { - let _ans = deduper.dedup_packets_and_count_discards(&mut batches); + let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()); deduper.reset(); batches .iter_mut() diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs index 878b84e181..5de33533ec 100644 --- a/perf/benches/shrink.rs +++ b/perf/benches/shrink.rs @@ -79,6 +79,6 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) { }); bencher.iter(|| { - let _ = sigverify::count_valid_packets(&batches); + let _ = sigverify::count_valid_packets(&batches, |_| ()); }); } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 5c3971e9c9..40546801c0 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -19,6 +19,7 @@ use { hash::Hash, message::{MESSAGE_HEADER_LENGTH, MESSAGE_VERSION_PREFIX}, pubkey::Pubkey, + saturating_add_assign, short_vec::decode_shortu16_len, signature::Signature, }, @@ -152,10 +153,10 @@ fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { } // Check for tracer pubkey - if !packet.meta.is_tracer_tx() + if !packet.meta.is_tracer_packet() && &packet.data[pubkey_start..pubkey_end] == TRACER_KEY.as_ref() { - packet.meta.flags |= PacketFlags::TRACER_TX; + packet.meta.flags |= PacketFlags::TRACER_PACKET; } pubkey_start = pubkey_end; @@ -167,10 +168,24 @@ pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize { batches.iter().map(|batch| batch.len()).sum() } -pub fn count_valid_packets(batches: &[PacketBatch]) -> usize { +pub fn count_valid_packets( + batches: &[PacketBatch], + mut process_valid_packet: impl FnMut(&Packet), +) -> usize { batches .iter() - .map(|batch| batch.iter().filter(|p| !p.meta.discard()).count()) + .map(|batch| { + batch + .iter() + .filter(|p| { + let should_keep = !p.meta.discard(); + if should_keep { + process_valid_packet(p); + } + should_keep + }) + .count() + }) .sum() } @@ -495,11 +510,23 @@ impl Deduper { 0 } - pub fn dedup_packets_and_count_discards(&self, batches: &mut [PacketBatch]) -> u64 { - batches - .iter_mut() - .flat_map(|batch| batch.iter_mut().map(|p| self.dedup_packet(p))) - .sum() + pub fn dedup_packets_and_count_discards( + &self, + batches: &mut [PacketBatch], + mut process_received_packet: impl FnMut(&mut Packet, bool, bool), + ) -> u64 { + let mut num_removed: u64 = 0; + batches.iter_mut().for_each(|batch| { + batch.iter_mut().for_each(|p| { + let removed_before_sigverify = p.meta.discard(); + let is_duplicate = self.dedup_packet(p); + if is_duplicate == 1 { + saturating_add_assign!(num_removed, 1); + } + process_received_packet(p, removed_before_sigverify, is_duplicate == 1); + }) + }); + num_removed } } @@ -1401,7 +1428,14 @@ mod tests { to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); let filter = Deduper::new(1_000_000, Duration::from_millis(0)); - let discard = filter.dedup_packets_and_count_discards(&mut batches) as usize; + let mut num_deduped = 0; + let discard = filter.dedup_packets_and_count_discards( + &mut batches, + |_deduped_packet, _removed_before_sigverify_stage, _is_dup| { + num_deduped += 1; + }, + ) as usize; + assert_eq!(num_deduped, discard + 1); assert_eq!(packet_count, discard + 1); } @@ -1409,8 +1443,7 @@ mod tests { fn test_dedup_diff() { let mut filter = Deduper::new(1_000_000, Duration::from_millis(0)); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - - let discard = filter.dedup_packets_and_count_discards(&mut batches) as usize; + let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through assert_eq!(discard, 0); filter.reset(); @@ -1428,7 +1461,7 @@ mod tests { for i in 0..1000 { let mut batches = to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); - discard += filter.dedup_packets_and_count_discards(&mut batches) as usize; + discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; debug!("{} {}", i, discard); if filter.saturated.load(Ordering::Relaxed) { break; @@ -1444,7 +1477,7 @@ mod tests { for i in 0..10 { let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - discard += filter.dedup_packets_and_count_discards(&mut batches) as usize; + discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; debug!("false positive rate: {}/{}", discard, i * 1024); } //allow for 1 false positive even if extremely unlikely @@ -1473,7 +1506,7 @@ mod tests { }); start.sort_by_key(|p| p.data); - let packet_count = count_valid_packets(&batches); + let packet_count = count_valid_packets(&batches, |_| ()); let res = shrink_batches(&mut batches); batches.truncate(res); @@ -1485,7 +1518,7 @@ mod tests { .for_each(|p| end.push(p.clone())) }); end.sort_by_key(|p| p.data); - let packet_count2 = count_valid_packets(&batches); + let packet_count2 = count_valid_packets(&batches, |_| ()); assert_eq!(packet_count, packet_count2); assert_eq!(start, end); } @@ -1642,13 +1675,13 @@ mod tests { PACKETS_PER_BATCH, ); assert_eq!(batches.len(), BATCH_COUNT); - assert_eq!(count_valid_packets(&batches), PACKET_COUNT); + assert_eq!(count_valid_packets(&batches, |_| ()), PACKET_COUNT); batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut() .enumerate() .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j))) }); - assert_eq!(count_valid_packets(&batches), *expect_valid_packets); + assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets); debug!("show valid packets for case {}", i); batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut().enumerate().for_each(|(j, p)| { @@ -1662,7 +1695,7 @@ mod tests { debug!("shrunk batch test {} count: {}", i, shrunken_batch_count); assert_eq!(shrunken_batch_count, *expect_batch_count); batches.truncate(shrunken_batch_count); - assert_eq!(count_valid_packets(&batches), *expect_valid_packets); + assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets); } } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 5866f192dd..516d91e104 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -21,7 +21,7 @@ bitflags! { const FORWARDED = 0b0000_0010; const REPAIR = 0b0000_0100; const SIMPLE_VOTE_TX = 0b0000_1000; - const TRACER_TX = 0b0001_0000; + const TRACER_PACKET = 0b0001_0000; } } @@ -148,8 +148,8 @@ impl Meta { } #[inline] - pub fn is_tracer_tx(&self) -> bool { - self.flags.contains(PacketFlags::TRACER_TX) + pub fn is_tracer_packet(&self) -> bool { + self.flags.contains(PacketFlags::TRACER_PACKET) } }