This commit is contained in:
Anatoly Yakovenko 2018-07-02 11:23:20 -07:00 committed by Greg Fitzgerald
parent fa247196c0
commit 430d9d9314
2 changed files with 49 additions and 3 deletions

View File

@ -27,7 +27,7 @@ fn print_usage(program: &str, opts: Options) {
print!("{}", opts.usage(&brief));
}
fn main() {
fn main() -> () {
env_logger::init();
let mut opts = Options::new();
opts.optflag("h", "help", "print help");

View File

@ -37,6 +37,7 @@ use timing::timestamp;
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
//const GOSSIP_MIN_PURGE_MILLIS: u64 = 15000;
/// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2;
@ -226,14 +227,15 @@ impl Crdt {
pub fn my_data(&self) -> &ReplicatedData {
&self.table[&self.me]
}
pub fn leader_data(&self) -> &ReplicatedData {
&self.table[&self.table[&self.me].current_leader_id]
pub fn leader_data(&self) -> Option<&ReplicatedData> {
self.table.get(&(self.table[&self.me].current_leader_id))
}
pub fn set_leader(&mut self, key: PublicKey) -> () {
let mut me = self.my_data().clone();
me.current_leader_id = key;
me.version += 1;
info!("setting leader to {:?}", &key[..4]);
self.insert(&me);
}
@ -281,6 +283,7 @@ impl Crdt {
//wait for 4x as long as it would randomly take to reach our node
//assuming everyone is waiting the same amount of time as this node
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
//let limit = std::cmp::max(limit, GOSSIP_MIN_PURGE_MILLIS);
let dead_ids: Vec<PublicKey> = self.alive
.iter()
@ -596,6 +599,9 @@ impl Crdt {
trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt);
}
let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect();
if sorted.len() > 0 {
info!("sorted leaders {:?}", sorted);
}
sorted.sort_by_key(|a| a.1);
sorted.last().map(|a| *a.0)
}
@ -919,6 +925,46 @@ impl TestNode {
},
}
}
pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode {
let mut local_gossip_addr = bind_addr.clone();
local_gossip_addr.set_port(data.gossip_addr.port());
let mut local_replicate_addr = bind_addr.clone();
local_replicate_addr.set_port(data.replicate_addr.port());
let mut local_requests_addr = bind_addr.clone();
local_requests_addr.set_port(data.requests_addr.port());
let mut local_transactions_addr = bind_addr.clone();
local_transactions_addr.set_port(data.transactions_addr.port());
let mut local_repair_addr = bind_addr.clone();
local_repair_addr.set_port(data.repair_addr.port());
let transaction = UdpSocket::bind(local_transactions_addr).unwrap();
let gossip = UdpSocket::bind(local_gossip_addr).unwrap();
let replicate = UdpSocket::bind(local_replicate_addr).unwrap();
let requests = UdpSocket::bind(local_requests_addr).unwrap();
let repair = UdpSocket::bind(local_repair_addr).unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
TestNode {
data: data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
respond,
broadcast,
repair,
retransmit,
},
}
}
}
#[cfg(test)]