Speed up UDP reachable port checks

This commit is contained in:
Michael Vines 2020-12-29 21:19:27 -08:00
parent fb6c660cfd
commit 71b88da48e
1 changed files with 42 additions and 21 deletions

View File

@ -1,13 +1,17 @@
//! The `net_utils` module assists with networking //! The `net_utils` module assists with networking
use log::*; use {
use rand::{thread_rng, Rng}; log::*,
use socket2::{Domain, SockAddr, Socket, Type}; rand::{thread_rng, Rng},
use std::collections::{BTreeMap, BTreeSet}; socket2::{Domain, SockAddr, Socket, Type},
use std::io::{self, Read, Write}; std::{
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; collections::{BTreeMap, HashSet},
use std::sync::mpsc::channel; io::{self, Read, Write},
use std::time::Duration; net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
use url::Url; sync::{mpsc::channel, Arc, RwLock},
time::{Duration, Instant},
},
url::Url,
};
mod ip_echo_server; mod ip_echo_server;
use ip_echo_server::IpEchoServerMessage; use ip_echo_server::IpEchoServerMessage;
@ -204,21 +208,38 @@ fn do_verify_reachable_ports(
.map_err(|err| warn!("ip_echo_server request failed: {}", err)); .map_err(|err| warn!("ip_echo_server request failed: {}", err));
// Spawn threads at once! // Spawn threads at once!
let reachable_ports = Arc::new(RwLock::new(HashSet::new()));
let thread_handles: Vec<_> = checked_socket_iter let thread_handles: Vec<_> = checked_socket_iter
.map(|udp_socket| { .map(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port(); let port = udp_socket.local_addr().unwrap().port();
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket"); let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
let reachable_ports = reachable_ports.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
let mut buf = [0; 1]; let start = Instant::now();
let original_read_timeout = udp_socket.read_timeout().unwrap(); let original_read_timeout = udp_socket.read_timeout().unwrap();
udp_socket.set_read_timeout(Some(timeout)).unwrap(); udp_socket
let recv_result = udp_socket.recv(&mut buf); .set_read_timeout(Some(Duration::from_millis(250)))
debug!( .unwrap();
"Waited for incoming datagram on udp/{}: {:?}", loop {
port, recv_result if reachable_ports.read().unwrap().contains(&port)
); || Instant::now().duration_since(start) >= timeout
{
break;
}
let recv_result = udp_socket.recv(&mut [0; 1]);
debug!(
"Waited for incoming datagram on udp/{}: {:?}",
port, recv_result
);
if recv_result.is_ok() {
reachable_ports.write().unwrap().insert(port);
break;
}
}
udp_socket.set_read_timeout(original_read_timeout).unwrap(); udp_socket.set_read_timeout(original_read_timeout).unwrap();
recv_result.map(|_| port).ok()
}) })
}) })
.collect(); .collect();
@ -227,11 +248,11 @@ fn do_verify_reachable_ports(
// Separate from the above by collect()-ing as an intermediately step to make the iterator // Separate from the above by collect()-ing as an intermediately step to make the iterator
// eager not lazy so that joining happens here at once after creating bunch of threads // eager not lazy so that joining happens here at once after creating bunch of threads
// at once. // at once.
let reachable_ports: BTreeSet<_> = thread_handles for thread in thread_handles {
.into_iter() thread.join().unwrap();
.filter_map(|t| t.join().unwrap()) }
.collect();
let reachable_ports = reachable_ports.read().unwrap().clone();
if reachable_ports.len() == checked_ports.len() { if reachable_ports.len() == checked_ports.len() {
info!( info!(
"checked udp ports: {:?}, reachable udp ports: {:?}", "checked udp ports: {:?}, reachable udp ports: {:?}",