From 6db9f92b8a9ced5a80615fe587c58d897bb954bb Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 3 Jun 2018 19:59:17 -0700 Subject: [PATCH] crdt gossip tests --- src/crdt.rs | 45 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/src/crdt.rs b/src/crdt.rs index e4ba429c1..1861670bc 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -162,7 +162,7 @@ pub struct Crdt { timeout: Duration, } // TODO These messages should be signed, and go through the gpu pipeline for spam filtering -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] enum Protocol { /// forward your own latest data structure when requesting an update /// this doesn't update the `remote` update index, but it allows the @@ -709,8 +709,13 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; + use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::time::Duration; #[test] fn test_parse_port_or_addr() { @@ -870,4 +875,42 @@ mod tests { } assert!(one && two); } + /// Test that insert drops messages that are older + #[test] + fn gossip_request() { + let me = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + let mut crdt = Crdt::new(me.clone()); + let rv = crdt.gossip_request(); + assert_matches!(rv, Err(Error::CrdtTooSmall)); + let nxt = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.3:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + crdt.insert(&nxt); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + let (sender, reader) = channel(); + let recycler = BlobRecycler::default(); + let exit = Arc::new(AtomicBool::new(false)); + let obj = Arc::new(RwLock::new(crdt)); + let thread = Crdt::gossip(obj, recycler, sender, exit.clone()); + let rv = reader.recv_timeout(Duration::new(1, 0)).unwrap(); + assert!(rv.len() > 0); + for i in rv.iter() { + assert_eq!(i.read().unwrap().meta.addr(), nxt.gossip_addr); + } + exit.store(true, Ordering::Relaxed); + thread.join().unwrap(); + } }