Confirm validator ports are reachable by the entrypoint at startup (#5795)

This commit is contained in:
Michael Vines 2019-09-04 23:10:35 -07:00 committed by GitHub
parent bd74e63702
commit c4a5442146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 255 additions and 40 deletions

4
Cargo.lock generated
View File

@ -3621,13 +3621,17 @@ name = "solana-netutil"
version = "0.19.0-pre0"
dependencies = [
"bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)",
"socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-logger 0.19.0-pre0",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]

View File

@ -10,13 +10,17 @@ edition = "2018"
[dependencies]
bincode = "1.1.4"
bytes = "0.4"
clap = "2.33.0"
log = "0.4.8"
nix = "0.15.0"
rand = "0.6.1"
serde = "1.0.99"
serde_derive = "1.0.99"
socket2 = "0.3.11"
solana-logger = { path = "../logger", version = "0.19.0-pre0" }
tokio = "0.1"
tokio-codec = "0.1"
[lib]
name = "solana_netutil"

View File

@ -1,12 +1,35 @@
use bytes::Bytes;
use log::*;
use serde_derive::{Deserialize, Serialize};
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use tokio;
use tokio::net::TcpListener;
use tokio::prelude::{Future, Stream};
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio_codec::{BytesCodec, Decoder};
pub type IpEchoServer = Runtime;
#[derive(Serialize, Deserialize, Default)]
pub(crate) struct IpEchoServerMessage {
tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
}
impl IpEchoServerMessage {
pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self {
let mut msg = Self::default();
assert!(tcp_ports.len() <= msg.tcp_ports.len());
assert!(udp_ports.len() <= msg.udp_ports.len());
msg.tcp_ports[..tcp_ports.len()].copy_from_slice(tcp_ports);
msg.udp_ports[..udp_ports.len()].copy_from_slice(udp_ports);
msg
}
}
/// Starts a simple TCP server on the given port that echos the IP address of any peer that
/// connects. Used by |get_public_ip_addr|
pub fn ip_echo_server(port: u16) -> IpEchoServer {
@ -19,23 +42,96 @@ pub fn ip_echo_server(port: u16) -> IpEchoServer {
.incoming()
.map_err(|err| warn!("accept failed: {:?}", err))
.for_each(move |socket| {
let ip = socket
.peer_addr()
.and_then(|peer_addr| {
bincode::serialize(&peer_addr.ip()).map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to serialize: {:?}", err),
)
let ip = socket.peer_addr().expect("Expect peer_addr()").ip();
info!("connection from {:?}", ip);
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader
.and_then(move |bytes| {
bincode::deserialize::<IpEchoServerMessage>(&bytes).or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Failed to deserialize IpEchoServerMessage: {:?}", err),
))
})
})
.unwrap_or_else(|_| vec![]);
.and_then(move |msg| {
// Fire a datagram at each non-zero UDP port
if !msg.udp_ports.is_empty() {
match std::net::UdpSocket::bind("0.0.0.0:0") {
Ok(udp_socket) => {
for udp_port in &msg.udp_ports {
if *udp_port != 0 {
match udp_socket
.send_to(&[0], SocketAddr::from((ip, *udp_port)))
{
Ok(_) => debug!("Successful send_to udp/{}", udp_port),
Err(err) => {
info!("Failed to send_to udp/{}: {}", udp_port, err)
}
}
}
}
}
Err(err) => {
warn!("Failed to bind local udp socket: {}", err);
}
}
}
let write_future = tokio::io::write_all(socket, ip)
.map_err(|err| warn!("write error: {:?}", err))
.map(|_| ());
// Try to connect to each non-zero TCP port
let tcp_futures: Vec<_> = msg
.tcp_ports
.iter()
.filter_map(|tcp_port| {
let tcp_port = *tcp_port;
if tcp_port == 0 {
None
} else {
Some(
tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port))
.and_then(move |tcp_stream| {
debug!("Connection established to tcp/{}", tcp_port);
let _ = tcp_stream.shutdown(std::net::Shutdown::Both);
Ok(())
})
.timeout(Duration::from_secs(5))
.or_else(move |err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Connection timeout to {}: {:?}",
tcp_port, err
),
))
}),
)
}
})
.collect();
future::join_all(tcp_futures)
})
.and_then(move |_| {
let ip = bincode::serialize(&ip).unwrap_or_else(|err| {
warn!("Failed to serialize: {:?}", err);
vec![]
});
Ok(Bytes::from(ip))
});
tokio::spawn(write_future)
let connection = writer
.send_all(processor)
.timeout(Duration::from_secs(5))
.then(|result| {
if let Err(err) = result {
info!("Session failed: {:?}", err);
}
Ok(())
});
tokio::spawn(connection)
});
let mut rt = Runtime::new().expect("Failed to create Runtime");

View File

@ -2,13 +2,14 @@
use log::*;
use rand::{thread_rng, Rng};
use socket2::{Domain, SockAddr, Socket, Type};
use std::io;
use std::io::Read;
use std::io::{self, Read, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::sync::mpsc::channel;
use std::time::Duration;
mod ip_echo_server;
pub use ip_echo_server::*;
use ip_echo_server::IpEchoServerMessage;
pub use ip_echo_server::{ip_echo_server, IpEchoServer};
/// A data type representing a public Udp socket
pub struct UdpSocketPair {
@ -19,14 +20,18 @@ pub struct UdpSocketPair {
pub type PortRange = (u16, u16);
/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
fn ip_echo_server_request(
ip_echo_server_addr: &SocketAddr,
msg: IpEchoServerMessage,
) -> Result<IpAddr, String> {
let mut data = Vec::new();
let timeout = Duration::new(5, 0);
TcpStream::connect_timeout(ip_echo_server_addr, timeout)
.and_then(|mut stream| {
let msg = bincode::serialize(&msg).expect("serialize IpEchoServerMessage");
stream.write_all(&msg)?;
stream.shutdown(std::net::Shutdown::Write)?;
stream
.set_read_timeout(Some(Duration::new(10, 0)))
.expect("set_read_timeout");
@ -43,6 +48,98 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, St
.map_err(|err| err.to_string())
}
/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())
}
// Aborts the process if any of the provided TCP/UDP ports are not reachable by the machine at
// `ip_echo_server_addr`
pub fn verify_reachable_ports(
ip_echo_server_addr: &SocketAddr,
tcp_ports: &[u16],
udp_sockets: &[&UdpSocket],
) {
let tcp: Vec<(_, _)> = tcp_ports
.iter()
.map(|port| {
(
port,
TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else(|err| {
error!("Unable to bind to tcp/{}: {}", port, err);
std::process::exit(1);
}),
)
})
.collect();
let udp: Vec<(_, _)> = udp_sockets
.iter()
.map(|udp_socket| {
(
udp_socket.local_addr().unwrap().port(),
udp_socket.try_clone().expect("Unable to clone udp socket"),
)
})
.collect();
let udp_ports: Vec<_> = udp.iter().map(|x| x.0).collect();
info!(
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
tcp_ports, udp_ports, ip_echo_server_addr
);
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &udp_ports),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
// Wait for a connection to open on each TCP port
for (port, tcp_listener) in tcp {
let (sender, receiver) = channel();
let port = *port;
std::thread::spawn(move || {
debug!("Waiting for incoming connection on tcp/{}", port);
let _ = tcp_listener.incoming().next().expect("tcp incoming failed");
sender.send(()).expect("send failure");
});
receiver
.recv_timeout(Duration::from_secs(5))
.unwrap_or_else(|err| {
error!(
"Received no response at tcp/{}, check your port configuration: {}",
port, err
);
std::process::exit(1);
});
info!("tdp/{} is reachable", port);
}
// Wait for a datagram to arrive at each UDP port
for (port, udp_socket) in udp {
let (sender, receiver) = channel();
std::thread::spawn(move || {
let mut buf = [0; 1];
debug!("Waiting for incoming datagram on udp/{}", port);
let _ = udp_socket.recv(&mut buf).expect("udp recv failure");
sender.send(()).expect("send failure");
});
receiver
.recv_timeout(Duration::from_secs(5))
.unwrap_or_else(|err| {
error!(
"Received no response at udp/{}, check your port configuration: {}",
port, err
);
std::process::exit(1);
});
info!("udp/{} is reachable", port);
}
}
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {

View File

@ -12,7 +12,6 @@ use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
use solana_core::service::Service;
use solana_core::socketaddr;
use solana_core::validator::{Validator, ValidatorConfig};
use solana_netutil::parse_port_range;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil};
use solana_sdk::timing::Slot;
@ -25,7 +24,7 @@ use std::sync::Arc;
use std::time::Instant;
fn port_range_validator(port_range: String) -> Result<(), String> {
if parse_port_range(&port_range).is_some() {
if solana_netutil::parse_port_range(&port_range).is_some() {
Ok(())
} else {
Err("Invalid port range".to_string())
@ -155,7 +154,6 @@ fn download_tar_bz2(
fn initialize_ledger_path(
entrypoint: &ContactInfo,
gossip_addr: &SocketAddr,
ledger_path: &Path,
no_snapshot_fetch: bool,
) -> Result<Hash, String> {
@ -165,7 +163,7 @@ fn initialize_ledger_path(
Some(60),
None,
Some(entrypoint.gossip.ip()),
Some(&gossip_addr),
None,
)
.map_err(|err| err.to_string())?;
@ -443,8 +441,9 @@ fn main() {
solana_netutil::parse_host_port(address).expect("failed to parse drone address")
});
let dynamic_port_range = parse_port_range(matches.value_of("dynamic_port_range").unwrap())
.expect("invalid dynamic_port_range");
let dynamic_port_range =
solana_netutil::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
.expect("invalid dynamic_port_range");
let mut gossip_addr = solana_netutil::parse_port_or_addr(
matches.value_of("gossip_port"),
@ -528,10 +527,36 @@ fn main() {
);
solana_metrics::set_host_id(keypair.pubkey().to_string());
if let Some(ref entrypoint_addr) = cluster_entrypoint {
let mut tcp_ports = vec![gossip_addr.port()];
let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);
if let Some(port) = matches.value_of("rpc_port") {
let port_number = port.to_string().parse().expect("integer");
if port_number == 0 {
eprintln!("Invalid RPC port requested: {:?}", port);
exit(1);
}
node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number);
node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1);
tcp_ports.extend_from_slice(&[port_number, port_number + 1]);
};
if let Some(ref cluster_entrypoint) = cluster_entrypoint {
let udp_sockets = [
&node.sockets.gossip,
&node.sockets.broadcast,
&node.sockets.repair,
&node.sockets.retransmit,
];
solana_netutil::verify_reachable_ports(
&cluster_entrypoint.gossip,
&tcp_ports,
&udp_sockets,
);
let expected_genesis_blockhash = initialize_ledger_path(
entrypoint_addr,
&gossip_addr,
cluster_entrypoint,
&ledger_path,
matches.is_present("no_snapshot_fetch"),
)
@ -551,17 +576,6 @@ fn main() {
}
}
let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);
if let Some(port) = matches.value_of("rpc_port") {
let port_number = port.to_string().parse().expect("integer");
if port_number == 0 {
eprintln!("Invalid RPC port requested: {:?}", port);
exit(1);
}
node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number);
node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1);
};
let validator = Validator::new(
node,
&Arc::new(keypair),