diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 24f6ba7a7e..41d3e331a5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -12,16 +12,14 @@ use { tracer_packet_stats::TracerPacketStats, unprocessed_packet_batches::{self, *}, }, + core::iter::repeat, crossbeam_channel::{ Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }, histogram::Histogram, itertools::Itertools, min_max_heap::MinMaxHeap, - solana_client::{ - connection_cache::ConnectionCache, tpu_connection::TpuConnection, - udp_client::UdpTpuConnection, - }, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::blockstore_processor::TransactionStatusSender, @@ -59,6 +57,7 @@ use { }, transport::TransportError, }, + solana_streamer::sendmmsg::batch_send, solana_transaction_status::token_balances::{ collect_token_balances, TransactionTokenBalancesSet, }, @@ -66,7 +65,7 @@ use { cmp, collections::HashMap, env, - net::SocketAddr, + net::{SocketAddr, UdpSocket}, rc::Rc, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -539,6 +538,7 @@ impl BankingStage { forward_option: &ForwardOption, cluster_info: &ClusterInfo, poh_recorder: &Arc>, + socket: &UdpSocket, filter_forwarding_results: &FilterForwardingResults, data_budget: &DataBudget, banking_stage_stats: &BankingStageStats, @@ -600,21 +600,22 @@ impl BankingStage { let mut measure = Measure::start("banking_stage-forward-us"); - let conn = if let ForwardOption::ForwardTpuVote = forward_option { - // The vote must be forwarded using only UDP. Let's get the UDP connection. + let res = if let ForwardOption::ForwardTpuVote = forward_option { + // The vote must be forwarded using only UDP. banking_stage_stats .forwarded_vote_count .fetch_add(packet_vec_len, Ordering::Relaxed); - Arc::new(UdpTpuConnection::new_from_addr(addr).into()) + let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); + batch_send(socket, &pkts).map_err(|err| err.into()) } else { // All other transactions can be forwarded using QUIC, get_connection() will use // system wide setting to pick the correct connection object. banking_stage_stats .forwarded_transaction_count .fetch_add(packet_vec_len, Ordering::Relaxed); - connection_cache.get_connection(&addr) + let conn = connection_cache.get_connection(&addr); + conn.send_wire_transaction_batch_async(packet_vec) }; - let res = conn.send_wire_transaction_batch_async(packet_vec); measure.stop(); inc_new_counter_info!( @@ -903,6 +904,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_buffered_packets( my_pubkey: &Pubkey, + socket: &UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, @@ -988,6 +990,7 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, + socket, false, data_budget, slot_metrics_tracker, @@ -1009,6 +1012,7 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, + socket, true, data_budget, slot_metrics_tracker, @@ -1032,6 +1036,7 @@ impl BankingStage { cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, poh_recorder: &Arc>, + socket: &UdpSocket, hold: bool, data_budget: &DataBudget, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -1055,6 +1060,7 @@ impl BankingStage { forward_option, cluster_info, poh_recorder, + socket, &filter_forwarding_result, data_budget, banking_stage_stats, @@ -1105,6 +1111,7 @@ impl BankingStage { connection_cache: Arc, ) { let recorder = poh_recorder.lock().unwrap().recorder(); + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let mut tracer_packet_stats = TracerPacketStats::new(id); @@ -1121,6 +1128,7 @@ impl BankingStage { let (_, process_buffered_packets_time) = measure!( Self::process_buffered_packets( &my_pubkey, + &socket, poh_recorder, cluster_info, &mut buffered_packet_batches, @@ -4160,6 +4168,7 @@ mod tests { ]; let connection_cache = ConnectionCache::default(); + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); for (name, data_budget, expected_num_forwarded) in test_cases { let mut unprocessed_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter( @@ -4172,6 +4181,7 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, + &socket, true, &data_budget, &mut LeaderSlotMetricsTracker::new(0), @@ -4277,6 +4287,7 @@ mod tests { ), ]; + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { let stats = BankingStageStats::default(); BankingStage::handle_forwarding( @@ -4284,6 +4295,7 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, + &socket, hold, &DataBudget::default(), &mut LeaderSlotMetricsTracker::new(0),