Retry a couple times before declaring a UDP port unreachable (#10181)
This commit is contained in:
parent
e2b5cd6d47
commit
269db1710e
|
@ -90,25 +90,18 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, St
|
||||||
ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())
|
ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aborts the process if any of the provided TCP/UDP ports are not reachable by the machine at
|
// Checks whether all of the provided TCP/UDP ports are reachable by the machine at
|
||||||
// `ip_echo_server_addr`
|
// `ip_echo_server_addr`
|
||||||
pub fn verify_reachable_ports(
|
pub fn verify_reachable_ports(
|
||||||
ip_echo_server_addr: &SocketAddr,
|
ip_echo_server_addr: &SocketAddr,
|
||||||
tcp_listeners: Vec<(u16, TcpListener)>,
|
tcp_listeners: Vec<(u16, TcpListener)>,
|
||||||
udp_sockets: &[&UdpSocket],
|
udp_sockets: &[&UdpSocket],
|
||||||
) {
|
) -> bool {
|
||||||
let udp: Vec<(_, _)> = udp_sockets
|
let udp_ports: Vec<_> = udp_sockets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|udp_socket| {
|
.map(|udp_socket| udp_socket.local_addr().unwrap().port())
|
||||||
(
|
|
||||||
udp_socket.local_addr().unwrap().port(),
|
|
||||||
udp_socket.try_clone().expect("Unable to clone udp socket"),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let udp_ports: Vec<_> = udp.iter().map(|x| x.0).collect();
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
|
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
|
||||||
tcp_listeners, udp_ports, ip_echo_server_addr
|
tcp_listeners, udp_ports, ip_echo_server_addr
|
||||||
|
@ -121,6 +114,8 @@ pub fn verify_reachable_ports(
|
||||||
)
|
)
|
||||||
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
||||||
|
|
||||||
|
let mut ok = true;
|
||||||
|
|
||||||
// Wait for a connection to open on each TCP port
|
// Wait for a connection to open on each TCP port
|
||||||
for (port, tcp_listener) in tcp_listeners {
|
for (port, tcp_listener) in tcp_listeners {
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
|
@ -129,38 +124,64 @@ pub fn verify_reachable_ports(
|
||||||
let _ = tcp_listener.incoming().next().expect("tcp incoming failed");
|
let _ = tcp_listener.incoming().next().expect("tcp incoming failed");
|
||||||
sender.send(()).expect("send failure");
|
sender.send(()).expect("send failure");
|
||||||
});
|
});
|
||||||
receiver
|
match receiver.recv_timeout(Duration::from_secs(5)) {
|
||||||
.recv_timeout(Duration::from_secs(5))
|
Ok(_) => {
|
||||||
.unwrap_or_else(|err| {
|
info!("tcp/{} is reachable", port);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Received no response at tcp/{}, check your port configuration: {}",
|
"Received no response at tcp/{}, check your port configuration: {}",
|
||||||
port, err
|
port, err
|
||||||
);
|
);
|
||||||
std::process::exit(1);
|
ok = false;
|
||||||
});
|
}
|
||||||
info!("tcp/{} is reachable", port);
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for a datagram to arrive at each UDP port
|
if !ok {
|
||||||
for (port, udp_socket) in udp {
|
// TCP checks are not retried: once every TCP port has been probed, return early if any failed
|
||||||
let (sender, receiver) = channel();
|
return ok;
|
||||||
std::thread::spawn(move || {
|
|
||||||
let mut buf = [0; 1];
|
|
||||||
debug!("Waiting for incoming datagram on udp/{}", port);
|
|
||||||
let _ = udp_socket.recv(&mut buf).expect("udp recv failure");
|
|
||||||
sender.send(()).expect("send failure");
|
|
||||||
});
|
|
||||||
receiver
|
|
||||||
.recv_timeout(Duration::from_secs(5))
|
|
||||||
.unwrap_or_else(|err| {
|
|
||||||
error!(
|
|
||||||
"Received no response at udp/{}, check your port configuration: {}",
|
|
||||||
port, err
|
|
||||||
);
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
info!("udp/{} is reachable", port);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _udp_retries in 0..5 {
|
||||||
|
// Wait for a datagram to arrive at each UDP port
|
||||||
|
for udp_socket in udp_sockets {
|
||||||
|
let port = udp_socket.local_addr().unwrap().port();
|
||||||
|
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
|
||||||
|
let (sender, receiver) = channel();
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let mut buf = [0; 1];
|
||||||
|
debug!("Waiting for incoming datagram on udp/{}", port);
|
||||||
|
let _ = udp_socket.recv(&mut buf).expect("udp recv failure");
|
||||||
|
sender.send(()).expect("send failure");
|
||||||
|
});
|
||||||
|
match receiver.recv_timeout(Duration::from_secs(5)) {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("udp/{} is reachable", port);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!(
|
||||||
|
"Received no response at udp/{}, check your port configuration: {}",
|
||||||
|
port, err
|
||||||
|
);
|
||||||
|
ok = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ok = true;
|
||||||
|
|
||||||
|
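// A UDP packet might have been lost, so ask the echo server again and retry a couple of times
// NOTE(review): `ok` was reset to `true` just above, so if the final retry also fails the
// function still returns `true` -- confirm whether the reset should be skipped on the last attempt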
|
||||||
|
let _ = ip_echo_server_request(
|
||||||
|
ip_echo_server_addr,
|
||||||
|
IpEchoServerMessage::new(&[], &udp_ports),
|
||||||
|
)
|
||||||
|
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
|
||||||
|
}
|
||||||
|
|
||||||
|
ok
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
|
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
|
||||||
|
@ -499,10 +520,10 @@ mod tests {
|
||||||
parse_host("127.0.0.1"),
|
parse_host("127.0.0.1"),
|
||||||
);
|
);
|
||||||
|
|
||||||
verify_reachable_ports(
|
assert!(verify_reachable_ports(
|
||||||
&ip_echo_server_addr,
|
&ip_echo_server_addr,
|
||||||
vec![(client_port, client_tcp_listener)],
|
vec![(client_port, client_tcp_listener)],
|
||||||
&[&client_udp_socket],
|
&[&client_udp_socket],
|
||||||
);
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1100,7 +1100,7 @@ pub fn main() {
|
||||||
TcpListener::bind(&SocketAddr::from((rpc_bind_address, *port)))
|
TcpListener::bind(&SocketAddr::from((rpc_bind_address, *port)))
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
error!("Unable to bind to tcp/{} for {}: {}", port, purpose, err);
|
error!("Unable to bind to tcp/{} for {}: {}", port, purpose, err);
|
||||||
std::process::exit(1);
|
exit(1);
|
||||||
}),
|
}),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -1112,11 +1112,13 @@ pub fn main() {
|
||||||
tcp_listeners.push((node.info.gossip.port(), ip_echo));
|
tcp_listeners.push((node.info.gossip.port(), ip_echo));
|
||||||
}
|
}
|
||||||
|
|
||||||
solana_net_utils::verify_reachable_ports(
|
if !solana_net_utils::verify_reachable_ports(
|
||||||
&cluster_entrypoint.gossip,
|
&cluster_entrypoint.gossip,
|
||||||
tcp_listeners,
|
tcp_listeners,
|
||||||
&udp_sockets,
|
&udp_sockets,
|
||||||
);
|
) {
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if !no_genesis_fetch {
|
if !no_genesis_fetch {
|
||||||
let (cluster_info, gossip_exit_flag, gossip_service) = start_gossip_node(
|
let (cluster_info, gossip_exit_flag, gossip_service) = start_gossip_node(
|
||||||
&identity_keypair,
|
&identity_keypair,
|
||||||
|
|
Loading…
Reference in New Issue