2018-11-15 13:23:26 -08:00
|
|
|
//! Crds Gossip Push overlay
|
|
|
|
//! This module is used to propagate recently created CrdsValues across the network
|
|
|
|
//! Eager push strategy is based on Plumtree
|
|
|
|
//! http://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
|
|
|
|
//!
|
|
|
|
//! Main differences are:
|
|
|
|
//! 1. There is no `max hop`. Messages are signed with a local wallclock. If they are outside of
|
|
|
|
//! the local node's wallclock window they are dropped silently.
|
|
|
|
//! 2. The prune set is stored in a Bloom filter.
|
|
|
|
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::contact_info::ContactInfo;
|
|
|
|
use crate::crds::{Crds, VersionedCrdsValue};
|
2019-02-20 17:08:56 -08:00
|
|
|
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE};
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::crds_gossip_error::CrdsGossipError;
|
|
|
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
|
|
|
use crate::packet::BLOB_DATA_SIZE;
|
2018-11-15 13:23:26 -08:00
|
|
|
use bincode::serialized_size;
|
2018-12-14 15:10:10 -08:00
|
|
|
use hashbrown::HashMap;
|
2018-11-15 13:23:26 -08:00
|
|
|
use indexmap::map::IndexMap;
|
2018-12-05 14:12:10 -08:00
|
|
|
use rand;
|
2019-02-20 17:08:56 -08:00
|
|
|
use rand::distributions::{Distribution, WeightedIndex};
|
2018-12-05 14:12:10 -08:00
|
|
|
use rand::seq::SliceRandom;
|
2019-02-18 22:26:22 -08:00
|
|
|
use solana_runtime::bloom::Bloom;
|
2018-11-16 08:04:46 -08:00
|
|
|
use solana_sdk::hash::Hash;
|
2018-11-15 13:23:26 -08:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
2019-02-20 17:08:56 -08:00
|
|
|
use solana_sdk::timing::timestamp;
|
2018-11-15 13:23:26 -08:00
|
|
|
use std::cmp;
|
|
|
|
|
|
|
|
/// Default value of `CrdsGossipPush::num_active`, the target size of the push active set.
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
/// Default value of `CrdsGossipPush::push_fanout`, the number of active-set peers picked per
/// batch of push messages.
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
/// Default value of `CrdsGossipPush::msg_timeout`, the allowed distance between a value's
/// wallclock and the local time (milliseconds, going by the name).
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
/// Default value of `CrdsGossipPush::prune_timeout` (milliseconds, going by the name).
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2019-02-11 16:20:31 -08:00
|
|
|
#[derive(Clone)]
|
2018-11-15 13:23:26 -08:00
|
|
|
pub struct CrdsGossipPush {
|
|
|
|
/// max bytes per message
|
|
|
|
pub max_bytes: usize,
|
|
|
|
/// active set of validators for push
|
|
|
|
active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
|
|
|
|
/// push message queue
|
|
|
|
push_messages: HashMap<CrdsValueLabel, Hash>,
|
|
|
|
pushed_once: HashMap<Hash, u64>,
|
|
|
|
pub num_active: usize,
|
|
|
|
pub push_fanout: usize,
|
|
|
|
pub msg_timeout: u64,
|
2018-12-01 12:00:30 -08:00
|
|
|
pub prune_timeout: u64,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for CrdsGossipPush {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
max_bytes: BLOB_DATA_SIZE,
|
|
|
|
active_set: IndexMap::new(),
|
|
|
|
push_messages: HashMap::new(),
|
|
|
|
pushed_once: HashMap::new(),
|
|
|
|
num_active: CRDS_GOSSIP_NUM_ACTIVE,
|
|
|
|
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
|
|
|
|
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
|
2018-12-01 12:00:30 -08:00
|
|
|
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl CrdsGossipPush {
|
|
|
|
pub fn num_pending(&self) -> usize {
|
|
|
|
self.push_messages.len()
|
|
|
|
}
|
|
|
|
/// process a push message to the network
|
|
|
|
pub fn process_push_message(
|
|
|
|
&mut self,
|
|
|
|
crds: &mut Crds,
|
|
|
|
value: CrdsValue,
|
|
|
|
now: u64,
|
|
|
|
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
|
|
|
|
if now > value.wallclock() + self.msg_timeout {
|
|
|
|
return Err(CrdsGossipError::PushMessageTimeout);
|
|
|
|
}
|
|
|
|
if now + self.msg_timeout < value.wallclock() {
|
|
|
|
return Err(CrdsGossipError::PushMessageTimeout);
|
|
|
|
}
|
|
|
|
let label = value.label();
|
|
|
|
|
|
|
|
let new_value = crds.new_versioned(now, value);
|
|
|
|
let value_hash = new_value.value_hash;
|
|
|
|
if self.pushed_once.get(&value_hash).is_some() {
|
|
|
|
return Err(CrdsGossipError::PushMessagePrune);
|
|
|
|
}
|
|
|
|
let old = crds.insert_versioned(new_value);
|
|
|
|
if old.is_err() {
|
|
|
|
return Err(CrdsGossipError::PushMessageOldVersion);
|
|
|
|
}
|
|
|
|
self.push_messages.insert(label, value_hash);
|
|
|
|
self.pushed_once.insert(value_hash, now);
|
|
|
|
Ok(old.ok().and_then(|opt| opt))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// New push message to broadcast to peers.
|
|
|
|
/// Returns a list of Pubkeys for the selected peers and a list of values to send to all the
|
|
|
|
/// peers.
|
|
|
|
/// The list of push messages is created such that all the randomly selected peers have not
|
|
|
|
/// pruned the source addresses.
|
|
|
|
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> (Vec<Pubkey>, Vec<CrdsValue>) {
|
|
|
|
let max = self.active_set.len();
|
|
|
|
let mut nodes: Vec<_> = (0..max).collect();
|
2018-12-05 14:12:10 -08:00
|
|
|
nodes.shuffle(&mut rand::thread_rng());
|
2018-11-15 13:23:26 -08:00
|
|
|
let peers: Vec<Pubkey> = nodes
|
|
|
|
.into_iter()
|
|
|
|
.filter_map(|n| self.active_set.get_index(n))
|
|
|
|
.take(self.push_fanout)
|
|
|
|
.map(|n| *n.0)
|
|
|
|
.collect();
|
|
|
|
let mut total_bytes: usize = 0;
|
|
|
|
let mut values = vec![];
|
|
|
|
for (label, hash) in &self.push_messages {
|
|
|
|
let mut failed = false;
|
|
|
|
for p in &peers {
|
|
|
|
let filter = self.active_set.get_mut(p);
|
|
|
|
failed |= filter.is_none() || filter.unwrap().contains(&label.pubkey());
|
|
|
|
}
|
|
|
|
if failed {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
let res = crds.lookup_versioned(label);
|
|
|
|
if res.is_none() {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
let version = res.unwrap();
|
|
|
|
if version.value_hash != *hash {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
let value = &version.value;
|
|
|
|
if value.wallclock() > now || value.wallclock() + self.msg_timeout < now {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
total_bytes += serialized_size(value).unwrap() as usize;
|
|
|
|
if total_bytes > self.max_bytes {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
values.push(value.clone());
|
|
|
|
}
|
|
|
|
for v in &values {
|
|
|
|
self.push_messages.remove(&v.label());
|
|
|
|
}
|
|
|
|
(peers, values)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// add the `from` to the peer's filter of nodes
|
|
|
|
pub fn process_prune_msg(&mut self, peer: Pubkey, origins: &[Pubkey]) {
|
|
|
|
for origin in origins {
|
|
|
|
if let Some(p) = self.active_set.get_mut(&peer) {
|
|
|
|
p.add(origin)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Number of new peers to select when refreshing the active set.
///
/// This is the number of vacant slots (`num_active - active_set_len`, or zero if the set is
/// already larger than `num_active`) plus the `active_set_len / ratio` entries that are about
/// to be rotated out, capped at `num_active`.
///
/// # Panics
/// Panics if `ratio` is zero (integer division by zero).
fn compute_need(num_active: usize, active_set_len: usize, ratio: usize) -> usize {
    let rotated = active_set_len / ratio;
    // `num_active` is a public field and may be lowered below the current set size, so a
    // plain subtraction could underflow here.
    let vacant = num_active.saturating_sub(active_set_len);
    cmp::min(num_active, vacant + rotated)
}
|
|
|
|
|
|
|
|
/// refresh the push active set
|
|
|
|
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
|
|
|
pub fn refresh_push_active_set(
|
|
|
|
&mut self,
|
|
|
|
crds: &Crds,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2018-11-15 13:23:26 -08:00
|
|
|
self_id: Pubkey,
|
|
|
|
network_size: usize,
|
|
|
|
ratio: usize,
|
|
|
|
) {
|
|
|
|
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
|
|
|
let mut new_items = HashMap::new();
|
|
|
|
|
2019-02-20 17:08:56 -08:00
|
|
|
let mut options: Vec<_> = crds
|
|
|
|
.table
|
|
|
|
.values()
|
|
|
|
.filter(|v| v.value.contact_info().is_some())
|
|
|
|
.map(|v| (v.value.contact_info().unwrap(), v))
|
|
|
|
.filter(|(info, _)| info.id != self_id && ContactInfo::is_valid_address(&info.gossip))
|
|
|
|
.map(|(info, value)| {
|
|
|
|
let max_weight = f32::from(u16::max_value()) - 1.0;
|
|
|
|
let last_updated: u64 = value.local_timestamp;
|
|
|
|
let since = ((timestamp() - last_updated) / 1024) as u32;
|
2019-02-20 20:02:47 -08:00
|
|
|
let stake = get_stake(&info.id, stakes);
|
2019-02-20 17:08:56 -08:00
|
|
|
let weight = get_weight(max_weight, since, stake);
|
|
|
|
(weight, info)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
if options.is_empty() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
while new_items.len() < need {
|
|
|
|
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0));
|
|
|
|
if index.is_err() {
|
|
|
|
break;
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-02-20 17:08:56 -08:00
|
|
|
let index = index.unwrap();
|
|
|
|
let index = index.sample(&mut rand::thread_rng());
|
|
|
|
let item = options[index].1;
|
|
|
|
options.remove(index);
|
|
|
|
if self.active_set.get(&item.id).is_some() {
|
2018-11-15 13:23:26 -08:00
|
|
|
continue;
|
|
|
|
}
|
2019-02-20 17:08:56 -08:00
|
|
|
if new_items.get(&item.id).is_some() {
|
2018-11-15 13:23:26 -08:00
|
|
|
continue;
|
|
|
|
}
|
2018-12-01 12:00:30 -08:00
|
|
|
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
|
2018-12-08 21:44:20 -08:00
|
|
|
let bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
|
2019-02-20 17:08:56 -08:00
|
|
|
new_items.insert(item.id, bloom);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
|
2018-12-05 14:12:10 -08:00
|
|
|
keys.shuffle(&mut rand::thread_rng());
|
2018-11-15 13:23:26 -08:00
|
|
|
let num = keys.len() / ratio;
|
|
|
|
for k in &keys[..num] {
|
|
|
|
self.active_set.remove(k);
|
|
|
|
}
|
|
|
|
for (k, v) in new_items {
|
|
|
|
self.active_set.insert(k, v);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// purge old pending push messages
|
|
|
|
pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) {
|
|
|
|
let old_msgs: Vec<CrdsValueLabel> = self
|
|
|
|
.push_messages
|
|
|
|
.iter()
|
|
|
|
.filter_map(|(k, hash)| {
|
|
|
|
if let Some(versioned) = crds.lookup_versioned(k) {
|
|
|
|
if versioned.value.wallclock() < min_time || versioned.value_hash != *hash {
|
|
|
|
Some(k)
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
Some(k)
|
|
|
|
}
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.cloned()
|
2018-11-15 13:23:26 -08:00
|
|
|
.collect();
|
|
|
|
for k in old_msgs {
|
|
|
|
self.push_messages.remove(&k);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/// purge old pushed_once messages
|
|
|
|
pub fn purge_old_pushed_once_messages(&mut self, min_time: u64) {
|
|
|
|
let old_msgs: Vec<Hash> = self
|
|
|
|
.pushed_once
|
|
|
|
.iter()
|
|
|
|
.filter_map(|(k, v)| if *v < min_time { Some(k) } else { None })
|
|
|
|
.cloned()
|
|
|
|
.collect();
|
|
|
|
for k in old_msgs {
|
|
|
|
self.pushed_once.remove(&k);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::contact_info::ContactInfo;
|
2018-12-03 10:26:28 -08:00
|
|
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
2019-02-20 17:08:56 -08:00
|
|
|
use std::f32::consts::E;
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// A value is accepted the first time it is pushed and rejected as a duplicate the second time.
#[test]
fn test_process_push() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    let label = value.label();

    // First delivery: accepted, nothing replaced, value now in the table.
    let first = push.process_push_message(&mut crds, value.clone(), 0);
    assert_eq!(first, Ok(None));
    assert_eq!(crds.lookup(&label), Some(&value));

    // Second delivery of the identical value: its hash is already in `pushed_once`.
    let second = push.process_push_message(&mut crds, value.clone(), 0);
    assert_eq!(second, Err(CrdsGossipError::PushMessagePrune));
}
|
|
|
|
/// Pushing a value with a lower wallclock than the stored one is rejected as an old version.
#[test]
fn test_process_push_old_version() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);

    // Store the version with wallclock 1 first.
    ci.wallclock = 1;
    let newer = CrdsValue::ContactInfo(ci.clone());
    assert_eq!(push.process_push_message(&mut crds, newer, 0), Ok(None));

    // The same contact info with wallclock 0 must not replace it.
    ci.wallclock = 0;
    let older = CrdsValue::ContactInfo(ci.clone());
    assert_eq!(
        push.process_push_message(&mut crds, older, 0),
        Err(CrdsGossipError::PushMessageOldVersion)
    );
}
|
|
|
|
/// Values whose wallclock lies outside the `msg_timeout` window around `now` are rejected.
#[test]
fn test_process_push_timeout() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let timeout = push.msg_timeout;
    let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);

    // Too far in the future: wallclock is one past the window while `now` is 0.
    ci.wallclock = timeout + 1;
    let from_future = CrdsValue::ContactInfo(ci.clone());
    assert_eq!(
        push.process_push_message(&mut crds, from_future, 0),
        Err(CrdsGossipError::PushMessageTimeout)
    );

    // Too far in the past: wallclock is 0 while `now` is one past the window.
    ci.wallclock = 0;
    let from_past = CrdsValue::ContactInfo(ci.clone());
    assert_eq!(
        push.process_push_message(&mut crds, from_past, timeout + 1),
        Err(CrdsGossipError::PushMessageTimeout)
    );
}
|
|
|
|
/// Pushing a newer version of a stored value succeeds and returns the version it replaced.
#[test]
fn test_process_push_update() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);

    // Store the initial version (wallclock 0).
    ci.wallclock = 0;
    let value_old = CrdsValue::ContactInfo(ci.clone());
    assert_eq!(
        push.process_push_message(&mut crds, value_old.clone(), 0),
        Ok(None)
    );

    // Push the same contact info with a higher wallclock; the old version comes back.
    ci.wallclock = 1;
    let value_new = CrdsValue::ContactInfo(ci.clone());
    let replaced = push
        .process_push_message(&mut crds, value_new, 0)
        .unwrap()
        .unwrap();
    assert_eq!(replaced.value, value_old);
}
|
|
|
|
/// Spot checks for `compute_need`: empty, nearly empty, full and nearly full active sets.
#[test]
fn test_compute_need() {
    // (num_active, active_set_len, ratio, expected need)
    let cases: [(usize, usize, usize, usize); 4] = [
        (30, 0, 10, 30),
        (30, 1, 10, 29),
        (30, 30, 10, 3),
        (30, 29, 10, 3),
    ];
    for &(num_active, active_set_len, ratio, expected) in cases.iter() {
        assert_eq!(
            CrdsGossipPush::compute_need(num_active, active_set_len, ratio),
            expected
        );
    }
}
|
|
|
|
/// Refreshing picks up known peers, eventually rotates in newly learned ones, and never grows
/// the active set beyond `num_active`.
#[test]
fn test_refresh_active_set() {
    solana_logger::setup();
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();

    // A single known peer ends up in the active set after one refresh.
    let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
    push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
    assert!(push.active_set.get(&value1.label().pubkey()).is_some());

    // A second peer is not active until a refresh selects it; selection is random, so retry.
    let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    let id2 = value2.label().pubkey();
    assert!(push.active_set.get(&id2).is_none());
    assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
    for _ in 0..30 {
        push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
        if push.active_set.get(&id2).is_some() {
            break;
        }
    }
    assert!(push.active_set.get(&id2).is_some());

    // With more than `num_active` candidates the set is filled to exactly `num_active`.
    for _ in 0..push.num_active {
        let extra =
            CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
        assert_eq!(crds.insert(extra.clone(), 0), Ok(None));
    }
    push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
    assert_eq!(push.active_set.len(), push.num_active);
}
|
|
|
|
/// With stakes of 100..=10000 spread over 100 peers, a refresh should favour the heavier peers.
#[test]
fn test_active_set_refresh_with_bank() {
    // Back-date the records by 1024 so that some time has passed since their last update.
    let time = timestamp() - 1024;
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let mut stakes = HashMap::new();
    for i in 1..=100 {
        let peer =
            CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time));
        let id = peer.label().pubkey();
        crds.insert(peer.clone(), time).unwrap();
        stakes.insert(id, i * 100);
    }

    // Stake threshold that separates "heavy" peers from the rest.
    let min_balance = E.powf(7000_f32.ln() - 0.5);

    // Selection is random, so allow up to 10 attempts.
    for _ in 0..10 {
        push.refresh_push_active_set(&crds, &stakes, Pubkey::default(), 100, 30);
        let heavy = push
            .active_set
            .keys()
            .filter(|id| *stakes.get(*id).unwrap_or(&0) >= min_balance as u64)
            .count();
        let light = push.active_set.len() - heavy;
        // At least half of the active set should consist of heavy peers.
        if light <= heavy {
            return;
        }
    }
    panic!("expected at 50% of the active set to contain the heaviest nodes");
}
|
|
|
|
/// A freshly accepted value is handed out to the single active peer.
#[test]
fn test_new_push_messages() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();

    // One known peer, which becomes the whole active set.
    let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
    push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);

    // Accept a value from some other origin.
    let new_msg =
        CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    let accepted = push.process_push_message(&mut crds, new_msg.clone(), 0);
    assert_eq!(accepted, Ok(None));
    assert_eq!(push.active_set.len(), 1);

    // The batch targets the peer and carries exactly that value.
    let expected = (vec![peer.label().pubkey()], vec![new_msg]);
    assert_eq!(push.new_push_messages(&crds, 0), expected);
}
|
|
|
|
/// After a peer prunes an origin, values from that origin are no longer sent to it.
#[test]
fn test_process_prune() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();

    // One known peer, which becomes the whole active set.
    let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
    push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);

    // Accept a value, then let the peer prune its origin.
    let new_msg =
        CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    let accepted = push.process_push_message(&mut crds, new_msg.clone(), 0);
    assert_eq!(accepted, Ok(None));
    push.process_prune_msg(peer.label().pubkey(), &[new_msg.label().pubkey()]);

    // The peer is still selected, but the pruned value is held back.
    assert_eq!(
        push.new_push_messages(&crds, 0),
        (vec![peer.label().pubkey()], vec![])
    );
}
|
|
|
|
/// Runs a purge over the pending queue and checks that no value is handed out afterwards
/// when the batch is requested at `now = 0`.
#[test]
fn test_purge_old_pending_push_messages() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();

    // One known peer, which becomes the whole active set.
    let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
    assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
    push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);

    // Accept a value with wallclock 1 at time 1.
    let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
    ci.wallclock = 1;
    let new_msg = CrdsValue::ContactInfo(ci.clone());
    let accepted = push.process_push_message(&mut crds, new_msg.clone(), 1);
    assert_eq!(accepted, Ok(None));

    push.purge_old_pending_push_messages(&crds, 0);

    // The peer is still selected, but the batch carries no values.
    assert_eq!(
        push.new_push_messages(&crds, 0),
        (vec![peer.label().pubkey()], vec![])
    );
}
|
|
|
|
|
|
|
|
/// Once a hash is purged from `pushed_once`, a repeated push is no longer reported as a
/// duplicate; it is rejected by the table as an old version instead.
#[test]
fn test_purge_old_pushed_once_messages() {
    let mut crds = Crds::default();
    let mut push = CrdsGossipPush::default();
    let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
    ci.wallclock = 0;
    let value = CrdsValue::ContactInfo(ci.clone());
    let label = value.label();

    // First delivery is accepted and stored.
    let first = push.process_push_message(&mut crds, value.clone(), 0);
    assert_eq!(first, Ok(None));
    assert_eq!(crds.lookup(&label), Some(&value));

    // Second delivery is recognised as a duplicate.
    let second = push.process_push_message(&mut crds, value.clone(), 0);
    assert_eq!(second, Err(CrdsGossipError::PushMessagePrune));

    // Forget everything recorded before time 1.
    push.purge_old_pushed_once_messages(1);

    // Third delivery passes the duplicate check but the table refuses the unchanged value.
    let third = push.process_push_message(&mut crds, value.clone(), 0);
    assert_eq!(third, Err(CrdsGossipError::PushMessageOldVersion));
}
|
|
|
|
}
|