From 721d8cb0ac2de30f3d9544c43baf023762aef067 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Fri, 31 Mar 2023 01:51:32 +0800 Subject: [PATCH] Add logging of the number of chunks handled by the quic server (#30954) --- streamer/src/nonblocking/quic.rs | 25 ++++++++++++++++++------- streamer/src/quic.rs | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f2fbc2ab0a..6308108c31 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -573,9 +573,13 @@ async fn packet_batch_sender( let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; + stats + .total_packet_batches_allocated + .fetch_add(1, Ordering::Relaxed); stats .total_packets_allocated .fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed); + loop { if exit.load(Ordering::Relaxed) { return; @@ -622,12 +626,17 @@ async fn packet_batch_sender( let i = packet_batch.len() - 1; *packet_batch[i].meta_mut() = packet_accumulator.meta; + let num_chunks = packet_accumulator.chunks.len(); for chunk in packet_accumulator.chunks { packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk] .copy_from_slice(&chunk.bytes); } total_bytes += packet_batch[i].meta().size; + + stats + .total_chunks_processed_by_batcher + .fetch_add(num_chunks, Ordering::Relaxed); } } } @@ -815,11 +824,9 @@ async fn handle_chunk( // done receiving chunks trace!("chunk is none"); if let Some(accum) = packet_accum.take() { - let len = accum - .chunks - .iter() - .map(|packet_bytes| packet_bytes.bytes.len()) - .sum::(); + let bytes_sent = accum.meta.size; + let chunks_sent = accum.chunks.len(); + if let Err(err) = packet_sender.send(accum).await { stats .total_handle_chunk_to_packet_batcher_send_err @@ -831,8 +838,12 @@ async fn handle_chunk( .fetch_add(1, Ordering::Relaxed); stats .total_bytes_sent_for_batching - .fetch_add(len, Ordering::Relaxed); - trace!("sent {} byte packet for batching", len); + .fetch_add(bytes_sent, Ordering::Relaxed); + stats + .total_chunks_sent_for_batching + .fetch_add(chunks_sent, Ordering::Relaxed); + + trace!("sent {} byte packet for batching", bytes_sent); } } else { stats diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index cbb3838c27..72f85a4e8f 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -122,6 +122,7 @@ pub struct StreamStats { pub(crate) total_invalid_chunks: AtomicUsize, pub(crate) total_invalid_chunk_size: AtomicUsize, pub(crate) total_packets_allocated: AtomicUsize, + pub(crate) total_packet_batches_allocated: AtomicUsize, pub(crate) total_chunks_received: AtomicUsize, pub(crate) total_staked_chunks_received: AtomicUsize, pub(crate) total_unstaked_chunks_received: AtomicUsize, @@ -131,8 +132,10 @@ pub struct StreamStats { pub(crate) total_packet_batches_none: AtomicUsize, pub(crate) total_packets_sent_for_batching: AtomicUsize, pub(crate) total_bytes_sent_for_batching: AtomicUsize, + pub(crate) total_chunks_sent_for_batching: AtomicUsize, pub(crate) total_packets_sent_to_consumer: AtomicUsize, pub(crate) total_bytes_sent_to_consumer: AtomicUsize, + pub(crate) total_chunks_processed_by_batcher: AtomicUsize, pub(crate) total_stream_read_errors: AtomicUsize, pub(crate) total_stream_read_timeouts: AtomicUsize, pub(crate) num_evictions: AtomicUsize, @@ -254,6 +257,12 @@ impl StreamStats { self.total_packets_allocated.swap(0, Ordering::Relaxed), i64 ), + ( + "packet_batches_allocated", + self.total_packet_batches_allocated + .swap(0, Ordering::Relaxed), + i64 + ), ( "packets_sent_for_batching", self.total_packets_sent_for_batching @@ -266,6 +275,12 @@ impl StreamStats { .swap(0, Ordering::Relaxed), i64 ), + ( + "chunks_sent_for_batching", + self.total_chunks_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), ( "packets_sent_to_consumer", self.total_packets_sent_to_consumer @@ -277,6 +292,12 @@ impl StreamStats { self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed), i64 ), + ( + "chunks_processed_by_batcher", + self.total_chunks_processed_by_batcher + .swap(0, Ordering::Relaxed), + i64 + ), ( "chunks_received", self.total_chunks_received.swap(0, Ordering::Relaxed),