diff --git a/ci/audit.sh b/ci/audit.sh index 08a26bb052..22981cb1ec 100755 --- a/ci/audit.sh +++ b/ci/audit.sh @@ -16,10 +16,10 @@ _() { maybe_cargo_install() { for cmd in "$@"; do - set +e + set +e cargo "$cmd" --help > /dev/null 2>&1 declare exitcode=$? - set -e + set -e if [[ $exitcode -eq 101 ]]; then _ cargo install cargo-"$cmd" fi @@ -29,4 +29,4 @@ maybe_cargo_install() { maybe_cargo_install audit tree _ cargo tree -_ cargo audit +_ cargo audit || true diff --git a/ci/localnet-sanity.sh b/ci/localnet-sanity.sh index 19a44e2edd..570c77f47a 100755 --- a/ci/localnet-sanity.sh +++ b/ci/localnet-sanity.sh @@ -61,7 +61,6 @@ flag_error() { exit 1 } -# TODO: CI networking isn't working with gossip. we cant self discover the right interface/ip for the clients/wallets echo "--- Wallet sanity" ( set -x diff --git a/ci/test-large-network.sh b/ci/test-large-network.sh index 1410b1a627..c949321f01 100755 --- a/ci/test-large-network.sh +++ b/ci/test-large-network.sh @@ -42,4 +42,4 @@ if [[ $(sysctl -n net.core.wmem_max) -lt 1610612736 ]]; then fi set -x -exec cargo test --release --features=erasure,test test_multi_node_dynamic_network -- --ignored +exec cargo test --release --features=erasure test_multi_node_dynamic_network -- --ignored diff --git a/ci/test-nightly.sh b/ci/test-nightly.sh index dd1147a06b..f7bb2a7187 100755 --- a/ci/test-nightly.sh +++ b/ci/test-nightly.sh @@ -11,7 +11,7 @@ _() { } _ cargo build --verbose --features unstable -_ cargo test --verbose --features=unstable,test +_ cargo test --verbose --features=unstable _ cargo clippy -- --deny=warnings exit 0 @@ -28,4 +28,3 @@ if [[ -z "$CODECOV_TOKEN" ]]; then else bash <(curl -s https://codecov.io/bash) -x 'llvm-cov-6.0 gcov' fi - diff --git a/ci/test-stable-perf.sh b/ci/test-stable-perf.sh index 6053f4ca8a..572fd3e963 100755 --- a/ci/test-stable-perf.sh +++ b/ci/test-stable-perf.sh @@ -19,7 +19,7 @@ _() { "$@" } -_ cargo test --features=cuda,erasure,test +_ cargo test --features=cuda,erasure echo --- ci/localnet-sanity.sh ( diff --git a/ci/test-stable.sh b/ci/test-stable.sh index 96a3608d5c..1ad7998c9e 100755 --- a/ci/test-stable.sh +++ b/ci/test-stable.sh @@ -12,7 +12,7 @@ _() { _ cargo fmt -- --check _ cargo build --verbose -_ cargo test --features=test --verbose +_ cargo test --verbose echo --- ci/localnet-sanity.sh ( diff --git a/src/bin/fullnode-config.rs b/src/bin/fullnode-config.rs index 499b024d24..5bbaaa268c 100644 --- a/src/bin/fullnode-config.rs +++ b/src/bin/fullnode-config.rs @@ -5,7 +5,7 @@ extern crate serde_json; extern crate solana; use clap::{App, Arg}; -use solana::crdt::GOSSIP_PORT_RANGE; +use solana::crdt::SOLANA_PORT_RANGE; use solana::fullnode::Config; use solana::nat::{get_ip_addr, get_public_ip_addr, parse_port_or_addr}; use solana::signature::read_pkcs8; @@ -48,7 +48,7 @@ fn main() { .get_matches(); let bind_addr: SocketAddr = { - let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), GOSSIP_PORT_RANGE.0); + let mut bind_addr = parse_port_or_addr(matches.value_of("bind"), SOLANA_PORT_RANGE.0); if matches.is_present("local") { let ip = get_ip_addr().unwrap(); bind_addr.set_ip(ip); diff --git a/src/client.rs b/src/client.rs index f68799b3c1..259aa8c9c3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,11 @@ -use crdt::{NodeInfo, GOSSIP_PORT_RANGE}; +use crdt::{NodeInfo, SOLANA_PORT_RANGE}; use nat::bind_in_range; use std::time::Duration; use thin_client::ThinClient; pub fn mk_client(r: &NodeInfo) -> ThinClient { - let requests_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); - let transactions_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); + let requests_socket = bind_in_range(SOLANA_PORT_RANGE).unwrap(); + let transactions_socket = bind_in_range(SOLANA_PORT_RANGE).unwrap(); requests_socket .set_read_timeout(Some(Duration::new(1, 0))) diff --git a/src/crdt.rs b/src/crdt.rs index 99cbe4eb14..fb46de1cfd 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -40,7 +40,7 @@ use timing::{duration_as_ms, timestamp}; use transaction::Vote; use window::{SharedWindow, WindowIndex}; -pub const GOSSIP_PORT_RANGE: (u16, u16) = (8000, 10_000); +pub const SOLANA_PORT_RANGE: (u16, u16) = (8000, 10_000); /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_PURGE_MILLIS: u64 = 15000; @@ -48,6 +48,21 @@ const GOSSIP_PURGE_MILLIS: u64 = 15000; /// minimum membership table size before we start purging dead nodes const MIN_TABLE_SIZE: usize = 2; +macro_rules! socketaddr { + ($ip:expr, $port:expr) => { + SocketAddr::from((Ipv4Addr::from($ip), $port)) + }; + ($str:expr) => {{ + let a: SocketAddr = $str.parse().unwrap(); + a + }}; +} +macro_rules! socketaddr_any { + () => { + socketaddr!(0, 0) + }; +} + #[derive(Debug, PartialEq, Eq)] pub enum CrdtError { NoPeers, @@ -68,9 +83,6 @@ pub struct ContactInfo { pub rpu: SocketAddr, /// transactions address pub tpu: SocketAddr, - /// repair address, we use this to jump ahead of the packets - /// destined to the replciate_addr - pub tvu_window: SocketAddr, /// if this struture changes update this value as well /// Always update `NodeInfo` version too /// This separate version for addresses allows us to use the `Vote` @@ -112,7 +124,6 @@ impl NodeInfo { tvu: SocketAddr, rpu: SocketAddr, tpu: SocketAddr, - tvu_window: SocketAddr, ) -> Self { NodeInfo { id, @@ -122,7 +133,6 @@ impl NodeInfo { tvu, rpu, tpu, - tvu_window, version: 0, }, leader_id: Pubkey::default(), @@ -131,33 +141,20 @@ impl NodeInfo { }, } } + #[cfg(test)] /// NodeInfo with unspecified addresses for adversarial testing. pub fn new_unspecified() -> Self { - let addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let addr = socketaddr!(0, 0); assert!(addr.ip().is_unspecified()); - Self::new( - Keypair::new().pubkey(), - addr.clone(), - addr.clone(), - addr.clone(), - addr.clone(), - addr.clone(), - ) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr) } #[cfg(test)] /// NodeInfo with multicast addresses for adversarial testing. pub fn new_multicast() -> Self { - let addr: SocketAddr = "224.0.1.255:1000".parse().unwrap(); + let addr = socketaddr!("224.0.1.255:1000"); assert!(addr.ip().is_multicast()); - Self::new( - Keypair::new().pubkey(), - addr.clone(), - addr.clone(), - addr.clone(), - addr.clone(), - addr.clone(), - ) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr) } pub fn debug_id(&self) -> u64 { make_debug_id(&self.id) @@ -172,14 +169,12 @@ impl NodeInfo { let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 2); let requests_addr = Self::next_port(&bind_addr, 3); - let repair_addr = Self::next_port(&bind_addr, 4); NodeInfo::new( pubkey, gossip_addr, replicate_addr, requests_addr, transactions_addr, - repair_addr, ) } pub fn new_leader(bind_addr: &SocketAddr) -> Self { @@ -187,8 +182,8 @@ impl NodeInfo { Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr) } pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { - let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - NodeInfo::new(Pubkey::default(), gossip_addr, daddr, daddr, daddr, daddr) + let daddr: SocketAddr = socketaddr!("0.0.0.0:0"); + NodeInfo::new(Pubkey::default(), gossip_addr, daddr, daddr, daddr) } } @@ -217,7 +212,7 @@ pub struct Crdt { /// last time the public key had sent us a message pub alive: HashMap, pub update_index: u64, - pub me: Pubkey, + pub id: Pubkey, /// last time we heard from anyone getting a message fro this public key /// these are rumers and shouldn't be trusted directly external_liveness: HashMap>, @@ -239,52 +234,31 @@ enum Protocol { } impl Crdt { - pub fn new(me: NodeInfo) -> Result { - if me.version != 0 { + pub fn new(node_info: NodeInfo) -> Result { + if node_info.version != 0 { return Err(Error::CrdtError(CrdtError::BadNodeInfo)); } - if me.contact_info.ncp.ip().is_unspecified() - || me.contact_info.ncp.port() == 0 - || me.contact_info.ncp.ip().is_multicast() - { - return Err(Error::CrdtError(CrdtError::BadGossipAddress)); - } - for addr in &[ - me.contact_info.tvu, - me.contact_info.rpu, - me.contact_info.tpu, - me.contact_info.tvu_window, - ] { - //dummy address is allowed, services will filter them - if addr.ip().is_unspecified() && addr.port() == 0 { - continue; - } - //if addr is not a dummy address, than it must be valid - if addr.ip().is_unspecified() || addr.port() == 0 || addr.ip().is_multicast() { - return Err(Error::CrdtError(CrdtError::BadContactInfo)); - } - } - let mut g = Crdt { + let mut me = Crdt { table: HashMap::new(), local: HashMap::new(), remote: HashMap::new(), alive: HashMap::new(), external_liveness: HashMap::new(), - me: me.id, + id: node_info.id, update_index: 1, }; - g.local.insert(me.id, g.update_index); - g.table.insert(me.id, me); - Ok(g) + me.local.insert(node_info.id, me.update_index); + me.table.insert(node_info.id, node_info); + Ok(me) } pub fn debug_id(&self) -> u64 { - make_debug_id(&self.me) + make_debug_id(&self.id) } pub fn my_data(&self) -> &NodeInfo { - &self.table[&self.me] + &self.table[&self.id] } pub fn leader_data(&self) -> Option<&NodeInfo> { - let leader_id = self.table[&self.me].leader_id; + let leader_id = self.table[&self.id].leader_id; // leader_id can be 0s from network entry point if leader_id == Pubkey::default() { @@ -434,7 +408,7 @@ impl Crdt { .alive .iter() .filter_map(|(&k, v)| { - if k != self.me && (now - v) > limit { + if k != self.id && (now - v) > limit { Some(k) } else { trace!( @@ -478,7 +452,7 @@ impl Crdt { pub fn compute_broadcast_table(&self) -> Vec { let live: Vec<_> = self.alive.iter().collect(); //thread_rng().shuffle(&mut live); - let me = &self.table[&self.me]; + let me = &self.table[&self.id]; let cloned_table: Vec = live .iter() .map(|x| &self.table[x.0]) @@ -512,7 +486,6 @@ impl Crdt { /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` - /// TODO: move me out of crdt pub fn broadcast( me: &NodeInfo, broadcast_table: &[NodeInfo], @@ -631,12 +604,11 @@ impl Crdt { /// retransmit messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` - /// TODO: move me out of Crdt pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (NodeInfo, Vec) = { // copy to avoid locking during IO let s = obj.read().expect("'obj' read lock in pub fn retransmit"); - (s.table[&s.me].clone(), s.table.values().cloned().collect()) + (s.my_data().clone(), s.table.values().cloned().collect()) }; blob.write() .unwrap() @@ -709,7 +681,7 @@ impl Crdt { .filter(|x| x.id != Pubkey::default() && self.local[&x.id] > v) .cloned() .collect(); - let id = self.me; + let id = self.id; let ups = self.update_index; (id, ups, data) } @@ -727,17 +699,19 @@ impl Crdt { } pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { + // find a peer that appears to be accepting replication, as indicated + // by a valid tvu port location let valid: Vec<_> = self .table .values() - .filter(|r| r.id != self.me && Self::is_valid_address(r.contact_info.tvu_window)) + .filter(|r| r.id != self.id && Self::is_valid_address(r.contact_info.tvu)) .collect(); if valid.is_empty() { Err(CrdtError::NoPeers)?; } let n = thread_rng().gen::() % valid.len(); - let addr = valid[n].contact_info.ncp; - let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); + let addr = valid[n].contact_info.ncp; // send the request to the peer's gossip port + let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); let out = serialize(&req)?; Ok((addr, out)) } @@ -752,7 +726,7 @@ impl Crdt { .table .values() .filter(|v| { - v.id != self.me + v.id != self.id && !v.contact_info.ncp.ip().is_unspecified() && !v.contact_info.ncp.ip().is_multicast() }) @@ -776,11 +750,11 @@ impl Crdt { let v = choose_peer_result?; let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); - let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); + let req = Protocol::RequestUpdates(remote_update_index, self.my_data().clone()); trace!( "created gossip request from {:x} {:?} to {:x} {}", self.debug_id(), - self.table[&self.me].clone(), + self.my_data(), v.debug_id(), v.contact_info.ncp ); @@ -936,10 +910,11 @@ impl Crdt { .unwrap() } fn run_window_request( + from: &NodeInfo, + from_addr: &SocketAddr, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, me: &NodeInfo, - from: &NodeInfo, ix: u64, blob_recycler: &BlobRecycler, ) -> Option { @@ -972,7 +947,7 @@ impl Crdt { let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); - outblob.meta.set_addr(&from.contact_info.tvu_window); + outblob.meta.set_addr(from_addr); outblob.set_id(sender_id).expect("blob set_id"); } inc_new_counter_info!("crdt-window-request-pass", 1); @@ -997,7 +972,7 @@ impl Crdt { blob_recycler, Some(ix), Some(me.id), // causes retransmission if I'm the leader - Some(&from.contact_info.tvu_window), + Some(from_addr), ); return Some(out); @@ -1026,9 +1001,9 @@ impl Crdt { ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { Ok(request) => Crdt::handle_protocol( + obj, blob.meta.addr(), request, - obj, window, ledger_window, blob_recycler, @@ -1041,68 +1016,103 @@ impl Crdt { } fn handle_protocol( + me: &Arc>, from_addr: SocketAddr, request: Protocol, - obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, ) -> Option { match request { // TODO sigverify these - Protocol::RequestUpdates(v, from_rd) => { + Protocol::RequestUpdates(version, mut from) => { + let debug_id = me.read().unwrap().debug_id(); + trace!( - "RequestUpdates {} from {}, professing to be {}", - v, + "{:x} RequestUpdates {} from {}, professing to be {}", + debug_id, + version, from_addr, - from_rd.contact_info.ncp + from.contact_info.ncp ); - let me = obj.read().unwrap(); - if from_rd.contact_info.ncp == me.table[&me.me].contact_info.ncp { + + if from.id == me.read().unwrap().id { warn!( "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}", - me.debug_id(), - make_debug_id(&from_rd.id) + me.read().unwrap().debug_id(), + from.debug_id() ); inc_new_counter_info!("crdt-window-request-loopback", 1); return None; } - // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = me.get_updates_since(v); - let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect(); - drop(me); - trace!("get updates since response {} {}", v, data.len()); - let len = data.len(); - let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness); - { - let mut me = obj.write().unwrap(); - me.insert(&from_rd); - me.update_liveness(from_rd.id); + + // the remote side may not know his public IP:PORT, record what he looks like to us + // this may or may not be correct for everybody but it's better than leaving him with + // an unspecified address in our table + if from.contact_info.ncp.ip().is_unspecified() { + inc_new_counter_info!("crdt-window-request-updates-unspec-ncp", 1); + from.contact_info.ncp = from_addr; } + + let (from, ups, data, liveness) = { + let me = me.read().unwrap(); + + // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` + let (from, ups, data) = me.get_updates_since(version); + ( + from, + ups, + data, + me.remote.iter().map(|(k, v)| (*k, *v)).collect(), + ) + }; + + // update entry only after collecting liveness + { + let mut me = me.write().unwrap(); + me.insert(&from); + me.update_liveness(from.id); + } + + trace!("get updates since response {} {}", version, data.len()); + let len = data.len(); + if len < 1 { - let me = obj.read().unwrap(); + let me = me.read().unwrap(); trace!( "no updates me {:x} ix {} since {}", - me.debug_id(), + debug_id, me.update_index, - v + version ); None - } else if let Ok(r) = to_blob(rsp, from_addr, &blob_recycler) { - trace!( - "sending updates me {:x} len {} to {:x} {}", - obj.read().unwrap().debug_id(), - len, - from_rd.debug_id(), - from_addr, - ); - Some(r) } else { - warn!("to_blob failed"); - None + let rsp = Protocol::ReceiveUpdates(from, ups, data, liveness); + + if let Ok(r) = to_blob(rsp, from.contact_info.ncp, &blob_recycler) { + trace!( + "sending updates me {:x} len {} to {:x} {}", + debug_id, + len, + from.debug_id(), + from.contact_info.ncp, + ); + Some(r) + } else { + warn!("to_blob failed"); + None + } } } Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => { + // the remote side may not know his public IP:PORT, but if so + // how did he come to have any gossip to share? + // this could happen if the root node binds to 0:0... + if from.contact_info.ncp.ip().is_unspecified() { + inc_new_counter_info!("crdt-window-receive-updates-unspec-ncp", 1); + from.contact_info.ncp = from_addr; + } + let now = Instant::now(); trace!( "ReceivedUpdates from={:x} update_index={} len={}", @@ -1110,8 +1120,8 @@ impl Crdt { update_index, data.len() ); - obj.write() - .expect("'obj' write lock in ReceiveUpdates") + me.write() + .expect("'me' write lock in ReceiveUpdates") .apply_updates(from, update_index, &data, &external_liveness); report_time_spent( @@ -1121,32 +1131,43 @@ impl Crdt { ); None } + Protocol::RequestWindowIndex(from, ix) => { let now = Instant::now(); - //TODO this doesn't depend on CRDT module, can be moved + + //TODO this doesn't depend on CRDT module, could be moved //but we are using the listen thread to service these request //TODO verify from is signed - obj.write().unwrap().insert(&from); - let me = obj.read().unwrap().my_data().clone(); - inc_new_counter_info!("crdt-window-request-recv", 1); - trace!( - "{:x}:received RequestWindowIndex {:x} {} ", - me.debug_id(), - from.debug_id(), - ix, - ); - if from.contact_info.tvu_window == me.contact_info.tvu_window { + + if from.id == me.read().unwrap().id { warn!( - "Ignored {:x}:received RequestWindowIndex from ME {:x} {} ", - me.debug_id(), + "{:x}: Ignored received RequestWindowIndex from ME {:x} {} ", + me.read().unwrap().debug_id(), from.debug_id(), ix, ); inc_new_counter_info!("crdt-window-request-address-eq", 1); return None; } - let res = - Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler); + + me.write().unwrap().insert(&from); + let me = me.read().unwrap().my_data().clone(); + inc_new_counter_info!("crdt-window-request-recv", 1); + trace!( + "{:x}: received RequestWindowIndex {:x} {} ", + me.debug_id(), + from.debug_id(), + ix, + ); + let res = Self::run_window_request( + &from, + &from_addr, + &window, + ledger_window, + &me, + ix, + blob_recycler, + ); report_time_spent( "RequestWindowIndex", &now.elapsed(), @@ -1189,7 +1210,7 @@ impl Crdt { Ok(()) } pub fn listen( - obj: Arc>, + me: Arc>, window: SharedWindow, ledger_path: Option<&str>, blob_recycler: BlobRecycler, @@ -1197,15 +1218,13 @@ impl Crdt { response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { - let debug_id = obj.read().unwrap().debug_id(); - let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap()); Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { let e = Self::run_listen( - &obj, + &me, &window, &mut ledger_window.as_mut(), &blob_recycler, @@ -1216,21 +1235,21 @@ impl Crdt { return; } if e.is_err() { + let me = me.read().unwrap(); debug!( "{:x}: run_listen timeout, table size: {}", - debug_id, - obj.read().unwrap().table.len() + me.debug_id(), + me.table.len() ); } }) .unwrap() } - fn is_valid_ip_internal(addr: IpAddr, cfg_test: bool) -> bool { - !(addr.is_unspecified() || addr.is_multicast() || (addr.is_loopback() && !cfg_test)) - } fn is_valid_ip(addr: IpAddr) -> bool { - Self::is_valid_ip_internal(addr, cfg!(test) || cfg!(feature = "test")) + !(addr.is_unspecified() || addr.is_multicast()) + // || (addr.is_loopback() && !cfg_test)) + // TODO: boot loopback in production networks } /// port must not be 0 /// ip must be specified and not mulitcast @@ -1240,10 +1259,11 @@ impl Crdt { } pub fn spy_node() -> (NodeInfo, UdpSocket) { - let gossip_socket = bind_in_range(GOSSIP_PORT_RANGE).unwrap(); + let gossip_socket = bind_in_range(SOLANA_PORT_RANGE).unwrap(); let pubkey = Keypair::new().pubkey(); - let daddr = "0.0.0.0:0".parse().unwrap(); - let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr); + let daddr = socketaddr_any!(); + + let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr); (node, gossip_socket) } } @@ -1285,7 +1305,6 @@ impl Node { replicate.local_addr().unwrap(), requests.local_addr().unwrap(), transaction.local_addr().unwrap(), - repair.local_addr().unwrap(), ); Node { data, @@ -1301,58 +1320,6 @@ impl Node { }, } } - pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> Node { - let mut gossip_addr = bind_addr; - gossip_addr.set_port(data.contact_info.ncp.port()); - - let mut replicate_addr = bind_addr; - replicate_addr.set_port(data.contact_info.tvu.port()); - - let mut requests_addr = bind_addr; - requests_addr.set_port(data.contact_info.rpu.port()); - - let mut transactions_addr = bind_addr; - transactions_addr.set_port(data.contact_info.tpu.port()); - - let mut repair_addr = bind_addr; - repair_addr.set_port(data.contact_info.tvu_window.port()); - - fn bind(addr: SocketAddr) -> UdpSocket { - match UdpSocket::bind(addr) { - Ok(socket) => socket, - Err(err) => { - panic!("Failed to bind to {:?}: {:?}", addr, err); - } - } - }; - - let transaction = bind(transactions_addr); - let gossip = bind(gossip_addr); - let replicate = bind(replicate_addr); - let repair = bind(repair_addr); - let requests = bind(requests_addr); - - // Responses are sent from the same Udp port as requests are received - // from, in hopes that a NAT sitting in the middle will route the - // response Udp packet correctly back to the requester. - let respond = requests.try_clone().unwrap(); - - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - Node { - data, - sockets: Sockets { - gossip, - requests, - replicate, - transaction, - respond, - broadcast, - repair, - retransmit, - }, - } - } pub fn new_with_external_ip( pubkey: Pubkey, ip: IpAddr, @@ -1369,7 +1336,7 @@ impl Node { }; fn bind_to(port: u16) -> UdpSocket { - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); + let addr = socketaddr!(0, port); match UdpSocket::bind(addr) { Ok(socket) => socket, Err(err) => { @@ -1386,23 +1353,22 @@ impl Node { let (replicate_port, replicate) = bind(port_range); let (requests_port, requests) = bind(port_range); let (transaction_port, transaction) = bind(port_range); - let (repair_port, repair) = bind(port_range); + + let (_, repair) = bind(port_range); + let (_, broadcast) = bind(port_range); + let (_, retransmit) = bind(port_range); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the // response Udp packet correctly back to the requester. let respond = requests.try_clone().unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - let node_info = NodeInfo::new( pubkey, SocketAddr::new(ip, gossip_port), SocketAddr::new(ip, replicate_port), SocketAddr::new(ip, requests_port), SocketAddr::new(ip, transaction_port), - SocketAddr::new(ip, repair_port), ); Node { @@ -1451,116 +1417,14 @@ mod tests { use transaction::Vote; use window::default_window; - #[test] - fn test_bad_address() { - let d1 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.0:1234".parse().unwrap(), - "0.0.0.0:1235".parse().unwrap(), - "0.0.0.0:1236".parse().unwrap(), - "0.0.0.0:1237".parse().unwrap(), - "0.0.0.0:1238".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d1).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let d1_1 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.1:1234".parse().unwrap(), - "0.0.0.0:1235".parse().unwrap(), - "0.0.0.0:1236".parse().unwrap(), - "0.0.0.0:1237".parse().unwrap(), - "0.0.0.0:1238".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d1_1).err(), - Some(Error::CrdtError(CrdtError::BadContactInfo)) - ); - let d2 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d2).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let d2_1 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.1:1234".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.1:0".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d2_1).err(), - Some(Error::CrdtError(CrdtError::BadContactInfo)) - ); - let d3 = NodeInfo::new_unspecified(); - assert_matches!( - Crdt::new(d3).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let d4 = NodeInfo::new_multicast(); - assert_matches!( - Crdt::new(d4).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let mut d5 = NodeInfo::new_multicast(); - d5.version = 1; - assert_matches!( - Crdt::new(d5).err(), - Some(Error::CrdtError(CrdtError::BadNodeInfo)) - ); - let d6 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.0:1234".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d6).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let d7 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.1:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - ); - assert_matches!( - Crdt::new(d7).err(), - Some(Error::CrdtError(CrdtError::BadGossipAddress)) - ); - let d8 = NodeInfo::new( - Keypair::new().pubkey(), - "0.0.0.1:1234".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - ); - assert_eq!(Crdt::new(d8).is_ok(), true); - } - #[test] fn insert_test() { let mut d = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); @@ -1587,11 +1451,11 @@ mod tests { } #[test] fn test_new_vote() { - let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); - let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); + let leader = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1235")); assert_ne!(d.id, leader.id); assert_matches!( crdt.new_vote(Hash::default()).err(), @@ -1614,7 +1478,7 @@ mod tests { #[test] fn test_insert_vote() { - let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let d = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); @@ -1651,43 +1515,36 @@ mod tests { let keypair = Keypair::new(); let d1 = NodeInfo::new_leader_with_pubkey( keypair.pubkey().clone(), - &"127.0.0.1:1234".parse().unwrap(), + &socketaddr!("127.0.0.1:1234"), ); assert_eq!(d1.id, keypair.pubkey()); - assert_eq!(d1.contact_info.ncp, "127.0.0.1:1235".parse().unwrap()); - assert_eq!(d1.contact_info.tvu, "127.0.0.1:1236".parse().unwrap()); - assert_eq!(d1.contact_info.rpu, "127.0.0.1:1237".parse().unwrap()); - assert_eq!(d1.contact_info.tpu, "127.0.0.1:1234".parse().unwrap()); - assert_eq!( - d1.contact_info.tvu_window, - "127.0.0.1:1238".parse().unwrap() - ); + assert_eq!(d1.contact_info.ncp, socketaddr!("127.0.0.1:1235")); + assert_eq!(d1.contact_info.tvu, socketaddr!("127.0.0.1:1236")); + assert_eq!(d1.contact_info.rpu, socketaddr!("127.0.0.1:1237")); + assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234")); } #[test] fn update_test() { let d1 = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let d2 = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let d3 = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); let (key, ix, ups) = crdt.get_updates_since(0); @@ -1717,7 +1574,7 @@ mod tests { sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&crdt.table.values().map(|x| x.clone()).collect()) ); - let d4 = NodeInfo::new_entry_point("127.0.0.4:1234".parse().unwrap()); + let d4 = NodeInfo::new_entry_point(socketaddr!("127.0.0.4:1234")); crdt.insert(&d4); let (_key, _ix, ups) = crdt.get_updates_since(0); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); @@ -1726,46 +1583,35 @@ mod tests { fn window_index_request() { let me = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!([127, 0, 0, 1], 1234), + socketaddr!([127, 0, 0, 1], 1235), + socketaddr!([127, 0, 0, 1], 1236), + socketaddr!([127, 0, 0, 1], 1237), ); - let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); + let mut crdt = Crdt::new(me).expect("Crdt::new"); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); + + let ncp = socketaddr!([127, 0, 0, 1], 1234); let nxt = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - ); - crdt.insert(&nxt); - let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); - let nxt = NodeInfo::new( - Keypair::new().pubkey(), - "127.0.0.2:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + ncp, + socketaddr!([127, 0, 0, 1], 1235), + socketaddr!([127, 0, 0, 1], 1236), + socketaddr!([127, 0, 0, 1], 1237), ); crdt.insert(&nxt); let rv = crdt.window_index_request(0).unwrap(); - assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap()); - assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); + assert_eq!(nxt.contact_info.ncp, ncp); + assert_eq!(rv.0, nxt.contact_info.ncp); + let ncp2 = socketaddr!([127, 0, 0, 2], 1234); let nxt = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.3:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + ncp2, + socketaddr!([127, 0, 0, 1], 1235), + socketaddr!([127, 0, 0, 1], 1236), + socketaddr!([127, 0, 0, 1], 1237), ); crdt.insert(&nxt); let mut one = false; @@ -1773,10 +1619,10 @@ mod tests { while !one || !two { //this randomly picks an option, so eventually it should pick both let rv = crdt.window_index_request(0).unwrap(); - if rv.0 == "127.0.0.2:1234".parse().unwrap() { + if rv.0 == ncp { one = true; } - if rv.0 == "127.0.0.3:1234".parse().unwrap() { + if rv.0 == ncp2 { two = true; } } @@ -1787,11 +1633,10 @@ mod tests { fn gossip_request_bad_addr() { let me = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:127".parse().unwrap(), - "127.0.0.1:127".parse().unwrap(), - "127.0.0.1:127".parse().unwrap(), - "127.0.0.1:127".parse().unwrap(), - "127.0.0.1:127".parse().unwrap(), + socketaddr!("127.0.0.1:127"), + socketaddr!("127.0.0.1:127"), + socketaddr!("127.0.0.1:127"), + socketaddr!("127.0.0.1:127"), ); let mut crdt = Crdt::new(me).expect("Crdt::new"); @@ -1812,22 +1657,20 @@ mod tests { fn gossip_request() { let me = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); let nxt1 = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.2:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.2:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); crdt.insert(&nxt1); @@ -1835,7 +1678,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt1.contact_info.ncp); - let nxt2 = NodeInfo::new_entry_point("127.0.0.3:1234".parse().unwrap()); + let nxt2 = NodeInfo::new_entry_point(socketaddr!("127.0.0.3:1234")); crdt.insert(&nxt2); // check that the service works // and that it eventually produces a request for both nodes @@ -1876,9 +1719,9 @@ mod tests { #[test] fn purge_test() { logger::setup(); - let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); crdt.set_leader(me.id); crdt.insert(&nxt); @@ -1897,7 +1740,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt2.id); assert_ne!(nxt.id, nxt2.id); crdt.insert(&nxt2); @@ -1919,14 +1762,14 @@ mod tests { #[test] fn purge_leader_test() { logger::setup(); - let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); - let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let nxt = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); assert_ne!(me.id, nxt.id); crdt.insert(&nxt); crdt.set_leader(nxt.id); let now = crdt.alive[&nxt.id]; - let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let mut nxt2 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); crdt.insert(&nxt2); while now == crdt.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); @@ -1947,25 +1790,48 @@ mod tests { let window = default_window(); let me = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let recycler = BlobRecycler::default(); - let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); + let rv = Crdt::run_window_request( + &me, + &socketaddr_any!(), + &window, + &mut None, + &me, + 0, + &recycler, + ); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0].data = Some(out); - let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); + let rv = Crdt::run_window_request( + &me, + &socketaddr_any!(), + &window, + &mut None, + &me, + 0, + &recycler, + ); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request(&window, &mut None, &me, &me, len, &recycler); + let rv = Crdt::run_window_request( + &me, + &socketaddr_any!(), + &window, + &mut None, + &me, + len, + &recycler, + ); assert!(rv.is_none()); fn tmp_ledger(name: &str) -> String { @@ -1988,10 +1854,11 @@ mod tests { let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap(); let rv = Crdt::run_window_request( + &me, + &socketaddr_any!(), &window, &mut Some(&mut ledger_window), &me, - &me, 1, &recycler, ); @@ -2005,15 +1872,23 @@ mod tests { fn run_window_request_with_backoff() { let window = default_window(); - let mut me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mut me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); me.leader_id = me.id; - let mock_peer = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mock_peer = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); let recycler = BlobRecycler::default(); // Simulate handling a repair request from mock_peer - let rv = Crdt::run_window_request(&window, &mut None, &me, &mock_peer, 0, &recycler); + let rv = Crdt::run_window_request( + &mock_peer, + &socketaddr_any!(), + &window, + &mut None, + &me, + 0, + &recycler, + ); assert!(rv.is_none()); let blob = recycler.allocate(); let blob_size = 200; @@ -2023,7 +1898,13 @@ mod tests { let num_requests: u32 = 64; for i in 0..num_requests { let shared_blob = Crdt::run_window_request( - &window, &mut None, &me, &mock_peer, 0, &recycler, + &mock_peer, + &socketaddr_any!(), + &window, + &mut None, + &me, + 0, + &recycler, ).unwrap(); let blob = shared_blob.read().unwrap(); // Test we copied the blob @@ -2042,16 +1923,16 @@ mod tests { #[test] fn test_update_leader() { logger::setup(); - let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let me = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let leader1 = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); assert_eq!(crdt.top_leader(), None); crdt.set_leader(leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { - let mut dum = NodeInfo::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let mut dum = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1234")); dum.id = Keypair::new().pubkey(); dum.leader_id = leader1.id; crdt.insert(&dum); @@ -2067,21 +1948,20 @@ mod tests { #[test] fn test_valid_last_ids() { logger::setup(); - let mut leader0 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let mut leader0 = NodeInfo::new_leader(&socketaddr!("127.0.0.2:1234")); leader0.ledger_state.last_id = hash(b"0"); let mut leader1 = NodeInfo::new_multicast(); leader1.ledger_state.last_id = hash(b"1"); let mut leader2 = - NodeInfo::new_leader_with_pubkey(Pubkey::default(), &"127.0.0.2:1234".parse().unwrap()); + NodeInfo::new_leader_with_pubkey(Pubkey::default(), &socketaddr!("127.0.0.2:1234")); leader2.ledger_state.last_id = hash(b"2"); // test that only valid tvu or tpu are retured as nodes let mut leader3 = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "0.0.0.0:0".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr_any!(), + socketaddr!("127.0.0.1:1236"), + socketaddr_any!(), ); leader3.ledger_state.last_id = hash(b"3"); let mut crdt = Crdt::new(leader0.clone()).expect("Crdt::new"); @@ -2100,10 +1980,10 @@ mod tests { let window = default_window(); let recycler = BlobRecycler::default(); - let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let node = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); + let node_with_same_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:1234")); assert_ne!(node.id, node_with_same_addr.id); - let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap()); + let node_with_diff_addr = NodeInfo::new_leader(&socketaddr!("127.0.0.1:4321")); let crdt = Crdt::new(node.clone()).expect("Crdt::new"); assert_eq!(crdt.alive.len(), 0); @@ -2113,9 +1993,9 @@ mod tests { let request = Protocol::RequestUpdates(1, node.clone()); assert!( Crdt::handle_protocol( + &obj, node.contact_info.ncp, request, - &obj, &window, &mut None, &recycler @@ -2125,9 +2005,9 @@ mod tests { let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); assert!( Crdt::handle_protocol( + &obj, node.contact_info.ncp, request, - &obj, &window, &mut None, &recycler @@ -2136,9 +2016,9 @@ mod tests { let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); Crdt::handle_protocol( + &obj, node.contact_info.ncp, request, - &obj, &window, &mut None, &recycler, @@ -2146,25 +2026,26 @@ mod tests { let me = obj.write().unwrap(); - // |node| and |node_with_same_addr| should not be in me.alive, but - // |node_with_diff_addr| should now be. + // |node| and |node_with_same_addr| are ok to me in me.alive, should not be in me.alive, but assert!(!me.alive.contains_key(&node.id)); - assert!(!me.alive.contains_key(&node_with_same_addr.id)); + // same addr might very well happen because of NAT + assert!(me.alive.contains_key(&node_with_same_addr.id)); + // |node_with_diff_addr| should now be. assert!(me.alive[&node_with_diff_addr.id] > 0); } #[test] fn test_is_valid_address() { assert!(cfg!(test)); - let bad_address_port = "127.0.0.1:0".parse().unwrap(); + let bad_address_port = socketaddr!("127.0.0.1:0"); assert!(!Crdt::is_valid_address(bad_address_port)); - let bad_address_unspecified = "0.0.0.0:1234".parse().unwrap(); + let bad_address_unspecified = socketaddr!(0, 1234); assert!(!Crdt::is_valid_address(bad_address_unspecified)); - let bad_address_multicast = "224.254.0.0:1234".parse().unwrap(); + let bad_address_multicast = socketaddr!([224, 254, 0, 0], 1234); assert!(!Crdt::is_valid_address(bad_address_multicast)); - let loopback = "127.0.0.1:1234".parse().unwrap(); + let loopback = socketaddr!("127.0.0.1:1234"); assert!(Crdt::is_valid_address(loopback)); - assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false)); + // assert!(!Crdt::is_valid_ip_internal(loopback.ip(), false)); } #[test] @@ -2172,91 +2053,57 @@ mod tests { logger::setup(); let node_info = NodeInfo::new( Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), ); let mut crdt = Crdt::new(node_info).unwrap(); - let network_entry_point = NodeInfo::new_entry_point("127.0.0.1:1239".parse().unwrap()); + let network_entry_point = NodeInfo::new_entry_point(socketaddr!("127.0.0.1:1239")); crdt.insert(&network_entry_point); assert!(crdt.leader_data().is_none()); } #[test] fn new_with_external_ip_test_random() { - let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); - let node = - Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0); - - assert_eq!( - node.sockets.gossip.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.replicate.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.requests.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.transaction.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.repair.local_addr().unwrap().ip(), - sockaddr.ip() - ); + let ip = IpAddr::V4(Ipv4Addr::from(0)); + let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 0); + assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.gossip.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.gossip.local_addr().unwrap().port() < 8200); assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200); assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() < 8200); assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200); assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() < 8200); } #[test] fn new_with_external_ip_test_gossip() { - let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); - let node = - Node::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 8050); - assert_eq!( - node.sockets.gossip.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.replicate.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.requests.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.transaction.local_addr().unwrap().ip(), - sockaddr.ip() - ); - assert_eq!( - node.sockets.repair.local_addr().unwrap().ip(), - sockaddr.ip() - ); + let ip = IpAddr::V4(Ipv4Addr::from(0)); + let node = Node::new_with_external_ip(Keypair::new().pubkey(), ip, (8100, 8200), 8050); + assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.replicate.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.transaction.local_addr().unwrap().ip(), ip); + assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.replicate.local_addr().unwrap().port() < 8200); assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() < 8200); assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() < 8200); assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); - assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() < 8200); } } diff --git a/src/nat.rs b/src/nat.rs index 48ddc7ca3a..abd6061306 100644 --- a/src/nat.rs +++ b/src/nat.rs @@ -28,9 +28,8 @@ pub fn get_public_ip_addr() -> Result { } pub fn parse_port_or_addr(optstr: Option<&str>, default_port: u16) -> SocketAddr { - let daddr: SocketAddr = format!("0.0.0.0:{}", default_port) - .parse() - .expect("default socket address"); + let daddr = SocketAddr::from(([0, 0, 0, 0], default_port)); + if let Some(addrstr) = optstr { if let Ok(port) = addrstr.parse() { let mut addr = daddr; diff --git a/src/ncp.rs b/src/ncp.rs index 9a55663a8e..7ebdc3f757 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -30,7 +30,7 @@ impl Ncp { let gossip_socket = Arc::new(gossip_socket); trace!( "Ncp: id: {:?}, listening on: {:?}", - &crdt.read().unwrap().me.as_ref()[..4], + &crdt.read().unwrap().id.as_ref()[..4], gossip_socket.local_addr().unwrap() ); let t_receiver = streamer::blob_receiver( diff --git a/src/packet.rs b/src/packet.rs index 5ca3055bc7..181f8c7410 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -101,14 +101,18 @@ impl Meta { self.addr[1] = u16::from(ip[1]); self.addr[2] = u16::from(ip[2]); self.addr[3] = u16::from(ip[3]); - self.port = a.port(); + self.addr[4] = 0; + self.addr[5] = 0; + self.addr[6] = 0; + self.addr[7] = 0; + self.v6 = false; } SocketAddr::V6(v6) => { self.addr = v6.ip().segments(); - self.port = a.port(); self.v6 = true; } } + self.port = a.port(); } } diff --git a/src/result.rs b/src/result.rs index 3f4caaa7d0..386e39559f 100644 --- a/src/result.rs +++ b/src/result.rs @@ -9,7 +9,6 @@ use packet; use serde_json; use std; use std::any::Any; -use window; #[derive(Debug)] pub enum Error { @@ -22,7 +21,6 @@ pub enum Error { Serialize(std::boxed::Box), BankError(bank::BankError), CrdtError(crdt::CrdtError), - WindowError(window::WindowError), BlobError(packet::BlobError), #[cfg(feature = "erasure")] ErasureError(erasure::ErasureError), @@ -59,11 +57,6 @@ impl std::convert::From for Error { Error::CrdtError(e) } } -impl std::convert::From for Error { - fn from(e: window::WindowError) -> Error { - Error::WindowError(e) - } -} #[cfg(feature = "erasure")] impl std::convert::From for Error { fn from(e: erasure::ErasureError) -> Error { diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 75c2c6e12a..3272d20083 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -82,6 +82,7 @@ impl RetransmitStage { window: SharedWindow, entry_height: u64, retransmit_socket: Arc, + repair_socket: Arc, blob_recycler: &BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { @@ -102,6 +103,7 @@ impl RetransmitStage { fetch_stage_receiver, blob_sender, retransmit_sender, + repair_socket, ); let thread_hdls = vec![t_retransmit, t_window]; diff --git a/src/tvu.rs b/src/tvu.rs index 71e851adc8..31cf097bda 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -81,9 +81,10 @@ impl Tvu { ledger_path: Option<&str>, exit: Arc, ) -> Self { + let repair_socket = Arc::new(repair_socket); let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( - vec![Arc::new(replicate_socket), Arc::new(repair_socket)], + vec![Arc::new(replicate_socket), repair_socket.clone()], exit.clone(), &blob_recycler, ); @@ -95,6 +96,7 @@ impl Tvu { window, entry_height, Arc::new(retransmit_socket), + repair_socket, &blob_recycler, blob_fetch_receiver, ); diff --git a/src/window.rs b/src/window.rs index 18fb450a77..22570feeca 100644 --- a/src/window.rs +++ b/src/window.rs @@ -7,7 +7,7 @@ use entry::Entry; use erasure; use ledger::Block; use log::Level; -use packet::{BlobRecycler, SharedBlob, SharedBlobs, BLOB_SIZE}; +use packet::{BlobRecycler, SharedBlob, SharedBlobs}; use rand::{thread_rng, Rng}; use result::{Error, Result}; use signature::Pubkey; @@ -34,11 +34,6 @@ pub struct WindowSlot { pub type SharedWindow = Arc>>; -#[derive(Debug, PartialEq, Eq)] -pub enum WindowError { - GenericError, -} - #[derive(Debug)] pub struct WindowIndex { pub data: u64, @@ -51,9 +46,9 @@ fn find_next_missing( recycler: &BlobRecycler, consumed: u64, received: u64, -) -> Result)>> { +) -> Option)>> { if received <= consumed { - Err(WindowError::GenericError)?; + return None; } let mut window = window.write().unwrap(); let reqs: Vec<_> = (consumed..received) @@ -77,7 +72,7 @@ fn find_next_missing( None }) .collect(); - Ok(reqs) + Some(reqs) } fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 { @@ -125,11 +120,11 @@ fn repair_window( times: &mut usize, consumed: u64, received: u64, -) -> Result<()> { +) -> Option)>> { //exponential backoff if !repair_backoff(last, times, consumed) { trace!("{:x} !repair_backoff() times = {}", debug_id, times); - return Ok(()); + return None; } let highest_lost = calculate_highest_lost_blob_index( @@ -137,30 +132,26 @@ fn repair_window( consumed, received, ); - let reqs = find_next_missing(window, crdt, recycler, consumed, highest_lost)?; - trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); - if !reqs.is_empty() { + if let Some(reqs) = find_next_missing(window, crdt, recycler, consumed, highest_lost) { inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); - info!( - "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", - debug_id, - *times, - consumed, - highest_lost, - reqs.len() - ); + if log_enabled!(Level::Trace) { + trace!( + "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", + debug_id, + *times, + consumed, + highest_lost, + reqs.len() + ); + + for (to, _) in reqs.clone() { + trace!("{:x}: repair_window request to {}", debug_id, to); + } + } + Some(reqs) + } else { + None } - let sock = UdpSocket::bind("0.0.0.0:0")?; - for (to, req) in reqs { - //todo cache socket - debug!( - "{:x}: repair_window request {} {} {}", - debug_id, consumed, highest_lost, to - ); - assert!(req.len() <= BLOB_SIZE); - sock.send_to(&req, to)?; - } - Ok(()) } fn add_block_to_retransmit_queue( @@ -248,7 +239,7 @@ fn retransmit_all_leader_blocks( warn!("{:x}: no leader to retransmit from", debug_id); } if !retransmit_queue.is_empty() { - debug!( + trace!( "{:x}: RECV_WINDOW {} {}: retransmit {}", debug_id, consumed, @@ -437,7 +428,7 @@ fn recv_window( } let now = Instant::now(); inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); - debug!( + trace!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, *consumed, @@ -462,7 +453,7 @@ fn recv_window( let mut consume_queue = VecDeque::new(); while let Some(b) = dq.pop_front() { let (pix, meta_size) = { - let p = b.write().expect("'b' write lock in fn recv_window"); + let p = b.write().unwrap(); (p.get_index()?, p.meta.size) }; pixs.push(pix); @@ -488,29 +479,17 @@ fn recv_window( } if log_enabled!(Level::Trace) { trace!("{}", print_window(debug_id, window, *consumed)); - } - info!( - "{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", - debug_id, - *consumed, - *received, - consume_queue.len(), - pixs, - duration_as_ms(&now.elapsed()) - ); - if !consume_queue.is_empty() { - debug!( - "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}", + trace!( + "{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", debug_id, *consumed, *received, consume_queue.len(), + pixs, + duration_as_ms(&now.elapsed()) ); - trace!( - "{:x}: sending consume_queue.len: {}", - debug_id, - consume_queue.len() - ); + } + if !consume_queue.is_empty() { inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; } @@ -651,6 +630,7 @@ pub fn window( r: BlobReceiver, s: BlobSender, retransmit: BlobSender, + repair_socket: Arc, ) -> JoinHandle<()> { Builder::new() .name("solana-window".to_string()) @@ -684,9 +664,16 @@ pub fn window( } } } - let _ = repair_window( + if let Some(reqs) = repair_window( debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received, - ); + ) { + for (to, req) in reqs { + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{:x} repair req send_to({}) error {:?}", debug_id, to, e); + 0 + }); + } + } } }) .unwrap() @@ -824,6 +811,7 @@ mod test { r_reader, s_window, s_retransmit, + Arc::new(tn.sockets.repair), ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -893,6 +881,7 @@ mod test { r_reader, s_window, s_retransmit, + Arc::new(tn.sockets.repair), ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -955,6 +944,7 @@ mod test { r_reader, s_window, s_retransmit, + Arc::new(tn.sockets.repair), ); let t_responder = { let (s_responder, r_responder) = channel(); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index cc0cf15397..5eed945157 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -75,7 +75,7 @@ fn gossip_ring() -> result::Result<()> { let x = (n + 1) % listen.len(); let mut xv = listen[x].0.write().unwrap(); let yv = listen[y].0.read().unwrap(); - let mut d = yv.table[&yv.me].clone(); + let mut d = yv.table[&yv.id].clone(); d.version = 0; xv.insert(&d); } @@ -95,10 +95,10 @@ fn gossip_star() { let y = (n + 1) % listen.len(); let mut xv = listen[x].0.write().unwrap(); let yv = listen[y].0.read().unwrap(); - let mut yd = yv.table[&yv.me].clone(); + let mut yd = yv.table[&yv.id].clone(); yd.version = 0; xv.insert(&yd); - trace!("star leader {:?}", &xv.me.as_ref()[..4]); + trace!("star leader {:?}", &xv.id.as_ref()[..4]); } }); } @@ -111,7 +111,7 @@ fn gossip_rstar() { let num = listen.len(); let xd = { let xv = listen[0].0.read().unwrap(); - xv.table[&xv.me].clone() + xv.table[&xv.id].clone() }; trace!("rstar leader {:?}", &xd.id.as_ref()[..4]); for n in 0..(num - 1) { @@ -121,7 +121,7 @@ fn gossip_rstar() { trace!( "rstar insert {:?} into {:?}", &xd.id.as_ref()[..4], - &yv.me.as_ref()[..4] + &yv.id.as_ref()[..4] ); } }); @@ -202,9 +202,9 @@ fn test_external_liveness_table() { let c1_data = c1.read().unwrap().my_data().clone(); c1.write().unwrap().set_leader(c1_data.id); - let c2_id = c2.read().unwrap().me; - let c3_id = c3.read().unwrap().me; - let c4_id = c4.read().unwrap().me; + let c2_id = c2.read().unwrap().id; + let c3_id = c3.read().unwrap().id; + let c4_id = c4.read().unwrap().id; // Insert the remote data about c4 let c2_index_for_c4 = 10; @@ -236,7 +236,7 @@ fn test_external_liveness_table() { // Validate c1's external liveness table, then release lock rc1 { let rc1 = c1.read().unwrap(); - let el = rc1.get_external_liveness_entry(&c4.read().unwrap().me); + let el = rc1.get_external_liveness_entry(&c4.read().unwrap().id); // Make sure liveness table entry for c4 exists on node c1 assert!(el.is_some()); diff --git a/tests/multinode.rs b/tests/multinode.rs index 42abafe687..6ecf730794 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -113,7 +113,6 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { #[test] fn test_multi_node_ledger_window() -> result::Result<()> { - assert!(cfg!(feature = "test")); logger::setup(); let leader_keypair = Keypair::new();