Add logging of the number of chunks handled by the quic server (#30954)

This commit is contained in:
ryleung-solana 2023-03-31 01:51:32 +08:00 committed by GitHub
parent eb47e44c6b
commit 721d8cb0ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 7 deletions

View File

@ -573,9 +573,13 @@ async fn packet_batch_sender(
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0; let mut total_bytes: usize = 0;
stats
.total_packet_batches_allocated
.fetch_add(1, Ordering::Relaxed);
stats stats
.total_packets_allocated .total_packets_allocated
.fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed); .fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed);
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
@ -622,12 +626,17 @@ async fn packet_batch_sender(
let i = packet_batch.len() - 1; let i = packet_batch.len() - 1;
*packet_batch[i].meta_mut() = packet_accumulator.meta; *packet_batch[i].meta_mut() = packet_accumulator.meta;
let num_chunks = packet_accumulator.chunks.len();
for chunk in packet_accumulator.chunks { for chunk in packet_accumulator.chunks {
packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk] packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk]
.copy_from_slice(&chunk.bytes); .copy_from_slice(&chunk.bytes);
} }
total_bytes += packet_batch[i].meta().size; total_bytes += packet_batch[i].meta().size;
stats
.total_chunks_processed_by_batcher
.fetch_add(num_chunks, Ordering::Relaxed);
} }
} }
} }
@ -815,11 +824,9 @@ async fn handle_chunk(
// done receiving chunks // done receiving chunks
trace!("chunk is none"); trace!("chunk is none");
if let Some(accum) = packet_accum.take() { if let Some(accum) = packet_accum.take() {
let len = accum let bytes_sent = accum.meta.size;
.chunks let chunks_sent = accum.chunks.len();
.iter()
.map(|packet_bytes| packet_bytes.bytes.len())
.sum::<usize>();
if let Err(err) = packet_sender.send(accum).await { if let Err(err) = packet_sender.send(accum).await {
stats stats
.total_handle_chunk_to_packet_batcher_send_err .total_handle_chunk_to_packet_batcher_send_err
@ -831,8 +838,12 @@ async fn handle_chunk(
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
stats stats
.total_bytes_sent_for_batching .total_bytes_sent_for_batching
.fetch_add(len, Ordering::Relaxed); .fetch_add(bytes_sent, Ordering::Relaxed);
trace!("sent {} byte packet for batching", len); stats
.total_chunks_sent_for_batching
.fetch_add(chunks_sent, Ordering::Relaxed);
trace!("sent {} byte packet for batching", bytes_sent);
} }
} else { } else {
stats stats

View File

@ -122,6 +122,7 @@ pub struct StreamStats {
pub(crate) total_invalid_chunks: AtomicUsize, pub(crate) total_invalid_chunks: AtomicUsize,
pub(crate) total_invalid_chunk_size: AtomicUsize, pub(crate) total_invalid_chunk_size: AtomicUsize,
pub(crate) total_packets_allocated: AtomicUsize, pub(crate) total_packets_allocated: AtomicUsize,
pub(crate) total_packet_batches_allocated: AtomicUsize,
pub(crate) total_chunks_received: AtomicUsize, pub(crate) total_chunks_received: AtomicUsize,
pub(crate) total_staked_chunks_received: AtomicUsize, pub(crate) total_staked_chunks_received: AtomicUsize,
pub(crate) total_unstaked_chunks_received: AtomicUsize, pub(crate) total_unstaked_chunks_received: AtomicUsize,
@ -131,8 +132,10 @@ pub struct StreamStats {
pub(crate) total_packet_batches_none: AtomicUsize, pub(crate) total_packet_batches_none: AtomicUsize,
pub(crate) total_packets_sent_for_batching: AtomicUsize, pub(crate) total_packets_sent_for_batching: AtomicUsize,
pub(crate) total_bytes_sent_for_batching: AtomicUsize, pub(crate) total_bytes_sent_for_batching: AtomicUsize,
pub(crate) total_chunks_sent_for_batching: AtomicUsize,
pub(crate) total_packets_sent_to_consumer: AtomicUsize, pub(crate) total_packets_sent_to_consumer: AtomicUsize,
pub(crate) total_bytes_sent_to_consumer: AtomicUsize, pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
pub(crate) total_stream_read_errors: AtomicUsize, pub(crate) total_stream_read_errors: AtomicUsize,
pub(crate) total_stream_read_timeouts: AtomicUsize, pub(crate) total_stream_read_timeouts: AtomicUsize,
pub(crate) num_evictions: AtomicUsize, pub(crate) num_evictions: AtomicUsize,
@ -254,6 +257,12 @@ impl StreamStats {
self.total_packets_allocated.swap(0, Ordering::Relaxed), self.total_packets_allocated.swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"packet_batches_allocated",
self.total_packet_batches_allocated
.swap(0, Ordering::Relaxed),
i64
),
( (
"packets_sent_for_batching", "packets_sent_for_batching",
self.total_packets_sent_for_batching self.total_packets_sent_for_batching
@ -266,6 +275,12 @@ impl StreamStats {
.swap(0, Ordering::Relaxed), .swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"chunks_sent_for_batching",
self.total_chunks_sent_for_batching
.swap(0, Ordering::Relaxed),
i64
),
( (
"packets_sent_to_consumer", "packets_sent_to_consumer",
self.total_packets_sent_to_consumer self.total_packets_sent_to_consumer
@ -277,6 +292,12 @@ impl StreamStats {
self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed), self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"chunks_processed_by_batcher",
self.total_chunks_processed_by_batcher
.swap(0, Ordering::Relaxed),
i64
),
( (
"chunks_received", "chunks_received",
self.total_chunks_received.swap(0, Ordering::Relaxed), self.total_chunks_received.swap(0, Ordering::Relaxed),