diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3c70dc984a..d66fd5eab1 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,7 +8,8 @@ use { LeaderExecuteAndCommitTimings, RecordTransactionsTimings, }, qos_service::QosService, - sigverify::TransactionTracerPacketStats, + sigverify::SigverifyTracerPacketStats, + tracer_packet_stats::TracerPacketStats, unprocessed_packet_batches::{self, *}, }, crossbeam_channel::{ @@ -93,7 +94,7 @@ const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); -pub type BankingPacketBatch = (Vec, Option); +pub type BankingPacketBatch = (Vec, Option); pub type BankingPacketSender = CrossbeamSender; pub type BankingPacketReceiver = CrossbeamReceiver; @@ -398,6 +399,12 @@ pub enum ForwardOption { ForwardTransaction, } +struct FilterForwardingResults<'a> { + forwardable_packets: Vec<&'a Packet>, + total_tracer_packets_in_buffer: usize, + total_forwardable_tracer_packets: usize, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -496,16 +503,33 @@ impl BankingStage { fn filter_valid_packets_for_forwarding<'a>( deserialized_packets: impl Iterator, - ) -> Vec<&'a Packet> { - deserialized_packets - .filter_map(|deserialized_packet| { - if !deserialized_packet.forwarded { - Some(deserialized_packet.immutable_section().original_packet()) - } else { - None - } - }) - .collect() + ) -> FilterForwardingResults<'a> { + let mut total_forwardable_tracer_packets = 0; + let mut total_tracer_packets_in_buffer = 0; + FilterForwardingResults { + forwardable_packets: deserialized_packets + .filter_map(|deserialized_packet| { + let is_tracer_packet = deserialized_packet + .immutable_section() + .original_packet() + .meta + .is_tracer_packet(); + if is_tracer_packet { + total_tracer_packets_in_buffer += 1; + } + if !deserialized_packet.forwarded { + if is_tracer_packet { + total_forwardable_tracer_packets += 1; + } + Some(deserialized_packet.immutable_section().original_packet()) + } else { + None + } + }) + .collect(), + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + } } /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns @@ -515,11 +539,12 @@ impl BankingStage { forward_option: &ForwardOption, cluster_info: &ClusterInfo, poh_recorder: &Arc>, - packets: Vec<&Packet>, + filter_forwarding_results: &FilterForwardingResults, data_budget: &DataBudget, banking_stage_stats: &BankingStageStats, + tracer_packet_stats: &mut TracerPacketStats, ) -> (std::result::Result<(), TransportError>, usize) { - let addr = match forward_option { + let leader_and_addr = match forward_option { ForwardOption::NotForward => return (Ok(()), 0), ForwardOption::ForwardTransaction => { next_leader_tpu_forwards(cluster_info, poh_recorder) @@ -527,11 +552,22 @@ impl BankingStage { ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), }; - let addr = match addr { - Some(addr) => addr, + let (leader_pubkey, addr) = match leader_and_addr { + Some(leader_and_addr) => leader_and_addr, None => return (Ok(()), 0), }; + let FilterForwardingResults { + forwardable_packets, + total_forwardable_tracer_packets, + .. + } = filter_forwarding_results; + + tracer_packet_stats.increment_total_forwardable_tracer_packets( + *total_forwardable_tracer_packets, + leader_pubkey, + ); + const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; @@ -543,7 +579,7 @@ impl BankingStage { ) }); - let packet_vec: Vec<_> = packets + let packet_vec: Vec<_> = forwardable_packets .iter() .filter_map(|p| { if !p.meta.forwarded() && data_budget.take(p.meta.size) { @@ -879,6 +915,7 @@ impl BankingStage { qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, connection_cache: &ConnectionCache, + tracer_packet_stats: &mut TracerPacketStats, ) { let (decision, make_decision_time) = measure!( { @@ -948,6 +985,7 @@ impl BankingStage { slot_metrics_tracker, banking_stage_stats, connection_cache, + tracer_packet_stats, ), "forward", ); @@ -965,6 +1003,7 @@ impl BankingStage { slot_metrics_tracker, banking_stage_stats, connection_cache, + tracer_packet_stats, ), "forward_and_hold", ); @@ -974,6 +1013,7 @@ impl BankingStage { } } + #[allow(clippy::too_many_arguments)] fn handle_forwarding( forward_option: &ForwardOption, cluster_info: &ClusterInfo, @@ -984,6 +1024,7 @@ impl BankingStage { slot_metrics_tracker: &mut LeaderSlotMetricsTracker, banking_stage_stats: &BankingStageStats, connection_cache: &ConnectionCache, + tracer_packet_stats: &mut TracerPacketStats, ) { if let ForwardOption::NotForward = forward_option { if !hold { @@ -992,17 +1033,19 @@ impl BankingStage { return; } - let forwardable_packets = + let filter_forwarding_result = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); - let forwardable_packets_len = forwardable_packets.len(); + + let forwardable_packets_len = filter_forwarding_result.forwardable_packets.len(); let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets( connection_cache, forward_option, cluster_info, poh_recorder, - forwardable_packets, + &filter_forwarding_result, data_budget, banking_stage_stats, + tracer_packet_stats, ); let failed_forwarded_packets_count = forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count); @@ -1026,6 +1069,9 @@ impl BankingStage { } else { slot_metrics_tracker .increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64); + tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( + filter_forwarding_result.total_tracer_packets_in_buffer, + ); buffered_packet_batches.clear(); } } @@ -1048,6 +1094,7 @@ impl BankingStage { let recorder = poh_recorder.lock().unwrap().recorder(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); + let mut tracer_packet_stats = TracerPacketStats::new(id); let qos_service = QosService::new(cost_model, id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); @@ -1071,6 +1118,7 @@ impl BankingStage { &qos_service, &mut slot_metrics_tracker, &connection_cache, + &mut tracer_packet_stats, ), "process_buffered_packets", ); @@ -1078,6 +1126,8 @@ impl BankingStage { .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); } + tracer_packet_stats.report(1000); + if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, slot_metrics_checker_check_slot_boundary_time) = measure!( { @@ -1115,6 +1165,7 @@ impl BankingStage { id, &mut buffered_packet_batches, &mut banking_stage_stats, + &mut tracer_packet_stats, &mut slot_metrics_tracker, ), "receive_and_buffer_packets", @@ -1999,10 +2050,21 @@ impl BankingStage { verified_receiver: &BankingPacketReceiver, recv_timeout: Duration, packet_count_upperbound: usize, - ) -> Result, RecvTimeoutError> { + ) -> Result<(Vec, Option), RecvTimeoutError> { let start = Instant::now(); - let (mut packet_batches, _tracer_packet_stats_option) = + let mut aggregated_tracer_packet_stats_option: Option = None; + let (mut packet_batches, new_tracer_packet_stats_option) = verified_receiver.recv_timeout(recv_timeout)?; + + if let Some(new_tracer_packet_stats) = &new_tracer_packet_stats_option { + if let Some(aggregated_tracer_packet_stats) = &mut aggregated_tracer_packet_stats_option + { + aggregated_tracer_packet_stats.aggregate(new_tracer_packet_stats); + } else { + aggregated_tracer_packet_stats_option = new_tracer_packet_stats_option; + } + } + let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum(); while let Ok((packet_batch, _tracer_packet_stats_option)) = verified_receiver.try_recv() { trace!("got more packet batches in banking stage"); @@ -2020,7 +2082,7 @@ impl BankingStage { } num_packets_received = packets_received; } - Ok(packet_batches) + Ok((packet_batches, aggregated_tracer_packet_stats_option)) } #[allow(clippy::too_many_arguments)] @@ -2032,17 +2094,21 @@ impl BankingStage { id: u32, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &mut BankingStageStats, + tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); - let packet_batches = Self::receive_until( + let (packet_batches, new_sigverify_tracer_packet_stats_option) = Self::receive_until( verified_receiver, recv_timeout, buffered_packet_batches.capacity() - buffered_packet_batches.len(), )?; - recv_time.stop(); - let packet_batches_len = packet_batches.len(); + if let Some(new_sigverify_tracer_packet_stats) = &new_sigverify_tracer_packet_stats_option { + tracer_packet_stats + .aggregate_sigverify_tracer_packet_stats(new_sigverify_tracer_packet_stats); + } + let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", @@ -2051,7 +2117,6 @@ impl BankingStage { packet_count, id, ); - let mut proc_start = Measure::start("receive_and_buffer_packets_transactions_process"); let packet_batch_iter = packet_batches.into_iter(); let mut dropped_packets_count = 0; @@ -2072,21 +2137,14 @@ impl BankingStage { &mut newly_buffered_packets_count, banking_stage_stats, slot_metrics_tracker, + tracer_packet_stats, ) } - proc_start.stop(); + recv_time.stop(); - debug!( - "@{:?} done processing transaction batches: {} time: {:?}ms total count: {} id: {}", - timestamp(), - packet_batches_len, - proc_start.as_ms(), - packet_count, - id, - ); banking_stage_stats .receive_and_buffer_packets_elapsed - .fetch_add(proc_start.as_us(), Ordering::Relaxed); + .fetch_add(recv_time.as_us(), Ordering::Relaxed); banking_stage_stats .receive_and_buffer_packets_count .fetch_add(packet_count, Ordering::Relaxed); @@ -2114,6 +2172,7 @@ impl BankingStage { newly_buffered_packets_count: &mut usize, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + tracer_packet_stats: &mut TracerPacketStats, ) { if !packet_indexes.is_empty() { let _ = banking_stage_stats @@ -2124,14 +2183,18 @@ impl BankingStage { slot_metrics_tracker .increment_newly_buffered_packets_count(packet_indexes.len() as u64); - let number_of_dropped_packets = unprocessed_packet_batches.insert_batch( - unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes), - ); + let (number_of_dropped_packets, number_of_dropped_tracer_packets) = + unprocessed_packet_batches.insert_batch( + unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes), + ); saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets); slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( number_of_dropped_packets as u64, ); + + tracer_packet_stats + .increment_total_exceeded_banking_stage_buffer(number_of_dropped_tracer_packets); } } @@ -2146,21 +2209,21 @@ impl BankingStage { pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, poh_recorder: &Mutex, -) -> Option { +) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) } fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, poh_recorder: &Mutex, -) -> Option { +) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) } pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, poh_recorder: &Mutex, -) -> Option { +) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) } @@ -2168,7 +2231,7 @@ fn next_leader_x( cluster_info: &ClusterInfo, poh_recorder: &Mutex, port_selector: F, -) -> Option +) -> Option<(Pubkey, std::net::SocketAddr)> where F: FnOnce(&ContactInfo) -> SocketAddr, { @@ -2177,7 +2240,9 @@ where .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); if let Some(leader_pubkey) = leader_pubkey { - cluster_info.lookup_contact_info(&leader_pubkey, port_selector) + cluster_info + .lookup_contact_info(&leader_pubkey, port_selector) + .map(|addr| (leader_pubkey, addr)) } else { None } @@ -2188,7 +2253,6 @@ mod tests { use { super::*, crossbeam_channel::{unbounded, Receiver}, - itertools::Itertools, solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta}, solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, @@ -3254,17 +3318,27 @@ mod tests { let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); let mut p = Packet::from_data(None, &transaction).unwrap(); p.meta.port = packets_id; + p.meta.set_tracer(true); DeserializedPacket::new(p).unwrap() }) .collect_vec(); - let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); - assert_eq!(result.len(), 256); + let FilterForwardingResults { + forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + } = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); + assert_eq!(forwardable_packets.len(), 256); + assert_eq!(total_tracer_packets_in_buffer, 256); + assert_eq!(total_forwardable_tracer_packets, 256); // packets in a batch are forwarded in arbitrary order; verify the ports match after // sorting let expected_ports: Vec<_> = (0..256).collect(); - let mut forwarded_ports: Vec<_> = result.into_iter().map(|p| p.meta.port).collect(); + let mut forwarded_ports: Vec<_> = forwardable_packets + .into_iter() + .map(|p| p.meta.port) + .collect(); forwarded_ports.sort_unstable(); assert_eq!(expected_ports, forwarded_ports); @@ -3272,8 +3346,20 @@ mod tests { for packet in &mut packets[0..num_already_forwarded] { packet.forwarded = true; } - let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); - assert_eq!(result.len(), packets.len() - num_already_forwarded); + let FilterForwardingResults { + forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + } = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); + assert_eq!( + forwardable_packets.len(), + packets.len() - num_already_forwarded + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_forwarded + ); } #[test] @@ -4093,6 +4179,7 @@ mod tests { &mut LeaderSlotMetricsTracker::new(0), &stats, &connection_cache, + &mut TracerPacketStats::new(0), ); recv_socket @@ -4204,6 +4291,7 @@ mod tests { &mut LeaderSlotMetricsTracker::new(0), &stats, &connection_cache, + &mut TracerPacketStats::new(0), ); recv_socket diff --git a/core/src/lib.rs b/core/src/lib.rs index 2af5f786ed..fb2122b8b5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -65,6 +65,7 @@ pub mod system_monitor_service; mod tower1_7_14; pub mod tower_storage; pub mod tpu; +pub mod tracer_packet_stats; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 4ce7ff89ab..d5ce63ad57 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -14,11 +14,11 @@ use { }, crossbeam_channel::Sender, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, - solana_sdk::packet::Packet, + solana_sdk::{packet::Packet, saturating_add_assign}, }; -#[derive(Debug, Default, Clone)] -pub struct TransactionTracerPacketStats { +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct SigverifyTracerPacketStats { pub total_removed_before_sigverify_stage: usize, pub total_tracer_packets_received_in_sigverify_stage: usize, pub total_tracer_packets_deduped: usize, @@ -26,10 +26,39 @@ pub struct TransactionTracerPacketStats { pub total_tracker_packets_passed_sigverify: usize, } +impl SigverifyTracerPacketStats { + pub fn is_default(&self) -> bool { + *self == SigverifyTracerPacketStats::default() + } + + pub fn aggregate(&mut self, other: &SigverifyTracerPacketStats) { + saturating_add_assign!( + self.total_removed_before_sigverify_stage, + other.total_removed_before_sigverify_stage + ); + saturating_add_assign!( + self.total_tracer_packets_received_in_sigverify_stage, + other.total_tracer_packets_received_in_sigverify_stage + ); + saturating_add_assign!( + self.total_tracer_packets_deduped, + other.total_tracer_packets_deduped + ); + saturating_add_assign!( + self.total_excess_tracer_packets, + other.total_excess_tracer_packets + ); + saturating_add_assign!( + self.total_tracker_packets_passed_sigverify, + other.total_tracker_packets_passed_sigverify + ); + } +} + #[derive(Clone)] pub struct TransactionSigVerifier { packet_sender: Sender<::SendType>, - tracer_packet_stats: TransactionTracerPacketStats, + tracer_packet_stats: SigverifyTracerPacketStats, recycler: Recycler, recycler_out: Recycler>, reject_non_vote: bool, @@ -46,7 +75,7 @@ impl TransactionSigVerifier { init(); Self { packet_sender, - tracer_packet_stats: TransactionTracerPacketStats::default(), + tracer_packet_stats: SigverifyTracerPacketStats::default(), recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), reject_non_vote: false, @@ -98,7 +127,7 @@ impl SigVerifier for TransactionSigVerifier { &mut self, packet_batches: Vec, ) -> Result<(), SigVerifyServiceError> { - let mut tracer_packet_stats_to_send = TransactionTracerPacketStats::default(); + let mut tracer_packet_stats_to_send = SigverifyTracerPacketStats::default(); std::mem::swap( &mut tracer_packet_stats_to_send, &mut self.tracer_packet_stats, diff --git a/core/src/tracer_packet_stats.rs b/core/src/tracer_packet_stats.rs new file mode 100644 index 0000000000..f384634f77 --- /dev/null +++ b/core/src/tracer_packet_stats.rs @@ -0,0 +1,207 @@ +use { + crate::sigverify::SigverifyTracerPacketStats, + solana_sdk::{pubkey::Pubkey, saturating_add_assign, timing::timestamp}, + std::collections::HashSet, +}; + +#[derive(Debug, Default)] +pub struct BankingStageTracerPacketStats { + total_exceeded_banking_stage_buffer: usize, + // This is the total number of tracer packets removed from the buffer + // after a leader's set of slots. Of these, only a subset that were in + // the buffer were actually forwardable (didn't arrive on forward port and haven't been + // forwarded before) + total_cleared_from_buffer_after_forward: usize, + total_forwardable_tracer_packets: usize, + forward_target_leaders: HashSet, +} + +#[derive(Debug, Default)] +pub struct ModifiableTracerPacketStats { + sigverify_tracer_packet_stats: SigverifyTracerPacketStats, + banking_stage_tracer_packet_stats: BankingStageTracerPacketStats, +} + +#[derive(Debug, Default)] +pub struct TracerPacketStats { + id: u32, + last_report: u64, + modifiable_tracer_packet_stats: Option, +} + +impl TracerPacketStats { + pub fn new(id: u32) -> Self { + Self { + id, + ..Self::default() + } + } + + pub fn get_mutable_stats(&mut self) -> &mut ModifiableTracerPacketStats { + if self.modifiable_tracer_packet_stats.is_none() { + self.modifiable_tracer_packet_stats = Some(ModifiableTracerPacketStats::default()); + } + self.modifiable_tracer_packet_stats.as_mut().unwrap() + } + + pub fn aggregate_sigverify_tracer_packet_stats( + &mut self, + new_sigverify_stats: &SigverifyTracerPacketStats, + ) { + if !new_sigverify_stats.is_default() { + let stats = self.get_mutable_stats(); + stats + .sigverify_tracer_packet_stats + .aggregate(new_sigverify_stats); + } + } + + pub fn increment_total_exceeded_banking_stage_buffer( + &mut self, + total_exceeded_banking_stage_buffer: usize, + ) { + if total_exceeded_banking_stage_buffer != 0 { + let stats = self.get_mutable_stats(); + saturating_add_assign!( + stats + .banking_stage_tracer_packet_stats + .total_exceeded_banking_stage_buffer, + total_exceeded_banking_stage_buffer + ); + } + } + + pub fn increment_total_cleared_from_buffer_after_forward( + &mut self, + total_cleared_from_buffer_after_forward: usize, + ) { + if total_cleared_from_buffer_after_forward != 0 { + let stats = self.get_mutable_stats(); + saturating_add_assign!( + stats + .banking_stage_tracer_packet_stats + .total_cleared_from_buffer_after_forward, + total_cleared_from_buffer_after_forward + ); + } + } + + pub fn increment_total_forwardable_tracer_packets( + &mut self, + total_forwardable_tracer_packets: usize, + forward_target_leader: Pubkey, + ) { + if total_forwardable_tracer_packets != 0 { + let stats = self.get_mutable_stats(); + stats + .banking_stage_tracer_packet_stats + .forward_target_leaders + .insert(forward_target_leader); + saturating_add_assign!( + stats + .banking_stage_tracer_packet_stats + .total_forwardable_tracer_packets, + total_forwardable_tracer_packets + ); + } + } + + pub fn report(&mut self, report_interval_ms: u64) { + let now = timestamp(); + const LEADER_REPORT_LIMIT: usize = 4; + if now.saturating_sub(self.last_report) > report_interval_ms { + // We don't want to report unless we actually saw/forwarded a tracer packet + // to prevent noisy metrics + if let Some(modifiable_tracer_packet_stats) = self.modifiable_tracer_packet_stats.take() + { + datapoint_info!( + "tracer-packet-stats", + ("id", self.id, i64), + ( + "total_removed_before_sigverify", + modifiable_tracer_packet_stats + .sigverify_tracer_packet_stats + .total_removed_before_sigverify_stage as i64, + i64 + ), + ( + "total_tracer_packets_received_in_sigverify", + modifiable_tracer_packet_stats + .sigverify_tracer_packet_stats + .total_tracer_packets_received_in_sigverify_stage + as i64, + i64 + ), + ( + "total_tracer_packets_deduped_in_sigverify", + modifiable_tracer_packet_stats + .sigverify_tracer_packet_stats + .total_tracer_packets_deduped as i64, + i64 + ), + ( + "total_excess_tracer_packets_discarded_in_sigverify", + modifiable_tracer_packet_stats + .sigverify_tracer_packet_stats + .total_excess_tracer_packets as i64, + i64 + ), + ( + "total_tracker_packets_passed_sigverify", + modifiable_tracer_packet_stats + .sigverify_tracer_packet_stats + .total_tracker_packets_passed_sigverify as i64, + i64 + ), + ( + "total_exceeded_banking_stage_buffer", + modifiable_tracer_packet_stats + .banking_stage_tracer_packet_stats + .total_exceeded_banking_stage_buffer as i64, + i64 + ), + ( + "total_cleared_from_buffer_after_forward", + modifiable_tracer_packet_stats + .banking_stage_tracer_packet_stats + .total_cleared_from_buffer_after_forward as i64, + i64 + ), + ( + "total_forwardable_tracer_packets", + modifiable_tracer_packet_stats + .banking_stage_tracer_packet_stats + .total_forwardable_tracer_packets as i64, + i64 + ), + ( + "exceeded_expected_forward_leader_count", + modifiable_tracer_packet_stats + .banking_stage_tracer_packet_stats + .forward_target_leaders + .len() + > LEADER_REPORT_LIMIT, + bool + ), + ( + "forward_target_leaders", + itertools::Itertools::intersperse( + modifiable_tracer_packet_stats + .banking_stage_tracer_packet_stats + .forward_target_leaders + .iter() + .take(LEADER_REPORT_LIMIT) + .map(|leader_pubkey| leader_pubkey.to_string()), + ", ".to_string() + ) + .collect::(), + String + ) + ); + + *self = Self::default(); + self.last_report = timestamp(); + } + } + } +} diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index bfc8852111..ce268bead5 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -210,14 +210,23 @@ impl UnprocessedPacketBatches { pub fn insert_batch( &mut self, deserialized_packets: impl Iterator, - ) -> usize { + ) -> (usize, usize) { let mut num_dropped_packets = 0; + let mut num_dropped_tracer_packets = 0; for deserialized_packet in deserialized_packets { - if self.push(deserialized_packet).is_some() { + if let Some(dropped_packet) = self.push(deserialized_packet) { num_dropped_packets += 1; + if dropped_packet + .immutable_section() + .original_packet() + .meta + .is_tracer_packet() + { + num_dropped_tracer_packets += 1; + } } } - num_dropped_packets + (num_dropped_packets, num_dropped_tracer_packets) } pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option { diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 7ab6d9e9e3..4f7e585d61 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -81,12 +81,15 @@ impl VotingService { inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); } - let target_address = if send_to_tpu_vote_port { + let pubkey_and_target_address = if send_to_tpu_vote_port { crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder) } else { crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder) }; - let _ = cluster_info.send_transaction(vote_op.tx(), target_address); + let _ = cluster_info.send_transaction( + vote_op.tx(), + pubkey_and_target_address.map(|(_pubkey, target_addr)| target_addr), + ); match vote_op { VoteOp::PushVote { diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index e03348248f..e859afba23 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -331,7 +331,7 @@ pub fn check_for_tracer_packet(packet: &mut Packet) -> bool { // Check for tracer pubkey match packet.data(first_pubkey_start..first_pubkey_end) { Some(pubkey) if pubkey == TRACER_KEY.as_ref() => { - packet.meta.flags |= PacketFlags::TRACER_PACKET; + packet.meta.set_tracer(true); true } _ => false, diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 2cca65d6a5..36e924996a 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -151,6 +151,11 @@ impl Meta { self.flags.set(PacketFlags::DISCARD, discard); } + #[inline] + pub fn set_tracer(&mut self, is_tracer: bool) { + self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); + } + #[inline] pub fn forwarded(&self) -> bool { self.flags.contains(PacketFlags::FORWARDED)