improve localnet-sanity's robustness (#1160)

* fix poll_gossip_for_leader() loop to actually wait
         for 30 seconds
    * reduce reuseaddr use to only when necessary,
         try to avoid already bound sockets
    * move nat.rs to netutil.rs
    * add gossip tracing to thin_client and bench-tps
This commit is contained in:
Rob Walker 2018-09-09 04:50:43 +09:00 committed by GitHub
parent a77aca75b2
commit 97c3125a78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 200 additions and 63 deletions

View File

@ -69,8 +69,8 @@ echo "--- Wallet sanity"
echo "--- Node count"
(
set -x
source multinode-demo/common.sh
set -x
client_id=/tmp/client-id.json-$$
$solana_keygen -o $client_id
$solana_bench_tps --identity $client_id --num-nodes 3 --converge-only
@ -81,8 +81,8 @@ killBackgroundCommands
echo "--- Ledger verification"
(
set -x
source multinode-demo/common.sh
set -x
cp -R "$SOLANA_CONFIG_DIR"/ledger /tmp/ledger-$$
$solana_ledger_tool --ledger /tmp/ledger-$$ verify
rm -rf /tmp/ledger-$$

View File

@ -2,7 +2,7 @@ extern crate clap;
extern crate solana;
use clap::{App, Arg};
use solana::nat::bind_to;
use solana::netutil::bind_to;
use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::result::Result;
use solana::streamer::{receiver, PacketReceiver};
@ -84,7 +84,7 @@ fn main() -> Result<()> {
let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
for _ in 0..num_sockets {
let read = bind_to(port);
let read = bind_to(port, false).unwrap();
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
addr = read.local_addr().unwrap();

View File

@ -18,7 +18,7 @@ use solana::logger;
use solana::metrics;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil, Pubkey};
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
@ -507,16 +507,34 @@ fn main() {
let exit_signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
let (nodes, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads);
println!(" Node address | Node identifier");
println!("----------------------+------------------");
for node in &validators {
println!(" {:20} | {}", node.contact_info.tpu.to_string(), node.id);
let leader_id = if let Some(leader) = &leader {
leader.id
} else {
Default::default()
};
fn print_gossip_info(nodes: &Vec<NodeInfo>, leader_id: &Pubkey) -> () {
println!(" Node gossip address | Node identifier");
println!("---------------------+------------------");
for node in nodes {
println!(
" {:20} | {}{}",
node.contact_info.ncp.to_string(),
node.id,
if node.id == *leader_id {
" <==== leader"
} else {
""
}
);
}
println!("Nodes: {}", nodes.len());
}
println!("Nodes: {}", validators.len());
print_gossip_info(&nodes, &leader_id);
if validators.len() < num_nodes {
if nodes.len() < num_nodes {
println!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
@ -575,7 +593,7 @@ fn main() {
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling TPS every {} second...", sample_period);
let v_threads: Vec<_> = validators
let v_threads: Vec<_> = nodes
.into_iter()
.map(|v| {
let exit_signal = exit_signal.clone();
@ -725,6 +743,7 @@ fn converge(
//wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 {
if spy_ref.read().unwrap().leader_data().is_none() {
sleep(Duration::new(1, 0));
continue;
}

View File

@ -7,7 +7,7 @@ extern crate solana;
use clap::{App, Arg};
use solana::crdt::FULLNODE_PORT_RANGE;
use solana::fullnode::Config;
use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr};
use solana::netutil::{get_ip_addr, get_public_ip_addr, parse_port_or_addr};
use solana::signature::read_pkcs8;
use std::io;
use std::net::SocketAddr;

View File

@ -1,11 +1,11 @@
use crdt::{NodeInfo, FULLNODE_PORT_RANGE};
use nat::bind_in_range;
use netutil::bind_in_range;
use std::time::Duration;
use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let transactions_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let (_, requests_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))

View File

@ -18,7 +18,7 @@ use counter::Counter;
use hash::Hash;
use ledger::LedgerWindow;
use log::Level;
use nat::{bind_in_range, bind_to};
use netutil::{bind_in_range, bind_to, multi_bind_in_range};
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
@ -1172,7 +1172,7 @@ impl Crdt {
}
pub fn spy_node() -> (NodeInfo, UdpSocket) {
let gossip_socket = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let (_, gossip_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let pubkey = Keypair::new().pubkey();
let daddr = socketaddr_any!();
@ -1235,29 +1235,20 @@ impl Node {
}
pub fn new_with_external_ip(pubkey: Pubkey, ncp: &SocketAddr) -> Node {
fn bind() -> (u16, UdpSocket) {
match bind_in_range(FULLNODE_PORT_RANGE) {
Ok(socket) => (socket.local_addr().unwrap().port(), socket),
Err(err) => {
panic!("Failed to bind err: {}", err);
}
}
bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind")
};
let (gossip_port, gossip) = if ncp.port() != 0 {
(ncp.port(), bind_to(ncp.port()))
(ncp.port(), bind_to(ncp.port(), false).expect("ncp bind"))
} else {
bind()
};
let (replicate_port, replicate) = bind();
let (requests_port, requests) = bind();
let (transaction_port, transaction) = bind();
let mut transaction_sockets = vec![transaction];
for _ in 0..4 {
transaction_sockets.push(bind_to(transaction_port));
}
let (transaction_port, transaction_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 5).expect("tpu multi_bind");
let (_, repair) = bind();
let (_, broadcast) = bind();
@ -1275,6 +1266,7 @@ impl Node {
SocketAddr::new(ncp.ip(), requests_port),
SocketAddr::new(ncp.ip(), transaction_port),
);
trace!("new NodeInfo: {:?}", info);
Node {
info,

View File

@ -162,7 +162,7 @@ mod tests {
use fullnode::Fullnode;
use logger;
use mint::Mint;
use nat::get_ip_addr;
use netutil::get_ip_addr;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;

View File

@ -30,8 +30,8 @@ pub mod ledger;
pub mod logger;
pub mod metrics;
pub mod mint;
pub mod nat;
pub mod ncp;
pub mod netutil;
pub mod packet;
pub mod payment_plan;
pub mod record_stage;
@ -72,6 +72,7 @@ extern crate jsonrpc_http_server;
extern crate log;
extern crate nix;
extern crate rayon;
extern crate reqwest;
extern crate ring;
extern crate serde;
#[macro_use]

View File

@ -1,11 +1,10 @@
//! The `nat` module assists with NAT traversal
extern crate reqwest;
//! The `netutil` module assists with networking
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::{ReuseAddr, ReusePort};
use pnet_datalink as datalink;
use rand::{thread_rng, Rng};
use reqwest;
use socket2::{Domain, SockAddr, Socket, Type};
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@ -73,20 +72,34 @@ pub fn get_ip_addr() -> Option<IpAddr> {
None
}
pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> {
fn udp_socket(reuseaddr: bool) -> io::Result<Socket> {
let sock = Socket::new(Domain::ipv4(), Type::dgram(), None)?;
let sock_fd = sock.as_raw_fd();
if reuseaddr {
// best effort, i.e. ignore errors here, we'll get the failure in caller
setsockopt(sock_fd, ReusePort, &true).ok();
setsockopt(sock_fd, ReuseAddr, &true).ok();
}
Ok(sock)
}
pub fn bind_in_range(range: (u16, u16)) -> io::Result<(u16, UdpSocket)> {
let sock = udp_socket(false)?;
let (start, end) = range;
let mut tries_left = end - start;
let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap();
let sock_fd = sock.as_raw_fd();
setsockopt(sock_fd, ReusePort, &true).unwrap();
setsockopt(sock_fd, ReuseAddr, &true).unwrap();
loop {
let rand_port = thread_rng().gen_range(start, end);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port);
match sock.bind(&SockAddr::from(addr)) {
Result::Ok(_) => break Result::Ok(sock.into_udp_socket()),
Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 {
Ok(_) => {
let sock = sock.into_udp_socket();
break Result::Ok((sock.local_addr().unwrap().port(), sock));
}
Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 {
return Err(err);
},
}
@ -94,23 +107,35 @@ pub fn bind_in_range(range: (u16, u16)) -> io::Result<UdpSocket> {
}
}
pub fn bind_to(port: u16) -> UdpSocket {
let sock = Socket::new(Domain::ipv4(), Type::dgram(), None).unwrap();
let sock_fd = sock.as_raw_fd();
setsockopt(sock_fd, ReusePort, &true).unwrap();
setsockopt(sock_fd, ReuseAddr, &true).unwrap();
let addr = socketaddr!(0, port);
// binds many sockets to the same port in a range
pub fn multi_bind_in_range(range: (u16, u16), num: usize) -> io::Result<(u16, Vec<UdpSocket>)> {
let mut sockets = Vec::with_capacity(num);
let port = {
let (port, _) = bind_in_range(range)?;
port
}; // drop the probe, port should be available... briefly.
for _ in 0..num {
sockets.push(bind_to(port, true)?);
}
Ok((port, sockets))
}
pub fn bind_to(port: u16, reuseaddr: bool) -> io::Result<UdpSocket> {
let sock = udp_socket(reuseaddr)?;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
match sock.bind(&SockAddr::from(addr)) {
Ok(_) => sock.into_udp_socket(),
Err(err) => {
panic!("Failed to bind to {:?}, err: {}", addr, err);
}
Ok(_) => Result::Ok(sock.into_udp_socket()),
Err(err) => Err(err),
}
}
#[cfg(test)]
mod tests {
use nat::parse_port_or_addr;
use netutil::*;
#[test]
fn test_parse_port_or_addr() {
@ -123,4 +148,26 @@ mod tests {
let p3 = parse_port_or_addr(None, 1);
assert_eq!(p3.port(), 1);
}
#[test]
fn test_bind() {
assert_eq!(bind_in_range((2000, 2001)).unwrap().0, 2000);
let x = bind_to(2002, true).unwrap();
let y = bind_to(2002, true).unwrap();
assert_eq!(
x.local_addr().unwrap().port(),
y.local_addr().unwrap().port()
);
let (port, v) = multi_bind_in_range((2010, 2110), 10).unwrap();
for sock in &v {
assert_eq!(port, sock.local_addr().unwrap().port());
}
}
#[test]
#[should_panic]
fn test_bind_in_range_nil() {
let _ = bind_in_range((2000, 2000));
}
}

View File

@ -7,10 +7,12 @@ use bank::{Account, Bank};
use bincode::{deserialize, serialize};
use crdt::{Crdt, CrdtError, NodeInfo};
use hash::Hash;
use log::Level;
use ncp::Ncp;
use request::{Request, Response};
use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature};
use std;
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
@ -64,9 +66,17 @@ impl ThinClient {
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
trace!("start recv_from");
let (len, from) = self.requests_socket.recv_from(&mut buf)?;
trace!("end recv_from got {} {}", len, from);
deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
match self.requests_socket.recv_from(&mut buf) {
Ok((len, from)) => {
trace!("end recv_from got {} {}", len, from);
deserialize(&buf)
.or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
}
Err(e) => {
trace!("end recv_from got {:?}", e);
Err(e)
}
}
}
pub fn process_response(&mut self, resp: &Response) {
@ -361,29 +371,97 @@ impl Drop for ThinClient {
}
}
fn trace_node_info(nodes: &Vec<NodeInfo>, leader_id: &Pubkey) -> () {
trace!(" NodeInfo.contact_info | Node identifier");
trace!("---------------------------+------------------");
for node in nodes {
trace!(
" ncp: {:20} | {}{}",
node.contact_info.ncp.to_string(),
node.id,
if node.id == *leader_id {
" <==== leader"
} else {
""
}
);
trace!(" rpu: {:20} | ", node.contact_info.rpu.to_string(),);
trace!(" tpu: {:20} | ", node.contact_info.tpu.to_string(),);
}
trace!("Nodes: {}", nodes.len());
}
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
trace!("polling {:?} for leader", leader_ncp);
let (node, gossip_socket) = Crdt::spy_node();
let my_addr = gossip_socket.local_addr().unwrap();
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone());
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);
sleep(Duration::from_millis(100));
let deadline = match timeout {
Some(timeout) => Duration::new(timeout, 0),
None => Duration::new(std::u64::MAX, 0),
};
let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) {
let leader;
loop {
trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr);
if let Some(l) = crdt.read().unwrap().leader_data() {
leader = Some(l.clone());
break;
}
if log_enabled!(Level::Trace) {
// print validators/fullnodes
let nodes: Vec<NodeInfo> = crdt
.read()
.unwrap()
.table
.values()
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
trace_node_info(&nodes, &Default::default());
}
if now.elapsed() > deadline {
return Err(Error::CrdtError(CrdtError::NoLeader));
}
sleep(Duration::from_millis(100));
}
ncp.close()?;
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
Ok(leader)
if log_enabled!(Level::Trace) {
let leader_id = if let Some(leader) = &leader {
leader.id
} else {
Default::default()
};
// print validators/fullnodes
let nodes: Vec<NodeInfo> = crdt
.read()
.unwrap()
.table
.values()
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
trace_node_info(&nodes, &leader_id);
}
Ok(leader.unwrap().clone())
}
#[cfg(test)]