added remote table to update responses

This commit is contained in:
OEM Configuration (temporary user) 2018-06-18 23:50:41 -07:00 committed by Greg Fitzgerald
parent 8f4ce1e8d0
commit 08bcb62016
5 changed files with 520 additions and 20 deletions

View File

@@ -53,6 +53,7 @@ unstable = []
ipv6 = []
cuda = []
erasure = []
gossip_choose_weighted_peer = []
[dependencies]
rayon = "1.0.0"
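Note that the gossip_choose_weighted_peer feature declared above is not enabled by default, so peer selection stays on the existing random strategy unless the feature is switched on at build time. As a hedged illustration (standard Cargo feature syntax, not part of this commit), the weighted strategy would be compiled in with:

cargo build --features gossip_choose_weighted_peer
cargo test --features gossip_choose_weighted_peer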

View File

@@ -0,0 +1,323 @@
use crdt::ReplicatedData;
use rand::thread_rng;
use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
use result::{Error, Result};
use signature::PublicKey;
use std;
use std::collections::HashMap;
pub const DEFAULT_WEIGHT: u32 = 1;
pub trait ChooseGossipPeerStrategy {
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
}
pub struct ChooseRandomPeerStrategy<'a> {
random: &'a Fn() -> u64,
}
impl<'a> ChooseRandomPeerStrategy<'a> {
pub fn new(random: &'a Fn() -> u64) -> Self {
ChooseRandomPeerStrategy { random }
}
}
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
if options.len() < 1 {
return Err(Error::CrdtTooSmall);
}
let n = ((self.random)() as usize) % options.len();
Ok(options[n].clone())
}
}
pub struct ChooseWeightedPeerStrategy<'a> {
remote: &'a HashMap<PublicKey, u64>,
external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
get_stake: &'a Fn(PublicKey) -> f64,
}
impl<'a> ChooseWeightedPeerStrategy<'a> {
pub fn new(
remote: &'a HashMap<PublicKey, u64>,
external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
get_stake: &'a Fn(PublicKey) -> f64,
) -> Self {
ChooseWeightedPeerStrategy { remote, external_liveness, get_stake }
}
fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
let mut last_seen_index = 0;
// If the peer is not in our remote table, then we leave last_seen_index as zero.
// Only happens when a peer appears in our crdt.table but not in our crdt.remote,
// which means a validator was directly injected into our crdt.table
if let Some(index) = self.remote.get(&peer_id) {
last_seen_index = *index;
}
let liveness_entry = self.external_liveness.get(&peer_id);
if liveness_entry.is_none() {
return DEFAULT_WEIGHT;
}
let votes = liveness_entry.unwrap();
if votes.is_empty() {
return DEFAULT_WEIGHT;
}
// Calculate the weighted average of the rumors
let mut relevant_votes = vec![];
let total_stake = votes.iter().fold(
0.0,
|total_stake, (&id, &vote)| {
let stake = (self.get_stake)(id);
// If the total stake is going to overflow f64, pick
// the larger of either the current total_stake or the
// new stake; this way we are guaranteed to capture at least
// f64::MAX / 2 worth of stake in our weighted calculation
if std::f64::MAX - total_stake < stake {
if stake > total_stake {
relevant_votes = vec![(stake, vote)];
stake
} else {
total_stake
}
} else {
relevant_votes.push((stake, vote));
total_stake + stake
}
}
);
let weighted_vote = relevant_votes.iter().fold(
0.0,
|sum, &(stake, vote)| {
if vote < last_seen_index {
// This should never happen b/c we maintain the invariant that the indexes
// in the external_liveness table are always greater than the corresponding
// indexes in the remote table, if the index exists in the remote table at all.
// Case 1: An attempt to insert a bigger index into the "external_liveness" table
// happens after an insertion into the "remote" table. In this case
// (see the apply_updates() function), we prevent the insertion if the entry
// in the remote table is >= the attempted insertion into the "external_liveness"
// table.
// Case 2: A bigger index is inserted into the "external_liveness" table before
// a smaller insertion into the "remote" table. We clear the corresponding
// "external_liveness" table entry on all insertions into the "remote" table;
// see the apply_updates() function.
warn!("weighted peer index was smaller than local entry in remote table");
return sum;
}
let vote_difference = (vote - last_seen_index) as f64;
let new_weight = vote_difference * (stake / total_stake);
if std::f64::MAX - sum < new_weight {
return f64::max(new_weight, sum);
}
sum + new_weight
},
);
// Return u32 b/c the weighted sampling API from rand::distributions
// only takes u32 for weights
if weighted_vote >= std::u32::MAX as f64 {
return std::u32::MAX;
}
// If the weighted rumors we've heard about aren't any greater than
// what we've directly learned from the last time we communicated with the
// peer (i.e. weighted_vote == 0), then return a weight of 1.
// Otherwise, return the calculated weight.
weighted_vote as u32 + DEFAULT_WEIGHT
}
}
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
if options.len() < 1 {
return Err(Error::CrdtTooSmall);
}
let mut weighted_peers = vec![];
for peer in options {
let weight = self.calculate_weighted_remote_index(peer.id);
weighted_peers.push(Weighted {
weight: weight,
item: peer,
});
}
let mut rng = thread_rng();
Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng).clone())
}
}
#[cfg(test)]
mod tests {
use logger;
use signature::{KeyPair, KeyPairUtil, PublicKey};
use std;
use std::collections::HashMap;
use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
fn get_stake(id: PublicKey) -> f64 {
return 1.0;
}
#[test]
fn test_default() {
logger::setup();
// Initialize the filler keys
let key1 = KeyPair::new().pubkey();
let remote: HashMap<PublicKey, u64> = HashMap::new();
let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
let weighted_strategy = ChooseWeightedPeerStrategy::new(
&remote,
&external_liveness,
&get_stake,
);
// If external_liveness table doesn't contain this entry,
// return the default weight
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, DEFAULT_WEIGHT);
}
#[test]
fn test_only_external_liveness() {
logger::setup();
// Initialize the filler keys
let key1 = KeyPair::new().pubkey();
let key2 = KeyPair::new().pubkey();
let remote: HashMap<PublicKey, u64> = HashMap::new();
let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
// If only the liveness table contains the entry, should return the
// weighted liveness entries
let test_value: u32 = 5;
let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
rumors.insert(key2, test_value as u64);
external_liveness.insert(key1, rumors);
let weighted_strategy = ChooseWeightedPeerStrategy::new(
&remote,
&external_liveness,
&get_stake,
);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, test_value + DEFAULT_WEIGHT);
}
#[test]
fn test_overflow_votes() {
logger::setup();
// Initialize the filler keys
let key1 = KeyPair::new().pubkey();
let key2 = KeyPair::new().pubkey();
let remote: HashMap<PublicKey, u64> = HashMap::new();
let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
// If the vote index is greater than u32::MAX, default to u32::MAX
let test_value = (std::u32::MAX as u64) + 10;
let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
rumors.insert(key2, test_value);
external_liveness.insert(key1, rumors);
let weighted_strategy = ChooseWeightedPeerStrategy::new(
&remote,
&external_liveness,
&get_stake,
);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, std::u32::MAX);
}
#[test]
fn test_many_validators() {
logger::setup();
// Initialize the filler keys
let key1 = KeyPair::new().pubkey();
let mut remote: HashMap<PublicKey, u64> = HashMap::new();
let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
// Test many validators' rumors in external_liveness
let num_peers = 10;
let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
remote.insert(key1, 0);
for i in 0..num_peers {
let pk = KeyPair::new().pubkey();
rumors.insert(pk, i);
}
external_liveness.insert(key1, rumors);
let weighted_strategy = ChooseWeightedPeerStrategy::new(
&remote,
&external_liveness,
&get_stake,
);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
assert_eq!(result, (num_peers/2) as u32);
}
#[test]
fn test_many_validators2() {
logger::setup();
// Initialize the filler keys
let key1 = KeyPair::new().pubkey();
let mut remote: HashMap<PublicKey, u64> = HashMap::new();
let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
// Test many validators' rumors in external_liveness
let num_peers = 10;
let old_index = 20;
let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
remote.insert(key1, old_index);
for _ in 0..num_peers {
let pk = KeyPair::new().pubkey();
rumors.insert(pk, old_index);
}
external_liveness.insert(key1, rumors);
let weighted_strategy = ChooseWeightedPeerStrategy::new(
&remote,
&external_liveness,
&get_stake,
);
let result = weighted_strategy.calculate_weighted_remote_index(key1);
// If nobody has seen a newer update, then revert to the default weight
assert_eq!(result, DEFAULT_WEIGHT);
}
}
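To make the weighting concrete, here is a small illustrative test in the same style as the module above; the numbers and the one_stake helper are assumptions for the example, not part of the commit, and it relies on the same imports and in-module access to the private calculate_weighted_remote_index as the existing tests. With a directly confirmed index of 5 and two equally staked validators rumoring indexes 15 and 25, the vote differences are 10 and 20, each scaled by stake / total_stake = 0.5, so the weighted vote is 15 and the returned weight is 15 + DEFAULT_WEIGHT.

fn one_stake(_id: PublicKey) -> f64 {
    // Equal stake for everyone, matching the get_stake stub used in this commit.
    1.0
}

#[test]
fn test_weighted_index_worked_example() {
    let peer = KeyPair::new().pubkey();
    let voter1 = KeyPair::new().pubkey();
    let voter2 = KeyPair::new().pubkey();

    // We last heard update index 5 directly from `peer`...
    let mut remote: HashMap<PublicKey, u64> = HashMap::new();
    remote.insert(peer, 5);

    // ...but voter1 and voter2 claim to have seen indexes 15 and 25.
    let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
    rumors.insert(voter1, 15);
    rumors.insert(voter2, 25);
    let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
    external_liveness.insert(peer, rumors);

    let strategy = ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &one_stake);
    // (15 - 5) * 0.5 + (25 - 5) * 0.5 = 15, plus the default weight of 1.
    assert_eq!(strategy.calculate_weighted_remote_index(peer), 15 + DEFAULT_WEIGHT);
}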

View File

@@ -15,6 +15,11 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{
ChooseGossipPeerStrategy,
ChooseRandomPeerStrategy,
ChooseWeightedPeerStrategy,
};
use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
@@ -190,6 +195,7 @@ pub struct Crdt {
pub alive: HashMap<PublicKey, u64>,
pub update_index: u64,
pub me: PublicKey,
external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
@@ -200,7 +206,7 @@ enum Protocol {
RequestUpdates(u64, ReplicatedData),
//TODO might need a since?
/// from id, from's last update index, ReplicatedData
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>),
/// ask for a missing index
RequestWindowIndex(ReplicatedData, u64),
}
@@ -213,6 +219,7 @@ impl Crdt {
local: HashMap::new(),
remote: HashMap::new(),
alive: HashMap::new(),
external_liveness: HashMap::new(),
me: me.id,
update_index: 1,
};
@@ -234,6 +241,14 @@ impl Crdt {
self.insert(&me);
}
pub fn get_external_liveness_entry(
&self,
key: &PublicKey,
) -> Option<&HashMap<PublicKey, u64>> {
self.external_liveness.get(key)
}
pub fn insert(&mut self, v: &ReplicatedData) {
// TODO check that last_verified types are always increasing
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
@@ -270,9 +285,11 @@ impl Crdt {
if self.table.len() <= MIN_TABLE_SIZE {
return;
}
//wait for 4x as long as it would randomly take to reach our node
//assuming everyone is waiting the same amount of time as this node
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
let dead_ids: Vec<PublicKey> = self.alive
.iter()
.filter_map(|(&k, v)| {
@@ -285,11 +302,13 @@ impl Crdt {
}
})
.collect();
for id in dead_ids.iter() {
self.alive.remove(id);
self.table.remove(id);
self.remote.remove(id);
self.local.remove(id);
self.external_liveness.remove(id);
}
}
@@ -473,6 +492,12 @@ impl Crdt {
rdr.read_u64::<LittleEndian>()
.expect("rdr.read_u64 in fn random")
}
// TODO: fill in with a real implementation once staking is implemented
fn get_stake(id: PublicKey) -> f64 {
return 1.0;
}
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
//trace!("get updates since {}", v);
let data = self.table
@@ -508,16 +533,32 @@ impl Crdt {
/// * B - RequestUpdates protocol message
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
if options.len() < 1 {
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
return Err(Error::CrdtTooSmall);
}
let n = (Self::random() as usize) % options.len();
let v = options[n].clone();
#[cfg(not(feature = "gossip_choose_weighted_peer"))]
let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random);
#[cfg(feature = "gossip_choose_weighted_peer")]
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
&self.remote,
&self.external_liveness,
&Self::get_stake,
);
let choose_peer_result = choose_peer_strategy.choose_peer(options);
let v = match choose_peer_result {
Ok(peer) => peer,
Err(Error::CrdtTooSmall) => {
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
return Err(Error::CrdtTooSmall);
},
Err(e) => return Err(e),
};
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
trace!(
@@ -526,6 +567,7 @@ impl Crdt {
&v.id[..4],
v.gossip_addr
);
Ok((v.gossip_addr, req))
}
@@ -543,6 +585,7 @@ impl Crdt {
let (remote_gossip_addr, req) = obj.read()
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
// TODO this will get chatty, so we need to first ask for the number of updates since our last index,
// then only ask for the specific data that we don't have
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
@@ -583,14 +626,43 @@ impl Crdt {
/// * `from` - identity of the sender of the updates
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
/// * `data` - the update data
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
fn apply_updates(
&mut self,
from: PublicKey,
update_index: u64,
data: &[ReplicatedData],
external_liveness: &[(PublicKey, u64)],
) {
trace!("got updates {}", data.len());
// TODO we need to punish/spam resist here
// sig verify the whole update and slash anyone who sends a bad update
for v in data {
self.insert(&v);
}
for (pk, external_remote_index) in external_liveness.iter() {
let remote_entry =
if let Some(v) = self.remote.get(pk) {
*v
} else {
0
};
if remote_entry >= *external_remote_index {
continue;
}
let liveness_entry = self.external_liveness.entry(*pk).or_insert_with(HashMap::new);
let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
if *external_remote_index > peer_index {
liveness_entry.insert(from, *external_remote_index);
}
}
*self.remote.entry(from).or_insert(update_index) = update_index;
// Clear the remote liveness table for this node, b/c we've heard directly from them
// so we don't need to rely on rumors
self.external_liveness.remove(&from);
}
/// randomly pick a node and ask them for updates asynchronously
@@ -682,13 +754,14 @@ impl Crdt {
Ok(Protocol::RequestUpdates(v, from_rd)) => {
trace!("RequestUpdates {}", v);
let addr = from_rd.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
let me = obj.read().unwrap();
// only lock for these two calls, don't lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = me.get_updates_since(v);
let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
drop(me);
trace!("get updates since response {} {}", v, data.len());
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data);
let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
obj.write().unwrap().insert(&from_rd);
if len < 1 {
let me = obj.read().unwrap();
@@ -713,11 +786,11 @@ impl Crdt {
None
}
}
Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
.apply_updates(from, ups, &data, &external_liveness);
None
}
Ok(Protocol::RequestWindowIndex(from, ix)) => {
@@ -956,7 +1029,7 @@ mod tests {
sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
);
let mut crdt2 = Crdt::new(d2.clone());
crdt2.apply_updates(key, ix, &ups);
crdt2.apply_updates(key, ix, &ups, &vec![]);
assert_eq!(crdt2.table.values().len(), 3);
assert_eq!(
sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
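Taken together, the crdt.rs changes above maintain a simple invariant: an index is recorded in external_liveness for a peer only if it is strictly greater than the index already confirmed in the remote table, and hearing from a peer directly clears that peer's rumor entry. Below is a minimal standalone sketch of that bookkeeping, using plain HashMaps and a hypothetical record_rumors helper; it illustrates the rule applied in apply_updates(), and is not the committed code.

use std::collections::HashMap;

type Key = [u8; 32]; // stand-in for PublicKey in this sketch

fn record_rumors(
    from: Key,
    rumors: &[(Key, u64)],
    remote: &HashMap<Key, u64>,
    external_liveness: &mut HashMap<Key, HashMap<Key, u64>>,
) {
    for &(pk, rumored_index) in rumors {
        // Ignore rumors that don't beat what we've already confirmed directly.
        if *remote.get(&pk).unwrap_or(&0) >= rumored_index {
            continue;
        }
        // Record the highest index this gossiper has claimed for pk.
        let entry = external_liveness.entry(pk).or_insert_with(HashMap::new);
        let best = entry.entry(from).or_insert(rumored_index);
        if rumored_index > *best {
            *best = rumored_index;
        }
    }
    // We heard from `from` directly, so rumors about it are no longer needed.
    external_liveness.remove(&from);
}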

View File

@@ -47,6 +47,7 @@ pub mod transaction;
pub mod tvu;
pub mod window_stage;
pub mod write_stage;
mod choose_gossip_peer_strategy;
extern crate bincode;
extern crate byteorder;
extern crate chrono;

View File

@@ -183,3 +183,105 @@ pub fn crdt_retransmit() {
t.join().unwrap();
}
}
#[test]
fn check_external_liveness_table() {
logger::setup();
let c1_c4_exit = Arc::new(AtomicBool::new(false));
let c2_c3_exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
let (c1, dr1, _) = test_node(c1_c4_exit.clone());
trace!("c2:");
let (c2, dr2, _) = test_node(c2_c3_exit.clone());
trace!("c3:");
let (c3, dr3, _) = test_node(c2_c3_exit.clone());
trace!("c4:");
let (c4, dr4, _) = test_node(c1_c4_exit.clone());
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
let c2_id = c2.read().unwrap().me;
let c3_id = c3.read().unwrap().me;
let c4_id = c4.read().unwrap().me;
// Insert the remote data about c4
let c2_index_for_c4 = 10;
c2.write().unwrap().remote.insert(c4_id, c2_index_for_c4);
let c3_index_for_c4 = 20;
c3.write().unwrap().remote.insert(c4_id, c3_index_for_c4);
// Set up the initial network topology
c2.write().unwrap().insert(&c1_data);
c3.write().unwrap().insert(&c1_data);
c2.write().unwrap().set_leader(c1_data.id);
c3.write().unwrap().set_leader(c1_data.id);
// Wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
// Validate c1's external liveness table, then release lock rc1
{
let rc1 = c1.read().unwrap();
let el = rc1.get_external_liveness_entry(&c4.read().unwrap().me);
// Make sure liveness table entry for c4 exists on node c1
assert!(el.is_some());
let liveness_map = el.unwrap();
// Make sure liveness table entry contains correct result for c2
let c2_index_result_for_c4 = liveness_map.get(&c2_id);
assert!(c2_index_result_for_c4.is_some());
assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4);
// Make sure liveness table entry contains correct result for c3
let c3_index_result_for_c4 = liveness_map.get(&c3_id);
assert!(c3_index_result_for_c4.is_some());
assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4);
}
// Shutdown validators c2 and c3
c2_c3_exit.store(true, Ordering::Relaxed);
let mut threads = vec![];
threads.extend(dr2.thread_hdls.into_iter());
threads.extend(dr3.thread_hdls.into_iter());
for t in threads.into_iter() {
t.join().unwrap();
}
// Allow communication between c1 and c4, make sure that c1's external_liveness table
// entry for c4 gets cleared
c4.write().unwrap().insert(&c1_data);
c4.write().unwrap().set_leader(c1_data.id);
for _ in 0..30 {
done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none();
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
// Shutdown validators c1 and c4
c1_c4_exit.store(true, Ordering::Relaxed);
let mut threads = vec![];
threads.extend(dr1.thread_hdls.into_iter());
threads.extend(dr4.thread_hdls.into_iter());
for t in threads.into_iter() {
t.join().unwrap();
}
}