From 63c00e7f5e20f8393044c864362f0f88105adb67 Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Wed, 5 Oct 2022 22:20:26 +0200 Subject: [PATCH] move readable inside the loop of packets (#27916) * Add recv_mmsg_exact function * update tests * address PR comments --- streamer/src/nonblocking/recvmmsg.rs | 30 ++++++++++++++++++++++------ streamer/src/nonblocking/sendmmsg.rs | 12 +++++------ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index df2b08ff2c..794b9d72e0 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -9,6 +9,8 @@ use { tokio::net::UdpSocket, }; +/// Pulls some packets from the socket into the specified container +/// returning how many packets were read pub async fn recv_mmsg( socket: &UdpSocket, packets: &mut [Packet], @@ -36,6 +38,21 @@ pub async fn recv_mmsg( Ok(i) } +/// Reads the exact number of packets required to fill `packets` +pub async fn recv_mmsg_exact( + socket: &UdpSocket, + packets: &mut [Packet], +) -> io::Result { + let total = packets.len(); + let mut remaining = total; + while remaining != 0 { + let first = total - remaining; + let res = recv_mmsg(socket, &mut packets[first..]).await?; + remaining -= res; + } + Ok(packets.len()) +} + #[cfg(test)] mod tests { use { @@ -63,8 +80,8 @@ mod tests { sender.send_to(&data[..], &addr).await.unwrap(); } - let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let mut packets = vec![Packet::default(); sent]; + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(sent, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -90,17 +107,18 @@ mod tests { } let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); assert_eq!(packet.meta.socket_addr(), saddr); } + let mut packets = vec![Packet::default(); sent - TEST_NUM_MSGS]; packets .iter_mut() .for_each(|pkt| pkt.meta = Meta::default()); - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(sent - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -119,7 +137,7 @@ mod tests { } #[tokio::test] - async fn test_recv_mmsg_multi_iter_timeout() { + async fn test_recv_mmsg_exact_multi_iter_timeout() { let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); @@ -132,7 +150,7 @@ mod tests { let start = Instant::now(); let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); diff --git a/streamer/src/nonblocking/sendmmsg.rs b/streamer/src/nonblocking/sendmmsg.rs index 8721937e25..3c8d608300 100644 --- a/streamer/src/nonblocking/sendmmsg.rs +++ b/streamer/src/nonblocking/sendmmsg.rs @@ -54,7 +54,7 @@ mod tests { use { crate::{ nonblocking::{ - recvmmsg::recv_mmsg, + recvmmsg::{recv_mmsg, recv_mmsg_exact}, sendmmsg::{batch_send, multi_target_send}, }, packet::Packet, @@ -81,7 +81,7 @@ mod tests { assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(32, recv); } @@ -111,12 +111,12 @@ mod tests { let sent = batch_send(&sender, &packet_refs[..]).await.ok(); assert_eq!(sent, Some(())); - let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); + let mut packets = vec![Packet::default(); 16]; + let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(16, recv); - let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader2, &mut packets[..]).await.unwrap(); + let mut packets = vec![Packet::default(); 16]; + let recv = recv_mmsg_exact(&reader2, &mut packets[..]).await.unwrap(); assert_eq!(16, recv); }