Report banking stage tracer metrics (#25620)

This commit is contained in:
carllin 2022-06-09 00:25:37 -05:00 committed by GitHub
parent 0bbfcc3ba0
commit bf8faa8a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 405 additions and 63 deletions

View File

@ -8,7 +8,8 @@ use {
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
qos_service::QosService,
sigverify::TransactionTracerPacketStats,
sigverify::SigverifyTracerPacketStats,
tracer_packet_stats::TracerPacketStats,
unprocessed_packet_batches::{self, *},
},
crossbeam_channel::{
@ -93,7 +94,7 @@ const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<TransactionTracerPacketStats>);
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<SigverifyTracerPacketStats>);
pub type BankingPacketSender = CrossbeamSender<BankingPacketBatch>;
pub type BankingPacketReceiver = CrossbeamReceiver<BankingPacketBatch>;
@ -398,6 +399,12 @@ pub enum ForwardOption {
ForwardTransaction,
}
struct FilterForwardingResults<'a> {
forwardable_packets: Vec<&'a Packet>,
total_tracer_packets_in_buffer: usize,
total_forwardable_tracer_packets: usize,
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
@ -496,16 +503,33 @@ impl BankingStage {
fn filter_valid_packets_for_forwarding<'a>(
deserialized_packets: impl Iterator<Item = &'a DeserializedPacket>,
) -> Vec<&'a Packet> {
deserialized_packets
.filter_map(|deserialized_packet| {
if !deserialized_packet.forwarded {
Some(deserialized_packet.immutable_section().original_packet())
} else {
None
}
})
.collect()
) -> FilterForwardingResults<'a> {
let mut total_forwardable_tracer_packets = 0;
let mut total_tracer_packets_in_buffer = 0;
FilterForwardingResults {
forwardable_packets: deserialized_packets
.filter_map(|deserialized_packet| {
let is_tracer_packet = deserialized_packet
.immutable_section()
.original_packet()
.meta
.is_tracer_packet();
if is_tracer_packet {
total_tracer_packets_in_buffer += 1;
}
if !deserialized_packet.forwarded {
if is_tracer_packet {
total_forwardable_tracer_packets += 1;
}
Some(deserialized_packet.immutable_section().original_packet())
} else {
None
}
})
.collect(),
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
}
}
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
@ -515,11 +539,12 @@ impl BankingStage {
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
poh_recorder: &Arc<Mutex<PohRecorder>>,
packets: Vec<&Packet>,
filter_forwarding_results: &FilterForwardingResults,
data_budget: &DataBudget,
banking_stage_stats: &BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
) -> (std::result::Result<(), TransportError>, usize) {
let addr = match forward_option {
let leader_and_addr = match forward_option {
ForwardOption::NotForward => return (Ok(()), 0),
ForwardOption::ForwardTransaction => {
next_leader_tpu_forwards(cluster_info, poh_recorder)
@ -527,11 +552,22 @@ impl BankingStage {
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
};
let addr = match addr {
Some(addr) => addr,
let (leader_pubkey, addr) = match leader_and_addr {
Some(leader_and_addr) => leader_and_addr,
None => return (Ok(()), 0),
};
let FilterForwardingResults {
forwardable_packets,
total_forwardable_tracer_packets,
..
} = filter_forwarding_results;
tracer_packet_stats.increment_total_forwardable_tracer_packets(
*total_forwardable_tracer_packets,
leader_pubkey,
);
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@ -543,7 +579,7 @@ impl BankingStage {
)
});
let packet_vec: Vec<_> = packets
let packet_vec: Vec<_> = forwardable_packets
.iter()
.filter_map(|p| {
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
@ -879,6 +915,7 @@ impl BankingStage {
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
connection_cache: &ConnectionCache,
tracer_packet_stats: &mut TracerPacketStats,
) {
let (decision, make_decision_time) = measure!(
{
@ -948,6 +985,7 @@ impl BankingStage {
slot_metrics_tracker,
banking_stage_stats,
connection_cache,
tracer_packet_stats,
),
"forward",
);
@ -965,6 +1003,7 @@ impl BankingStage {
slot_metrics_tracker,
banking_stage_stats,
connection_cache,
tracer_packet_stats,
),
"forward_and_hold",
);
@ -974,6 +1013,7 @@ impl BankingStage {
}
}
#[allow(clippy::too_many_arguments)]
fn handle_forwarding(
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
@ -984,6 +1024,7 @@ impl BankingStage {
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
banking_stage_stats: &BankingStageStats,
connection_cache: &ConnectionCache,
tracer_packet_stats: &mut TracerPacketStats,
) {
if let ForwardOption::NotForward = forward_option {
if !hold {
@ -992,17 +1033,19 @@ impl BankingStage {
return;
}
let forwardable_packets =
let filter_forwarding_result =
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = forwardable_packets.len();
let forwardable_packets_len = filter_forwarding_result.forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets(
connection_cache,
forward_option,
cluster_info,
poh_recorder,
forwardable_packets,
&filter_forwarding_result,
data_budget,
banking_stage_stats,
tracer_packet_stats,
);
let failed_forwarded_packets_count =
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
@ -1026,6 +1069,9 @@ impl BankingStage {
} else {
slot_metrics_tracker
.increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
tracer_packet_stats.increment_total_cleared_from_buffer_after_forward(
filter_forwarding_result.total_tracer_packets_in_buffer,
);
buffered_packet_batches.clear();
}
}
@ -1048,6 +1094,7 @@ impl BankingStage {
let recorder = poh_recorder.lock().unwrap().recorder();
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);
let qos_service = QosService::new(cost_model, id);
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
@ -1071,6 +1118,7 @@ impl BankingStage {
&qos_service,
&mut slot_metrics_tracker,
&connection_cache,
&mut tracer_packet_stats,
),
"process_buffered_packets",
);
@ -1078,6 +1126,8 @@ impl BankingStage {
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
}
tracer_packet_stats.report(1000);
if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD {
let (_, slot_metrics_checker_check_slot_boundary_time) = measure!(
{
@ -1115,6 +1165,7 @@ impl BankingStage {
id,
&mut buffered_packet_batches,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
),
"receive_and_buffer_packets",
@ -1999,10 +2050,21 @@ impl BankingStage {
verified_receiver: &BankingPacketReceiver,
recv_timeout: Duration,
packet_count_upperbound: usize,
) -> Result<Vec<PacketBatch>, RecvTimeoutError> {
) -> Result<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>), RecvTimeoutError> {
let start = Instant::now();
let (mut packet_batches, _tracer_packet_stats_option) =
let mut aggregated_tracer_packet_stats_option: Option<SigverifyTracerPacketStats> = None;
let (mut packet_batches, new_tracer_packet_stats_option) =
verified_receiver.recv_timeout(recv_timeout)?;
if let Some(new_tracer_packet_stats) = &new_tracer_packet_stats_option {
if let Some(aggregated_tracer_packet_stats) = &mut aggregated_tracer_packet_stats_option
{
aggregated_tracer_packet_stats.aggregate(new_tracer_packet_stats);
} else {
aggregated_tracer_packet_stats_option = new_tracer_packet_stats_option;
}
}
let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum();
while let Ok((packet_batch, _tracer_packet_stats_option)) = verified_receiver.try_recv() {
trace!("got more packet batches in banking stage");
@ -2020,7 +2082,7 @@ impl BankingStage {
}
num_packets_received = packets_received;
}
Ok(packet_batches)
Ok((packet_batches, aggregated_tracer_packet_stats_option))
}
#[allow(clippy::too_many_arguments)]
@ -2032,17 +2094,21 @@ impl BankingStage {
id: u32,
buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let packet_batches = Self::receive_until(
let (packet_batches, new_sigverify_tracer_packet_stats_option) = Self::receive_until(
verified_receiver,
recv_timeout,
buffered_packet_batches.capacity() - buffered_packet_batches.len(),
)?;
recv_time.stop();
let packet_batches_len = packet_batches.len();
if let Some(new_sigverify_tracer_packet_stats) = &new_sigverify_tracer_packet_stats_option {
tracer_packet_stats
.aggregate_sigverify_tracer_packet_stats(new_sigverify_tracer_packet_stats);
}
let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
debug!(
"@{:?} process start stalled for: {:?}ms txs: {} id: {}",
@ -2051,7 +2117,6 @@ impl BankingStage {
packet_count,
id,
);
let mut proc_start = Measure::start("receive_and_buffer_packets_transactions_process");
let packet_batch_iter = packet_batches.into_iter();
let mut dropped_packets_count = 0;
@ -2072,21 +2137,14 @@ impl BankingStage {
&mut newly_buffered_packets_count,
banking_stage_stats,
slot_metrics_tracker,
tracer_packet_stats,
)
}
proc_start.stop();
recv_time.stop();
debug!(
"@{:?} done processing transaction batches: {} time: {:?}ms total count: {} id: {}",
timestamp(),
packet_batches_len,
proc_start.as_ms(),
packet_count,
id,
);
banking_stage_stats
.receive_and_buffer_packets_elapsed
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
.fetch_add(recv_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.receive_and_buffer_packets_count
.fetch_add(packet_count, Ordering::Relaxed);
@ -2114,6 +2172,7 @@ impl BankingStage {
newly_buffered_packets_count: &mut usize,
banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if !packet_indexes.is_empty() {
let _ = banking_stage_stats
@ -2124,14 +2183,18 @@ impl BankingStage {
slot_metrics_tracker
.increment_newly_buffered_packets_count(packet_indexes.len() as u64);
let number_of_dropped_packets = unprocessed_packet_batches.insert_batch(
unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes),
);
let (number_of_dropped_packets, number_of_dropped_tracer_packets) =
unprocessed_packet_batches.insert_batch(
unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes),
);
saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets);
slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count(
number_of_dropped_packets as u64,
);
tracer_packet_stats
.increment_total_exceeded_banking_stage_buffer(number_of_dropped_tracer_packets);
}
}
@ -2146,21 +2209,21 @@ impl BankingStage {
pub(crate) fn next_leader_tpu(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
}
fn next_leader_tpu_forwards(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
}
pub(crate) fn next_leader_tpu_vote(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
}
@ -2168,7 +2231,7 @@ fn next_leader_x<F>(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
port_selector: F,
) -> Option<std::net::SocketAddr>
) -> Option<(Pubkey, std::net::SocketAddr)>
where
F: FnOnce(&ContactInfo) -> SocketAddr,
{
@ -2177,7 +2240,9 @@ where
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
if let Some(leader_pubkey) = leader_pubkey {
cluster_info.lookup_contact_info(&leader_pubkey, port_selector)
cluster_info
.lookup_contact_info(&leader_pubkey, port_selector)
.map(|addr| (leader_pubkey, addr))
} else {
None
}
@ -2188,7 +2253,6 @@ mod tests {
use {
super::*,
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta},
solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
@ -3254,17 +3318,27 @@ mod tests {
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
let mut p = Packet::from_data(None, &transaction).unwrap();
p.meta.port = packets_id;
p.meta.set_tracer(true);
DeserializedPacket::new(p).unwrap()
})
.collect_vec();
let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(result.len(), 256);
let FilterForwardingResults {
forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(forwardable_packets.len(), 256);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_forwardable_tracer_packets, 256);
// packets in a batch are forwarded in arbitrary order; verify the ports match after
// sorting
let expected_ports: Vec<_> = (0..256).collect();
let mut forwarded_ports: Vec<_> = result.into_iter().map(|p| p.meta.port).collect();
let mut forwarded_ports: Vec<_> = forwardable_packets
.into_iter()
.map(|p| p.meta.port)
.collect();
forwarded_ports.sort_unstable();
assert_eq!(expected_ports, forwarded_ports);
@ -3272,8 +3346,20 @@ mod tests {
for packet in &mut packets[0..num_already_forwarded] {
packet.forwarded = true;
}
let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(result.len(), packets.len() - num_already_forwarded);
let FilterForwardingResults {
forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(
forwardable_packets.len(),
packets.len() - num_already_forwarded
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_forwarded
);
}
#[test]
@ -4093,6 +4179,7 @@ mod tests {
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&connection_cache,
&mut TracerPacketStats::new(0),
);
recv_socket
@ -4204,6 +4291,7 @@ mod tests {
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&connection_cache,
&mut TracerPacketStats::new(0),
);
recv_socket

View File

@ -65,6 +65,7 @@ pub mod system_monitor_service;
mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
pub mod tracer_packet_stats;
pub mod tree_diff;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;

View File

@ -14,11 +14,11 @@ use {
},
crossbeam_channel::Sender,
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
solana_sdk::packet::Packet,
solana_sdk::{packet::Packet, saturating_add_assign},
};
#[derive(Debug, Default, Clone)]
pub struct TransactionTracerPacketStats {
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SigverifyTracerPacketStats {
pub total_removed_before_sigverify_stage: usize,
pub total_tracer_packets_received_in_sigverify_stage: usize,
pub total_tracer_packets_deduped: usize,
@ -26,10 +26,39 @@ pub struct TransactionTracerPacketStats {
pub total_tracker_packets_passed_sigverify: usize,
}
impl SigverifyTracerPacketStats {
pub fn is_default(&self) -> bool {
*self == SigverifyTracerPacketStats::default()
}
pub fn aggregate(&mut self, other: &SigverifyTracerPacketStats) {
saturating_add_assign!(
self.total_removed_before_sigverify_stage,
other.total_removed_before_sigverify_stage
);
saturating_add_assign!(
self.total_tracer_packets_received_in_sigverify_stage,
other.total_tracer_packets_received_in_sigverify_stage
);
saturating_add_assign!(
self.total_tracer_packets_deduped,
other.total_tracer_packets_deduped
);
saturating_add_assign!(
self.total_excess_tracer_packets,
other.total_excess_tracer_packets
);
saturating_add_assign!(
self.total_tracker_packets_passed_sigverify,
other.total_tracker_packets_passed_sigverify
);
}
}
#[derive(Clone)]
pub struct TransactionSigVerifier {
packet_sender: Sender<<Self as SigVerifier>::SendType>,
tracer_packet_stats: TransactionTracerPacketStats,
tracer_packet_stats: SigverifyTracerPacketStats,
recycler: Recycler<TxOffset>,
recycler_out: Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
@ -46,7 +75,7 @@ impl TransactionSigVerifier {
init();
Self {
packet_sender,
tracer_packet_stats: TransactionTracerPacketStats::default(),
tracer_packet_stats: SigverifyTracerPacketStats::default(),
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
reject_non_vote: false,
@ -98,7 +127,7 @@ impl SigVerifier for TransactionSigVerifier {
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
let mut tracer_packet_stats_to_send = TransactionTracerPacketStats::default();
let mut tracer_packet_stats_to_send = SigverifyTracerPacketStats::default();
std::mem::swap(
&mut tracer_packet_stats_to_send,
&mut self.tracer_packet_stats,

View File

@ -0,0 +1,207 @@
use {
crate::sigverify::SigverifyTracerPacketStats,
solana_sdk::{pubkey::Pubkey, saturating_add_assign, timing::timestamp},
std::collections::HashSet,
};
#[derive(Debug, Default)]
pub struct BankingStageTracerPacketStats {
total_exceeded_banking_stage_buffer: usize,
// This is the total number of tracer packets removed from the buffer
// after a leader's set of slots. Of these, only a subset that were in
// the buffer were actually forwardable (didn't arrive on forward port and haven't been
// forwarded before)
total_cleared_from_buffer_after_forward: usize,
total_forwardable_tracer_packets: usize,
forward_target_leaders: HashSet<Pubkey>,
}
#[derive(Debug, Default)]
pub struct ModifiableTracerPacketStats {
sigverify_tracer_packet_stats: SigverifyTracerPacketStats,
banking_stage_tracer_packet_stats: BankingStageTracerPacketStats,
}
#[derive(Debug, Default)]
pub struct TracerPacketStats {
id: u32,
last_report: u64,
modifiable_tracer_packet_stats: Option<ModifiableTracerPacketStats>,
}
impl TracerPacketStats {
pub fn new(id: u32) -> Self {
Self {
id,
..Self::default()
}
}
pub fn get_mutable_stats(&mut self) -> &mut ModifiableTracerPacketStats {
if self.modifiable_tracer_packet_stats.is_none() {
self.modifiable_tracer_packet_stats = Some(ModifiableTracerPacketStats::default());
}
self.modifiable_tracer_packet_stats.as_mut().unwrap()
}
pub fn aggregate_sigverify_tracer_packet_stats(
&mut self,
new_sigverify_stats: &SigverifyTracerPacketStats,
) {
if !new_sigverify_stats.is_default() {
let stats = self.get_mutable_stats();
stats
.sigverify_tracer_packet_stats
.aggregate(new_sigverify_stats);
}
}
pub fn increment_total_exceeded_banking_stage_buffer(
&mut self,
total_exceeded_banking_stage_buffer: usize,
) {
if total_exceeded_banking_stage_buffer != 0 {
let stats = self.get_mutable_stats();
saturating_add_assign!(
stats
.banking_stage_tracer_packet_stats
.total_exceeded_banking_stage_buffer,
total_exceeded_banking_stage_buffer
);
}
}
pub fn increment_total_cleared_from_buffer_after_forward(
&mut self,
total_cleared_from_buffer_after_forward: usize,
) {
if total_cleared_from_buffer_after_forward != 0 {
let stats = self.get_mutable_stats();
saturating_add_assign!(
stats
.banking_stage_tracer_packet_stats
.total_cleared_from_buffer_after_forward,
total_cleared_from_buffer_after_forward
);
}
}
pub fn increment_total_forwardable_tracer_packets(
&mut self,
total_forwardable_tracer_packets: usize,
forward_target_leader: Pubkey,
) {
if total_forwardable_tracer_packets != 0 {
let stats = self.get_mutable_stats();
stats
.banking_stage_tracer_packet_stats
.forward_target_leaders
.insert(forward_target_leader);
saturating_add_assign!(
stats
.banking_stage_tracer_packet_stats
.total_forwardable_tracer_packets,
total_forwardable_tracer_packets
);
}
}
pub fn report(&mut self, report_interval_ms: u64) {
let now = timestamp();
const LEADER_REPORT_LIMIT: usize = 4;
if now.saturating_sub(self.last_report) > report_interval_ms {
// We don't want to report unless we actually saw/forwarded a tracer packet
// to prevent noisy metrics
if let Some(modifiable_tracer_packet_stats) = self.modifiable_tracer_packet_stats.take()
{
datapoint_info!(
"tracer-packet-stats",
("id", self.id, i64),
(
"total_removed_before_sigverify",
modifiable_tracer_packet_stats
.sigverify_tracer_packet_stats
.total_removed_before_sigverify_stage as i64,
i64
),
(
"total_tracer_packets_received_in_sigverify",
modifiable_tracer_packet_stats
.sigverify_tracer_packet_stats
.total_tracer_packets_received_in_sigverify_stage
as i64,
i64
),
(
"total_tracer_packets_deduped_in_sigverify",
modifiable_tracer_packet_stats
.sigverify_tracer_packet_stats
.total_tracer_packets_deduped as i64,
i64
),
(
"total_excess_tracer_packets_discarded_in_sigverify",
modifiable_tracer_packet_stats
.sigverify_tracer_packet_stats
.total_excess_tracer_packets as i64,
i64
),
(
"total_tracker_packets_passed_sigverify",
modifiable_tracer_packet_stats
.sigverify_tracer_packet_stats
.total_tracker_packets_passed_sigverify as i64,
i64
),
(
"total_exceeded_banking_stage_buffer",
modifiable_tracer_packet_stats
.banking_stage_tracer_packet_stats
.total_exceeded_banking_stage_buffer as i64,
i64
),
(
"total_cleared_from_buffer_after_forward",
modifiable_tracer_packet_stats
.banking_stage_tracer_packet_stats
.total_cleared_from_buffer_after_forward as i64,
i64
),
(
"total_forwardable_tracer_packets",
modifiable_tracer_packet_stats
.banking_stage_tracer_packet_stats
.total_forwardable_tracer_packets as i64,
i64
),
(
"exceeded_expected_forward_leader_count",
modifiable_tracer_packet_stats
.banking_stage_tracer_packet_stats
.forward_target_leaders
.len()
> LEADER_REPORT_LIMIT,
bool
),
(
"forward_target_leaders",
itertools::Itertools::intersperse(
modifiable_tracer_packet_stats
.banking_stage_tracer_packet_stats
.forward_target_leaders
.iter()
.take(LEADER_REPORT_LIMIT)
.map(|leader_pubkey| leader_pubkey.to_string()),
", ".to_string()
)
.collect::<String>(),
String
)
);
*self = Self::default();
self.last_report = timestamp();
}
}
}
}

View File

@ -210,14 +210,23 @@ impl UnprocessedPacketBatches {
pub fn insert_batch(
&mut self,
deserialized_packets: impl Iterator<Item = DeserializedPacket>,
) -> usize {
) -> (usize, usize) {
let mut num_dropped_packets = 0;
let mut num_dropped_tracer_packets = 0;
for deserialized_packet in deserialized_packets {
if self.push(deserialized_packet).is_some() {
if let Some(dropped_packet) = self.push(deserialized_packet) {
num_dropped_packets += 1;
if dropped_packet
.immutable_section()
.original_packet()
.meta
.is_tracer_packet()
{
num_dropped_tracer_packets += 1;
}
}
}
num_dropped_packets
(num_dropped_packets, num_dropped_tracer_packets)
}
pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option<DeserializedPacket> {

View File

@ -81,12 +81,15 @@ impl VotingService {
inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize);
}
let target_address = if send_to_tpu_vote_port {
let pubkey_and_target_address = if send_to_tpu_vote_port {
crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder)
} else {
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder)
};
let _ = cluster_info.send_transaction(vote_op.tx(), target_address);
let _ = cluster_info.send_transaction(
vote_op.tx(),
pubkey_and_target_address.map(|(_pubkey, target_addr)| target_addr),
);
match vote_op {
VoteOp::PushVote {

View File

@ -331,7 +331,7 @@ pub fn check_for_tracer_packet(packet: &mut Packet) -> bool {
// Check for tracer pubkey
match packet.data(first_pubkey_start..first_pubkey_end) {
Some(pubkey) if pubkey == TRACER_KEY.as_ref() => {
packet.meta.flags |= PacketFlags::TRACER_PACKET;
packet.meta.set_tracer(true);
true
}
_ => false,

View File

@ -151,6 +151,11 @@ impl Meta {
self.flags.set(PacketFlags::DISCARD, discard);
}
#[inline]
pub fn set_tracer(&mut self, is_tracer: bool) {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}
#[inline]
pub fn forwarded(&self) -> bool {
self.flags.contains(PacketFlags::FORWARDED)