diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b1bacfd98..cf378cbce 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2818,8 +2818,7 @@ mod tests { .unwrap(); let mut packets = vec![Packet::default(); 2]; - let (_, num_received) = - recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); assert_eq!(num_received, expected_num_forwarded, "{}", name); } @@ -2918,8 +2917,7 @@ mod tests { .unwrap(); let mut packets = vec![Packet::default(); 2]; - let (_, num_received) = - recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); assert_eq!(num_received, expected_ids.len(), "{}", name); for (i, expected_id) in expected_ids.iter().enumerate() { assert_eq!(packets[i].meta.size, 1); diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index b0abe551a..018fae453 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -41,7 +41,7 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) trace!("recv_from err {:?}", e); return Err(e); } - Ok((_, npkts)) => { + Ok(npkts) => { if i == 0 { socket.set_nonblocking(true)?; } diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 4437e43d6..ec734f1d0 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -14,10 +14,9 @@ use { }; #[cfg(not(target_os = "linux"))] -pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { +pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { let mut i = 0; let count = cmp::min(NUM_RCVMMSGS, packets.len()); - let mut total_size = 0; for p in packets.iter_mut().take(count) { p.meta.size = 0; match socket.recv_from(&mut p.data) { @@ -28,7 +27,6 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz return Err(e); } Ok((nrecv, from)) => { - total_size += nrecv; p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { @@ -38,7 +36,7 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz } i += 1; } - Ok((total_size, i)) + Ok(i) } #[cfg(target_os = "linux")] @@ -67,7 +65,7 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option #[cfg(target_os = "linux")] #[allow(clippy::uninit_assumed_init)] -pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { +pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; @@ -99,7 +97,6 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, return Err(io::Error::last_os_error()); } let mut npkts = 0; - let mut total_size = 0; izip!(addrs, hdrs, packets.iter_mut()) .take(nrecv as usize) @@ -111,9 +108,8 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, pkt.meta.size = hdr.msg_len as usize; pkt.meta.set_addr(&addr); npkts += 1; - total_size += pkt.meta.size; }); - Ok((total_size, npkts)) + Ok(npkts) } #[cfg(test)] @@ -147,7 +143,7 @@ mod tests { } let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(sent, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -173,14 +169,14 @@ mod tests { } let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); assert_eq!(packet.meta.addr(), saddr); } - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(sent - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -212,7 +208,7 @@ mod tests { let start = Instant::now(); let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -249,7 +245,7 @@ mod tests { let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(sent1) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); @@ -260,7 +256,7 @@ mod tests { assert_eq!(packet.meta.addr(), saddr2); } - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 6e434f5ed..47abcc0af 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -175,7 +175,7 @@ mod tests { assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(32, recv); } @@ -206,11 +206,11 @@ mod tests { assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(16, recv); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap(); assert_eq!(16, recv); } @@ -241,19 +241,19 @@ mod tests { assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); assert_eq!(1, recv); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap(); assert_eq!(1, recv); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader3, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader3, &mut packets[..]).unwrap(); assert_eq!(1, recv); let mut packets = vec![Packet::default(); 32]; - let recv = recv_mmsg(&reader4, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader4, &mut packets[..]).unwrap(); assert_eq!(1, recv); } diff --git a/streamer/tests/recvmmsg.rs b/streamer/tests/recvmmsg.rs index 614dc16ee..515f97593 100644 --- a/streamer/tests/recvmmsg.rs +++ b/streamer/tests/recvmmsg.rs @@ -25,7 +25,7 @@ pub fn test_recv_mmsg_batch_size() { } let mut packets = vec![Packet::default(); TEST_BATCH_SIZE]; let now = Instant::now(); - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); elapsed_in_max_batch += now.elapsed().as_nanos(); assert_eq!(TEST_BATCH_SIZE, recv); }); @@ -40,7 +40,7 @@ pub fn test_recv_mmsg_batch_size() { let mut recv = 0; let now = Instant::now(); while let Ok(num) = recv_mmsg(&reader, &mut packets[..]) { - recv += num.1; + recv += num; if recv >= TEST_BATCH_SIZE { break; }