diff --git a/ci/localnet-sanity.sh b/ci/localnet-sanity.sh index 99fdc3c662..c8fcfc0ca3 100755 --- a/ci/localnet-sanity.sh +++ b/ci/localnet-sanity.sh @@ -75,18 +75,18 @@ nodes=( "multinode-demo/drone.sh" "multinode-demo/bootstrap-leader.sh \ --init-complete-file init-complete-node1.log \ - --dynamic-port-range 8000-8019" + --dynamic-port-range 8000-8050" "multinode-demo/validator.sh \ --enable-rpc-exit \ --no-restart \ - --dynamic-port-range 8020-8039 + --dynamic-port-range 8050-8100 --init-complete-file init-complete-node2.log \ --rpc-port 18899" ) for i in $(seq 1 $extraNodes); do - portStart=$((8040 + i * 20)) - portEnd=$((portStart + 19)) + portStart=$((8100 + i * 50)) + portEnd=$((portStart + 49)) nodes+=( "multinode-demo/validator.sh \ --no-restart \ diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 97957c6cd8..d2ea60e656 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -34,7 +34,8 @@ use rand_chacha::ChaChaRng; use rayon::prelude::*; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; use solana_netutil::{ - bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange, + bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, + multi_bind_in_range, PortRange, }; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; @@ -46,7 +47,7 @@ use std::borrow::Cow; use std::cmp::min; use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; @@ -1485,7 +1486,7 @@ impl ClusterInfo { /// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip. pub fn gossip_node(id: &Pubkey, gossip_addr: &SocketAddr) -> (ContactInfo, UdpSocket) { - let (port, gossip_socket) = Node::get_gossip_port(gossip_addr, FULLNODE_PORT_RANGE); + let (port, (gossip_socket, _)) = Node::get_gossip_port(gossip_addr, FULLNODE_PORT_RANGE); let daddr = socketaddr_any!(); let node = ContactInfo::new( @@ -1563,6 +1564,7 @@ pub fn compute_retransmit_peers( #[derive(Debug)] pub struct Sockets { pub gossip: UdpSocket, + pub ip_echo: Option, pub tvu: Vec, pub tvu_forwards: Vec, pub tpu: Vec, @@ -1619,12 +1621,14 @@ impl Node { repair, retransmit, storage: Some(storage), + ip_echo: None, }, } } pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); - let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); + let (gossip_port, (gossip, ip_echo)) = bind_common_in_range((1024, 65535)).unwrap(); + let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port); let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -1640,7 +1644,7 @@ impl Node { let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo::new( pubkey, - gossip.local_addr().unwrap(), + gossip_addr, tvu.local_addr().unwrap(), tvu_forwards.local_addr().unwrap(), tpu.local_addr().unwrap(), @@ -1654,6 +1658,7 @@ impl Node { info, sockets: Sockets { gossip, + ip_echo: Some(ip_echo), tvu: vec![tvu], tvu_forwards: vec![tvu_forwards], tpu: vec![tpu], @@ -1665,16 +1670,19 @@ impl Node { }, } } - fn get_gossip_port(gossip_addr: &SocketAddr, port_range: PortRange) -> (u16, UdpSocket) { + fn get_gossip_port( + gossip_addr: &SocketAddr, + port_range: PortRange, + ) -> (u16, (UdpSocket, TcpListener)) { if gossip_addr.port() != 0 { ( gossip_addr.port(), - bind_to(gossip_addr.port(), false).unwrap_or_else(|e| { + bind_common(gossip_addr.port(), false).unwrap_or_else(|e| { panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e) }), ) } else { - Self::bind(port_range) + bind_common_in_range(port_range).expect("Failed to bind") } } fn bind(port_range: PortRange) -> (u16, UdpSocket) { @@ -1685,7 +1693,7 @@ impl Node { gossip_addr: &SocketAddr, port_range: PortRange, ) -> Node { - let (gossip_port, gossip) = Self::get_gossip_port(gossip_addr, port_range); + let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range); let (tvu_port, tvu_sockets) = multi_bind_in_range(port_range, 8).expect("tvu multi_bind"); @@ -1727,6 +1735,7 @@ impl Node { repair, retransmit, storage: None, + ip_echo: Some(ip_echo), }, } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 96ee6d20cc..6843da2b60 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -240,8 +240,7 @@ impl Validator { "New blob signal for the TVU should be the same as the clear bank signal." ); - let ip_echo_server = - solana_netutil::ip_echo_server(node.sockets.gossip.local_addr().unwrap().port()); + let ip_echo_server = solana_netutil::ip_echo_server(node.sockets.ip_echo.unwrap()); let gossip_service = GossipService::new( &cluster_info, diff --git a/local_cluster/src/tests/replicator.rs b/local_cluster/src/tests/replicator.rs index 3e1c836237..d1a6a90d6b 100644 --- a/local_cluster/src/tests/replicator.rs +++ b/local_cluster/src/tests/replicator.rs @@ -74,7 +74,7 @@ fn test_replicator_startup_1_node() { } #[test] -#[ignore] +#[serial] fn test_replicator_startup_2_nodes() { run_replicator_startup_basic(2, 1); } @@ -119,7 +119,7 @@ fn test_replicator_startup_leader_hang() { } #[test] -#[ignore] +#[serial] fn test_replicator_startup_ledger_hang() { solana_logger::setup(); info!("starting replicator test"); diff --git a/netutil/src/bin/ip_address_server.rs b/netutil/src/bin/ip_address_server.rs index 105d0733d2..91df1f5da5 100644 --- a/netutil/src/bin/ip_address_server.rs +++ b/netutil/src/bin/ip_address_server.rs @@ -1,4 +1,5 @@ use clap::{crate_version, App, Arg}; +use std::net::{SocketAddr, TcpListener}; fn main() { solana_logger::setup(); @@ -16,7 +17,9 @@ fn main() { let port = port .parse() .unwrap_or_else(|_| panic!("Unable to parse {}", port)); - let _runtime = solana_netutil::ip_echo_server(port); + let bind_addr = SocketAddr::from(([0, 0, 0, 0], port)); + let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener"); + let _runtime = solana_netutil::ip_echo_server(tcp_listener); loop { std::thread::park(); } diff --git a/netutil/src/ip_echo_server.rs b/netutil/src/ip_echo_server.rs index 5d7ff101bc..73564c9e23 100644 --- a/netutil/src/ip_echo_server.rs +++ b/netutil/src/ip_echo_server.rs @@ -7,6 +7,7 @@ use std::time::Duration; use tokio; use tokio::net::TcpListener; use tokio::prelude::*; +use tokio::reactor::Handle; use tokio::runtime::Runtime; use tokio_codec::{BytesCodec, Decoder}; @@ -32,11 +33,10 @@ impl IpEchoServerMessage { /// Starts a simple TCP server on the given port that echos the IP address of any peer that /// connects. Used by |get_public_ip_addr| -pub fn ip_echo_server(port: u16) -> IpEchoServer { - let bind_addr = SocketAddr::from(([0, 0, 0, 0], port)); - let tcp = TcpListener::bind(&bind_addr) - .unwrap_or_else(|err| panic!("Unable to bind to {}: {}", bind_addr, err)); - info!("bound to {:?}", bind_addr); +pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer { + info!("bound to {:?}", tcp.local_addr()); + let tcp = + TcpListener::from_std(tcp, &Handle::default()).expect("Failed to convert std::TcpListener"); let server = tcp .incoming() diff --git a/netutil/src/lib.rs b/netutil/src/lib.rs index 771d997983..a8731814d9 100644 --- a/netutil/src/lib.rs +++ b/netutil/src/lib.rs @@ -58,22 +58,9 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result, udp_sockets: &[&UdpSocket], ) { - let tcp: Vec<(_, _)> = tcp_ports - .iter() - .map(|port| { - ( - port, - TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else(|err| { - error!("Unable to bind to tcp/{}: {}", port, err); - std::process::exit(1); - }), - ) - }) - .collect(); - let udp: Vec<(_, _)> = udp_sockets .iter() .map(|udp_socket| { @@ -88,9 +75,10 @@ pub fn verify_reachable_ports( info!( "Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}", - tcp_ports, udp_ports, ip_echo_server_addr + tcp_listeners, udp_ports, ip_echo_server_addr ); + let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect(); let _ = ip_echo_server_request( ip_echo_server_addr, IpEchoServerMessage::new(&tcp_ports, &udp_ports), @@ -98,9 +86,8 @@ pub fn verify_reachable_ports( .map_err(|err| warn!("ip_echo_server request failed: {}", err)); // Wait for a connection to open on each TCP port - for (port, tcp_listener) in tcp { + for (port, tcp_listener) in tcp_listeners { let (sender, receiver) = channel(); - let port = *port; std::thread::spawn(move || { debug!("Waiting for incoming connection on tcp/{}", port); let _ = tcp_listener.incoming().next().expect("tcp incoming failed"); @@ -230,6 +217,30 @@ fn udp_socket(reuseaddr: bool) -> io::Result { Ok(sock) } +// Find a port in the given range that is available for both TCP and UDP +pub fn bind_common_in_range(range: PortRange) -> io::Result<(u16, (UdpSocket, TcpListener))> { + let (start, end) = range; + let mut tries_left = end - start; + let mut rand_port = thread_rng().gen_range(start, end); + loop { + match bind_common(rand_port, false) { + Ok((sock, listener)) => { + break Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); + } + Err(err) => { + if tries_left == 0 { + return Err(err); + } + } + } + rand_port += 1; + if rand_port == end { + rand_port = start; + } + tries_left -= 1; + } +} + pub fn bind_in_range(range: PortRange) -> io::Result<(u16, UdpSocket)> { let sock = udp_socket(false)?; @@ -292,6 +303,21 @@ pub fn bind_to(port: u16, reuseaddr: bool) -> io::Result { } } +// binds both a UdpSocket and a TcpListener +pub fn bind_common(port: u16, reuseaddr: bool) -> io::Result<(UdpSocket, TcpListener)> { + let sock = udp_socket(reuseaddr)?; + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); + let sock_addr = SockAddr::from(addr); + match sock.bind(&sock_addr) { + Ok(_) => match TcpListener::bind(&addr) { + Ok(listener) => Result::Ok((sock.into_udp_socket(), listener)), + Err(err) => Err(err), + }, + Err(err) => Err(err), + } +} + pub fn find_available_port_in_range(range: PortRange) -> io::Result { let (start, end) = range; let mut tries_left = end - start; @@ -386,4 +412,10 @@ mod tests { let port = find_available_port_in_range((3000, 3050)).unwrap(); assert!(3000 <= port && port < 3050); } + + #[test] + fn test_bind_common_in_range() { + let (port, _) = bind_common_in_range((3000, 3050)).unwrap(); + assert!(3000 <= port && port < 3050); + } } diff --git a/validator/src/main.rs b/validator/src/main.rs index bb76be832d..050b8e88f2 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -17,7 +17,7 @@ use solana_sdk::hash::Hash; use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil}; use std::fs::{self, File}; use std::io::{self, Read}; -use std::net::SocketAddr; +use std::net::{SocketAddr, TcpListener}; use std::path::{Path, PathBuf}; use std::process::exit; use std::sync::Arc; @@ -530,8 +530,7 @@ fn main() { ); solana_metrics::set_host_id(keypair.pubkey().to_string()); - let mut tcp_ports = vec![gossip_addr.port()]; - + let mut tcp_ports = vec![]; let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range); if let Some(port) = matches.value_of("rpc_port") { let port_number = port.to_string().parse().expect("integer"); @@ -539,9 +538,9 @@ fn main() { eprintln!("Invalid RPC port requested: {:?}", port); exit(1); } - node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number); - node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1); - tcp_ports.extend_from_slice(&[port_number, port_number + 1]); + node.info.rpc = SocketAddr::new(node.info.gossip.ip(), port_number); + node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), port_number + 1); + tcp_ports = vec![port_number, port_number + 1]; }; if let Some(ref cluster_entrypoint) = cluster_entrypoint { @@ -552,9 +551,27 @@ fn main() { &node.sockets.retransmit, ]; + let mut tcp_listeners: Vec<(_, _)> = tcp_ports + .iter() + .map(|port| { + ( + *port, + TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else( + |err| { + error!("Unable to bind to tcp/{}: {}", port, err); + std::process::exit(1); + }, + ), + ) + }) + .collect(); + if let Some(ip_echo) = &node.sockets.ip_echo { + let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener"); + tcp_listeners.push((node.info.gossip.port(), ip_echo)); + } solana_netutil::verify_reachable_ports( &cluster_entrypoint.gossip, - &tcp_ports, + tcp_listeners, &udp_sockets, );