diff --git a/Cargo.toml b/Cargo.toml index a39a8a158..fcce844ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,6 @@ unstable = [] ipv6 = [] cuda = [] erasure = [] -gossip_choose_weighted_peer = [] [dependencies] rayon = "1.0.0" diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index 889e8709d..5642e0065 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -9,33 +9,65 @@ use std::collections::HashMap; pub const DEFAULT_WEIGHT: u32 = 1; pub trait ChooseGossipPeerStrategy { - fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result; + fn choose_peer<'a>(&self, options: Vec<&'a ReplicatedData>) -> Result<&'a ReplicatedData>; } pub struct ChooseRandomPeerStrategy<'a> { random: &'a Fn() -> u64, } -impl<'a> ChooseRandomPeerStrategy<'a> { +// Given a source of randomness "random", this strategy will randomly pick a validator +// from the input options. This strategy works in isolation, but doesn't leverage any +// rumors from the rest of the gossip network to make more informed decisions about +// which validators have more/less updates +impl<'a, 'b> ChooseRandomPeerStrategy<'a> { pub fn new(random: &'a Fn() -> u64) -> Self { ChooseRandomPeerStrategy { random } } } impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { - fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result { - if options.len() < 1 { + fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { + if options.is_empty() { return Err(Error::CrdtTooSmall); } let n = ((self.random)() as usize) % options.len(); - Ok(options[n].clone()) + Ok(options[n]) } } +// This strategy uses rumors accumulated from the rest of the network to weight +// the importance of communicating with a particular validator based on cumulative network +// perceiption of the number of updates the validator has to offer. 
A validator is randomly +// picked based on a weighted sample from the pool of viable choices. The "weight", w, of a +// particular validator "v" is calculated as follows: +// +// w = [Sum for all i in I_v: (rumor_v(i) - observed(v)) * stake(i)] / +// [Sum for all i in I_v: Sum(stake(i))] +// +// where I_v is the set of all validators that returned a rumor about the update_index of +// validator "v", stake(i) is the size of the stake of validator "i", observed(v) is the +// observed update_index from the last direct communication with validator "v", and +// rumor_v(i) is the rumored update_index of validator "v" propagated by fellow validator "i". + +// This could be a problem if there are validators with large stakes lying about their +// observed updates. There could also be a problem in network partitions, or even just +// when certain validators are disproportionately active, where we hear more rumors about +// certain clusters of nodes that then propagate more rumors about each other. Hopefully +// this can be resolved with a good baseline DEFAULT_WEIGHT, or by implementing lockout +// periods for very active validators in the future. + pub struct ChooseWeightedPeerStrategy<'a> { + // The map of last directly observed update_index for each active validator. + // This is how we get observed(v) from the formula above. remote: &'a HashMap, + // The map of rumored update_index for each active validator. Using the formula above, + // to find rumor_v(i), we would first look up "v" in the outer map, then look up + // "i" in the inner map, i.e. look up external_liveness[v][i] external_liveness: &'a HashMap>, + // A function returning the size of the stake for a particular validator, corresponds + // to stake(i) in the formula above. 
get_stake: &'a Fn(PublicKey) -> f64, } @@ -96,7 +128,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| { if vote < last_seen_index { - // This should never happen b/c we maintain the invariant that the indexes + // This should never happen because we maintain the invariant that the indexes // in the external_liveness table are always greater than the corresponding // indexes in the remote table, if the index exists in the remote table at all. @@ -140,7 +172,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { } impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { - fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result { + fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { if options.len() < 1 { return Err(Error::CrdtTooSmall); } @@ -148,16 +180,11 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { let mut weighted_peers = vec![]; for peer in options { let weight = self.calculate_weighted_remote_index(peer.id); - weighted_peers.push(Weighted { - weight: weight, - item: peer, - }); + weighted_peers.push(Weighted { weight, item: peer }); } let mut rng = thread_rng(); - Ok(WeightedChoice::new(&mut weighted_peers) - .ind_sample(&mut rng) - .clone()) + Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng)) } } @@ -300,7 +327,7 @@ mod tests { let result = weighted_strategy.calculate_weighted_remote_index(key1); - // If nobody has seen a newer update then rever to default + // If nobody has seen a newer update then revert to default assert_eq!(result, DEFAULT_WEIGHT); } } diff --git a/src/crdt.rs b/src/crdt.rs index a9b125444..cce92f017 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -15,8 +15,9 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; -use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy, - ChooseWeightedPeerStrategy}; +use 
choose_gossip_peer_strategy::{ + ChooseGossipPeerStrategy, ChooseRandomPeerStrategy, ChooseWeightedPeerStrategy, +}; use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; @@ -489,9 +490,9 @@ impl Crdt { .expect("rdr.read_u64 in fn random") } - // TODO: fill in with real implmentation wonce staking is implemented + // TODO: fill in with real implmentation once staking is implemented fn get_stake(id: PublicKey) -> f64 { - return 1.0; + 1.0 } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { @@ -530,10 +531,6 @@ impl Crdt { fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> { let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect(); - #[cfg(not(feature = "choose_gossip_peer_strategy"))] - let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random); - - #[cfg(feature = "choose_gossip_peer_strategy")] let choose_peer_strategy = ChooseWeightedPeerStrategy::new( &self.remote, &self.external_liveness, @@ -636,7 +633,7 @@ impl Crdt { self.insert(&v); } - for (pk, external_remote_index) in external_liveness.iter() { + for (pk, external_remote_index) in external_liveness { let remote_entry = if let Some(v) = self.remote.get(pk) { *v } else { diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index a25eeb8e8..b2f911d35 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -185,7 +185,7 @@ pub fn crdt_retransmit() { } #[test] -fn check_external_liveness_table() { +fn test_external_liveness_table() { logger::setup(); let c1_c4_exit = Arc::new(AtomicBool::new(false)); let c2_c3_exit = Arc::new(AtomicBool::new(false)); @@ -223,7 +223,8 @@ fn check_external_liveness_table() { trace!("waiting to converge:"); let mut done = false; for _ in 0..30 { - done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3 + done = c1.read().unwrap().table.len() == 3 + && c2.read().unwrap().table.len() == 3 && 
c3.read().unwrap().table.len() == 3; if done { break; @@ -244,12 +245,12 @@ fn check_external_liveness_table() { // Make sure liveness table entry contains correct result for c2 let c2_index_result_for_c4 = liveness_map.get(&c2_id); assert!(c2_index_result_for_c4.is_some()); - assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4); + assert_eq!(*(c2_index_result_for_c4.unwrap()), c2_index_for_c4); // Make sure liveness table entry contains correct result for c3 let c3_index_result_for_c4 = liveness_map.get(&c3_id); assert!(c3_index_result_for_c4.is_some()); - assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4); + assert_eq!(*(c3_index_result_for_c4.unwrap()), c3_index_for_c4); } // Shutdown validators c2 and c3 @@ -258,7 +259,7 @@ fn check_external_liveness_table() { threads.extend(dr2.thread_hdls.into_iter()); threads.extend(dr3.thread_hdls.into_iter()); - for t in threads.into_iter() { + for t in threads { t.join().unwrap(); } @@ -267,7 +268,10 @@ fn check_external_liveness_table() { c4.write().unwrap().insert(&c1_data); c4.write().unwrap().set_leader(c1_data.id); for _ in 0..30 { - done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none(); + done = c1.read() + .unwrap() + .get_external_liveness_entry(&c4_id) + .is_none(); if done { break; } @@ -281,7 +285,7 @@ fn check_external_liveness_table() { threads.extend(dr1.thread_hdls.into_iter()); threads.extend(dr4.thread_hdls.into_iter()); - for t in threads.into_iter() { + for t in threads { t.join().unwrap(); } -} \ No newline at end of file +}