Don't send packets when msgs len is 0 (#4030)

And don't send metrics every iteration.
This commit is contained in:
sakridge 2019-04-26 17:27:31 -07:00 committed by GitHub
parent 8b34fd2c75
commit a056c1f18f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 14 deletions

View File

@ -147,16 +147,18 @@ impl BankingStage {
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![]; let mut unprocessed_packets = vec![];
let mut bank_shutdown = false; let mut bank_shutdown = false;
let mut rebuffered_packets = 0;
let mut new_tx_count = 0;
for (msgs, offset, vers) in buffered_packets { for (msgs, offset, vers) in buffered_packets {
if bank_shutdown { if bank_shutdown {
inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset); rebuffered_packets += vers.len() - *offset;
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
continue; continue;
} }
let bank = poh_recorder.lock().unwrap().bank(); let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() { if bank.is_none() {
inc_new_counter_info!("banking_stage-rebuffered_packets", vers.len() - *offset); rebuffered_packets += vers.len() - *offset;
unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned()));
continue; continue;
} }
@ -164,13 +166,11 @@ impl BankingStage {
let (processed, verified_txs, verified_indexes) = let (processed, verified_txs, verified_indexes) =
Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?;
inc_new_counter_info!("banking_stage-consumed_buffered_packets", processed);
inc_new_counter_info!("banking_stage-process_transactions", processed); new_tx_count += processed;
inc_new_counter_info!(
"banking_stage-rebuffered_packets",
verified_txs.len() - processed
);
if processed < verified_txs.len() { if processed < verified_txs.len() {
rebuffered_packets += verified_txs.len() - processed;
bank_shutdown = true; bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding // Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push(( unprocessed_packets.push((
@ -180,6 +180,11 @@ impl BankingStage {
)); ));
} }
} }
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
inc_new_counter_info!("banking_stage-process_transactions", new_tx_count);
Ok(unprocessed_packets) Ok(unprocessed_packets)
} }
@ -314,6 +319,9 @@ impl BankingStage {
{ {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(unprocessed_packets) => { Ok(unprocessed_packets) => {
if unprocessed_packets.is_empty() {
continue;
}
if Self::should_buffer_packets( if Self::should_buffer_packets(
poh_recorder, poh_recorder,
cluster_info, cluster_info,

View File

@ -58,6 +58,7 @@ impl ClusterInfoVoteListener {
last_ts = new_ts; last_ts = new_ts;
inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len()); inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
let msgs = packet::to_packets(&votes); let msgs = packet::to_packets(&votes);
if !msgs.is_empty() {
let r = if sigverify_disabled { let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&msgs) sigverify::ed25519_verify_disabled(&msgs)
} else { } else {
@ -65,6 +66,7 @@ impl ClusterInfoVoteListener {
}; };
sender.send(msgs.into_iter().zip(r).collect())?; sender.send(msgs.into_iter().zip(r).collect())?;
} }
}
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
} }
} }