//! Crds Gossip Pull overlay //! This module implements the anti-entropy protocol for the network. //! //! The basic strategy is as follows: //! 1. Construct a bloom filter of the local data set //! 2. Randomly ask a node on the network for data that is not contained in the bloom filter. //! //! Bloom filters have a false positive rate. Each requests uses a different bloom filter //! with random hash functions. So each subsequent request will have a different distribution //! of false positives. use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use itertools::Itertools; use rand::distributions::{Distribution, WeightedIndex}; use rand::Rng; use rayon::{prelude::*, ThreadPool}; use solana_runtime::bloom::{AtomicBloom, Bloom}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; use std::convert::TryInto; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000; // Retention period of hashes of received outdated values. const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; // Do not pull from peers which have not been updated for this long. 
const PULL_ACTIVE_TIMEOUT_MS: u64 = 60_000;
// Passed as `false_rate` to `CrdsFilter::max_items` and to `Bloom::random`.
pub const FALSE_RATE: f64 = 0.1f64;
// Passed as `num_keys` to `CrdsFilter::max_items` when sizing filters.
pub const KEYS: f64 = 8f64;

/// A bloom filter paired with a bit mask selecting the hashes it covers.
///
/// A hash falls under this filter when its `mask_bits` highest bits (as read by
/// [`CrdsFilter::hash_as_u64`]) equal the corresponding bits of `mask`; the
/// remaining low bits of `mask` are always set to 1.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)]
pub struct CrdsFilter {
    pub filter: Bloom<Hash>,
    mask: u64,
    mask_bits: u32,
}

impl Default for CrdsFilter {
    /// A default bloom with an all-ones mask and zero mask bits, so every hash
    /// passes `test_mask`.
    fn default() -> Self {
        CrdsFilter {
            filter: Bloom::default(),
            mask: !0u64,
            mask_bits: 0u32,
        }
    }
}

impl solana_sdk::sanitize::Sanitize for CrdsFilter {
    /// Delegates to the bloom filter's own sanitize check.
    fn sanitize(&self) -> std::result::Result<(), solana_sdk::sanitize::SanitizeError> {
        self.filter.sanitize()?;
        Ok(())
    }
}

impl CrdsFilter {
    /// Builds a filter with a random bloom of at most `max_bytes` bytes and a
    /// randomly chosen mask, with `mask_bits` derived from the ratio of
    /// `num_items` to the number of items one bloom can hold.
    pub fn new_rand(num_items: usize, max_bytes: usize) -> Self {
        let max_bits = (max_bytes * 8) as f64;
        let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
        let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
        let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
        let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits));
        let mask = Self::compute_mask(seed, mask_bits);
        CrdsFilter {
            filter,
            mask,
            mask_bits,
        }
    }

    /// Places `seed` in the `mask_bits` highest bits and sets every lower bit to 1.
    ///
    /// # Panics
    /// Panics if `seed > 2^mask_bits`.
    fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
        // NOTE(review): `<=` also admits seed == 2^mask_bits, whose only set bit is
        // shifted out below, producing the same mask as seed 0 -- confirm whether
        // `<` was intended.
        assert!(seed <= 2u64.pow(mask_bits));
        // A shift by 64 (mask_bits == 0) is not representable; treat it as "no seed bits".
        let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
        seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0) as u64
    }

    /// Computes `ceil(m / (-k / ln(1 - exp(ln(p) / k))))` for `m = max_bits`,
    /// `p = false_rate`, `k = num_keys`; the result is used as the item capacity
    /// of a single bloom filter.
    fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
        let m = max_bits;
        let p = false_rate;
        let k = num_keys;
        (m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil()
    }

    /// Returns `ceil(log2(num_items / max_items))`, clamped to zero.
    fn mask_bits(num_items: f64, max_items: f64) -> u32 {
        // for small ratios this can result in a negative number, ensure it returns 0 instead
        ((num_items / max_items).log2().ceil()).max(0.0) as u32
    }

    /// Interprets the first 8 bytes of `item` as a little-endian `u64`.
    pub fn hash_as_u64(item: &Hash) -> u64 {
        let buf = item.as_ref()[..8].try_into().unwrap();
        u64::from_le_bytes(buf)
    }

    /// Returns true when `item` falls under this filter's mask.
    fn test_mask(&self, item: &Hash) -> bool {
        // only consider the highest mask_bits bits from the hash and set the rest to 1.
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
        let bits = Self::hash_as_u64(item) | ones;
        bits == self.mask
    }

    /// Adds `item` to the bloom filter, but only if it falls under the mask.
    #[cfg(test)]
    fn add(&mut self, item: &Hash) {
        if self.test_mask(item) {
            self.filter.add(item);
        }
    }

    /// Returns true for any hash outside the mask; otherwise consults the bloom.
    #[cfg(test)]
    fn contains(&self, item: &Hash) -> bool {
        if !self.test_mask(item) {
            return true;
        }
        self.filter.contains(item)
    }

    /// Consults the bloom filter only, without checking the mask.
    fn filter_contains(&self, item: &Hash) -> bool {
        self.filter.contains(item)
    }
}

/// A vector of crds filters that together hold a complete set of Hashes.
struct CrdsFilterSet {
    // One bloom per mask value; the index of a bloom is the value of the
    // `mask_bits` highest bits of the hashes routed to it.
    filters: Vec<AtomicBloom<Hash>>,
    mask_bits: u32,
}

impl CrdsFilterSet {
    /// Creates `2^mask_bits` random bloom filters of at most `max_bytes` bytes
    /// each, where `mask_bits` is derived from `num_items`.
    fn new(num_items: usize, max_bytes: usize) -> Self {
        let max_bits = (max_bytes * 8) as f64;
        let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
        let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items as f64);
        let filters = std::iter::repeat_with(|| {
            Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into()
        })
        .take(1 << mask_bits)
        .collect();
        Self { filters, mask_bits }
    }

    /// Adds `hash_value` to the bloom selected by its `mask_bits` highest bits.
    fn add(&self, hash_value: Hash) {
        // A shift by 64 (mask_bits == 0) is not representable; there is a single
        // filter in that case, so fall back to index 0.
        let index = CrdsFilter::hash_as_u64(&hash_value)
            .checked_shr(64 - self.mask_bits)
            .unwrap_or(0);
        self.filters[index as usize].add(&hash_value);
    }
}

impl From<CrdsFilterSet> for Vec<CrdsFilter> {
    /// Converts every bloom into a `CrdsFilter` whose mask is computed from the
    /// bloom's position in the set.
    fn from(cfs: CrdsFilterSet) -> Self {
        let mask_bits = cfs.mask_bits;
        cfs.filters
            .into_iter()
            .enumerate()
            .map(|(seed, filter)| CrdsFilter {
                filter: filter.into(),
                mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
                mask_bits,
            })
            .collect()
    }
}

/// Counters updated while filtering and processing pull responses.
#[derive(Default)]
pub struct ProcessPullStats {
    pub success: usize,
    pub failed_insert: usize,
    pub failed_timeout: usize,
    pub timeout_count: usize,
}

/// State kept by the pull side of crds gossip.
#[derive(Clone)]
pub struct CrdsGossipPull {
    /// timestamp of last request
    pub pull_request_time: HashMap<Pubkey, u64>,
    /// hash and insert time
    pub purged_values: VecDeque<(Hash, u64)>,
    // Hash value and record time (ms) of the pull responses which failed to be
    // inserted in crds table; Preserved to stop the sender to send back the
    // same outdated payload again by adding them to the filter for the next
// pull request. pub failed_inserts: VecDeque<(Hash, u64)>, pub crds_timeout: u64, pub msg_timeout: u64, pub num_pulls: usize, } impl Default for CrdsGossipPull { fn default() -> Self { Self { purged_values: VecDeque::new(), pull_request_time: HashMap::new(), failed_inserts: VecDeque::new(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, num_pulls: 0, } } } impl CrdsGossipPull { /// generate a random request pub fn new_pull_request( &self, thread_pool: &ThreadPool, crds: &Crds, self_id: &Pubkey, self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { let options = self.pull_options( crds, &self_id, self_shred_version, now, gossip_validators, stakes, ); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let random = index.sample(&mut rand::thread_rng()); let self_info = crds .lookup(&CrdsValueLabel::ContactInfo(*self_id)) .unwrap_or_else(|| panic!("self_id invalid {}", self_id)); Ok((options[random].1.id, filters, self_info.clone())) } fn pull_options<'a>( &self, crds: &'a Crds, self_id: &Pubkey, self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, ) -> Vec<(f32, &'a ContactInfo)> { let mut rng = rand::thread_rng(); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); crds.get_nodes() .filter_map(|value| { let info = value.value.contact_info().unwrap(); // Stop pulling from nodes which have not been active recently. if value.local_timestamp < active_cutoff { // In order to mitigate eclipse attack, for staked nodes // continue retrying periodically. 
let stake = stakes.get(&info.id).unwrap_or(&0);
                    if *stake == 0 || !rng.gen_ratio(1, 16) {
                        return None;
                    }
                }
                Some(info)
            })
            .filter(|v| {
                v.id != *self_id
                    && ContactInfo::is_valid_address(&v.gossip)
                    && (self_shred_version == 0 || self_shred_version == v.shred_version)
                    && gossip_validators
                        .map_or(true, |gossip_validators| gossip_validators.contains(&v.id))
            })
            .map(|item| {
                let max_weight = f32::from(u16::max_value()) - 1.0;
                let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
                // A recorded request time later than `now` must not underflow
                // (debug panic / release wrap-around); treat it as "just requested".
                let since = (now.saturating_sub(req_time) / 1024) as u32;
                let stake = get_stake(&item.id, stakes);
                let weight = get_weight(max_weight, since, stake);
                (weight, item)
            })
            .collect()
    }

    /// time when a request to `from` was initiated
    /// This is used for weighted random selection during `new_pull_request`
    /// It's important to use the local nodes request creation time as the weight
    /// instead of the response received time otherwise failed nodes will increase their weight.
    pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
        self.pull_request_time.insert(*from, now);
    }

    /// Store an old hash in the purged values set
    pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) {
        self.purged_values.push_back((hash, timestamp))
    }

    /// process a pull request
    ///
    /// Inserts every caller value into `crds`, records any value returned by a
    /// successful insert in `purged_values`, and refreshes the caller's record
    /// timestamp whether or not the insert succeeded.
    pub fn process_pull_requests<I>(&mut self, crds: &mut Crds, callers: I, now: u64)
    where
        I: IntoIterator<Item = CrdsValue>,
    {
        for caller in callers {
            let key = caller.label().pubkey();
            if let Ok(Some(val)) = crds.insert(caller, now) {
                self.purged_values
                    .push_back((val.value_hash, val.local_timestamp));
            }
            crds.update_record_timestamp(&key, now);
        }
    }

    /// Create gossip responses to pull requests
    pub fn generate_pull_responses(
        &self,
        crds: &Crds,
        requests: &[(CrdsValue, CrdsFilter)],
        output_size_limit: usize, // Limit number of crds values returned.
now: u64,
    ) -> Vec<Vec<CrdsValue>> {
        self.filter_crds_values(crds, requests, output_size_limit, now)
    }

    /// Checks if responses should be inserted and
    /// returns those responses converted to VersionedCrdsValue
    /// Separated in three vecs as:
    ///  .0 => responses that update the owner timestamp
    ///  .1 => responses that do not update the owner timestamp
    ///  .2 => hash value of outdated values which will fail to insert.
    ///
    /// `stats.timeout_count` and `stats.failed_timeout` are incremented for every
    /// response dropped because of its age.
    ///
    /// # Panics
    /// Panics if an expired `ContactInfo` owner has no entry in `timeouts` and
    /// `timeouts` has no entry for `Pubkey::default()` either.
    pub fn filter_pull_responses(
        &self,
        crds: &Crds,
        timeouts: &HashMap<Pubkey, u64>,
        responses: Vec<CrdsValue>,
        now: u64,
        stats: &mut ProcessPullStats,
    ) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
        let mut versioned = vec![];
        let mut versioned_expired_timestamp = vec![];
        let mut failed_inserts = vec![];
        let mut maybe_push = |response, values: &mut Vec<VersionedCrdsValue>| {
            let (push, value) = crds.would_insert(response, now);
            if push {
                values.push(value);
            } else {
                failed_inserts.push(value.value_hash)
            }
        };
        for r in responses {
            let owner = r.label().pubkey();
            // Check if the crds value is older than the msg_timeout
            // (saturating_add: the sum must never overflow, see below).
            if now > r.wallclock().checked_add(self.msg_timeout).unwrap_or(0)
                || now.saturating_add(self.msg_timeout) < r.wallclock()
            {
                match &r.label() {
                    CrdsValueLabel::ContactInfo(_) => {
                        // Check if this ContactInfo is actually too old, it's possible that it has
                        // stake and so might have a longer effective timeout
                        let timeout = *timeouts
                            .get(&owner)
                            .unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap());
                        // `make_timeouts_def` stores u64::MAX for the local node, so
                        // `now + timeout` would overflow; saturate instead.
                        if now > r.wallclock().checked_add(timeout).unwrap_or(0)
                            || now.saturating_add(timeout) < r.wallclock()
                        {
                            stats.timeout_count += 1;
                            stats.failed_timeout += 1;
                            continue;
                        }
                    }
                    _ => {
                        // Before discarding this value, check if a ContactInfo for the owner
                        // exists in the table.
If it doesn't, that implies that this value can be discarded if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() { stats.timeout_count += 1; stats.failed_timeout += 1; } else { // Silently insert this old value without bumping record timestamps maybe_push(r, &mut versioned_expired_timestamp); } continue; } } } maybe_push(r, &mut versioned); } (versioned, versioned_expired_timestamp, failed_inserts) } /// process a vec of pull responses pub fn process_pull_responses( &mut self, crds: &mut Crds, from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, mut failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, ) -> Vec<(CrdsValueLabel, Hash, u64)> { let mut success = vec![]; let mut owners = HashSet::new(); for r in responses_expired_timeout { let value_hash = r.value_hash; if crds.insert_versioned(r).is_err() { failed_inserts.push(value_hash); } } for r in responses { let label = r.value.label(); let wc = r.value.wallclock(); let hash = r.value_hash; match crds.insert_versioned(r) { Err(_) => failed_inserts.push(hash), Ok(old) => { stats.success += 1; self.num_pulls += 1; owners.insert(label.pubkey()); success.push((label, hash, wc)); if let Some(val) = old { self.purged_values .push_back((val.value_hash, val.local_timestamp)) } } } } owners.insert(*from); for owner in owners { crds.update_record_timestamp(&owner, now); } stats.failed_insert += failed_inserts.len(); self.purge_failed_inserts(now); self.failed_inserts .extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); success } pub fn purge_failed_inserts(&mut self, now: u64) { if FAILED_INSERTS_RETENTION_MS < now { let cutoff = now - FAILED_INSERTS_RETENTION_MS; let outdated = self .failed_inserts .iter() .take_while(|(_, ts)| *ts < cutoff) .count(); self.failed_inserts.drain(..outdated); } } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter pub fn build_crds_filters( &self, 
thread_pool: &ThreadPool,
        crds: &Crds,
        bloom_size: usize,
    ) -> Vec<CrdsFilter> {
        const PAR_MIN_LENGTH: usize = 512;
        // Size the filter set for everything that will be added below, but never
        // for fewer than the default number of items.
        let num = cmp::max(
            CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
            crds.len() + self.purged_values.len() + self.failed_inserts.len(),
        );
        let filters = CrdsFilterSet::new(num, bloom_size);
        thread_pool.install(|| {
            // Hashes of the table values, the purged values and the failed inserts.
            crds.par_values()
                .with_min_len(PAR_MIN_LENGTH)
                .map(|v| v.value_hash)
                .chain(
                    self.purged_values
                        .par_iter()
                        .with_min_len(PAR_MIN_LENGTH)
                        .map(|(v, _)| *v),
                )
                .chain(
                    self.failed_inserts
                        .par_iter()
                        .with_min_len(PAR_MIN_LENGTH)
                        .map(|(v, _)| *v),
                )
                .for_each(|v| filters.add(v));
        });
        filters.into()
    }

    /// filter values that fail the bloom filter up to max_bytes
    ///
    /// Produces one vector of values per processed request, in request order.
    /// Processing stops once `output_size_limit` values have been emitted; a
    /// request whose caller wallclock lies outside the accepted window yields an
    /// empty vector.
    fn filter_crds_values(
        &self,
        crds: &Crds,
        filters: &[(CrdsValue, CrdsFilter)],
        mut output_size_limit: usize, // Limit number of crds values returned.
        now: u64,
    ) -> Vec<Vec<CrdsValue>> {
        let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
        let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
        //skip filters from callers that are too old
        let future = now.saturating_add(msg_timeout);
        let past = now.saturating_sub(msg_timeout);
        let mut dropped_requests = 0;
        let mut total_skipped = 0;
        let ret: Vec<_> = filters
            .iter()
            .map(|(caller, filter)| {
                if output_size_limit == 0 {
                    // Ends the whole iteration via `while_some` below.
                    return None;
                }
                let caller_wallclock = caller.wallclock();
                if caller_wallclock >= future || caller_wallclock < past {
                    dropped_requests += 1;
                    return Some(vec![]);
                }
                let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
                let out: Vec<_> = crds
                    .filter_bitmask(filter.mask, filter.mask_bits)
                    .filter_map(|item| {
                        debug_assert!(filter.test_mask(&item.value_hash));
                        //skip values that are too new
                        if item.value.wallclock() > caller_wallclock {
                            total_skipped += 1;
                            None
                        } else if filter.filter_contains(&item.value_hash) {
                            None
                        } else {
                            Some(item.value.clone())
                        }
                    })
                    .take(output_size_limit)
                    .collect();
                // Cannot underflow: `out` holds at most `output_size_limit` values.
                output_size_limit -= out.len();
                Some(out)
            })
            .while_some()
            .collect();
        inc_new_counter_info!(
            "gossip_filter_crds_values-dropped_requests",
dropped_requests + filters.len() - ret.len()
        );
        inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
        ret
    }

    /// Builds the per-owner timeout table: `epoch_ms` for every key in `stakes`,
    /// `u64::MAX` for `self_id`, and `min_ts` under `Pubkey::default()`.
    pub fn make_timeouts_def(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
        min_ts: u64,
    ) -> HashMap<Pubkey, u64> {
        let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
        timeouts.insert(*self_id, std::u64::MAX);
        timeouts.insert(Pubkey::default(), min_ts);
        timeouts
    }

    /// Same as `make_timeouts_def`, with `self.crds_timeout` as `min_ts`.
    pub fn make_timeouts(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
    ) -> HashMap<Pubkey, u64> {
        self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout)
    }

    /// Purge values from the crds that are older than `active_timeout`
    /// The value_hash of an active item is put into self.purged_values queue
    ///
    /// Returns the number of entries added to `self.purged_values`.
    pub fn purge_active(
        &mut self,
        thread_pool: &ThreadPool,
        crds: &mut Crds,
        now: u64,
        timeouts: &HashMap<Pubkey, u64>,
    ) -> usize {
        let num_purged_values = self.purged_values.len();
        self.purged_values.extend(
            crds.find_old_labels(thread_pool, now, timeouts)
                .into_iter()
                .filter_map(|label| {
                    let val = crds.remove(&label)?;
                    Some((val.value_hash, val.local_timestamp))
                }),
        );
        self.purged_values.len() - num_purged_values
    }

    /// Purge values from the `self.purged_values` queue that are older than purge_timeout
    ///
    /// Only the leading run of entries with a timestamp below `min_ts` is removed.
    pub fn purge_purged(&mut self, min_ts: u64) {
        let cnt = self
            .purged_values
            .iter()
            .take_while(|v| v.1 < min_ts)
            .count();
        self.purged_values.drain(..cnt);
    }

    /// For legacy tests
    ///
    /// Runs `filter_pull_responses` followed by `process_pull_responses` and
    /// returns `(failed_timeout + failed_insert, timeout_count, success)`.
    #[cfg(test)]
    pub fn process_pull_response(
        &mut self,
        crds: &mut Crds,
        from: &Pubkey,
        timeouts: &HashMap<Pubkey, u64>,
        response: Vec<CrdsValue>,
        now: u64,
    ) -> (usize, usize, usize) {
        let mut stats = ProcessPullStats::default();
        let (versioned, versioned_expired_timeout, failed_inserts) =
            self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
        self.process_pull_responses(
            crds,
            from,
            versioned,
            versioned_expired_timeout,
            failed_inserts,
            now,
            &mut stats,
        );
        (
            stats.failed_timeout + stats.failed_insert,
            stats.timeout_count,
            stats.success,
        )
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use
crate::cluster_info::MAX_BLOOM_SIZE; use crate::contact_info::ContactInfo; use crate::crds_value::{CrdsData, Vote}; use itertools::Itertools; use rand::thread_rng; use rayon::ThreadPoolBuilder; use solana_perf::test_tx::test_tx; use solana_sdk::hash::{hash, HASH_BYTES}; use solana_sdk::packet::PACKET_DATA_SIZE; #[test] fn test_hash_as_u64() { let arr: Vec = (0..HASH_BYTES).map(|i| i as u8 + 1).collect(); let hash = Hash::new(&arr); assert_eq!(CrdsFilter::hash_as_u64(&hash), 0x807060504030201); } #[test] fn test_hash_as_u64_random() { fn hash_as_u64_bitops(hash: &Hash) -> u64 { let mut out = 0; for (i, val) in hash.as_ref().iter().enumerate().take(8) { out |= (u64::from(*val)) << (i * 8) as u64; } out } let mut rng = thread_rng(); for _ in 0..100 { let hash = solana_sdk::hash::new_rand(&mut rng); assert_eq!(CrdsFilter::hash_as_u64(&hash), hash_as_u64_bitops(&hash)); } } #[test] fn test_crds_filter_default() { let filter = CrdsFilter::default(); let mask = CrdsFilter::compute_mask(0, filter.mask_bits); assert_eq!(filter.mask, mask); let mut rng = thread_rng(); for _ in 0..10 { let hash = solana_sdk::hash::new_rand(&mut rng); assert!(filter.test_mask(&hash)); } } #[test] fn test_new_pull_with_stakes() { let mut crds = Crds::default(); let mut stakes = HashMap::new(); let node = CrdsGossipPull::default(); let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); crds.insert(me.clone(), 0).unwrap(); for i in 1..=30 { let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); let id = entry.label().pubkey(); crds.insert(entry.clone(), 0).unwrap(); stakes.insert(id, i * 100); } let now = 1024; let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is 
also the heaviest weighted. assert_eq!( *stakes.get(&options.get(0).unwrap().1.id).unwrap(), 3000_u64 ); } #[test] fn test_no_pulls_from_different_shred_versions() { let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPull::default(); let gossip = socketaddr!("127.0.0.1:1234"); let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), shred_version: 123, gossip, ..ContactInfo::default() })); let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), shred_version: 0, gossip, ..ContactInfo::default() })); let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), shred_version: 123, gossip, ..ContactInfo::default() })); let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), shred_version: 456, gossip, ..ContactInfo::default() })); crds.insert(me.clone(), 0).unwrap(); crds.insert(spy.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_456.clone(), 0).unwrap(); // shred version 123 should ignore nodes with versions 0 and 456 let options = node .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); assert_eq!(options.len(), 1); assert!(!options.contains(&spy.pubkey())); assert!(options.contains(&node_123.pubkey())); // spy nodes will see all let options = node .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); assert_eq!(options.len(), 3); assert!(options.contains(&me.pubkey())); assert!(options.contains(&node_123.pubkey())); assert!(options.contains(&node_456.pubkey())); } #[test] fn test_pulls_only_from_allowed() { let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPull::default(); let gossip = socketaddr!("127.0.0.1:1234"); let me = 
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), gossip, ..ContactInfo::default() })); let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { id: solana_sdk::pubkey::new_rand(), gossip, ..ContactInfo::default() })); crds.insert(me.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap(); // Empty gossip_validators -- will pull from nobody let mut gossip_validators = HashSet::new(); let options = node.pull_options( &crds, &me.label().pubkey(), 0, 0, Some(&gossip_validators), &stakes, ); assert!(options.is_empty()); // Unknown pubkey in gossip_validators -- will pull from nobody gossip_validators.insert(solana_sdk::pubkey::new_rand()); let options = node.pull_options( &crds, &me.label().pubkey(), 0, 0, Some(&gossip_validators), &stakes, ); assert!(options.is_empty()); // node_123 pubkey in gossip_validators -- will pull from it gossip_validators.insert(node_123.pubkey()); let options = node.pull_options( &crds, &me.label().pubkey(), 0, 0, Some(&gossip_validators), &stakes, ); assert_eq!(options.len(), 1); assert_eq!(options[0].1.id, node_123.pubkey()); } #[test] fn test_crds_filter_set_add() { let mut rng = thread_rng(); let crds_filter_set = CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); let hash_values: Vec<_> = std::iter::repeat_with(|| solana_sdk::hash::new_rand(&mut rng)) .take(1024) .collect(); for hash_value in &hash_values { crds_filter_set.add(*hash_value); } let filters: Vec = crds_filter_set.into(); assert_eq!(filters.len(), 1024); for hash_value in hash_values { let mut num_hits = 0; let mut false_positives = 0; for filter in &filters { if filter.test_mask(&hash_value) { num_hits += 1; assert!(filter.contains(&hash_value)); assert!(filter.filter.contains(&hash_value)); } else if filter.filter.contains(&hash_value) { false_positives += 1; } } assert_eq!(num_hits, 1); assert!(false_positives < 5); } } #[test] fn test_crds_filter_set_new() { // Validates invariances 
// required by CrdsFilterSet::get in the
        // vector of filters generated by CrdsFilterSet::new.
        let filters: Vec<CrdsFilter> =
            CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into();
        assert_eq!(filters.len(), 16384);
        let mask_bits = filters[0].mask_bits;
        let right_shift = 64 - mask_bits;
        let ones = !0u64 >> mask_bits;
        for (i, filter) in filters.iter().enumerate() {
            // Check that all mask_bits are equal.
            assert_eq!(mask_bits, filter.mask_bits);
            // The highest mask_bits bits of the mask must equal the filter's index.
            assert_eq!(i as u64, filter.mask >> right_shift);
            // All remaining (lower) bits of the mask must be set.
            assert_eq!(ones, ones & filter.mask);
        }
    }

    /// Builds filters over 20_000 crds values plus 10_000 purged hashes and checks that
    /// every hash matches the mask of exactly one filter, and that this filter contains it.
    #[test]
    fn test_build_crds_filter() {
        let mut rng = thread_rng();
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut crds_gossip_pull = CrdsGossipPull::default();
        let mut crds = Crds::default();
        for _ in 0..10_000 {
            crds_gossip_pull
                .purged_values
                .push_back((solana_sdk::hash::new_rand(&mut rng), rng.gen()));
        }
        let mut num_inserts = 0;
        for _ in 0..20_000 {
            if crds
                .insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
                .is_ok()
            {
                num_inserts += 1;
            }
        }
        assert_eq!(num_inserts, 20_000);
        let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
        assert_eq!(filters.len(), 32);
        let hash_values: Vec<_> = crds
            .values()
            .map(|v| v.value_hash)
            .chain(
                crds_gossip_pull
                    .purged_values
                    .iter()
                    .map(|(value_hash, _)| value_hash)
                    .cloned(),
            )
            .collect();
        assert_eq!(hash_values.len(), 10_000 + 20_000);
        let mut false_positives = 0;
        for hash_value in hash_values {
            let mut num_hits = 0;
            for filter in &filters {
                if filter.test_mask(&hash_value) {
                    num_hits += 1;
                    assert!(filter.contains(&hash_value));
                    assert!(filter.filter.contains(&hash_value));
                } else if filter.filter.contains(&hash_value) {
                    // The bloom filter of a non-matching partition reports the hash.
                    false_positives += 1;
                }
            }
            // Each hash must fall into exactly one mask partition.
            assert_eq!(num_hits, 1);
        }
        assert!(false_positives < 50_000, "fp: {}", false_positives);
    }

    /// A pull request fails with `NoPeers` while the crds table is empty or holds only the
    /// node's own contact info; once another peer is inserted, the request targets it.
    #[test]
    fn test_new_pull_request() {
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        let id = entry.label().pubkey();
        let node = CrdsGossipPull::default();
        // Empty crds table: nobody to pull from.
        assert_eq!(
            node.new_pull_request(
                &thread_pool,
                &crds,
                &id,
                0,
                0,
                None,
                &HashMap::new(),
                PACKET_DATA_SIZE
            ),
            Err(CrdsGossipError::NoPeers)
        );

        // Only the node's own entry is present: still no peers.
        crds.insert(entry.clone(), 0).unwrap();
        assert_eq!(
            node.new_pull_request(
                &thread_pool,
                &crds,
                &id,
                0,
                0,
                None,
                &HashMap::new(),
                PACKET_DATA_SIZE
            ),
            Err(CrdsGossipError::NoPeers)
        );

        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        crds.insert(new.clone(), 0).unwrap();
        let req = node.new_pull_request(
            &thread_pool,
            &crds,
            &id,
            0,
            0,
            None,
            &HashMap::new(),
            PACKET_DATA_SIZE,
        );
        let (to, _, self_info) = req.unwrap();
        assert_eq!(to, new.label().pubkey());
        assert_eq!(self_info, entry);
    }

    /// After marking a pull request creation time for one peer, subsequent pull requests
    /// are expected to target the other peer.
    #[test]
    fn test_new_mark_creation_time() {
        let now: u64 = 1_605_127_770_789;
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        let node_pubkey = entry.label().pubkey();
        let mut node = CrdsGossipPull::default();
        crds.insert(entry.clone(), now).unwrap();
        let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        crds.insert(old.clone(), now).unwrap();
        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        crds.insert(new.clone(), now).unwrap();

        // set request creation time to now.
        let now = now + 50_000;
        node.mark_pull_request_creation_time(&new.label().pubkey(), now);

        // odds of getting the other request should be close to 1.
        let now = now + 1_000;
        for _ in 0..10 {
            let req = node.new_pull_request(
                &thread_pool,
                &crds,
                &node_pubkey,
                0,
                now,
                None,
                &HashMap::new(),
                PACKET_DATA_SIZE,
            );
            let (to, _, self_info) = req.unwrap();
            assert_eq!(to, old.label().pubkey());
            assert_eq!(self_info, entry);
        }
    }

    /// Checks that `generate_pull_responses` returns one response per filter, in order,
    /// and only returns a value to a caller whose wallclock is recent enough.
    #[test]
    fn test_generate_pull_responses() {
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut node_crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        let node_pubkey = entry.label().pubkey();
        let node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();
        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        node_crds.insert(new, 0).unwrap();
        let req = node.new_pull_request(
            &thread_pool,
            &node_crds,
            &node_pubkey,
            0,
            0,
            None,
            &HashMap::new(),
            PACKET_DATA_SIZE,
        );

        let mut dest_crds = Crds::default();
        let dest = CrdsGossipPull::default();
        let (_, filters, caller) = req.unwrap();
        let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
        // The destination has no values yet, so there is nothing to respond with.
        let rsp = dest.generate_pull_responses(
            &dest_crds,
            &filters,
            /*output_size_limit=*/ usize::MAX,
            0,
        );

        assert_eq!(rsp[0].len(), 0);

        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
        )));
        dest_crds
            .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
            .unwrap();

        // should skip new value since caller is too old
        let rsp = dest.generate_pull_responses(
            &dest_crds,
            &filters,
            /*output_size_limit=*/ usize::MAX,
            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
        );
        assert_eq!(rsp[0].len(), 0);

        assert_eq!(filters.len(), 1);
        filters.push(filters[0].clone());
        // should return new value since caller is new
        filters[1].0 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
        )));

        let rsp = dest.generate_pull_responses(
            &dest_crds,
            &filters,
            /*output_size_limit=*/ usize::MAX,
            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
        );
        assert_eq!(rsp.len(), 2);
        assert_eq!(rsp[0].len(), 0);
        assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
    }

    /// Processing a pull request inserts the caller's value into the destination crds
    /// table, stamped with the supplied timestamp.
    #[test]
    fn test_process_pull_request() {
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut node_crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        let node_pubkey = entry.label().pubkey();
        let node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();
        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        node_crds.insert(new, 0).unwrap();
        let req = node.new_pull_request(
            &thread_pool,
            &node_crds,
            &node_pubkey,
            0,
            0,
            None,
            &HashMap::new(),
            PACKET_DATA_SIZE,
        );

        let mut dest_crds = Crds::default();
        let mut dest = CrdsGossipPull::default();
        let (_, filters, caller) = req.unwrap();
        let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
        let rsp = dest.generate_pull_responses(
            &dest_crds,
            &filters,
            /*output_size_limit=*/ usize::MAX,
            0,
        );
        dest.process_pull_requests(
            &mut dest_crds,
            filters.into_iter().map(|(caller, _)| caller),
            1,
        );
        // The responses were generated before the caller was inserted, so all are empty.
        assert!(rsp.iter().all(|rsp| rsp.is_empty()));
        assert!(dest_crds.lookup(&caller.label()).is_some());
        assert_eq!(
            dest_crds
                .lookup_versioned(&caller.label())
                .unwrap()
                .insert_timestamp,
            1
        );
        assert_eq!(
            dest_crds
                .lookup_versioned(&caller.label())
                .unwrap()
                .local_timestamp,
            1
        );
    }

    /// Runs a full pull round trip (request, response, process response) and checks that
    /// the requesting node ends up with the newer value stamped with the processing time.
    #[test]
    fn test_process_pull_request_response() {
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut node_crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            1,
        )));
        let node_pubkey = entry.label().pubkey();
        let mut node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();

        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            1,
        )));
        node_crds.insert(new, 0).unwrap();

        let mut dest = CrdsGossipPull::default();
        let mut dest_crds = Crds::default();
        let new_id = solana_sdk::pubkey::new_rand();
        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &new_id, 1,
        )));
        dest_crds.insert(new.clone(), 0).unwrap();

        // node contains a key from the dest node, but at an older local timestamp
        let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &new_id, 0,
        )));
        assert_eq!(same_key.label(), new.label());
        assert!(same_key.wallclock() < new.wallclock());
        node_crds.insert(same_key.clone(), 0).unwrap();
        assert_eq!(
            node_crds
                .lookup_versioned(&same_key.label())
                .unwrap()
                .local_timestamp,
            0
        );
        let mut done = false;
        for _ in 0..30 {
            // there is a chance of a false positive with bloom filters
            let req = node.new_pull_request(
                &thread_pool,
                &node_crds,
                &node_pubkey,
                0,
                0,
                None,
                &HashMap::new(),
                PACKET_DATA_SIZE,
            );
            let (_, filters, caller) = req.unwrap();
            let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
            let mut rsp = dest.generate_pull_responses(
                &dest_crds,
                &filters,
                /*output_size_limit=*/ usize::MAX,
                0,
            );
            dest.process_pull_requests(
                &mut dest_crds,
                filters.into_iter().map(|(caller, _)| caller),
                0,
            );
            // if there is a false positive the per-filter responses carry no values;
            // prob should be around 0.1 per iteration.
            // The emptiness check looks at the inner responses (this also covers an empty
            // outer vector), so that a false positive retries instead of failing below.
            if rsp.iter().all(|values| values.is_empty()) {
                continue;
            }
            assert_eq!(rsp.len(), 1);
            let failed = node
                .process_pull_response(
                    &mut node_crds,
                    &node_pubkey,
                    &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
                    rsp.pop().unwrap(),
                    1,
                )
                .0;
            assert_eq!(failed, 0);
            assert_eq!(
                node_crds
                    .lookup_versioned(&new.label())
                    .unwrap()
                    .local_timestamp,
                1
            );
            // verify that the whole record was updated for dest since this is a response from dest
            assert_eq!(
                node_crds
                    .lookup_versioned(&same_key.label())
                    .unwrap()
                    .local_timestamp,
                1
            );
            done = true;
            break;
        }
        assert!(done);
    }

    /// Purging removes the stale peer from crds but keeps the node's own entry; the purged
    /// hash stays in `purged_values` (and in the built filters) until `purge_purged` runs.
    #[test]
    fn test_gossip_purge() {
        let thread_pool = ThreadPoolBuilder::new().build().unwrap();
        let mut node_crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        let node_label = entry.label();
        let node_pubkey = node_label.pubkey();
        let mut node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();
        let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )));
        node_crds.insert(old.clone(), 0).unwrap();
        let value_hash = node_crds.lookup_versioned(&old.label()).unwrap().value_hash;

        // verify self is valid
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

        // purge
        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
        node.purge_active(&thread_pool, &mut node_crds, 2, &timeouts);

        // verify self is still valid after purge
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

        assert_eq!(node_crds.lookup_versioned(&old.label()), None);
        assert_eq!(node.purged_values.len(), 1);
        for _ in 0..30 {
            // there is a chance of a false positive with bloom filters
            // assert that purged value is still in the set
            // chance of 30 consecutive false positives is 0.1^30
            let filters = node.build_crds_filters(&thread_pool, &node_crds, PACKET_DATA_SIZE);
            assert!(filters.iter().any(|filter| filter.contains(&value_hash)));
        }

        // purge the value
        node.purge_purged(1);
        assert_eq!(node.purged_values.len(), 0);
    }

    /// Checks the mask produced by `CrdsFilter::new_rand` together with the `max_items`
    /// and `mask_bits` helpers.
    #[test]
    #[allow(clippy::float_cmp)]
    fn test_crds_filter_mask() {
        // A single item needs no partitioning: every mask bit is set.
        let filter = CrdsFilter::new_rand(1, 128);
        assert_eq!(filter.mask, !0x0);
        assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64);
        // 1000/9 = 111, so 7 bits are needed to mask it
        assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32);
        let filter = CrdsFilter::new_rand(1000, 10);
        assert_eq!(filter.mask & 0x00_ffff_ffff, 0x00_ffff_ffff);
    }

    /// With a filter built for a single item, a hash is reported as contained only after
    /// it has been added.
    #[test]
    fn test_crds_filter_add_no_mask() {
        let mut filter = CrdsFilter::new_rand(1, 128);
        let h: Hash = hash(Hash::default().as_ref());
        assert!(!filter.contains(&h));
        filter.add(&h);
        assert!(filter.contains(&h));
        let h: Hash = hash(h.as_ref());
        assert!(!filter.contains(&h));
    }

    /// A hash that passes the filter's mask test is not contained until it is added.
    #[test]
    fn test_crds_filter_add_mask() {
        let mut filter = CrdsFilter::new_rand(1000, 10);
        let mut h: Hash = Hash::default();
        // Re-hash until a value matching this filter's mask is found.
        while !filter.test_mask(&h) {
            h = hash(h.as_ref());
        }
        assert!(filter.test_mask(&h));
        // if the mask succeeds, we want the guaranteed negative
        assert!(!filter.contains(&h));
        filter.add(&h);
        assert!(filter.contains(&h));
    }

    /// Same as `test_crds_filter_add_mask`, but locating the matching filter within a
    /// complete `CrdsFilterSet`.
    #[test]
    fn test_crds_filter_complete_set_add_mask() {
        let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into();
        assert!(filters.iter().all(|f| f.mask_bits > 0));
        let mut h: Hash = Hash::default();
        // rev to make the hash::default() miss on the first few test_masks
        while !filters.iter().rev().any(|f| f.test_mask(&h)) {
            h = hash(h.as_ref());
        }
        let filter = filters.iter_mut().find(|f| f.test_mask(&h)).unwrap();
        assert!(filter.test_mask(&h));
        // if the mask succeeds, we want the guaranteed negative
        assert!(!filter.contains(&h));
        filter.add(&h);
        assert!(filter.contains(&h));
    }

    /// A hash that fails the filter's mask test is reported as contained.
    #[test]
    fn test_crds_filter_contains_mask() {
        let filter = CrdsFilter::new_rand(1000, 10);
        assert!(filter.mask_bits > 0);
        let mut h: Hash = Hash::default();
        // Re-hash until a value NOT matching this filter's mask is found.
        while filter.test_mask(&h) {
            h = hash(h.as_ref());
        }
        assert!(!filter.test_mask(&h));
        // if the mask fails, the hash is contained in the set, and can be treated as a false
        // positive
        assert!(filter.contains(&h));
    }

    /// Runs `run_test_mask` for every `mask_bits` in `0..16`.
    #[test]
    fn test_mask() {
        for i in 0..16 {
            run_test_mask(i);
        }
    }

    /// Computes the mask of every seed in `0..2^mask_bits` and asserts that, after
    /// removing consecutive duplicates, there are still `2^mask_bits` masks.
    fn run_test_mask(mask_bits: u32) {
        let masks: Vec<_> = (0..2u64.pow(mask_bits))
            .map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
            .dedup()
            .collect();
        assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize)
    }

    /// Exercises the timeout handling of `process_pull_response` for contact infos and
    /// for non-contact-info values (votes).
    #[test]
    fn test_process_pull_response() {
        let mut node_crds = Crds::default();
        let mut node = CrdsGossipPull::default();

        let peer_pubkey = solana_sdk::pubkey::new_rand();
        let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
            ContactInfo::new_localhost(&peer_pubkey, 0),
        ));
        let mut timeouts = HashMap::new();
        timeouts.insert(Pubkey::default(), node.crds_timeout);
        timeouts.insert(peer_pubkey, node.msg_timeout + 1);
        // inserting a fresh value should be fine.
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry.clone()],
                1,
            )
            .0,
            0
        );

        let mut node_crds = Crds::default();
        let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
            ContactInfo::new_localhost(&peer_pubkey, 0),
        ));
        // check that old contact infos fail if they are too old, regardless of "timeouts"
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry.clone(), unstaked_peer_entry],
                node.msg_timeout + 100,
            )
            .0,
            2
        );

        let mut node_crds = Crds::default();
        // check that old contact infos can still land as long as they have a "timeouts" entry
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry],
                node.msg_timeout + 1,
            )
            .0,
            0
        );

        // construct something that's not a contact info
        let peer_vote =
            CrdsValue::new_unsigned(CrdsData::Vote(0, Vote::new(peer_pubkey, test_tx(), 0)));
        // check that older CrdsValues (non-ContactInfos) pass even if they are too old,
        // as long as a recent contact info (inserted above) exists
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_vote.clone()],
                node.msg_timeout + 1,
            )
            .0,
            0
        );

        let mut node_crds = Crds::default();
        // without a contact info, inserting an old value should fail
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_vote],
                node.msg_timeout + 1,
            )
            .0,
            1
        );
    }
}