From a5cf745e1c4e0110206ef1a8fa6d28fd1f2dea54 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 15 May 2018 19:59:20 -0700 Subject: [PATCH 1/9] check convergence --- src/crdt.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index e2303fbbe..144fdc775 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -552,21 +552,29 @@ mod test { .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) .collect(); let mut done = true; - for _ in 0..(num * 32) { - done = true; + for i in 0..(num * 32) { + done = false; + trace!("round {}", i); for &(ref c, _) in listen.iter() { - trace!( - "done updates {} {}", - c.read().unwrap().table.len(), - c.read().unwrap().update_index - ); - //make sure the number of updates doesn't grow unbounded - assert!(c.read().unwrap().update_index <= num as u64); - //make sure we got all the updates - if c.read().unwrap().table.len() != num { - done = false; + assert!(num >= c.read().unwrap().remote.values().len()); + trace!("len {}", c.read().unwrap().remote.values().len()); + if (num - 1)== c.read().unwrap().remote.values().len() { + done = true; + //for this node check if it thinks every node received num updates + for r in c.read().unwrap().remote.values() { + assert!(*r <= num as u64); + trace!("r value {}", *r); + if *r != num as u64 { + done = false; + } + } + } + //at least 1 node thinks every node has received num updates + if done == true { + break; } } + //at least 1 node things every node has received num updates if done == true { break; } @@ -590,6 +598,7 @@ mod test { #[test] #[ignore] fn gossip_ring_test() { + logger::setup(); run_gossip_topo(|listen| { let num = listen.len(); for n in 0..num { From f3f0b9f0c5a6d3dad82b6c38218b900064b50642 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 21:35:18 -0700 Subject: [PATCH 2/9] update --- src/crdt.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crdt.rs b/src/crdt.rs index 144fdc775..3ea12c82d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -557,7 +557,7 @@ mod test { trace!("round {}", i); for &(ref c, _) in listen.iter() { assert!(num >= c.read().unwrap().remote.values().len()); - trace!("len {}", c.read().unwrap().remote.values().len()); + trace!("len {}", c.read().unwrap().table.values().len()); if (num - 1)== c.read().unwrap().remote.values().len() { done = true; //for this node check if it thinks every node received num updates From bee1e7ebaf708988795e08b9d9b020b019303227 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 22:54:06 -0700 Subject: [PATCH 3/9] compute convergence maximum --- src/crdt.rs | 24 +++++++++--------------- src/thin_client.rs | 10 +++++++--- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 3ea12c82d..12d3b911e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -28,6 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; +use std; /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone)] @@ -295,6 +296,12 @@ impl Crdt { Ok(()) } + // number of nodes that we are converged to + pub fn convergence(&self) -> u64 { + let min = self.remote.values().fold(std::u64::MAX, |a,b| std::cmp::min(a, *b)); + std::cmp::min(min, self.remote.values().len() as u64 + 1) + } + fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; @@ -556,25 +563,12 @@ mod test { done = false; trace!("round {}", i); for &(ref c, _) in listen.iter() { - assert!(num >= c.read().unwrap().remote.values().len()); - trace!("len {}", c.read().unwrap().table.values().len()); - if (num - 1)== c.read().unwrap().remote.values().len() { + if num == c.read().unwrap().convergence() as usize { done = true; - //for this node check if it thinks every node received num updates - for r in c.read().unwrap().remote.values() { - assert!(*r <= num as u64); - trace!("r value {}", *r); - if *r != num as u64 { - done = false; - } - } - } - //at least 1 node thinks every node has received num updates - if done == true { break; } } - //at least 1 node things every node has received num updates + //at least 1 node converged if done == true { break; } diff --git a/src/thin_client.rs b/src/thin_client.rs index 1979d42ff..2af3b0d66 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -365,21 +365,25 @@ mod tests { let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge + let mut converged = false; for _ in 0..30 { - let len = spy_ref.read().unwrap().table.values().len(); + let len = spy_ref.read().unwrap().remote.values().len(); let mut min = num_nodes as u64; for u in spy_ref.read().unwrap().remote.values() { if min > *u { min = *u; } } - info!("length {} {}", len, min); - if num_nodes == len && min >= (num_nodes as u64) { + info!("converging... {} {}", len, min); + assert!(min <= num_nodes as u64); + if (num_nodes - 1) == len && min == (num_nodes as u64) { + converged = true; warn!("converged! {} {}", len, min); break; } sleep(Duration::new(1, 0)); } + assert!(converged); threads.push(t_spy_listen); threads.push(t_spy_gossip); let v: Vec = spy_ref From 8dc1b07e75bc19f221c4e36f641e5d7fcd23714d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 22:58:32 -0700 Subject: [PATCH 4/9] docs --- src/crdt.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crdt.rs b/src/crdt.rs index 12d3b911e..053b4491c 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -296,7 +296,7 @@ impl Crdt { Ok(()) } - // number of nodes that we are converged to + // max number of nodes that we could be converged to pub fn convergence(&self) -> u64 { let min = self.remote.values().fold(std::u64::MAX, |a,b| std::cmp::min(a, *b)); std::cmp::min(min, self.remote.values().len() as u64 + 1) From 051fa6f1f10cede02fe6a6868e595eda510a0bbb Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 23:10:36 -0700 Subject: [PATCH 5/9] cleanup --- src/thin_client.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index 2af3b0d66..bdf43d319 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -367,18 +367,9 @@ mod tests { //wait for the network to converge let mut converged = false; for _ in 0..30 { - let len = spy_ref.read().unwrap().remote.values().len(); - let mut min = num_nodes as u64; - for u in spy_ref.read().unwrap().remote.values() { - if min > *u { - min = *u; - } - } - info!("converging... {} {}", len, min); - assert!(min <= num_nodes as u64); - if (num_nodes - 1) == len && min == (num_nodes as u64) { + let num = spy_ref.read().unwrap().convergence(); + if num == num_nodes as u64 { converged = true; - warn!("converged! {} {}", len, min); break; } sleep(Duration::new(1, 0)); From b04716d40d1e292c950b0e33102ba74a85336dc0 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 23:11:51 -0700 Subject: [PATCH 6/9] fmt --- src/crdt.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 053b4491c..0bde81dc4 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -21,6 +21,7 @@ use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; use signature::{PublicKey, Signature}; +use std; use std::collections::HashMap; use std::io::Cursor; use std::net::{SocketAddr, UdpSocket}; @@ -28,7 +29,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; -use std; /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone)] @@ -298,10 +298,12 @@ impl Crdt { // max number of nodes that we could be converged to pub fn convergence(&self) -> u64 { - let min = self.remote.values().fold(std::u64::MAX, |a,b| std::cmp::min(a, *b)); + let min = self.remote + .values() + .fold(std::u64::MAX, |a, b| std::cmp::min(a, *b)); std::cmp::min(min, self.remote.values().len() as u64 + 1) } - + fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; From 35ee2d0ce1291df438768f501d670c61bd0004de Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 23:17:45 -0700 Subject: [PATCH 7/9] cleanup --- src/crdt.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 0bde81dc4..f278865b9 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -298,10 +298,10 @@ impl Crdt { // max number of nodes that we could be converged to pub fn convergence(&self) -> u64 { - let min = self.remote + let max = self.remote.values().len() as u64 + 1; + self.remote .values() - .fold(std::u64::MAX, |a, b| std::cmp::min(a, *b)); - std::cmp::min(min, self.remote.values().len() as u64 + 1) + .fold(max, |a, b| std::cmp::min(a, *b)) } fn random() -> u64 { From 9a4ce6d70e2131c70f78290b7363cdb8e69bf305 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 23:18:56 -0700 Subject: [PATCH 8/9] fmt --- src/crdt.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index f278865b9..5a6bed149 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -299,9 +299,7 @@ impl Crdt { // max number of nodes that we could be converged to pub fn convergence(&self) -> u64 { let max = self.remote.values().len() as u64 + 1; - self.remote - .values() - .fold(max, |a, b| std::cmp::min(a, *b)) + self.remote.values().fold(max, |a, b| std::cmp::min(a, *b)) } fn random() -> u64 { From 63a0ba6ec82e8e5123744e1a20d6c77955754fb3 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 16 May 2018 23:21:41 -0700 Subject: [PATCH 9/9] fixed --- src/crdt.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/crdt.rs b/src/crdt.rs index 5a6bed149..7029e79b2 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -195,7 +195,6 @@ impl Crdt { if nodes.len() < 1 { return Err(Error::CrdtTooSmall); } - info!("nodes table {}", nodes.len()); info!("blobs table {}", blobs.len()); // enumerate all the blobs, those are the indices