2018-11-15 13:23:26 -08:00
|
|
|
//! Crds Gossip Pull overlay
|
|
|
|
//! This module implements the anti-entropy protocol for the network.
|
|
|
|
//!
|
|
|
|
//! The basic strategy is as follows:
|
|
|
|
//! 1. Construct a bloom filter of the local data set
|
|
|
|
//! 2. Randomly ask a node on the network for data that is not contained in the bloom filter.
|
|
|
|
//!
|
|
|
|
//! Bloom filters have a false positive rate. Each request uses a different bloom filter
|
|
|
|
//! with random hash functions. So each subsequent request will have a different distribution
|
|
|
|
//! of false positives.
|
|
|
|
|
2019-02-20 17:08:56 -08:00
|
|
|
use crate::contact_info::ContactInfo;
|
2020-06-09 17:08:13 -07:00
|
|
|
use crate::crds::{Crds, VersionedCrdsValue};
|
2019-08-13 18:04:14 -07:00
|
|
|
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::crds_gossip_error::CrdsGossipError;
|
|
|
|
use crate::crds_value::{CrdsValue, CrdsValueLabel};
|
2018-12-05 14:12:10 -08:00
|
|
|
use rand::distributions::{Distribution, WeightedIndex};
|
2019-08-13 18:04:14 -07:00
|
|
|
use rand::Rng;
|
2019-02-18 22:26:22 -08:00
|
|
|
use solana_runtime::bloom::Bloom;
|
2018-11-16 08:04:46 -08:00
|
|
|
use solana_sdk::hash::Hash;
|
2018-11-15 13:23:26 -08:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
|
|
|
use std::cmp;
|
|
|
|
use std::collections::VecDeque;
|
2020-06-09 17:08:13 -07:00
|
|
|
use std::collections::{HashMap, HashSet};
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
/// Default value of `CrdsGossipPull::crds_timeout` (milliseconds). Also used by
/// `filter_crds_values` as the window around `now` in which a caller's wallclock
/// must fall for its request to be answered.
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15_000;
// The maximum age of a value received over pull responses
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60_000;
/// False-positive rate handed to `Bloom::random` and `CrdsFilter::max_items`.
pub const FALSE_RATE: f64 = 0.1;
/// Number of bloom keys assumed when sizing a filter in `CrdsFilter::max_items`.
pub const KEYS: f64 = 8.0;
|
2019-08-13 18:04:14 -07:00
|
|
|
|
2020-07-06 04:22:23 -07:00
|
|
|
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq, AbiExample)]
|
2019-08-13 18:04:14 -07:00
|
|
|
pub struct CrdsFilter {
|
|
|
|
pub filter: Bloom<Hash>,
|
|
|
|
mask: u64,
|
|
|
|
mask_bits: u32,
|
|
|
|
}
|
|
|
|
|
2020-04-27 11:06:00 -07:00
|
|
|
impl solana_sdk::sanitize::Sanitize for CrdsFilter {
    /// A `CrdsFilter` is sane exactly when its inner bloom filter is; the mask
    /// fields are not checked here.
    fn sanitize(&self) -> std::result::Result<(), solana_sdk::sanitize::SanitizeError> {
        self.filter.sanitize()?;
        Ok(())
    }
}
|
|
|
|
|
2019-08-13 18:04:14 -07:00
|
|
|
impl CrdsFilter {
|
|
|
|
pub fn new_rand(num_items: usize, max_bytes: usize) -> Self {
|
|
|
|
let max_bits = (max_bytes * 8) as f64;
|
2019-08-19 18:14:10 -07:00
|
|
|
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
2019-08-13 18:04:14 -07:00
|
|
|
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
|
2019-08-19 18:14:10 -07:00
|
|
|
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
2019-08-13 18:04:14 -07:00
|
|
|
let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits));
|
|
|
|
let mask = Self::compute_mask(seed, mask_bits);
|
|
|
|
CrdsFilter {
|
|
|
|
filter,
|
|
|
|
mask,
|
|
|
|
mask_bits,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// generates a vec of filters that together hold a complete set of Hashes
|
|
|
|
pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec<Self> {
|
|
|
|
let max_bits = (max_bytes * 8) as f64;
|
2019-08-19 18:14:10 -07:00
|
|
|
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
2019-08-13 18:04:14 -07:00
|
|
|
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
|
|
|
|
// for each possible mask combination, generate a new filter.
|
|
|
|
let mut filters = vec![];
|
|
|
|
for seed in 0..2u64.pow(mask_bits) {
|
|
|
|
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
|
|
|
let mask = Self::compute_mask(seed, mask_bits);
|
|
|
|
let filter = CrdsFilter {
|
|
|
|
filter,
|
|
|
|
mask,
|
|
|
|
mask_bits,
|
|
|
|
};
|
|
|
|
filters.push(filter)
|
|
|
|
}
|
|
|
|
filters
|
|
|
|
}
|
|
|
|
/// Builds the 64-bit mask for partition `seed` of a `mask_bits`-wide prefix:
/// `seed` is placed in the highest `mask_bits` bits and every lower bit is 1.
///
/// All arithmetic is checked, so widths of 64 bits or more no longer overflow
/// (`2u64.pow(mask_bits)` and `64 - mask_bits` previously panicked in debug
/// builds and wrapped in release builds).
///
/// # Panics
/// Panics if `mask_bits < 64` and `seed > 2^mask_bits`.
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
    // `1 << mask_bits` is `2^mask_bits`; when it does not fit in a u64 every
    // seed is trivially in range, so there is nothing to assert.
    if let Some(limit) = 1u64.checked_shl(mask_bits) {
        assert!(seed <= limit);
    }
    // Move the seed into the top `mask_bits` bits. A shift by 64 (mask_bits == 0)
    // or an impossible negative shift (mask_bits > 64) contributes no bits.
    let prefix = 64u32
        .checked_sub(mask_bits)
        .and_then(|shift| seed.checked_shl(shift))
        .unwrap_or(0);
    // Same "ones" fallback as `test_mask`, so masks and tests stay consistent:
    // an out-of-range shift yields all ones.
    let ones = (!0u64).checked_shr(mask_bits).unwrap_or(!0u64);
    prefix | ones
}
|
|
|
|
/// Number of items a bloom filter of `max_bits` bits can hold at the given
/// `false_rate` when `num_keys` keys are used, rounded up to a whole item.
pub fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
    // exp(ln(p) / k), i.e. the k-th root of the false-positive rate.
    let per_key_rate = (false_rate.ln() / num_keys).exp();
    // Bits that have to be spent per stored item.
    let bits_per_item = -num_keys / (1f64 - per_key_rate).ln();
    (max_bits / bits_per_item).ceil()
}
|
|
|
|
/// Number of prefix bits needed so that `2^bits` filters of `max_items`
/// capacity can together hold `num_items` items.
fn mask_bits(num_items: f64, max_items: f64) -> u32 {
    let load = num_items / max_items;
    let bits = load.log2().ceil();
    // When everything fits in one filter the logarithm is negative; clamp to 0.
    bits.max(0.0) as u32
}
|
2020-08-12 22:45:19 -07:00
|
|
|
pub fn hash_as_u64(item: &Hash) -> u64 {
|
2019-08-13 18:04:14 -07:00
|
|
|
let arr = item.as_ref();
|
|
|
|
let mut accum = 0;
|
|
|
|
for (i, val) in arr.iter().enumerate().take(8) {
|
|
|
|
accum |= (u64::from(*val)) << (i * 8) as u64;
|
|
|
|
}
|
|
|
|
accum
|
|
|
|
}
|
2020-08-12 22:45:19 -07:00
|
|
|
pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool {
|
|
|
|
let bits = item | ones;
|
|
|
|
bits == self.mask
|
|
|
|
}
|
2019-08-13 18:04:14 -07:00
|
|
|
pub fn test_mask(&self, item: &Hash) -> bool {
|
|
|
|
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
|
|
|
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
|
|
|
let bits = Self::hash_as_u64(item) | ones;
|
|
|
|
bits == self.mask
|
|
|
|
}
|
|
|
|
/// Adds `item` to the bloom filter, but only if it falls in this filter's
/// partition; hashes from other partitions are ignored.
pub fn add(&mut self, item: &Hash) {
    if !self.test_mask(item) {
        return;
    }
    self.filter.add(item);
}
|
|
|
|
/// Returns true when `item` is outside this filter's partition (such hashes
/// are always reported as contained) or when the bloom filter reports it as
/// present.
pub fn contains(&self, item: &Hash) -> bool {
    !self.test_mask(item) || self.filter.contains(item)
}
|
2020-08-12 22:45:19 -07:00
|
|
|
/// Queries the bloom filter directly, without the partition (mask) check that
/// `contains` performs first.
pub fn filter_contains(&self, item: &Hash) -> bool {
    let bloom = &self.filter;
    bloom.contains(item)
}
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-06-09 17:08:13 -07:00
|
|
|
/// Counters accumulated while filtering and applying pull responses.
#[derive(Default)]
pub struct ProcessPullStats {
    /// Responses that were inserted into the table.
    pub success: usize,
    /// Responses that were rejected by the table (would not / did not insert).
    pub failed_insert: usize,
    /// Responses dropped because their wallclock was outside the allowed window.
    pub failed_timeout: usize,
    /// Incremented together with `failed_timeout` whenever a response times out.
    pub timeout_count: usize,
}
|
|
|
|
|
2019-02-11 16:20:31 -08:00
|
|
|
#[derive(Clone)]
|
2018-11-15 13:23:26 -08:00
|
|
|
pub struct CrdsGossipPull {
|
|
|
|
/// timestamp of last request
|
|
|
|
pub pull_request_time: HashMap<Pubkey, u64>,
|
|
|
|
/// hash and insert time
|
2020-08-11 14:03:54 -07:00
|
|
|
pub purged_values: VecDeque<(Hash, u64)>,
|
2018-11-15 13:23:26 -08:00
|
|
|
pub crds_timeout: u64,
|
2020-02-07 12:38:24 -08:00
|
|
|
pub msg_timeout: u64,
|
2020-06-13 22:03:38 -07:00
|
|
|
pub num_pulls: usize,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for CrdsGossipPull {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
purged_values: VecDeque::new(),
|
|
|
|
pull_request_time: HashMap::new(),
|
|
|
|
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
|
2020-02-07 12:38:24 -08:00
|
|
|
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
2020-06-13 22:03:38 -07:00
|
|
|
num_pulls: 0,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl CrdsGossipPull {
|
|
|
|
/// generate a random request
|
|
|
|
pub fn new_pull_request(
|
|
|
|
&self,
|
|
|
|
crds: &Crds,
|
2019-03-09 19:28:43 -08:00
|
|
|
self_id: &Pubkey,
|
2020-05-05 20:15:19 -07:00
|
|
|
self_shred_version: u16,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2019-08-13 18:04:14 -07:00
|
|
|
bloom_size: usize,
|
|
|
|
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
|
2020-05-05 20:15:19 -07:00
|
|
|
let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes);
|
2018-11-15 13:23:26 -08:00
|
|
|
if options.is_empty() {
|
|
|
|
return Err(CrdsGossipError::NoPeers);
|
|
|
|
}
|
2019-08-13 18:04:14 -07:00
|
|
|
let filters = self.build_crds_filters(crds, bloom_size);
|
2018-12-05 14:12:10 -08:00
|
|
|
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap();
|
|
|
|
let random = index.sample(&mut rand::thread_rng());
|
2018-11-15 13:23:26 -08:00
|
|
|
let self_info = crds
|
2019-03-09 19:28:43 -08:00
|
|
|
.lookup(&CrdsValueLabel::ContactInfo(*self_id))
|
2018-11-15 13:23:26 -08:00
|
|
|
.unwrap_or_else(|| panic!("self_id invalid {}", self_id));
|
2019-08-13 18:04:14 -07:00
|
|
|
Ok((options[random].1.id, filters, self_info.clone()))
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
2019-02-26 11:45:10 -08:00
|
|
|
fn pull_options<'a>(
|
|
|
|
&self,
|
|
|
|
crds: &'a Crds,
|
|
|
|
self_id: &Pubkey,
|
2020-05-05 20:15:19 -07:00
|
|
|
self_shred_version: u16,
|
2019-02-26 11:45:10 -08:00
|
|
|
now: u64,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
) -> Vec<(f32, &'a ContactInfo)> {
|
|
|
|
crds.table
|
|
|
|
.values()
|
|
|
|
.filter_map(|v| v.value.contact_info())
|
2020-05-05 20:15:19 -07:00
|
|
|
.filter(|v| {
|
|
|
|
v.id != *self_id
|
|
|
|
&& ContactInfo::is_valid_address(&v.gossip)
|
2020-08-18 18:52:45 -07:00
|
|
|
&& (self_shred_version == 0 || self_shred_version == v.shred_version)
|
2020-05-05 20:15:19 -07:00
|
|
|
})
|
2019-02-26 11:45:10 -08:00
|
|
|
.map(|item| {
|
|
|
|
let max_weight = f32::from(u16::max_value()) - 1.0;
|
|
|
|
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
|
|
|
|
let since = ((now - req_time) / 1024) as u32;
|
|
|
|
let stake = get_stake(&item.id, stakes);
|
|
|
|
let weight = get_weight(max_weight, since, stake);
|
|
|
|
(weight, item)
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// time when a request to `from` was initiated
|
|
|
|
/// This is used for weighted random selection during `new_pull_request`
|
|
|
|
/// It's important to use the local nodes request creation time as the weight
|
|
|
|
/// instead of the response received time otherwise failed nodes will increase their weight.
|
2019-03-09 19:28:43 -08:00
|
|
|
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
|
|
|
|
self.pull_request_time.insert(*from, now);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Store an old hash in the purged values set
|
|
|
|
pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) {
|
|
|
|
self.purged_values.push_back((hash, timestamp))
|
|
|
|
}
|
|
|
|
|
2020-05-28 11:38:13 -07:00
|
|
|
/// process a pull request
|
2019-08-15 17:04:45 -07:00
|
|
|
pub fn process_pull_requests(
|
2018-11-15 13:23:26 -08:00
|
|
|
&mut self,
|
|
|
|
crds: &mut Crds,
|
2019-08-15 17:04:45 -07:00
|
|
|
requests: Vec<(CrdsValue, CrdsFilter)>,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2020-05-28 11:38:13 -07:00
|
|
|
) {
|
2019-08-15 17:04:45 -07:00
|
|
|
requests.into_iter().for_each(|(caller, _)| {
|
|
|
|
let key = caller.label().pubkey();
|
|
|
|
let old = crds.insert(caller, now);
|
|
|
|
if let Some(val) = old.ok().and_then(|opt| opt) {
|
|
|
|
self.purged_values
|
|
|
|
.push_back((val.value_hash, val.local_timestamp));
|
|
|
|
}
|
|
|
|
crds.update_record_timestamp(&key, now);
|
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-05-28 11:38:13 -07:00
|
|
|
|
|
|
|
/// Create gossip responses to pull requests
|
|
|
|
pub fn generate_pull_responses(
|
|
|
|
&self,
|
|
|
|
crds: &Crds,
|
|
|
|
requests: &[(CrdsValue, CrdsFilter)],
|
2020-08-11 06:26:42 -07:00
|
|
|
now: u64,
|
2020-05-28 11:38:13 -07:00
|
|
|
) -> Vec<Vec<CrdsValue>> {
|
2020-08-11 06:26:42 -07:00
|
|
|
self.filter_crds_values(crds, requests, now)
|
2020-05-28 11:38:13 -07:00
|
|
|
}
|
|
|
|
|
2020-06-09 17:08:13 -07:00
|
|
|
// Checks if responses should be inserted and
|
|
|
|
// returns those responses converted to VersionedCrdsValue
|
|
|
|
// Separated in two vecs as:
|
|
|
|
// .0 => responses that update the owner timestamp
|
|
|
|
// .1 => responses that do not update the owner timestamp
|
|
|
|
pub fn filter_pull_responses(
|
|
|
|
&self,
|
|
|
|
crds: &Crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
2020-06-09 17:08:13 -07:00
|
|
|
responses: Vec<CrdsValue>,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2020-06-09 17:08:13 -07:00
|
|
|
stats: &mut ProcessPullStats,
|
|
|
|
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) {
|
|
|
|
let mut versioned = vec![];
|
|
|
|
let mut versioned_expired_timestamp = vec![];
|
|
|
|
for r in responses {
|
2018-11-15 13:23:26 -08:00
|
|
|
let owner = r.label().pubkey();
|
2020-02-07 12:38:24 -08:00
|
|
|
// Check if the crds value is older than the msg_timeout
|
|
|
|
if now
|
|
|
|
> r.wallclock()
|
|
|
|
.checked_add(self.msg_timeout)
|
|
|
|
.unwrap_or_else(|| 0)
|
|
|
|
|| now + self.msg_timeout < r.wallclock()
|
|
|
|
{
|
|
|
|
match &r.label() {
|
|
|
|
CrdsValueLabel::ContactInfo(_) => {
|
|
|
|
// Check if this ContactInfo is actually too old, it's possible that it has
|
|
|
|
// stake and so might have a longer effective timeout
|
|
|
|
let timeout = *timeouts
|
|
|
|
.get(&owner)
|
|
|
|
.unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap());
|
|
|
|
if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
|
|
|
|
|| now + timeout < r.wallclock()
|
|
|
|
{
|
2020-06-09 17:08:13 -07:00
|
|
|
stats.timeout_count += 1;
|
|
|
|
stats.failed_timeout += 1;
|
2020-02-07 12:38:24 -08:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
// Before discarding this value, check if a ContactInfo for the owner
|
|
|
|
// exists in the table. If it doesn't, that implies that this value can be discarded
|
|
|
|
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
|
2020-06-09 17:08:13 -07:00
|
|
|
stats.timeout_count += 1;
|
|
|
|
stats.failed_timeout += 1;
|
2020-02-07 12:38:24 -08:00
|
|
|
continue;
|
|
|
|
} else {
|
|
|
|
// Silently insert this old value without bumping record timestamps
|
2020-06-09 17:08:13 -07:00
|
|
|
match crds.would_insert(r, now) {
|
|
|
|
Some(resp) => versioned_expired_timestamp.push(resp),
|
|
|
|
None => stats.failed_insert += 1,
|
|
|
|
}
|
2020-02-07 12:38:24 -08:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-06-09 17:08:13 -07:00
|
|
|
match crds.would_insert(r, now) {
|
|
|
|
Some(resp) => versioned.push(resp),
|
|
|
|
None => stats.failed_insert += 1,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
(versioned, versioned_expired_timestamp)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// process a vec of pull responses
|
|
|
|
pub fn process_pull_responses(
|
|
|
|
&mut self,
|
|
|
|
crds: &mut Crds,
|
|
|
|
from: &Pubkey,
|
|
|
|
responses: Vec<VersionedCrdsValue>,
|
|
|
|
responses_expired_timeout: Vec<VersionedCrdsValue>,
|
|
|
|
now: u64,
|
|
|
|
stats: &mut ProcessPullStats,
|
2020-06-13 22:03:38 -07:00
|
|
|
) -> Vec<(CrdsValueLabel, Hash, u64)> {
|
|
|
|
let mut success = vec![];
|
2020-06-09 17:08:13 -07:00
|
|
|
let mut owners = HashSet::new();
|
|
|
|
for r in responses_expired_timeout {
|
|
|
|
stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
|
|
|
|
}
|
|
|
|
for r in responses {
|
|
|
|
let owner = r.value.label().pubkey();
|
2020-06-13 22:03:38 -07:00
|
|
|
let label = r.value.label();
|
|
|
|
let wc = r.value.wallclock();
|
|
|
|
let hash = r.value_hash;
|
2020-06-09 17:08:13 -07:00
|
|
|
let old = crds.insert_versioned(r);
|
2020-05-28 11:38:13 -07:00
|
|
|
if old.is_err() {
|
2020-06-09 17:08:13 -07:00
|
|
|
stats.failed_insert += 1;
|
2020-05-28 11:38:13 -07:00
|
|
|
} else {
|
2020-06-09 17:08:13 -07:00
|
|
|
stats.success += 1;
|
2020-06-13 22:03:38 -07:00
|
|
|
self.num_pulls += 1;
|
|
|
|
success.push((label, hash, wc));
|
2020-05-28 11:38:13 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
old.ok().map(|opt| {
|
2020-06-09 17:08:13 -07:00
|
|
|
owners.insert(owner);
|
2018-11-15 13:23:26 -08:00
|
|
|
opt.map(|val| {
|
|
|
|
self.purged_values
|
|
|
|
.push_back((val.value_hash, val.local_timestamp))
|
|
|
|
})
|
|
|
|
});
|
|
|
|
}
|
2020-06-09 17:08:13 -07:00
|
|
|
owners.insert(*from);
|
|
|
|
for owner in owners {
|
|
|
|
crds.update_record_timestamp(&owner, now);
|
|
|
|
}
|
2020-06-13 22:03:38 -07:00
|
|
|
success
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-08-13 18:04:14 -07:00
|
|
|
// build a set of filters of the current crds table
|
2020-04-15 15:22:16 -07:00
|
|
|
// num_filters - used to increase the likelyhood of a value in crds being added to some filter
|
2019-08-13 18:04:14 -07:00
|
|
|
pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec<CrdsFilter> {
|
2018-12-01 12:00:30 -08:00
|
|
|
let num = cmp::max(
|
2019-08-13 18:04:14 -07:00
|
|
|
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
|
2018-12-01 12:00:30 -08:00
|
|
|
crds.table.values().count() + self.purged_values.len(),
|
|
|
|
);
|
2019-08-13 18:04:14 -07:00
|
|
|
let mut filters = CrdsFilter::new_complete_set(num, bloom_size);
|
2018-11-15 13:23:26 -08:00
|
|
|
for v in crds.table.values() {
|
2019-08-13 18:04:14 -07:00
|
|
|
filters
|
|
|
|
.iter_mut()
|
|
|
|
.for_each(|filter| filter.add(&v.value_hash));
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
for (value_hash, _insert_timestamp) in &self.purged_values {
|
2019-08-13 18:04:14 -07:00
|
|
|
filters.iter_mut().for_each(|filter| filter.add(value_hash));
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-08-11 14:03:54 -07:00
|
|
|
|
2019-08-13 18:04:14 -07:00
|
|
|
filters
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-08-11 14:03:54 -07:00
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// filter values that fail the bloom filter up to max_bytes
|
2019-08-15 17:04:45 -07:00
|
|
|
fn filter_crds_values(
|
2020-06-09 07:27:00 -07:00
|
|
|
&self,
|
2019-08-15 17:04:45 -07:00
|
|
|
crds: &Crds,
|
|
|
|
filters: &[(CrdsValue, CrdsFilter)],
|
2020-08-11 06:26:42 -07:00
|
|
|
now: u64,
|
2019-08-15 17:04:45 -07:00
|
|
|
) -> Vec<Vec<CrdsValue>> {
|
|
|
|
let mut ret = vec![vec![]; filters.len()];
|
2020-08-11 06:26:42 -07:00
|
|
|
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
|
|
|
let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
|
|
|
|
let start = filters.len();
|
|
|
|
//skip filters from callers that are too old
|
|
|
|
let future = now.saturating_add(msg_timeout);
|
|
|
|
let past = now.saturating_sub(msg_timeout);
|
|
|
|
let recent: Vec<_> = filters
|
|
|
|
.iter()
|
2020-08-21 12:32:37 -07:00
|
|
|
.enumerate()
|
|
|
|
.filter(|(_, (caller, _))| caller.wallclock() < future && caller.wallclock() >= past)
|
2020-08-11 06:26:42 -07:00
|
|
|
.collect();
|
|
|
|
inc_new_counter_info!(
|
|
|
|
"gossip_filter_crds_values-dropped_requests",
|
|
|
|
start - recent.len()
|
|
|
|
);
|
|
|
|
if recent.is_empty() {
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
let mut total_skipped = 0;
|
2020-08-12 22:45:19 -07:00
|
|
|
let mask_ones: Vec<_> = recent
|
|
|
|
.iter()
|
2020-08-21 12:32:37 -07:00
|
|
|
.map(|(_i, (_caller, filter))| (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64))
|
2020-08-12 22:45:19 -07:00
|
|
|
.collect();
|
|
|
|
for (label, mask) in crds.masks.iter() {
|
2020-08-21 12:32:37 -07:00
|
|
|
recent
|
|
|
|
.iter()
|
|
|
|
.zip(mask_ones.iter())
|
|
|
|
.for_each(|((i, (caller, filter)), mask_ones)| {
|
2020-08-12 22:45:19 -07:00
|
|
|
if filter.test_mask_u64(*mask, *mask_ones) {
|
|
|
|
let item = crds.table.get(label).unwrap();
|
|
|
|
|
|
|
|
//skip values that are too new
|
|
|
|
if item.value.wallclock()
|
|
|
|
> caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
|
|
|
|
{
|
|
|
|
total_skipped += 1;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if !filter.filter_contains(&item.value_hash) {
|
2020-08-21 12:32:37 -07:00
|
|
|
ret[*i].push(item.value.clone());
|
2020-08-12 22:45:19 -07:00
|
|
|
}
|
|
|
|
}
|
2020-08-21 12:32:37 -07:00
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-08-11 06:26:42 -07:00
|
|
|
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
2018-11-15 13:23:26 -08:00
|
|
|
ret
|
|
|
|
}
|
2019-11-20 11:25:18 -08:00
|
|
|
pub fn make_timeouts_def(
|
|
|
|
&self,
|
|
|
|
self_id: &Pubkey,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
epoch_ms: u64,
|
|
|
|
min_ts: u64,
|
|
|
|
) -> HashMap<Pubkey, u64> {
|
|
|
|
let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
|
|
|
|
timeouts.insert(*self_id, std::u64::MAX);
|
|
|
|
timeouts.insert(Pubkey::default(), min_ts);
|
|
|
|
timeouts
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn make_timeouts(
|
|
|
|
&self,
|
|
|
|
self_id: &Pubkey,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
epoch_ms: u64,
|
|
|
|
) -> HashMap<Pubkey, u64> {
|
|
|
|
self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout)
|
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// Purge values from the crds that are older then `active_timeout`
|
|
|
|
/// The value_hash of an active item is put into self.purged_values queue
|
2019-11-20 11:25:18 -08:00
|
|
|
pub fn purge_active(
|
|
|
|
&mut self,
|
|
|
|
crds: &mut Crds,
|
|
|
|
now: u64,
|
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
|
|
|
) -> usize {
|
|
|
|
let old = crds.find_old_labels(now, timeouts);
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut purged: VecDeque<_> = old
|
|
|
|
.iter()
|
|
|
|
.filter_map(|label| {
|
|
|
|
let rv = crds
|
|
|
|
.lookup_versioned(label)
|
|
|
|
.map(|val| (val.value_hash, val.local_timestamp));
|
|
|
|
crds.remove(label);
|
|
|
|
rv
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.collect();
|
2019-11-20 11:25:18 -08:00
|
|
|
let ret = purged.len();
|
2018-11-15 13:23:26 -08:00
|
|
|
self.purged_values.append(&mut purged);
|
2019-11-20 11:25:18 -08:00
|
|
|
ret
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
/// Purge values from the `self.purged_values` queue that are older then purge_timeout
|
|
|
|
pub fn purge_purged(&mut self, min_ts: u64) {
|
|
|
|
let cnt = self
|
|
|
|
.purged_values
|
|
|
|
.iter()
|
|
|
|
.take_while(|v| v.1 < min_ts)
|
|
|
|
.count();
|
|
|
|
self.purged_values.drain(..cnt);
|
|
|
|
}
|
2020-06-09 17:08:13 -07:00
|
|
|
|
|
|
|
/// For legacy tests: filters and applies `response` in one call.
///
/// Returns `(failed_timeout + failed_insert, timeout_count, success)`.
#[cfg(test)]
pub fn process_pull_response(
    &mut self,
    crds: &mut Crds,
    from: &Pubkey,
    timeouts: &HashMap<Pubkey, u64>,
    response: Vec<CrdsValue>,
    now: u64,
) -> (usize, usize, usize) {
    let mut stats = ProcessPullStats::default();
    let (fresh, expired) = self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
    self.process_pull_responses(crds, from, fresh, expired, now, &mut stats);
    let ProcessPullStats {
        success,
        failed_insert,
        failed_timeout,
        timeout_count,
    } = stats;
    (failed_timeout + failed_insert, timeout_count, success)
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::contact_info::ContactInfo;
|
2020-02-07 12:38:24 -08:00
|
|
|
use crate::crds_value::{CrdsData, Vote};
|
2019-08-13 18:04:14 -07:00
|
|
|
use itertools::Itertools;
|
2020-02-07 12:38:24 -08:00
|
|
|
use solana_perf::test_tx::test_tx;
|
2019-08-13 18:04:14 -07:00
|
|
|
use solana_sdk::hash::hash;
|
|
|
|
use solana_sdk::packet::PACKET_DATA_SIZE;
|
2019-02-20 17:08:56 -08:00
|
|
|
|
|
|
|
/// With equal request history, the peer holding the largest stake must end up
/// with the largest pull weight.
#[test]
fn test_new_pull_with_stakes() {
    let new_peer = || {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )))
    };
    let mut crds = Crds::default();
    let mut stakes = HashMap::new();
    let node = CrdsGossipPull::default();
    let me = new_peer();
    crds.insert(me.clone(), 0).unwrap();
    for i in 1..=30 {
        let entry = new_peer();
        let id = entry.label().pubkey();
        crds.insert(entry.clone(), 0).unwrap();
        stakes.insert(id, i * 100);
    }
    let now = 1024;
    let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes);
    assert!(!options.is_empty());
    // Heaviest weight first.
    options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
    // check that the highest stake holder is also the heaviest weighted.
    let heaviest = options.get(0).unwrap().1.id;
    assert_eq!(*stakes.get(&heaviest).unwrap(), 3000_u64);
}
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-05-05 20:15:19 -07:00
|
|
|
/// A node with a non-zero shred version only pulls from peers with the same
/// version; a node with shred version 0 (a spy) pulls from everyone.
#[test]
fn test_no_pulls_from_different_shred_versions() {
    let mut crds = Crds::default();
    let stakes = HashMap::new();
    let node = CrdsGossipPull::default();

    let gossip = socketaddr!("127.0.0.1:1234");
    let contact_with_version = |shred_version: u16| {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
            id: Pubkey::new_rand(),
            shred_version,
            gossip,
            ..ContactInfo::default()
        }))
    };

    let me = contact_with_version(123);
    let spy = contact_with_version(0);
    let node_123 = contact_with_version(123);
    let node_456 = contact_with_version(456);

    crds.insert(me.clone(), 0).unwrap();
    crds.insert(spy.clone(), 0).unwrap();
    crds.insert(node_123.clone(), 0).unwrap();
    crds.insert(node_456.clone(), 0).unwrap();

    // shred version 123 should ignore nodes with versions 0 and 456
    let options = node
        .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes)
        .iter()
        .map(|(_weight, contact)| contact.id)
        .collect::<Vec<_>>();
    assert_eq!(options.len(), 1);
    assert!(!options.contains(&spy.pubkey()));
    assert!(options.contains(&node_123.pubkey()));

    // spy nodes will see all
    let options = node
        .pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes)
        .iter()
        .map(|(_weight, contact)| contact.id)
        .collect::<Vec<_>>();
    assert_eq!(options.len(), 3);
    assert!(options.contains(&me.pubkey()));
    assert!(options.contains(&node_123.pubkey()));
    assert!(options.contains(&node_456.pubkey()));
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// `new_pull_request` reports `NoPeers` until a peer other than the node itself
/// is known, and then targets that peer while returning the node's own info.
#[test]
fn test_new_pull_request() {
    let new_peer = || {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )))
    };
    let mut crds = Crds::default();
    let entry = new_peer();
    let id = entry.label().pubkey();
    let node = CrdsGossipPull::default();

    // Empty table: nobody to ask.
    assert_eq!(
        node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE),
        Err(CrdsGossipError::NoPeers)
    );

    // Only ourselves in the table: still nobody to ask.
    crds.insert(entry.clone(), 0).unwrap();
    assert_eq!(
        node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE),
        Err(CrdsGossipError::NoPeers)
    );

    let new = new_peer();
    crds.insert(new.clone(), 0).unwrap();
    let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE);
    let (to, _filters, self_info) = req.unwrap();
    assert_eq!(to, new.label().pubkey());
    assert_eq!(self_info, entry);
}
|
|
|
|
|
|
|
|
/// A peer whose request time was just marked must not be selected while an
/// older, never-requested peer is available.
#[test]
fn test_new_mark_creation_time() {
    let new_peer = || {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )))
    };
    let mut crds = Crds::default();
    let entry = new_peer();
    let node_pubkey = entry.label().pubkey();
    let mut node = CrdsGossipPull::default();
    crds.insert(entry.clone(), 0).unwrap();
    let old = new_peer();
    crds.insert(old.clone(), 0).unwrap();
    let new = new_peer();
    crds.insert(new.clone(), 0).unwrap();

    // set request creation time to max_value
    node.mark_pull_request_creation_time(&new.label().pubkey(), u64::max_value());

    // odds of getting the other request should be 1 in u64::max_value()
    for _attempt in 0..10 {
        let req = node.new_pull_request(
            &crds,
            &node_pubkey,
            0,
            u64::max_value(),
            &HashMap::new(),
            PACKET_DATA_SIZE,
        );
        let (to, _filters, self_info) = req.unwrap();
        assert_eq!(to, old.label().pubkey());
        assert_eq!(self_info, entry);
    }
}
|
|
|
|
|
2020-08-11 06:26:42 -07:00
|
|
|
/// Responses are generated per request: callers with a stale wallclock get
/// nothing, recent callers get the values they are missing, and the output
/// order matches the request order.
#[test]
fn test_generate_pull_responses() {
    let contact_at = |wallclock: u64| {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            wallclock,
        )))
    };
    let mut node_crds = Crds::default();
    let entry = contact_at(0);
    let node_pubkey = entry.label().pubkey();
    let node = CrdsGossipPull::default();
    node_crds.insert(entry, 0).unwrap();
    let new = contact_at(0);
    node_crds.insert(new, 0).unwrap();
    let req = node.new_pull_request(
        &node_crds,
        &node_pubkey,
        0,
        0,
        &HashMap::new(),
        PACKET_DATA_SIZE,
    );

    let mut dest_crds = Crds::default();
    let dest = CrdsGossipPull::default();
    let (_, filters, caller) = req.unwrap();
    let mut filters: Vec<_> = filters
        .into_iter()
        .map(|filter| (caller.clone(), filter))
        .collect();

    // The destination table is empty, so there is nothing to send back.
    let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
    assert_eq!(rsp[0].len(), 0);

    let new = contact_at(CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
    dest_crds
        .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
        .unwrap();

    //should skip new value since caller is too old
    let rsp =
        dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
    assert_eq!(rsp[0].len(), 0);

    assert_eq!(filters.len(), 1);
    filters.push(filters[0].clone());
    //should return new value since caller is new
    filters[1].0 = contact_at(CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1);

    let rsp =
        dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
    assert_eq!(rsp.len(), 2);
    assert_eq!(rsp[0].len(), 0);
    assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// Sends a pull request from one node to an empty peer and checks that the
/// peer has nothing to answer with, and that `process_pull_requests` at time 1
/// records the caller with insert and local timestamps of 1.
#[test]
fn test_process_pull_request() {
    // Localhost contact info under a fresh random key, wallclock 0.
    let contact_info = || {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )))
    };

    // Requesting side: its own entry plus one other peer.
    let mut requester_crds = Crds::default();
    let self_entry = contact_info();
    let requester_pubkey = self_entry.label().pubkey();
    let requester = CrdsGossipPull::default();
    requester_crds.insert(self_entry, 0).unwrap();
    let other_peer = contact_info();
    requester_crds.insert(other_peer, 0).unwrap();
    let pull_request = requester.new_pull_request(
        &requester_crds,
        &requester_pubkey,
        0,
        0,
        &HashMap::new(),
        PACKET_DATA_SIZE,
    );

    // Responding side starts with an empty table.
    let mut responder_crds = Crds::default();
    let mut responder = CrdsGossipPull::default();
    let (_, request_filters, caller) = pull_request.unwrap();
    let filters: Vec<_> = request_filters
        .into_iter()
        .map(|filter| (caller.clone(), filter))
        .collect();
    let responses = responder.generate_pull_responses(&responder_crds, &filters, 0);
    responder.process_pull_requests(&mut responder_crds, filters, 1);
    assert!(responses.iter().all(|values| values.is_empty()));

    // The caller is now in the responder's table, stamped with time 1.
    let caller_label = caller.label();
    assert!(responder_crds.lookup(&caller_label).is_some());
    let insert_timestamp = responder_crds
        .lookup_versioned(&caller_label)
        .unwrap()
        .insert_timestamp;
    assert_eq!(insert_timestamp, 1);
    let local_timestamp = responder_crds
        .lookup_versioned(&caller_label)
        .unwrap()
        .local_timestamp;
    assert_eq!(local_timestamp, 1);
}
|
|
|
|
/// Runs a full pull round trip: `node` requests, `dest` answers, and `node`
/// applies the answer through `process_pull_response`.
///
/// `dest` holds a contact info for `new_id` with wallclock 1, while `node`
/// holds one for the same key with wallclock 0. After a successful round the
/// record under that label in `node_crds` must carry local timestamp 1.
///
/// The round is retried up to 30 times because the bloom filter in the request
/// can report a false positive, in which case `dest` has nothing to send.
#[test]
fn test_process_pull_request_response() {
    let mut node_crds = Crds::default();
    let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
        &Pubkey::new_rand(),
        1,
    )));
    let node_pubkey = entry.label().pubkey();
    let mut node = CrdsGossipPull::default();
    node_crds.insert(entry, 0).unwrap();

    let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
        &Pubkey::new_rand(),
        1,
    )));
    node_crds.insert(new, 0).unwrap();

    let mut dest = CrdsGossipPull::default();
    let mut dest_crds = Crds::default();
    let new_id = Pubkey::new_rand();
    let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
        &new_id, 1,
    )));
    dest_crds.insert(new.clone(), 0).unwrap();

    // node contains a key from the dest node, but at an older local timestamp
    let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
        &new_id, 0,
    )));
    assert_eq!(same_key.label(), new.label());
    assert!(same_key.wallclock() < new.wallclock());
    node_crds.insert(same_key.clone(), 0).unwrap();
    assert_eq!(
        node_crds
            .lookup_versioned(&same_key.label())
            .unwrap()
            .local_timestamp,
        0
    );
    let mut done = false;
    for _ in 0..30 {
        // there is a chance of a false positive with bloom filters
        let req = node.new_pull_request(
            &node_crds,
            &node_pubkey,
            0,
            0,
            &HashMap::new(),
            PACKET_DATA_SIZE,
        );
        let (_, filters, caller) = req.unwrap();
        let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
        let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
        dest.process_pull_requests(&mut dest_crds, filters, 0);
        // If there is a false positive, the per-filter response is empty;
        // prob should be around 0.1 per iteration. The outer vector holds one
        // entry per filter, so the inner vectors are what must be inspected
        // before retrying.
        if rsp.iter().all(|values| values.is_empty()) {
            continue;
        }
        assert_eq!(rsp.len(), 1);
        let failed = node
            .process_pull_response(
                &mut node_crds,
                &node_pubkey,
                &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
                rsp.pop().unwrap(),
                1,
            )
            .0;
        assert_eq!(failed, 0);
        assert_eq!(
            node_crds
                .lookup_versioned(&new.label())
                .unwrap()
                .local_timestamp,
            1
        );
        // verify that the whole record was updated for dest since this is a response from dest
        assert_eq!(
            node_crds
                .lookup_versioned(&same_key.label())
                .unwrap()
                .local_timestamp,
            1
        );
        done = true;
        break;
    }
    assert!(done);
}
|
|
|
|
/// Checks purging: after `purge_active` at time 2 (timeouts built by
/// `make_timeouts_def(.., 0, 1)`) the node's own entry is still in the table,
/// the other entry is gone and tracked in `purged_values`, and its hash is
/// still reported by the filters from `build_crds_filters` until
/// `purge_purged(1)` empties `purged_values`.
#[test]
fn test_gossip_purge() {
    // Localhost contact info under a fresh random key, wallclock 0.
    let contact_info = || {
        CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )))
    };

    let mut crds = Crds::default();
    let self_entry = contact_info();
    let self_label = self_entry.label();
    let self_pubkey = self_label.pubkey();
    let mut gossip = CrdsGossipPull::default();
    crds.insert(self_entry, 0).unwrap();

    // Second entry that is expected to be purged; remember its label and hash.
    let stale_entry = contact_info();
    let stale_label = stale_entry.label();
    crds.insert(stale_entry, 0).unwrap();
    let value_hash = crds.lookup_versioned(&stale_label).unwrap().value_hash;

    // verify self is valid
    assert_eq!(crds.lookup(&self_label).unwrap().label(), self_label);

    // purge
    let timeouts = gossip.make_timeouts_def(&self_pubkey, &HashMap::new(), 0, 1);
    gossip.purge_active(&mut crds, 2, &timeouts);

    // verify self is still valid after purge
    assert_eq!(crds.lookup(&self_label).unwrap().label(), self_label);

    assert_eq!(crds.lookup_versioned(&stale_label), None);
    assert_eq!(gossip.purged_values.len(), 1);
    for _ in 0..30 {
        // there is a chance of a false positive with bloom filters
        // assert that purged value is still in the set
        // chance of 30 consecutive false positives is 0.1^30
        let filters = gossip.build_crds_filters(&crds, PACKET_DATA_SIZE);
        let still_covered = filters.iter().any(|filter| filter.contains(&value_hash));
        assert!(still_covered);
    }

    // purge the value
    gossip.purge_purged(1);
    assert_eq!(gossip.purged_values.len(), 0);
}
|
2019-08-13 18:04:14 -07:00
|
|
|
/// Pins the mask-related values of `CrdsFilter`: `new_rand(1, 128)` yields an
/// all-ones mask, `max_items(80, 0.01, 8)` is 9, `mask_bits(1000, 9)` is 7, and
/// `new_rand(1000, 10)` yields a mask whose low 32 bits are all set.
#[test]
#[allow(clippy::float_cmp)]
fn test_crds_filter_mask() {
    let small_filter = CrdsFilter::new_rand(1, 128);
    assert_eq!(small_filter.mask, !0x0);

    let max_items = CrdsFilter::max_items(80.0_f64, 0.01, 8.0_f64);
    assert_eq!(max_items, 9.0_f64);

    // 1000/9 = 111, so 7 bits are needed to mask it
    let mask_bits = CrdsFilter::mask_bits(1000.0_f64, 9.0_f64);
    assert_eq!(mask_bits, 7_u32);

    let large_filter = CrdsFilter::new_rand(1000, 10);
    let low_bits = large_filter.mask & 0x00_ffff_ffff;
    assert_eq!(low_bits, 0x00_ffff_ffff);
}
|
|
|
|
/// With a filter from `new_rand(1, 128)`, a hash is reported as contained only
/// after it has been added; the next hash in the chain is still absent.
#[test]
fn test_crds_filter_add_no_mask() {
    let mut filter = CrdsFilter::new_rand(1, 128);

    let first: Hash = hash(Hash::default().as_ref());
    assert!(!filter.contains(&first));
    filter.add(&first);
    assert!(filter.contains(&first));

    // A different hash (derived from the first) was never added.
    let second: Hash = hash(first.as_ref());
    assert!(!filter.contains(&second));
}
|
|
|
|
/// Walks a hash chain from `Hash::default()` until a hash passes the filter's
/// `test_mask`, then checks that such a hash is absent before `add` and present
/// after it.
#[test]
fn test_crds_filter_add_mask() {
    let mut filter = CrdsFilter::new_rand(1000, 10);

    // Keep re-hashing until the candidate passes the filter's mask test.
    let mut candidate: Hash = Hash::default();
    loop {
        if filter.test_mask(&candidate) {
            break;
        }
        candidate = hash(candidate.as_ref());
    }
    assert!(filter.test_mask(&candidate));

    // if the mask succeeds, we want the guaranteed negative
    assert!(!filter.contains(&candidate));
    filter.add(&candidate);
    assert!(filter.contains(&candidate));
}
|
|
|
|
/// Same add/contains check as the single-filter case, but against the set
/// returned by `CrdsFilter::new_complete_set(1000, 10)`: every filter has a
/// non-zero `mask_bits`, and the first filter whose mask accepts the hash
/// reports it as absent before `add` and present after.
#[test]
fn test_crds_filter_complete_set_add_mask() {
    let mut filters = CrdsFilter::new_complete_set(1000, 10);
    assert!(filters.iter().all(|f| f.mask_bits > 0));

    let mut candidate: Hash = Hash::default();
    // rev to make the hash::default() miss on the first few test_masks
    loop {
        let accepted = filters.iter().rev().any(|f| f.test_mask(&candidate));
        if accepted {
            break;
        }
        candidate = hash(candidate.as_ref());
    }

    let owner = filters
        .iter_mut()
        .find(|f| f.test_mask(&candidate))
        .unwrap();
    assert!(owner.test_mask(&candidate));

    // if the mask succeeds, we want the guaranteed negative
    assert!(!owner.contains(&candidate));
    owner.add(&candidate);
    assert!(owner.contains(&candidate));
}
|
|
|
|
/// Walks a hash chain from `Hash::default()` until a hash fails the filter's
/// `test_mask`, then checks that `contains` still reports that hash as present.
#[test]
fn test_crds_filter_contains_mask() {
    let filter = CrdsFilter::new_rand(1000, 10);
    assert!(filter.mask_bits > 0);

    // Keep re-hashing until the candidate is rejected by the mask test.
    let mut candidate: Hash = Hash::default();
    loop {
        if !filter.test_mask(&candidate) {
            break;
        }
        candidate = hash(candidate.as_ref());
    }
    assert!(!filter.test_mask(&candidate));

    // if the mask fails, the hash is contained in the set, and can be treated as a false
    // positive
    assert!(filter.contains(&candidate));
}
|
|
|
|
/// Runs `run_test_mask` for every mask width from 0 through 15 bits.
#[test]
fn test_mask() {
    (0..16).for_each(run_test_mask);
}
|
|
|
|
fn run_test_mask(mask_bits: u32) {
|
|
|
|
let masks: Vec<_> = (0..2u64.pow(mask_bits))
|
|
|
|
.map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
|
|
|
|
.dedup()
|
|
|
|
.collect();
|
|
|
|
assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize)
|
|
|
|
}
|
2020-02-07 12:38:24 -08:00
|
|
|
|
|
|
|
/// Checks the failure count (first element of the tuple returned by
/// `process_pull_response`) for responses of different ages, with a `timeouts`
/// map holding `crds_timeout` for the default key and `msg_timeout + 1` for the
/// peer.
#[test]
fn test_process_pull_response() {
    let mut gossip = CrdsGossipPull::default();
    let msg_timeout = gossip.msg_timeout;

    let peer_pubkey = Pubkey::new_rand();
    let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
        ContactInfo::new_localhost(&peer_pubkey, 0),
    ));
    let mut timeouts = HashMap::new();
    timeouts.insert(Pubkey::default(), gossip.crds_timeout);
    timeouts.insert(peer_pubkey, msg_timeout + 1);

    // inserting a fresh value should be fine.
    let mut crds = Crds::default();
    let failed = gossip
        .process_pull_response(
            &mut crds,
            &peer_pubkey,
            &timeouts,
            vec![peer_entry.clone()],
            1,
        )
        .0;
    assert_eq!(failed, 0);

    // check that old contact infos fail if they are too old, regardless of "timeouts"
    let mut crds = Crds::default();
    let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
        ContactInfo::new_localhost(&peer_pubkey, 0),
    ));
    let failed = gossip
        .process_pull_response(
            &mut crds,
            &peer_pubkey,
            &timeouts,
            vec![peer_entry.clone(), unstaked_peer_entry],
            msg_timeout + 100,
        )
        .0;
    assert_eq!(failed, 2);

    // check that old contact infos can still land as long as they have a "timeouts" entry
    let mut crds = Crds::default();
    let failed = gossip
        .process_pull_response(
            &mut crds,
            &peer_pubkey,
            &timeouts,
            vec![peer_entry],
            msg_timeout + 1,
        )
        .0;
    assert_eq!(failed, 0);

    // construct something that's not a contact info
    let peer_vote =
        CrdsValue::new_unsigned(CrdsData::Vote(0, Vote::new(&peer_pubkey, test_tx(), 0)));

    // check that older CrdsValues (non-ContactInfos) infos pass even if are too old,
    // but a recent contact info (inserted above) exists
    let failed = gossip
        .process_pull_response(
            &mut crds,
            &peer_pubkey,
            &timeouts,
            vec![peer_vote.clone()],
            msg_timeout + 1,
        )
        .0;
    assert_eq!(failed, 0);

    // without a contact info, inserting an old value should fail
    let mut crds = Crds::default();
    let failed = gossip
        .process_pull_response(
            &mut crds,
            &peer_pubkey,
            &timeouts,
            vec![peer_vote],
            msg_timeout + 1,
        )
        .0;
    assert_eq!(failed, 1);
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|