Forwarder: forward_packets w/o metrics (#30925)

This commit is contained in:
Andrew Fitzgerald 2023-04-12 14:09:24 -07:00 committed by GitHub
parent 8556a40ee0
commit 01659edd16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 72 additions and 43 deletions

View File

@ -9,7 +9,7 @@ use {
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
@ -135,6 +135,44 @@ impl Forwarder {
}
}
/// Forwards all valid, unprocessed packets in the iterator, up to a rate limit.
/// Returns whether forwarding succeeded, the number of attempted forwarded packets
/// if any, the time spent forwarding in us, and the leader pubkey if any.
fn forward_packets<'a>(
&self,
forward_option: &ForwardOption,
forwardable_packets: impl Iterator<Item = &'a Packet>,
) -> (
std::result::Result<(), TransportError>,
usize,
u64,
Option<Pubkey>,
) {
let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else {
return (Ok(()), 0, 0, None);
};
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();
let packet_vec_len = packet_vec.len();
// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
let (res, forward_us) = if !packet_vec.is_empty() {
measure_us!(self.forward(forward_option, packet_vec, &addr))
} else {
(Ok(()), 0)
};
(res, packet_vec_len, forward_us, Some(leader_pubkey))
}
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets<'a>(
@ -147,60 +185,29 @@ impl Forwarder {
usize,
Option<Pubkey>,
) {
let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else {
return (Ok(()), 0, None);
};
let (res, num_packets, forward_us, leader_pubkey) =
self.forward_packets(forward_option, forwardable_packets);
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();
let packet_vec_len = packet_vec.len();
// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len);
let mut measure = Measure::start("banking_stage-forward-us");
let res = if let ForwardOption::ForwardTpuVote = forward_option {
// The vote must be forwarded using only UDP.
if num_packets > 0 {
inc_new_counter_info!("banking_stage-forwarded_packets", num_packets);
if let ForwardOption::ForwardTpuVote = forward_option {
banking_stage_stats
.forwarded_vote_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
.fetch_add(num_packets, Ordering::Relaxed);
} else {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
banking_stage_stats
.forwarded_transaction_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let conn = self.connection_cache.get_connection(&addr);
conn.send_data_batch_async(packet_vec)
};
.fetch_add(num_packets, Ordering::Relaxed);
}
measure.stop();
inc_new_counter_info!(
"banking_stage-forward-us",
measure.as_us() as usize,
1000,
1000
);
inc_new_counter_info!("banking_stage-forward-us", forward_us as usize, 1000, 1000);
if let Err(err) = res {
if res.is_err() {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0, Some(leader_pubkey));
}
}
(Ok(()), packet_vec_len, Some(leader_pubkey))
(res, num_packets, leader_pubkey)
}
/// Get the pubkey and socket address for the leader to forward to
@ -231,6 +238,28 @@ impl Forwarder {
)
});
}
fn forward(
&self,
forward_option: &ForwardOption,
packet_vec: Vec<Vec<u8>>,
addr: &SocketAddr,
) -> Result<(), TransportError> {
match forward_option {
ForwardOption::ForwardTpuVote => {
// The vote must be forwarded using only UDP.
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
}
ForwardOption::ForwardTransaction => {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
let conn = self.connection_cache.get_connection(addr);
conn.send_data_batch_async(packet_vec)
}
ForwardOption::NotForward => panic!("should not forward"),
}
}
}
#[cfg(test)]