move readable inside the loop of packets (#27916)
* Add recv_mmsg_exact function * update tests * address PR comments
This commit is contained in:
parent
3f5eec32cd
commit
63c00e7f5e
|
@ -9,6 +9,8 @@ use {
|
||||||
tokio::net::UdpSocket,
|
tokio::net::UdpSocket,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Pulls some packets from the socket into the specified container
|
||||||
|
/// returning how many packets were read
|
||||||
pub async fn recv_mmsg(
|
pub async fn recv_mmsg(
|
||||||
socket: &UdpSocket,
|
socket: &UdpSocket,
|
||||||
packets: &mut [Packet],
|
packets: &mut [Packet],
|
||||||
|
@ -36,6 +38,21 @@ pub async fn recv_mmsg(
|
||||||
Ok(i)
|
Ok(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reads the exact number of packets required to fill `packets`
|
||||||
|
pub async fn recv_mmsg_exact(
|
||||||
|
socket: &UdpSocket,
|
||||||
|
packets: &mut [Packet],
|
||||||
|
) -> io::Result</*num packets:*/ usize> {
|
||||||
|
let total = packets.len();
|
||||||
|
let mut remaining = total;
|
||||||
|
while remaining != 0 {
|
||||||
|
let first = total - remaining;
|
||||||
|
let res = recv_mmsg(socket, &mut packets[first..]).await?;
|
||||||
|
remaining -= res;
|
||||||
|
}
|
||||||
|
Ok(packets.len())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use {
|
use {
|
||||||
|
@ -63,8 +80,8 @@ mod tests {
|
||||||
sender.send_to(&data[..], &addr).await.unwrap();
|
sender.send_to(&data[..], &addr).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
let mut packets = vec![Packet::default(); sent];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(sent, recv);
|
assert_eq!(sent, recv);
|
||||||
for packet in packets.iter().take(recv) {
|
for packet in packets.iter().take(recv) {
|
||||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||||
|
@ -90,17 +107,18 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(TEST_NUM_MSGS, recv);
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
for packet in packets.iter().take(recv) {
|
for packet in packets.iter().take(recv) {
|
||||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packet.meta.socket_addr(), saddr);
|
assert_eq!(packet.meta.socket_addr(), saddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut packets = vec![Packet::default(); sent - TEST_NUM_MSGS];
|
||||||
packets
|
packets
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.for_each(|pkt| pkt.meta = Meta::default());
|
.for_each(|pkt| pkt.meta = Meta::default());
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(sent - TEST_NUM_MSGS, recv);
|
assert_eq!(sent - TEST_NUM_MSGS, recv);
|
||||||
for packet in packets.iter().take(recv) {
|
for packet in packets.iter().take(recv) {
|
||||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||||
|
@ -119,7 +137,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_recv_mmsg_multi_iter_timeout() {
|
async fn test_recv_mmsg_exact_multi_iter_timeout() {
|
||||||
let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
|
let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
|
||||||
let addr = reader.local_addr().unwrap();
|
let addr = reader.local_addr().unwrap();
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
|
||||||
|
@ -132,7 +150,7 @@ mod tests {
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(TEST_NUM_MSGS, recv);
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
for packet in packets.iter().take(recv) {
|
for packet in packets.iter().take(recv) {
|
||||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||||
|
|
|
@ -54,7 +54,7 @@ mod tests {
|
||||||
use {
|
use {
|
||||||
crate::{
|
crate::{
|
||||||
nonblocking::{
|
nonblocking::{
|
||||||
recvmmsg::recv_mmsg,
|
recvmmsg::{recv_mmsg, recv_mmsg_exact},
|
||||||
sendmmsg::{batch_send, multi_target_send},
|
sendmmsg::{batch_send, multi_target_send},
|
||||||
},
|
},
|
||||||
packet::Packet,
|
packet::Packet,
|
||||||
|
@ -81,7 +81,7 @@ mod tests {
|
||||||
assert_eq!(sent, Some(()));
|
assert_eq!(sent, Some(()));
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); 32];
|
let mut packets = vec![Packet::default(); 32];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(32, recv);
|
assert_eq!(32, recv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,12 +111,12 @@ mod tests {
|
||||||
let sent = batch_send(&sender, &packet_refs[..]).await.ok();
|
let sent = batch_send(&sender, &packet_refs[..]).await.ok();
|
||||||
assert_eq!(sent, Some(()));
|
assert_eq!(sent, Some(()));
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); 32];
|
let mut packets = vec![Packet::default(); 16];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(16, recv);
|
assert_eq!(16, recv);
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); 32];
|
let mut packets = vec![Packet::default(); 16];
|
||||||
let recv = recv_mmsg(&reader2, &mut packets[..]).await.unwrap();
|
let recv = recv_mmsg_exact(&reader2, &mut packets[..]).await.unwrap();
|
||||||
assert_eq!(16, recv);
|
assert_eq!(16, recv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue