crdt gossip tests
This commit is contained in:
parent
4a44498d45
commit
6db9f92b8a
45
src/crdt.rs
45
src/crdt.rs
|
@ -162,7 +162,7 @@ pub struct Crdt {
|
|||
timeout: Duration,
|
||||
}
|
||||
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
enum Protocol {
|
||||
/// forward your own latest data structure when requesting an update
|
||||
/// this doesn't update the `remote` update index, but it allows the
|
||||
|
@ -709,8 +709,13 @@ impl TestNode {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
|
||||
use packet::BlobRecycler;
|
||||
use result::Error;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_parse_port_or_addr() {
|
||||
|
@ -870,4 +875,42 @@ mod tests {
|
|||
}
|
||||
assert!(one && two);
|
||||
}
|
||||
/// Test that insert drops messages that are older
|
||||
#[test]
|
||||
fn gossip_request() {
|
||||
let me = ReplicatedData::new(
|
||||
KeyPair::new().pubkey(),
|
||||
"127.0.0.1:1234".parse().unwrap(),
|
||||
"127.0.0.1:1235".parse().unwrap(),
|
||||
"127.0.0.1:1236".parse().unwrap(),
|
||||
"127.0.0.1:1237".parse().unwrap(),
|
||||
"127.0.0.1:1238".parse().unwrap(),
|
||||
);
|
||||
let mut crdt = Crdt::new(me.clone());
|
||||
let rv = crdt.gossip_request();
|
||||
assert_matches!(rv, Err(Error::CrdtTooSmall));
|
||||
let nxt = ReplicatedData::new(
|
||||
KeyPair::new().pubkey(),
|
||||
"127.0.0.3:1234".parse().unwrap(),
|
||||
"127.0.0.1:1235".parse().unwrap(),
|
||||
"127.0.0.1:1236".parse().unwrap(),
|
||||
"127.0.0.1:1237".parse().unwrap(),
|
||||
"127.0.0.1:1238".parse().unwrap(),
|
||||
);
|
||||
crdt.insert(&nxt);
|
||||
let rv = crdt.gossip_request().unwrap();
|
||||
assert_eq!(rv.0, nxt.gossip_addr);
|
||||
let (sender, reader) = channel();
|
||||
let recycler = BlobRecycler::default();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let obj = Arc::new(RwLock::new(crdt));
|
||||
let thread = Crdt::gossip(obj, recycler, sender, exit.clone());
|
||||
let rv = reader.recv_timeout(Duration::new(1, 0)).unwrap();
|
||||
assert!(rv.len() > 0);
|
||||
for i in rv.iter() {
|
||||
assert_eq!(i.read().unwrap().meta.addr(), nxt.gossip_addr);
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
thread.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue