Quic server log data rate (#30892)

* Add more statistics to better track incoming data rate to the Quic server
Author: ryleung-solana
Date:   2023-03-28 15:33:40 +08:00 (committed by GitHub)
Commit: 92189d82b4 (parent 7fbd9e526a)
2 changed files with 29 additions and 6 deletions


@@ -269,9 +269,8 @@ enum ConnectionHandlerError {
 }
 struct NewConnectionHandlerParams {
-    // In principle, the code should work as-is if we replaced this with a crossbeam channel
-    // as the server code never does a blocking send (as the channel's unbounded)
-    // or a blocking recv (as we always use try_recv)
+    // In principle, the code can be made to work with a crossbeam channel
+    // as long as we're careful never to use a blocking recv or send call
     // but I've found that it's simply too easy to accidentally block
     // in async code when using the crossbeam channel, so for the sake of maintainability,
     // we're sticking with an async channel
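The tradeoff this comment describes is easy to demonstrate. Below is a minimal sketch, not from this PR, assuming tokio's unbounded mpsc channel stands in for the server's async channel: `send` on an unbounded channel is synchronous and never blocks, and `recv().await` suspends only the calling task, whereas a blocking crossbeam `recv()` inside async code would tie up a whole executor thread.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Unbounded async channel: send() never blocks or awaits,
    // so it is safe to call anywhere, including inside async tasks.
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
    tx.send(42).expect("receiver alive");

    // Non-blocking poll, analogous to crossbeam's try_recv().
    assert_eq!(rx.try_recv().ok(), Some(42));

    tx.send(43).expect("receiver alive");
    // recv().await parks only this task; a blocking crossbeam recv()
    // here would stall the executor thread and could deadlock the runtime.
    assert_eq!(rx.recv().await, Some(43));
}
```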
@@ -577,6 +576,7 @@ async fn packet_batch_sender(
     let mut batch_start_time = Instant::now();
     loop {
         let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
+        let mut total_bytes: usize = 0;
         stats
             .total_packets_allocated
@@ -604,6 +604,10 @@ async fn packet_batch_sender(
                     .total_packets_sent_to_consumer
                     .fetch_add(len, Ordering::Relaxed);
+                stats
+                    .total_bytes_sent_to_consumer
+                    .fetch_add(total_bytes, Ordering::Relaxed);
                 trace!("Sent {} packet batch", len);
             }
             break;
@@ -612,6 +616,11 @@ async fn packet_batch_sender(
         let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await;
         if let Ok(Ok(packet_accumulator)) = timeout_res {
+            // Start the timeout from when the packet batch first becomes non-empty
+            if packet_batch.is_empty() {
+                batch_start_time = Instant::now();
+            }
             unsafe {
                 packet_batch.set_len(packet_batch.len() + 1);
             }
@@ -623,9 +632,7 @@ async fn packet_batch_sender(
                     .copy_from_slice(&chunk.bytes);
             }
-            if packet_batch.len() == 1 {
-                batch_start_time = Instant::now();
-            }
+            total_bytes += packet_batch[i].meta().size;
         }
     }
 }
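Taken together, these hunks move the batch-timer reset to the moment the batch first becomes non-empty and accumulate `total_bytes` as each packet is appended. Below is a simplified, self-contained sketch of that coalescing loop, with plain `Vec`s and `println!` standing in for `PacketBatch` and the stats handle; it is an illustration of the pattern, not the PR's actual code.

```rust
use std::time::{Duration, Instant};
use tokio::{sync::mpsc, time::timeout};

const PACKETS_PER_BATCH: usize = 64;
const COALESCE_WAIT: Duration = Duration::from_millis(1);

// Simplified stand-in for packet_batch_sender: coalesce incoming packets
// into a batch, counting bytes as each packet is appended.
async fn batch_sender(mut rx: mpsc::UnboundedReceiver<Vec<u8>>) {
    let mut batch: Vec<Vec<u8>> = Vec::with_capacity(PACKETS_PER_BATCH);
    let mut batch_start_time = Instant::now();
    let mut total_bytes: usize = 0;

    loop {
        // Flush when the batch is full or has waited long enough.
        if batch.len() >= PACKETS_PER_BATCH
            || (!batch.is_empty() && batch_start_time.elapsed() > COALESCE_WAIT)
        {
            println!("sent {} packets, {} bytes", batch.len(), total_bytes);
            batch.clear();
            total_bytes = 0;
        }
        match timeout(Duration::from_micros(250), rx.recv()).await {
            Ok(Some(packet)) => {
                // Start the timeout from when the batch first becomes non-empty.
                if batch.is_empty() {
                    batch_start_time = Instant::now();
                }
                total_bytes += packet.len();
                batch.push(packet);
            }
            Ok(None) => break, // channel closed; sender side shut down
            Err(_) => {}       // recv timed out; re-check the flush condition
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        for i in 0..10u8 {
            tx.send(vec![i; 100]).unwrap();
        }
    });
    batch_sender(rx).await;
}
```

Resetting the timer on the first insert (rather than checking `len() == 1` after the fact, as the removed lines did) keeps the coalescing window anchored to when data actually started accumulating.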
@@ -827,6 +834,9 @@ async fn handle_chunk(
             stats
                 .total_packets_sent_for_batching
                 .fetch_add(1, Ordering::Relaxed);
+            stats
+                .total_bytes_sent_for_batching
+                .fetch_add(len, Ordering::Relaxed);
             trace!("sent {} byte packet for batching", len);
         }
     } else {


@@ -130,7 +130,9 @@ pub struct StreamStats {
     pub(crate) total_packet_batches_sent: AtomicUsize,
     pub(crate) total_packet_batches_none: AtomicUsize,
     pub(crate) total_packets_sent_for_batching: AtomicUsize,
+    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
     pub(crate) total_packets_sent_to_consumer: AtomicUsize,
+    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
     pub(crate) total_stream_read_errors: AtomicUsize,
     pub(crate) total_stream_read_timeouts: AtomicUsize,
     pub(crate) num_evictions: AtomicUsize,
@@ -258,12 +260,23 @@ impl StreamStats {
                     .swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "bytes_sent_for_batching",
+                self.total_bytes_sent_for_batching
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "packets_sent_to_consumer",
                 self.total_packets_sent_to_consumer
                     .swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "bytes_sent_to_consumer",
+                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "chunks_received",
                 self.total_chunks_received.swap(0, Ordering::Relaxed),
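Both new counters follow the existing pattern: hot paths bump an `AtomicUsize` with a relaxed `fetch_add`, and the reporter drains each counter with `swap(0, ...)` so every reporting interval starts from zero. Below is a minimal sketch of that read-and-reset pattern, with `println!` standing in for the metrics macro the real code reports through.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
struct StreamStats {
    total_bytes_sent_for_batching: AtomicUsize,
    total_bytes_sent_to_consumer: AtomicUsize,
}

impl StreamStats {
    // Hot path: a cheap relaxed increment; no ordering with other memory
    // is needed because each counter is independent.
    fn record_batched(&self, len: usize) {
        self.total_bytes_sent_for_batching
            .fetch_add(len, Ordering::Relaxed);
    }

    // Reporting path: swap(0) reads and resets the counter in one atomic
    // step, so each report covers exactly one interval and no concurrent
    // increment is lost between the read and the reset.
    fn report(&self) {
        let batched = self.total_bytes_sent_for_batching.swap(0, Ordering::Relaxed);
        let consumed = self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed);
        println!("bytes_sent_for_batching={batched} bytes_sent_to_consumer={consumed}");
    }
}

fn main() {
    let stats = StreamStats::default();
    stats.record_batched(1200);
    stats
        .total_bytes_sent_to_consumer
        .fetch_add(1200, Ordering::Relaxed);
    stats.report(); // interval 1: both counters show 1200
    stats.report(); // interval 2: both counters have been reset to 0
}
```

Since the reporter emits one value per fixed interval, dividing the reported byte counts by the interval length yields the incoming data rate the commit message refers to.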