From 01659edd164605fb08cde3d368718f49456b8f70 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 12 Apr 2023 14:09:24 -0700 Subject: [PATCH] Forwarder: forward_packets w/o metrics (#30925) --- core/src/banking_stage/forwarder.rs | 115 +++++++++++++++++----------- 1 file changed, 72 insertions(+), 43 deletions(-) diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index ab78b4006..39ff47212 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -9,7 +9,7 @@ use { }, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, - solana_measure::measure::Measure, + solana_measure::measure_us, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, @@ -135,6 +135,44 @@ impl Forwarder { } } + /// Forwards all valid, unprocessed packets in the iterator, up to a rate limit. + /// Returns whether forwarding succeeded, the number of attempted forwarded packets + /// if any, the time spent forwarding in us, and the leader pubkey if any. + fn forward_packets<'a>( + &self, + forward_option: &ForwardOption, + forwardable_packets: impl Iterator, + ) -> ( + std::result::Result<(), TransportError>, + usize, + u64, + Option, + ) { + let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else { + return (Ok(()), 0, 0, None); + }; + + self.update_data_budget(); + let packet_vec: Vec<_> = forwardable_packets + .filter(|p| !p.meta().forwarded()) + .filter(|p| self.data_budget.take(p.meta().size)) + .filter_map(|p| p.data(..).map(|data| data.to_vec())) + .collect(); + + let packet_vec_len = packet_vec.len(); + // TODO: see https://github.com/solana-labs/solana/issues/23819 + // fix this so returns the correct number of succeeded packets + // when there's an error sending the batch. This was left as-is for now + // in favor of shipping Quic support, which was considered higher-priority + let (res, forward_us) = if !packet_vec.is_empty() { + measure_us!(self.forward(forward_option, packet_vec, &addr)) + } else { + (Ok(()), 0) + }; + + (res, packet_vec_len, forward_us, Some(leader_pubkey)) + } + /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets<'a>( @@ -147,60 +185,29 @@ impl Forwarder { usize, Option, ) { - let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else { - return (Ok(()), 0, None); - }; + let (res, num_packets, forward_us, leader_pubkey) = + self.forward_packets(forward_option, forwardable_packets); - self.update_data_budget(); - - let packet_vec: Vec<_> = forwardable_packets - .filter(|p| !p.meta().forwarded()) - .filter(|p| self.data_budget.take(p.meta().size)) - .filter_map(|p| p.data(..).map(|data| data.to_vec())) - .collect(); - - let packet_vec_len = packet_vec.len(); - // TODO: see https://github.com/solana-labs/solana/issues/23819 - // fix this so returns the correct number of succeeded packets - // when there's an error sending the batch. This was left as-is for now - // in favor of shipping Quic support, which was considered higher-priority - if !packet_vec.is_empty() { - inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len); - - let mut measure = Measure::start("banking_stage-forward-us"); - - let res = if let ForwardOption::ForwardTpuVote = forward_option { - // The vote must be forwarded using only UDP. + if num_packets > 0 { + inc_new_counter_info!("banking_stage-forwarded_packets", num_packets); + if let ForwardOption::ForwardTpuVote = forward_option { banking_stage_stats .forwarded_vote_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); - batch_send(&self.socket, &pkts).map_err(|err| err.into()) + .fetch_add(num_packets, Ordering::Relaxed); } else { - // All other transactions can be forwarded using QUIC, get_connection() will use - // system wide setting to pick the correct connection object. banking_stage_stats .forwarded_transaction_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let conn = self.connection_cache.get_connection(&addr); - conn.send_data_batch_async(packet_vec) - }; + .fetch_add(num_packets, Ordering::Relaxed); + } - measure.stop(); - inc_new_counter_info!( - "banking_stage-forward-us", - measure.as_us() as usize, - 1000, - 1000 - ); + inc_new_counter_info!("banking_stage-forward-us", forward_us as usize, 1000, 1000); - if let Err(err) = res { + if res.is_err() { inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); - return (Err(err), 0, Some(leader_pubkey)); } } - (Ok(()), packet_vec_len, Some(leader_pubkey)) + (res, num_packets, leader_pubkey) } /// Get the pubkey and socket address for the leader to forward to @@ -231,6 +238,28 @@ impl Forwarder { ) }); } + + fn forward( + &self, + forward_option: &ForwardOption, + packet_vec: Vec>, + addr: &SocketAddr, + ) -> Result<(), TransportError> { + match forward_option { + ForwardOption::ForwardTpuVote => { + // The vote must be forwarded using only UDP. + let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect(); + batch_send(&self.socket, &pkts).map_err(|err| err.into()) + } + ForwardOption::ForwardTransaction => { + // All other transactions can be forwarded using QUIC, get_connection() will use + // system wide setting to pick the correct connection object. + let conn = self.connection_cache.get_connection(addr); + conn.send_data_batch_async(packet_vec) + } + ForwardOption::NotForward => panic!("should not forward"), + } + } } #[cfg(test)]