Some pull request fixes(linting + documentation)
This commit is contained in:
parent
da3bb6fb93
commit
551f639259
|
@ -53,7 +53,6 @@ unstable = []
|
||||||
ipv6 = []
|
ipv6 = []
|
||||||
cuda = []
|
cuda = []
|
||||||
erasure = []
|
erasure = []
|
||||||
gossip_choose_weighted_peer = []
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rayon = "1.0.0"
|
rayon = "1.0.0"
|
||||||
|
|
|
@ -9,33 +9,65 @@ use std::collections::HashMap;
|
||||||
pub const DEFAULT_WEIGHT: u32 = 1;
|
pub const DEFAULT_WEIGHT: u32 = 1;
|
||||||
|
|
||||||
pub trait ChooseGossipPeerStrategy {
|
pub trait ChooseGossipPeerStrategy {
|
||||||
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
|
fn choose_peer<'a>(&self, options: Vec<&'a ReplicatedData>) -> Result<&'a ReplicatedData>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ChooseRandomPeerStrategy<'a> {
|
pub struct ChooseRandomPeerStrategy<'a> {
|
||||||
random: &'a Fn() -> u64,
|
random: &'a Fn() -> u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ChooseRandomPeerStrategy<'a> {
|
// Given a source of randomness "random", this strategy will randomly pick a validator
|
||||||
|
// from the input options. This strategy works in isolation, but doesn't leverage any
|
||||||
|
// rumors from the rest of the gossip network to make more informed decisions about
|
||||||
|
// which validators have more/less updates
|
||||||
|
impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
|
||||||
pub fn new(random: &'a Fn() -> u64) -> Self {
|
pub fn new(random: &'a Fn() -> u64) -> Self {
|
||||||
ChooseRandomPeerStrategy { random }
|
ChooseRandomPeerStrategy { random }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
|
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
|
||||||
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
|
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
|
||||||
if options.len() < 1 {
|
if options.is_empty() {
|
||||||
return Err(Error::CrdtTooSmall);
|
return Err(Error::CrdtTooSmall);
|
||||||
}
|
}
|
||||||
|
|
||||||
let n = ((self.random)() as usize) % options.len();
|
let n = ((self.random)() as usize) % options.len();
|
||||||
Ok(options[n].clone())
|
Ok(options[n])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This strategy uses rumors accumulated from the rest of the network to weight
|
||||||
|
// the importance of communicating with a particular validator based on cumulative network
|
||||||
|
// perceiption of the number of updates the validator has to offer. A validator is randomly
|
||||||
|
// picked based on a weighted sample from the pool of viable choices. The "weight", w, of a
|
||||||
|
// particular validator "v" is calculated as follows:
|
||||||
|
//
|
||||||
|
// w = [Sum for all i in I_v: (rumor_v(i) - observed(v)) * stake(i)] /
|
||||||
|
// [Sum for all i in I_v: Sum(stake(i))]
|
||||||
|
//
|
||||||
|
// where I_v is the set of all validators that returned a rumor about the update_index of
|
||||||
|
// validator "v", stake(i) is the size of the stake of validator "i", observed(v) is the
|
||||||
|
// observed update_index from the last direct communication validator "v", and
|
||||||
|
// rumor_v(i) is the rumored update_index of validator "v" propagated by fellow validator "i".
|
||||||
|
|
||||||
|
// This could be a problem if there are validators with large stakes lying about their
|
||||||
|
// observed updates. There could also be a problem in network partitions, or even just
|
||||||
|
// when certain validators are disproportionately active, where we hear more rumors about
|
||||||
|
// certain clusters of nodes that then propagate more rumros about each other. Hopefully
|
||||||
|
// this can be resolved with a good baseline DEFAULT_WEIGHT, or by implementing lockout
|
||||||
|
// periods for very active validators in the future.
|
||||||
|
|
||||||
pub struct ChooseWeightedPeerStrategy<'a> {
|
pub struct ChooseWeightedPeerStrategy<'a> {
|
||||||
|
// The map of last directly observed update_index for each active validator.
|
||||||
|
// This is how we get observed(v) from the formula above.
|
||||||
remote: &'a HashMap<PublicKey, u64>,
|
remote: &'a HashMap<PublicKey, u64>,
|
||||||
|
// The map of rumored update_index for each active validator. Using the formula above,
|
||||||
|
// to find rumor_v(i), we would first look up "v" in the outer map, then look up
|
||||||
|
// "i" in the inner map, i.e. look up external_liveness[v][i]
|
||||||
external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
|
external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
|
||||||
|
// A function returning the size of the stake for a particular validator, corresponds
|
||||||
|
// to stake(i) in the formula above.
|
||||||
get_stake: &'a Fn(PublicKey) -> f64,
|
get_stake: &'a Fn(PublicKey) -> f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +128,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
||||||
|
|
||||||
let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
|
let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
|
||||||
if vote < last_seen_index {
|
if vote < last_seen_index {
|
||||||
// This should never happen b/c we maintain the invariant that the indexes
|
// This should never happen because we maintain the invariant that the indexes
|
||||||
// in the external_liveness table are always greater than the corresponding
|
// in the external_liveness table are always greater than the corresponding
|
||||||
// indexes in the remote table, if the index exists in the remote table at all.
|
// indexes in the remote table, if the index exists in the remote table at all.
|
||||||
|
|
||||||
|
@ -140,7 +172,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
||||||
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
|
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
|
||||||
if options.len() < 1 {
|
if options.len() < 1 {
|
||||||
return Err(Error::CrdtTooSmall);
|
return Err(Error::CrdtTooSmall);
|
||||||
}
|
}
|
||||||
|
@ -148,16 +180,11 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
||||||
let mut weighted_peers = vec![];
|
let mut weighted_peers = vec![];
|
||||||
for peer in options {
|
for peer in options {
|
||||||
let weight = self.calculate_weighted_remote_index(peer.id);
|
let weight = self.calculate_weighted_remote_index(peer.id);
|
||||||
weighted_peers.push(Weighted {
|
weighted_peers.push(Weighted { weight, item: peer });
|
||||||
weight: weight,
|
|
||||||
item: peer,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
Ok(WeightedChoice::new(&mut weighted_peers)
|
Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng))
|
||||||
.ind_sample(&mut rng)
|
|
||||||
.clone())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +327,7 @@ mod tests {
|
||||||
|
|
||||||
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
||||||
|
|
||||||
// If nobody has seen a newer update then rever to default
|
// If nobody has seen a newer update then revert to default
|
||||||
assert_eq!(result, DEFAULT_WEIGHT);
|
assert_eq!(result, DEFAULT_WEIGHT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
15
src/crdt.rs
15
src/crdt.rs
|
@ -15,8 +15,9 @@
|
||||||
|
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy,
|
use choose_gossip_peer_strategy::{
|
||||||
ChooseWeightedPeerStrategy};
|
ChooseGossipPeerStrategy, ChooseRandomPeerStrategy, ChooseWeightedPeerStrategy,
|
||||||
|
};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||||
use pnet_datalink as datalink;
|
use pnet_datalink as datalink;
|
||||||
|
@ -489,9 +490,9 @@ impl Crdt {
|
||||||
.expect("rdr.read_u64 in fn random")
|
.expect("rdr.read_u64 in fn random")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: fill in with real implmentation wonce staking is implemented
|
// TODO: fill in with real implmentation once staking is implemented
|
||||||
fn get_stake(id: PublicKey) -> f64 {
|
fn get_stake(id: PublicKey) -> f64 {
|
||||||
return 1.0;
|
1.0
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
|
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
|
||||||
|
@ -530,10 +531,6 @@ impl Crdt {
|
||||||
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
||||||
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
|
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
|
||||||
|
|
||||||
#[cfg(not(feature = "choose_gossip_peer_strategy"))]
|
|
||||||
let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random);
|
|
||||||
|
|
||||||
#[cfg(feature = "choose_gossip_peer_strategy")]
|
|
||||||
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
|
let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
|
||||||
&self.remote,
|
&self.remote,
|
||||||
&self.external_liveness,
|
&self.external_liveness,
|
||||||
|
@ -636,7 +633,7 @@ impl Crdt {
|
||||||
self.insert(&v);
|
self.insert(&v);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (pk, external_remote_index) in external_liveness.iter() {
|
for (pk, external_remote_index) in external_liveness {
|
||||||
let remote_entry = if let Some(v) = self.remote.get(pk) {
|
let remote_entry = if let Some(v) = self.remote.get(pk) {
|
||||||
*v
|
*v
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -185,7 +185,7 @@ pub fn crdt_retransmit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn check_external_liveness_table() {
|
fn test_external_liveness_table() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let c1_c4_exit = Arc::new(AtomicBool::new(false));
|
let c1_c4_exit = Arc::new(AtomicBool::new(false));
|
||||||
let c2_c3_exit = Arc::new(AtomicBool::new(false));
|
let c2_c3_exit = Arc::new(AtomicBool::new(false));
|
||||||
|
@ -223,7 +223,8 @@ fn check_external_liveness_table() {
|
||||||
trace!("waiting to converge:");
|
trace!("waiting to converge:");
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
|
done = c1.read().unwrap().table.len() == 3
|
||||||
|
&& c2.read().unwrap().table.len() == 3
|
||||||
&& c3.read().unwrap().table.len() == 3;
|
&& c3.read().unwrap().table.len() == 3;
|
||||||
if done {
|
if done {
|
||||||
break;
|
break;
|
||||||
|
@ -244,12 +245,12 @@ fn check_external_liveness_table() {
|
||||||
// Make sure liveness table entry contains correct result for c2
|
// Make sure liveness table entry contains correct result for c2
|
||||||
let c2_index_result_for_c4 = liveness_map.get(&c2_id);
|
let c2_index_result_for_c4 = liveness_map.get(&c2_id);
|
||||||
assert!(c2_index_result_for_c4.is_some());
|
assert!(c2_index_result_for_c4.is_some());
|
||||||
assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4);
|
assert_eq!(*(c2_index_result_for_c4.unwrap()), c2_index_for_c4);
|
||||||
|
|
||||||
// Make sure liveness table entry contains correct result for c3
|
// Make sure liveness table entry contains correct result for c3
|
||||||
let c3_index_result_for_c4 = liveness_map.get(&c3_id);
|
let c3_index_result_for_c4 = liveness_map.get(&c3_id);
|
||||||
assert!(c3_index_result_for_c4.is_some());
|
assert!(c3_index_result_for_c4.is_some());
|
||||||
assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4);
|
assert_eq!(*(c3_index_result_for_c4.unwrap()), c3_index_for_c4);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown validators c2 and c3
|
// Shutdown validators c2 and c3
|
||||||
|
@ -258,7 +259,7 @@ fn check_external_liveness_table() {
|
||||||
threads.extend(dr2.thread_hdls.into_iter());
|
threads.extend(dr2.thread_hdls.into_iter());
|
||||||
threads.extend(dr3.thread_hdls.into_iter());
|
threads.extend(dr3.thread_hdls.into_iter());
|
||||||
|
|
||||||
for t in threads.into_iter() {
|
for t in threads {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -267,7 +268,10 @@ fn check_external_liveness_table() {
|
||||||
c4.write().unwrap().insert(&c1_data);
|
c4.write().unwrap().insert(&c1_data);
|
||||||
c4.write().unwrap().set_leader(c1_data.id);
|
c4.write().unwrap().set_leader(c1_data.id);
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none();
|
done = c1.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_external_liveness_entry(&c4_id)
|
||||||
|
.is_none();
|
||||||
if done {
|
if done {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -281,7 +285,7 @@ fn check_external_liveness_table() {
|
||||||
threads.extend(dr1.thread_hdls.into_iter());
|
threads.extend(dr1.thread_hdls.into_iter());
|
||||||
threads.extend(dr4.thread_hdls.into_iter());
|
threads.extend(dr4.thread_hdls.into_iter());
|
||||||
|
|
||||||
for t in threads.into_iter() {
|
for t in threads {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue