min table size before purge
This commit is contained in:
parent
e85bf2f2d5
commit
ee54ce4727
44
src/crdt.rs
44
src/crdt.rs
|
@ -34,8 +34,12 @@ use std::time::Duration;
|
||||||
use streamer::{BlobReceiver, BlobSender, Window};
|
use streamer::{BlobReceiver, BlobSender, Window};
|
||||||
use timing::timestamp;
|
use timing::timestamp;
|
||||||
|
|
||||||
|
/// milliseconds we sleep for between gossip requests
|
||||||
const GOSSIP_SLEEP_MILLIS: u64 = 100;
|
const GOSSIP_SLEEP_MILLIS: u64 = 100;
|
||||||
|
|
||||||
|
/// minimum membership table size before we start purging dead nodes
|
||||||
|
const MIN_TABLE_SIZE: usize = 2;
|
||||||
|
|
||||||
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
|
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
|
||||||
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
|
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
|
||||||
if let Some(addrstr) = optstr {
|
if let Some(addrstr) = optstr {
|
||||||
|
@ -263,26 +267,29 @@ impl Crdt {
|
||||||
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
|
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
|
||||||
/// challenging part is that we are on a permissionless network
|
/// challenging part is that we are on a permissionless network
|
||||||
pub fn purge(&mut self, now: u64) {
|
pub fn purge(&mut self, now: u64) {
|
||||||
|
if self.table.len() <= MIN_TABLE_SIZE {
|
||||||
|
return;
|
||||||
|
}
|
||||||
//wait for 4x as long as it would randomly take to reach our node
|
//wait for 4x as long as it would randomly take to reach our node
|
||||||
//assuming everyone is waiting the same amount of time as this node
|
//assuming everyone is waiting the same amount of time as this node
|
||||||
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
|
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
|
||||||
let purge_set: Vec<PublicKey> = self.alive
|
let dead_ids: Vec<PublicKey> = self.alive
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(k, v)| {
|
.filter_map(|(&k, v)| {
|
||||||
if *k != self.me && (now - v) > limit {
|
if k != self.me && (now - v) > limit {
|
||||||
trace!("purging {:?} {}", &k[..4], v);
|
info!("purge {:?} {}", &k[..4], now - v);
|
||||||
Some((*k).clone())
|
Some(k)
|
||||||
} else {
|
} else {
|
||||||
trace!("purge skipped {:?} {}", &k[..4], v);
|
trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
for p in purge_set.iter() {
|
for id in dead_ids.iter() {
|
||||||
self.alive.remove(p);
|
self.alive.remove(id);
|
||||||
self.table.remove(p);
|
self.table.remove(id);
|
||||||
self.remote.remove(p);
|
self.remote.remove(id);
|
||||||
self.local.remove(p);
|
self.local.remove(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1054,8 +1061,19 @@ mod tests {
|
||||||
assert_eq!(rv.0, nxt.gossip_addr);
|
assert_eq!(rv.0, nxt.gossip_addr);
|
||||||
|
|
||||||
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1);
|
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1);
|
||||||
let rv = crdt.gossip_request();
|
let rv = crdt.gossip_request().unwrap();
|
||||||
assert_matches!(rv, Err(Error::CrdtTooSmall));
|
assert_eq!(rv.0, nxt.gossip_addr);
|
||||||
|
|
||||||
|
let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap());
|
||||||
|
assert_ne!(me.id, nxt2.id);
|
||||||
|
assert_ne!(nxt.id, nxt2.id);
|
||||||
|
crdt.insert(&nxt2);
|
||||||
|
let len = crdt.table.len() as u64;
|
||||||
|
assert_eq!(3, len);
|
||||||
|
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1);
|
||||||
|
let rv = crdt.gossip_request().unwrap();
|
||||||
|
assert_eq!(rv.0, nxt.gossip_addr);
|
||||||
|
assert_eq!(2, crdt.table.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// test window requests respond with the right blob, and do not overrun
|
/// test window requests respond with the right blob, and do not overrun
|
||||||
|
|
Loading…
Reference in New Issue