Add more datapoints in QUIC streamer (#24611)
This commit is contained in:
parent
46da7714bb
commit
564bd4b34b
|
@ -152,6 +152,7 @@ fn handle_chunk(
|
||||||
maybe_batch: &mut Option<PacketBatch>,
|
maybe_batch: &mut Option<PacketBatch>,
|
||||||
remote_addr: &SocketAddr,
|
remote_addr: &SocketAddr,
|
||||||
packet_sender: &Sender<PacketBatch>,
|
packet_sender: &Sender<PacketBatch>,
|
||||||
|
stats: Arc<StreamStats>,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
match chunk {
|
match chunk {
|
||||||
Ok(maybe_chunk) => {
|
Ok(maybe_chunk) => {
|
||||||
|
@ -161,9 +162,13 @@ fn handle_chunk(
|
||||||
|
|
||||||
// shouldn't happen, but sanity check the size and offsets
|
// shouldn't happen, but sanity check the size and offsets
|
||||||
if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 {
|
if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 {
|
||||||
|
stats.total_invalid_chunks.fetch_add(1, Ordering::Relaxed);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 {
|
if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 {
|
||||||
|
stats
|
||||||
|
.total_invalid_chunk_size
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,12 +179,16 @@ fn handle_chunk(
|
||||||
packet.meta.set_addr(remote_addr);
|
packet.meta.set_addr(remote_addr);
|
||||||
batch.packets.push(packet);
|
batch.packets.push(packet);
|
||||||
*maybe_batch = Some(batch);
|
*maybe_batch = Some(batch);
|
||||||
|
stats
|
||||||
|
.total_packets_allocated
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(batch) = maybe_batch.as_mut() {
|
if let Some(batch) = maybe_batch.as_mut() {
|
||||||
let end = chunk.offset as usize + chunk.bytes.len();
|
let end = chunk.offset as usize + chunk.bytes.len();
|
||||||
batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes);
|
batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes);
|
||||||
batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end);
|
batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end);
|
||||||
|
stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
trace!("chunk is none");
|
trace!("chunk is none");
|
||||||
|
@ -187,16 +196,29 @@ fn handle_chunk(
|
||||||
if let Some(batch) = maybe_batch.take() {
|
if let Some(batch) = maybe_batch.take() {
|
||||||
let len = batch.packets[0].meta.size;
|
let len = batch.packets[0].meta.size;
|
||||||
if let Err(e) = packet_sender.send(batch) {
|
if let Err(e) = packet_sender.send(batch) {
|
||||||
|
stats
|
||||||
|
.total_packet_batch_send_err
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
info!("send error: {}", e);
|
info!("send error: {}", e);
|
||||||
} else {
|
} else {
|
||||||
|
stats
|
||||||
|
.total_packet_batches_sent
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
trace!("sent {} byte packet", len);
|
trace!("sent {} byte packet", len);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
stats
|
||||||
|
.total_packet_batches_none
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Received stream error: {:?}", e);
|
debug!("Received stream error: {:?}", e);
|
||||||
|
stats
|
||||||
|
.total_stream_read_errors
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -306,6 +328,14 @@ struct StreamStats {
|
||||||
total_new_connections: AtomicUsize,
|
total_new_connections: AtomicUsize,
|
||||||
total_streams: AtomicUsize,
|
total_streams: AtomicUsize,
|
||||||
total_new_streams: AtomicUsize,
|
total_new_streams: AtomicUsize,
|
||||||
|
total_invalid_chunks: AtomicUsize,
|
||||||
|
total_invalid_chunk_size: AtomicUsize,
|
||||||
|
total_packets_allocated: AtomicUsize,
|
||||||
|
total_chunks_received: AtomicUsize,
|
||||||
|
total_packet_batch_send_err: AtomicUsize,
|
||||||
|
total_packet_batches_sent: AtomicUsize,
|
||||||
|
total_packet_batches_none: AtomicUsize,
|
||||||
|
total_stream_read_errors: AtomicUsize,
|
||||||
num_evictions: AtomicUsize,
|
num_evictions: AtomicUsize,
|
||||||
connection_add_failed: AtomicUsize,
|
connection_add_failed: AtomicUsize,
|
||||||
connection_setup_timeout: AtomicUsize,
|
connection_setup_timeout: AtomicUsize,
|
||||||
|
@ -350,6 +380,46 @@ impl StreamStats {
|
||||||
self.connection_setup_timeout.load(Ordering::Relaxed),
|
self.connection_setup_timeout.load(Ordering::Relaxed),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"invalid_chunk",
|
||||||
|
self.total_invalid_chunks.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"invalid_chunk_size",
|
||||||
|
self.total_invalid_chunk_size.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"packets_allocated",
|
||||||
|
self.total_packets_allocated.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"chunks_received",
|
||||||
|
self.total_chunks_received.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"packet_batch_send_error",
|
||||||
|
self.total_packet_batch_send_err.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"packet_batches_sent",
|
||||||
|
self.total_packet_batches_sent.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"packet_batch_empty",
|
||||||
|
self.total_packet_batches_none.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"stream_read_errors",
|
||||||
|
self.total_stream_read_errors.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -383,6 +453,7 @@ fn handle_connection(
|
||||||
&mut maybe_batch,
|
&mut maybe_batch,
|
||||||
&remote_addr,
|
&remote_addr,
|
||||||
&packet_sender,
|
&packet_sender,
|
||||||
|
stats.clone(),
|
||||||
) {
|
) {
|
||||||
last_update.store(timing::timestamp(), Ordering::Relaxed);
|
last_update.store(timing::timestamp(), Ordering::Relaxed);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue