diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4a29abd790..ecd85c0470 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -35,7 +35,7 @@ use { solana_ledger::{ blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, }, - solana_measure::{measure, measure::Measure}, + solana_measure::{measure, measure::Measure, measure_us}, solana_metrics::inc_new_counter_info, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, @@ -59,7 +59,6 @@ use { cmp, collections::HashMap, env, - net::UdpSocket, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, @@ -454,14 +453,17 @@ impl BankingStage { let mut packet_deserializer = PacketDeserializer::new(packet_receiver); let poh_recorder = poh_recorder.clone(); - let cluster_info = cluster_info.clone(); let transaction_status_sender = transaction_status_sender.clone(); let replay_vote_sender = replay_vote_sender.clone(); - let data_budget = data_budget.clone(); - let connection_cache = connection_cache.clone(); - let bank_forks = bank_forks.clone(); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let forwarder = Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info.clone(), + connection_cache.clone(), + data_budget.clone(), + ); Builder::new() .name(format!("solBanknStgTx{i:02}")) @@ -469,15 +471,12 @@ impl BankingStage { Self::process_loop( &mut packet_deserializer, &decision_maker, + &forwarder, &poh_recorder, - &cluster_info, i, transaction_status_sender, replay_vote_sender, - &data_budget, log_messages_bytes_limit, - connection_cache, - &bank_forks, unprocessed_transaction_storage, ); }) @@ -634,21 +633,16 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_buffered_packets( decision_maker: &DecisionMaker, - socket: &UdpSocket, - poh_recorder: &Arc>, - cluster_info: &ClusterInfo, + forwarder: &Forwarder, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, transaction_status_sender: &Option, replay_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - data_budget: &DataBudget, qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, log_messages_bytes_limit: Option, - connection_cache: &ConnectionCache, tracer_packet_stats: &mut TracerPacketStats, - bank_forks: &Arc>, ) { if unprocessed_transaction_storage.should_not_process() { return; @@ -683,45 +677,27 @@ impl BankingStage { .increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us()); } BufferedPacketsDecision::Forward => { - let (_, forward_time) = measure!( - Forwarder::handle_forwarding( - cluster_info, - unprocessed_transaction_storage, - poh_recorder, - socket, - false, - data_budget, - slot_metrics_tracker, - banking_stage_stats, - connection_cache, - tracer_packet_stats, - bank_forks, - ), - "forward", - ); - slot_metrics_tracker.increment_forward_us(forward_time.as_us()); + let ((), forward_us) = measure_us!(forwarder.handle_forwarding( + unprocessed_transaction_storage, + false, + slot_metrics_tracker, + banking_stage_stats, + tracer_packet_stats, + )); + slot_metrics_tracker.increment_forward_us(forward_us); // Take metrics action after forwarding packets to include forwarded // metrics into current slot slot_metrics_tracker.apply_action(metrics_action); } BufferedPacketsDecision::ForwardAndHold => { - let (_, forward_and_hold_time) = measure!( - Forwarder::handle_forwarding( - cluster_info, - unprocessed_transaction_storage, - poh_recorder, - socket, - true, - data_budget, - slot_metrics_tracker, - banking_stage_stats, - connection_cache, - tracer_packet_stats, - bank_forks, - ), - "forward_and_hold", - ); - slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us()); + let ((), forward_and_hold_us) = measure_us!(forwarder.handle_forwarding( + unprocessed_transaction_storage, + true, + slot_metrics_tracker, + banking_stage_stats, + tracer_packet_stats, + )); + slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us); // Take metrics action after forwarding packets slot_metrics_tracker.apply_action(metrics_action); } @@ -733,19 +709,15 @@ impl BankingStage { fn process_loop( packet_deserializer: &mut PacketDeserializer, decision_maker: &DecisionMaker, + forwarder: &Forwarder, poh_recorder: &Arc>, - cluster_info: &ClusterInfo, id: u32, transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, - data_budget: &DataBudget, log_messages_bytes_limit: Option, - connection_cache: Arc, - bank_forks: &Arc>, mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { let recorder = poh_recorder.read().unwrap().recorder(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut banking_stage_stats = BankingStageStats::new(id); let mut tracer_packet_stats = TracerPacketStats::new(id); let qos_service = QosService::new(id); @@ -760,21 +732,16 @@ impl BankingStage { let (_, process_buffered_packets_time) = measure!( Self::process_buffered_packets( decision_maker, - &socket, - poh_recorder, - cluster_info, + forwarder, &mut unprocessed_transaction_storage, &transaction_status_sender, &replay_vote_sender, &banking_stage_stats, &recorder, - data_budget, &qos_service, &mut slot_metrics_tracker, log_messages_bytes_limit, - &connection_cache, &mut tracer_packet_stats, - bank_forks, ), "process_buffered_packets", ); diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 74343ef68e..38165d35fa 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -22,28 +22,46 @@ use { }, }; -pub struct Forwarder; +pub(crate) struct Forwarder { + poh_recorder: Arc>, + bank_forks: Arc>, + socket: UdpSocket, + cluster_info: Arc, + connection_cache: Arc, + data_budget: Arc, +} impl Forwarder { - #[allow(clippy::too_many_arguments)] - pub fn handle_forwarding( - cluster_info: &ClusterInfo, + pub(crate) fn new( + poh_recorder: Arc>, + bank_forks: Arc>, + cluster_info: Arc, + connection_cache: Arc, + data_budget: Arc, + ) -> Self { + Self { + poh_recorder, + bank_forks, + socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + cluster_info, + connection_cache, + data_budget, + } + } + + pub(crate) fn handle_forwarding( + &self, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, - poh_recorder: &Arc>, - socket: &UdpSocket, hold: bool, - data_budget: &DataBudget, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, banking_stage_stats: &BankingStageStats, - connection_cache: &ConnectionCache, tracer_packet_stats: &mut TracerPacketStats, - bank_forks: &Arc>, ) { let forward_option = unprocessed_transaction_storage.forward_option(); // get current root bank from bank_forks, use it to sanitize transaction and // load all accounts from address loader; - let current_bank = bank_forks.read().unwrap().root_bank(); + let current_bank = self.bank_forks.read().unwrap().root_bank(); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -76,15 +94,10 @@ impl Forwarder { slot_metrics_tracker.increment_forwardable_batches_count(1); let batched_forwardable_packets_count = forward_batch.len(); - let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = - Self::forward_buffered_packets( - connection_cache, + let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = self + .forward_buffered_packets( &forward_option, - cluster_info, - poh_recorder, - socket, forward_batch.get_forwardable_packets(), - data_budget, banking_stage_stats, ); @@ -125,13 +138,9 @@ impl Forwarder { /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets<'a>( - connection_cache: &ConnectionCache, + &self, forward_option: &ForwardOption, - cluster_info: &ClusterInfo, - poh_recorder: &Arc>, - socket: &UdpSocket, forwardable_packets: impl Iterator, - data_budget: &DataBudget, banking_stage_stats: &BankingStageStats, ) -> ( std::result::Result<(), TransportError>, @@ -141,10 +150,12 @@ impl Forwarder { let leader_and_addr = match forward_option { ForwardOption::NotForward => return (Ok(()), 0, None), ForwardOption::ForwardTransaction => { - next_leader_tpu_forwards(cluster_info, poh_recorder) + next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder) } - ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + ForwardOption::ForwardTpuVote => { + next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder) + } }; let (leader_pubkey, addr) = match leader_and_addr { Some(leader_and_addr) => leader_and_addr, @@ -156,7 +167,7 @@ impl Forwarder { const MAX_BYTES_PER_SECOND: usize = 12_000_000; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; - data_budget.update(INTERVAL_MS, |bytes| { + self.data_budget.update(INTERVAL_MS, |bytes| { std::cmp::min( bytes.saturating_add(MAX_BYTES_PER_INTERVAL), MAX_BYTES_BUDGET, @@ -165,7 +176,7 @@ impl Forwarder { let packet_vec: Vec<_> = forwardable_packets .filter_map(|p| { - if !p.meta().forwarded() && data_budget.take(p.meta().size) { + if !p.meta().forwarded() && self.data_budget.take(p.meta().size) { Some(p.data(..)?.to_vec()) } else { None @@ -189,14 +200,14 @@ impl Forwarder { .forwarded_vote_count .fetch_add(packet_vec_len, Ordering::Relaxed); let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); - batch_send(socket, &pkts).map_err(|err| err.into()) + batch_send(&self.socket, &pkts).map_err(|err| err.into()) } else { // All other transactions can be forwarded using QUIC, get_connection() will use // system wide setting to pick the correct connection object. banking_stage_stats .forwarded_transaction_count .fetch_add(packet_vec_len, Ordering::Relaxed); - let conn = connection_cache.get_connection(&addr); + let conn = self.connection_cache.get_connection(&addr); conn.send_data_batch_async(packet_vec) }; @@ -280,37 +291,36 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config), None); let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); + let cluster_info = Arc::new(cluster_info); let recv_socket = &local_node.sockets.tpu_forwards[0]; let test_cases = vec![ ("budget-restricted", DataBudget::restricted(), 0), ("budget-available", DataBudget::default(), 1), ]; - - let connection_cache = ConnectionCache::default(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); for (name, data_budget, expected_num_forwarded) in test_cases { + let forwarder = Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info.clone(), + Arc::new(ConnectionCache::default()), + Arc::new(data_budget), + ); let unprocessed_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter( vec![deserialized_packet.clone()].into_iter(), 1, ); let stats = BankingStageStats::default(); - Forwarder::handle_forwarding( - &cluster_info, + forwarder.handle_forwarding( &mut UnprocessedTransactionStorage::new_transaction_storage( unprocessed_packet_batches, ThreadType::Transactions, ), - &poh_recorder, - &socket, true, - &data_budget, &mut LeaderSlotMetricsTracker::new(0), &stats, - &connection_cache, &mut TracerPacketStats::new(0), - &bank_forks, ); recv_socket @@ -393,21 +403,21 @@ mod tests { ("fwd-no-hold", false, vec![], 0), ]; - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let forwarder = Forwarder::new( + poh_recorder, + bank_forks, + Arc::new(cluster_info), + Arc::new(connection_cache), + Arc::new(DataBudget::default()), + ); for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { let stats = BankingStageStats::default(); - Forwarder::handle_forwarding( - &cluster_info, + forwarder.handle_forwarding( &mut unprocessed_packet_batches, - &poh_recorder, - &socket, hold, - &DataBudget::default(), &mut LeaderSlotMetricsTracker::new(0), &stats, - &connection_cache, &mut TracerPacketStats::new(0), - &bank_forks, ); recv_socket