Fix bind errors (#5986)

* Add ability to bind to a common tcp/udp port

* Extend port range for local-net sanity and fix validator executable
This commit is contained in:
Sagar Dhawan 2019-09-19 17:16:22 -07:00 committed by GitHub
parent ca9d4e34df
commit d379786c90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 107 additions and 47 deletions

View File

@ -75,18 +75,18 @@ nodes=(
"multinode-demo/drone.sh"
"multinode-demo/bootstrap-leader.sh \
--init-complete-file init-complete-node1.log \
--dynamic-port-range 8000-8019"
--dynamic-port-range 8000-8050"
"multinode-demo/validator.sh \
--enable-rpc-exit \
--no-restart \
--dynamic-port-range 8020-8039
--dynamic-port-range 8050-8100
--init-complete-file init-complete-node2.log \
--rpc-port 18899"
)
for i in $(seq 1 $extraNodes); do
portStart=$((8040 + i * 20))
portEnd=$((portStart + 19))
portStart=$((8100 + i * 50))
portEnd=$((portStart + 49))
nodes+=(
"multinode-demo/validator.sh \
--no-restart \

View File

@ -34,7 +34,8 @@ use rand_chacha::ChaChaRng;
use rayon::prelude::*;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_netutil::{
bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
multi_bind_in_range, PortRange,
};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
@ -46,7 +47,7 @@ use std::borrow::Cow;
use std::cmp::min;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
@ -1485,7 +1486,7 @@ impl ClusterInfo {
/// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip.
pub fn gossip_node(id: &Pubkey, gossip_addr: &SocketAddr) -> (ContactInfo, UdpSocket) {
let (port, gossip_socket) = Node::get_gossip_port(gossip_addr, FULLNODE_PORT_RANGE);
let (port, (gossip_socket, _)) = Node::get_gossip_port(gossip_addr, FULLNODE_PORT_RANGE);
let daddr = socketaddr_any!();
let node = ContactInfo::new(
@ -1563,6 +1564,7 @@ pub fn compute_retransmit_peers(
#[derive(Debug)]
pub struct Sockets {
pub gossip: UdpSocket,
pub ip_echo: Option<TcpListener>,
pub tvu: Vec<UdpSocket>,
pub tvu_forwards: Vec<UdpSocket>,
pub tpu: Vec<UdpSocket>,
@ -1619,12 +1621,14 @@ impl Node {
repair,
retransmit,
storage: Some(storage),
ip_echo: None,
},
}
}
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let (gossip_port, (gossip, ip_echo)) = bind_common_in_range((1024, 65535)).unwrap();
let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port);
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -1640,7 +1644,7 @@ impl Node {
let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
let info = ContactInfo::new(
pubkey,
gossip.local_addr().unwrap(),
gossip_addr,
tvu.local_addr().unwrap(),
tvu_forwards.local_addr().unwrap(),
tpu.local_addr().unwrap(),
@ -1654,6 +1658,7 @@ impl Node {
info,
sockets: Sockets {
gossip,
ip_echo: Some(ip_echo),
tvu: vec![tvu],
tvu_forwards: vec![tvu_forwards],
tpu: vec![tpu],
@ -1665,16 +1670,19 @@ impl Node {
},
}
}
fn get_gossip_port(gossip_addr: &SocketAddr, port_range: PortRange) -> (u16, UdpSocket) {
fn get_gossip_port(
gossip_addr: &SocketAddr,
port_range: PortRange,
) -> (u16, (UdpSocket, TcpListener)) {
if gossip_addr.port() != 0 {
(
gossip_addr.port(),
bind_to(gossip_addr.port(), false).unwrap_or_else(|e| {
bind_common(gossip_addr.port(), false).unwrap_or_else(|e| {
panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
}),
)
} else {
Self::bind(port_range)
bind_common_in_range(port_range).expect("Failed to bind")
}
}
fn bind(port_range: PortRange) -> (u16, UdpSocket) {
@ -1685,7 +1693,7 @@ impl Node {
gossip_addr: &SocketAddr,
port_range: PortRange,
) -> Node {
let (gossip_port, gossip) = Self::get_gossip_port(gossip_addr, port_range);
let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range);
let (tvu_port, tvu_sockets) = multi_bind_in_range(port_range, 8).expect("tvu multi_bind");
@ -1727,6 +1735,7 @@ impl Node {
repair,
retransmit,
storage: None,
ip_echo: Some(ip_echo),
},
}
}

View File

@ -240,8 +240,7 @@ impl Validator {
"New blob signal for the TVU should be the same as the clear bank signal."
);
let ip_echo_server =
solana_netutil::ip_echo_server(node.sockets.gossip.local_addr().unwrap().port());
let ip_echo_server = solana_netutil::ip_echo_server(node.sockets.ip_echo.unwrap());
let gossip_service = GossipService::new(
&cluster_info,

View File

@ -74,7 +74,7 @@ fn test_replicator_startup_1_node() {
}
#[test]
#[ignore]
#[serial]
fn test_replicator_startup_2_nodes() {
run_replicator_startup_basic(2, 1);
}
@ -119,7 +119,7 @@ fn test_replicator_startup_leader_hang() {
}
#[test]
#[ignore]
#[serial]
fn test_replicator_startup_ledger_hang() {
solana_logger::setup();
info!("starting replicator test");

View File

@ -1,4 +1,5 @@
use clap::{crate_version, App, Arg};
use std::net::{SocketAddr, TcpListener};
fn main() {
solana_logger::setup();
@ -16,7 +17,9 @@ fn main() {
let port = port
.parse()
.unwrap_or_else(|_| panic!("Unable to parse {}", port));
let _runtime = solana_netutil::ip_echo_server(port);
let bind_addr = SocketAddr::from(([0, 0, 0, 0], port));
let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener");
let _runtime = solana_netutil::ip_echo_server(tcp_listener);
loop {
std::thread::park();
}

View File

@ -7,6 +7,7 @@ use std::time::Duration;
use tokio;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::reactor::Handle;
use tokio::runtime::Runtime;
use tokio_codec::{BytesCodec, Decoder};
@ -32,11 +33,10 @@ impl IpEchoServerMessage {
/// Starts a simple TCP server on the given port that echos the IP address of any peer that
/// connects. Used by |get_public_ip_addr|
pub fn ip_echo_server(port: u16) -> IpEchoServer {
let bind_addr = SocketAddr::from(([0, 0, 0, 0], port));
let tcp = TcpListener::bind(&bind_addr)
.unwrap_or_else(|err| panic!("Unable to bind to {}: {}", bind_addr, err));
info!("bound to {:?}", bind_addr);
pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
info!("bound to {:?}", tcp.local_addr());
let tcp =
TcpListener::from_std(tcp, &Handle::default()).expect("Failed to convert std::TcpListener");
let server = tcp
.incoming()

View File

@ -58,22 +58,9 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, St
// `ip_echo_server_addr`
pub fn verify_reachable_ports(
ip_echo_server_addr: &SocketAddr,
tcp_ports: &[u16],
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
) {
let tcp: Vec<(_, _)> = tcp_ports
.iter()
.map(|port| {
(
port,
TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else(|err| {
error!("Unable to bind to tcp/{}: {}", port, err);
std::process::exit(1);
}),
)
})
.collect();
let udp: Vec<(_, _)> = udp_sockets
.iter()
.map(|udp_socket| {
@ -88,9 +75,10 @@ pub fn verify_reachable_ports(
info!(
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
tcp_ports, udp_ports, ip_echo_server_addr
tcp_listeners, udp_ports, ip_echo_server_addr
);
let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &udp_ports),
@ -98,9 +86,8 @@ pub fn verify_reachable_ports(
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
// Wait for a connection to open on each TCP port
for (port, tcp_listener) in tcp {
for (port, tcp_listener) in tcp_listeners {
let (sender, receiver) = channel();
let port = *port;
std::thread::spawn(move || {
debug!("Waiting for incoming connection on tcp/{}", port);
let _ = tcp_listener.incoming().next().expect("tcp incoming failed");
@ -230,6 +217,30 @@ fn udp_socket(reuseaddr: bool) -> io::Result<Socket> {
Ok(sock)
}
// Find a port in the given range that is available for both TCP and UDP
pub fn bind_common_in_range(range: PortRange) -> io::Result<(u16, (UdpSocket, TcpListener))> {
let (start, end) = range;
let mut tries_left = end - start;
let mut rand_port = thread_rng().gen_range(start, end);
loop {
match bind_common(rand_port, false) {
Ok((sock, listener)) => {
break Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
}
Err(err) => {
if tries_left == 0 {
return Err(err);
}
}
}
rand_port += 1;
if rand_port == end {
rand_port = start;
}
tries_left -= 1;
}
}
pub fn bind_in_range(range: PortRange) -> io::Result<(u16, UdpSocket)> {
let sock = udp_socket(false)?;
@ -292,6 +303,21 @@ pub fn bind_to(port: u16, reuseaddr: bool) -> io::Result<UdpSocket> {
}
}
// binds both a UdpSocket and a TcpListener
pub fn bind_common(port: u16, reuseaddr: bool) -> io::Result<(UdpSocket, TcpListener)> {
let sock = udp_socket(reuseaddr)?;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
let sock_addr = SockAddr::from(addr);
match sock.bind(&sock_addr) {
Ok(_) => match TcpListener::bind(&addr) {
Ok(listener) => Result::Ok((sock.into_udp_socket(), listener)),
Err(err) => Err(err),
},
Err(err) => Err(err),
}
}
pub fn find_available_port_in_range(range: PortRange) -> io::Result<u16> {
let (start, end) = range;
let mut tries_left = end - start;
@ -386,4 +412,10 @@ mod tests {
let port = find_available_port_in_range((3000, 3050)).unwrap();
assert!(3000 <= port && port < 3050);
}
#[test]
fn test_bind_common_in_range() {
let (port, _) = bind_common_in_range((3000, 3050)).unwrap();
assert!(3000 <= port && port < 3050);
}
}

View File

@ -17,7 +17,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil};
use std::fs::{self, File};
use std::io::{self, Read};
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpListener};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::Arc;
@ -530,8 +530,7 @@ fn main() {
);
solana_metrics::set_host_id(keypair.pubkey().to_string());
let mut tcp_ports = vec![gossip_addr.port()];
let mut tcp_ports = vec![];
let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);
if let Some(port) = matches.value_of("rpc_port") {
let port_number = port.to_string().parse().expect("integer");
@ -539,9 +538,9 @@ fn main() {
eprintln!("Invalid RPC port requested: {:?}", port);
exit(1);
}
node.info.rpc = SocketAddr::new(gossip_addr.ip(), port_number);
node.info.rpc_pubsub = SocketAddr::new(gossip_addr.ip(), port_number + 1);
tcp_ports.extend_from_slice(&[port_number, port_number + 1]);
node.info.rpc = SocketAddr::new(node.info.gossip.ip(), port_number);
node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), port_number + 1);
tcp_ports = vec![port_number, port_number + 1];
};
if let Some(ref cluster_entrypoint) = cluster_entrypoint {
@ -552,9 +551,27 @@ fn main() {
&node.sockets.retransmit,
];
let mut tcp_listeners: Vec<(_, _)> = tcp_ports
.iter()
.map(|port| {
(
*port,
TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], *port))).unwrap_or_else(
|err| {
error!("Unable to bind to tcp/{}: {}", port, err);
std::process::exit(1);
},
),
)
})
.collect();
if let Some(ip_echo) = &node.sockets.ip_echo {
let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
tcp_listeners.push((node.info.gossip.port(), ip_echo));
}
solana_netutil::verify_reachable_ports(
&cluster_entrypoint.gossip,
&tcp_ports,
tcp_listeners,
&udp_sockets,
);