diff --git a/src/crdt.rs b/src/crdt.rs index 4c9e294870..58cb8db1fc 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -32,6 +32,9 @@ use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender, Window}; +use timing::timestamp; + +const GOSSIP_SLEEP_MILLIS: u64 = 100; pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); @@ -180,6 +183,7 @@ pub struct Crdt { /// The value of the remote update index that I have last seen /// This Node will ask external nodes for updates since the value in this list pub remote: HashMap, + pub alive: HashMap, pub update_index: u64, pub me: PublicKey, } @@ -204,6 +208,7 @@ impl Crdt { table: HashMap::new(), local: HashMap::new(), remote: HashMap::new(), + alive: HashMap::new(), me: me.id, update_index: 1, }; @@ -248,6 +253,35 @@ impl Crdt { self.table[&v.id].version ); } + //update the liveness table + let now = timestamp(); + *self.alive.entry(v.id).or_insert(now) = now; + } + + /// purge old validators + /// TODO: we need a robust membership protocol + /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf + /// challenging part is that we are on a permissionless network + pub fn purge(&mut self, now: u64) { + //wait for 4x as long as it would randomly take to reach our node + //assuming everyone is waiting the same amount of time as this node + let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; + let purge_set: Vec = self.alive + .iter() + .filter_map(|(k, v)| { + if *k != self.me && (now - v) > limit { + Some((*k).clone()) + } else { + None + } + }) + .collect(); + for p in purge_set.iter() { + self.alive.remove(p); + self.table.remove(p); + self.remote.remove(p); + self.local.remove(p); + } } pub fn index_blobs( @@ -540,7 +574,7 @@ impl Crdt { } //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); }) .unwrap() } @@ -585,9 +619,9 @@ impl Crdt { ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { // TODO sigverify these - Ok(Protocol::RequestUpdates(v, reqdata)) => { + Ok(Protocol::RequestUpdates(v, from_rd)) => { trace!("RequestUpdates {}", v); - let addr = reqdata.gossip_addr; + let addr = from_rd.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read() .expect("'obj' read lock in RequestUpdates") @@ -595,7 +629,7 @@ impl Crdt { trace!("get updates since response {} {}", v, data.len()); let len = data.len(); let rsp = Protocol::ReceiveUpdates(from, ups, data); - obj.write().unwrap().insert(&reqdata); + obj.write().unwrap().insert(&from_rd); if len < 1 { let me = obj.read().unwrap(); trace!( @@ -610,7 +644,7 @@ impl Crdt { "sending updates me {:?} len {} to {:?} {}", &obj.read().unwrap().me[..4], len, - &reqdata.id[..4], + &from_rd.id[..4], addr, ); Some(r) @@ -759,7 +793,7 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; + use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS}; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; @@ -998,6 +1032,28 @@ mod tests { assert!(one && two); } + #[test] + fn purge_test() { + let me = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let mut crdt = Crdt::new(me.clone()); + let nxt = ReplicatedData::new_entry_point("127.0.0.2:1234".parse().unwrap()); + crdt.insert(&nxt); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + let now = crdt.alive[&nxt.id]; + crdt.purge(now); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request(); + assert_matches!(rv, Err(Error::CrdtTooSmall)); + } + /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request() {