From 92189d82b4a9854acbc4774b8dc7ba172a1fc771 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Tue, 28 Mar 2023 15:33:40 +0800 Subject: [PATCH] Quic server log data rate (#30892) * Add more statistics to better track incoming data rate to the Quic server --- streamer/src/nonblocking/quic.rs | 22 ++++++++++++++++------ streamer/src/quic.rs | 13 +++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index bbb055274d..2ab5b9daa9 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -269,9 +269,8 @@ enum ConnectionHandlerError { } struct NewConnectionHandlerParams { - // In principle, the code should work as-is if we replaced this with a crossbeam channel - // as the server code never does a blocking send (as the channel's unbounded) - // or a blocking recv (as we always use try_recv) + // In principle, the code can be made to work with a crossbeam channel + // as long as we're careful never to use a blocking recv or send call // but I've found that it's simply too easy to accidentally block // in async code when using the crossbeam channel, so for the sake of maintainability, // we're sticking with an async channel @@ -577,6 +576,7 @@ async fn packet_batch_sender( let mut batch_start_time = Instant::now(); loop { let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); + let mut total_bytes: usize = 0; stats .total_packets_allocated @@ -604,6 +604,10 @@ async fn packet_batch_sender( .total_packets_sent_to_consumer .fetch_add(len, Ordering::Relaxed); + stats + .total_bytes_sent_to_consumer + .fetch_add(total_bytes, Ordering::Relaxed); + trace!("Sent {} packet batch", len); } break; @@ -612,6 +616,11 @@ async fn packet_batch_sender( let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await; if let Ok(Ok(packet_accumulator)) = timeout_res { + // Start the timeout from when the packet batch first becomes non-empty + if packet_batch.is_empty() { + batch_start_time = Instant::now(); + } + unsafe { packet_batch.set_len(packet_batch.len() + 1); } @@ -623,9 +632,7 @@ async fn packet_batch_sender( .copy_from_slice(&chunk.bytes); } - if packet_batch.len() == 1 { - batch_start_time = Instant::now(); - } + total_bytes += packet_batch[i].meta().size; } } } @@ -827,6 +834,9 @@ async fn handle_chunk( stats .total_packets_sent_for_batching .fetch_add(1, Ordering::Relaxed); + stats + .total_bytes_sent_for_batching + .fetch_add(len, Ordering::Relaxed); trace!("sent {} byte packet for batching", len); } } else { diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 771cc4a1ac..cbb3838c27 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -130,7 +130,9 @@ pub struct StreamStats { pub(crate) total_packet_batches_sent: AtomicUsize, pub(crate) total_packet_batches_none: AtomicUsize, pub(crate) total_packets_sent_for_batching: AtomicUsize, + pub(crate) total_bytes_sent_for_batching: AtomicUsize, pub(crate) total_packets_sent_to_consumer: AtomicUsize, + pub(crate) total_bytes_sent_to_consumer: AtomicUsize, pub(crate) total_stream_read_errors: AtomicUsize, pub(crate) total_stream_read_timeouts: AtomicUsize, pub(crate) num_evictions: AtomicUsize, @@ -258,12 +260,23 @@ impl StreamStats { .swap(0, Ordering::Relaxed), i64 ), + ( + "bytes_sent_for_batching", + self.total_bytes_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), ( "packets_sent_to_consumer", self.total_packets_sent_to_consumer .swap(0, Ordering::Relaxed), i64 ), + ( + "bytes_sent_to_consumer", + self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed), + i64 + ), ( "chunks_received", self.total_chunks_received.swap(0, Ordering::Relaxed),