purge validators we havent seen for a long time
This commit is contained in:
parent
7fe1fd2f95
commit
a7460ffbd1
68
src/crdt.rs
68
src/crdt.rs
|
@ -32,6 +32,9 @@ use std::sync::{Arc, RwLock};
|
|||
use std::thread::{sleep, Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use streamer::{BlobReceiver, BlobSender, Window};
|
||||
use timing::timestamp;
|
||||
|
||||
const GOSSIP_SLEEP_MILLIS: u64 = 100;
|
||||
|
||||
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
|
||||
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
|
||||
|
@ -180,6 +183,7 @@ pub struct Crdt {
|
|||
/// The value of the remote update index that I have last seen
|
||||
/// This Node will ask external nodes for updates since the value in this list
|
||||
pub remote: HashMap<PublicKey, u64>,
|
||||
pub alive: HashMap<PublicKey, u64>,
|
||||
pub update_index: u64,
|
||||
pub me: PublicKey,
|
||||
}
|
||||
|
@ -204,6 +208,7 @@ impl Crdt {
|
|||
table: HashMap::new(),
|
||||
local: HashMap::new(),
|
||||
remote: HashMap::new(),
|
||||
alive: HashMap::new(),
|
||||
me: me.id,
|
||||
update_index: 1,
|
||||
};
|
||||
|
@ -248,6 +253,35 @@ impl Crdt {
|
|||
self.table[&v.id].version
|
||||
);
|
||||
}
|
||||
//update the liveness table
|
||||
let now = timestamp();
|
||||
*self.alive.entry(v.id).or_insert(now) = now;
|
||||
}
|
||||
|
||||
/// purge old validators
|
||||
/// TODO: we need a robust membership protocol
|
||||
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
|
||||
/// challenging part is that we are on a permissionless network
|
||||
pub fn purge(&mut self, now: u64) {
|
||||
//wait for 4x as long as it would randomly take to reach our node
|
||||
//assuming everyone is waiting the same amount of time as this node
|
||||
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
|
||||
let purge_set: Vec<PublicKey> = self.alive
|
||||
.iter()
|
||||
.filter_map(|(k, v)| {
|
||||
if *k != self.me && (now - v) > limit {
|
||||
Some((*k).clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
for p in purge_set.iter() {
|
||||
self.alive.remove(p);
|
||||
self.table.remove(p);
|
||||
self.remote.remove(p);
|
||||
self.local.remove(p);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn index_blobs(
|
||||
|
@ -540,7 +574,7 @@ impl Crdt {
|
|||
}
|
||||
//TODO: possibly tune this parameter
|
||||
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
|
||||
sleep(Duration::from_millis(100));
|
||||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
@ -585,9 +619,9 @@ impl Crdt {
|
|||
) -> Option<SharedBlob> {
|
||||
match deserialize(&blob.data[..blob.meta.size]) {
|
||||
// TODO sigverify these
|
||||
Ok(Protocol::RequestUpdates(v, reqdata)) => {
|
||||
Ok(Protocol::RequestUpdates(v, from_rd)) => {
|
||||
trace!("RequestUpdates {}", v);
|
||||
let addr = reqdata.gossip_addr;
|
||||
let addr = from_rd.gossip_addr;
|
||||
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||
let (from, ups, data) = obj.read()
|
||||
.expect("'obj' read lock in RequestUpdates")
|
||||
|
@ -595,7 +629,7 @@ impl Crdt {
|
|||
trace!("get updates since response {} {}", v, data.len());
|
||||
let len = data.len();
|
||||
let rsp = Protocol::ReceiveUpdates(from, ups, data);
|
||||
obj.write().unwrap().insert(&reqdata);
|
||||
obj.write().unwrap().insert(&from_rd);
|
||||
if len < 1 {
|
||||
let me = obj.read().unwrap();
|
||||
trace!(
|
||||
|
@ -610,7 +644,7 @@ impl Crdt {
|
|||
"sending updates me {:?} len {} to {:?} {}",
|
||||
&obj.read().unwrap().me[..4],
|
||||
len,
|
||||
&reqdata.id[..4],
|
||||
&from_rd.id[..4],
|
||||
addr,
|
||||
);
|
||||
Some(r)
|
||||
|
@ -759,7 +793,7 @@ impl TestNode {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
|
||||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS};
|
||||
use packet::BlobRecycler;
|
||||
use result::Error;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
|
@ -998,6 +1032,28 @@ mod tests {
|
|||
assert!(one && two);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn purge_test() {
|
||||
let me = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap());
|
||||
let mut crdt = Crdt::new(me.clone());
|
||||
let nxt = ReplicatedData::new_entry_point("127.0.0.2:1234".parse().unwrap());
|
||||
crdt.insert(&nxt);
|
||||
let rv = crdt.gossip_request().unwrap();
|
||||
assert_eq!(rv.0, nxt.gossip_addr);
|
||||
let now = crdt.alive[&nxt.id];
|
||||
crdt.purge(now);
|
||||
let rv = crdt.gossip_request().unwrap();
|
||||
assert_eq!(rv.0, nxt.gossip_addr);
|
||||
|
||||
crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4);
|
||||
let rv = crdt.gossip_request().unwrap();
|
||||
assert_eq!(rv.0, nxt.gossip_addr);
|
||||
|
||||
crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4 + 1);
|
||||
let rv = crdt.gossip_request();
|
||||
assert_matches!(rv, Err(Error::CrdtTooSmall));
|
||||
}
|
||||
|
||||
/// test window requests respond with the right blob, and do not overrun
|
||||
#[test]
|
||||
fn run_window_request() {
|
||||
|
|
Loading…
Reference in New Issue