recvmmsg IPv6 awareness (#18957)
This commit is contained in:
parent
d8984cb0f4
commit
0b7ed18cfa
|
@ -35,21 +35,25 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz
|
|||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
#[allow(clippy::uninit_assumed_init)]
|
||||
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
||||
use libc::{
|
||||
c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, timespec, MSG_WAITFORONE,
|
||||
c_void, iovec, mmsghdr, recvmmsg, sa_family_t, sockaddr_in, sockaddr_in6, sockaddr_storage,
|
||||
socklen_t, timespec, AF_INET, AF_INET6, MSG_WAITFORONE,
|
||||
};
|
||||
use nix::sys::socket::InetAddr;
|
||||
use std::mem;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
const SOCKADDR_STORAGE_SIZE: socklen_t = mem::size_of::<sockaddr_storage>() as socklen_t;
|
||||
const SOCKADDR_IN_SIZE: socklen_t = mem::size_of::<sockaddr_in>() as socklen_t;
|
||||
const SOCKADDR_IN6_SIZE: socklen_t = mem::size_of::<sockaddr_in6>() as socklen_t;
|
||||
|
||||
let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||
let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||
let mut addr: [sockaddr_in; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||
let addrlen = mem::size_of_val(&addr) as socklen_t;
|
||||
let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::MaybeUninit::uninit().assume_init() };
|
||||
let mut addr: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||
|
||||
let sock_fd = sock.as_raw_fd();
|
||||
|
||||
let count = cmp::min(iovs.len(), packets.len());
|
||||
|
||||
for i in 0..count {
|
||||
|
@ -57,7 +61,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize,
|
|||
iovs[i].iov_len = packets[i].data.len();
|
||||
|
||||
hdrs[i].msg_hdr.msg_name = &mut addr[i] as *mut _ as *mut _;
|
||||
hdrs[i].msg_hdr.msg_namelen = addrlen;
|
||||
hdrs[i].msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE;
|
||||
hdrs[i].msg_hdr.msg_iov = &mut iovs[i];
|
||||
hdrs[i].msg_hdr.msg_iovlen = 1;
|
||||
}
|
||||
|
@ -71,14 +75,32 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize,
|
|||
match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } {
|
||||
-1 => return Err(io::Error::last_os_error()),
|
||||
n => {
|
||||
let mut pkt_idx: usize = 0;
|
||||
for i in 0..n as usize {
|
||||
let mut p = &mut packets[i];
|
||||
let inet_addr = if addr[i].ss_family == AF_INET as sa_family_t
|
||||
&& hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN_SIZE
|
||||
{
|
||||
let a: *const sockaddr_in = &addr[i] as *const _ as *const _;
|
||||
unsafe { InetAddr::V4(*a) }
|
||||
} else if addr[i].ss_family == AF_INET6 as sa_family_t
|
||||
&& hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE
|
||||
{
|
||||
let a: *const sockaddr_in6 = &addr[i] as *const _ as *const _;
|
||||
unsafe { InetAddr::V6(*a) }
|
||||
} else {
|
||||
error!(
|
||||
"recvmmsg unexpected ss_family:{} msg_namelen:{}",
|
||||
addr[i].ss_family, hdrs[i].msg_hdr.msg_namelen
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let mut p = &mut packets[pkt_idx];
|
||||
p.meta.size = hdrs[i].msg_len as usize;
|
||||
total_size += p.meta.size;
|
||||
let inet_addr = InetAddr::V4(addr[i]);
|
||||
p.meta.set_addr(&inet_addr.to_std());
|
||||
pkt_idx += 1;
|
||||
}
|
||||
n as usize
|
||||
pkt_idx
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -89,55 +111,76 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize,
|
|||
mod tests {
|
||||
use crate::packet::PACKET_DATA_SIZE;
|
||||
use crate::recvmmsg::*;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);
|
||||
|
||||
fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
|
||||
let reader = UdpSocket::bind(ip_str)?;
|
||||
let addr = reader.local_addr()?;
|
||||
let sender = UdpSocket::bind(ip_str)?;
|
||||
let saddr = sender.local_addr()?;
|
||||
Ok((reader, addr, sender, saddr))
|
||||
}
|
||||
|
||||
const TEST_NUM_MSGS: usize = 32;
|
||||
#[test]
|
||||
pub fn test_recv_mmsg_one_iter() {
|
||||
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let addr = reader.local_addr().unwrap();
|
||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let saddr = sender.local_addr().unwrap();
|
||||
let sent = TEST_NUM_MSGS - 1;
|
||||
for _ in 0..sent {
|
||||
let data = [0; PACKET_DATA_SIZE];
|
||||
sender.send_to(&data[..], &addr).unwrap();
|
||||
}
|
||||
let test_one_iter = |(reader, addr, sender, saddr): TestConfig| {
|
||||
let sent = TEST_NUM_MSGS - 1;
|
||||
for _ in 0..sent {
|
||||
let data = [0; PACKET_DATA_SIZE];
|
||||
sender.send_to(&data[..], &addr).unwrap();
|
||||
}
|
||||
|
||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(sent, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(sent, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
}
|
||||
};
|
||||
|
||||
test_one_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());
|
||||
|
||||
match test_setup_reader_sender("::1:0") {
|
||||
Ok(config) => test_one_iter(config),
|
||||
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_recv_mmsg_multi_iter() {
|
||||
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let addr = reader.local_addr().unwrap();
|
||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let saddr = sender.local_addr().unwrap();
|
||||
let sent = TEST_NUM_MSGS + 10;
|
||||
for _ in 0..sent {
|
||||
let data = [0; PACKET_DATA_SIZE];
|
||||
sender.send_to(&data[..], &addr).unwrap();
|
||||
}
|
||||
let test_multi_iter = |(reader, addr, sender, saddr): TestConfig| {
|
||||
let sent = TEST_NUM_MSGS + 10;
|
||||
for _ in 0..sent {
|
||||
let data = [0; PACKET_DATA_SIZE];
|
||||
sender.send_to(&data[..], &addr).unwrap();
|
||||
}
|
||||
|
||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(TEST_NUM_MSGS, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
}
|
||||
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(TEST_NUM_MSGS, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
}
|
||||
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(sent - TEST_NUM_MSGS, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||
assert_eq!(sent - TEST_NUM_MSGS, recv);
|
||||
for packet in packets.iter().take(recv) {
|
||||
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
|
||||
assert_eq!(packet.meta.addr(), saddr);
|
||||
}
|
||||
};
|
||||
|
||||
test_multi_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());
|
||||
|
||||
match test_setup_reader_sender("::1:0") {
|
||||
Ok(config) => test_multi_iter(config),
|
||||
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue