diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 367effcbc..133ca41d8 100755 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -423,7 +423,6 @@ fn converge( threads: &mut Vec>, ) -> Vec { //lets spy on the network - let daddr = "0.0.0.0:0".parse().unwrap(); let (spy, spy_gossip) = spy_node(); let mut spy_crdt = Crdt::new(spy).expect("Crdt::new"); spy_crdt.insert(&leader); @@ -447,7 +446,7 @@ fn converge( .table .values() .into_iter() - .filter(|x| x.contact_info.rpu != daddr) + .filter(|x| Crdt::is_valid_address(x.contact_info.rpu)) .cloned() .collect(); if v.len() >= num_nodes { diff --git a/src/crdt.rs b/src/crdt.rs index 8918a1790..c7671ab8a 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -524,7 +524,6 @@ impl Crdt { pub fn compute_broadcast_table(&self) -> Vec { let live: Vec<_> = self.alive.iter().collect(); //thread_rng().shuffle(&mut live); - let daddr = "0.0.0.0:0".parse().unwrap(); let me = &self.table[&self.me]; let cloned_table: Vec = live.iter() .map(|x| &self.table[x.0]) @@ -532,7 +531,7 @@ impl Crdt { if me.id == v.id { //filter myself false - } else if v.contact_info.tvu == daddr { + } else if !(Self::is_valid_address(v.contact_info.tvu)) { trace!( "{:x}:broadcast skip not listening {:x}", me.debug_id(), @@ -640,7 +639,6 @@ impl Crdt { .set_id(me.id) .expect("set_id in pub fn retransmit"); let rblob = blob.read().unwrap(); - let daddr = "0.0.0.0:0".parse().unwrap(); let orders: Vec<_> = table .iter() .filter(|v| { @@ -649,7 +647,7 @@ impl Crdt { } else if me.leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false - } else if v.contact_info.tvu == daddr { + } else if !(Self::is_valid_address(v.contact_info.tvu)) { trace!("skip nodes that are not listening {:?}", v.id); false } else { @@ -710,10 +708,9 @@ impl Crdt { } pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { - let daddr = "0.0.0.0:0".parse().unwrap(); let valid: Vec<_> = self.table .values() - .filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr) + .filter(|r| r.id != self.me && Self::is_valid_address(r.contact_info.tvu_window)) .collect(); if valid.is_empty() { Err(CrdtError::NoPeers)?; @@ -1143,6 +1140,10 @@ impl Crdt { }) .unwrap() } + + pub fn is_valid_address(addr: SocketAddr) -> bool { + (addr.port() != 0) && !(addr.ip().is_unspecified() || addr.ip().is_multicast()) + } } pub struct Sockets { diff --git a/tests/multinode.rs b/tests/multinode.rs index ea5c51866..c739d741b 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -54,7 +54,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { .values() .into_iter() .filter(|x| x.id != me) - .filter(|x| x.contact_info.rpu != daddr) + .filter(|x| Crdt::is_valid_address(x.contact_info.rpu)) .cloned() .collect(); if num >= num_nodes as u64 && v.len() >= num_nodes { @@ -483,9 +483,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient { .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let daddr = "0.0.0.0:0".parse().unwrap(); - assert!(leader.contact_info.rpu != daddr); - assert!(leader.contact_info.tpu != daddr); + assert!(Crdt::is_valid_address(leader.contact_info.rpu)); + assert!(Crdt::is_valid_address(leader.contact_info.tpu)); ThinClient::new( leader.contact_info.rpu, requests_socket,