From ee54ce47278140204f889bb1415fdef662c1c19d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 22:03:49 -0700 Subject: [PATCH] min table size before purge --- src/crdt.rs | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index c893b66a9e..ab9c287274 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -34,8 +34,12 @@ use std::time::Duration; use streamer::{BlobReceiver, BlobSender, Window}; use timing::timestamp; +/// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; +/// minimum membership table size before we start purging dead nodes +const MIN_TABLE_SIZE: usize = 2; + pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); if let Some(addrstr) = optstr { @@ -263,26 +267,29 @@ impl Crdt { /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf /// challenging part is that we are on a permissionless network pub fn purge(&mut self, now: u64) { + if self.table.len() <= MIN_TABLE_SIZE { + return; + } //wait for 4x as long as it would randomly take to reach our node //assuming everyone is waiting the same amount of time as this node let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; - let purge_set: Vec = self.alive + let dead_ids: Vec = self.alive .iter() - .filter_map(|(k, v)| { - if *k != self.me && (now - v) > limit { - trace!("purging {:?} {}", &k[..4], v); - Some((*k).clone()) + .filter_map(|(&k, v)| { + if k != self.me && (now - v) > limit { + info!("purge {:?} {}", &k[..4], now - v); + Some(k) } else { - trace!("purge skipped {:?} {}", &k[..4], v); + trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit); None } }) .collect(); - for p in purge_set.iter() { - self.alive.remove(p); - self.table.remove(p); - self.remote.remove(p); - self.local.remove(p); + for id in dead_ids.iter() { + self.alive.remove(id); + self.table.remove(id); + self.remote.remove(id); + self.local.remove(id); } } @@ -1054,8 +1061,19 @@ mod tests { assert_eq!(rv.0, nxt.gossip_addr); crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); - let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt2.id); + assert_ne!(nxt.id, nxt2.id); + crdt.insert(&nxt2); + let len = crdt.table.len() as u64; + assert_eq!(3, len); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(2, crdt.table.len()); } /// test window requests respond with the right blob, and do not overrun