diff --git a/ci/localnet-sanity.sh b/ci/localnet-sanity.sh index 60b00b07f..19a44e2ed 100755 --- a/ci/localnet-sanity.sh +++ b/ci/localnet-sanity.sh @@ -71,7 +71,7 @@ echo "--- Wallet sanity" echo "--- Node count" ( set -x - ./multinode-demo/client.sh "$PWD" 3 -c --addr 127.0.0.1 + ./multinode-demo/client.sh "$PWD" 3 -c ) || flag_error killBackgroundCommands diff --git a/multinode-demo/wallet.sh b/multinode-demo/wallet.sh index d43b0e876..a2166d5ad 100755 --- a/multinode-demo/wallet.sh +++ b/multinode-demo/wallet.sh @@ -47,4 +47,4 @@ fi # shellcheck disable=SC2086 # $solana_wallet should not be quoted exec $solana_wallet \ - -a 127.0.0.1 -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@" + -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@" diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index b8eff762f..b5c43793b 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -63,7 +63,12 @@ fn main() -> Result<()> { let pack_recycler = PacketRecycler::default(); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); + let t_reader = receiver( + Arc::new(read), + exit.clone(), + pack_recycler.clone(), + s_reader, + ); let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); let t_producer2 = producer(&addr, &pack_recycler, exit.clone()); let t_producer3 = producer(&addr, &pack_recycler, exit.clone()); diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index b452effbb..c7dca9fa4 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -16,7 +16,6 @@ use solana::fullnode::Config; use solana::hash::Hash; use solana::logger; use solana::metrics; -use solana::nat::get_public_ip_addr; use solana::ncp::Ncp; use solana::service::Service; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; @@ -440,14 +439,6 @@ fn main() { .short("c") .help("exit immediately after converging"), ) - .arg( - Arg::with_name("addr") - .short("a") - .long("addr") - .value_name("IPADDR") - .takes_value(true) - .help("address to advertise to the network"), - ) .arg( Arg::with_name("sustained") .long("sustained") @@ -484,18 +475,6 @@ fn main() { time_sec = s.to_string().parse().expect("integer"); } - let addr = if let Some(s) = matches.value_of("addr") { - s.to_string().parse().unwrap_or_else(|e| { - eprintln!("failed to parse {} as IP address error: {:?}", s, e); - exit(1); - }) - } else { - get_public_ip_addr().unwrap_or_else(|e| { - eprintln!("failed to get public IP, try --addr? error: {:?}", e); - exit(1); - }) - }; - if let Some(s) = matches.value_of("tx_count") { tx_count = s.to_string().parse().expect("integer"); } @@ -506,7 +485,7 @@ fn main() { let exit_signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; - let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads, addr); + let (validators, leader) = converge(&leader, &exit_signal, num_nodes, &mut c_threads); println!(" Node address | Node identifier"); println!("----------------------+------------------"); @@ -672,10 +651,9 @@ fn converge( exit_signal: &Arc, num_nodes: usize, threads: &mut Vec>, - addr: IpAddr, ) -> (Vec, Option) { //lets spy on the network - let (node, gossip_socket, gossip_send_socket) = Crdt::spy_node(addr); + let (node, gossip_socket) = Crdt::spy_node(); let mut spy_crdt = Crdt::new(node).expect("Crdt::new"); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); @@ -686,7 +664,6 @@ fn converge( window.clone(), None, gossip_socket, - gossip_send_socket, exit_signal.clone(), ).expect("DataReplicator::new"); let mut v: Vec = vec![]; diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 8cd1a8ca0..840fcd8bd 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -15,14 +15,12 @@ use solana::drone::{Drone, DroneRequest, DRONE_PORT}; use solana::fullnode::Config; use solana::logger; use solana::metrics::set_panic_hook; -use solana::nat::get_public_ip_addr; use solana::signature::read_keypair; use solana::thin_client::poll_gossip_for_leader; use std::error; use std::fs::File; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::process::exit; use std::sync::{Arc, Mutex}; use std::thread; use tokio::net::TcpListener; @@ -72,28 +70,8 @@ fn main() -> Result<(), Box> { .takes_value(true) .help("Max SECONDS to wait to get necessary gossip from the network"), ) - .arg( - Arg::with_name("addr") - .short("a") - .long("addr") - .value_name("IPADDR") - .takes_value(true) - .help("address to advertise to the network"), - ) .get_matches(); - let addr = if let Some(s) = matches.value_of("addr") { - s.to_string().parse().unwrap_or_else(|e| { - eprintln!("failed to parse {} as IP address error: {:?}", s, e); - exit(1); - }) - } else { - get_public_ip_addr().unwrap_or_else(|e| { - eprintln!("failed to get public IP, try --addr? error: {:?}", e); - exit(1); - }) - }; - let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { leader = read_leader(l).node_info; @@ -124,9 +102,11 @@ fn main() -> Result<(), Box> { timeout = None; } - let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout, addr)?; - + eprintln!("hioisdlflkj"); let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap(); + let socket = TcpListener::bind(&drone_addr).unwrap(); + + let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?; let drone = Arc::new(Mutex::new(Drone::new( mint_keypair, @@ -144,7 +124,6 @@ fn main() -> Result<(), Box> { drone1.lock().unwrap().clear_request_count(); }); - let socket = TcpListener::bind(&drone_addr).unwrap(); println!("Drone started. Listening on: {}", drone_addr); let done = socket .incoming() diff --git a/src/bin/fullnode-config.rs b/src/bin/fullnode-config.rs index b2541a041..499b024d2 100644 --- a/src/bin/fullnode-config.rs +++ b/src/bin/fullnode-config.rs @@ -5,9 +5,9 @@ extern crate serde_json; extern crate solana; use clap::{App, Arg}; -use solana::crdt::{get_ip_addr, parse_port_or_addr}; +use solana::crdt::GOSSIP_PORT_RANGE; use solana::fullnode::Config; -use solana::nat::get_public_ip_addr; +use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; use solana::signature::read_pkcs8; use std::io; use std::net::SocketAddr; @@ -48,13 +48,7 @@ fn main() { .get_matches(); let bind_addr: SocketAddr = { - let mut bind_addr = parse_port_or_addr({ - if let Some(b) = matches.value_of("bind") { - Some(b.to_string()) - } else { - None - } - }); + let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), GOSSIP_PORT_RANGE.0); if matches.is_present("local") { let ip = get_ip_addr().unwrap(); bind_addr.set_ip(ip); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index eea739e52..64e119cb6 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -7,7 +7,7 @@ extern crate solana; use clap::{App, Arg}; use solana::client::mk_client; -use solana::crdt::{NodeInfo, TestNode}; +use solana::crdt::{Node, NodeInfo}; use solana::drone::DRONE_PORT; use solana::fullnode::{Config, Fullnode}; use solana::logger; @@ -78,14 +78,14 @@ fn main() -> () { let port_range = (8100, 10000); let node = if let Some(_t) = matches.value_of("testnet") { - TestNode::new_with_external_ip( + Node::new_with_external_ip( leader_pubkey, repl_data.contact_info.ncp.ip(), port_range, 0, ) } else { - TestNode::new_with_external_ip( + Node::new_with_external_ip( leader_pubkey, repl_data.contact_info.ncp.ip(), port_range, diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 0c8af7ce7..09e07b68d 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -13,7 +13,6 @@ use solana::crdt::NodeInfo; use solana::drone::DRONE_PORT; use solana::fullnode::Config; use solana::logger; -use solana::nat::get_public_ip_addr; use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature}; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::wallet::request_airdrop; @@ -21,7 +20,6 @@ use std::error; use std::fmt; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::process::exit; use std::thread::sleep; use std::time::Duration; @@ -94,14 +92,6 @@ fn parse_args() -> Result> { .takes_value(true) .help("/path/to/id.json"), ) - .arg( - Arg::with_name("addr") - .short("a") - .long("addr") - .value_name("IPADDR") - .takes_value(true) - .help("address to advertise to the network"), - ) .arg( Arg::with_name("timeout") .long("timeout") @@ -155,18 +145,6 @@ fn parse_args() -> Result> { .subcommand(SubCommand::with_name("address").about("Get your public key")) .get_matches(); - let addr = if let Some(s) = matches.value_of("addr") { - s.to_string().parse().unwrap_or_else(|e| { - eprintln!("failed to parse {} as IP address error: {:?}", s, e); - exit(1) - }) - } else { - get_public_ip_addr().unwrap_or_else(|e| { - eprintln!("failed to get public IP, try --addr? error: {:?}", e); - exit(1) - }) - }; - let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { leader = read_leader(l)?.node_info; @@ -195,7 +173,7 @@ fn parse_args() -> Result> { ))) })?; - let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout, addr)?; + let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?; let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(DRONE_PORT); diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index b7ef06124..2800bfd2a 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -16,31 +16,27 @@ pub struct BlobFetchStage { impl BlobFetchStage { pub fn new( - socket: UdpSocket, + socket: Arc, exit: Arc, - blob_recycler: &BlobRecycler, + recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { - Self::new_multi_socket(vec![socket], exit, blob_recycler) + Self::new_multi_socket(vec![socket], exit, recycler) } pub fn new_multi_socket( - sockets: Vec, + sockets: Vec>, exit: Arc, - blob_recycler: &BlobRecycler, + recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { - let (blob_sender, blob_receiver) = channel(); + let (sender, receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() .map(|socket| { - streamer::blob_receiver( - exit.clone(), - blob_recycler.clone(), - socket, - blob_sender.clone(), - ).expect("blob receiver init") + streamer::blob_receiver(socket, exit.clone(), recycler.clone(), sender.clone()) + .expect("blob receiver init") }) .collect(); - (BlobFetchStage { exit, thread_hdls }, blob_receiver) + (BlobFetchStage { exit, thread_hdls }, receiver) } pub fn close(&self) { diff --git a/src/client.rs b/src/client.rs index 15859d8ae..f68799b3c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,11 @@ -use crdt::NodeInfo; -use nat::udp_random_bind; +use crdt::{NodeInfo, GOSSIP_PORT_RANGE}; +use nat::bind_in_range; use std::time::Duration; use thin_client::ThinClient; pub fn mk_client(r: &NodeInfo) -> ThinClient { - let requests_socket = udp_random_bind(8000, 10000, 5).unwrap(); - let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap(); + let requests_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); + let transactions_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); requests_socket .set_read_timeout(Some(Duration::new(1, 0))) diff --git a/src/crdt.rs b/src/crdt.rs index 704db301f..99cbe4eb1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -20,10 +20,9 @@ use counter::Counter; use hash::Hash; use ledger::LedgerWindow; use log::Level; -use nat::udp_random_bind; +use nat::bind_in_range; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; -use pnet_datalink as datalink; -use rand::{thread_rng, RngCore}; +use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::{Error, Result}; use signature::{Keypair, KeypairUtil, Pubkey}; @@ -41,6 +40,7 @@ use timing::{duration_as_ms, timestamp}; use transaction::Vote; use window::{SharedWindow, WindowIndex}; +pub const GOSSIP_PORT_RANGE: (u16, u16) = (8000, 10_000); /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_PURGE_MILLIS: u64 = 15000; @@ -57,47 +57,6 @@ pub enum CrdtError { BadGossipAddress, } -pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { - let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); - if let Some(addrstr) = optstr { - if let Ok(port) = addrstr.parse() { - let mut addr = daddr; - addr.set_port(port); - addr - } else if let Ok(addr) = addrstr.parse() { - addr - } else { - daddr - } - } else { - daddr - } -} - -pub fn get_ip_addr() -> Option { - for iface in datalink::interfaces() { - for p in iface.ips { - if !p.ip().is_loopback() && !p.ip().is_multicast() { - match p.ip() { - IpAddr::V4(addr) => { - if !addr.is_link_local() { - return Some(p.ip()); - } - } - IpAddr::V6(_addr) => { - // Select an ipv6 address if the config is selected - #[cfg(feature = "ipv6")] - { - return Some(p.ip()); - } - } - } - } - } - } - None -} - /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct ContactInfo { @@ -529,9 +488,10 @@ impl Crdt { false } else if !(Self::is_valid_address(v.contact_info.tvu)) { trace!( - "{:x}:broadcast skip not listening {:x}", + "{:x}:broadcast skip not listening {:x} {}", me.debug_id(), - v.debug_id() + v.debug_id(), + v.contact_info.tvu, ); false } else { @@ -552,6 +512,7 @@ impl Crdt { /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` + /// TODO: move me out of crdt pub fn broadcast( me: &NodeInfo, broadcast_table: &[NodeInfo], @@ -670,6 +631,7 @@ impl Crdt { /// retransmit messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` + /// TODO: move me out of Crdt pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (NodeInfo, Vec) = { // copy to avoid locking during IO @@ -685,12 +647,17 @@ impl Crdt { .iter() .filter(|v| { if me.id == v.id { + trace!("skip retransmit to self {:?}", v.id); false } else if me.leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false } else if !(Self::is_valid_address(v.contact_info.tvu)) { - trace!("skip nodes that are not listening {:?}", v.id); + trace!( + "skip nodes that are not listening {:?} {}", + v.id, + v.contact_info.tvu + ); false } else { true @@ -702,10 +669,11 @@ impl Crdt { .par_iter() .map(|v| { debug!( - "{:x}: retransmit blob {} to {:x}", + "{:x}: retransmit blob {} to {:x} {}", me.debug_id(), rblob.get_index().unwrap(), v.debug_id(), + v.contact_info.tvu, ); //TODO profile this, may need multiple sockets for par_iter assert!(rblob.meta.size <= BLOB_SIZE); @@ -728,10 +696,6 @@ impl Crdt { self.remote.values().fold(max, |a, b| std::cmp::min(a, *b)) } - fn random() -> u64 { - thread_rng().next_u64() - } - // TODO: fill in with real implmentation once staking is implemented fn get_stake(_id: Pubkey) -> f64 { 1.0 @@ -771,7 +735,7 @@ impl Crdt { if valid.is_empty() { Err(CrdtError::NoPeers)?; } - let n = (Self::random() as usize) % valid.len(); + let n = thread_rng().gen::() % valid.len(); let addr = valid[n].contact_info.ncp; let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; @@ -814,8 +778,9 @@ impl Crdt { let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); trace!( - "created gossip request from {:x} to {:x} {}", + "created gossip request from {:x} {:?} to {:x} {}", self.debug_id(), + self.table[&self.me].clone(), v.debug_id(), v.contact_info.ncp ); @@ -1060,9 +1025,14 @@ impl Crdt { blob: &Blob, ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { - Ok(request) => { - Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler) - } + Ok(request) => Crdt::handle_protocol( + blob.meta.addr(), + request, + obj, + window, + ledger_window, + blob_recycler, + ), Err(_) => { warn!("deserialize crdt packet failed"); None @@ -1071,6 +1041,7 @@ impl Crdt { } fn handle_protocol( + from_addr: SocketAddr, request: Protocol, obj: &Arc>, window: &SharedWindow, @@ -1080,10 +1051,14 @@ impl Crdt { match request { // TODO sigverify these Protocol::RequestUpdates(v, from_rd) => { - let addr = from_rd.contact_info.ncp; - trace!("RequestUpdates {} from {}", v, addr); + trace!( + "RequestUpdates {} from {}, professing to be {}", + v, + from_addr, + from_rd.contact_info.ncp + ); let me = obj.read().unwrap(); - if addr == me.table[&me.me].contact_info.ncp { + if from_rd.contact_info.ncp == me.table[&me.me].contact_info.ncp { warn!( "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}", me.debug_id(), @@ -1113,13 +1088,13 @@ impl Crdt { v ); None - } else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) { + } else if let Ok(r) = to_blob(rsp, from_addr, &blob_recycler) { trace!( "sending updates me {:x} len {} to {:x} {}", obj.read().unwrap().debug_id(), len, from_rd.debug_id(), - addr, + from_addr, ); Some(r) } else { @@ -1254,7 +1229,7 @@ impl Crdt { fn is_valid_ip_internal(addr: IpAddr, cfg_test: bool) -> bool { !(addr.is_unspecified() || addr.is_multicast() || (addr.is_loopback() && !cfg_test)) } - pub fn is_valid_ip(addr: IpAddr) -> bool { + fn is_valid_ip(addr: IpAddr) -> bool { Self::is_valid_ip_internal(addr, cfg!(test) || cfg!(feature = "test")) } /// port must not be 0 @@ -1264,20 +1239,17 @@ impl Crdt { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } - pub fn spy_node(addr: IpAddr) -> (NodeInfo, UdpSocket, UdpSocket) { - let gossip_socket = udp_random_bind(8000, 10000, 5).unwrap(); - let gossip_send_socket = udp_random_bind(8000, 10000, 5).unwrap(); - let gossip_addr = SocketAddr::new(addr, gossip_socket.local_addr().unwrap().port()); + pub fn spy_node() -> (NodeInfo, UdpSocket) { + let gossip_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); let pubkey = Keypair::new().pubkey(); let daddr = "0.0.0.0:0".parse().unwrap(); - let node = NodeInfo::new(pubkey, gossip_addr, daddr, daddr, daddr, daddr); - (node, gossip_socket, gossip_send_socket) + let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr); + (node, gossip_socket) } } pub struct Sockets { pub gossip: UdpSocket, - pub gossip_send: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, pub transaction: UdpSocket, @@ -1287,12 +1259,12 @@ pub struct Sockets { pub retransmit: UdpSocket, } -pub struct TestNode { +pub struct Node { pub data: NodeInfo, pub sockets: Sockets, } -impl TestNode { +impl Node { pub fn new_localhost() -> Self { let pubkey = Keypair::new().pubkey(); Self::new_localhost_with_pubkey(pubkey) @@ -1304,7 +1276,6 @@ impl TestNode { let requests = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1316,11 +1287,10 @@ impl TestNode { transaction.local_addr().unwrap(), repair.local_addr().unwrap(), ); - TestNode { + Node { data, sockets: Sockets { gossip, - gossip_send, requests, replicate, transaction, @@ -1331,21 +1301,21 @@ impl TestNode { }, } } - pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode { - let mut local_gossip_addr = bind_addr; - local_gossip_addr.set_port(data.contact_info.ncp.port()); + pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> Node { + let mut gossip_addr = bind_addr; + gossip_addr.set_port(data.contact_info.ncp.port()); - let mut local_replicate_addr = bind_addr; - local_replicate_addr.set_port(data.contact_info.tvu.port()); + let mut replicate_addr = bind_addr; + replicate_addr.set_port(data.contact_info.tvu.port()); - let mut local_requests_addr = bind_addr; - local_requests_addr.set_port(data.contact_info.rpu.port()); + let mut requests_addr = bind_addr; + requests_addr.set_port(data.contact_info.rpu.port()); - let mut local_transactions_addr = bind_addr; - local_transactions_addr.set_port(data.contact_info.tpu.port()); + let mut transactions_addr = bind_addr; + transactions_addr.set_port(data.contact_info.tpu.port()); - let mut local_repair_addr = bind_addr; - local_repair_addr.set_port(data.contact_info.tvu_window.port()); + let mut repair_addr = bind_addr; + repair_addr.set_port(data.contact_info.tvu_window.port()); fn bind(addr: SocketAddr) -> UdpSocket { match UdpSocket::bind(addr) { @@ -1356,25 +1326,23 @@ impl TestNode { } }; - let transaction = bind(local_transactions_addr); - let gossip = bind(local_gossip_addr); - let replicate = bind(local_replicate_addr); - let repair = bind(local_repair_addr); - let requests = bind(local_requests_addr); + let transaction = bind(transactions_addr); + let gossip = bind(gossip_addr); + let replicate = bind(replicate_addr); + let repair = bind(repair_addr); + let requests = bind(requests_addr); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the // response Udp packet correctly back to the requester. let respond = requests.try_clone().unwrap(); - let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - TestNode { + Node { data, sockets: Sockets { gossip, - gossip_send, requests, replicate, transaction, @@ -1390,9 +1358,9 @@ impl TestNode { ip: IpAddr, port_range: (u16, u16), ncp_port: u16, - ) -> TestNode { + ) -> Node { fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) { - match udp_random_bind(port_range.0, port_range.1, 5) { + match bind_in_range(port_range) { Ok(socket) => (socket.local_addr().unwrap().port(), socket), Err(err) => { panic!("Failed to bind to {:?}", err); @@ -1425,7 +1393,6 @@ impl TestNode { // response Udp packet correctly back to the requester. let respond = requests.try_clone().unwrap(); - let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1438,11 +1405,10 @@ impl TestNode { SocketAddr::new(ip, repair_port), ); - TestNode { + Node { data: node_info, sockets: Sockets { gossip, - gossip_send, requests, replicate, transaction, @@ -1465,8 +1431,8 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, TestNode, GOSSIP_PURGE_MILLIS, - GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, + Crdt, CrdtError, Node, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, + MIN_TABLE_SIZE, }; use entry::Entry; use hash::{hash, Hash}; @@ -1485,15 +1451,6 @@ mod tests { use transaction::Vote; use window::default_window; - #[test] - fn test_parse_port_or_addr() { - let p1 = parse_port_or_addr(Some("9000".to_string())); - assert_eq!(p1.port(), 9000); - let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string())); - assert_eq!(p2.port(), 7000); - let p3 = parse_port_or_addr(None); - assert_eq!(p3.port(), 8000); - } #[test] fn test_bad_address() { let d1 = NodeInfo::new( @@ -2154,13 +2111,38 @@ mod tests { let obj = Arc::new(RwLock::new(crdt)); let request = Protocol::RequestUpdates(1, node.clone()); - assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); + assert!( + Crdt::handle_protocol( + node.contact_info.ncp, + request, + &obj, + &window, + &mut None, + &recycler + ).is_none() + ); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); - assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); + assert!( + Crdt::handle_protocol( + node.contact_info.ncp, + request, + &obj, + &window, + &mut None, + &recycler + ).is_none() + ); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); - Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler); + Crdt::handle_protocol( + node.contact_info.ncp, + request, + &obj, + &window, + &mut None, + &recycler, + ); let me = obj.write().unwrap(); @@ -2206,7 +2188,7 @@ mod tests { fn new_with_external_ip_test_random() { let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); let node = - TestNode::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0); + Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0); assert_eq!( node.sockets.gossip.local_addr().unwrap().ip(), @@ -2244,12 +2226,8 @@ mod tests { #[test] fn new_with_external_ip_test_gossip() { let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); - let node = TestNode::new_with_external_ip( - Keypair::new().pubkey(), - sockaddr.ip(), - (8100, 8200), - 8050, - ); + let node = + Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 8050); assert_eq!( node.sockets.gossip.local_addr().unwrap().ip(), sockaddr.ip() diff --git a/src/drone.rs b/src/drone.rs index 55da973f7..88c61282a 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -157,11 +157,12 @@ impl Drop for Drone { #[cfg(test)] mod tests { use bank::Bank; - use crdt::{get_ip_addr, TestNode}; + use crdt::Node; use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE}; use fullnode::Fullnode; use logger; use mint::Mint; + use nat::get_ip_addr; use service::Service; use signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; @@ -275,7 +276,7 @@ mod tests { logger::setup(); let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); let bank = Bank::new(&alice); diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index c1731f2c3..5d0f62a63 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -16,31 +16,26 @@ pub struct FetchStage { impl FetchStage { pub fn new( - socket: UdpSocket, + socket: Arc, exit: Arc, - packet_recycler: &PacketRecycler, + recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { - Self::new_multi_socket(vec![socket], exit, packet_recycler) + Self::new_multi_socket(vec![socket], exit, recycler) } pub fn new_multi_socket( - sockets: Vec, + sockets: Vec>, exit: Arc, - packet_recycler: &PacketRecycler, + recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { - let (packet_sender, packet_receiver) = channel(); + let (sender, receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() .map(|socket| { - streamer::receiver( - socket, - exit.clone(), - packet_recycler.clone(), - packet_sender.clone(), - ) + streamer::receiver(socket, exit.clone(), recycler.clone(), sender.clone()) }) .collect(); - (FetchStage { exit, thread_hdls }, packet_receiver) + (FetchStage { exit, thread_hdls }, receiver) } pub fn close(&self) { diff --git a/src/fullnode.rs b/src/fullnode.rs index 0fb6aa854..1794b82dc 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,7 +2,7 @@ use bank::Bank; use broadcast_stage::BroadcastStage; -use crdt::{Crdt, NodeInfo, TestNode}; +use crdt::{Crdt, Node, NodeInfo}; use drone::DRONE_PORT; use entry::Entry; use ledger::read_ledger; @@ -50,7 +50,7 @@ impl Config { impl Fullnode { pub fn new( - node: TestNode, + node: Node, ledger_path: &str, keypair: Keypair, leader_addr: Option, @@ -165,7 +165,7 @@ impl Fullnode { bank: Bank, entry_height: u64, ledger_tail: &[Entry], - mut node: TestNode, + mut node: Node, leader_info: Option<&NodeInfo>, exit: Arc, ledger_path: Option<&str>, @@ -209,7 +209,6 @@ impl Fullnode { window.clone(), ledger_path, node.sockets.gossip, - node.sockets.gossip_send, exit.clone(), ).expect("Ncp::new"); thread_hdls.extend(ncp.thread_hdls()); @@ -293,7 +292,7 @@ impl Service for Fullnode { #[cfg(test)] mod tests { use bank::Bank; - use crdt::TestNode; + use crdt::Node; use fullnode::Fullnode; use mint::Mint; use service::Service; @@ -304,7 +303,7 @@ mod tests { #[test] fn validator_exit() { let keypair = Keypair::new(); - let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); @@ -318,7 +317,7 @@ mod tests { let vals: Vec = (0..2) .map(|_| { let keypair = Keypair::new(); - let tn = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); diff --git a/src/nat.rs b/src/nat.rs index 50bbe4880..48ddc7ca3 100644 --- a/src/nat.rs +++ b/src/nat.rs @@ -2,10 +2,10 @@ extern crate reqwest; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; - +use pnet_datalink as datalink; use rand::{thread_rng, Rng}; use std::io; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// A data type representing a public Udp socket pub struct UdpSocketPair { @@ -27,19 +27,79 @@ pub fn get_public_ip_addr() -> Result { } } -pub fn udp_random_bind(start: u16, end: u16, tries: u32) -> io::Result { - let mut count = 0; - loop { - count += 1; +pub fn parse_port_or_addr(optstr: Option<&str>, default_port: u16) -> SocketAddr { + let daddr: SocketAddr = format!("0.0.0.0:{}", default_port) + .parse() + .expect("default socket address"); + if let Some(addrstr) = optstr { + if let Ok(port) = addrstr.parse() { + let mut addr = daddr; + addr.set_port(port); + addr + } else if let Ok(addr) = addrstr.parse() { + addr + } else { + daddr + } + } else { + daddr + } +} +pub fn get_ip_addr() -> Option { + for iface in datalink::interfaces() { + for p in iface.ips { + if !p.ip().is_loopback() && !p.ip().is_multicast() { + match p.ip() { + IpAddr::V4(addr) => { + if !addr.is_link_local() { + return Some(p.ip()); + } + } + IpAddr::V6(_addr) => { + // Select an ipv6 address if the config is selected + #[cfg(feature = "ipv6")] + { + return Some(p.ip()); + } + } + } + } + } + } + None +} + +pub fn bind_in_range(range: (u16, u16)) -> io::Result { + let (start, end) = range; + let mut tries_left = end - start; + loop { let rand_port = thread_rng().gen_range(start, end); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rand_port); match UdpSocket::bind(addr) { Result::Ok(val) => break Result::Ok(val), - Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || count >= tries { + Result::Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { return Err(err); }, } + tries_left -= 1; + } +} + +#[cfg(test)] +mod tests { + use nat::parse_port_or_addr; + + #[test] + fn test_parse_port_or_addr() { + let p1 = parse_port_or_addr(Some("9000"), 1); + assert_eq!(p1.port(), 9000); + let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), 1); + assert_eq!(p2.port(), 7000); + let p2 = parse_port_or_addr(Some("hi there"), 1); + assert_eq!(p2.port(), 1); + let p3 = parse_port_or_addr(None, 1); + assert_eq!(p3.port(), 1); } } diff --git a/src/ncp.rs b/src/ncp.rs index 13725ecd0..9a55663a8 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -22,27 +22,27 @@ impl Ncp { crdt: &Arc>, window: SharedWindow, ledger_path: Option<&str>, - gossip_listen_socket: UdpSocket, - gossip_send_socket: UdpSocket, + gossip_socket: UdpSocket, exit: Arc, ) -> Result { let blob_recycler = BlobRecycler::default(); let (request_sender, request_receiver) = channel(); + let gossip_socket = Arc::new(gossip_socket); trace!( "Ncp: id: {:?}, listening on: {:?}", &crdt.read().unwrap().me.as_ref()[..4], - gossip_listen_socket.local_addr().unwrap() + gossip_socket.local_addr().unwrap() ); let t_receiver = streamer::blob_receiver( + gossip_socket.clone(), exit.clone(), blob_recycler.clone(), - gossip_listen_socket, request_sender, )?; let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder( "ncp", - gossip_send_socket, + gossip_socket, blob_recycler.clone(), response_receiver, ); @@ -81,7 +81,7 @@ impl Service for Ncp { #[cfg(test)] mod tests { - use crdt::{Crdt, TestNode}; + use crdt::{Crdt, Node}; use ncp::Ncp; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -91,18 +91,11 @@ mod tests { // test that stage will exit when flag is set fn test_exit() { let exit = Arc::new(AtomicBool::new(false)); - let tn = TestNode::new_localhost(); + let tn = Node::new_localhost(); let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new( - &c, - w, - None, - tn.sockets.gossip, - tn.sockets.gossip_send, - exit.clone(), - ).unwrap(); + let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()).unwrap(); d.close().expect("thread join"); } } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 744f5b4ba..9eaadfa89 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -84,7 +84,7 @@ impl ReplicateStage { let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder( "replicate_stage", - send, + Arc::new(send), blob_recycler.clone(), vote_blob_receiver, ); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 29b1e22d6..75c2c6e12 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -47,7 +47,7 @@ fn retransmit( /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. fn retransmitter( - sock: UdpSocket, + sock: Arc, crdt: Arc>, recycler: BlobRecycler, r: BlobReceiver, @@ -81,7 +81,7 @@ impl RetransmitStage { crdt: &Arc>, window: SharedWindow, entry_height: u64, - retransmit_socket: UdpSocket, + retransmit_socket: Arc, blob_recycler: &BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { diff --git a/src/rpu.rs b/src/rpu.rs index af8b52ca9..1cd7ca000 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -49,7 +49,7 @@ impl Rpu { let packet_recycler = PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( - requests_socket, + Arc::new(requests_socket), exit, packet_recycler.clone(), packet_sender, @@ -64,8 +64,12 @@ impl Rpu { blob_recycler.clone(), ); - let t_responder = - streamer::responder("rpu", respond_socket, blob_recycler.clone(), blob_receiver); + let t_responder = streamer::responder( + "rpu", + Arc::new(respond_socket), + blob_recycler.clone(), + blob_receiver, + ); let mut thread_hdls = vec![t_receiver, t_responder]; thread_hdls.extend(request_stage.thread_hdls().into_iter()); diff --git a/src/streamer.rs b/src/streamer.rs index e6cbfccc2..467478e81 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -44,7 +44,7 @@ fn recv_loop( } pub fn receiver( - sock: UdpSocket, + sock: Arc, exit: Arc, recycler: PacketRecycler, packet_sender: PacketSender, @@ -90,7 +90,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> pub fn responder( name: &'static str, - sock: UdpSocket, + sock: Arc, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -120,9 +120,9 @@ fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Resu } pub fn blob_receiver( + sock: Arc, exit: Arc, recycler: BlobRecycler, - sock: UdpSocket, s: BlobSender, ) -> Result> { //DOCUMENTED SIDE-EFFECT @@ -184,12 +184,17 @@ mod test { let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); + let t_receiver = receiver( + Arc::new(read), + exit.clone(), + pack_recycler.clone(), + s_reader, + ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder( "streamer_send_test", - send, + Arc::new(send), resp_recycler.clone(), r_responder, ); diff --git a/src/thin_client.rs b/src/thin_client.rs index 8ec6178be..c1a49bbc9 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -13,7 +13,6 @@ use result::{Error, Result}; use signature::{Keypair, Pubkey, Signature}; use std::collections::HashMap; use std::io; -use std::net::IpAddr; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -361,23 +360,13 @@ impl Drop for ThinClient { } } -pub fn poll_gossip_for_leader( - leader_ncp: SocketAddr, - timeout: Option, - addr: IpAddr, -) -> Result { +pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> Result { let exit = Arc::new(AtomicBool::new(false)); - let (node, gossip_socket, gossip_send_socket) = Crdt::spy_node(addr); + trace!("polling {:?} for leader", leader_ncp); + let (node, gossip_socket) = Crdt::spy_node(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new( - &crdt.clone(), - window, - None, - gossip_socket, - gossip_send_socket, - exit.clone(), - ).unwrap(); + let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()).unwrap(); let leader_entry_point = NodeInfo::new_entry_point(leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); @@ -401,7 +390,7 @@ mod tests { use super::*; use bank::Bank; use budget::Budget; - use crdt::TestNode; + use crdt::Node; use fullnode::Fullnode; use ledger::LedgerWriter; use logger; @@ -430,7 +419,7 @@ mod tests { fn test_thin_client() { logger::setup(); let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let alice = Mint::new(10_000); @@ -479,7 +468,7 @@ mod tests { fn test_bad_sig() { logger::setup(); let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); @@ -539,7 +528,7 @@ mod tests { fn test_client_check_signature() { logger::setup(); let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); diff --git a/src/tpu.rs b/src/tpu.rs index 900e9ee89..9011e56f8 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -65,7 +65,7 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_socket, exit, &packet_recycler); + FetchStage::new(Arc::new(transactions_socket), exit, &packet_recycler); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); diff --git a/src/tvu.rs b/src/tvu.rs index 2b19b8a82..71e851adc 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -83,7 +83,7 @@ impl Tvu { ) -> Self { let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( - vec![replicate_socket, repair_socket], + vec![Arc::new(replicate_socket), Arc::new(repair_socket)], exit.clone(), &blob_recycler, ); @@ -94,7 +94,7 @@ impl Tvu { &crdt, window, entry_height, - retransmit_socket, + Arc::new(retransmit_socket), &blob_recycler, blob_fetch_receiver, ); @@ -143,7 +143,7 @@ impl Service for Tvu { pub mod tests { use bank::Bank; use bincode::serialize; - use crdt::{Crdt, TestNode}; + use crdt::{Crdt, Node}; use entry::Entry; use hash::{hash, Hash}; use logger; @@ -166,12 +166,11 @@ pub mod tests { fn new_ncp( crdt: Arc>, - listen: UdpSocket, + gossip: UdpSocket, exit: Arc, ) -> Result<(Ncp, SharedWindow)> { let window = window::default_window(); - let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new(&crdt, window.clone(), None, listen, send_sock, exit)?; + let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit)?; Ok((ncp, window)) } @@ -179,10 +178,10 @@ pub mod tests { #[test] fn test_replicate() { logger::setup(); - let leader = TestNode::new_localhost(); + let leader = Node::new_localhost(); let target1_keypair = Keypair::new(); - let target1 = TestNode::new_localhost_with_pubkey(target1_keypair.pubkey()); - let target2 = TestNode::new_localhost(); + let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey()); + let target2 = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader @@ -207,9 +206,9 @@ pub mod tests { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = streamer::blob_receiver( + Arc::new(target2.sockets.replicate), exit.clone(), recv_recycler.clone(), - target2.sockets.replicate, s_reader, ).unwrap(); @@ -217,7 +216,7 @@ pub mod tests { let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( "test_replicate", - leader.sockets.requests, + Arc::new(leader.sockets.requests), resp_recycler.clone(), r_responder, ); diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 204c7550c..5ae720d96 100755 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -232,7 +232,7 @@ pub mod tests { use super::*; use bank::Bank; use bincode::deserialize; - use crdt::{Crdt, NodeInfo, TestNode}; + use crdt::{Crdt, Node, NodeInfo}; use entry::next_entry; use hash::{hash, Hash}; use logger; @@ -253,7 +253,7 @@ pub mod tests { let mint = Mint::new(1234); let bank = Arc::new(Bank::new(&mint)); - let node = TestNode::new_localhost(); + let node = Node::new_localhost(); let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new"); crdt.set_leader(node.data.id); let blob_recycler = BlobRecycler::default(); diff --git a/src/window.rs b/src/window.rs index c20de428c..18fb450a7 100644 --- a/src/window.rs +++ b/src/window.rs @@ -93,6 +93,8 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6 cmp::min(consumed + WINDOW_SIZE - 1, highest_lost) } +pub const MAX_REPAIR_BACKOFF: usize = 128; + fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { //exponential backoff if *last != consumed { @@ -105,9 +107,9 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { // Experiment with capping repair request duration. // Once nodes are too far behind they can spend many // seconds without asking for repair - if *times > 128 { + if *times > MAX_REPAIR_BACKOFF { // 50% chance that a request will fire between 64 - 128 tries - *times = 64; + *times = MAX_REPAIR_BACKOFF / 2; } //if we get lucky, make the request, which should exponentially get less likely @@ -126,6 +128,7 @@ fn repair_window( ) -> Result<()> { //exponential backoff if !repair_backoff(last, times, consumed) { + trace!("{:x} !repair_backoff() times = {}", debug_id, times); return Ok(()); } @@ -691,7 +694,7 @@ pub fn window( #[cfg(test)] mod test { - use crdt::{Crdt, TestNode}; + use crdt::{Crdt, Node}; use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use std::collections::VecDeque; @@ -737,12 +740,17 @@ mod test { let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); + let t_receiver = receiver( + Arc::new(read), + exit.clone(), + pack_recycler.clone(), + s_reader, + ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder( "streamer_send_test", - send, + Arc::new(send), resp_recycler.clone(), r_responder, ); @@ -790,7 +798,7 @@ mod test { #[test] pub fn window_send_test() { logger::setup(); - let tn = TestNode::new_localhost(); + let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); let mut crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; @@ -800,9 +808,9 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( + Arc::new(tn.sockets.gossip), exit.clone(), resp_recycler.clone(), - tn.sockets.gossip, s_reader, ).unwrap(); let (s_window, r_window) = channel(); @@ -821,7 +829,7 @@ mod test { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - tn.sockets.replicate, + Arc::new(tn.sockets.replicate), resp_recycler.clone(), r_responder, ); @@ -860,7 +868,7 @@ mod test { #[test] pub fn window_send_no_leader_test() { logger::setup(); - let tn = TestNode::new_localhost(); + let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; @@ -869,9 +877,9 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( + Arc::new(tn.sockets.gossip), exit.clone(), resp_recycler.clone(), - tn.sockets.gossip, s_reader, ).unwrap(); let (s_window, _r_window) = channel(); @@ -890,7 +898,7 @@ mod test { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - tn.sockets.replicate, + Arc::new(tn.sockets.replicate), resp_recycler.clone(), r_responder, ); @@ -922,7 +930,7 @@ mod test { #[test] pub fn window_send_late_leader_test() { logger::setup(); - let tn = TestNode::new_localhost(); + let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new"); let me_id = crdt_me.my_data().id; @@ -931,9 +939,9 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( + Arc::new(tn.sockets.gossip), exit.clone(), resp_recycler.clone(), - tn.sockets.gossip, s_reader, ).unwrap(); let (s_window, _r_window) = channel(); @@ -952,7 +960,7 @@ mod test { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - tn.sockets.replicate, + Arc::new(tn.sockets.replicate), resp_recycler.clone(), r_responder, ); diff --git a/src/write_stage.rs b/src/write_stage.rs index c43aa4739..7ab7ef317 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -81,7 +81,7 @@ impl WriteStage { let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder( "write_stage_vote_sender", - send, + Arc::new(send), blob_recycler.clone(), vote_blob_receiver, ); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 652e36637..cc0cf1539 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -4,7 +4,7 @@ extern crate rayon; extern crate solana; use rayon::iter::*; -use solana::crdt::{Crdt, TestNode}; +use solana::crdt::{Crdt, Node}; use solana::logger; use solana::ncp::Ncp; use solana::packet::Blob; @@ -17,18 +17,11 @@ use std::thread::sleep; use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { - let tn = TestNode::new_localhost(); + let tn = Node::new_localhost(); let crdt = Crdt::new(tn.data.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new( - &c.clone(), - w, - None, - tn.sockets.gossip, - tn.sockets.gossip_send, - exit, - ).unwrap(); + let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap(); (c, d, tn.sockets.replicate) } diff --git a/tests/multinode.rs b/tests/multinode.rs index fa76e24ef..42abafe68 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -5,7 +5,7 @@ extern crate chrono; extern crate serde_json; extern crate solana; -use solana::crdt::{Crdt, NodeInfo, TestNode}; +use solana::crdt::{Crdt, Node, NodeInfo}; use solana::entry::Entry; use solana::fullnode::Fullnode; use solana::hash::Hash; @@ -32,7 +32,7 @@ use std::time::{Duration, Instant}; fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { //lets spy on the network let exit = Arc::new(AtomicBool::new(false)); - let mut spy = TestNode::new_localhost(); + let mut spy = Node::new_localhost(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.data.id.clone(); spy.data.contact_info.tvu = daddr; @@ -42,14 +42,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let ncp = Ncp::new( - &spy_ref, - spy_window, - None, - spy.sockets.gossip, - spy.sockets.gossip_send, - exit.clone(), - ).unwrap(); + let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()).unwrap(); //wait for the network to converge let mut converged = false; let mut rv = vec![]; @@ -125,7 +118,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -156,7 +149,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // start up another validator from zero, converge and then check // balances let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); let validator = Fullnode::new( validator, @@ -205,7 +198,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { trace!("test_multi_node_validator_catchup_from_zero"); let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -229,7 +222,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { let mut nodes = vec![server]; for _ in 0..N { let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger( &leader_ledger_path, "multi_node_validator_catchup_from_zero_validator", @@ -270,7 +263,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { // start up another validator from zero, converge and then check everyone's // balances let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let val = Fullnode::new( validator, &zero_ledger_path, @@ -329,7 +322,7 @@ fn test_multi_node_basic() { trace!("test_multi_node_basic"); let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = Keypair::new().pubkey(); let mut ledger_paths = Vec::new(); @@ -346,7 +339,7 @@ fn test_multi_node_basic() { let mut nodes = vec![server]; for _ in 0..N { let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); ledger_paths.push(ledger_path.clone()); let val = Fullnode::new( @@ -390,7 +383,7 @@ fn test_multi_node_basic() { fn test_boot_validator_from_file() -> result::Result<()> { logger::setup(); let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000); let mut ledger_paths = Vec::new(); @@ -406,7 +399,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { assert_eq!(leader_balance, 1000); let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); ledger_paths.push(ledger_path.clone()); @@ -432,7 +425,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false); (leader_data, leader_fullnode) @@ -445,7 +438,10 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { // ledger (currently up to WINDOW_SIZE entries) logger::setup(); - let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000); + let (alice, ledger_path) = genesis( + "leader_restart_validator_start_from_old_ledger", + 100_000 + 500 * solana::window::MAX_REPAIR_BACKOFF as i64, + ); let bob_pubkey = Keypair::new().pubkey(); let (leader_data, leader_fullnode) = create_leader(&ledger_path); @@ -476,7 +472,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { // start validator from old ledger let keypair = Keypair::new(); - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); let val_fullnode = Fullnode::new( @@ -492,15 +488,17 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { // send requests so the validator eventually sees a gap and requests a repair let mut expected = 1500; let mut client = mk_client(&validator_data); - for _ in 0..10 { + for _ in 0..solana::window::MAX_REPAIR_BACKOFF { let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) .unwrap(); assert_eq!(leader_balance, expected); + let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); if getbal == Some(leader_balance) { break; } + expected += 500; } let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); @@ -530,7 +528,7 @@ fn test_multi_node_dynamic_network() { let leader_keypair = Keypair::new(); let leader_pubkey = leader_keypair.pubkey().clone(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000); @@ -605,7 +603,7 @@ fn test_multi_node_dynamic_network() { Builder::new() .name("validator-launch-thread".to_string()) .spawn(move || { - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let rd = validator.data.clone(); info!("starting {} {:x}", keypair.pubkey(), rd.debug_id()); let val = Fullnode::new(