Purge leader (#687)

* purge leader

* fixup!

* fixup!
This commit is contained in:
anatoly yakovenko 2018-07-18 14:39:43 -07:00 committed by Michael Vines
parent fc1dfd86d2
commit 7f810a29ff
1 changed files with 33 additions and 19 deletions

View File

@ -466,29 +466,12 @@ impl Crdt {
return;
}
let leader_id = self.leader_data().unwrap().id;
let limit = GOSSIP_PURGE_MILLIS;
let dead_ids: Vec<PublicKey> = self.alive
.iter()
.filter_map(|(&k, v)| {
if k != self.me && (now - v) > limit {
if leader_id == k {
info!(
"{:x}: PURGE LEADER {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
} else {
info!(
"{:x}: PURGE {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
}
Some(k)
} else {
trace!(
"{:x} purge skipped {:x} {} {}",
@ -510,9 +493,19 @@ impl Crdt {
self.remote.remove(id);
self.local.remove(id);
self.external_liveness.remove(id);
info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id));
for map in self.external_liveness.values_mut() {
map.remove(id);
}
if *id == leader_id {
info!(
"{:x}: PURGE LEADER {:x}",
self.debug_id(),
make_debug_id(id),
);
inc_new_counter!("crdt-purge-purged_leader", 1, 1);
self.set_leader(PublicKey::default());
}
}
}
@ -1245,7 +1238,7 @@ mod tests {
use logger;
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
use signature::{KeyPair, KeyPairUtil, PublicKey};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
@ -1742,6 +1735,27 @@ mod tests {
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp);
}
#[test]
fn purge_leader_test() {
logger::setup();
let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id);
crdt.insert(&nxt);
crdt.set_leader(nxt.id);
let now = crdt.alive[&nxt.id];
let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
crdt.insert(&nxt2);
while now == crdt.alive[&nxt2.id] {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
crdt.insert(&nxt2);
}
let len = crdt.table.len() as u64;
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
assert_eq!(len as usize - 1, crdt.table.len());
assert_eq!(crdt.my_data().leader_id, PublicKey::default());
}
/// test window requests respond with the right blob, and do not overrun
#[test]