From 3d6ab965871215829a5f9682e00f0b37a631452a Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Thu, 16 Dec 2021 11:44:10 -0600 Subject: [PATCH] push live packets straight to buffer, leader only process packets from buffer --- core/src/banking_stage.rs | 148 +++++++------------------------------- 1 file changed, 25 insertions(+), 123 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4834dd8035..b9731fd06d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -85,8 +85,7 @@ const MIN_THREADS_BANKING: u32 = 1; pub struct BankingStageStats { last_report: AtomicInterval, id: u32, - process_packets_count: AtomicUsize, - new_tx_count: AtomicUsize, + receive_and_buffer_packets_count: AtomicUsize, dropped_packet_batches_count: AtomicUsize, dropped_packets_count: AtomicUsize, pub(crate) dropped_duplicated_packets_count: AtomicUsize, @@ -98,7 +97,7 @@ pub struct BankingStageStats { // Timing consume_buffered_packets_elapsed: AtomicU64, - process_packets_elapsed: AtomicU64, + receive_and_buffer_packets_elapsed: AtomicU64, handle_retryable_packets_elapsed: AtomicU64, filter_pending_packets_elapsed: AtomicU64, pub(crate) packet_duplicate_check_elapsed: AtomicU64, @@ -116,8 +115,9 @@ impl BankingStageStats { } fn is_empty(&self) -> bool { - 0 == self.process_packets_count.load(Ordering::Relaxed) as u64 - + self.new_tx_count.load(Ordering::Relaxed) as u64 + 0 == self + .receive_and_buffer_packets_count + .load(Ordering::Relaxed) as u64 + self.dropped_packet_batches_count.load(Ordering::Relaxed) as u64 + self.dropped_packets_count.load(Ordering::Relaxed) as u64 + self @@ -133,7 +133,9 @@ impl BankingStageStats { + self .consume_buffered_packets_elapsed .load(Ordering::Relaxed) - + self.process_packets_elapsed.load(Ordering::Relaxed) + + self + .receive_and_buffer_packets_elapsed + .load(Ordering::Relaxed) + self .handle_retryable_packets_elapsed .load(Ordering::Relaxed) @@ -156,13 +158,9 @@ impl BankingStageStats { "banking_stage-loop-stats", ("id", self.id as i64, i64), ( - "process_packets_count", - self.process_packets_count.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "new_tx_count", - self.new_tx_count.swap(0, Ordering::Relaxed) as i64, + "receive_and_buffer_packets_count", + self.receive_and_buffer_packets_count + .swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -216,8 +214,9 @@ impl BankingStageStats { i64 ), ( - "process_packets_elapsed", - self.process_packets_elapsed.swap(0, Ordering::Relaxed) as i64, + "receive_and_buffer_packets_elapsed", + self.receive_and_buffer_packets_elapsed + .swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -753,21 +752,15 @@ impl BankingStage { Duration::from_millis(100) }; - match Self::process_packets( - &my_pubkey, + match Self::receive_and_buffer_packets( verified_receiver, - poh_recorder, recv_start, recv_timeout, id, batch_limit, - transaction_status_sender.clone(), - &gossip_vote_sender, &mut buffered_packet_batches, &banking_stage_stats, packet_deduper, - &recorder, - &qos_service, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -1315,24 +1308,18 @@ impl BankingStage { } #[allow(clippy::too_many_arguments)] - /// Process the incoming packets - fn process_packets( - my_pubkey: &Pubkey, + /// Receive incoming packets, push into unprocessed buffer with packet indexes + fn receive_and_buffer_packets( verified_receiver: &CrossbeamReceiver>, - poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, id: u32, batch_limit: usize, - transaction_status_sender: Option, - gossip_vote_sender: &ReplayVoteSender, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &BankingStageStats, packet_deduper: &PacketDeduper, - recorder: &TransactionRecorder, - qos_service: &QosService, ) -> Result<(), RecvTimeoutError> { - let mut recv_time = Measure::start("process_packets_recv"); + let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; recv_time.stop(); @@ -1345,64 +1332,18 @@ impl BankingStage { packet_count, id, ); - let mut proc_start = Measure::start("process_packets_transactions_process"); - let mut new_tx_count = 0; + let mut proc_start = Measure::start("receive_and_buffer_packets_transactions_process"); - let mut packet_batch_iter = packet_batches.into_iter(); + let packet_batch_iter = packet_batches.into_iter(); let mut dropped_packets_count = 0; let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; - while let Some(packet_batch) = packet_batch_iter.next() { + for packet_batch in packet_batch_iter { let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); - let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank(); - let working_bank_start = poh_recorder_bank.working_bank_start(); - if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() { - qos_service - .accumulate_tpu_buffered_packets_count(packet_batch.packets.len() as u64); - Self::push_unprocessed( - buffered_packet_batches, - packet_batch, - packet_indexes, - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - packet_deduper, - banking_stage_stats, - ); - continue; - } - - // Destructure the `BankStart` behind an Arc - let BankStart { - working_bank, - bank_creation_time, - } = &*working_bank_start.unwrap(); - - qos_service.accumulate_tpu_ingested_packets_count(packet_batch.packets.len() as u64); - let (processed, verified_txs_len, unprocessed_indexes) = - Self::process_packets_transactions( - working_bank, - bank_creation_time, - recorder, - &packet_batch, - packet_indexes, - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - ); - - new_tx_count += processed; - qos_service.accumulated_verified_txs_count(verified_txs_len as u64); - qos_service.accumulated_processed_txs_count(processed as u64); - qos_service.accumulated_retryable_txs_count(unprocessed_indexes.len() as u64); - - // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( buffered_packet_batches, packet_batch, - unprocessed_indexes, + packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -1410,62 +1351,23 @@ impl BankingStage { packet_deduper, banking_stage_stats, ); - - // If there were retryable transactions, add the unexpired ones to the buffered queue - if processed < verified_txs_len { - let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets"); - let next_leader = poh.lock().unwrap().next_slot_leader(); - // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones - #[allow(clippy::while_let_on_iterator)] - while let Some(packet_batch) = packet_batch_iter.next() { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); - let unprocessed_indexes = Self::filter_unprocessed_packets( - working_bank, - &packet_batch, - &packet_indexes, - my_pubkey, - next_leader, - banking_stage_stats, - ); - Self::push_unprocessed( - buffered_packet_batches, - packet_batch, - unprocessed_indexes, - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - packet_deduper, - banking_stage_stats, - ); - } - handle_retryable_packets_time.stop(); - banking_stage_stats - .handle_retryable_packets_elapsed - .fetch_add(handle_retryable_packets_time.as_us(), Ordering::Relaxed); - } } proc_start.stop(); debug!( - "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", + "@{:?} done processing transaction batches: {} time: {:?}ms total count: {} id: {}", timestamp(), packet_batches_len, proc_start.as_ms(), - new_tx_count, - (new_tx_count as f32) / (proc_start.as_s()), packet_count, id, ); banking_stage_stats - .process_packets_elapsed + .receive_and_buffer_packets_elapsed .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats - .process_packets_count + .receive_and_buffer_packets_count .fetch_add(packet_count, Ordering::Relaxed); - banking_stage_stats - .new_tx_count - .fetch_add(new_tx_count, Ordering::Relaxed); banking_stage_stats .dropped_packet_batches_count .fetch_add(dropped_packet_batches_count, Ordering::Relaxed);