replace 'daddr' checks with 'is_valid_address()'
This commit is contained in:
parent
9cf0bd9b88
commit
84225beeef
|
@ -423,7 +423,6 @@ fn converge(
|
||||||
threads: &mut Vec<JoinHandle<()>>,
|
threads: &mut Vec<JoinHandle<()>>,
|
||||||
) -> Vec<NodeInfo> {
|
) -> Vec<NodeInfo> {
|
||||||
//lets spy on the network
|
//lets spy on the network
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
let (spy, spy_gossip) = spy_node();
|
let (spy, spy_gossip) = spy_node();
|
||||||
let mut spy_crdt = Crdt::new(spy).expect("Crdt::new");
|
let mut spy_crdt = Crdt::new(spy).expect("Crdt::new");
|
||||||
spy_crdt.insert(&leader);
|
spy_crdt.insert(&leader);
|
||||||
|
@ -447,7 +446,7 @@ fn converge(
|
||||||
.table
|
.table
|
||||||
.values()
|
.values()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|x| x.contact_info.rpu != daddr)
|
.filter(|x| Crdt::is_valid_address(x.contact_info.rpu))
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect();
|
.collect();
|
||||||
if v.len() >= num_nodes {
|
if v.len() >= num_nodes {
|
||||||
|
|
13
src/crdt.rs
13
src/crdt.rs
|
@ -524,7 +524,6 @@ impl Crdt {
|
||||||
pub fn compute_broadcast_table(&self) -> Vec<NodeInfo> {
|
pub fn compute_broadcast_table(&self) -> Vec<NodeInfo> {
|
||||||
let live: Vec<_> = self.alive.iter().collect();
|
let live: Vec<_> = self.alive.iter().collect();
|
||||||
//thread_rng().shuffle(&mut live);
|
//thread_rng().shuffle(&mut live);
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
let me = &self.table[&self.me];
|
let me = &self.table[&self.me];
|
||||||
let cloned_table: Vec<NodeInfo> = live.iter()
|
let cloned_table: Vec<NodeInfo> = live.iter()
|
||||||
.map(|x| &self.table[x.0])
|
.map(|x| &self.table[x.0])
|
||||||
|
@ -532,7 +531,7 @@ impl Crdt {
|
||||||
if me.id == v.id {
|
if me.id == v.id {
|
||||||
//filter myself
|
//filter myself
|
||||||
false
|
false
|
||||||
} else if v.contact_info.tvu == daddr {
|
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
|
||||||
trace!(
|
trace!(
|
||||||
"{:x}:broadcast skip not listening {:x}",
|
"{:x}:broadcast skip not listening {:x}",
|
||||||
me.debug_id(),
|
me.debug_id(),
|
||||||
|
@ -640,7 +639,6 @@ impl Crdt {
|
||||||
.set_id(me.id)
|
.set_id(me.id)
|
||||||
.expect("set_id in pub fn retransmit");
|
.expect("set_id in pub fn retransmit");
|
||||||
let rblob = blob.read().unwrap();
|
let rblob = blob.read().unwrap();
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
let orders: Vec<_> = table
|
let orders: Vec<_> = table
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|v| {
|
.filter(|v| {
|
||||||
|
@ -649,7 +647,7 @@ impl Crdt {
|
||||||
} else if me.leader_id == v.id {
|
} else if me.leader_id == v.id {
|
||||||
trace!("skip retransmit to leader {:?}", v.id);
|
trace!("skip retransmit to leader {:?}", v.id);
|
||||||
false
|
false
|
||||||
} else if v.contact_info.tvu == daddr {
|
} else if !(Self::is_valid_address(v.contact_info.tvu)) {
|
||||||
trace!("skip nodes that are not listening {:?}", v.id);
|
trace!("skip nodes that are not listening {:?}", v.id);
|
||||||
false
|
false
|
||||||
} else {
|
} else {
|
||||||
|
@ -710,10 +708,9 @@ impl Crdt {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
let valid: Vec<_> = self.table
|
let valid: Vec<_> = self.table
|
||||||
.values()
|
.values()
|
||||||
.filter(|r| r.id != self.me && r.contact_info.tvu_window != daddr)
|
.filter(|r| r.id != self.me && Self::is_valid_address(r.contact_info.tvu_window))
|
||||||
.collect();
|
.collect();
|
||||||
if valid.is_empty() {
|
if valid.is_empty() {
|
||||||
Err(CrdtError::NoPeers)?;
|
Err(CrdtError::NoPeers)?;
|
||||||
|
@ -1143,6 +1140,10 @@ impl Crdt {
|
||||||
})
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_valid_address(addr: SocketAddr) -> bool {
|
||||||
|
(addr.port() != 0) && !(addr.ip().is_unspecified() || addr.ip().is_multicast())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Sockets {
|
pub struct Sockets {
|
||||||
|
|
|
@ -54,7 +54,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
|
||||||
.values()
|
.values()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|x| x.id != me)
|
.filter(|x| x.id != me)
|
||||||
.filter(|x| x.contact_info.rpu != daddr)
|
.filter(|x| Crdt::is_valid_address(x.contact_info.rpu))
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect();
|
.collect();
|
||||||
if num >= num_nodes as u64 && v.len() >= num_nodes {
|
if num >= num_nodes as u64 && v.len() >= num_nodes {
|
||||||
|
@ -483,9 +483,8 @@ fn mk_client(leader: &NodeInfo) -> ThinClient {
|
||||||
.set_read_timeout(Some(Duration::new(1, 0)))
|
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
assert!(Crdt::is_valid_address(leader.contact_info.rpu));
|
||||||
assert!(leader.contact_info.rpu != daddr);
|
assert!(Crdt::is_valid_address(leader.contact_info.tpu));
|
||||||
assert!(leader.contact_info.tpu != daddr);
|
|
||||||
ThinClient::new(
|
ThinClient::new(
|
||||||
leader.contact_info.rpu,
|
leader.contact_info.rpu,
|
||||||
requests_socket,
|
requests_socket,
|
||||||
|
|
Loading…
Reference in New Issue