diff --git a/ci/localnet-sanity.sh b/ci/localnet-sanity.sh index 27ed2b65e7..6565e1d183 100755 --- a/ci/localnet-sanity.sh +++ b/ci/localnet-sanity.sh @@ -69,8 +69,8 @@ echo "--- Wallet sanity" echo "--- Node count" ( - set -x source multinode-demo/common.sh + set -x client_id=/tmp/client-id.json-$$ $solana_keygen -o $client_id $solana_bench_tps --identity $client_id --num-nodes 3 --converge-only @@ -81,8 +81,8 @@ killBackgroundCommands echo "--- Ledger verification" ( - set -x source multinode-demo/common.sh + set -x cp -R "$SOLANA_CONFIG_DIR"/ledger /tmp/ledger-$$ $solana_ledger_tool --ledger /tmp/ledger-$$ verify rm -rf /tmp/ledger-$$ diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index 54d43a402d..058b9cc5fc 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -2,7 +2,7 @@ extern crate clap; extern crate solana; use clap::{App, Arg}; -use solana::nat::bind_to; +use solana::netutil::bind_to; use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; @@ -84,7 +84,7 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); for _ in 0..num_sockets { - let read = bind_to(port); + let read = bind_to(port, false).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); addr = read.local_addr().unwrap(); diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 9030407240..b9c3325402 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -18,7 +18,7 @@ use solana::logger; use solana::metrics; use solana::ncp::Ncp; use solana::service::Service; -use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; +use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil, Pubkey}; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; @@ -507,16 +507,34 @@ fn main() { let exit_signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; - let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads); + let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads); - println!(" Node address | Node identifier"); - println!("----------------------+------------------"); - for node in &validators { - println!(" {:20} | {}", node.contact_info.tpu.to_string(), node.id); + let leader_id = if let Some(leader) = &leader { + leader.id + } else { + Default::default() + }; + + fn print_gossip_info(nodes: &Vec, leader_id: &Pubkey) -> () { + println!(" Node gossip address | Node identifier"); + println!("---------------------+------------------"); + for node in nodes { + println!( + " {:20} | {}{}", + node.contact_info.ncp.to_string(), + node.id, + if node.id == *leader_id { + " <==== leader" + } else { + "" + } + ); + } + println!("Nodes: {}", nodes.len()); } - println!("Nodes: {}", validators.len()); + print_gossip_info(&nodes, &leader_id); - if validators.len() < num_nodes { + if nodes.len() < num_nodes { println!( "Error: Insufficient nodes discovered. Expecting {} or more", num_nodes @@ -575,7 +593,7 @@ fn main() { let maxes = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds println!("Sampling TPS every {} second...", sample_period); - let v_threads: Vec<_> = validators + let v_threads: Vec<_> = nodes .into_iter() .map(|v| { let exit_signal = exit_signal.clone(); @@ -725,6 +743,7 @@ fn converge( //wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { if spy_ref.read().unwrap().leader_data().is_none() { + sleep(Duration::new(1, 0)); continue; } diff --git a/src/bin/fullnode-config.rs b/src/bin/fullnode-config.rs index 4d7cfc97c4..c4474512e0 100644 --- a/src/bin/fullnode-config.rs +++ b/src/bin/fullnode-config.rs @@ -7,7 +7,7 @@ extern crate solana; use clap::{App, Arg}; use solana::crdt::FULLNODE_PORT_RANGE; use solana::fullnode::Config; -use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; +use solana::netutil::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; use solana::signature::read_pkcs8; use std::io; use std::net::SocketAddr; diff --git a/src/client.rs b/src/client.rs index d0fff5dd16..ab77800330 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,11 @@ use crdt::{NodeInfo, FULLNODE_PORT_RANGE}; -use nat::bind_in_range; +use netutil::bind_in_range; use std::time::Duration; use thin_client::ThinClient; pub fn mk_client(r: &NodeInfo) -> ThinClient { - let requests_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); - let transactions_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); + let (_, requests_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); + let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); requests_socket .set_read_timeout(Some(Duration::new(1, 0))) diff --git a/src/crdt.rs b/src/crdt.rs index dc4d8bd7e7..8b556d3ec9 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -18,7 +18,7 @@ use counter::Counter; use hash::Hash; use ledger::LedgerWindow; use log::Level; -use nat::{bind_in_range, bind_to}; +use netutil::{bind_in_range, bind_to, multi_bind_in_range}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; @@ -1172,7 +1172,7 @@ impl Crdt { } pub fn spy_node() -> (NodeInfo, UdpSocket) { - let gossip_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); + let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let pubkey = Keypair::new().pubkey(); let daddr = socketaddr_any!(); @@ -1235,29 +1235,20 @@ impl Node { } pub fn new_with_external_ip(pubkey: Pubkey, ncp: &SocketAddr) -> Node { fn bind() -> (u16, UdpSocket) { - match bind_in_range(FULLNODE_PORT_RANGE) { - Ok(socket) => (socket.local_addr().unwrap().port(), socket), - Err(err) => { - panic!("Failed to bind err: {}", err); - } - } + bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind") }; let (gossip_port, gossip) = if ncp.port() != 0 { - (ncp.port(), bind_to(ncp.port())) + (ncp.port(), bind_to(ncp.port(), false).expect("ncp bind")) } else { bind() }; let (replicate_port, replicate) = bind(); let (requests_port, requests) = bind(); - let (transaction_port, transaction) = bind(); - let mut transaction_sockets = vec![transaction]; - - for _ in 0..4 { - transaction_sockets.push(bind_to(transaction_port)); - } + let (transaction_port, transaction_sockets) = + multi_bind_in_range(FULLNODE_PORT_RANGE, 5).expect("tpu multi_bind"); let (_, repair) = bind(); let (_, broadcast) = bind(); @@ -1275,6 +1266,7 @@ impl Node { SocketAddr::new(ncp.ip(), requests_port), SocketAddr::new(ncp.ip(), transaction_port), ); + trace!("new NodeInfo: {:?}", info); Node { info, diff --git a/src/drone.rs b/src/drone.rs index 5874514a2e..f5fbc411df 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -162,7 +162,7 @@ mod tests { use fullnode::Fullnode; use logger; use mint::Mint; - use nat::get_ip_addr; + use netutil::get_ip_addr; use service::Service; use signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; diff --git a/src/lib.rs b/src/lib.rs index e4e0428206..e73cb10875 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,8 +30,8 @@ pub mod ledger; pub mod logger; pub mod metrics; pub mod mint; -pub mod nat; pub mod ncp; +pub mod netutil; pub mod packet; pub mod payment_plan; pub mod record_stage; @@ -72,6 +72,7 @@ extern crate jsonrpc_http_server; extern crate log; extern crate nix; extern crate rayon; +extern crate reqwest; extern crate ring; extern crate serde; #[macro_use] diff --git a/src/nat.rs b/src/netutil.rs similarity index 60% rename from src/nat.rs rename to src/netutil.rs index 5c10671ed0..aba76a242b 100644 --- a/src/nat.rs +++ b/src/netutil.rs @@ -1,11 +1,10 @@ -//! The `nat` module assists with NAT traversal - -extern crate reqwest; +//! The `netutil` module assists with networking use nix::sys::socket::setsockopt; use nix::sys::socket::sockopt::{ReuseAddr, ReusePort}; use pnet_datalink as datalink; use rand::{thread_rng, Rng}; +use reqwest; use socket2::{Domain, SockAddr, Socket, Type}; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; @@ -73,20 +72,34 @@ pub fn get_ip_addr() -> Option { None } -pub fn bind_in_range(range: (u16, u16)) -> io::Result { +fn udp_socket(reuseaddr: bool) -> io::Result { + let sock = Socket::new(Domain::ipv4(), Type::dgram(), None)?; + let sock_fd = sock.as_raw_fd(); + + if reuseaddr { + // best effort, i.e. ignore errors here, we'll get the failure in caller + setsockopt(sock_fd, ReusePort, &true).ok(); + setsockopt(sock_fd, ReuseAddr, &true).ok(); + } + + Ok(sock) +} + +pub fn bind_in_range(range: (u16, u16)) -> io::Result<(u16, UdpSocket)> { + let sock = udp_socket(false)?; + let (start, end) = range; let mut tries_left = end - start; - let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap(); - let sock_fd = sock.as_raw_fd(); - setsockopt(sock_fd, ReusePort, &true).unwrap(); - setsockopt(sock_fd, ReuseAddr, &true).unwrap(); loop { let rand_port = thread_rng().gen_range(start, end); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port); match sock.bind(&SockAddr::from(addr)) { - Result::Ok(_) => break Result::Ok(sock.into_udp_socket()), - Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { + Ok(_) => { + let sock = sock.into_udp_socket(); + break Result::Ok((sock.local_addr().unwrap().port(), sock)); + } + Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { return Err(err); }, } @@ -94,23 +107,35 @@ pub fn bind_in_range(range: (u16, u16)) -> io::Result { } } -pub fn bind_to(port: u16) -> UdpSocket { - let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap(); - let sock_fd = sock.as_raw_fd(); - setsockopt(sock_fd, ReusePort, &true).unwrap(); - setsockopt(sock_fd, ReuseAddr, &true).unwrap(); - let addr = socketaddr!(0, port); +// binds many sockets to the same port in a range +pub fn multi_bind_in_range(range: (u16, u16), num: usize) -> io::Result<(u16, Vec)> { + let mut sockets = Vec::with_capacity(num); + + let port = { + let (port, _) = bind_in_range(range)?; + port + }; // drop the probe, port should be available... briefly. + + for _ in 0..num { + sockets.push(bind_to(port, true)?); + } + Ok((port, sockets)) +} + +pub fn bind_to(port: u16, reuseaddr: bool) -> io::Result { + let sock = udp_socket(reuseaddr)?; + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); + match sock.bind(&SockAddr::from(addr)) { - Ok(_) => sock.into_udp_socket(), - Err(err) => { - panic!("Failed to bind to {:?}, err: {}", addr, err); - } + Ok(_) => Result::Ok(sock.into_udp_socket()), + Err(err) => Err(err), } } #[cfg(test)] mod tests { - use nat::parse_port_or_addr; + use netutil::*; #[test] fn test_parse_port_or_addr() { @@ -123,4 +148,26 @@ mod tests { let p3 = parse_port_or_addr(None, 1); assert_eq!(p3.port(), 1); } + + #[test] + fn test_bind() { + assert_eq!(bind_in_range((2000, 2001)).unwrap().0, 2000); + let x = bind_to(2002, true).unwrap(); + let y = bind_to(2002, true).unwrap(); + assert_eq!( + x.local_addr().unwrap().port(), + y.local_addr().unwrap().port() + ); + let (port, v) = multi_bind_in_range((2010, 2110), 10).unwrap(); + for sock in &v { + assert_eq!(port, sock.local_addr().unwrap().port()); + } + } + + #[test] + #[should_panic] + fn test_bind_in_range_nil() { + let _ = bind_in_range((2000, 2000)); + } + } diff --git a/src/thin_client.rs b/src/thin_client.rs index c7684d7c42..360b710f1b 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -7,10 +7,12 @@ use bank::{Account, Bank}; use bincode::{deserialize, serialize}; use crdt::{Crdt, CrdtError, NodeInfo}; use hash::Hash; +use log::Level; use ncp::Ncp; use request::{Request, Response}; use result::{Error, Result}; use signature::{Keypair, Pubkey, Signature}; +use std; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; @@ -64,9 +66,17 @@ impl ThinClient { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; trace!("start recv_from"); - let (len, from) = self.requests_socket.recv_from(&mut buf)?; - trace!("end recv_from got {} {}", len, from); - deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) + match self.requests_socket.recv_from(&mut buf) { + Ok((len, from)) => { + trace!("end recv_from got {} {}", len, from); + deserialize(&buf) + .or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) + } + Err(e) => { + trace!("end recv_from got {:?}", e); + Err(e) + } + } } pub fn process_response(&mut self, resp: &Response) { @@ -361,29 +371,97 @@ impl Drop for ThinClient { } } +fn trace_node_info(nodes: &Vec, leader_id: &Pubkey) -> () { + trace!(" NodeInfo.contact_info | Node identifier"); + trace!("---------------------------+------------------"); + for node in nodes { + trace!( + " ncp: {:20} | {}{}", + node.contact_info.ncp.to_string(), + node.id, + if node.id == *leader_id { + " <==== leader" + } else { + "" + } + ); + trace!(" rpu: {:20} | ", node.contact_info.rpu.to_string(),); + trace!(" tpu: {:20} | ", node.contact_info.tpu.to_string(),); + } + trace!("Nodes: {}", nodes.len()); +} + pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> Result { let exit = Arc::new(AtomicBool::new(false)); - trace!("polling {:?} for leader", leader_ncp); let (node, gossip_socket) = Crdt::spy_node(); + let my_addr = gossip_socket.local_addr().unwrap(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); + let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); sleep(Duration::from_millis(100)); + let deadline = match timeout { + Some(timeout) => Duration::new(timeout, 0), + None => Duration::new(std::u64::MAX, 0), + }; let now = Instant::now(); // Block until leader's correct contact info is received - while crdt.read().unwrap().leader_data().is_none() { - if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) { + let leader; + + loop { + trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr); + + if let Some(l) = crdt.read().unwrap().leader_data() { + leader = Some(l.clone()); + break; + } + + if log_enabled!(Level::Trace) { + // print validators/fullnodes + let nodes: Vec = crdt + .read() + .unwrap() + .table + .values() + .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) + .cloned() + .collect(); + trace_node_info(&nodes, &Default::default()); + } + + if now.elapsed() > deadline { return Err(Error::CrdtError(CrdtError::NoLeader)); } + + sleep(Duration::from_millis(100)); } ncp.close()?; - let leader = crdt.read().unwrap().leader_data().unwrap().clone(); - Ok(leader) + + if log_enabled!(Level::Trace) { + let leader_id = if let Some(leader) = &leader { + leader.id + } else { + Default::default() + }; + + // print validators/fullnodes + let nodes: Vec = crdt + .read() + .unwrap() + .table + .values() + .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu)) + .cloned() + .collect(); + trace_node_info(&nodes, &leader_id); + } + + Ok(leader.unwrap().clone()) } #[cfg(test)]