diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 680a493a2d..8528f90fd9 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -147,16 +147,18 @@ impl BankingStage { ) -> Result { let mut unprocessed_packets = vec![]; let mut bank_shutdown = false; + let mut rebuffered_packets = 0; + let mut new_tx_count = 0; for (msgs, offset, vers) in buffered_packets { if bank_shutdown { - inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset); + rebuffered_packets += vers.len() - *offset; unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); continue; } let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { - inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset); + rebuffered_packets += vers.len() - *offset; unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); continue; } @@ -164,13 +166,11 @@ impl BankingStage { let (processed, verified_txs, verified_indexes) = Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; - inc_new_counter_info!("banking_stage-consumed_buffered_packets", processed); - inc_new_counter_info!("banking_stage-process_transactions", processed); - inc_new_counter_info!( - "banking_stage-rebuffered_packets", - verified_txs.len() - processed - ); + + new_tx_count += processed; + if processed < verified_txs.len() { + rebuffered_packets += verified_txs.len() - processed; bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding unprocessed_packets.push(( @@ -180,6 +180,11 @@ impl BankingStage { )); } } + + inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); + inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); + inc_new_counter_info!("banking_stage-process_transactions", new_tx_count); + Ok(unprocessed_packets) } @@ -314,6 +319,9 @@ impl BankingStage { { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(unprocessed_packets) => { + if unprocessed_packets.is_empty() { + continue; + } if Self::should_buffer_packets( poh_recorder, cluster_info, diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 0f71af0d9a..40527414df 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -58,12 +58,14 @@ impl ClusterInfoVoteListener { last_ts = new_ts; inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len()); let msgs = packet::to_packets(&votes); - let r = if sigverify_disabled { - sigverify::ed25519_verify_disabled(&msgs) - } else { - sigverify::ed25519_verify_cpu(&msgs) - }; - sender.send(msgs.into_iter().zip(r).collect())?; + if !msgs.is_empty() { + let r = if sigverify_disabled { + sigverify::ed25519_verify_disabled(&msgs) + } else { + sigverify::ed25519_verify_cpu(&msgs) + }; + sender.send(msgs.into_iter().zip(r).collect())?; + } } sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); }