diff --git a/Cargo.toml b/Cargo.toml
index fcce844ba..a39a8a158 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ unstable = []
 ipv6 = []
 cuda = []
 erasure = []
+gossip_choose_weighted_peer = []
 
 [dependencies]
 rayon = "1.0.0"
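
The `gossip_choose_weighted_peer` feature declared here is what the `#[cfg(feature = ...)]` gate in `src/crdt.rs` below keys on. As a minimal, hypothetical sketch of how such a Cargo feature toggles code at compile time (the `strategy_name` function and `main` are illustrative only, not part of this patch, and assume a crate whose Cargo.toml declares this feature):

```rust
// Illustrative only (not part of this patch): how a Cargo feature gates code.
// Build with `cargo build` for the default strategy, or with
// `cargo build --features gossip_choose_weighted_peer` for the weighted one.

#[cfg(not(feature = "gossip_choose_weighted_peer"))]
fn strategy_name() -> &'static str {
    "random" // default: pick a gossip peer uniformly at random
}

#[cfg(feature = "gossip_choose_weighted_peer")]
fn strategy_name() -> &'static str {
    "weighted" // opt-in: bias peer choice by stake-weighted rumors
}

fn main() {
    // Exactly one of the two functions above is compiled in.
    println!("gossip peer strategy: {}", strategy_name());
}
```
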
diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs
new file mode 100644
index 000000000..20a3bd0b4
--- /dev/null
+++ b/src/choose_gossip_peer_strategy.rs
@@ -0,0 +1,323 @@
+use crdt::ReplicatedData;
+use rand::thread_rng;
+use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
+use result::{Error, Result};
+use signature::PublicKey;
+use std;
+use std::collections::HashMap;
+
+pub const DEFAULT_WEIGHT: u32 = 1;
+
+pub trait ChooseGossipPeerStrategy {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
+}
+
+pub struct ChooseRandomPeerStrategy<'a> {
+    random: &'a Fn() -> u64,
+}
+
+impl<'a> ChooseRandomPeerStrategy<'a> {
+    pub fn new(random: &'a Fn() -> u64) -> Self {
+        ChooseRandomPeerStrategy { random }
+    }
+}
+
+impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
+        if options.len() < 1 {
+            return Err(Error::CrdtTooSmall);
+        }
+
+        let n = ((self.random)() as usize) % options.len();
+        Ok(options[n].clone())
+    }
+}
+
+pub struct ChooseWeightedPeerStrategy<'a> {
+    remote: &'a HashMap<PublicKey, u64>,
+    external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
+    get_stake: &'a Fn(PublicKey) -> f64,
+}
+
+impl<'a> ChooseWeightedPeerStrategy<'a> {
+    pub fn new(
+        remote: &'a HashMap<PublicKey, u64>,
+        external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
+        get_stake: &'a Fn(PublicKey) -> f64,
+    ) -> Self {
+        ChooseWeightedPeerStrategy {
+            remote,
+            external_liveness,
+            get_stake,
+        }
+    }
+
+    fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
+        // If the peer is not in our remote table, we leave last_seen_index as zero.
+        // This only happens when a peer appears in our crdt.table but not in our
+        // crdt.remote, which means a validator was directly injected into our crdt.table.
+        let mut last_seen_index = 0;
+        if let Some(index) = self.remote.get(&peer_id) {
+            last_seen_index = *index;
+        }
+
+        let liveness_entry = self.external_liveness.get(&peer_id);
+        if liveness_entry.is_none() {
+            return DEFAULT_WEIGHT;
+        }
+
+        let votes = liveness_entry.unwrap();
+
+        if votes.is_empty() {
+            return DEFAULT_WEIGHT;
+        }
+
+        // Calculate the weighted average of the rumors
+        let mut relevant_votes = vec![];
+
+        let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| {
+            let stake = (self.get_stake)(id);
+            // If the total stake is going to overflow f64, pick the larger of either
+            // the current total_stake or the new stake. This way we are guaranteed to
+            // sample at least half of the total stake in our weighted calculation.
+            if std::f64::MAX - total_stake < stake {
+                if stake > total_stake {
+                    relevant_votes = vec![(stake, vote)];
+                    stake
+                } else {
+                    total_stake
+                }
+            } else {
+                relevant_votes.push((stake, vote));
+                total_stake + stake
+            }
+        });
+
+        let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
+            if vote < last_seen_index {
+                // This should never happen because we maintain the invariant that the
+                // indexes in the external_liveness table are always greater than the
+                // corresponding indexes in the remote table, if the index exists in
+                // the remote table at all.
+
+                // Case 1: The attempt to insert a bigger index into the external_liveness
+                // table happens after an insertion into the remote table. In this case
+                // (see the apply_updates() function), we prevent the insertion if the
+                // entry in the remote table >= the attempted insertion into the
+                // external_liveness table.
+
+                // Case 2: A bigger index is inserted into the external_liveness table
+                // before a smaller insertion into the remote table. We clear the
+                // corresponding external_liveness table entry on all insertions into
+                // the remote table. See the apply_updates() function.
+
+                warn!("weighted peer index was smaller than local entry in remote table");
+                return sum;
+            }
+
+            let vote_difference = (vote - last_seen_index) as f64;
+            let new_weight = vote_difference * (stake / total_stake);
+
+            if std::f64::MAX - sum < new_weight {
+                return f64::max(new_weight, sum);
+            }
+
+            sum + new_weight
+        });
+
+        // Return a u32 because the weighted sampling API from rand::distributions
+        // only takes u32 for weights
+        if weighted_vote >= std::u32::MAX as f64 {
+            return std::u32::MAX;
+        }
+
+        // If the weighted rumors we've heard about aren't any greater than what we've
+        // directly learned from the last time we communicated with the peer
+        // (i.e. weighted_vote == 0), then return a weight of DEFAULT_WEIGHT.
+        // Otherwise, return the calculated weight.
+        weighted_vote as u32 + DEFAULT_WEIGHT
+    }
+}
+
+impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
+        if options.len() < 1 {
+            return Err(Error::CrdtTooSmall);
+        }
+
+        let mut weighted_peers = vec![];
+        for peer in options {
+            let weight = self.calculate_weighted_remote_index(peer.id);
+            weighted_peers.push(Weighted { weight, item: peer });
+        }
+
+        let mut rng = thread_rng();
+        Ok(WeightedChoice::new(&mut weighted_peers)
+            .ind_sample(&mut rng)
+            .clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
+    use logger;
+    use signature::{KeyPair, KeyPairUtil, PublicKey};
+    use std;
+    use std::collections::HashMap;
+
+    fn get_stake(_id: PublicKey) -> f64 {
+        1.0
+    }
+
+    #[test]
+    fn test_default() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
+
+        // If the external_liveness table doesn't contain this entry,
+        // return the default weight
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, DEFAULT_WEIGHT);
+    }
+
+    #[test]
+    fn test_only_external_liveness() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+        let key2 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // If only the liveness table contains the entry, we should return the
+        // weighted liveness entries
+        let test_value: u32 = 5;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+        rumors.insert(key2, test_value as u64);
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, test_value + DEFAULT_WEIGHT);
+    }
+
+    #[test]
+    fn test_overflow_votes() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+        let key2 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // If the vote index is greater than u32::MAX, default to u32::MAX
+        let test_value = (std::u32::MAX as u64) + 10;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+        rumors.insert(key2, test_value);
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, std::u32::MAX);
+    }
+
+    #[test]
+    fn test_many_validators() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // Test many validators' rumors in external_liveness
+        let num_peers = 10;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+
+        remote.insert(key1, 0);
+
+        for i in 0..num_peers {
+            let pk = KeyPair::new().pubkey();
+            rumors.insert(pk, i);
+        }
+
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, (num_peers / 2) as u32);
+    }
+
+    #[test]
+    fn test_many_validators2() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // Test many validators' rumors in external_liveness
+        let num_peers = 10;
+        let old_index = 20;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+
+        remote.insert(key1, old_index);
+
+        for _ in 0..num_peers {
+            let pk = KeyPair::new().pubkey();
+            rumors.insert(pk, old_index);
+        }
+
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+
+        // If nobody has seen a newer update, then revert to the default weight
+        assert_eq!(result, DEFAULT_WEIGHT);
+    }
+}
\ No newline at end of file
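
A worked check of the `test_many_validators` expectation: with `last_seen_index = 0`, ten rumored indexes `0..10`, and equal stakes of `1.0`, the weighted vote is `(0 + 1 + ... + 9) / 10 = 4.5`, which truncates to `4`; adding `DEFAULT_WEIGHT` gives `5 = num_peers / 2`. The standalone sketch below re-derives this outside the crate; `weighted_index` is a hypothetical helper mirroring `calculate_weighted_remote_index` with the stake and weight overflow guards elided:

```rust
// Hypothetical re-derivation of calculate_weighted_remote_index for the
// test_many_validators scenario; overflow handling is deliberately elided.
const DEFAULT_WEIGHT: u32 = 1;

fn weighted_index(last_seen_index: u64, rumors: &[(f64, u64)]) -> u32 {
    // Total stake across all rumor sources.
    let total_stake: f64 = rumors.iter().map(|&(stake, _)| stake).sum();
    // Stake-weighted average of how far ahead of us each rumor claims the peer is.
    let weighted_vote: f64 = rumors
        .iter()
        .filter(|&&(_, vote)| vote >= last_seen_index)
        .map(|&(stake, vote)| (vote - last_seen_index) as f64 * (stake / total_stake))
        .sum();
    weighted_vote as u32 + DEFAULT_WEIGHT
}

fn main() {
    // Ten peers with stake 1.0 rumoring indexes 0..10 about a peer we last
    // saw at index 0: (0 + 1 + ... + 9) / 10 = 4.5 -> 4, plus the default 1.
    let rumors: Vec<(f64, u64)> = (0..10).map(|i| (1.0, i)).collect();
    assert_eq!(weighted_index(0, &rumors), 5); // == num_peers / 2
}
```
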
diff --git a/src/crdt.rs b/src/crdt.rs
index 307d80a2e..bb2031503 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -15,6 +15,11 @@ use bincode::{deserialize, serialize};
 use byteorder::{LittleEndian, ReadBytesExt};
+use choose_gossip_peer_strategy::{
+    ChooseGossipPeerStrategy,
+    ChooseRandomPeerStrategy,
+    ChooseWeightedPeerStrategy,
+};
 use hash::Hash;
 use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
 use pnet_datalink as datalink;
@@ -190,6 +195,7 @@ pub struct Crdt {
     pub alive: HashMap<PublicKey, u64>,
     pub update_index: u64,
     pub me: PublicKey,
+    external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
 }
 // TODO These messages should be signed, and go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
@@ -200,7 +206,7 @@
 enum Protocol {
     RequestUpdates(u64, ReplicatedData), //TODO might need a since?
     /// from id, from's last update index, ReplicatedData
-    ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
+    ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>),
     /// ask for a missing index
     RequestWindowIndex(ReplicatedData, u64),
 }
@@ -213,6 +219,7 @@ impl Crdt {
             local: HashMap::new(),
             remote: HashMap::new(),
             alive: HashMap::new(),
+            external_liveness: HashMap::new(),
             me: me.id,
             update_index: 1,
         };
@@ -234,6 +241,14 @@ impl Crdt {
         self.insert(&me);
     }
 
+    pub fn get_external_liveness_entry(
+        &self,
+        key: &PublicKey,
+    ) -> Option<&HashMap<PublicKey, u64>> {
+        self.external_liveness.get(key)
+    }
+
     pub fn insert(&mut self, v: &ReplicatedData) {
         // TODO check that last_verified types are always increasing
         if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
@@ -270,9 +285,11 @@ impl Crdt {
         if self.table.len() <= MIN_TABLE_SIZE {
             return;
         }
+
         //wait for 4x as long as it would randomly take to reach our node
         //assuming everyone is waiting the same amount of time as this node
         let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
+
         let dead_ids: Vec<PublicKey> = self.alive
             .iter()
             .filter_map(|(&k, v)| {
@@ -285,11 +302,13 @@ impl Crdt {
                 }
             })
             .collect();
+
         for id in dead_ids.iter() {
            self.alive.remove(id);
            self.table.remove(id);
            self.remote.remove(id);
            self.local.remove(id);
+           self.external_liveness.remove(id);
         }
     }
@@ -473,6 +492,12 @@ impl Crdt {
         rdr.read_u64::<LittleEndian>()
             .expect("rdr.read_u64 in fn random")
     }
+
+    // TODO: fill in with a real implementation once staking is implemented
+    fn get_stake(_id: PublicKey) -> f64 {
+        1.0
+    }
+
     fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
         //trace!("get updates since {}", v);
         let data = self.table
@@ -508,16 +533,32 @@ impl Crdt {
     /// * B - RequestUpdates protocol message
     fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
         let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
-        if options.len() < 1 {
-            trace!(
-                "crdt too small for gossip {:?} {}",
-                &self.me[..4],
-                self.table.len()
-            );
-            return Err(Error::CrdtTooSmall);
-        }
-        let n = (Self::random() as usize) % options.len();
-        let v = options[n].clone();
+
+        #[cfg(not(feature = "gossip_choose_weighted_peer"))]
+        let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random);
+
+        #[cfg(feature = "gossip_choose_weighted_peer")]
+        let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
+            &self.remote,
+            &self.external_liveness,
+            &Self::get_stake,
+        );
+
+        let choose_peer_result = choose_peer_strategy.choose_peer(options);
+
+        let v = match choose_peer_result {
+            Ok(peer) => peer,
+            Err(Error::CrdtTooSmall) => {
+                trace!(
+                    "crdt too small for gossip {:?} {}",
+                    &self.me[..4],
+                    self.table.len()
+                );
+                return Err(Error::CrdtTooSmall);
+            }
+            Err(e) => return Err(e),
+        };
+
         let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
         let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
         trace!(
@@ -526,6 +567,7 @@ impl Crdt {
             &v.id[..4],
             v.gossip_addr
         );
+
         Ok((v.gossip_addr, req))
     }
@@ -543,6 +585,7 @@ impl Crdt {
         let (remote_gossip_addr, req) = obj.read()
             .expect("'obj' read lock in fn run_gossip")
             .gossip_request()?;
+
         // TODO this will get chatty, so we need to first ask for number of updates since
         // then only ask for specific data that we dont have
         let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
@@ -583,14 +626,43 @@ impl Crdt {
     /// * `from` - identity of the sender of the updates
     /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
     /// * `data` - the update data
-    fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
+    fn apply_updates(
+        &mut self,
+        from: PublicKey,
+        update_index: u64,
+        data: &[ReplicatedData],
+        external_liveness: &[(PublicKey, u64)],
+    ) {
         trace!("got updates {}", data.len());
         // TODO we need to punish/spam resist here
         // sig verify the whole update and slash anyone who sends a bad update
         for v in data {
             self.insert(&v);
         }
+
+        for (pk, external_remote_index) in external_liveness.iter() {
+            let remote_entry = if let Some(v) = self.remote.get(pk) {
+                *v
+            } else {
+                0
+            };
+
+            if remote_entry >= *external_remote_index {
+                continue;
+            }
+
+            let liveness_entry = self.external_liveness.entry(*pk).or_insert(HashMap::new());
+            let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
+            if *external_remote_index > peer_index {
+                liveness_entry.insert(from, *external_remote_index);
+            }
+        }
+
         *self.remote.entry(from).or_insert(update_index) = update_index;
+
+        // Clear the remote liveness table for this node, because we've heard directly
+        // from them, so we don't need to rely on rumors
+        self.external_liveness.remove(&from);
     }
 
     /// randomly pick a node and ask them for updates asynchronously
@@ -682,13 +754,14 @@ impl Crdt {
             Ok(Protocol::RequestUpdates(v, from_rd)) => {
                 trace!("RequestUpdates {}", v);
                 let addr = from_rd.gossip_addr;
-                // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
-                let (from, ups, data) = obj.read()
-                    .expect("'obj' read lock in RequestUpdates")
-                    .get_updates_since(v);
+                let me = obj.read().unwrap();
+                // only lock for these two calls, don't lock during IO `sock.send_to` or `sock.recv_from`
+                let (from, ups, data) = me.get_updates_since(v);
+                let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
+                drop(me);
                 trace!("get updates since response {} {}", v, data.len());
                 let len = data.len();
-                let rsp = Protocol::ReceiveUpdates(from, ups, data);
+                let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
                 obj.write().unwrap().insert(&from_rd);
                 if len < 1 {
                     let me = obj.read().unwrap();
@@ -713,11 +786,11 @@ impl Crdt {
                     None
                 }
             }
-            Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
+            Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
                 trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
                 obj.write()
                     .expect("'obj' write lock in ReceiveUpdates")
-                    .apply_updates(from, ups, &data);
+                    .apply_updates(from, ups, &data, &external_liveness);
                 None
             }
             Ok(Protocol::RequestWindowIndex(from, ix)) => {
@@ -956,7 +1029,7 @@ mod tests {
             sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
         );
         let mut crdt2 = Crdt::new(d2.clone());
-        crdt2.apply_updates(key, ix, &ups);
+        crdt2.apply_updates(key, ix, &ups, &vec![]);
         assert_eq!(crdt2.table.values().len(), 3);
         assert_eq!(
             sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
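
The merge rule in `apply_updates` above keeps, per rumored-about peer and per rumor source, the highest rumored update index that still exceeds what this node has observed directly, and discards all rumors about a peer the moment the node hears from it first-hand. A simplified, self-contained sketch of just that bookkeeping (the `Key` type, the `merge_rumors` name, and the omission of the direct `remote` update are stand-ins for illustration, not the `Crdt` API):

```rust
use std::collections::HashMap;

type Key = u64; // stand-in for PublicKey

// Simplified rumor bookkeeping mirroring apply_updates(): `remote` holds
// directly observed update indexes; `external_liveness` holds rumored
// indexes keyed first by subject, then by rumor source.
fn merge_rumors(
    remote: &HashMap<Key, u64>,
    external_liveness: &mut HashMap<Key, HashMap<Key, u64>>,
    from: Key,
    rumors: &[(Key, u64)],
) {
    for &(subject, rumored_index) in rumors {
        // Drop rumors no newer than what we've observed directly (Case 1 invariant).
        if remote.get(&subject).cloned().unwrap_or(0) >= rumored_index {
            continue;
        }
        // Keep the largest index each source has rumored about `subject`.
        let entry = external_liveness.entry(subject).or_insert_with(HashMap::new);
        let current = entry.entry(from).or_insert(rumored_index);
        if rumored_index > *current {
            *current = rumored_index;
        }
    }
    // Hearing from `from` directly supersedes any rumors about it (Case 2 invariant).
    external_liveness.remove(&from);
}

fn main() {
    let remote: HashMap<Key, u64> = [(4, 10)].iter().cloned().collect();
    let mut liveness = HashMap::new();

    // A rumor from node 2 that node 4 reached index 20 is recorded (20 > 10)...
    merge_rumors(&remote, &mut liveness, 2, &[(4, 20)]);
    assert_eq!(liveness[&4][&2], 20);

    // ...until node 4 talks to us directly, which clears the rumors about it.
    merge_rumors(&remote, &mut liveness, 4, &[]);
    assert!(liveness.get(&4).is_none());
}
```
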
diff --git a/src/lib.rs b/src/lib.rs
index f2bcaa454..d551c68b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -47,6 +47,7 @@ pub mod transaction;
 pub mod tvu;
 pub mod window_stage;
 pub mod write_stage;
+mod choose_gossip_peer_strategy;
 extern crate bincode;
 extern crate byteorder;
 extern crate chrono;
diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs
index b750dbc72..a25eeb8e8 100644
--- a/tests/data_replicator.rs
+++ b/tests/data_replicator.rs
@@ -183,3 +183,105 @@ pub fn crdt_retransmit() {
         t.join().unwrap();
     }
 }
+
+#[test]
+fn check_external_liveness_table() {
+    logger::setup();
+    let c1_c4_exit = Arc::new(AtomicBool::new(false));
+    let c2_c3_exit = Arc::new(AtomicBool::new(false));
+
+    trace!("c1:");
+    let (c1, dr1, _) = test_node(c1_c4_exit.clone());
+    trace!("c2:");
+    let (c2, dr2, _) = test_node(c2_c3_exit.clone());
+    trace!("c3:");
+    let (c3, dr3, _) = test_node(c2_c3_exit.clone());
+    trace!("c4:");
+    let (c4, dr4, _) = test_node(c1_c4_exit.clone());
+
+    let c1_data = c1.read().unwrap().my_data().clone();
+    c1.write().unwrap().set_leader(c1_data.id);
+
+    let c2_id = c2.read().unwrap().me;
+    let c3_id = c3.read().unwrap().me;
+    let c4_id = c4.read().unwrap().me;
+
+    // Insert the remote data about c4
+    let c2_index_for_c4 = 10;
+    c2.write().unwrap().remote.insert(c4_id, c2_index_for_c4);
+    let c3_index_for_c4 = 20;
+    c3.write().unwrap().remote.insert(c4_id, c3_index_for_c4);
+
+    // Set up the initial network topology
+    c2.write().unwrap().insert(&c1_data);
+    c3.write().unwrap().insert(&c1_data);
+
+    c2.write().unwrap().set_leader(c1_data.id);
+    c3.write().unwrap().set_leader(c1_data.id);
+
+    // Wait to converge
+    trace!("waiting to converge:");
+    let mut done = false;
+    for _ in 0..30 {
+        done = c1.read().unwrap().table.len() == 3
+            && c2.read().unwrap().table.len() == 3
+            && c3.read().unwrap().table.len() == 3;
+        if done {
+            break;
+        }
+        sleep(Duration::new(1, 0));
+    }
+    assert!(done);
+
+    // Validate c1's external liveness table, then release the rc1 read lock
+    {
+        let rc1 = c1.read().unwrap();
+        let el = rc1.get_external_liveness_entry(&c4.read().unwrap().me);
+
+        // Make sure the liveness table entry for c4 exists on node c1
+        assert!(el.is_some());
+        let liveness_map = el.unwrap();
+
+        // Make sure the liveness table entry contains the correct result for c2
+        let c2_index_result_for_c4 = liveness_map.get(&c2_id);
+        assert!(c2_index_result_for_c4.is_some());
+        assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4);
+
+        // Make sure the liveness table entry contains the correct result for c3
+        let c3_index_result_for_c4 = liveness_map.get(&c3_id);
+        assert!(c3_index_result_for_c4.is_some());
+        assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4);
+    }
+
+    // Shutdown validators c2 and c3
+    c2_c3_exit.store(true, Ordering::Relaxed);
+    let mut threads = vec![];
+    threads.extend(dr2.thread_hdls.into_iter());
+    threads.extend(dr3.thread_hdls.into_iter());
+
+    for t in threads.into_iter() {
+        t.join().unwrap();
+    }
+
+    // Allow communication between c1 and c4, and make sure that c1's
+    // external_liveness table entry for c4 gets cleared
+    c4.write().unwrap().insert(&c1_data);
+    c4.write().unwrap().set_leader(c1_data.id);
+    for _ in 0..30 {
+        done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none();
+        if done {
+            break;
+        }
+        sleep(Duration::new(1, 0));
+    }
+    assert!(done);
+
+    // Shutdown validators c1 and c4
+    c1_c4_exit.store(true, Ordering::Relaxed);
+    let mut threads = vec![];
+    threads.extend(dr1.thread_hdls.into_iter());
+    threads.extend(dr4.thread_hdls.into_iter());
+
+    for t in threads.into_iter() {
+        t.join().unwrap();
+    }
+}
\ No newline at end of file
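
For reference, `ChooseWeightedPeerStrategy` leans on the rand 0.4-era `Weighted`/`WeightedChoice` API, which samples items in proportion to their `u32` weights; this is why `calculate_weighted_remote_index` clamps to `u32::MAX` and never returns zero (`WeightedChoice::new` requires a nonzero total weight). A minimal usage sketch against that old API (removed in later rand releases); the peer names and draw counts here are arbitrary:

```rust
// Demonstrates weighted sampling with the rand 0.4-era API used by this patch.
extern crate rand;

use rand::distributions::{IndependentSample, Weighted, WeightedChoice};

fn main() {
    // Peers "a".."c" with weights 1, 5, and 1: "b" is drawn ~5x as often.
    let mut peers = vec![
        Weighted { weight: 1, item: "a" },
        Weighted { weight: 5, item: "b" },
        Weighted { weight: 1, item: "c" },
    ];
    let chooser = WeightedChoice::new(&mut peers);
    let mut rng = rand::thread_rng();

    let mut counts = [0u32; 3];
    for _ in 0..7000 {
        match chooser.ind_sample(&mut rng) {
            "a" => counts[0] += 1,
            "b" => counts[1] += 1,
            _ => counts[2] += 1,
        }
    }
    // Expect roughly 1000 / 5000 / 1000 draws.
    println!("a: {}, b: {}, c: {}", counts[0], counts[1], counts[2]);
}
```
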