diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 5b35db05f9..cb1b678ac7 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -152,6 +152,7 @@ fn handle_chunk( maybe_batch: &mut Option, remote_addr: &SocketAddr, packet_sender: &Sender, + stats: Arc, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -161,9 +162,13 @@ fn handle_chunk( // shouldn't happen, but sanity check the size and offsets if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 { + stats.total_invalid_chunks.fetch_add(1, Ordering::Relaxed); return true; } if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 { + stats + .total_invalid_chunk_size + .fetch_add(1, Ordering::Relaxed); return true; } @@ -174,12 +179,16 @@ fn handle_chunk( packet.meta.set_addr(remote_addr); batch.packets.push(packet); *maybe_batch = Some(batch); + stats + .total_packets_allocated + .fetch_add(1, Ordering::Relaxed); } if let Some(batch) = maybe_batch.as_mut() { let end = chunk.offset as usize + chunk.bytes.len(); batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end); + stats.total_chunks_received.fetch_add(1, Ordering::Relaxed); } } else { trace!("chunk is none"); @@ -187,16 +196,29 @@ fn handle_chunk( if let Some(batch) = maybe_batch.take() { let len = batch.packets[0].meta.size; if let Err(e) = packet_sender.send(batch) { + stats + .total_packet_batch_send_err + .fetch_add(1, Ordering::Relaxed); info!("send error: {}", e); } else { + stats + .total_packet_batches_sent + .fetch_add(1, Ordering::Relaxed); trace!("sent {} byte packet", len); } + } else { + stats + .total_packet_batches_none + .fetch_add(1, Ordering::Relaxed); } return true; } } Err(e) => { debug!("Received stream error: {:?}", e); + stats + .total_stream_read_errors + .fetch_add(1, Ordering::Relaxed); return true; } } @@ -306,6 +328,14 @@ struct StreamStats { total_new_connections: AtomicUsize, total_streams: AtomicUsize, total_new_streams: AtomicUsize, + total_invalid_chunks: AtomicUsize, + total_invalid_chunk_size: AtomicUsize, + total_packets_allocated: AtomicUsize, + total_chunks_received: AtomicUsize, + total_packet_batch_send_err: AtomicUsize, + total_packet_batches_sent: AtomicUsize, + total_packet_batches_none: AtomicUsize, + total_stream_read_errors: AtomicUsize, num_evictions: AtomicUsize, connection_add_failed: AtomicUsize, connection_setup_timeout: AtomicUsize, @@ -350,6 +380,46 @@ impl StreamStats { self.connection_setup_timeout.load(Ordering::Relaxed), i64 ), + ( + "invalid_chunk", + self.total_invalid_chunks.load(Ordering::Relaxed), + i64 + ), + ( + "invalid_chunk_size", + self.total_invalid_chunk_size.load(Ordering::Relaxed), + i64 + ), + ( + "packets_allocated", + self.total_packets_allocated.load(Ordering::Relaxed), + i64 + ), + ( + "chunks_received", + self.total_chunks_received.load(Ordering::Relaxed), + i64 + ), + ( + "packet_batch_send_error", + self.total_packet_batch_send_err.load(Ordering::Relaxed), + i64 + ), + ( + "packet_batches_sent", + self.total_packet_batches_sent.load(Ordering::Relaxed), + i64 + ), + ( + "packet_batch_empty", + self.total_packet_batches_none.load(Ordering::Relaxed), + i64 + ), + ( + "stream_read_errors", + self.total_stream_read_errors.load(Ordering::Relaxed), + i64 + ), ); } } @@ -383,6 +453,7 @@ fn handle_connection( &mut maybe_batch, &remote_addr, &packet_sender, + stats.clone(), ) { last_update.store(timing::timestamp(), Ordering::Relaxed); break;