Fix udp port check retry and check all udp ports (#10385)
* Don't start if udp port is really closed * Fully check all udp ports * Remove test code....... * Add tests and adjust impl a bit * Add comment * Move comment a bit * Move a bit * clean ups
This commit is contained in:
parent
0d38257dbb
commit
a39df7ee5d
|
@ -7,10 +7,12 @@ use tokio_codec::{BytesCodec, Decoder};
|
|||
|
||||
pub type IpEchoServer = Runtime;
|
||||
|
||||
pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;
|
||||
|
||||
#[derive(Serialize, Deserialize, Default)]
|
||||
pub(crate) struct IpEchoServerMessage {
|
||||
tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
|
||||
udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
|
||||
tcp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
|
||||
udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
|
||||
}
|
||||
|
||||
impl IpEchoServerMessage {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
use socket2::{Domain, SockAddr, Socket, Type};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
|
||||
use std::sync::mpsc::channel;
|
||||
|
@ -9,7 +10,7 @@ use std::time::Duration;
|
|||
|
||||
mod ip_echo_server;
|
||||
use ip_echo_server::IpEchoServerMessage;
|
||||
pub use ip_echo_server::{ip_echo_server, IpEchoServer};
|
||||
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
|
||||
|
||||
/// A data type representing a public Udp socket
|
||||
pub struct UdpSocketPair {
|
||||
|
@ -92,34 +93,36 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, St
|
|||
|
||||
// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
|
||||
// `ip_echo_server_addr`
|
||||
pub fn verify_reachable_ports(
|
||||
const DEFAULT_TIMEOUT_SECS: u64 = 5;
|
||||
const DEFAULT_RETRY_COUNT: usize = 5;
|
||||
|
||||
fn do_verify_reachable_ports(
|
||||
ip_echo_server_addr: &SocketAddr,
|
||||
tcp_listeners: Vec<(u16, TcpListener)>,
|
||||
udp_sockets: &[&UdpSocket],
|
||||
timeout: u64,
|
||||
udp_retry_count: usize,
|
||||
) -> bool {
|
||||
let udp_ports: Vec<_> = udp_sockets
|
||||
.iter()
|
||||
.map(|udp_socket| udp_socket.local_addr().unwrap().port())
|
||||
.collect();
|
||||
|
||||
info!(
|
||||
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
|
||||
tcp_listeners, udp_ports, ip_echo_server_addr
|
||||
"Checking that tcp ports {:?} from {:?}",
|
||||
tcp_listeners, ip_echo_server_addr
|
||||
);
|
||||
|
||||
let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
|
||||
let _ = ip_echo_server_request(
|
||||
ip_echo_server_addr,
|
||||
IpEchoServerMessage::new(&tcp_ports, &udp_ports),
|
||||
IpEchoServerMessage::new(&tcp_ports, &[]),
|
||||
)
|
||||
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
||||
|
||||
let mut ok = true;
|
||||
let timeout = Duration::from_secs(timeout);
|
||||
|
||||
// Wait for a connection to open on each TCP port
|
||||
for (port, tcp_listener) in tcp_listeners {
|
||||
let (sender, receiver) = channel();
|
||||
std::thread::spawn(move || {
|
||||
let listening_addr = tcp_listener.local_addr().unwrap();
|
||||
let thread_handle = std::thread::spawn(move || {
|
||||
debug!("Waiting for incoming connection on tcp/{}", port);
|
||||
match tcp_listener.incoming().next() {
|
||||
Some(_) => sender
|
||||
|
@ -128,7 +131,7 @@ pub fn verify_reachable_ports(
|
|||
None => warn!("tcp incoming failed"),
|
||||
}
|
||||
});
|
||||
match receiver.recv_timeout(Duration::from_secs(5)) {
|
||||
match receiver.recv_timeout(timeout) {
|
||||
Ok(_) => {
|
||||
info!("tcp/{} is reachable", port);
|
||||
}
|
||||
|
@ -137,9 +140,16 @@ pub fn verify_reachable_ports(
|
|||
"Received no response at tcp/{}, check your port configuration: {}",
|
||||
port, err
|
||||
);
|
||||
// Ugh, std rustc doesn't provide acceptng with timeout or restoring original
|
||||
// nonblocking-status of sockets because of lack of getter, only the setter...
|
||||
// So, to close the thread cleanly, just connect from here.
|
||||
// ref: https://github.com/rust-lang/rust/issues/31615
|
||||
TcpStream::connect_timeout(&listening_addr, timeout).unwrap();
|
||||
ok = false;
|
||||
}
|
||||
}
|
||||
// ensure to reap the thread
|
||||
thread_handle.join().unwrap();
|
||||
}
|
||||
|
||||
if !ok {
|
||||
|
@ -147,51 +157,110 @@ pub fn verify_reachable_ports(
|
|||
return ok;
|
||||
}
|
||||
|
||||
for _udp_retries in 0..5 {
|
||||
// Wait for a datagram to arrive at each UDP port
|
||||
for udp_socket in udp_sockets {
|
||||
let port = udp_socket.local_addr().unwrap().port();
|
||||
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
|
||||
let (sender, receiver) = channel();
|
||||
std::thread::spawn(move || {
|
||||
let mut buf = [0; 1];
|
||||
debug!("Waiting for incoming datagram on udp/{}", port);
|
||||
match udp_socket.recv(&mut buf) {
|
||||
Ok(_) => sender
|
||||
.send(())
|
||||
.unwrap_or_else(|err| warn!("send failure: {}", err)),
|
||||
Err(err) => warn!("udp recv failure: {}", err),
|
||||
}
|
||||
});
|
||||
match receiver.recv_timeout(Duration::from_secs(5)) {
|
||||
Ok(_) => {
|
||||
info!("udp/{} is reachable", port);
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Received no response at udp/{}, check your port configuration: {}",
|
||||
port, err
|
||||
);
|
||||
ok = false;
|
||||
}
|
||||
let mut udp_ports: BTreeMap<_, _> = BTreeMap::new();
|
||||
udp_sockets.iter().for_each(|udp_socket| {
|
||||
let port = udp_socket.local_addr().unwrap().port();
|
||||
udp_ports
|
||||
.entry(port)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(udp_socket);
|
||||
});
|
||||
let udp_ports: Vec<_> = udp_ports.into_iter().collect();
|
||||
|
||||
info!(
|
||||
"Checking that udp ports {:?} are reachable from {:?}",
|
||||
udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
|
||||
ip_echo_server_addr
|
||||
);
|
||||
|
||||
'outer: for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) {
|
||||
ok = false;
|
||||
|
||||
for udp_remaining_retry in (0_usize..udp_retry_count).rev() {
|
||||
let (checked_ports, checked_socket_iter) = (
|
||||
checked_ports_and_sockets
|
||||
.iter()
|
||||
.map(|(port, _)| *port)
|
||||
.collect::<Vec<_>>(),
|
||||
checked_ports_and_sockets
|
||||
.iter()
|
||||
.map(|(_, sockets)| sockets)
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let _ = ip_echo_server_request(
|
||||
ip_echo_server_addr,
|
||||
IpEchoServerMessage::new(&[], &checked_ports),
|
||||
)
|
||||
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
||||
|
||||
// Spawn threads at once!
|
||||
let thread_handles: Vec<_> = checked_socket_iter
|
||||
.map(|udp_socket| {
|
||||
let port = udp_socket.local_addr().unwrap().port();
|
||||
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
|
||||
std::thread::spawn(move || {
|
||||
let mut buf = [0; 1];
|
||||
let original_read_timeout = udp_socket.read_timeout().unwrap();
|
||||
udp_socket.set_read_timeout(Some(timeout)).unwrap();
|
||||
let recv_result = udp_socket.recv(&mut buf);
|
||||
debug!(
|
||||
"Waited for incoming datagram on udp/{}: {:?}",
|
||||
port, recv_result
|
||||
);
|
||||
udp_socket.set_read_timeout(original_read_timeout).unwrap();
|
||||
recv_result.map(|_| port).ok()
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Now join threads!
|
||||
// Separate from the above by collect()-ing as an intermediately step to make the iterator
|
||||
// eager not lazy so that joining happens here at once after creating bunch of threads
|
||||
// at once.
|
||||
let reachable_ports: BTreeSet<_> = thread_handles
|
||||
.into_iter()
|
||||
.filter_map(|t| t.join().unwrap())
|
||||
.collect();
|
||||
|
||||
if reachable_ports.len() == checked_ports.len() {
|
||||
info!(
|
||||
"checked udp ports: {:?}, reachable udp ports: {:?}",
|
||||
checked_ports, reachable_ports
|
||||
);
|
||||
ok = true;
|
||||
break;
|
||||
} else if udp_remaining_retry > 0 {
|
||||
// Might have lost a UDP packet, retry a couple times
|
||||
error!(
|
||||
"checked udp ports: {:?}, reachable udp ports: {:?}",
|
||||
checked_ports, reachable_ports
|
||||
);
|
||||
error!("There are some udp ports with no response!! Retrying...");
|
||||
} else {
|
||||
error!("Maximum retry count is reached....");
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
break;
|
||||
}
|
||||
ok = true;
|
||||
|
||||
// Might have lost a UDP packet, retry a couple times
|
||||
let _ = ip_echo_server_request(
|
||||
ip_echo_server_addr,
|
||||
IpEchoServerMessage::new(&[], &udp_ports),
|
||||
)
|
||||
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
||||
}
|
||||
|
||||
ok
|
||||
}
|
||||
|
||||
pub fn verify_reachable_ports(
|
||||
ip_echo_server_addr: &SocketAddr,
|
||||
tcp_listeners: Vec<(u16, TcpListener)>,
|
||||
udp_sockets: &[&UdpSocket],
|
||||
) -> bool {
|
||||
do_verify_reachable_ports(
|
||||
ip_echo_server_addr,
|
||||
tcp_listeners,
|
||||
udp_sockets,
|
||||
DEFAULT_TIMEOUT_SECS,
|
||||
DEFAULT_RETRY_COUNT,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
|
||||
if let Some(addrstr) = optstr {
|
||||
if let Ok(port) = addrstr.parse() {
|
||||
|
@ -511,7 +580,25 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_public_ip_addr() {
|
||||
fn test_get_public_ip_addr_none() {
|
||||
solana_logger::setup();
|
||||
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
|
||||
let (_server_port, (server_udp_socket, server_tcp_listener)) =
|
||||
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
|
||||
|
||||
let _runtime = ip_echo_server(server_tcp_listener);
|
||||
|
||||
let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
|
||||
assert_eq!(
|
||||
get_public_ip_addr(&server_ip_echo_addr),
|
||||
parse_host("127.0.0.1"),
|
||||
);
|
||||
|
||||
assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_public_ip_addr_reachable() {
|
||||
solana_logger::setup();
|
||||
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
|
||||
let (_server_port, (server_udp_socket, server_tcp_listener)) =
|
||||
|
@ -533,4 +620,50 @@ mod tests {
|
|||
&[&client_udp_socket],
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_public_ip_addr_tcp_unreachable() {
|
||||
solana_logger::setup();
|
||||
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
|
||||
let (_server_port, (server_udp_socket, _server_tcp_listener)) =
|
||||
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
|
||||
|
||||
// make the socket unreachable by not running the ip echo server!
|
||||
|
||||
let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
|
||||
|
||||
let (correct_client_port, (_client_udp_socket, client_tcp_listener)) =
|
||||
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
|
||||
|
||||
assert!(!do_verify_reachable_ports(
|
||||
&server_ip_echo_addr,
|
||||
vec![(correct_client_port, client_tcp_listener)],
|
||||
&[],
|
||||
2,
|
||||
3,
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_public_ip_addr_udp_unreachable() {
|
||||
solana_logger::setup();
|
||||
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
|
||||
let (_server_port, (server_udp_socket, _server_tcp_listener)) =
|
||||
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
|
||||
|
||||
// make the socket unreachable by not running the ip echo server!
|
||||
|
||||
let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
|
||||
|
||||
let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
|
||||
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
|
||||
|
||||
assert!(!do_verify_reachable_ports(
|
||||
&server_ip_echo_addr,
|
||||
vec![],
|
||||
&[&client_udp_socket],
|
||||
2,
|
||||
3,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1110,23 +1110,17 @@ pub fn main() {
|
|||
}
|
||||
|
||||
if let Some(ref cluster_entrypoint) = cluster_entrypoint {
|
||||
let udp_sockets = [
|
||||
node.sockets.tpu.first(),
|
||||
/*
|
||||
Enable these ports when `IpEchoServerMessage` supports more than 4 UDP ports:
|
||||
node.sockets.tpu_forwards.first(),
|
||||
node.sockets.tvu.first(),
|
||||
node.sockets.tvu_forwards.first(),
|
||||
node.sockets.broadcast.first(),
|
||||
node.sockets.retransmit_sockets.first(),
|
||||
*/
|
||||
Some(&node.sockets.gossip),
|
||||
Some(&node.sockets.repair),
|
||||
Some(&node.sockets.serve_repair),
|
||||
]
|
||||
.iter()
|
||||
.filter_map(|x| *x)
|
||||
.collect::<Vec<_>>();
|
||||
let mut udp_sockets = vec![
|
||||
&node.sockets.gossip,
|
||||
&node.sockets.repair,
|
||||
&node.sockets.serve_repair,
|
||||
];
|
||||
udp_sockets.extend(node.sockets.tpu.iter());
|
||||
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
||||
udp_sockets.extend(node.sockets.tvu.iter());
|
||||
udp_sockets.extend(node.sockets.tvu_forwards.iter());
|
||||
udp_sockets.extend(node.sockets.broadcast.iter());
|
||||
udp_sockets.extend(node.sockets.retransmit_sockets.iter());
|
||||
|
||||
let mut tcp_listeners = vec![];
|
||||
if !private_rpc {
|
||||
|
|
Loading…
Reference in New Issue