ran linter
This commit is contained in:
parent
08bcb62016
commit
da3bb6fb93
|
@ -1,6 +1,6 @@
|
|||
use crdt::ReplicatedData;
|
||||
use rand::thread_rng;
|
||||
use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
|
||||
use rand::thread_rng;
|
||||
use result::{Error, Result};
|
||||
use signature::PublicKey;
|
||||
use std;
|
||||
|
@ -9,8 +9,7 @@ use std::collections::HashMap;
|
|||
pub const DEFAULT_WEIGHT: u32 = 1;
|
||||
|
||||
pub trait ChooseGossipPeerStrategy {
|
||||
fn choose_peer(&self, options: Vec<&ReplicatedData>) ->
|
||||
Result<ReplicatedData>;
|
||||
fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
|
||||
}
|
||||
|
||||
pub struct ChooseRandomPeerStrategy<'a> {
|
||||
|
@ -18,7 +17,7 @@ pub struct ChooseRandomPeerStrategy<'a> {
|
|||
}
|
||||
|
||||
impl<'a> ChooseRandomPeerStrategy<'a> {
|
||||
pub fn new(random: &'a Fn() -> u64,) -> Self {
|
||||
pub fn new(random: &'a Fn() -> u64) -> Self {
|
||||
ChooseRandomPeerStrategy { random }
|
||||
}
|
||||
}
|
||||
|
@ -45,9 +44,12 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
|||
remote: &'a HashMap<PublicKey, u64>,
|
||||
external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
|
||||
get_stake: &'a Fn(PublicKey) -> f64,
|
||||
) -> Self
|
||||
{
|
||||
ChooseWeightedPeerStrategy { remote, external_liveness, get_stake }
|
||||
) -> Self {
|
||||
ChooseWeightedPeerStrategy {
|
||||
remote,
|
||||
external_liveness,
|
||||
get_stake,
|
||||
}
|
||||
}
|
||||
|
||||
fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
|
||||
|
@ -60,7 +62,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
|||
}
|
||||
|
||||
let liveness_entry = self.external_liveness.get(&peer_id);
|
||||
if liveness_entry.is_none(){
|
||||
if liveness_entry.is_none() {
|
||||
return DEFAULT_WEIGHT;
|
||||
}
|
||||
|
||||
|
@ -73,61 +75,55 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
|
|||
// Calculate the weighted average of the rumors
|
||||
let mut relevant_votes = vec![];
|
||||
|
||||
let total_stake = votes.iter().fold(
|
||||
0.0,
|
||||
|total_stake, (&id, &vote)| {
|
||||
let stake = (self.get_stake)(id);
|
||||
// If the total stake is going to overflow u64, pick
|
||||
// the larger of either the current total_stake, or the
|
||||
// new stake, this way we are guaranteed to get at least u64/2
|
||||
// sample of stake in our weighted calculation
|
||||
if std::f64::MAX - total_stake < stake {
|
||||
if stake > total_stake {
|
||||
relevant_votes = vec![(stake, vote)];
|
||||
stake
|
||||
} else {
|
||||
total_stake
|
||||
}
|
||||
let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| {
|
||||
let stake = (self.get_stake)(id);
|
||||
// If the total stake is going to overflow u64, pick
|
||||
// the larger of either the current total_stake, or the
|
||||
// new stake, this way we are guaranteed to get at least u64/2
|
||||
// sample of stake in our weighted calculation
|
||||
if std::f64::MAX - total_stake < stake {
|
||||
if stake > total_stake {
|
||||
relevant_votes = vec![(stake, vote)];
|
||||
stake
|
||||
} else {
|
||||
relevant_votes.push((stake, vote));
|
||||
total_stake + stake
|
||||
total_stake
|
||||
}
|
||||
} else {
|
||||
relevant_votes.push((stake, vote));
|
||||
total_stake + stake
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
let weighted_vote = relevant_votes.iter().fold(
|
||||
0.0,
|
||||
|sum, &(stake, vote)| {
|
||||
if vote < last_seen_index {
|
||||
// This should never happen b/c we maintain the invariant that the indexes
|
||||
// in the external_liveness table are always greater than the corresponding
|
||||
// indexes in the remote table, if the index exists in the remote table at all.
|
||||
let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
|
||||
if vote < last_seen_index {
|
||||
// This should never happen b/c we maintain the invariant that the indexes
|
||||
// in the external_liveness table are always greater than the corresponding
|
||||
// indexes in the remote table, if the index exists in the remote table at all.
|
||||
|
||||
// Case 1: Attempt to insert bigger index into the "external_liveness" table
|
||||
// happens after an insertion into the "remote" table. In this case,
|
||||
// (see apply_updates()) function, we prevent the insertion if the entry
|
||||
// in the remote table >= the atempted insertion into the "external" liveness
|
||||
// table.
|
||||
// Case 1: Attempt to insert bigger index into the "external_liveness" table
|
||||
// happens after an insertion into the "remote" table. In this case,
|
||||
// (see apply_updates()) function, we prevent the insertion if the entry
|
||||
// in the remote table >= the atempted insertion into the "external" liveness
|
||||
// table.
|
||||
|
||||
// Case 2: Bigger index in the "external_liveness" table inserted before
|
||||
// a smaller insertion into the "remote" table. We clear the corresponding
|
||||
// "external_liveness" table entry on all insertions into the "remote" table
|
||||
// See apply_updates() function.
|
||||
// Case 2: Bigger index in the "external_liveness" table inserted before
|
||||
// a smaller insertion into the "remote" table. We clear the corresponding
|
||||
// "external_liveness" table entry on all insertions into the "remote" table
|
||||
// See apply_updates() function.
|
||||
|
||||
warn!("weighted peer index was smaller than local entry in remote table");
|
||||
return sum;
|
||||
}
|
||||
warn!("weighted peer index was smaller than local entry in remote table");
|
||||
return sum;
|
||||
}
|
||||
|
||||
let vote_difference = (vote - last_seen_index) as f64;
|
||||
let new_weight = vote_difference * (stake / total_stake);
|
||||
let vote_difference = (vote - last_seen_index) as f64;
|
||||
let new_weight = vote_difference * (stake / total_stake);
|
||||
|
||||
if std::f64::MAX - sum < new_weight {
|
||||
return f64::max(new_weight, sum);
|
||||
}
|
||||
if std::f64::MAX - sum < new_weight {
|
||||
return f64::max(new_weight, sum);
|
||||
}
|
||||
|
||||
sum + new_weight
|
||||
},
|
||||
);
|
||||
sum + new_weight
|
||||
});
|
||||
|
||||
// Return u32 b/c the weighted sampling API from rand::distributions
|
||||
// only takes u32 for weights
|
||||
|
@ -159,17 +155,19 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
|
|||
}
|
||||
|
||||
let mut rng = thread_rng();
|
||||
Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng).clone())
|
||||
Ok(WeightedChoice::new(&mut weighted_peers)
|
||||
.ind_sample(&mut rng)
|
||||
.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
|
||||
use logger;
|
||||
use signature::{KeyPair, KeyPairUtil, PublicKey};
|
||||
use std;
|
||||
use std::collections::HashMap;
|
||||
use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
|
||||
|
||||
fn get_stake(id: PublicKey) -> f64 {
|
||||
return 1.0;
|
||||
|
@ -185,11 +183,8 @@ mod tests {
|
|||
let remote: HashMap<PublicKey, u64> = HashMap::new();
|
||||
let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
|
||||
|
||||
let weighted_strategy = ChooseWeightedPeerStrategy::new(
|
||||
&remote,
|
||||
&external_liveness,
|
||||
&get_stake,
|
||||
);
|
||||
let weighted_strategy =
|
||||
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
|
||||
|
||||
// If external_liveness table doesn't contain this entry,
|
||||
// return the default weight
|
||||
|
@ -210,16 +205,13 @@ mod tests {
|
|||
|
||||
// If only the liveness table contains the entry, should return the
|
||||
// weighted liveness entries
|
||||
let test_value : u32 = 5;
|
||||
let test_value: u32 = 5;
|
||||
let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
|
||||
rumors.insert(key2, test_value as u64);
|
||||
external_liveness.insert(key1, rumors);
|
||||
|
||||
let weighted_strategy = ChooseWeightedPeerStrategy::new(
|
||||
&remote,
|
||||
&external_liveness,
|
||||
&get_stake,
|
||||
);
|
||||
let weighted_strategy =
|
||||
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
|
||||
|
||||
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
||||
assert_eq!(result, test_value + DEFAULT_WEIGHT);
|
||||
|
@ -242,11 +234,8 @@ mod tests {
|
|||
rumors.insert(key2, test_value);
|
||||
external_liveness.insert(key1, rumors);
|
||||
|
||||
let weighted_strategy = ChooseWeightedPeerStrategy::new(
|
||||
&remote,
|
||||
&external_liveness,
|
||||
&get_stake,
|
||||
);
|
||||
let weighted_strategy =
|
||||
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
|
||||
|
||||
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
||||
assert_eq!(result, std::u32::MAX);
|
||||
|
@ -275,14 +264,11 @@ mod tests {
|
|||
|
||||
external_liveness.insert(key1, rumors);
|
||||
|
||||
let weighted_strategy = ChooseWeightedPeerStrategy::new(
|
||||
&remote,
|
||||
&external_liveness,
|
||||
&get_stake,
|
||||
);
|
||||
let weighted_strategy =
|
||||
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
|
||||
|
||||
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
||||
assert_eq!(result, (num_peers/2) as u32);
|
||||
assert_eq!(result, (num_peers / 2) as u32);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -309,11 +295,8 @@ mod tests {
|
|||
|
||||
external_liveness.insert(key1, rumors);
|
||||
|
||||
let weighted_strategy = ChooseWeightedPeerStrategy::new(
|
||||
&remote,
|
||||
&external_liveness,
|
||||
&get_stake,
|
||||
);
|
||||
let weighted_strategy =
|
||||
ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
|
||||
|
||||
let result = weighted_strategy.calculate_weighted_remote_index(key1);
|
||||
|
||||
|
|
39
src/crdt.rs
39
src/crdt.rs
|
@ -15,11 +15,8 @@
|
|||
|
||||
use bincode::{deserialize, serialize};
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use choose_gossip_peer_strategy::{
|
||||
ChooseGossipPeerStrategy,
|
||||
ChooseRandomPeerStrategy,
|
||||
ChooseWeightedPeerStrategy,
|
||||
};
|
||||
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy,
|
||||
ChooseWeightedPeerStrategy};
|
||||
use hash::Hash;
|
||||
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
|
||||
use pnet_datalink as datalink;
|
||||
|
@ -241,11 +238,7 @@ impl Crdt {
|
|||
self.insert(&me);
|
||||
}
|
||||
|
||||
pub fn get_external_liveness_entry(
|
||||
&self,
|
||||
key: &PublicKey,
|
||||
) -> Option<&HashMap<PublicKey, u64>>
|
||||
{
|
||||
pub fn get_external_liveness_entry(&self, key: &PublicKey) -> Option<&HashMap<PublicKey, u64>> {
|
||||
self.external_liveness.get(key)
|
||||
}
|
||||
|
||||
|
@ -309,6 +302,9 @@ impl Crdt {
|
|||
self.remote.remove(id);
|
||||
self.local.remove(id);
|
||||
self.external_liveness.remove(id);
|
||||
for map in self.external_liveness.values_mut() {
|
||||
map.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -555,7 +551,7 @@ impl Crdt {
|
|||
self.table.len()
|
||||
);
|
||||
return Err(Error::CrdtTooSmall);
|
||||
},
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
|
@ -627,11 +623,12 @@ impl Crdt {
|
|||
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
|
||||
/// * `data` - the update data
|
||||
fn apply_updates(
|
||||
&mut self, from: PublicKey,
|
||||
&mut self,
|
||||
from: PublicKey,
|
||||
update_index: u64,
|
||||
data: &[ReplicatedData],
|
||||
external_liveness: &[(PublicKey, u64)],
|
||||
){
|
||||
) {
|
||||
trace!("got updates {}", data.len());
|
||||
// TODO we need to punish/spam resist here
|
||||
// sig verify the whole update and slash anyone who sends a bad update
|
||||
|
@ -640,12 +637,11 @@ impl Crdt {
|
|||
}
|
||||
|
||||
for (pk, external_remote_index) in external_liveness.iter() {
|
||||
let remote_entry =
|
||||
if let Some(v) = self.remote.get(pk) {
|
||||
*v
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let remote_entry = if let Some(v) = self.remote.get(pk) {
|
||||
*v
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
if remote_entry >= *external_remote_index {
|
||||
continue;
|
||||
|
@ -757,7 +753,10 @@ impl Crdt {
|
|||
let me = obj.read().unwrap();
|
||||
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||
let (from, ups, data) = me.get_updates_since(v);
|
||||
let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
|
||||
let external_liveness = me.remote
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
drop(me);
|
||||
trace!("get updates since response {} {}", v, data.len());
|
||||
let len = data.len();
|
||||
|
|
|
@ -13,6 +13,7 @@ pub mod bank;
|
|||
pub mod banking_stage;
|
||||
pub mod blob_fetch_stage;
|
||||
pub mod budget;
|
||||
mod choose_gossip_peer_strategy;
|
||||
pub mod crdt;
|
||||
pub mod drone;
|
||||
pub mod entry;
|
||||
|
@ -47,7 +48,6 @@ pub mod transaction;
|
|||
pub mod tvu;
|
||||
pub mod window_stage;
|
||||
pub mod write_stage;
|
||||
mod choose_gossip_peer_strategy;
|
||||
extern crate bincode;
|
||||
extern crate byteorder;
|
||||
extern crate chrono;
|
||||
|
|
Loading…
Reference in New Issue