From 0b7ed18cfa4512f89ebef6289243db2af5e8663f Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Tue, 3 Aug 2021 20:35:50 -0700 Subject: [PATCH] recvmmsg IPv6 awareness (#18957) --- streamer/src/recvmmsg.rs | 133 ++++++++++++++++++++++++++------------- 1 file changed, 88 insertions(+), 45 deletions(-) diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index fb6da391e..32b2f4c4e 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -35,21 +35,25 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz } #[cfg(target_os = "linux")] +#[allow(clippy::uninit_assumed_init)] pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { use libc::{ - c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, timespec, MSG_WAITFORONE, + c_void, iovec, mmsghdr, recvmmsg, sa_family_t, sockaddr_in, sockaddr_in6, sockaddr_storage, + socklen_t, timespec, AF_INET, AF_INET6, MSG_WAITFORONE, }; use nix::sys::socket::InetAddr; use std::mem; use std::os::unix::io::AsRawFd; + const SOCKADDR_STORAGE_SIZE: socklen_t = mem::size_of::() as socklen_t; + const SOCKADDR_IN_SIZE: socklen_t = mem::size_of::() as socklen_t; + const SOCKADDR_IN6_SIZE: socklen_t = mem::size_of::() as socklen_t; + let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; - let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; - let mut addr: [sockaddr_in; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; - let addrlen = mem::size_of_val(&addr) as socklen_t; + let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::MaybeUninit::uninit().assume_init() }; + let mut addr: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; let sock_fd = sock.as_raw_fd(); - let count = cmp::min(iovs.len(), packets.len()); for i in 0..count { @@ -57,7 +61,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, iovs[i].iov_len = packets[i].data.len(); hdrs[i].msg_hdr.msg_name = &mut addr[i] as *mut _ as *mut _; - hdrs[i].msg_hdr.msg_namelen = addrlen; + hdrs[i].msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE; hdrs[i].msg_hdr.msg_iov = &mut iovs[i]; hdrs[i].msg_hdr.msg_iovlen = 1; } @@ -71,14 +75,32 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } { -1 => return Err(io::Error::last_os_error()), n => { + let mut pkt_idx: usize = 0; for i in 0..n as usize { - let mut p = &mut packets[i]; + let inet_addr = if addr[i].ss_family == AF_INET as sa_family_t + && hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN_SIZE + { + let a: *const sockaddr_in = &addr[i] as *const _ as *const _; + unsafe { InetAddr::V4(*a) } + } else if addr[i].ss_family == AF_INET6 as sa_family_t + && hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE + { + let a: *const sockaddr_in6 = &addr[i] as *const _ as *const _; + unsafe { InetAddr::V6(*a) } + } else { + error!( + "recvmmsg unexpected ss_family:{} msg_namelen:{}", + addr[i].ss_family, hdrs[i].msg_hdr.msg_namelen + ); + continue; + }; + let mut p = &mut packets[pkt_idx]; p.meta.size = hdrs[i].msg_len as usize; total_size += p.meta.size; - let inet_addr = InetAddr::V4(addr[i]); p.meta.set_addr(&inet_addr.to_std()); + pkt_idx += 1; } - n as usize + pkt_idx } }; @@ -89,55 +111,76 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, mod tests { use crate::packet::PACKET_DATA_SIZE; use crate::recvmmsg::*; + use std::net::{SocketAddr, UdpSocket}; use std::time::{Duration, Instant}; + type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr); + + fn test_setup_reader_sender(ip_str: &str) -> io::Result { + let reader = UdpSocket::bind(ip_str)?; + let addr = reader.local_addr()?; + let sender = UdpSocket::bind(ip_str)?; + let saddr = sender.local_addr()?; + Ok((reader, addr, sender, saddr)) + } + const TEST_NUM_MSGS: usize = 32; #[test] pub fn test_recv_mmsg_one_iter() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let saddr = sender.local_addr().unwrap(); - let sent = TEST_NUM_MSGS - 1; - for _ in 0..sent { - let data = [0; PACKET_DATA_SIZE]; - sender.send_to(&data[..], &addr).unwrap(); - } + let test_one_iter = |(reader, addr, sender, saddr): TestConfig| { + let sent = TEST_NUM_MSGS - 1; + for _ in 0..sent { + let data = [0; PACKET_DATA_SIZE]; + sender.send_to(&data[..], &addr).unwrap(); + } - let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(sent, recv); - for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + assert_eq!(sent, recv); + for packet in packets.iter().take(recv) { + assert_eq!(packet.meta.size, PACKET_DATA_SIZE); + assert_eq!(packet.meta.addr(), saddr); + } + }; + + test_one_iter(test_setup_reader_sender("127.0.0.1:0").unwrap()); + + match test_setup_reader_sender("::1:0") { + Ok(config) => test_one_iter(config), + Err(e) => warn!("Failed to configure IPv6: {:?}", e), } } #[test] pub fn test_recv_mmsg_multi_iter() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let saddr = sender.local_addr().unwrap(); - let sent = TEST_NUM_MSGS + 10; - for _ in 0..sent { - let data = [0; PACKET_DATA_SIZE]; - sender.send_to(&data[..], &addr).unwrap(); - } + let test_multi_iter = |(reader, addr, sender, saddr): TestConfig| { + let sent = TEST_NUM_MSGS + 10; + for _ in 0..sent { + let data = [0; PACKET_DATA_SIZE]; + sender.send_to(&data[..], &addr).unwrap(); + } - let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(TEST_NUM_MSGS, recv); - for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); - } + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + assert_eq!(TEST_NUM_MSGS, recv); + for packet in packets.iter().take(recv) { + assert_eq!(packet.meta.size, PACKET_DATA_SIZE); + assert_eq!(packet.meta.addr(), saddr); + } - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(sent - TEST_NUM_MSGS, recv); - for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + assert_eq!(sent - TEST_NUM_MSGS, recv); + for packet in packets.iter().take(recv) { + assert_eq!(packet.meta.size, PACKET_DATA_SIZE); + assert_eq!(packet.meta.addr(), saddr); + } + }; + + test_multi_iter(test_setup_reader_sender("127.0.0.1:0").unwrap()); + + match test_setup_reader_sender("::1:0") { + Ok(config) => test_multi_iter(config), + Err(e) => warn!("Failed to configure IPv6: {:?}", e), } }