diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 24ba54667..3d2f57f4b 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -13,7 +13,7 @@ use { get_tmp_ledger_path, }, solana_measure::measure::Measure, - solana_perf::packet::to_packets_chunked, + solana_perf::packet::to_packet_batches, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, @@ -212,7 +212,7 @@ fn main() { bank.clear_signatures(); } - let mut verified: Vec<_> = to_packets_chunked(&transactions, packets_per_chunk); + let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_chunk); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -364,7 +364,7 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packets_chunked(&transactions.clone(), packets_per_chunk); + verified = to_packet_batches(&transactions.clone(), packets_per_chunk); } start += chunk_len; diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index bade7a943..46eeeb761 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -2,8 +2,8 @@ use { clap::{crate_description, crate_name, App, Arg}, solana_streamer::{ - packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}, - streamer::{receiver, PacketReceiver}, + packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, + streamer::{receiver, PacketBatchReceiver}, }, std::{ cmp::max, @@ -20,19 +20,19 @@ use { fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut msgs = Packets::default(); - msgs.packets.resize(10, Packet::default()); - for w in msgs.packets.iter_mut() { + let mut packet_batch = PacketBatch::default(); + packet_batch.packets.resize(10, Packet::default()); + for w in packet_batch.packets.iter_mut() { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(addr); } - let msgs = Arc::new(msgs); + let packet_batch = Arc::new(packet_batch); spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - for p in &msgs.packets { + for p in &packet_batch.packets { let a = p.meta.addr(); assert!(p.meta.size <= PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -42,14 +42,14 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { }) } -fn sink(exit: Arc, rvs: Arc, r: PacketReceiver) -> JoinHandle<()> { +fn sink(exit: Arc, rvs: Arc, r: PacketBatchReceiver) -> JoinHandle<()> { spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let timer = Duration::new(1, 0); - if let Ok(msgs) = r.recv_timeout(timer) { - rvs.fetch_add(msgs.packets.len(), Ordering::Relaxed); + if let Ok(packet_batch) = r.recv_timeout(timer) { + rvs.fetch_add(packet_batch.packets.len(), Ordering::Relaxed); } }) } @@ -81,7 +81,7 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); for _ in 0..num_sockets { let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1b9e79a85..dbb0961af 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -20,7 +20,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }, - solana_perf::{packet::to_packets_chunked, test_tx::test_tx}, + solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{bank::Bank, cost_model::CostModel}, solana_sdk::{ @@ -77,11 +77,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let tx = test_tx(); let len = 4096; let chunk_size = 1024; - let batches = to_packets_chunked(&vec![tx; len], chunk_size); - let mut packets = VecDeque::new(); + let batches = to_packet_batches(&vec![tx; len], chunk_size); + let mut packet_batches = VecDeque::new(); for batch in batches { let batch_len = batch.packets.len(); - packets.push_back((batch, vec![0usize; batch_len], false)); + packet_batches.push_back((batch, vec![0usize; batch_len], false)); } let (s, _r) = unbounded(); // This tests the performance of buffering packets. @@ -91,7 +91,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &my_pubkey, std::u128::MAX, &poh_recorder, - &mut packets, + &mut packet_batches, None, &s, None::>, @@ -206,7 +206,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { assert!(r.is_ok(), "sanity parallel execution"); } bank.clear_signatures(); - let verified: Vec<_> = to_packets_chunked(&transactions, PACKETS_PER_BATCH); + let verified: Vec<_> = to_packet_batches(&transactions, PACKETS_PER_BATCH); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index e48ab9301..894c474ce 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -8,7 +8,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage}, - solana_perf::{packet::to_packets_chunked, test_tx::test_tx}, + solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -28,7 +28,7 @@ fn bench_packet_discard(bencher: &mut Bencher) { let len = 30 * 1000; let chunk_size = 1024; let tx = test_tx(); - let mut batches = to_packets_chunked(&vec![tx; len], chunk_size); + let mut batches = to_packet_batches(&vec![tx; len], chunk_size); let mut total = 0; @@ -74,7 +74,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { let chunk_size = 1024; let mut batches = if use_same_tx { let tx = test_tx(); - to_packets_chunked(&vec![tx; len], chunk_size) + to_packet_batches(&vec![tx; len], chunk_size) } else { let from_keypair = Keypair::new(); let to_keypair = Keypair::new(); @@ -89,7 +89,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { ) }) .collect(); - to_packets_chunked(&txs, chunk_size) + to_packet_batches(&txs, chunk_size) }; trace!( diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index c824036cd..4ccdb3333 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -14,7 +14,7 @@ use { solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}, solana_measure::measure::Measure, solana_perf::{ - packet::{limited_deserialize, Packet, Packets}, + packet::{limited_deserialize, Packet, PacketBatch}, recycler::Recycler, }, solana_runtime::bank::Bank, @@ -23,7 +23,7 @@ use { pubkey::Pubkey, timing::timestamp, }, - solana_streamer::streamer::{self, PacketReceiver}, + solana_streamer::streamer::{self, PacketBatchReceiver}, std::{ collections::HashSet, net::UdpSocket, @@ -197,7 +197,7 @@ impl AncestorHashesService { /// Listen for responses to our ancestors hashes repair requests fn run_responses_listener( ancestor_hashes_request_statuses: Arc>, - response_receiver: PacketReceiver, + response_receiver: PacketBatchReceiver, blockstore: Arc, outstanding_requests: Arc>, exit: Arc, @@ -240,7 +240,7 @@ impl AncestorHashesService { /// Process messages from the network fn process_new_packets_from_channel( ancestor_hashes_request_statuses: &DashMap, - response_receiver: &PacketReceiver, + response_receiver: &PacketBatchReceiver, blockstore: &Blockstore, outstanding_requests: &RwLock, stats: &mut AncestorHashesResponsesStats, @@ -249,17 +249,17 @@ impl AncestorHashesService { retryable_slots_sender: &RetryableSlotsSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let mut responses = vec![response_receiver.recv_timeout(timeout)?]; - let mut total_packets = responses[0].packets.len(); + let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?]; + let mut total_packets = packet_batches[0].packets.len(); let mut dropped_packets = 0; - while let Ok(more) = response_receiver.try_recv() { - total_packets += more.packets.len(); + while let Ok(batch) = response_receiver.try_recv() { + total_packets += batch.packets.len(); if total_packets < *max_packets { // Drop the rest in the channel in case of DOS - responses.push(more); + packet_batches.push(batch); } else { - dropped_packets += more.packets.len(); + dropped_packets += batch.packets.len(); } } @@ -267,10 +267,10 @@ impl AncestorHashesService { stats.total_packets += total_packets; let mut time = Measure::start("ancestor_hashes::handle_packets"); - for response in responses { - Self::process_single_packets( + for packet_batch in packet_batches { + Self::process_packet_batch( ancestor_hashes_request_statuses, - response, + packet_batch, stats, outstanding_requests, blockstore, @@ -289,16 +289,16 @@ impl AncestorHashesService { Ok(()) } - fn process_single_packets( + fn process_packet_batch( ancestor_hashes_request_statuses: &DashMap, - packets: Packets, + packet_batch: PacketBatch, stats: &mut AncestorHashesResponsesStats, outstanding_requests: &RwLock, blockstore: &Blockstore, duplicate_slots_reset_sender: &DuplicateSlotsResetSender, retryable_slots_sender: &RetryableSlotsSender, ) { - packets.packets.iter().for_each(|packet| { + packet_batch.packets.iter().for_each(|packet| { let decision = Self::verify_and_process_ancestor_response( packet, ancestor_hashes_request_statuses, @@ -871,7 +871,7 @@ mod test { t_listen: JoinHandle<()>, exit: Arc, responder_info: ContactInfo, - response_receiver: PacketReceiver, + response_receiver: PacketBatchReceiver, correct_bank_hashes: HashMap, } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d1c7e5eb0..3328c5c14 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -15,7 +15,7 @@ use { solana_perf::{ cuda_runtime::PinnedVec, data_budget::DataBudget, - packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, + packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH}, perf_libs, }, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, @@ -64,10 +64,10 @@ use { }; /// (packets, valid_indexes, forwarded) -/// Set of packets with a list of which are valid and if this batch has been forwarded. -type PacketsAndOffsets = (Packets, Vec, bool); +/// Batch of packets with a list of which are valid and if this batch has been forwarded. +type PacketBatchAndOffsets = (PacketBatch, Vec, bool); -pub type UnprocessedPackets = VecDeque; +pub type UnprocessedPacketBatches = VecDeque; /// Transaction forwarding pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; @@ -255,9 +255,9 @@ impl BankingStage { pub fn new( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, @@ -278,9 +278,9 @@ impl BankingStage { fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - tpu_verified_vote_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, + tpu_verified_vote_receiver: CrossbeamReceiver>, + verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, @@ -346,12 +346,12 @@ impl BankingStage { } fn filter_valid_packets_for_forwarding<'a>( - all_packets: impl Iterator, + packet_batches: impl Iterator, ) -> Vec<&'a Packet> { - all_packets - .filter(|(_p, _indexes, forwarded)| !forwarded) - .flat_map(|(p, valid_indexes, _forwarded)| { - valid_indexes.iter().map(move |x| &p.packets[*x]) + packet_batches + .filter(|(_batch, _indexes, forwarded)| !forwarded) + .flat_map(|(batch, valid_indexes, _forwarded)| { + valid_indexes.iter().map(move |x| &batch.packets[*x]) }) .collect() } @@ -359,10 +359,10 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, - unprocessed_packets: &UnprocessedPackets, + buffered_packet_batches: &UnprocessedPacketBatches, data_budget: &DataBudget, ) -> std::io::Result<()> { - let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter()); + let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; @@ -385,7 +385,7 @@ impl BankingStage { Ok(()) } - // Returns whether the given `Packets` has any more remaining unprocessed + // Returns whether the given `PacketBatch` has any more remaining unprocessed // transactions fn update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes: &mut Vec, @@ -404,7 +404,7 @@ impl BankingStage { my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, poh_recorder: &Arc>, - buffered_packets: &mut UnprocessedPackets, + buffered_packet_batches: &mut UnprocessedPacketBatches, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, test_fn: Option, @@ -412,19 +412,21 @@ impl BankingStage { recorder: &TransactionRecorder, qos_service: &Arc, ) { - let mut rebuffered_packets_len = 0; + let mut rebuffered_packet_count = 0; let mut new_tx_count = 0; - let buffered_len = buffered_packets.len(); + let buffered_packet_batches_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = None; - buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| { + buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| { + let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = + buffered_packet_batch_and_offsets; if let Some((next_leader, bank)) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, // just filter the remaining packets for the invalid (e.g. too old) ones let new_unprocessed_indexes = Self::filter_unprocessed_packets( bank, - msgs, + packet_batch, original_unprocessed_indexes, my_pubkey, *next_leader, @@ -446,7 +448,7 @@ impl BankingStage { &working_bank, &bank_creation_time, recorder, - msgs, + packet_batch, original_unprocessed_indexes.to_owned(), transaction_status_sender.clone(), gossip_vote_sender, @@ -467,7 +469,7 @@ impl BankingStage { new_tx_count += processed; // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding - rebuffered_packets_len += new_unprocessed_indexes.len(); + rebuffered_packet_count += new_unprocessed_indexes.len(); let has_more_unprocessed_transactions = Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, @@ -478,7 +480,7 @@ impl BankingStage { } has_more_unprocessed_transactions } else { - rebuffered_packets_len += original_unprocessed_indexes.len(); + rebuffered_packet_count += original_unprocessed_indexes.len(); // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions( @@ -494,7 +496,7 @@ impl BankingStage { debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timestamp(), - buffered_len, + buffered_packet_batches_len, proc_start.as_ms(), new_tx_count, (new_tx_count as f32) / (proc_start.as_s()) @@ -505,7 +507,7 @@ impl BankingStage { .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats .rebuffered_packets_count - .fetch_add(rebuffered_packets_len, Ordering::Relaxed); + .fetch_add(rebuffered_packet_count, Ordering::Relaxed); banking_stage_stats .consumed_buffered_packets_count .fetch_add(new_tx_count, Ordering::Relaxed); @@ -550,7 +552,7 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, - buffered_packets: &mut UnprocessedPackets, + buffered_packet_batches: &mut UnprocessedPacketBatches, forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -592,7 +594,7 @@ impl BankingStage { my_pubkey, max_tx_ingestion_ns, poh_recorder, - buffered_packets, + buffered_packet_batches, transaction_status_sender, gossip_vote_sender, None::>, @@ -605,7 +607,7 @@ impl BankingStage { Self::handle_forwarding( forward_option, cluster_info, - buffered_packets, + buffered_packet_batches, poh_recorder, socket, false, @@ -616,7 +618,7 @@ impl BankingStage { Self::handle_forwarding( forward_option, cluster_info, - buffered_packets, + buffered_packet_batches, poh_recorder, socket, true, @@ -631,7 +633,7 @@ impl BankingStage { fn handle_forwarding( forward_option: &ForwardOption, cluster_info: &ClusterInfo, - buffered_packets: &mut UnprocessedPackets, + buffered_packet_batches: &mut UnprocessedPacketBatches, poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, @@ -640,7 +642,7 @@ impl BankingStage { let addr = match forward_option { ForwardOption::NotForward => { if !hold { - buffered_packets.clear(); + buffered_packet_batches.clear(); } return; } @@ -653,20 +655,20 @@ impl BankingStage { Some(addr) => addr, None => return, }; - let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget); + let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget); if hold { - buffered_packets.retain(|(_, index, _)| !index.is_empty()); - for (_, _, forwarded) in buffered_packets.iter_mut() { + buffered_packet_batches.retain(|(_, index, _)| !index.is_empty()); + for (_, _, forwarded) in buffered_packet_batches.iter_mut() { *forwarded = true; } } else { - buffered_packets.clear(); + buffered_packet_batches.clear(); } } #[allow(clippy::too_many_arguments)] fn process_loop( - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, @@ -681,17 +683,17 @@ impl BankingStage { ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packets = VecDeque::with_capacity(batch_limit); + let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let banking_stage_stats = BankingStageStats::new(id); loop { let my_pubkey = cluster_info.id(); - while !buffered_packets.is_empty() { + while !buffered_packet_batches.is_empty() { let decision = Self::process_buffered_packets( &my_pubkey, &socket, poh_recorder, cluster_info, - &mut buffered_packets, + &mut buffered_packet_batches, &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, @@ -709,7 +711,7 @@ impl BankingStage { } } - let recv_timeout = if !buffered_packets.is_empty() { + let recv_timeout = if !buffered_packet_batches.is_empty() { // If packets are buffered, let's wait for less time on recv from the channel. // This helps detect the next leader faster, and processing the buffered // packets quickly @@ -729,7 +731,7 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), &gossip_vote_sender, - &mut buffered_packets, + &mut buffered_packet_batches, &banking_stage_stats, duplicates, &recorder, @@ -1076,7 +1078,7 @@ impl BankingStage { // with their packet indexes. #[allow(clippy::needless_collect)] fn transactions_from_packets( - msgs: &Packets, + packet_batch: &PacketBatch, transaction_indexes: &[usize], feature_set: &Arc, votes_only: bool, @@ -1084,7 +1086,7 @@ impl BankingStage { transaction_indexes .iter() .filter_map(|tx_index| { - let p = &msgs.packets[*tx_index]; + let p = &packet_batch.packets[*tx_index]; if votes_only && !p.meta.is_simple_vote_tx { return None; } @@ -1149,7 +1151,7 @@ impl BankingStage { bank: &Arc, bank_creation_time: &Instant, poh: &TransactionRecorder, - msgs: &Packets, + packet_batch: &PacketBatch, packet_indexes: Vec, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -1158,7 +1160,7 @@ impl BankingStage { ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( - msgs, + packet_batch, &packet_indexes, &bank.feature_set, bank.vote_only_bank(), @@ -1214,7 +1216,7 @@ impl BankingStage { fn filter_unprocessed_packets( bank: &Arc, - msgs: &Packets, + packet_batch: &PacketBatch, transaction_indexes: &[usize], my_pubkey: &Pubkey, next_leader: Option, @@ -1232,7 +1234,7 @@ impl BankingStage { let mut unprocessed_packet_conversion_time = Measure::start("unprocessed_packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( - msgs, + packet_batch, transaction_indexes, &bank.feature_set, bank.vote_only_bank(), @@ -1282,7 +1284,7 @@ impl BankingStage { /// Process the incoming packets fn process_packets( my_pubkey: &Pubkey, - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, @@ -1290,41 +1292,41 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - buffered_packets: &mut UnprocessedPackets, + buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, qos_service: &Arc, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); - let mms = verified_receiver.recv_timeout(recv_timeout)?; + let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; recv_time.stop(); - let mms_len = mms.len(); - let count: usize = mms.iter().map(|x| x.packets.len()).sum(); + let packet_batches_len = packet_batches.len(); + let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timestamp(), duration_as_ms(&recv_start.elapsed()), - count, + packet_count, id, ); - inc_new_counter_debug!("banking_stage-transactions_received", count); + inc_new_counter_debug!("banking_stage-transactions_received", packet_count); let mut proc_start = Measure::start("process_packets_transactions_process"); let mut new_tx_count = 0; - let mut mms_iter = mms.into_iter(); + let mut packet_batch_iter = packet_batches.into_iter(); let mut dropped_packets_count = 0; let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; - while let Some(msgs) = mms_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&msgs.packets); + while let Some(packet_batch) = packet_batch_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank(); let working_bank_start = poh_recorder_bank.working_bank_start(); if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() { Self::push_unprocessed( - buffered_packets, - msgs, + buffered_packet_batches, + packet_batch, packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1347,7 +1349,7 @@ impl BankingStage { working_bank, bank_creation_time, recorder, - &msgs, + &packet_batch, packet_indexes, transaction_status_sender.clone(), gossip_vote_sender, @@ -1359,8 +1361,8 @@ impl BankingStage { // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( - buffered_packets, - msgs, + buffered_packet_batches, + packet_batch, unprocessed_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1376,19 +1378,19 @@ impl BankingStage { let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones #[allow(clippy::while_let_on_iterator)] - while let Some(msgs) = mms_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&msgs.packets); + while let Some(packet_batch) = packet_batch_iter.next() { + let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); let unprocessed_indexes = Self::filter_unprocessed_packets( working_bank, - &msgs, + &packet_batch, &packet_indexes, my_pubkey, next_leader, banking_stage_stats, ); Self::push_unprocessed( - buffered_packets, - msgs, + buffered_packet_batches, + packet_batch, unprocessed_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -1409,11 +1411,11 @@ impl BankingStage { debug!( "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", timestamp(), - mms_len, + packet_batches_len, proc_start.as_ms(), new_tx_count, (new_tx_count as f32) / (proc_start.as_s()), - count, + packet_count, id, ); banking_stage_stats @@ -1421,7 +1423,7 @@ impl BankingStage { .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats .process_packets_count - .fetch_add(count, Ordering::Relaxed); + .fetch_add(packet_count, Ordering::Relaxed); banking_stage_stats .new_tx_count .fetch_add(new_tx_count, Ordering::Relaxed); @@ -1436,9 +1438,12 @@ impl BankingStage { .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats .current_buffered_packet_batches_count - .swap(buffered_packets.len(), Ordering::Relaxed); + .swap(buffered_packet_batches.len(), Ordering::Relaxed); banking_stage_stats.current_buffered_packets_count.swap( - buffered_packets.iter().map(|packets| packets.1.len()).sum(), + buffered_packet_batches + .iter() + .map(|packets| packets.1.len()) + .sum(), Ordering::Relaxed, ); *recv_start = Instant::now(); @@ -1446,8 +1451,8 @@ impl BankingStage { } fn push_unprocessed( - unprocessed_packets: &mut UnprocessedPackets, - packets: Packets, + unprocessed_packet_batches: &mut UnprocessedPacketBatches, + packet_batch: PacketBatch, mut packet_indexes: Vec, dropped_packet_batches_count: &mut usize, dropped_packets_count: &mut usize, @@ -1462,7 +1467,7 @@ impl BankingStage { let mut duplicates = duplicates.lock().unwrap(); let (cache, hasher) = duplicates.deref_mut(); packet_indexes.retain(|i| { - let packet_hash = hasher.hash_packet(&packets.packets[*i]); + let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); match cache.get_mut(&packet_hash) { Some(_hash) => false, None => { @@ -1483,14 +1488,14 @@ impl BankingStage { ); } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { - if unprocessed_packets.len() >= batch_limit { + if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; - if let Some(dropped_batch) = unprocessed_packets.pop_front() { + if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { *dropped_packets_count += dropped_batch.1.len(); } } *newly_buffered_packets_count += packet_indexes.len(); - unprocessed_packets.push_back((packets, packet_indexes, false)); + unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); } } @@ -1560,7 +1565,7 @@ mod tests { get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::to_packets_chunked, + solana_perf::packet::to_packet_batches, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, @@ -1697,7 +1702,9 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } - pub fn convert_from_old_verified(mut with_vers: Vec<(Packets, Vec)>) -> Vec { + pub fn convert_from_old_verified( + mut with_vers: Vec<(PacketBatch, Vec)>, + ) -> Vec { with_vers.iter_mut().for_each(|(b, v)| { b.packets .iter_mut() @@ -1769,18 +1776,18 @@ mod tests { let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash); // send 'em over - let packets = to_packets_chunked(&[tx_no_ver, tx_anf, tx], 3); + let packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3); // glad they all fit - assert_eq!(packets.len(), 1); + assert_eq!(packet_batches.len(), 1); - let packets = packets + let packet_batches = packet_batches .into_iter() - .map(|packets| (packets, vec![0u8, 1u8, 1u8])) + .map(|batch| (batch, vec![0u8, 1u8, 1u8])) .collect(); - let packets = convert_from_old_verified(packets); + let packet_batches = convert_from_old_verified(packet_batches); verified_sender // no_ver, anf, tx - .send(packets) + .send(packet_batches) .unwrap(); drop(verified_sender); @@ -1846,24 +1853,24 @@ mod tests { let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash()); - let packets = to_packets_chunked(&[tx], 1); - let packets = packets + let packet_batches = to_packet_batches(&[tx], 1); + let packet_batches = packet_batches .into_iter() - .map(|packets| (packets, vec![1u8])) + .map(|batch| (batch, vec![1u8])) .collect(); - let packets = convert_from_old_verified(packets); - verified_sender.send(packets).unwrap(); + let packet_batches = convert_from_old_verified(packet_batches); + verified_sender.send(packet_batches).unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash()); - let packets = to_packets_chunked(&[tx], 1); - let packets = packets + let packet_batches = to_packet_batches(&[tx], 1); + let packet_batches = packet_batches .into_iter() - .map(|packets| (packets, vec![1u8])) + .map(|batch| (batch, vec![1u8])) .collect(); - let packets = convert_from_old_verified(packets); - verified_sender.send(packets).unwrap(); + let packet_batches = convert_from_old_verified(packet_batches); + verified_sender.send(packet_batches).unwrap(); let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2381,9 +2388,9 @@ mod tests { fn test_filter_valid_packets() { solana_logger::setup(); - let mut all_packets = (0..16) + let mut packet_batches = (0..16) .map(|packets_id| { - let packets = Packets::new( + let packet_batch = PacketBatch::new( (0..32) .map(|packet_id| { let mut p = Packet::default(); @@ -2395,11 +2402,11 @@ mod tests { let valid_indexes = (0..32) .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) .collect_vec(); - (packets, valid_indexes, false) + (packet_batch, valid_indexes, false) }) .collect_vec(); - let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter()); + let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); assert_eq!(result.len(), 256); @@ -2413,8 +2420,8 @@ mod tests { }) .collect_vec(); - all_packets[0].2 = true; - let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter()); + packet_batches[0].2 = true; + let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); assert_eq!(result.len(), 240); } @@ -2666,12 +2673,15 @@ mod tests { setup_conflicting_transactions(&ledger_path); let recorder = poh_recorder.lock().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); - let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions); - assert_eq!(packets_vec.len(), 1); - assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions); - let all_packets = packets_vec.pop().unwrap(); - let mut buffered_packets: UnprocessedPackets = vec![( - all_packets, + let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions); + assert_eq!(packet_batches.len(), 1); + assert_eq!( + packet_batches[0].packets.len(), + num_conflicting_transactions + ); + let packet_batch = packet_batches.pop().unwrap(); + let mut buffered_packet_batches: UnprocessedPacketBatches = vec![( + packet_batch, (0..num_conflicting_transactions).into_iter().collect(), false, )] @@ -2687,7 +2697,7 @@ mod tests { &Pubkey::default(), max_tx_processing_ns, &poh_recorder, - &mut buffered_packets, + &mut buffered_packet_batches, None, &gossip_vote_sender, None::>, @@ -2695,7 +2705,10 @@ mod tests { &recorder, &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); - assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); + assert_eq!( + buffered_packet_batches[0].1.len(), + num_conflicting_transactions + ); // When the poh recorder has a bank, should process all non conflicting buffered packets. // Processes one packet per iteration of the loop for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { @@ -2704,7 +2717,7 @@ mod tests { &Pubkey::default(), max_tx_processing_ns, &poh_recorder, - &mut buffered_packets, + &mut buffered_packet_batches, None, &gossip_vote_sender, None::>, @@ -2713,9 +2726,9 @@ mod tests { &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); if num_expected_unprocessed == 0 { - assert!(buffered_packets.is_empty()) + assert!(buffered_packet_batches.is_empty()) } else { - assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed); + assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed); } } poh_recorder @@ -2735,12 +2748,12 @@ mod tests { let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = setup_conflicting_transactions(&ledger_path); let num_conflicting_transactions = transactions.len(); - let packets_vec = to_packets_chunked(&transactions, 1); - assert_eq!(packets_vec.len(), num_conflicting_transactions); - for single_packets in &packets_vec { - assert_eq!(single_packets.packets.len(), 1); + let packet_batches = to_packet_batches(&transactions, 1); + assert_eq!(packet_batches.len(), num_conflicting_transactions); + for single_packet_batch in &packet_batches { + assert_eq!(single_packet_batch.packets.len(), 1); } - let mut buffered_packets: UnprocessedPackets = packets_vec + let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches .clone() .into_iter() .map(|single_packets| (single_packets, vec![0], false)) @@ -2754,8 +2767,8 @@ mod tests { continue_receiver.recv().unwrap(); }); // When the poh recorder has a bank, it should process all non conflicting buffered packets. - // Because each conflicting transaction is in it's own `Packet` within `packets_vec`, then - // each iteration of this loop will process one element of `packets_vec`per iteration of the + // Because each conflicting transaction is in it's own `Packet` within a `PacketBatch`, then + // each iteration of this loop will process one element of the batch per iteration of the // loop. let interrupted_iteration = 1; poh_recorder.lock().unwrap().set_bank(&bank); @@ -2770,7 +2783,7 @@ mod tests { &Pubkey::default(), std::u128::MAX, &poh_recorder_, - &mut buffered_packets, + &mut buffered_packet_batches, None, &gossip_vote_sender, test_fn, @@ -2782,13 +2795,13 @@ mod tests { // Check everything is correct. All indexes after `interrupted_iteration` // should still be unprocessed assert_eq!( - buffered_packets.len(), - packets_vec[interrupted_iteration + 1..].len() + buffered_packet_batches.len(), + packet_batches[interrupted_iteration + 1..].len() ); for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in - buffered_packets + buffered_packet_batches .iter() - .zip(&packets_vec[interrupted_iteration + 1..]) + .zip(&packet_batches[interrupted_iteration + 1..]) { assert_eq!( remaining_unprocessed_packet.packets[0], @@ -2823,10 +2836,10 @@ mod tests { #[test] fn test_forwarder_budget() { solana_logger::setup(); - // Create `Packets` with 1 unprocessed element - let single_element_packets = Packets::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPackets = - vec![(single_element_packets, vec![0], false)] + // Create `PacketBatch` with 1 unprocessed packet + let single_packet_batch = PacketBatch::new(vec![Packet::default()]); + let mut unprocessed_packets: UnprocessedPacketBatches = + vec![(single_packet_batch, vec![0], false)] .into_iter() .collect(); @@ -2872,14 +2885,16 @@ mod tests { #[test] fn test_push_unprocessed_batch_limit() { solana_logger::setup(); - // Create `Packets` with 2 unprocessed elements - let new_packets = Packets::new(vec![Packet::default(); 2]); - let mut unprocessed_packets: UnprocessedPackets = - vec![(new_packets, vec![0, 1], false)].into_iter().collect(); + // Create `PacketBatch` with 2 unprocessed packets + let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]); + let mut unprocessed_packets: UnprocessedPacketBatches = + vec![(new_packet_batch, vec![0, 1], false)] + .into_iter() + .collect(); // Set the limit to 2 let batch_limit = 2; - // Create some new unprocessed packets - let new_packets = Packets::new(vec![Packet::default()]); + // Create new unprocessed packets and add to a batch + let new_packet_batch = PacketBatch::new(vec![Packet::default()]); let packet_indexes = vec![]; let duplicates = Arc::new(Mutex::new(( @@ -2894,7 +2909,7 @@ mod tests { // packets are not added to the unprocessed queue BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packets.clone(), + new_packet_batch.clone(), packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -2913,7 +2928,7 @@ mod tests { let packet_indexes = vec![0]; BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packets, + new_packet_batch, packet_indexes.clone(), &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -2929,7 +2944,7 @@ mod tests { // Because we've reached the batch limit, old unprocessed packets are // dropped and the new one is appended to the end - let new_packets = Packets::new(vec![Packet::from_data( + let new_packet_batch = PacketBatch::new(vec![Packet::from_data( Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), 42, ) @@ -2937,7 +2952,7 @@ mod tests { assert_eq!(unprocessed_packets.len(), batch_limit); BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packets.clone(), + new_packet_batch.clone(), packet_indexes.clone(), &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -2947,7 +2962,10 @@ mod tests { &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); + assert_eq!( + unprocessed_packets[1].0.packets[0], + new_packet_batch.packets[0] + ); assert_eq!(dropped_packet_batches_count, 1); assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); @@ -2955,7 +2973,7 @@ mod tests { // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, - new_packets.clone(), + new_packet_batch.clone(), packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, @@ -2965,7 +2983,10 @@ mod tests { &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); + assert_eq!( + unprocessed_packets[1].0.packets[0], + new_packet_batch.packets[0] + ); assert_eq!(dropped_packet_batches_count, 1); assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); @@ -2988,19 +3009,19 @@ mod tests { fn make_test_packets( transactions: Vec, vote_indexes: Vec, - ) -> (Packets, Vec) { + ) -> (PacketBatch, Vec) { let capacity = transactions.len(); - let mut packets = Packets::with_capacity(capacity); + let mut packet_batch = PacketBatch::with_capacity(capacity); let mut packet_indexes = Vec::with_capacity(capacity); - packets.packets.resize(capacity, Packet::default()); + packet_batch.packets.resize(capacity, Packet::default()); for (index, tx) in transactions.iter().enumerate() { - Packet::populate_packet(&mut packets.packets[index], None, tx).ok(); + Packet::populate_packet(&mut packet_batch.packets[index], None, tx).ok(); packet_indexes.push(index); } for index in vote_indexes.iter() { - packets.packets[*index].meta.is_simple_vote_tx = true; + packet_batch.packets[*index].meta.is_simple_vote_tx = true; } - (packets, packet_indexes) + (packet_batch, packet_indexes) } #[test] @@ -3022,12 +3043,12 @@ mod tests { // packets with no votes { let vote_indexes = vec![]; - let (packets, packet_indexes) = + let (packet_batch, packet_indexes) = make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, @@ -3037,7 +3058,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, @@ -3049,14 +3070,14 @@ mod tests { // packets with some votes { let vote_indexes = vec![0, 2]; - let (packets, packet_indexes) = make_test_packets( + let (packet_batch, packet_indexes) = make_test_packets( vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], vote_indexes, ); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, @@ -3066,7 +3087,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, @@ -3078,14 +3099,14 @@ mod tests { // packets with all votes { let vote_indexes = vec![0, 1, 2]; - let (packets, packet_indexes) = make_test_packets( + let (packet_batch, packet_indexes) = make_test_packets( vec![vote_tx.clone(), vote_tx.clone(), vote_tx], vote_indexes, ); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, @@ -3095,7 +3116,7 @@ mod tests { votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packets, + &packet_batch, &packet_indexes, &Arc::new(FeatureSet::default()), votes_only, diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index bae24e80d..259448808 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -22,7 +22,7 @@ use { solana_ledger::blockstore::Blockstore, solana_measure::measure::Measure, solana_metrics::inc_new_counter_debug, - solana_perf::packet::{self, Packets}, + solana_perf::packet::{self, PacketBatch}, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -299,7 +299,7 @@ impl ClusterInfoVoteListener { pub fn new( exit: &Arc, cluster_info: Arc, - verified_packets_sender: CrossbeamSender>, + verified_packets_sender: CrossbeamSender>, poh_recorder: &Arc>, vote_tracker: Arc, bank_forks: Arc>, @@ -396,14 +396,14 @@ impl ClusterInfoVoteListener { #[allow(clippy::type_complexity)] fn verify_votes(votes: Vec) -> (Vec, Vec) { - let mut msgs = packet::to_packets_chunked(&votes, 1); + let mut packet_batches = packet::to_packet_batches(&votes, 1); // Votes should already be filtered by this point. let reject_non_vote = false; - sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); + sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote); - let (vote_txs, vote_metadata) = izip!(votes.into_iter(), msgs,) - .filter_map(|(vote_tx, packet)| { + let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches) + .filter_map(|(vote_tx, packet_batch)| { let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx) .and_then(|(vote_account_key, vote, _)| { if vote.slots().is_empty() { @@ -413,16 +413,16 @@ impl ClusterInfoVoteListener { } })?; - // to_packets_chunked() above split into 1 packet long chunks - assert_eq!(packet.packets.len(), 1); - if !packet.packets[0].meta.discard { + // to_packet_batches() above splits into 1 packet long batches + assert_eq!(packet_batch.packets.len(), 1); + if !packet_batch.packets[0].meta.discard { if let Some(signature) = vote_tx.signatures.first().cloned() { return Some(( vote_tx, VerifiedVoteMetadata { vote_account_key, vote, - packet, + packet_batch, signature, }, )); @@ -438,7 +438,7 @@ impl ClusterInfoVoteListener { exit: Arc, verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, poh_recorder: Arc>, - verified_packets_sender: &CrossbeamSender>, + verified_packets_sender: &CrossbeamSender>, ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); let mut time_since_lock = Instant::now(); @@ -486,7 +486,7 @@ impl ClusterInfoVoteListener { fn check_for_leader_bank_and_send_votes( bank_vote_sender_state_option: &mut Option, current_working_bank: Arc, - verified_packets_sender: &CrossbeamSender>, + verified_packets_sender: &CrossbeamSender>, verified_vote_packets: &VerifiedVotePackets, ) -> Result<()> { // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` @@ -986,9 +986,9 @@ mod tests { use bincode::serialized_size; info!("max vote size {}", serialized_size(&vote_tx).unwrap()); - let msgs = packet::to_packets_chunked(&[vote_tx], 1); // panics if won't fit + let packet_batches = packet::to_packet_batches(&[vote_tx], 1); // panics if won't fit - assert_eq!(msgs.len(), 1); + assert_eq!(packet_batches.len(), 1); } fn run_vote_contains_authorized_voter(hash: Option) { @@ -1819,7 +1819,7 @@ mod tests { fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) { let num_packets: usize = packets .iter() - .map(|vote_metadata| vote_metadata.packet.packets.len()) + .map(|vote_metadata| vote_metadata.packet_batch.packets.len()) .sum(); assert_eq!(num_packets, ref_value); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 248d3bf65..9a52e2f6b 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -6,10 +6,10 @@ use { result::{Error, Result}, }, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, - solana_perf::{packet::PacketsRecycler, recycler::Recycler}, + solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, solana_sdk::clock::DEFAULT_TICKS_PER_SLOT, - solana_streamer::streamer::{self, PacketReceiver, PacketSender}, + solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, sync::{ @@ -34,7 +34,7 @@ impl FetchStage { exit: &Arc, poh_recorder: &Arc>, coalesce_ms: u64, - ) -> (Self, PacketReceiver, PacketReceiver) { + ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { let (sender, receiver) = channel(); let (vote_sender, vote_receiver) = channel(); ( @@ -58,8 +58,8 @@ impl FetchStage { tpu_forwards_sockets: Vec, tpu_vote_sockets: Vec, exit: &Arc, - sender: &PacketSender, - vote_sender: &PacketSender, + sender: &PacketBatchSender, + vote_sender: &PacketBatchSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { @@ -79,18 +79,18 @@ impl FetchStage { } fn handle_forwarded_packets( - recvr: &PacketReceiver, - sendr: &PacketSender, + recvr: &PacketBatchReceiver, + sendr: &PacketBatchSender, poh_recorder: &Arc>, ) -> Result<()> { - let msgs = recvr.recv()?; - let mut len = msgs.packets.len(); - let mut batch = vec![msgs]; - while let Ok(more) = recvr.try_recv() { - len += more.packets.len(); - batch.push(more); + let packet_batch = recvr.recv()?; + let mut num_packets = packet_batch.packets.len(); + let mut packet_batches = vec![packet_batch]; + while let Ok(packet_batch) = recvr.try_recv() { + num_packets += packet_batch.packets.len(); + packet_batches.push(packet_batch); // Read at most 1K transactions in a loop - if len > 1024 { + if num_packets > 1024 { break; } } @@ -100,15 +100,15 @@ impl FetchStage { .unwrap() .would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT)) { - inc_new_counter_debug!("fetch_stage-honor_forwards", len); - for packets in batch { + inc_new_counter_debug!("fetch_stage-honor_forwards", num_packets); + for packet_batch in packet_batches { #[allow(clippy::question_mark)] - if sendr.send(packets).is_err() { + if sendr.send(packet_batch).is_err() { return Err(Error::Send); } } } else { - inc_new_counter_info!("fetch_stage-discard_forwards", len); + inc_new_counter_info!("fetch_stage-discard_forwards", num_packets); } Ok(()) @@ -119,12 +119,12 @@ impl FetchStage { tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, - sender: &PacketSender, - vote_sender: &PacketSender, + sender: &PacketBatchSender, + vote_sender: &PacketBatchSender, poh_recorder: &Arc>, coalesce_ms: u64, ) -> Self { - let recycler: PacketsRecycler = Recycler::warmed(1000, 1024); + let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); let tpu_threads = sockets.into_iter().map(|socket| { streamer::receiver( diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index fecd40b32..ee91be1cd 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -27,7 +27,7 @@ use { shred::{Shred, ShredType}, }, solana_measure::measure::Measure, - solana_perf::packet::Packets, + solana_perf::packet::PacketBatch, solana_rayon_threadlimit::get_thread_count, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -433,7 +433,7 @@ impl RetransmitStage { cluster_info: Arc, retransmit_sockets: Arc>, repair_socket: Arc, - verified_receiver: Receiver>, + verified_receiver: Receiver>, exit: Arc, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, @@ -610,10 +610,10 @@ mod tests { let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); // it should send this over the sockets. retransmit_sender.send(vec![shred]).unwrap(); - let mut packets = Packets::new(vec![]); - solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); - assert_eq!(packets.packets.len(), 1); - assert!(!packets.packets[0].meta.repair); + let mut packet_batch = PacketBatch::new(vec![]); + solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap(); + assert_eq!(packet_batch.packets.len(), 1); + assert!(!packet_batch.packets[0].meta.repair); } #[test] diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 2086661e7..d668dda68 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -25,11 +25,11 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_debug, - solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler}, + solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler}, solana_sdk::{ clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms, }, - solana_streamer::streamer::{PacketReceiver, PacketSender}, + solana_streamer::streamer::{PacketBatchReceiver, PacketBatchSender}, std::{ collections::HashSet, net::SocketAddr, @@ -229,12 +229,12 @@ impl ServeRepair { fn handle_repair( me: &Arc>, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, request: RepairProtocol, stats: &mut ServeRepairStats, - ) -> Option { + ) -> Option { let now = Instant::now(); let my_id = me.read().unwrap().my_id(); @@ -317,10 +317,10 @@ impl ServeRepair { /// Process messages from the network fn run_listen( obj: &Arc>, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, blockstore: Option<&Arc>, - requests_receiver: &PacketReceiver, - response_sender: &PacketSender, + requests_receiver: &PacketBatchReceiver, + response_sender: &PacketBatchSender, stats: &mut ServeRepairStats, max_packets: &mut usize, ) -> Result<()> { @@ -392,12 +392,12 @@ impl ServeRepair { pub fn listen( me: Arc>, blockstore: Option>, - requests_receiver: PacketReceiver, - response_sender: PacketSender, + requests_receiver: PacketBatchReceiver, + response_sender: PacketBatchSender, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); Builder::new() .name("solana-repair-listen".to_string()) .spawn(move || { @@ -432,14 +432,14 @@ impl ServeRepair { fn handle_packets( me: &Arc>, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, blockstore: Option<&Arc>, - packets: Packets, - response_sender: &PacketSender, + packet_batch: PacketBatch, + response_sender: &PacketBatchSender, stats: &mut ServeRepairStats, ) { // iter over the packets - packets.packets.iter().for_each(|packet| { + packet_batch.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() @@ -609,7 +609,7 @@ impl ServeRepair { } fn run_window_request( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, from: &ContactInfo, from_addr: &SocketAddr, blockstore: Option<&Arc>, @@ -617,7 +617,7 @@ impl ServeRepair { slot: Slot, shred_index: u64, nonce: Nonce, - ) -> Option { + ) -> Option { if let Some(blockstore) = blockstore { // Try to find the requested index in one of the slots let packet = repair_response::repair_response_packet( @@ -630,7 +630,7 @@ impl ServeRepair { if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); - return Some(Packets::new_unpinned_with_recycler_data( + return Some(PacketBatch::new_unpinned_with_recycler_data( recycler, "run_window_request", vec![packet], @@ -651,13 +651,13 @@ impl ServeRepair { } fn run_highest_window_request( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, slot: Slot, highest_index: u64, nonce: Nonce, - ) -> Option { + ) -> Option { let blockstore = blockstore?; // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; @@ -670,7 +670,7 @@ impl ServeRepair { from_addr, nonce, )?; - return Some(Packets::new_unpinned_with_recycler_data( + return Some(PacketBatch::new_unpinned_with_recycler_data( recycler, "run_highest_window_request", vec![packet], @@ -680,14 +680,14 @@ impl ServeRepair { } fn run_orphan( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, mut slot: Slot, max_responses: usize, nonce: Nonce, - ) -> Option { - let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan"); + ) -> Option { + let mut res = PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blockstore.meta(slot) { @@ -720,12 +720,12 @@ impl ServeRepair { } fn run_ancestor_hashes( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, from_addr: &SocketAddr, blockstore: Option<&Arc>, slot: Slot, nonce: Nonce, - ) -> Option { + ) -> Option { let blockstore = blockstore?; let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) { let ancestor_iterator = @@ -746,7 +746,7 @@ impl ServeRepair { from_addr, nonce, )?; - Some(Packets::new_unpinned_with_recycler_data( + Some(PacketBatch::new_unpinned_with_recycler_data( recycler, "run_ancestor_hashes", vec![packet], @@ -778,7 +778,7 @@ mod tests { /// test run_window_request responds with the right shred, and do not overrun fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) { - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -848,7 +848,7 @@ mod tests { /// test window requests respond with the right shred, and do not overrun fn run_window_request(slot: Slot, nonce: Nonce) { - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -1017,7 +1017,7 @@ mod tests { fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) { solana_logger::setup(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -1091,7 +1091,7 @@ mod tests { #[test] fn run_orphan_corrupted_shred_size() { solana_logger::setup(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -1152,7 +1152,7 @@ mod tests { #[test] fn test_run_ancestor_hashes() { solana_logger::setup(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let slot = 0; diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index a0d02ba4a..a58056187 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -6,12 +6,12 @@ use { solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_perf::{ cuda_runtime::PinnedVec, - packet::{Packet, PacketsRecycler}, + packet::{Packet, PacketBatchRecycler}, recycler::Recycler, }, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, - solana_streamer::streamer::{self, PacketReceiver, PacketSender}, + solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock}, @@ -63,8 +63,8 @@ impl ShredFetchStage { // updates packets received on a channel and sends them on another channel fn modify_packets( - recvr: PacketReceiver, - sendr: PacketSender, + recvr: PacketBatchReceiver, + sendr: PacketBatchSender, bank_forks: Option>>, name: &'static str, modify: F, @@ -83,7 +83,7 @@ impl ShredFetchStage { let mut stats = ShredFetchStats::default(); let mut packet_hasher = PacketHasher::default(); - while let Some(mut p) = recvr.iter().next() { + while let Some(mut packet_batch) = recvr.iter().next() { if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT { last_updated = Instant::now(); packet_hasher.reset(); @@ -97,8 +97,8 @@ impl ShredFetchStage { slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); } } - stats.shred_count += p.packets.len(); - p.packets.iter_mut().for_each(|packet| { + stats.shred_count += packet_batch.packets.len(); + packet_batch.packets.iter_mut().for_each(|packet| { Self::process_packet( packet, &mut shreds_received, @@ -124,7 +124,7 @@ impl ShredFetchStage { stats = ShredFetchStats::default(); last_stats = Instant::now(); } - if sendr.send(p).is_err() { + if sendr.send(packet_batch).is_err() { break; } } @@ -133,7 +133,7 @@ impl ShredFetchStage { fn packet_modifier( sockets: Vec>, exit: &Arc, - sender: PacketSender, + sender: PacketBatchSender, recycler: Recycler>, bank_forks: Option>>, name: &'static str, @@ -169,11 +169,11 @@ impl ShredFetchStage { sockets: Vec>, forward_sockets: Vec>, repair_socket: Arc, - sender: &PacketSender, + sender: &PacketBatchSender, bank_forks: Option>>, exit: &Arc, ) -> Self { - let recycler: PacketsRecycler = Recycler::warmed(100, 1024); + let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 8ffa30bb8..74dbf5bdf 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -5,11 +5,11 @@ //! pub use solana_perf::sigverify::{ - batch_size, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, + count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, }; use { crate::sigverify_stage::SigVerifier, - solana_perf::{cuda_runtime::PinnedVec, packet::Packets, recycler::Recycler, sigverify}, + solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, }; #[derive(Clone)] @@ -40,13 +40,13 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { - fn verify_batch(&self, mut batch: Vec) -> Vec { + fn verify_batches(&self, mut batches: Vec) -> Vec { sigverify::ed25519_verify( - &mut batch, + &mut batches, &self.recycler, &self.recycler_out, self.reject_non_vote, ); - batch + batches } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 08ebae0bb..85078f510 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -5,7 +5,7 @@ use { leader_schedule_cache::LeaderScheduleCache, shred::Shred, sigverify_shreds::verify_shreds_gpu, }, - solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache}, + solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache}, solana_runtime::bank_forks::BankForks, std::{ collections::{HashMap, HashSet}, @@ -32,7 +32,7 @@ impl ShredSigVerifier { recycler_cache: RecyclerCache::warmed(), } } - fn read_slots(batches: &[Packets]) -> HashSet { + fn read_slots(batches: &[PacketBatch]) -> HashSet { batches .iter() .flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet)) @@ -41,7 +41,7 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { - fn verify_batch(&self, mut batches: Vec) -> Vec { + fn verify_batches(&self, mut batches: Vec) -> Vec { let r_bank = self.bank_forks.read().unwrap().working_bank(); let slots: HashSet = Self::read_slots(&batches); let mut leader_slots: HashMap = slots @@ -88,13 +88,13 @@ pub mod tests { 0, 0xc0de, ); - let mut batch = [Packets::default(), Packets::default()]; + let mut batches = [PacketBatch::default(), PacketBatch::default()]; let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets.resize(1, Packet::default()); + batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[0].meta.size = shred.payload.len(); let mut shred = Shred::new_from_data( 0xc0de_dead, @@ -108,16 +108,16 @@ pub mod tests { 0xc0de, ); Shredder::sign_shred(&keypair, &mut shred); - batch[1].packets.resize(1, Packet::default()); - batch[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[1].packets[0].meta.size = shred.payload.len(); + batches[1].packets.resize(1, Packet::default()); + batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[1].packets[0].meta.size = shred.payload.len(); let expected: HashSet = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); - assert_eq!(ShredSigVerifier::read_slots(&batch), expected); + assert_eq!(ShredSigVerifier::read_slots(&batches), expected); } #[test] - fn test_sigverify_shreds_verify_batch() { + fn test_sigverify_shreds_verify_batches() { let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let bank = Bank::new_for_tests( @@ -127,8 +127,8 @@ pub mod tests { let bf = Arc::new(RwLock::new(BankForks::new(bank))); let verifier = ShredSigVerifier::new(bf, cache); - let mut batch = vec![Packets::default()]; - batch[0].packets.resize(2, Packet::default()); + let mut batches = vec![PacketBatch::default()]; + batches[0].packets.resize(2, Packet::default()); let mut shred = Shred::new_from_data( 0, @@ -142,8 +142,8 @@ pub mod tests { 0xc0de, ); Shredder::sign_shred(&leader_keypair, &mut shred); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[0].meta.size = shred.payload.len(); let mut shred = Shred::new_from_data( 0, @@ -158,10 +158,10 @@ pub mod tests { ); let wrong_keypair = Keypair::new(); Shredder::sign_shred(&wrong_keypair, &mut shred); - batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[1].meta.size = shred.payload.len(); + batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[1].meta.size = shred.payload.len(); - let rv = verifier.verify_batch(batch); + let rv = verifier.verify_batches(batches); assert!(!rv[0].packets[0].meta.discard); assert!(rv[0].packets[1].meta.discard); } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 9810c5dc2..9b63ba2b8 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -9,9 +9,9 @@ use { crate::sigverify, crossbeam_channel::{SendError, Sender as CrossbeamSender}, solana_measure::measure::Measure, - solana_perf::packet::Packets, + solana_perf::packet::PacketBatch, solana_sdk::timing, - solana_streamer::streamer::{self, PacketReceiver, StreamerError}, + solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ collections::HashMap, sync::mpsc::{Receiver, RecvTimeoutError}, @@ -26,7 +26,7 @@ const MAX_SIGVERIFY_BATCH: usize = 10_000; #[derive(Error, Debug)] pub enum SigVerifyServiceError { #[error("send packets batch error")] - Send(#[from] SendError>), + Send(#[from] SendError>), #[error("streamer error")] Streamer(#[from] StreamerError), @@ -39,7 +39,7 @@ pub struct SigVerifyStage { } pub trait SigVerifier { - fn verify_batch(&self, batch: Vec) -> Vec; + fn verify_batches(&self, batches: Vec) -> Vec; } #[derive(Default, Clone)] @@ -49,7 +49,7 @@ pub struct DisabledSigVerifier {} struct SigVerifierStats { recv_batches_us_hist: histogram::Histogram, // time to call recv_batch verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - batches_hist: histogram::Histogram, // number of Packets structures per verify call + batches_hist: histogram::Histogram, // number of packet batches per verify call packets_hist: histogram::Histogram, // number of packets per verify call total_batches: usize, total_packets: usize, @@ -122,24 +122,24 @@ impl SigVerifierStats { } impl SigVerifier for DisabledSigVerifier { - fn verify_batch(&self, mut batch: Vec) -> Vec { - sigverify::ed25519_verify_disabled(&mut batch); - batch + fn verify_batches(&self, mut batches: Vec) -> Vec { + sigverify::ed25519_verify_disabled(&mut batches); + batches } } impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver, - verified_sender: CrossbeamSender>, + packet_receiver: Receiver, + verified_sender: CrossbeamSender>, verifier: T, ) -> Self { let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier); Self { thread_hdl } } - pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { + pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { let mut received_ips = HashMap::new(); for (batch_index, batch) in batches.iter().enumerate() { for (packet_index, packets) in batch.packets.iter().enumerate() { @@ -169,12 +169,12 @@ impl SigVerifyStage { } fn verifier( - recvr: &PacketReceiver, - sendr: &CrossbeamSender>, + recvr: &PacketBatchReceiver, + sendr: &CrossbeamSender>, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { - let (mut batches, num_packets, recv_duration) = streamer::recv_batch(recvr)?; + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; let batches_len = batches.len(); debug!( @@ -187,7 +187,7 @@ impl SigVerifyStage { } let mut verify_batch_time = Measure::start("sigverify_batch_time"); - sendr.send(verifier.verify_batch(batches))?; + sendr.send(verifier.verify_batches(batches))?; verify_batch_time.stop(); debug!( @@ -216,8 +216,8 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: PacketReceiver, - verified_sender: CrossbeamSender>, + packet_receiver: PacketBatchReceiver, + verified_sender: CrossbeamSender>, verifier: &T, ) -> JoinHandle<()> { let verifier = verifier.clone(); @@ -252,8 +252,8 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: PacketReceiver, - verified_sender: CrossbeamSender>, + packet_receiver: PacketBatchReceiver, + verified_sender: CrossbeamSender>, verifier: T, ) -> JoinHandle<()> { Self::verifier_service(packet_receiver, verified_sender, &verifier) @@ -268,11 +268,12 @@ impl SigVerifyStage { mod tests { use {super::*, solana_perf::packet::Packet}; - fn count_non_discard(packets: &[Packets]) -> usize { - packets + fn count_non_discard(packet_batches: &[PacketBatch]) -> usize { + packet_batches .iter() - .map(|pp| { - pp.packets + .map(|batch| { + batch + .packets .iter() .map(|p| if p.meta.discard { 0 } else { 1 }) .sum::() @@ -283,14 +284,14 @@ mod tests { #[test] fn test_packet_discard() { solana_logger::setup(); - let mut p = Packets::default(); - p.packets.resize(10, Packet::default()); - p.packets[3].meta.addr = [1u16; 8]; - let mut packets = vec![p]; + let mut batch = PacketBatch::default(); + batch.packets.resize(10, Packet::default()); + batch.packets[3].meta.addr = [1u16; 8]; + let mut batches = vec![batch]; let max = 3; - SigVerifyStage::discard_excess_packets(&mut packets, max); - assert_eq!(count_non_discard(&packets), max); - assert!(!packets[0].packets[0].meta.discard); - assert!(!packets[0].packets[3].meta.discard); + SigVerifyStage::discard_excess_packets(&mut batches, max); + assert_eq!(count_non_discard(&batches), max); + assert!(!batches[0].packets[0].meta.discard); + assert!(!batches[0].packets[3].meta.discard); } } diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index a50cf9033..b11b1c0e7 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,7 +1,7 @@ use { crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}, crossbeam_channel::Select, - solana_perf::packet::Packets, + solana_perf::packet::PacketBatch, solana_runtime::bank::Bank, solana_sdk::{ account::from_account, clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature, @@ -20,7 +20,7 @@ const MAX_VOTES_PER_VALIDATOR: usize = 1000; pub struct VerifiedVoteMetadata { pub vote_account_key: Pubkey, pub vote: Box, - pub packet: Packets, + pub packet_batch: PacketBatch, pub signature: Signature, } @@ -70,7 +70,7 @@ impl<'a> ValidatorGossipVotesIterator<'a> { /// /// Iterator is done after iterating through all vote accounts impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { - type Item = Vec; + type Item = Vec; fn next(&mut self) -> Option { // TODO: Maybe prioritize by stake weight @@ -116,7 +116,7 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { None } }) - .collect::>() + .collect::>() }) }) }); @@ -130,7 +130,7 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { } } -pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (Packets, Signature)>; +pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (PacketBatch, Signature)>; #[derive(Default)] pub struct VerifiedVotePackets(HashMap); @@ -150,7 +150,7 @@ impl VerifiedVotePackets { let VerifiedVoteMetadata { vote_account_key, vote, - packet, + packet_batch, signature, } = verfied_vote_metadata; if vote.is_empty() { @@ -161,7 +161,7 @@ impl VerifiedVotePackets { let hash = vote.hash(); let validator_votes = self.0.entry(vote_account_key).or_default(); - validator_votes.insert((slot, hash), (packet, signature)); + validator_votes.insert((slot, hash), (packet_batch, signature)); if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { let smallest_key = validator_votes.keys().next().cloned().unwrap(); @@ -200,7 +200,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote.clone()), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new(&[1u8; 64]), }]) .unwrap(); @@ -220,7 +220,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new(&[1u8; 64]), }]) .unwrap(); @@ -242,7 +242,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new(&[1u8; 64]), }]) .unwrap(); @@ -265,7 +265,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new(&[2u8; 64]), }]) .unwrap(); @@ -304,7 +304,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new(&[1u8; 64]), }]) .unwrap(); @@ -341,7 +341,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new_unique(), }]) .unwrap(); @@ -395,7 +395,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote: Box::new(vote), - packet: Packets::new(vec![Packet::default(); num_packets]), + packet_batch: PacketBatch::new(vec![Packet::default(); num_packets]), signature: Signature::new_unique(), }]) .unwrap(); @@ -428,12 +428,12 @@ mod tests { // Get and verify batches let num_expected_batches = 2; for _ in 0..num_expected_batches { - let validator_batch: Vec = gossip_votes_iterator.next().unwrap(); + let validator_batch: Vec = gossip_votes_iterator.next().unwrap(); assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len()); let expected_len = validator_batch[0].packets.len(); assert!(validator_batch .iter() - .all(|p| p.packets.len() == expected_len)); + .all(|batch| batch.packets.len() == expected_len)); } // Should be empty now @@ -462,7 +462,7 @@ mod tests { s.send(vec![VerifiedVoteMetadata { vote_account_key, vote, - packet: Packets::default(), + packet_batch: PacketBatch::default(), signature: Signature::new_unique(), }]) .unwrap(); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 822dc4047..978d39c3e 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -22,7 +22,7 @@ use { }, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_debug, inc_new_counter_error}, - solana_perf::packet::{Packet, Packets}, + solana_perf::packet::{Packet, PacketBatch}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, @@ -353,7 +353,7 @@ fn recv_window( blockstore: &Blockstore, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - verified_receiver: &CrossbeamReceiver>, + verified_receiver: &CrossbeamReceiver>, retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, @@ -458,7 +458,7 @@ impl WindowService { #[allow(clippy::too_many_arguments)] pub(crate) fn new( blockstore: Arc, - verified_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, retransmit_sender: Sender>, repair_socket: Arc, exit: Arc, @@ -629,7 +629,7 @@ impl WindowService { exit: Arc, blockstore: Arc, insert_sender: CrossbeamSender<(Vec, Vec>)>, - verified_receiver: CrossbeamReceiver>, + verified_receiver: CrossbeamReceiver>, shred_filter: F, bank_forks: Arc>, retransmit_sender: Sender>, diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 59a607838..a0d5337b7 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -15,7 +15,7 @@ use { solana_metrics::*, solana_perf::{ cuda_runtime::PinnedVec, - packet::{Packet, Packets, PacketsRecycler, PACKETS_PER_BATCH}, + packet::{Packet, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, perf_libs, recycler::Recycler, sigverify, @@ -308,7 +308,7 @@ impl<'a> EntrySigVerificationState { pub struct VerifyRecyclers { hash_recycler: Recycler>, tick_count_recycler: Recycler>, - packet_recycler: PacketsRecycler, + packet_recycler: PacketBatchRecycler, out_recycler: Recycler>, tx_offset_recycler: Recycler, } @@ -499,12 +499,12 @@ pub fn start_verify_transactions( }) .flatten() .collect::>(); - let mut packets_vec = entry_txs + let mut packet_batches = entry_txs .par_iter() .chunks(PACKETS_PER_BATCH) .map(|slice| { let vec_size = slice.len(); - let mut packets = Packets::new_with_recycler( + let mut packet_batch = PacketBatch::new_with_recycler( verify_recyclers.packet_recycler.clone(), vec_size, "entry-sig-verify", @@ -515,13 +515,13 @@ pub fn start_verify_transactions( // uninitialized anyway, so the initilization would simply write junk into // the vector anyway. unsafe { - packets.packets.set_len(vec_size); + packet_batch.packets.set_len(vec_size); } let entry_tx_iter = slice .into_par_iter() .map(|tx| tx.to_versioned_transaction()); - let res = packets + let res = packet_batch .packets .par_iter_mut() .zip(entry_tx_iter) @@ -530,7 +530,7 @@ pub fn start_verify_transactions( Packet::populate_packet(pair.0, None, &pair.1).is_ok() }); if res { - Ok(packets) + Ok(packet_batch) } else { Err(TransactionError::SanitizeFailure) } @@ -542,14 +542,14 @@ pub fn start_verify_transactions( let gpu_verify_thread = thread::spawn(move || { let mut verify_time = Measure::start("sigverify"); sigverify::ed25519_verify( - &mut packets_vec, + &mut packet_batches, &tx_offset_recycler, &out_recycler, false, ); - let verified = packets_vec + let verified = packet_batches .iter() - .all(|packets| packets.packets.iter().all(|p| !p.meta.discard)); + .all(|batch| batch.packets.iter().all(|p| !p.meta.discard)); verify_time.stop(); (verified, verify_time.as_us()) }); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 071d31779..9c6fc9dd7 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -47,8 +47,8 @@ use { solana_perf::{ data_budget::DataBudget, packet::{ - limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler, - PACKET_DATA_SIZE, + limited_deserialize, to_packet_batch_with_destination, Packet, PacketBatch, + PacketBatchRecycler, PACKET_DATA_SIZE, }, }, solana_rayon_threadlimit::get_thread_count, @@ -67,7 +67,7 @@ use { packet, sendmmsg::{multi_target_send, SendPktsError}, socket::SocketAddrSpace, - streamer::{PacketReceiver, PacketSender}, + streamer::{PacketBatchReceiver, PacketBatchSender}, }, solana_vote_program::{ vote_state::MAX_LOCKOUT_HISTORY, vote_transaction::parse_vote_transaction, @@ -1588,9 +1588,9 @@ impl ClusterInfo { &self, thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, stakes: &HashMap, - sender: &PacketSender, + sender: &PacketBatchSender, generate_pull_requests: bool, ) -> Result<(), GossipError> { let reqs = self.generate_new_gossip_requests( @@ -1600,11 +1600,11 @@ impl ClusterInfo { generate_pull_requests, ); if !reqs.is_empty() { - let packets = to_packets_with_destination(recycler.clone(), &reqs); + let packet_batch = to_packet_batch_with_destination(recycler.clone(), &reqs); self.stats .packets_sent_gossip_requests_count - .add_relaxed(packets.packets.len() as u64); - sender.send(packets)?; + .add_relaxed(packet_batch.packets.len() as u64); + sender.send(packet_batch)?; } Ok(()) } @@ -1699,7 +1699,7 @@ impl ClusterInfo { pub fn gossip( self: Arc, bank_forks: Option>>, - sender: PacketSender, + sender: PacketBatchSender, gossip_validators: Option>, exit: Arc, ) -> JoinHandle<()> { @@ -1715,7 +1715,7 @@ impl ClusterInfo { let mut last_contact_info_trace = timestamp(); let mut last_contact_info_save = timestamp(); let mut entrypoints_processed = false; - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let crds_data = vec![ CrdsData::Version(Version::new(self.id())), CrdsData::NodeInstance( @@ -1840,9 +1840,9 @@ impl ClusterInfo { // from address, crds filter, caller contact info requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>, thread_pool: &ThreadPool, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, stakes: &HashMap, - response_sender: &PacketSender, + response_sender: &PacketBatchSender, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time); if requests.is_empty() { @@ -1904,7 +1904,7 @@ impl ClusterInfo { &'a self, now: Instant, mut rng: &'a mut R, - packets: &'a mut Packets, + packet_batch: &'a mut PacketBatch, ) -> impl FnMut(&PullData) -> bool + 'a where R: Rng + CryptoRng, @@ -1917,7 +1917,7 @@ impl ClusterInfo { if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); match Packet::from_data(Some(&node.1), ping) { - Ok(packet) => packets.packets.push(packet), + Ok(packet) => packet_batch.packets.push(packet), Err(err) => error!("failed to write ping packet: {:?}", err), }; } @@ -1944,10 +1944,10 @@ impl ClusterInfo { fn handle_pull_requests( &self, thread_pool: &ThreadPool, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, requests: Vec, stakes: &HashMap, - ) -> Packets { + ) -> PacketBatch { const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; let mut time = Measure::start("handle_pull_requests"); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); @@ -1958,12 +1958,12 @@ impl ClusterInfo { } let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; - let mut packets = - Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let mut packet_batch = + PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); let check_pull_request = - self.check_pull_request(Instant::now(), &mut rng, &mut packets); + self.check_pull_request(Instant::now(), &mut rng, &mut packet_batch); requests .into_iter() .filter(check_pull_request) @@ -2009,7 +2009,7 @@ impl ClusterInfo { }) .unzip(); if responses.is_empty() { - return packets; + return packet_batch; } let mut rng = rand::thread_rng(); let shuffle = WeightedShuffle::new(&mut rng, &scores).unwrap(); @@ -2023,7 +2023,7 @@ impl ClusterInfo { Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { total_bytes += packet.meta.size; - packets.packets.push(packet); + packet_batch.packets.push(packet); sent += 1; } else { inc_new_counter_info!("gossip_pull_request-no_budget", 1); @@ -2043,7 +2043,7 @@ impl ClusterInfo { responses.len(), total_bytes ); - packets + packet_batch } fn handle_batch_pull_responses( @@ -2164,8 +2164,8 @@ impl ClusterInfo { fn handle_batch_ping_messages( &self, pings: I, - recycler: &PacketsRecycler, - response_sender: &PacketSender, + recycler: &PacketBatchRecycler, + response_sender: &PacketBatchSender, ) where I: IntoIterator, { @@ -2175,7 +2175,11 @@ impl ClusterInfo { } } - fn handle_ping_messages(&self, pings: I, recycler: &PacketsRecycler) -> Option + fn handle_ping_messages( + &self, + pings: I, + recycler: &PacketBatchRecycler, + ) -> Option where I: IntoIterator, { @@ -2197,9 +2201,12 @@ impl ClusterInfo { if packets.is_empty() { None } else { - let packets = - Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets); - Some(packets) + let packet_batch = PacketBatch::new_unpinned_with_recycler_data( + recycler, + "handle_ping_messages", + packets, + ); + Some(packet_batch) } } @@ -2222,9 +2229,9 @@ impl ClusterInfo { &self, messages: Vec<(Pubkey, Vec)>, thread_pool: &ThreadPool, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, stakes: &HashMap, - response_sender: &PacketSender, + response_sender: &PacketBatchSender, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_push_messages_time); if messages.is_empty() { @@ -2301,17 +2308,17 @@ impl ClusterInfo { if prune_messages.is_empty() { return; } - let mut packets = to_packets_with_destination(recycler.clone(), &prune_messages); - let num_prune_packets = packets.packets.len(); + let mut packet_batch = to_packet_batch_with_destination(recycler.clone(), &prune_messages); + let num_prune_packets = packet_batch.packets.len(); self.stats .push_response_count - .add_relaxed(packets.packets.len() as u64); + .add_relaxed(packet_batch.packets.len() as u64); let new_push_requests = self.new_push_requests(stakes); inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { match Packet::from_data(Some(&address), &request) { - Ok(packet) => packets.packets.push(packet), + Ok(packet) => packet_batch.packets.push(packet), Err(err) => error!("failed to write push-request packet: {:?}", err), } } else { @@ -2323,8 +2330,8 @@ impl ClusterInfo { .add_relaxed(num_prune_packets as u64); self.stats .packets_sent_push_messages_count - .add_relaxed((packets.packets.len() - num_prune_packets) as u64); - let _ = response_sender.send(packets); + .add_relaxed((packet_batch.packets.len() - num_prune_packets) as u64); + let _ = response_sender.send(packet_batch); } fn require_stake_for_gossip(&self, stakes: &HashMap) -> bool { @@ -2342,8 +2349,8 @@ impl ClusterInfo { &self, packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>, thread_pool: &ThreadPool, - recycler: &PacketsRecycler, - response_sender: &PacketSender, + recycler: &PacketBatchRecycler, + response_sender: &PacketBatchSender, stakes: &HashMap, _feature_set: Option<&FeatureSet>, epoch_duration: Duration, @@ -2460,15 +2467,15 @@ impl ClusterInfo { // handling of requests/messages. fn run_socket_consume( &self, - receiver: &PacketReceiver, + receiver: &PacketBatchReceiver, sender: &Sender>, thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); let mut packets = VecDeque::from(packets); - for payload in receiver.try_iter() { - packets.extend(payload.packets.iter().cloned()); + for packet_batch in receiver.try_iter() { + packets.extend(packet_batch.packets.iter().cloned()); let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); if excess_count > 0 { packets.drain(0..excess_count); @@ -2500,10 +2507,10 @@ impl ClusterInfo { /// Process messages from the network fn run_listen( &self, - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, bank_forks: Option<&RwLock>, receiver: &Receiver>, - response_sender: &PacketSender, + response_sender: &PacketBatchSender, thread_pool: &ThreadPool, last_print: &mut Instant, should_check_duplicate_instance: bool, @@ -2551,7 +2558,7 @@ impl ClusterInfo { pub(crate) fn start_socket_consume_thread( self: Arc, - receiver: PacketReceiver, + receiver: PacketBatchReceiver, sender: Sender>, exit: Arc, ) -> JoinHandle<()> { @@ -2581,12 +2588,12 @@ impl ClusterInfo { self: Arc, bank_forks: Option>>, requests_receiver: Receiver>, - response_sender: PacketSender, + response_sender: PacketBatchSender, should_check_duplicate_instance: bool, exit: Arc, ) -> JoinHandle<()> { let mut last_print = Instant::now(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let thread_pool = ThreadPoolBuilder::new() .num_threads(get_thread_count().min(8)) .thread_name(|i| format!("sol-gossip-work-{}", i)) @@ -2955,9 +2962,9 @@ pub fn push_messages_to_peer( let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages) .map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload))) .collect(); - let packets = to_packets_with_destination(PacketsRecycler::default(), &reqs); + let packet_batch = to_packet_batch_with_destination(PacketBatchRecycler::default(), &reqs); let sock = UdpSocket::bind("0.0.0.0:0").unwrap(); - packet::send_to(&packets, &sock, socket_addr_space)?; + packet::send_to(&packet_batch, &sock, socket_addr_space)?; Ok(()) } @@ -3206,7 +3213,7 @@ mod tests { .iter() .map(|ping| Pong::new(ping, &this_node).unwrap()) .collect(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); let packets = cluster_info .handle_ping_messages( remote_nodes diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 4a3de44ff..16bfd7200 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -7,7 +7,7 @@ use { sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair}, }, solana_perf::{ - packet::{Packet, Packets}, + packet::{Packet, PacketBatch}, recycler_cache::RecyclerCache, }, solana_sdk::signature::Keypair, @@ -21,13 +21,13 @@ const NUM_BATCHES: usize = 1; fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { let recycler_cache = RecyclerCache::default(); - let mut packets = Packets::default(); - packets.packets.set_pinnable(); + let mut packet_batch = PacketBatch::default(); + packet_batch.packets.set_pinnable(); let slot = 0xdead_c0de; // need to pin explicitly since the resize will not cause re-allocation - packets.packets.reserve_and_pin(NUM_PACKETS); - packets.packets.resize(NUM_PACKETS, Packet::default()); - for p in packets.packets.iter_mut() { + packet_batch.packets.reserve_and_pin(NUM_PACKETS); + packet_batch.packets.resize(NUM_PACKETS, Packet::default()); + for p in packet_batch.packets.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -41,25 +41,25 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { ); shred.copy_to_packet(p); } - let mut batch = vec![packets; NUM_BATCHES]; + let mut batches = vec![packet_batch; NUM_BATCHES]; let keypair = Keypair::new(); let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); let pinned_keypair = Some(Arc::new(pinned_keypair)); //warmup for _ in 0..100 { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); } bencher.iter(|| { - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); }) } #[bench] fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { - let mut packets = Packets::default(); + let mut packet_batch = PacketBatch::default(); let slot = 0xdead_c0de; - packets.packets.resize(NUM_PACKETS, Packet::default()); - for p in packets.packets.iter_mut() { + packet_batch.packets.resize(NUM_PACKETS, Packet::default()); + for p in packet_batch.packets.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -73,9 +73,9 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { ); shred.copy_to_packet(p); } - let mut batch = vec![packets; NUM_BATCHES]; + let mut batches = vec![packet_batch; NUM_BATCHES]; let keypair = Keypair::new(); bencher.iter(|| { - sign_shreds_cpu(&keypair, &mut batch); + sign_shreds_cpu(&keypair, &mut batches); }) } diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 0df3d7b6b..a8e8868cf 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -12,10 +12,10 @@ use { solana_metrics::inc_new_counter_debug, solana_perf::{ cuda_runtime::PinnedVec, - packet::{limited_deserialize, Packet, Packets}, + packet::{limited_deserialize, Packet, PacketBatch}, perf_libs, recycler_cache::RecyclerCache, - sigverify::{self, batch_size, TxOffset}, + sigverify::{self, count_packets_in_batches, TxOffset}, }, solana_rayon_threadlimit::get_thread_count, solana_sdk::{ @@ -76,22 +76,26 @@ pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) Some(1) } -fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { +fn verify_shreds_cpu( + batches: &[PacketBatch], + slot_leaders: &HashMap, +) -> Vec> { use rayon::prelude::*; - let count = batch_size(batches); - debug!("CPU SHRED ECDSA for {}", count); + let packet_count = count_packets_in_batches(batches); + debug!("CPU SHRED ECDSA for {}", packet_count); let rv = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|p| { - p.packets + .map(|batch| { + batch + .packets .par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect() }) .collect() }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", count); + inc_new_counter_debug!("ed25519_shred_verify_cpu", packet_count); rv } @@ -99,7 +103,7 @@ fn slot_key_data_for_gpu< T: Sync + Sized + Default + std::fmt::Debug + Eq + std::hash::Hash + Clone + Copy + AsRef<[u8]>, >( offset_start: usize, - batches: &[Packets], + batches: &[PacketBatch], slot_keys: &HashMap, recycler_cache: &RecyclerCache, ) -> (PinnedVec, TxOffset, usize) { @@ -108,8 +112,9 @@ fn slot_key_data_for_gpu< let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|p| { - p.packets + .map(|batch| { + batch + .packets .iter() .map(|packet| { let slot_start = size_of::() + size_of::(); @@ -173,7 +178,7 @@ fn vec_size_in_packets(keyvec: &PinnedVec) -> usize { } fn resize_vec(keyvec: &mut PinnedVec) -> usize { - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU + //HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems let num_in_packets = (keyvec.len() + (size_of::() - 1)) / size_of::(); @@ -183,7 +188,7 @@ fn resize_vec(keyvec: &mut PinnedVec) -> usize { fn shred_gpu_offsets( mut pubkeys_end: usize, - batches: &[Packets], + batches: &[PacketBatch], recycler_cache: &RecyclerCache, ) -> (TxOffset, TxOffset, TxOffset, Vec>) { let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures"); @@ -221,7 +226,7 @@ fn shred_gpu_offsets( } pub fn verify_shreds_gpu( - batches: &[Packets], + batches: &[PacketBatch], slot_leaders: &HashMap, recycler_cache: &RecyclerCache, ) -> Vec> { @@ -233,10 +238,10 @@ pub fn verify_shreds_gpu( let mut elems = Vec::new(); let mut rvs = Vec::new(); - let count = batch_size(batches); + let packet_count = count_packets_in_batches(batches); let (pubkeys, pubkey_offsets, mut num_packets) = slot_key_data_for_gpu(0, batches, slot_leaders, recycler_cache); - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU + //HACK: Pubkeys vector is passed along as a `PacketBatch` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data let pubkeys_len = num_packets * size_of::(); trace!("num_packets: {}", num_packets); @@ -251,15 +256,15 @@ pub fn verify_shreds_gpu( num: num_packets as u32, }); - for p in batches { + for batch in batches { elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, + elems: batch.packets.as_ptr(), + num: batch.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(p.packets.len(), 0); + v.resize(batch.packets.len(), 0); rvs.push(v); - num_packets += p.packets.len(); + num_packets += batch.packets.len(); } out.resize(signature_offsets.len(), 0); @@ -290,7 +295,7 @@ pub fn verify_shreds_gpu( sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs); - inc_new_counter_debug!("ed25519_shred_verify_gpu", count); + inc_new_counter_debug!("ed25519_shred_verify_gpu", packet_count); rvs } @@ -316,18 +321,18 @@ fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { packet.data[0..sig_end].copy_from_slice(signature.as_ref()); } -pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) { +pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) { use rayon::prelude::*; - let count = batch_size(batches); - debug!("CPU SHRED ECDSA for {}", count); + let packet_count = count_packets_in_batches(batches); + debug!("CPU SHRED ECDSA for {}", packet_count); SIGVERIFY_THREAD_POOL.install(|| { - batches.par_iter_mut().for_each(|p| { - p.packets[..] + batches.par_iter_mut().for_each(|batch| { + batch.packets[..] .par_iter_mut() .for_each(|p| sign_shred_cpu(keypair, p)); }); }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", count); + inc_new_counter_debug!("ed25519_shred_verify_cpu", packet_count); } pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec { @@ -350,14 +355,14 @@ pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) pub fn sign_shreds_gpu( keypair: &Keypair, pinned_keypair: &Option>>, - batches: &mut [Packets], + batches: &mut [PacketBatch], recycler_cache: &RecyclerCache, ) { let sig_size = size_of::(); let pubkey_size = size_of::(); let api = perf_libs::api(); - let count = batch_size(batches); - if api.is_none() || count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { + let packet_count = count_packets_in_batches(batches); + if api.is_none() || packet_count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { return sign_shreds_cpu(keypair, batches); } let api = api.unwrap(); @@ -370,10 +375,10 @@ pub fn sign_shreds_gpu( //should be zero let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets"); - pubkey_offsets.resize(count, 0); + pubkey_offsets.resize(packet_count, 0); let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); - secret_offsets.resize(count, pubkey_size as u32); + secret_offsets.resize(packet_count, pubkey_size as u32); trace!("offset: {}", offset); let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = @@ -388,14 +393,14 @@ pub fn sign_shreds_gpu( num: num_keypair_packets as u32, }); - for p in batches.iter() { + for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, + elems: batch.packets.as_ptr(), + num: batch.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - num_packets += p.packets.len(); + v.resize(batch.packets.len(), 0); + num_packets += batch.packets.len(); } trace!("Starting verify num packets: {}", num_packets); @@ -447,7 +452,7 @@ pub fn sign_shreds_gpu( }); }); }); - inc_new_counter_debug!("ed25519_shred_sign_gpu", count); + inc_new_counter_debug!("ed25519_shred_sign_gpu", packet_count); } #[cfg(test)] @@ -506,7 +511,7 @@ pub mod tests { fn run_test_sigverify_shreds_cpu(slot: Slot) { solana_logger::setup(); - let mut batch = [Packets::default()]; + let mut batches = [PacketBatch::default()]; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -520,15 +525,15 @@ pub mod tests { ); let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets.resize(1, Packet::default()); + batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[0].meta.size = shred.payload.len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batch, &leader_slots); + let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -536,19 +541,19 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_cpu(&batch, &leader_slots); + let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = HashMap::new(); - let rv = verify_shreds_cpu(&batch, &leader_slots); + let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); - batch[0].packets[0].meta.size = 0; - let rv = verify_shreds_cpu(&batch, &leader_slots); + batches[0].packets[0].meta.size = 0; + let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); } @@ -561,7 +566,7 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut batch = [Packets::default()]; + let mut batches = [PacketBatch::default()]; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -575,9 +580,9 @@ pub mod tests { ); let keypair = Keypair::new(); Shredder::sign_shred(&keypair, &mut shred); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets.resize(1, Packet::default()); + batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[0].meta.size = shred.payload.len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -586,7 +591,7 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); @@ -597,14 +602,14 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect(); - let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); - batch[0].packets[0].meta.size = 0; + batches[0].packets[0].meta.size = 0; let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), @@ -612,7 +617,7 @@ pub mod tests { .iter() .cloned() .collect(); - let rv = verify_shreds_gpu(&batch, &leader_slots, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); } @@ -625,11 +630,11 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut packets = Packets::default(); + let mut packet_batch = PacketBatch::default(); let num_packets = 32; let num_batches = 100; - packets.packets.resize(num_packets, Packet::default()); - for (i, p) in packets.packets.iter_mut().enumerate() { + packet_batch.packets.resize(num_packets, Packet::default()); + for (i, p) in packet_batch.packets.iter_mut().enumerate() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -643,7 +648,7 @@ pub mod tests { ); shred.copy_to_packet(p); } - let mut batch = vec![packets; num_batches]; + let mut batches = vec![packet_batch; num_batches]; let keypair = Keypair::new(); let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); let pinned_keypair = Some(Arc::new(pinned_keypair)); @@ -655,14 +660,14 @@ pub mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![0; num_packets]; num_batches]); //signed - sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); - let rv = verify_shreds_cpu(&batch, &pubkeys); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache); + let rv = verify_shreds_cpu(&batches, &pubkeys); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); - let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); + let rv = verify_shreds_gpu(&batches, &pubkeys, &recycler_cache); assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); } @@ -674,7 +679,7 @@ pub mod tests { fn run_test_sigverify_shreds_sign_cpu(slot: Slot) { solana_logger::setup(); - let mut batch = [Packets::default()]; + let mut batches = [PacketBatch::default()]; let keypair = Keypair::new(); let shred = Shred::new_from_data( slot, @@ -687,9 +692,9 @@ pub mod tests { 0, 0xc0de, ); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets.resize(1, Packet::default()); + batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batches[0].packets[0].meta.size = shred.payload.len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), @@ -698,11 +703,11 @@ pub mod tests { .cloned() .collect(); //unsigned - let rv = verify_shreds_cpu(&batch, &pubkeys); + let rv = verify_shreds_cpu(&batches, &pubkeys); assert_eq!(rv, vec![vec![0]]); //signed - sign_shreds_cpu(&keypair, &mut batch); - let rv = verify_shreds_cpu(&batch, &pubkeys); + sign_shreds_cpu(&keypair, &mut batches); + let rv = verify_shreds_cpu(&batches, &pubkeys); assert_eq!(rv, vec![vec![1]]); } diff --git a/perf/benches/recycler.rs b/perf/benches/recycler.rs index 63410ffc8..0533e4a11 100644 --- a/perf/benches/recycler.rs +++ b/perf/benches/recycler.rs @@ -3,7 +3,7 @@ extern crate test; use { - solana_perf::{packet::PacketsRecycler, recycler::Recycler}, + solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, test::Bencher, }; @@ -11,7 +11,7 @@ use { fn bench_recycler(bencher: &mut Bencher) { solana_logger::setup(); - let recycler: PacketsRecycler = Recycler::default(); + let recycler: PacketBatchRecycler = Recycler::default(); for _ in 0..1000 { let _packet = recycler.allocate(""); diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index a3211cade..7c60f362b 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -3,7 +3,7 @@ extern crate test; use { - solana_perf::{packet::to_packets_chunked, recycler::Recycler, sigverify, test_tx::test_tx}, + solana_perf::{packet::to_packet_batches, recycler::Recycler, sigverify, test_tx::test_tx}, test::Bencher, }; @@ -12,7 +12,7 @@ fn bench_sigverify(bencher: &mut Bencher) { let tx = test_tx(); // generate packet vector - let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); + let mut batches = to_packet_batches(&std::iter::repeat(tx).take(128).collect::>(), 128); let recycler = Recycler::default(); let recycler_out = Recycler::default(); @@ -28,7 +28,7 @@ fn bench_get_offsets(bencher: &mut Bencher) { // generate packet vector let mut batches = - to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::>(), 1024); + to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 1024); let recycler = Recycler::default(); // verify packets diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 59f9d8f7d..d8c163a7a 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -13,13 +13,13 @@ pub const PACKETS_PER_BATCH: usize = 128; pub const NUM_RCVMMSGS: usize = 128; #[derive(Debug, Default, Clone)] -pub struct Packets { +pub struct PacketBatch { pub packets: PinnedVec, } -pub type PacketsRecycler = Recycler>; +pub type PacketBatchRecycler = Recycler>; -impl Packets { +impl PacketBatch { pub fn new(packets: Vec) -> Self { let packets = PinnedVec::from_vec(packets); Self { packets } @@ -27,48 +27,52 @@ impl Packets { pub fn with_capacity(capacity: usize) -> Self { let packets = PinnedVec::with_capacity(capacity); - Packets { packets } + PacketBatch { packets } } pub fn new_unpinned_with_recycler( - recycler: PacketsRecycler, + recycler: PacketBatchRecycler, size: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); packets.reserve(size); - Packets { packets } + PacketBatch { packets } } - pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { + pub fn new_with_recycler( + recycler: PacketBatchRecycler, + size: usize, + name: &'static str, + ) -> Self { let mut packets = recycler.allocate(name); packets.reserve_and_pin(size); - Packets { packets } + PacketBatch { packets } } pub fn new_with_recycler_data( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, name: &'static str, mut packets: Vec, ) -> Self { - let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name); - vec.packets.append(&mut packets); - vec + let mut batch = Self::new_with_recycler(recycler.clone(), packets.len(), name); + batch.packets.append(&mut packets); + batch } pub fn new_unpinned_with_recycler_data( - recycler: &PacketsRecycler, + recycler: &PacketBatchRecycler, name: &'static str, mut packets: Vec, ) -> Self { - let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name); - vec.packets.append(&mut packets); - vec + let mut batch = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name); + batch.packets.append(&mut packets); + batch } pub fn set_addr(&mut self, addr: &SocketAddr) { - for m in self.packets.iter_mut() { - m.meta.set_addr(addr); + for p in self.packets.iter_mut() { + p.meta.set_addr(addr); } } @@ -77,32 +81,32 @@ impl Packets { } } -pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { +pub fn to_packet_batches(xs: &[T], chunks: usize) -> Vec { let mut out = vec![]; for x in xs.chunks(chunks) { - let mut p = Packets::with_capacity(x.len()); - p.packets.resize(x.len(), Packet::default()); - for (i, o) in x.iter().zip(p.packets.iter_mut()) { - Packet::populate_packet(o, None, i).expect("serialize request"); + let mut batch = PacketBatch::with_capacity(x.len()); + batch.packets.resize(x.len(), Packet::default()); + for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { + Packet::populate_packet(packet, None, i).expect("serialize request"); } - out.push(p); + out.push(batch); } out } #[cfg(test)] -pub fn to_packets(xs: &[T]) -> Vec { - to_packets_chunked(xs, NUM_PACKETS) +pub fn to_packet_batches_for_tests(xs: &[T]) -> Vec { + to_packet_batches(xs, NUM_PACKETS) } -pub fn to_packets_with_destination( - recycler: PacketsRecycler, +pub fn to_packet_batch_with_destination( + recycler: PacketBatchRecycler, dests_and_data: &[(SocketAddr, T)], -) -> Packets { - let mut out = Packets::new_unpinned_with_recycler( +) -> PacketBatch { + let mut out = PacketBatch::new_unpinned_with_recycler( recycler, dests_and_data.len(), - "to_packets_with_destination", + "to_packet_batch_with_destination", ); out.packets.resize(dests_and_data.len(), Packet::default()); for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { @@ -143,21 +147,21 @@ mod tests { }; #[test] - fn test_to_packets() { + fn test_to_packet_batches() { let keypair = Keypair::new(); let hash = Hash::new(&[1; 32]); let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash); - let rv = to_packets(&[tx.clone(); 1]); + let rv = to_packet_batches_for_tests(&[tx.clone(); 1]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].packets.len(), 1); #[allow(clippy::useless_vec)] - let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); + let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].packets.len(), NUM_PACKETS); #[allow(clippy::useless_vec)] - let rv = to_packets(&vec![tx; NUM_PACKETS + 1]); + let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); assert_eq!(rv[0].packets.len(), NUM_PACKETS); assert_eq!(rv[1].packets.len(), 1); @@ -165,9 +169,10 @@ mod tests { #[test] fn test_to_packets_pinning() { - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); for i in 0..2 { - let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one"); + let _first_packets = + PacketBatch::new_with_recycler(recycler.clone(), i + 1, "first one"); } } } diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 154d88328..a8169ab55 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -182,7 +182,7 @@ impl RecyclerX { #[cfg(test)] mod tests { - use {super::*, crate::packet::PacketsRecycler, std::iter::repeat_with}; + use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with}; impl Reset for u64 { fn reset(&mut self) { @@ -209,7 +209,7 @@ mod tests { #[test] fn test_recycler_shrink() { let mut rng = rand::thread_rng(); - let recycler = PacketsRecycler::default(); + let recycler = PacketBatchRecycler::default(); // Allocate a burst of packets. const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2; { diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 6102c69af..e7e47c59b 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -9,7 +9,7 @@ use solana_sdk::transaction::Transaction; use { crate::{ cuda_runtime::PinnedVec, - packet::{Packet, Packets}, + packet::{Packet, PacketBatch}, perf_libs, recycler::Recycler, }, @@ -158,8 +158,8 @@ fn verify_packet(packet: &mut Packet, reject_non_vote: bool) { } } -pub fn batch_size(batches: &[Packets]) -> usize { - batches.iter().map(|p| p.packets.len()).sum() +pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize { + batches.iter().map(|batch| batch.packets.len()).sum() } // internal function to be unit-tested; should be used only by get_packet_offsets @@ -366,7 +366,7 @@ fn check_for_simple_vote_transaction( } pub fn generate_offsets( - batches: &mut [Packets], + batches: &mut [PacketBatch], recycler: &Recycler, reject_non_vote: bool, ) -> TxOffsets { @@ -381,9 +381,9 @@ pub fn generate_offsets( msg_sizes.set_pinnable(); let mut current_offset: usize = 0; let mut v_sig_lens = Vec::new(); - batches.iter_mut().for_each(|p| { + batches.iter_mut().for_each(|batch| { let mut sig_lens = Vec::new(); - p.packets.iter_mut().for_each(|packet| { + batch.packets.iter_mut().for_each(|packet| { let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); sig_lens.push(packet_offsets.sig_len); @@ -418,30 +418,32 @@ pub fn generate_offsets( ) } -pub fn ed25519_verify_cpu(batches: &mut [Packets], reject_non_vote: bool) { +pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { use rayon::prelude::*; - let count = batch_size(batches); - debug!("CPU ECDSA for {}", batch_size(batches)); + let packet_count = count_packets_in_batches(batches); + debug!("CPU ECDSA for {}", packet_count); PAR_THREAD_POOL.install(|| { - batches.into_par_iter().for_each(|p| { - p.packets + batches.into_par_iter().for_each(|batch| { + batch + .packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) }) }); - inc_new_counter_debug!("ed25519_verify_cpu", count); + inc_new_counter_debug!("ed25519_verify_cpu", packet_count); } -pub fn ed25519_verify_disabled(batches: &mut [Packets]) { +pub fn ed25519_verify_disabled(batches: &mut [PacketBatch]) { use rayon::prelude::*; - let count = batch_size(batches); - debug!("disabled ECDSA for {}", batch_size(batches)); - batches.into_par_iter().for_each(|p| { - p.packets + let packet_count = count_packets_in_batches(batches); + debug!("disabled ECDSA for {}", packet_count); + batches.into_par_iter().for_each(|batch| { + batch + .packets .par_iter_mut() .for_each(|p| p.meta.discard = false) }); - inc_new_counter_debug!("ed25519_verify_disabled", count); + inc_new_counter_debug!("ed25519_verify_disabled", packet_count); } pub fn copy_return_values(sig_lens: &[Vec], out: &PinnedVec, rvs: &mut Vec>) { @@ -495,7 +497,7 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { Ok(out) } -pub fn mark_disabled(batches: &mut [Packets], r: &[Vec]) { +pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { batches.iter_mut().zip(r).for_each(|(b, v)| { b.packets.iter_mut().zip(v).for_each(|(p, f)| { p.meta.discard = *f == 0; @@ -504,7 +506,7 @@ pub fn mark_disabled(batches: &mut [Packets], r: &[Vec]) { } pub fn ed25519_verify( - batches: &mut [Packets], + batches: &mut [PacketBatch], recycler: &Recycler, recycler_out: &Recycler>, reject_non_vote: bool, @@ -516,21 +518,21 @@ pub fn ed25519_verify( let api = api.unwrap(); use crate::packet::PACKET_DATA_SIZE; - let count = batch_size(batches); + let packet_count = count_packets_in_batches(batches); // micro-benchmarks show GPU time for smallest batch around 15-20ms // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice // power-of-two number around that accounting for the fact that the CPU // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover - if count < 64 { + if packet_count < 64 { return ed25519_verify_cpu(batches, reject_non_vote); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = generate_offsets(batches, recycler, reject_non_vote); - debug!("CUDA ECDSA for {}", batch_size(batches)); + debug!("CUDA ECDSA for {}", packet_count); debug!("allocating out.."); let mut out = recycler_out.allocate("out_buffer"); out.set_pinnable(); @@ -538,15 +540,15 @@ pub fn ed25519_verify( let mut rvs = Vec::new(); let mut num_packets: usize = 0; - for p in batches.iter() { + for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, + elems: batch.packets.as_ptr(), + num: batch.packets.len() as u32, }); let mut v = Vec::new(); - v.resize(p.packets.len(), 0); + v.resize(batch.packets.len(), 0); rvs.push(v); - num_packets = num_packets.saturating_add(p.packets.len()); + num_packets = num_packets.saturating_add(batch.packets.len()); } out.resize(signature_offsets.len(), 0); trace!("Starting verify num packets: {}", num_packets); @@ -575,7 +577,7 @@ pub fn ed25519_verify( trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); mark_disabled(batches, &rvs); - inc_new_counter_debug!("ed25519_verify_gpu", count); + inc_new_counter_debug!("ed25519_verify_gpu", packet_count); } #[cfg(test)] @@ -595,7 +597,7 @@ mod tests { use { super::*, crate::{ - packet::{Packet, Packets}, + packet::{Packet, PacketBatch}, sigverify::{self, PacketOffsets}, test_tx::{test_multisig_tx, test_tx, vote_tx}, }, @@ -623,9 +625,9 @@ mod tests { #[test] fn test_mark_disabled() { - let mut batch = Packets::default(); + let mut batch = PacketBatch::default(); batch.packets.push(Packet::default()); - let mut batches: Vec = vec![batch]; + let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); assert!(batches[0].packets[0].meta.discard); mark_disabled(&mut batches, &[vec![1]]); @@ -731,7 +733,7 @@ mod tests { assert!(packet.meta.discard); packet.meta.discard = false; - let mut batches = generate_packet_vec(&packet, 1, 1); + let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); assert!(batches[0].packets[0].meta.discard); } @@ -767,7 +769,7 @@ mod tests { assert!(packet.meta.discard); packet.meta.discard = false; - let mut batches = generate_packet_vec(&packet, 1, 1); + let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); assert!(batches[0].packets[0].meta.discard); } @@ -929,21 +931,21 @@ mod tests { ); } - fn generate_packet_vec( + fn generate_packet_batches( packet: &Packet, num_packets_per_batch: usize, num_batches: usize, - ) -> Vec { + ) -> Vec { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut packets = Packets::default(); - packets.packets.resize(0, Packet::default()); + let mut packet_batch = PacketBatch::default(); + packet_batch.packets.resize(0, Packet::default()); for _ in 0..num_packets_per_batch { - packets.packets.push(packet.clone()); + packet_batch.packets.push(packet.clone()); } - assert_eq!(packets.packets.len(), num_packets_per_batch); - packets + assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + packet_batch }) .collect(); assert_eq!(batches.len(), num_batches); @@ -960,7 +962,7 @@ mod tests { packet.data[20] = packet.data[20].wrapping_add(10); } - let mut batches = generate_packet_vec(&packet, n, 2); + let mut batches = generate_packet_batches(&packet, n, 2); // verify packets ed25519_verify(&mut batches); @@ -969,11 +971,11 @@ mod tests { let should_discard = modify_data; assert!(batches .iter() - .flat_map(|p| &p.packets) + .flat_map(|batch| &batch.packets) .all(|p| p.meta.discard == should_discard)); } - fn ed25519_verify(batches: &mut [Packets]) { + fn ed25519_verify(batches: &mut [PacketBatch]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); @@ -986,13 +988,13 @@ mod tests { tx.signatures.pop(); let packet = sigverify::make_packet_from_transaction(tx); - let mut batches = generate_packet_vec(&packet, 1, 1); + let mut batches = generate_packet_batches(&packet, 1, 1); // verify packets ed25519_verify(&mut batches); assert!(batches .iter() - .flat_map(|p| &p.packets) + .flat_map(|batch| &batch.packets) .all(|p| p.meta.discard)); } @@ -1020,7 +1022,7 @@ mod tests { let n = 4; let num_batches = 3; - let mut batches = generate_packet_vec(&packet, n, num_batches); + let mut batches = generate_packet_batches(&packet, n, num_batches); packet.data[40] = packet.data[40].wrapping_add(8); @@ -1035,7 +1037,7 @@ mod tests { ref_vec[0].push(0u8); assert!(batches .iter() - .flat_map(|p| &p.packets) + .flat_map(|batch| &batch.packets) .zip(ref_vec.into_iter().flatten()) .all(|(p, discard)| { if discard == 0 { @@ -1059,7 +1061,7 @@ mod tests { for _ in 0..50 { let n = thread_rng().gen_range(1, 30); let num_batches = thread_rng().gen_range(2, 30); - let mut batches = generate_packet_vec(&packet, n, num_batches); + let mut batches = generate_packet_batches(&packet, n, num_batches); let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { @@ -1080,8 +1082,8 @@ mod tests { // check result batches .iter() - .flat_map(|p| &p.packets) - .zip(batches_cpu.iter().flat_map(|p| &p.packets)) + .flat_map(|batch| &batch.packets) + .zip(batches_cpu.iter().flat_map(|batch| &batch.packets)) .for_each(|(p1, p2)| assert_eq!(p1, p2)); } } @@ -1233,7 +1235,7 @@ mod tests { solana_logger::setup(); let mut current_offset = 0usize; - let mut batch = Packets::default(); + let mut batch = PacketBatch::default(); batch .packets .push(sigverify::make_packet_from_transaction(test_tx())); diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 58688ef80..b0abe551a 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -9,13 +9,13 @@ use { }; pub use { solana_perf::packet::{ - limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, + limited_deserialize, to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH, }, solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}, }; -pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Result { +pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result { let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -27,11 +27,11 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res trace!("receiving on {}", socket.local_addr().unwrap()); let start = Instant::now(); loop { - obj.packets.resize( + batch.packets.resize( std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH), Packet::default(), ); - match recv_mmsg(socket, &mut obj.packets[i..]) { + match recv_mmsg(socket, &mut batch.packets[i..]) { Err(_) if i > 0 => { if start.elapsed().as_millis() as u64 > max_wait_ms { break; @@ -55,17 +55,17 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res } } } - obj.packets.truncate(i); + batch.packets.truncate(i); inc_new_counter_debug!("packets-recv_count", i); Ok(i) } pub fn send_to( - obj: &Packets, + batch: &PacketBatch, socket: &UdpSocket, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { - for p in &obj.packets { + for p in &batch.packets { let addr = p.meta.addr(); if socket_addr_space.check(&addr) { socket.send_to(&p.data[..p.meta.size], &addr)?; @@ -90,9 +90,9 @@ mod tests { // test that the address is actually being updated let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap(); let packets = vec![Packet::default()]; - let mut msgs = Packets::new(packets); - msgs.set_addr(&send_addr); - assert_eq!(msgs.packets[0].meta.addr(), send_addr); + let mut packet_batch = PacketBatch::new(packets); + packet_batch.set_addr(&send_addr); + assert_eq!(packet_batch.packets[0].meta.addr(), send_addr); } #[test] @@ -102,21 +102,21 @@ mod tests { let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = send_socket.local_addr().unwrap(); - let mut p = Packets::default(); + let mut batch = PacketBatch::default(); - p.packets.resize(10, Packet::default()); + batch.packets.resize(10, Packet::default()); - for m in p.packets.iter_mut() { + for m in batch.packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); - let recvd = recv_from(&mut p, &recv_socket, 1).unwrap(); + let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); - assert_eq!(recvd, p.packets.len()); + assert_eq!(recvd, batch.packets.len()); - for m in &p.packets { + for m in &batch.packets { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } @@ -125,7 +125,7 @@ mod tests { #[test] pub fn debug_trait() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); - write!(io::sink(), "{:?}", Packets::default()).unwrap(); + write!(io::sink(), "{:?}", PacketBatch::default()).unwrap(); } #[test] @@ -151,25 +151,25 @@ mod tests { let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut p = Packets::default(); - p.packets.resize(PACKETS_PER_BATCH, Packet::default()); + let mut batch = PacketBatch::default(); + batch.packets.resize(PACKETS_PER_BATCH, Packet::default()); // Should only get PACKETS_PER_BATCH packets per iteration even // if a lot more were sent, and regardless of packet size for _ in 0..2 * PACKETS_PER_BATCH { - let mut p = Packets::default(); - p.packets.resize(1, Packet::default()); - for m in p.packets.iter_mut() { + let mut batch = PacketBatch::default(); + batch.packets.resize(1, Packet::default()); + for m in batch.packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = 1; } - send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } - let recvd = recv_from(&mut p, &recv_socket, 100).unwrap(); + let recvd = recv_from(&mut batch, &recv_socket, 100).unwrap(); // Check we only got PACKETS_PER_BATCH packets assert_eq!(recvd, PACKETS_PER_BATCH); - assert_eq!(p.packets.capacity(), PACKETS_PER_BATCH); + assert_eq!(batch.packets.capacity(), PACKETS_PER_BATCH); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index d71a458c1..9f7db9c54 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -3,7 +3,7 @@ use { crate::{ - packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}, + packet::{self, send_to, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, recvmmsg::NUM_RCVMMSGS, socket::SocketAddrSpace, }, @@ -21,8 +21,8 @@ use { thiserror::Error, }; -pub type PacketReceiver = Receiver; -pub type PacketSender = Sender; +pub type PacketBatchReceiver = Receiver; +pub type PacketBatchSender = Sender; #[derive(Error, Debug)] pub enum StreamerError { @@ -33,7 +33,7 @@ pub enum StreamerError { RecvTimeout(#[from] RecvTimeoutError), #[error("send packets error")] - Send(#[from] SendError), + Send(#[from] SendError), } pub type Result = std::result::Result; @@ -41,8 +41,8 @@ pub type Result = std::result::Result; fn recv_loop( sock: &UdpSocket, exit: Arc, - channel: &PacketSender, - recycler: &PacketsRecycler, + channel: &PacketBatchSender, + recycler: &PacketBatchRecycler, name: &'static str, coalesce_ms: u64, use_pinned_memory: bool, @@ -52,10 +52,10 @@ fn recv_loop( let mut now = Instant::now(); let mut num_max_received = 0; // Number of times maximum packets were received loop { - let mut msgs = if use_pinned_memory { - Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name) + let mut packet_batch = if use_pinned_memory { + PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name) } else { - Packets::with_capacity(PACKETS_PER_BATCH) + PacketBatch::with_capacity(PACKETS_PER_BATCH) }; loop { // Check for exit signal, even if socket is busy @@ -63,14 +63,14 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) { + if let Ok(len) = packet::recv_from(&mut packet_batch, sock, coalesce_ms) { if len == NUM_RCVMMSGS { num_max_received += 1; } recv_count += len; call_count += 1; if len > 0 { - channel.send(msgs)?; + channel.send(packet_batch)?; } break; } @@ -94,8 +94,8 @@ fn recv_loop( pub fn receiver( sock: Arc, exit: &Arc, - packet_sender: PacketSender, - recycler: PacketsRecycler, + packet_sender: PacketBatchSender, + recycler: PacketBatchRecycler, name: &'static str, coalesce_ms: u64, use_pinned_memory: bool, @@ -121,36 +121,42 @@ pub fn receiver( fn recv_send( sock: &UdpSocket, - r: &PacketReceiver, + r: &PacketBatchReceiver, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { let timer = Duration::new(1, 0); - let msgs = r.recv_timeout(timer)?; - send_to(&msgs, sock, socket_addr_space)?; + let packet_batch = r.recv_timeout(timer)?; + send_to(&packet_batch, sock, socket_addr_space)?; Ok(()) } -pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, Duration)> { +pub fn recv_packet_batches( + recvr: &PacketBatchReceiver, +) -> Result<(Vec, usize, Duration)> { let timer = Duration::new(1, 0); - let msgs = recvr.recv_timeout(timer)?; + let packet_batch = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); - trace!("got msgs"); - let mut len = msgs.packets.len(); - let mut batch = vec![msgs]; - while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); - len += more.packets.len(); - batch.push(more); + trace!("got packets"); + let mut num_packets = packet_batch.packets.len(); + let mut packet_batches = vec![packet_batch]; + while let Ok(packet_batch) = recvr.try_recv() { + trace!("got more packets"); + num_packets += packet_batch.packets.len(); + packet_batches.push(packet_batch); } let recv_duration = recv_start.elapsed(); - trace!("batch len {}", batch.len()); - Ok((batch, len, recv_duration)) + trace!( + "packet batches len: {}, num packets: {}", + packet_batches.len(), + num_packets + ); + Ok((packet_batches, num_packets, recv_duration)) } pub fn responder( name: &'static str, sock: Arc, - r: PacketReceiver, + r: PacketBatchReceiver, socket_addr_space: SocketAddrSpace, ) -> JoinHandle<()> { Builder::new() @@ -187,7 +193,7 @@ mod test { use { super::*, crate::{ - packet::{Packet, Packets, PACKET_DATA_SIZE}, + packet::{Packet, PacketBatch, PACKET_DATA_SIZE}, streamer::{receiver, responder}, }, solana_perf::recycler::Recycler, @@ -204,16 +210,16 @@ mod test { }, }; - fn get_msgs(r: PacketReceiver, num: &mut usize) { + fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) { for _ in 0..10 { - let m = r.recv_timeout(Duration::new(1, 0)); - if m.is_err() { + let packet_batch_res = r.recv_timeout(Duration::new(1, 0)); + if packet_batch_res.is_err() { continue; } - *num -= m.unwrap().packets.len(); + *num_packets -= packet_batch_res.unwrap().packets.len(); - if *num == 0 { + if *num_packets == 0 { break; } } @@ -222,7 +228,7 @@ mod test { #[test] fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); - write!(io::sink(), "{:?}", Packets::default()).unwrap(); + write!(io::sink(), "{:?}", PacketBatch::default()).unwrap(); } #[test] fn streamer_send_test() { @@ -250,23 +256,23 @@ mod test { r_responder, SocketAddrSpace::Unspecified, ); - let mut msgs = Packets::default(); + let mut packet_batch = PacketBatch::default(); for i in 0..5 { - let mut b = Packet::default(); + let mut p = Packet::default(); { - b.data[0] = i as u8; - b.meta.size = PACKET_DATA_SIZE; - b.meta.set_addr(&addr); + p.data[0] = i as u8; + p.meta.size = PACKET_DATA_SIZE; + p.meta.set_addr(&addr); } - msgs.packets.push(b); + packet_batch.packets.push(p); } - s_responder.send(msgs).expect("send"); + s_responder.send(packet_batch).expect("send"); t_responder }; - let mut num = 5; - get_msgs(r_reader, &mut num); - assert_eq!(num, 0); + let mut packets_remaining = 5; + get_packet_batches(r_reader, &mut packets_remaining); + assert_eq!(packets_remaining, 0); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_responder.join().expect("join");