2021-07-15 17:40:07 -07:00
|
|
|
//! Crds Gossip Pull overlay.
|
|
|
|
//!
|
2018-11-15 13:23:26 -08:00
|
|
|
//! This module implements the anti-entropy protocol for the network.
|
|
|
|
//!
|
|
|
|
//! The basic strategy is as follows:
|
2021-07-15 17:40:07 -07:00
|
|
|
//!
|
2018-11-15 13:23:26 -08:00
|
|
|
//! 1. Construct a bloom filter of the local data set
|
|
|
|
//! 2. Randomly ask a node on the network for data that is not contained in the bloom filter.
|
|
|
|
//!
|
|
|
|
//! Bloom filters have a false positive rate. Each requests uses a different bloom filter
|
|
|
|
//! with random hash functions. So each subsequent request will have a different distribution
|
|
|
|
//! of false positives.
|
|
|
|
|
2021-05-26 08:15:46 -07:00
|
|
|
use {
|
|
|
|
crate::{
|
2023-01-14 07:44:38 -08:00
|
|
|
cluster_info::Ping,
|
2022-04-18 16:14:59 -07:00
|
|
|
cluster_info_metrics::GossipStats,
|
2021-10-26 06:02:30 -07:00
|
|
|
crds::{Crds, GossipRoute, VersionedCrdsValue},
|
2023-01-14 07:44:38 -08:00
|
|
|
crds_gossip,
|
2021-05-26 08:15:46 -07:00
|
|
|
crds_gossip_error::CrdsGossipError,
|
2023-01-26 09:02:18 -08:00
|
|
|
crds_value::{CrdsData, CrdsValue},
|
2023-01-08 08:00:55 -08:00
|
|
|
legacy_contact_info::LegacyContactInfo as ContactInfo,
|
2021-05-26 08:15:46 -07:00
|
|
|
ping_pong::PingCache,
|
|
|
|
},
|
2022-05-26 05:45:53 -07:00
|
|
|
itertools::Itertools,
|
|
|
|
rand::{
|
|
|
|
distributions::{Distribution, WeightedIndex},
|
|
|
|
Rng,
|
|
|
|
},
|
2021-05-26 08:15:46 -07:00
|
|
|
rayon::{prelude::*, ThreadPool},
|
2022-01-19 13:58:20 -08:00
|
|
|
solana_bloom::bloom::{AtomicBloom, Bloom},
|
2021-05-26 08:15:46 -07:00
|
|
|
solana_sdk::{
|
|
|
|
hash::{hash, Hash},
|
2023-01-14 07:44:38 -08:00
|
|
|
native_token::LAMPORTS_PER_SOL,
|
2021-05-26 08:15:46 -07:00
|
|
|
pubkey::Pubkey,
|
|
|
|
signature::{Keypair, Signer},
|
|
|
|
},
|
2021-07-23 08:25:03 -07:00
|
|
|
solana_streamer::socket::SocketAddrSpace,
|
2021-05-26 08:15:46 -07:00
|
|
|
std::{
|
|
|
|
collections::{HashMap, HashSet, VecDeque},
|
|
|
|
convert::TryInto,
|
2021-07-11 08:32:10 -07:00
|
|
|
iter::{repeat, repeat_with},
|
2021-05-26 08:15:46 -07:00
|
|
|
net::SocketAddr,
|
2021-07-11 08:32:10 -07:00
|
|
|
sync::{
|
2021-07-26 10:13:11 -07:00
|
|
|
atomic::{AtomicI64, AtomicUsize, Ordering},
|
2021-07-11 08:32:10 -07:00
|
|
|
Mutex, RwLock,
|
|
|
|
},
|
2023-01-14 07:44:38 -08:00
|
|
|
time::Duration,
|
2021-05-26 08:15:46 -07:00
|
|
|
},
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
};
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
2020-02-07 12:38:24 -08:00
|
|
|
// The maximum age of a value received over pull responses
|
|
|
|
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
|
2020-09-30 17:39:22 -07:00
|
|
|
// Retention period of hashes of received outdated values.
|
|
|
|
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
|
2023-01-30 09:59:56 -08:00
|
|
|
// Maximum number of pull requests to send out each time around.
|
|
|
|
const MAX_NUM_PULL_REQUESTS: usize = 1024;
|
2019-08-13 18:04:14 -07:00
|
|
|
pub const FALSE_RATE: f64 = 0.1f64;
|
2019-08-19 18:14:10 -07:00
|
|
|
pub const KEYS: f64 = 8f64;
|
2019-08-13 18:04:14 -07:00
|
|
|
|
2022-05-22 18:00:42 -07:00
|
|
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
2019-08-13 18:04:14 -07:00
|
|
|
pub struct CrdsFilter {
|
|
|
|
pub filter: Bloom<Hash>,
|
|
|
|
mask: u64,
|
|
|
|
mask_bits: u32,
|
|
|
|
}
|
|
|
|
|
2020-09-13 06:08:25 -07:00
|
|
|
impl Default for CrdsFilter {
|
|
|
|
fn default() -> Self {
|
|
|
|
CrdsFilter {
|
|
|
|
filter: Bloom::default(),
|
|
|
|
mask: !0u64,
|
|
|
|
mask_bits: 0u32,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-27 11:06:00 -07:00
|
|
|
impl solana_sdk::sanitize::Sanitize for CrdsFilter {
|
|
|
|
fn sanitize(&self) -> std::result::Result<(), solana_sdk::sanitize::SanitizeError> {
|
|
|
|
self.filter.sanitize()?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-13 18:04:14 -07:00
|
|
|
impl CrdsFilter {
|
2021-07-10 15:16:33 -07:00
|
|
|
#[cfg(test)]
|
|
|
|
pub(crate) fn new_rand(num_items: usize, max_bytes: usize) -> Self {
|
2019-08-13 18:04:14 -07:00
|
|
|
let max_bits = (max_bytes * 8) as f64;
|
2019-08-19 18:14:10 -07:00
|
|
|
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
2022-11-09 11:39:38 -08:00
|
|
|
let mask_bits = Self::mask_bits(num_items as f64, max_items);
|
2019-08-19 18:14:10 -07:00
|
|
|
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
2019-08-13 18:04:14 -07:00
|
|
|
let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits));
|
|
|
|
let mask = Self::compute_mask(seed, mask_bits);
|
|
|
|
CrdsFilter {
|
|
|
|
filter,
|
|
|
|
mask,
|
|
|
|
mask_bits,
|
|
|
|
}
|
|
|
|
}
|
2020-09-03 13:32:23 -07:00
|
|
|
|
2019-08-13 18:04:14 -07:00
|
|
|
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
|
|
|
|
assert!(seed <= 2u64.pow(mask_bits));
|
|
|
|
let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
|
2022-11-09 11:39:38 -08:00
|
|
|
seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0)
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
2020-11-19 15:35:22 -08:00
|
|
|
fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
|
2019-08-13 18:04:14 -07:00
|
|
|
let m = max_bits;
|
|
|
|
let p = false_rate;
|
|
|
|
let k = num_keys;
|
|
|
|
(m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil()
|
|
|
|
}
|
|
|
|
fn mask_bits(num_items: f64, max_items: f64) -> u32 {
|
|
|
|
// for small ratios this can result in a negative number, ensure it returns 0 instead
|
|
|
|
((num_items / max_items).log2().ceil()).max(0.0) as u32
|
|
|
|
}
|
2020-08-12 22:45:19 -07:00
|
|
|
pub fn hash_as_u64(item: &Hash) -> u64 {
|
2020-09-09 08:28:17 -07:00
|
|
|
let buf = item.as_ref()[..8].try_into().unwrap();
|
|
|
|
u64::from_le_bytes(buf)
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
2020-11-19 15:35:22 -08:00
|
|
|
fn test_mask(&self, item: &Hash) -> bool {
|
2019-08-13 18:04:14 -07:00
|
|
|
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
|
|
|
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
|
|
|
let bits = Self::hash_as_u64(item) | ones;
|
|
|
|
bits == self.mask
|
|
|
|
}
|
2020-11-19 15:35:22 -08:00
|
|
|
#[cfg(test)]
|
|
|
|
fn add(&mut self, item: &Hash) {
|
2019-08-13 18:04:14 -07:00
|
|
|
if self.test_mask(item) {
|
|
|
|
self.filter.add(item);
|
|
|
|
}
|
|
|
|
}
|
2020-11-19 15:35:22 -08:00
|
|
|
#[cfg(test)]
|
|
|
|
fn contains(&self, item: &Hash) -> bool {
|
2019-08-13 18:04:14 -07:00
|
|
|
if !self.test_mask(item) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
self.filter.contains(item)
|
|
|
|
}
|
2020-11-19 15:35:22 -08:00
|
|
|
fn filter_contains(&self, item: &Hash) -> bool {
|
2020-08-12 22:45:19 -07:00
|
|
|
self.filter.contains(item)
|
|
|
|
}
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-09-04 06:04:47 -07:00
|
|
|
/// A vector of crds filters that together hold a complete set of Hashes.
|
2020-09-29 16:06:02 -07:00
|
|
|
struct CrdsFilterSet {
|
|
|
|
filters: Vec<AtomicBloom<Hash>>,
|
|
|
|
mask_bits: u32,
|
|
|
|
}
|
2020-09-04 06:04:47 -07:00
|
|
|
|
|
|
|
impl CrdsFilterSet {
|
|
|
|
fn new(num_items: usize, max_bytes: usize) -> Self {
|
|
|
|
let max_bits = (max_bytes * 8) as f64;
|
|
|
|
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
|
2022-11-09 11:39:38 -08:00
|
|
|
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
|
2021-05-26 08:15:46 -07:00
|
|
|
let filters =
|
|
|
|
repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into())
|
|
|
|
.take(1 << mask_bits)
|
|
|
|
.collect();
|
2020-09-29 16:06:02 -07:00
|
|
|
Self { filters, mask_bits }
|
2020-09-04 06:04:47 -07:00
|
|
|
}
|
|
|
|
|
2020-09-29 16:06:02 -07:00
|
|
|
fn add(&self, hash_value: Hash) {
|
2023-01-30 09:59:56 -08:00
|
|
|
let shift = u64::BITS.checked_sub(self.mask_bits).unwrap();
|
|
|
|
let index = usize::try_from(
|
|
|
|
CrdsFilter::hash_as_u64(&hash_value)
|
|
|
|
.checked_shr(shift)
|
|
|
|
.unwrap_or_default(),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
self.filters[index].add(&hash_value);
|
2020-09-29 16:06:02 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-23 11:55:15 -08:00
|
|
|
impl From<CrdsFilterSet> for Vec<CrdsFilter> {
|
|
|
|
fn from(cfs: CrdsFilterSet) -> Self {
|
|
|
|
let mask_bits = cfs.mask_bits;
|
|
|
|
cfs.filters
|
2020-09-29 16:06:02 -07:00
|
|
|
.into_iter()
|
|
|
|
.enumerate()
|
|
|
|
.map(|(seed, filter)| CrdsFilter {
|
|
|
|
filter: filter.into(),
|
|
|
|
mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
|
|
|
|
mask_bits,
|
|
|
|
})
|
|
|
|
.collect()
|
2020-09-04 06:04:47 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-09 17:08:13 -07:00
|
|
|
#[derive(Default)]
|
|
|
|
pub struct ProcessPullStats {
|
|
|
|
pub success: usize,
|
|
|
|
pub failed_insert: usize,
|
|
|
|
pub failed_timeout: usize,
|
|
|
|
pub timeout_count: usize,
|
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
pub struct CrdsGossipPull {
|
2020-09-30 17:39:22 -07:00
|
|
|
// Hash value and record time (ms) of the pull responses which failed to be
|
|
|
|
// inserted in crds table; Preserved to stop the sender to send back the
|
|
|
|
// same outdated payload again by adding them to the filter for the next
|
|
|
|
// pull request.
|
2021-07-11 08:32:10 -07:00
|
|
|
failed_inserts: RwLock<VecDeque<(Hash, /*timestamp:*/ u64)>>,
|
2018-11-15 13:23:26 -08:00
|
|
|
pub crds_timeout: u64,
|
2021-07-11 08:32:10 -07:00
|
|
|
msg_timeout: u64,
|
|
|
|
pub num_pulls: AtomicUsize,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for CrdsGossipPull {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
2021-07-11 08:32:10 -07:00
|
|
|
failed_inserts: RwLock::default(),
|
2018-11-15 13:23:26 -08:00
|
|
|
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
|
2020-02-07 12:38:24 -08:00
|
|
|
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
2021-07-11 08:32:10 -07:00
|
|
|
num_pulls: AtomicUsize::default(),
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl CrdsGossipPull {
|
2021-07-15 17:40:07 -07:00
|
|
|
/// Generate a random request
|
2021-04-20 11:06:13 -07:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2021-07-10 15:16:33 -07:00
|
|
|
pub(crate) fn new_pull_request(
|
2018-11-15 13:23:26 -08:00
|
|
|
&self,
|
2020-09-29 16:06:02 -07:00
|
|
|
thread_pool: &ThreadPool,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2021-04-20 11:06:13 -07:00
|
|
|
self_keypair: &Keypair,
|
2020-05-05 20:15:19 -07:00
|
|
|
self_shred_version: u16,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2020-09-11 12:00:16 -07:00
|
|
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2019-08-13 18:04:14 -07:00
|
|
|
bloom_size: usize,
|
2021-04-20 11:06:13 -07:00
|
|
|
ping_cache: &Mutex<PingCache>,
|
|
|
|
pings: &mut Vec<(SocketAddr, Ping)>,
|
2021-07-23 08:25:03 -07:00
|
|
|
socket_addr_space: &SocketAddrSpace,
|
2022-05-26 05:45:53 -07:00
|
|
|
) -> Result<HashMap<ContactInfo, Vec<CrdsFilter>>, CrdsGossipError> {
|
2023-01-18 09:43:09 -08:00
|
|
|
let mut rng = rand::thread_rng();
|
2023-01-14 07:44:38 -08:00
|
|
|
// Active and valid gossip nodes with matching shred-version.
|
2023-01-18 09:43:09 -08:00
|
|
|
let nodes = crds_gossip::get_gossip_nodes(
|
|
|
|
&mut rng,
|
2022-05-26 05:45:53 -07:00
|
|
|
now,
|
2023-01-18 09:43:09 -08:00
|
|
|
&self_keypair.pubkey(),
|
|
|
|
// Pull from nodes with the same shred version, unless this is a
|
|
|
|
// spy node which then can pull from any node.
|
|
|
|
|shred_version| self_shred_version == 0u16 || shred_version == self_shred_version,
|
|
|
|
crds,
|
2022-05-26 05:45:53 -07:00
|
|
|
gossip_validators,
|
|
|
|
stakes,
|
|
|
|
socket_addr_space,
|
|
|
|
);
|
2023-01-14 07:44:38 -08:00
|
|
|
// Check for nodes which have responded to ping messages.
|
|
|
|
let nodes = crds_gossip::maybe_ping_gossip_addresses(
|
|
|
|
&mut rng,
|
|
|
|
nodes,
|
|
|
|
self_keypair,
|
|
|
|
ping_cache,
|
|
|
|
pings,
|
|
|
|
);
|
|
|
|
let stake_cap = stakes
|
|
|
|
.get(&self_keypair.pubkey())
|
|
|
|
.copied()
|
|
|
|
.unwrap_or_default();
|
|
|
|
let (weights, nodes): (Vec<u64>, Vec<ContactInfo>) =
|
|
|
|
crds_gossip::dedup_gossip_addresses(nodes, stakes)
|
|
|
|
.into_values()
|
|
|
|
.map(|(stake, node)| {
|
|
|
|
let stake = stake.min(stake_cap) / LAMPORTS_PER_SOL;
|
|
|
|
let weight = u64::BITS - stake.leading_zeros();
|
|
|
|
let weight = u64::from(weight).saturating_add(1).saturating_pow(2);
|
|
|
|
(weight, node)
|
2022-05-26 05:45:53 -07:00
|
|
|
})
|
2023-01-14 07:44:38 -08:00
|
|
|
.unzip();
|
|
|
|
if nodes.is_empty() {
|
2022-05-26 05:45:53 -07:00
|
|
|
return Err(CrdsGossipError::NoPeers);
|
|
|
|
}
|
2023-01-30 09:59:56 -08:00
|
|
|
let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size);
|
|
|
|
if filters.len() > MAX_NUM_PULL_REQUESTS {
|
|
|
|
for i in 0..MAX_NUM_PULL_REQUESTS {
|
|
|
|
let j = rng.gen_range(i, filters.len());
|
|
|
|
filters.swap(i, j);
|
|
|
|
}
|
|
|
|
filters.truncate(MAX_NUM_PULL_REQUESTS);
|
|
|
|
}
|
2022-05-26 05:45:53 -07:00
|
|
|
// Associate each pull-request filter with a randomly selected peer.
|
|
|
|
let dist = WeightedIndex::new(&weights).unwrap();
|
2023-01-14 07:44:38 -08:00
|
|
|
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
|
|
|
|
Ok(nodes.zip(filters).into_group_map())
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
2021-07-15 17:40:07 -07:00
|
|
|
/// Process a pull request
|
2021-07-14 15:27:17 -07:00
|
|
|
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
|
2020-10-28 10:03:02 -07:00
|
|
|
where
|
|
|
|
I: IntoIterator<Item = CrdsValue>,
|
|
|
|
{
|
2021-07-14 15:27:17 -07:00
|
|
|
let mut crds = crds.write().unwrap();
|
2020-10-28 10:03:02 -07:00
|
|
|
for caller in callers {
|
2021-05-24 06:47:21 -07:00
|
|
|
let key = caller.pubkey();
|
2021-10-26 06:02:30 -07:00
|
|
|
let _ = crds.insert(caller, now, GossipRoute::PullRequest);
|
2019-08-15 17:04:45 -07:00
|
|
|
crds.update_record_timestamp(&key, now);
|
2020-10-28 10:03:02 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-05-28 11:38:13 -07:00
|
|
|
|
|
|
|
/// Create gossip responses to pull requests
|
2021-07-10 15:16:33 -07:00
|
|
|
pub(crate) fn generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
thread_pool: &ThreadPool,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2020-05-28 11:38:13 -07:00
|
|
|
requests: &[(CrdsValue, CrdsFilter)],
|
2020-12-18 10:45:12 -08:00
|
|
|
output_size_limit: usize, // Limit number of crds values returned.
|
2020-08-11 06:26:42 -07:00
|
|
|
now: u64,
|
2022-04-18 16:14:59 -07:00
|
|
|
stats: &GossipStats,
|
2020-05-28 11:38:13 -07:00
|
|
|
) -> Vec<Vec<CrdsValue>> {
|
2022-04-18 16:14:59 -07:00
|
|
|
Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now, stats)
|
2020-05-28 11:38:13 -07:00
|
|
|
}
|
|
|
|
|
2020-06-09 17:08:13 -07:00
|
|
|
// Checks if responses should be inserted and
|
|
|
|
// returns those responses converted to VersionedCrdsValue
|
2020-09-30 17:39:22 -07:00
|
|
|
// Separated in three vecs as:
|
2020-06-09 17:08:13 -07:00
|
|
|
// .0 => responses that update the owner timestamp
|
|
|
|
// .1 => responses that do not update the owner timestamp
|
2020-09-30 17:39:22 -07:00
|
|
|
// .2 => hash value of outdated values which will fail to insert.
|
2021-07-10 15:16:33 -07:00
|
|
|
pub(crate) fn filter_pull_responses(
|
2020-06-09 17:08:13 -07:00
|
|
|
&self,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2020-02-07 12:38:24 -08:00
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
2020-06-09 17:08:13 -07:00
|
|
|
responses: Vec<CrdsValue>,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2020-06-09 17:08:13 -07:00
|
|
|
stats: &mut ProcessPullStats,
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
) -> (Vec<CrdsValue>, Vec<CrdsValue>, Vec<Hash>) {
|
|
|
|
let mut active_values = vec![];
|
|
|
|
let mut expired_values = vec![];
|
2021-04-14 13:18:00 -07:00
|
|
|
let default_timeout = timeouts
|
|
|
|
.get(&Pubkey::default())
|
|
|
|
.copied()
|
|
|
|
.unwrap_or(self.msg_timeout);
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = crds.read().unwrap();
|
2021-05-16 07:37:45 -07:00
|
|
|
let upsert = |response: CrdsValue| {
|
2021-04-14 13:18:00 -07:00
|
|
|
let owner = response.label().pubkey();
|
2020-02-07 12:38:24 -08:00
|
|
|
// Check if the crds value is older than the msg_timeout
|
2021-04-14 13:18:00 -07:00
|
|
|
let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout);
|
|
|
|
// Before discarding this value, check if a ContactInfo for the
|
|
|
|
// owner exists in the table. If it doesn't, that implies that this
|
|
|
|
// value can be discarded
|
2021-05-16 07:37:45 -07:00
|
|
|
if !crds.upserts(&response) {
|
|
|
|
Some(response)
|
|
|
|
} else if now <= response.wallclock().saturating_add(timeout) {
|
|
|
|
active_values.push(response);
|
|
|
|
None
|
2021-07-21 05:16:26 -07:00
|
|
|
} else if crds.get::<&ContactInfo>(owner).is_some() {
|
2021-04-14 13:18:00 -07:00
|
|
|
// Silently insert this old value without bumping record
|
|
|
|
// timestamps
|
2021-05-16 07:37:45 -07:00
|
|
|
expired_values.push(response);
|
|
|
|
None
|
2021-04-14 13:18:00 -07:00
|
|
|
} else {
|
|
|
|
stats.timeout_count += 1;
|
|
|
|
stats.failed_timeout += 1;
|
2021-05-16 07:37:45 -07:00
|
|
|
Some(response)
|
2020-02-07 12:38:24 -08:00
|
|
|
}
|
2021-05-16 07:37:45 -07:00
|
|
|
};
|
|
|
|
let failed_inserts = responses
|
|
|
|
.into_iter()
|
|
|
|
.filter_map(upsert)
|
|
|
|
.map(|resp| hash(&bincode::serialize(&resp).unwrap()))
|
|
|
|
.collect();
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
(active_values, expired_values, failed_inserts)
|
2020-06-09 17:08:13 -07:00
|
|
|
}
|
|
|
|
|
2021-07-15 17:40:07 -07:00
|
|
|
/// Process a vec of pull responses
|
2021-07-10 15:16:33 -07:00
|
|
|
pub(crate) fn process_pull_responses(
|
2021-07-11 08:32:10 -07:00
|
|
|
&self,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2020-06-09 17:08:13 -07:00
|
|
|
from: &Pubkey,
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
responses: Vec<CrdsValue>,
|
|
|
|
responses_expired_timeout: Vec<CrdsValue>,
|
2021-05-24 06:47:21 -07:00
|
|
|
failed_inserts: Vec<Hash>,
|
2020-06-09 17:08:13 -07:00
|
|
|
now: u64,
|
|
|
|
stats: &mut ProcessPullStats,
|
2021-04-30 09:57:19 -07:00
|
|
|
) {
|
2020-06-09 17:08:13 -07:00
|
|
|
let mut owners = HashSet::new();
|
2021-07-14 15:27:17 -07:00
|
|
|
let mut crds = crds.write().unwrap();
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
for response in responses_expired_timeout {
|
2021-10-26 06:02:30 -07:00
|
|
|
let _ = crds.insert(response, now, GossipRoute::PullResponse);
|
2020-06-09 17:08:13 -07:00
|
|
|
}
|
2021-07-11 08:32:10 -07:00
|
|
|
let mut num_inserts = 0;
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
for response in responses {
|
2021-04-30 09:57:19 -07:00
|
|
|
let owner = response.pubkey();
|
2021-10-26 06:02:30 -07:00
|
|
|
if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) {
|
2021-07-11 08:32:10 -07:00
|
|
|
num_inserts += 1;
|
2021-05-24 06:47:21 -07:00
|
|
|
owners.insert(owner);
|
2020-05-28 11:38:13 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2021-07-11 08:32:10 -07:00
|
|
|
stats.success += num_inserts;
|
|
|
|
self.num_pulls.fetch_add(num_inserts, Ordering::Relaxed);
|
2020-06-09 17:08:13 -07:00
|
|
|
owners.insert(*from);
|
|
|
|
for owner in owners {
|
|
|
|
crds.update_record_timestamp(&owner, now);
|
|
|
|
}
|
2021-07-14 15:27:17 -07:00
|
|
|
drop(crds);
|
2020-09-30 17:39:22 -07:00
|
|
|
stats.failed_insert += failed_inserts.len();
|
|
|
|
self.purge_failed_inserts(now);
|
2021-07-11 08:32:10 -07:00
|
|
|
let failed_inserts = failed_inserts.into_iter().zip(repeat(now));
|
|
|
|
self.failed_inserts.write().unwrap().extend(failed_inserts);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-09-30 17:39:22 -07:00
|
|
|
|
2021-07-11 08:32:10 -07:00
|
|
|
pub(crate) fn purge_failed_inserts(&self, now: u64) {
|
2020-09-30 17:39:22 -07:00
|
|
|
if FAILED_INSERTS_RETENTION_MS < now {
|
|
|
|
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
|
2021-07-11 08:32:10 -07:00
|
|
|
let mut failed_inserts = self.failed_inserts.write().unwrap();
|
|
|
|
let outdated = failed_inserts
|
2020-09-30 17:39:22 -07:00
|
|
|
.iter()
|
|
|
|
.take_while(|(_, ts)| *ts < cutoff)
|
|
|
|
.count();
|
2021-07-11 08:32:10 -07:00
|
|
|
failed_inserts.drain(..outdated);
|
2020-09-30 17:39:22 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-11 08:32:10 -07:00
|
|
|
pub(crate) fn failed_inserts_size(&self) -> usize {
|
|
|
|
self.failed_inserts.read().unwrap().len()
|
|
|
|
}
|
|
|
|
|
2019-08-13 18:04:14 -07:00
|
|
|
// build a set of filters of the current crds table
|
2022-04-18 06:52:56 -07:00
|
|
|
// num_filters - used to increase the likelihood of a value in crds being added to some filter
|
2020-09-29 16:06:02 -07:00
|
|
|
pub fn build_crds_filters(
|
|
|
|
&self,
|
|
|
|
thread_pool: &ThreadPool,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2020-09-29 16:06:02 -07:00
|
|
|
bloom_size: usize,
|
|
|
|
) -> Vec<CrdsFilter> {
|
|
|
|
const PAR_MIN_LENGTH: usize = 512;
|
2021-05-23 09:50:19 -07:00
|
|
|
#[cfg(debug_assertions)]
|
|
|
|
const MIN_NUM_BLOOM_ITEMS: usize = 512;
|
|
|
|
#[cfg(not(debug_assertions))]
|
2021-05-21 06:59:26 -07:00
|
|
|
const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
|
2021-07-14 15:27:17 -07:00
|
|
|
let failed_inserts = self.failed_inserts.read().unwrap();
|
|
|
|
// crds should be locked last after self.failed_inserts.
|
|
|
|
let crds = crds.read().unwrap();
|
|
|
|
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
|
2021-05-21 06:59:26 -07:00
|
|
|
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
|
|
|
|
let filters = CrdsFilterSet::new(num_items, bloom_size);
|
2020-09-30 17:39:22 -07:00
|
|
|
thread_pool.install(|| {
|
2020-11-19 12:57:40 -08:00
|
|
|
crds.par_values()
|
2020-09-30 17:39:22 -07:00
|
|
|
.with_min_len(PAR_MIN_LENGTH)
|
|
|
|
.map(|v| v.value_hash)
|
2021-05-24 06:47:21 -07:00
|
|
|
.chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
|
2020-09-30 17:39:22 -07:00
|
|
|
.chain(
|
2021-07-14 15:27:17 -07:00
|
|
|
failed_inserts
|
2020-09-30 17:39:22 -07:00
|
|
|
.par_iter()
|
|
|
|
.with_min_len(PAR_MIN_LENGTH)
|
|
|
|
.map(|(v, _)| *v),
|
|
|
|
)
|
|
|
|
.for_each(|v| filters.add(v));
|
|
|
|
});
|
2021-07-14 15:27:17 -07:00
|
|
|
drop(crds);
|
|
|
|
drop(failed_inserts);
|
2020-09-29 16:06:02 -07:00
|
|
|
filters.into()
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-08-11 14:03:54 -07:00
|
|
|
|
2021-07-15 17:40:07 -07:00
|
|
|
/// Filter values that fail the bloom filter up to `max_bytes`.
|
2019-08-15 17:04:45 -07:00
|
|
|
fn filter_crds_values(
|
2021-07-26 10:13:11 -07:00
|
|
|
thread_pool: &ThreadPool,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2019-08-15 17:04:45 -07:00
|
|
|
filters: &[(CrdsValue, CrdsFilter)],
|
2021-07-26 10:13:11 -07:00
|
|
|
output_size_limit: usize, // Limit number of crds values returned.
|
2020-08-11 06:26:42 -07:00
|
|
|
now: u64,
|
2022-04-18 16:14:59 -07:00
|
|
|
stats: &GossipStats,
|
2019-08-15 17:04:45 -07:00
|
|
|
) -> Vec<Vec<CrdsValue>> {
|
2020-08-11 06:26:42 -07:00
|
|
|
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
|
|
|
let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
|
|
|
|
//skip filters from callers that are too old
|
2021-05-21 07:07:46 -07:00
|
|
|
let caller_wallclock_window =
|
|
|
|
now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
|
2021-07-26 10:13:11 -07:00
|
|
|
let dropped_requests = AtomicUsize::default();
|
|
|
|
let total_skipped = AtomicUsize::default();
|
|
|
|
let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
|
|
|
|
let output_size_limit = AtomicI64::new(output_size_limit);
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = crds.read().unwrap();
|
2021-07-26 10:13:11 -07:00
|
|
|
let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
|
|
|
|
if output_size_limit.load(Ordering::Relaxed) <= 0 {
|
|
|
|
return Vec::default();
|
|
|
|
}
|
|
|
|
let caller_wallclock = caller.wallclock();
|
|
|
|
if !caller_wallclock_window.contains(&caller_wallclock) {
|
|
|
|
dropped_requests.fetch_add(1, Ordering::Relaxed);
|
|
|
|
return Vec::default();
|
|
|
|
}
|
|
|
|
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
|
|
|
let pred = |entry: &&VersionedCrdsValue| {
|
|
|
|
debug_assert!(filter.test_mask(&entry.value_hash));
|
|
|
|
// Skip values that are too new.
|
|
|
|
if entry.value.wallclock() > caller_wallclock {
|
|
|
|
total_skipped.fetch_add(1, Ordering::Relaxed);
|
|
|
|
false
|
|
|
|
} else {
|
|
|
|
!filter.filter_contains(&entry.value_hash)
|
2020-09-17 07:05:16 -07:00
|
|
|
}
|
2021-07-26 10:13:11 -07:00
|
|
|
};
|
|
|
|
let out: Vec<_> = crds
|
|
|
|
.filter_bitmask(filter.mask, filter.mask_bits)
|
|
|
|
.filter(pred)
|
2023-01-26 09:02:18 -08:00
|
|
|
.filter(|entry| {
|
|
|
|
// Exclude the new ContactInfo from the pull responses
|
|
|
|
// until the cluster has upgraded.
|
|
|
|
!matches!(&entry.value.data, CrdsData::ContactInfo(_))
|
|
|
|
})
|
2021-07-26 10:13:11 -07:00
|
|
|
.map(|entry| entry.value.clone())
|
|
|
|
.take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
|
|
|
|
.collect();
|
|
|
|
output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
|
|
|
|
out
|
|
|
|
};
|
|
|
|
let ret: Vec<_> = thread_pool.install(|| {
|
|
|
|
filters
|
|
|
|
.par_iter()
|
|
|
|
.map(|(caller, filter)| apply_filter(caller, filter))
|
|
|
|
.collect()
|
|
|
|
});
|
2022-04-18 16:14:59 -07:00
|
|
|
stats
|
|
|
|
.filter_crds_values_dropped_requests
|
|
|
|
.add_relaxed(dropped_requests.into_inner() as u64);
|
|
|
|
stats
|
|
|
|
.filter_crds_values_dropped_values
|
|
|
|
.add_relaxed(total_skipped.into_inner() as u64);
|
2018-11-15 13:23:26 -08:00
|
|
|
ret
|
|
|
|
}
|
2019-11-20 11:25:18 -08:00
|
|
|
|
2021-05-21 08:55:22 -07:00
|
|
|
pub(crate) fn make_timeouts(
|
2019-11-20 11:25:18 -08:00
|
|
|
&self,
|
2021-05-21 08:55:22 -07:00
|
|
|
self_pubkey: Pubkey,
|
2019-11-20 11:25:18 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2021-05-21 08:55:22 -07:00
|
|
|
epoch_duration: Duration,
|
2019-11-20 11:25:18 -08:00
|
|
|
) -> HashMap<Pubkey, u64> {
|
2021-05-21 08:55:22 -07:00
|
|
|
let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64);
|
|
|
|
let default_timeout = if stakes.values().all(|stake| *stake == 0) {
|
|
|
|
extended_timeout
|
|
|
|
} else {
|
|
|
|
self.crds_timeout
|
|
|
|
};
|
|
|
|
stakes
|
|
|
|
.iter()
|
|
|
|
.filter(|(_, stake)| **stake > 0)
|
|
|
|
.map(|(pubkey, _)| (*pubkey, extended_timeout))
|
|
|
|
.chain(vec![
|
|
|
|
(Pubkey::default(), default_timeout),
|
|
|
|
(self_pubkey, u64::MAX),
|
|
|
|
])
|
|
|
|
.collect()
|
2019-11-20 11:25:18 -08:00
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// Purge values from the crds that are older then `active_timeout`
|
2021-07-10 15:16:33 -07:00
|
|
|
pub(crate) fn purge_active(
|
2020-10-23 07:17:37 -07:00
|
|
|
thread_pool: &ThreadPool,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2019-11-20 11:25:18 -08:00
|
|
|
now: u64,
|
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
|
|
|
) -> usize {
|
2021-07-14 15:27:17 -07:00
|
|
|
let mut crds = crds.write().unwrap();
|
2021-05-24 06:47:21 -07:00
|
|
|
let labels = crds.find_old_labels(thread_pool, now, timeouts);
|
|
|
|
for label in &labels {
|
|
|
|
crds.remove(label, now);
|
|
|
|
}
|
|
|
|
labels.len()
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-06-09 17:08:13 -07:00
|
|
|
|
|
|
|
/// For legacy tests
|
|
|
|
#[cfg(test)]
|
2021-07-10 15:16:33 -07:00
|
|
|
fn process_pull_response(
|
2021-07-11 08:32:10 -07:00
|
|
|
&self,
|
2021-07-14 15:27:17 -07:00
|
|
|
crds: &RwLock<Crds>,
|
2020-06-09 17:08:13 -07:00
|
|
|
from: &Pubkey,
|
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
|
|
|
response: Vec<CrdsValue>,
|
|
|
|
now: u64,
|
|
|
|
) -> (usize, usize, usize) {
|
|
|
|
let mut stats = ProcessPullStats::default();
|
2020-09-30 17:39:22 -07:00
|
|
|
let (versioned, versioned_expired_timeout, failed_inserts) =
|
2020-06-09 17:08:13 -07:00
|
|
|
self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
|
|
|
|
self.process_pull_responses(
|
|
|
|
crds,
|
|
|
|
from,
|
|
|
|
versioned,
|
|
|
|
versioned_expired_timeout,
|
2020-09-30 17:39:22 -07:00
|
|
|
failed_inserts,
|
2020-06-09 17:08:13 -07:00
|
|
|
now,
|
|
|
|
&mut stats,
|
|
|
|
);
|
|
|
|
(
|
|
|
|
stats.failed_timeout + stats.failed_insert,
|
|
|
|
stats.timeout_count,
|
|
|
|
stats.success,
|
|
|
|
)
|
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2021-07-11 08:32:10 -07:00
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
#[cfg(test)]
|
2021-05-23 09:50:19 -07:00
|
|
|
pub(crate) mod tests {
|
2021-05-26 08:15:46 -07:00
|
|
|
use {
|
|
|
|
super::*,
|
|
|
|
crate::{
|
|
|
|
cluster_info::MAX_BLOOM_SIZE,
|
|
|
|
crds_value::{CrdsData, Vote},
|
|
|
|
},
|
|
|
|
itertools::Itertools,
|
2021-10-28 11:29:32 -07:00
|
|
|
rand::{seq::SliceRandom, thread_rng, SeedableRng},
|
|
|
|
rand_chacha::ChaChaRng,
|
2021-05-26 08:15:46 -07:00
|
|
|
rayon::ThreadPoolBuilder,
|
2021-12-29 11:31:26 -08:00
|
|
|
solana_perf::test_tx::new_test_vote_tx,
|
2021-05-26 08:15:46 -07:00
|
|
|
solana_sdk::{
|
|
|
|
hash::{hash, HASH_BYTES},
|
|
|
|
packet::PACKET_DATA_SIZE,
|
|
|
|
},
|
2023-01-14 07:44:38 -08:00
|
|
|
std::time::Instant,
|
2021-03-24 11:33:56 -07:00
|
|
|
};
|
2019-02-20 17:08:56 -08:00
|
|
|
|
2021-05-23 09:50:19 -07:00
|
|
|
#[cfg(debug_assertions)]
|
|
|
|
pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 1;
|
|
|
|
#[cfg(not(debug_assertions))]
|
|
|
|
pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 64;
|
|
|
|
|
2020-09-09 08:28:17 -07:00
|
|
|
#[test]
|
|
|
|
fn test_hash_as_u64() {
|
|
|
|
let arr: Vec<u8> = (0..HASH_BYTES).map(|i| i as u8 + 1).collect();
|
|
|
|
let hash = Hash::new(&arr);
|
|
|
|
assert_eq!(CrdsFilter::hash_as_u64(&hash), 0x807060504030201);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_hash_as_u64_random() {
|
|
|
|
fn hash_as_u64_bitops(hash: &Hash) -> u64 {
|
|
|
|
let mut out = 0;
|
|
|
|
for (i, val) in hash.as_ref().iter().enumerate().take(8) {
|
|
|
|
out |= (u64::from(*val)) << (i * 8) as u64;
|
|
|
|
}
|
|
|
|
out
|
|
|
|
}
|
|
|
|
let mut rng = thread_rng();
|
|
|
|
for _ in 0..100 {
|
2020-10-19 12:15:55 -07:00
|
|
|
let hash = solana_sdk::hash::new_rand(&mut rng);
|
2020-09-09 08:28:17 -07:00
|
|
|
assert_eq!(CrdsFilter::hash_as_u64(&hash), hash_as_u64_bitops(&hash));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-13 06:08:25 -07:00
|
|
|
#[test]
|
|
|
|
fn test_crds_filter_default() {
|
|
|
|
let filter = CrdsFilter::default();
|
|
|
|
let mask = CrdsFilter::compute_mask(0, filter.mask_bits);
|
|
|
|
assert_eq!(filter.mask, mask);
|
|
|
|
let mut rng = thread_rng();
|
|
|
|
for _ in 0..10 {
|
2020-10-19 12:15:55 -07:00
|
|
|
let hash = solana_sdk::hash::new_rand(&mut rng);
|
2020-09-13 06:08:25 -07:00
|
|
|
assert!(filter.test_mask(&hash));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-03 13:32:23 -07:00
|
|
|
#[test]
|
2020-09-29 16:06:02 -07:00
|
|
|
fn test_crds_filter_set_add() {
|
2020-09-03 13:32:23 -07:00
|
|
|
let mut rng = thread_rng();
|
2020-09-29 16:06:02 -07:00
|
|
|
let crds_filter_set =
|
|
|
|
CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
|
2021-05-26 08:15:46 -07:00
|
|
|
let hash_values: Vec<_> = repeat_with(|| solana_sdk::hash::new_rand(&mut rng))
|
2020-09-29 16:06:02 -07:00
|
|
|
.take(1024)
|
|
|
|
.collect();
|
|
|
|
for hash_value in &hash_values {
|
|
|
|
crds_filter_set.add(*hash_value);
|
|
|
|
}
|
|
|
|
let filters: Vec<CrdsFilter> = crds_filter_set.into();
|
|
|
|
assert_eq!(filters.len(), 1024);
|
|
|
|
for hash_value in hash_values {
|
2020-09-03 13:32:23 -07:00
|
|
|
let mut num_hits = 0;
|
2020-09-29 16:06:02 -07:00
|
|
|
let mut false_positives = 0;
|
|
|
|
for filter in &filters {
|
|
|
|
if filter.test_mask(&hash_value) {
|
2020-09-03 13:32:23 -07:00
|
|
|
num_hits += 1;
|
2020-09-29 16:06:02 -07:00
|
|
|
assert!(filter.contains(&hash_value));
|
|
|
|
assert!(filter.filter.contains(&hash_value));
|
|
|
|
} else if filter.filter.contains(&hash_value) {
|
|
|
|
false_positives += 1;
|
2020-09-03 13:32:23 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
assert_eq!(num_hits, 1);
|
2020-09-29 16:06:02 -07:00
|
|
|
assert!(false_positives < 5);
|
2020-09-03 13:32:23 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2020-09-04 06:04:47 -07:00
|
|
|
fn test_crds_filter_set_new() {
|
|
|
|
// Validates invariances required by CrdsFilterSet::get in the
|
|
|
|
// vector of filters generated by CrdsFilterSet::new.
|
2020-09-29 16:06:02 -07:00
|
|
|
let filters: Vec<CrdsFilter> =
|
|
|
|
CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into();
|
2020-09-03 13:32:23 -07:00
|
|
|
assert_eq!(filters.len(), 16384);
|
|
|
|
let mask_bits = filters[0].mask_bits;
|
|
|
|
let right_shift = 64 - mask_bits;
|
|
|
|
let ones = !0u64 >> mask_bits;
|
|
|
|
for (i, filter) in filters.iter().enumerate() {
|
|
|
|
// Check that all mask_bits are equal.
|
|
|
|
assert_eq!(mask_bits, filter.mask_bits);
|
|
|
|
assert_eq!(i as u64, filter.mask >> right_shift);
|
|
|
|
assert_eq!(ones, ones & filter.mask);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-29 16:06:02 -07:00
|
|
|
#[test]
|
|
|
|
fn test_build_crds_filter() {
|
2021-10-28 11:29:32 -07:00
|
|
|
const SEED: [u8; 32] = [0x55; 32];
|
|
|
|
let mut rng = ChaChaRng::from_seed(SEED);
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-05-24 06:47:21 -07:00
|
|
|
let crds_gossip_pull = CrdsGossipPull::default();
|
2020-09-29 16:06:02 -07:00
|
|
|
let mut crds = Crds::default();
|
2021-10-28 11:29:32 -07:00
|
|
|
let keypairs: Vec<_> = repeat_with(|| Keypair::generate(&mut rng))
|
|
|
|
.take(10_000)
|
|
|
|
.collect();
|
2020-09-29 16:06:02 -07:00
|
|
|
let mut num_inserts = 0;
|
2021-05-24 06:47:21 -07:00
|
|
|
for _ in 0..40_000 {
|
|
|
|
let keypair = keypairs.choose(&mut rng).unwrap();
|
|
|
|
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
|
2021-10-26 06:02:30 -07:00
|
|
|
if crds
|
|
|
|
.insert(value, rng.gen(), GossipRoute::LocalMessage)
|
|
|
|
.is_ok()
|
|
|
|
{
|
2020-09-29 16:06:02 -07:00
|
|
|
num_inserts += 1;
|
|
|
|
}
|
|
|
|
}
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = RwLock::new(crds);
|
2022-12-06 06:30:06 -08:00
|
|
|
assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
|
2020-09-29 16:06:02 -07:00
|
|
|
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
|
2021-05-23 09:50:19 -07:00
|
|
|
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32));
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = crds.read().unwrap();
|
2021-05-24 06:47:21 -07:00
|
|
|
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
|
|
|
|
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
|
2021-10-28 11:29:32 -07:00
|
|
|
// CrdsValue::new_rand may generate exact same value twice in which
|
|
|
|
// case its hash-value is not added to purged values.
|
|
|
|
assert!(
|
|
|
|
hash_values.len() >= 40_000 - 5,
|
|
|
|
"hash_values.len(): {}",
|
|
|
|
hash_values.len()
|
|
|
|
);
|
2020-09-29 16:06:02 -07:00
|
|
|
let mut false_positives = 0;
|
|
|
|
for hash_value in hash_values {
|
|
|
|
let mut num_hits = 0;
|
|
|
|
for filter in &filters {
|
|
|
|
if filter.test_mask(&hash_value) {
|
|
|
|
num_hits += 1;
|
|
|
|
assert!(filter.contains(&hash_value));
|
|
|
|
assert!(filter.filter.contains(&hash_value));
|
|
|
|
} else if filter.filter.contains(&hash_value) {
|
|
|
|
false_positives += 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert_eq!(num_hits, 1);
|
|
|
|
}
|
2022-12-06 06:30:06 -08:00
|
|
|
assert!(false_positives < 150_000, "fp: {false_positives}");
|
2020-09-29 16:06:02 -07:00
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
#[test]
|
|
|
|
fn test_new_pull_request() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = RwLock::<Crds>::default();
|
2021-04-20 11:06:13 -07:00
|
|
|
let node_keypair = Keypair::new();
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&node_keypair.pubkey(), 0),
|
|
|
|
));
|
2021-07-11 08:32:10 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut pings = Vec::new();
|
|
|
|
let ping_cache = Mutex::new(PingCache::new(
|
2022-09-26 14:16:56 -07:00
|
|
|
Duration::from_secs(20 * 60), // ttl
|
|
|
|
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
|
|
|
128, // capacity
|
2021-04-20 11:06:13 -07:00
|
|
|
));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2020-09-29 16:06:02 -07:00
|
|
|
node.new_pull_request(
|
|
|
|
&thread_pool,
|
|
|
|
&crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2020-09-29 16:06:02 -07:00
|
|
|
0,
|
|
|
|
0,
|
|
|
|
None,
|
|
|
|
&HashMap::new(),
|
2021-04-20 11:06:13 -07:00
|
|
|
PACKET_DATA_SIZE,
|
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2020-09-29 16:06:02 -07:00
|
|
|
),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::NoPeers)
|
|
|
|
);
|
|
|
|
|
2021-10-26 06:02:30 -07:00
|
|
|
crds.write()
|
|
|
|
.unwrap()
|
|
|
|
.insert(entry, 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2020-09-29 16:06:02 -07:00
|
|
|
node.new_pull_request(
|
|
|
|
&thread_pool,
|
|
|
|
&crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2020-09-29 16:06:02 -07:00
|
|
|
0,
|
|
|
|
0,
|
|
|
|
None,
|
|
|
|
&HashMap::new(),
|
2021-04-20 11:06:13 -07:00
|
|
|
PACKET_DATA_SIZE,
|
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2020-09-29 16:06:02 -07:00
|
|
|
),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::NoPeers)
|
|
|
|
);
|
2021-07-01 12:00:10 -07:00
|
|
|
let now = 1625029781069;
|
|
|
|
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
|
2021-04-20 11:06:13 -07:00
|
|
|
ping_cache
|
|
|
|
.lock()
|
|
|
|
.unwrap()
|
|
|
|
.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2021-10-26 06:02:30 -07:00
|
|
|
crds.write()
|
|
|
|
.unwrap()
|
|
|
|
.insert(new.clone(), now, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2020-09-29 16:06:02 -07:00
|
|
|
let req = node.new_pull_request(
|
|
|
|
&thread_pool,
|
|
|
|
&crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2020-09-29 16:06:02 -07:00
|
|
|
0,
|
2021-07-01 12:00:10 -07:00
|
|
|
now,
|
|
|
|
None,
|
|
|
|
&HashMap::new(),
|
|
|
|
PACKET_DATA_SIZE,
|
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2021-07-01 12:00:10 -07:00
|
|
|
);
|
2022-05-26 05:45:53 -07:00
|
|
|
let peers: Vec<_> = req.unwrap().into_keys().collect();
|
|
|
|
assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
|
2021-07-01 12:00:10 -07:00
|
|
|
|
|
|
|
let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
|
2023-01-08 08:00:55 -08:00
|
|
|
let offline = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(offline));
|
2021-10-26 06:02:30 -07:00
|
|
|
crds.write()
|
|
|
|
.unwrap()
|
|
|
|
.insert(offline, now, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-07-01 12:00:10 -07:00
|
|
|
let req = node.new_pull_request(
|
|
|
|
&thread_pool,
|
|
|
|
&crds,
|
|
|
|
&node_keypair,
|
2020-09-29 16:06:02 -07:00
|
|
|
0,
|
2021-07-01 12:00:10 -07:00
|
|
|
now,
|
2020-09-29 16:06:02 -07:00
|
|
|
None,
|
|
|
|
&HashMap::new(),
|
|
|
|
PACKET_DATA_SIZE,
|
2021-04-20 11:06:13 -07:00
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2020-09-29 16:06:02 -07:00
|
|
|
);
|
2021-07-01 12:00:10 -07:00
|
|
|
// Even though the offline node should have higher weight, we shouldn't request from it
|
|
|
|
// until we receive a ping.
|
2022-05-26 05:45:53 -07:00
|
|
|
let peers: Vec<_> = req.unwrap().into_keys().collect();
|
|
|
|
assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_new_mark_creation_time() {
|
2020-11-12 08:09:37 -08:00
|
|
|
let now: u64 = 1_605_127_770_789;
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut ping_cache = PingCache::new(
|
2022-09-26 14:16:56 -07:00
|
|
|
Duration::from_secs(20 * 60), // ttl
|
|
|
|
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
|
|
|
128, // capacity
|
2021-04-20 11:06:13 -07:00
|
|
|
);
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut crds = Crds::default();
|
2021-04-20 11:06:13 -07:00
|
|
|
let node_keypair = Keypair::new();
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&node_keypair.pubkey(), 0),
|
|
|
|
));
|
2021-07-11 08:32:10 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-10-26 06:02:30 -07:00
|
|
|
crds.insert(entry, now, GossipRoute::LocalMessage).unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
|
|
|
ping_cache.mock_pong(old.id, old.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let old = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(old));
|
2021-10-26 06:02:30 -07:00
|
|
|
crds.insert(old.clone(), now, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
|
|
|
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2023-01-14 07:44:38 -08:00
|
|
|
crds.insert(new, now, GossipRoute::LocalMessage).unwrap();
|
2021-07-14 15:27:17 -07:00
|
|
|
let crds = RwLock::new(crds);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-11-12 08:09:37 -08:00
|
|
|
// set request creation time to now.
|
|
|
|
let now = now + 50_000;
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-11-12 08:09:37 -08:00
|
|
|
// odds of getting the other request should be close to 1.
|
|
|
|
let now = now + 1_000;
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut pings = Vec::new();
|
|
|
|
let ping_cache = Mutex::new(ping_cache);
|
2021-05-18 06:39:35 -07:00
|
|
|
let old = old.contact_info().unwrap();
|
|
|
|
let count = repeat_with(|| {
|
2022-05-26 05:45:53 -07:00
|
|
|
let requests = node
|
2021-05-18 06:39:35 -07:00
|
|
|
.new_pull_request(
|
|
|
|
&thread_pool,
|
|
|
|
&crds,
|
|
|
|
&node_keypair,
|
|
|
|
0, // self_shred_version
|
|
|
|
now,
|
|
|
|
None, // gossip_validators
|
|
|
|
&HashMap::new(), // stakes
|
|
|
|
PACKET_DATA_SIZE, // bloom_size
|
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2021-05-18 06:39:35 -07:00
|
|
|
)
|
|
|
|
.unwrap();
|
2022-05-26 05:45:53 -07:00
|
|
|
requests.into_keys()
|
2021-05-18 06:39:35 -07:00
|
|
|
})
|
2022-05-26 05:45:53 -07:00
|
|
|
.flatten()
|
2021-05-18 06:39:35 -07:00
|
|
|
.take(100)
|
|
|
|
.filter(|peer| peer != old)
|
|
|
|
.count();
|
2023-01-26 22:23:03 -08:00
|
|
|
assert!(count < 75, "count of peer != old: {count}");
|
2021-03-24 11:33:56 -07:00
|
|
|
}
|
|
|
|
|
2020-08-11 06:26:42 -07:00
|
|
|
#[test]
|
|
|
|
fn test_generate_pull_responses() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let node_keypair = Keypair::new();
|
2020-08-11 06:26:42 -07:00
|
|
|
let mut node_crds = Crds::default();
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut ping_cache = PingCache::new(
|
2022-09-26 14:16:56 -07:00
|
|
|
Duration::from_secs(20 * 60), // ttl
|
|
|
|
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
|
|
|
128, // capacity
|
2021-04-20 11:06:13 -07:00
|
|
|
);
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&node_keypair.pubkey(), 0),
|
|
|
|
));
|
2021-04-28 06:19:12 -07:00
|
|
|
let caller = entry.clone();
|
2020-08-11 06:26:42 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(entry, 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
|
|
|
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::new(node_crds);
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut pings = Vec::new();
|
2020-08-11 06:26:42 -07:00
|
|
|
let req = node.new_pull_request(
|
2020-09-29 16:06:02 -07:00
|
|
|
&thread_pool,
|
2020-08-11 06:26:42 -07:00
|
|
|
&node_crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2020-08-11 06:26:42 -07:00
|
|
|
0,
|
|
|
|
0,
|
2020-09-11 12:00:16 -07:00
|
|
|
None,
|
2020-08-11 06:26:42 -07:00
|
|
|
&HashMap::new(),
|
|
|
|
PACKET_DATA_SIZE,
|
2021-04-20 11:06:13 -07:00
|
|
|
&Mutex::new(ping_cache),
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2020-08-11 06:26:42 -07:00
|
|
|
);
|
|
|
|
|
2021-07-14 15:27:17 -07:00
|
|
|
let dest_crds = RwLock::<Crds>::default();
|
2022-05-26 05:45:53 -07:00
|
|
|
let filters = req.unwrap().into_values().flatten();
|
2020-08-11 06:26:42 -07:00
|
|
|
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
2021-07-10 15:16:33 -07:00
|
|
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
&thread_pool,
|
2020-12-18 10:45:12 -08:00
|
|
|
&dest_crds,
|
|
|
|
&filters,
|
2021-07-26 10:13:11 -07:00
|
|
|
usize::MAX, // output_size_limit
|
|
|
|
0, // now
|
2022-04-18 16:14:59 -07:00
|
|
|
&GossipStats::default(),
|
2020-12-18 10:45:12 -08:00
|
|
|
);
|
2020-08-11 06:26:42 -07:00
|
|
|
|
|
|
|
assert_eq!(rsp[0].len(), 0);
|
|
|
|
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2020-08-11 06:26:42 -07:00
|
|
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
|
|
|
)));
|
|
|
|
dest_crds
|
2021-07-14 15:27:17 -07:00
|
|
|
.write()
|
|
|
|
.unwrap()
|
2021-10-26 06:02:30 -07:00
|
|
|
.insert(
|
|
|
|
new,
|
|
|
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
|
|
|
GossipRoute::LocalMessage,
|
|
|
|
)
|
2020-08-11 06:26:42 -07:00
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
//should skip new value since caller is to old
|
2021-07-10 15:16:33 -07:00
|
|
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
&thread_pool,
|
2020-12-18 10:45:12 -08:00
|
|
|
&dest_crds,
|
|
|
|
&filters,
|
2021-07-26 10:13:11 -07:00
|
|
|
usize::MAX, // output_size_limit
|
|
|
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now
|
2022-04-18 16:14:59 -07:00
|
|
|
&GossipStats::default(),
|
2020-12-18 10:45:12 -08:00
|
|
|
);
|
2020-08-11 06:26:42 -07:00
|
|
|
assert_eq!(rsp[0].len(), 0);
|
2021-05-23 09:50:19 -07:00
|
|
|
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
|
2021-05-21 06:59:26 -07:00
|
|
|
filters.extend({
|
|
|
|
// Should return new value since caller is new.
|
|
|
|
let now = CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1;
|
|
|
|
let caller = ContactInfo::new_localhost(&Pubkey::new_unique(), now);
|
2023-01-08 08:00:55 -08:00
|
|
|
let caller = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(caller));
|
2021-05-21 06:59:26 -07:00
|
|
|
filters
|
|
|
|
.iter()
|
|
|
|
.map(|(_, filter)| (caller.clone(), filter.clone()))
|
|
|
|
.collect::<Vec<_>>()
|
|
|
|
});
|
2021-07-10 15:16:33 -07:00
|
|
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
&thread_pool,
|
2020-12-18 10:45:12 -08:00
|
|
|
&dest_crds,
|
|
|
|
&filters,
|
2021-07-26 10:13:11 -07:00
|
|
|
usize::MAX, // output_size_limit
|
2020-12-18 10:45:12 -08:00
|
|
|
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
2022-04-18 16:14:59 -07:00
|
|
|
&GossipStats::default(),
|
2020-12-18 10:45:12 -08:00
|
|
|
);
|
2021-05-23 09:50:19 -07:00
|
|
|
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
|
2021-05-21 06:59:26 -07:00
|
|
|
// There should be only one non-empty response in the 2nd half.
|
|
|
|
// Orders are also preserved.
|
2021-05-23 09:50:19 -07:00
|
|
|
assert!(rsp.iter().take(MIN_NUM_BLOOM_FILTERS).all(|r| r.is_empty()));
|
|
|
|
assert_eq!(rsp.iter().filter(|r| r.is_empty()).count(), rsp.len() - 1);
|
|
|
|
assert_eq!(rsp.iter().find(|r| r.len() == 1).unwrap().len(), 1);
|
2020-08-11 06:26:42 -07:00
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
#[test]
|
|
|
|
fn test_process_pull_request() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let node_keypair = Keypair::new();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut node_crds = Crds::default();
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&node_keypair.pubkey(), 0),
|
|
|
|
));
|
2021-04-28 06:19:12 -07:00
|
|
|
let caller = entry.clone();
|
2018-11-15 13:23:26 -08:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(entry, 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut ping_cache = PingCache::new(
|
2022-09-26 14:16:56 -07:00
|
|
|
Duration::from_secs(20 * 60), // ttl
|
|
|
|
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
|
|
|
128, // capacity
|
2021-04-20 11:06:13 -07:00
|
|
|
);
|
|
|
|
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
|
|
|
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::new(node_crds);
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut pings = Vec::new();
|
2019-08-13 18:04:14 -07:00
|
|
|
let req = node.new_pull_request(
|
2020-09-29 16:06:02 -07:00
|
|
|
&thread_pool,
|
2019-08-13 18:04:14 -07:00
|
|
|
&node_crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2019-08-13 18:04:14 -07:00
|
|
|
0,
|
2020-05-05 20:15:19 -07:00
|
|
|
0,
|
2020-09-11 12:00:16 -07:00
|
|
|
None,
|
2019-08-13 18:04:14 -07:00
|
|
|
&HashMap::new(),
|
|
|
|
PACKET_DATA_SIZE,
|
2021-04-20 11:06:13 -07:00
|
|
|
&Mutex::new(ping_cache),
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2019-08-13 18:04:14 -07:00
|
|
|
);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2021-07-14 15:27:17 -07:00
|
|
|
let dest_crds = RwLock::<Crds>::default();
|
2022-05-26 05:45:53 -07:00
|
|
|
let filters = req.unwrap().into_values().flatten();
|
2020-05-28 11:38:13 -07:00
|
|
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
2021-07-10 15:16:33 -07:00
|
|
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
&thread_pool,
|
2020-12-18 10:45:12 -08:00
|
|
|
&dest_crds,
|
|
|
|
&filters,
|
2021-07-26 10:13:11 -07:00
|
|
|
usize::MAX, // output_size_limit
|
|
|
|
0, // now
|
2022-04-18 16:14:59 -07:00
|
|
|
&GossipStats::default(),
|
2020-12-18 10:45:12 -08:00
|
|
|
);
|
2021-07-14 15:27:17 -07:00
|
|
|
let callers = filters.into_iter().map(|(caller, _)| caller);
|
|
|
|
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
|
|
|
|
let dest_crds = dest_crds.read().unwrap();
|
2019-08-15 17:04:45 -07:00
|
|
|
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
|
2021-07-21 05:16:26 -07:00
|
|
|
assert!(dest_crds.get::<&CrdsValue>(&caller.label()).is_some());
|
|
|
|
assert_eq!(1, {
|
|
|
|
let entry: &VersionedCrdsValue = dest_crds.get(&caller.label()).unwrap();
|
|
|
|
entry.local_timestamp
|
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_process_pull_request_response() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let node_keypair = Keypair::new();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut node_crds = Crds::default();
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&node_keypair.pubkey(), 1),
|
|
|
|
));
|
2021-04-28 06:19:12 -07:00
|
|
|
let caller = entry.clone();
|
2019-05-23 23:20:04 -07:00
|
|
|
let node_pubkey = entry.label().pubkey();
|
2021-07-11 08:32:10 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(entry, 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut ping_cache = PingCache::new(
|
2022-09-26 14:16:56 -07:00
|
|
|
Duration::from_secs(20 * 60), // ttl
|
|
|
|
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
|
|
|
128, // capacity
|
2021-04-20 11:06:13 -07:00
|
|
|
);
|
|
|
|
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1);
|
|
|
|
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
let mut dest_crds = Crds::default();
|
2020-10-19 12:12:08 -07:00
|
|
|
let new_id = solana_sdk::pubkey::new_rand();
|
2021-04-20 11:06:13 -07:00
|
|
|
let new = ContactInfo::new_localhost(&new_id, 1);
|
|
|
|
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
|
2021-10-26 06:02:30 -07:00
|
|
|
dest_crds
|
|
|
|
.insert(new.clone(), 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-07-14 15:27:17 -07:00
|
|
|
let dest_crds = RwLock::new(dest_crds);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// node contains a key from the dest node, but at an older local timestamp
|
2021-04-20 11:06:13 -07:00
|
|
|
let same_key = ContactInfo::new_localhost(&new_id, 0);
|
|
|
|
ping_cache.mock_pong(same_key.id, same_key.gossip, Instant::now());
|
2023-01-08 08:00:55 -08:00
|
|
|
let same_key = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(same_key));
|
2019-03-08 19:28:19 -08:00
|
|
|
assert_eq!(same_key.label(), new.label());
|
|
|
|
assert!(same_key.wallclock() < new.wallclock());
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(same_key.clone(), 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-07-21 05:16:26 -07:00
|
|
|
assert_eq!(0, {
|
|
|
|
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
|
|
|
|
entry.local_timestamp
|
|
|
|
});
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::new(node_crds);
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut done = false;
|
2021-04-20 11:06:13 -07:00
|
|
|
let mut pings = Vec::new();
|
|
|
|
let ping_cache = Mutex::new(ping_cache);
|
2018-11-15 13:23:26 -08:00
|
|
|
for _ in 0..30 {
|
|
|
|
// there is a chance of a false positive with bloom filters
|
2019-08-13 18:04:14 -07:00
|
|
|
let req = node.new_pull_request(
|
2020-09-29 16:06:02 -07:00
|
|
|
&thread_pool,
|
2019-08-13 18:04:14 -07:00
|
|
|
&node_crds,
|
2021-04-20 11:06:13 -07:00
|
|
|
&node_keypair,
|
2019-08-13 18:04:14 -07:00
|
|
|
0,
|
2020-05-05 20:15:19 -07:00
|
|
|
0,
|
2020-09-11 12:00:16 -07:00
|
|
|
None,
|
2019-08-13 18:04:14 -07:00
|
|
|
&HashMap::new(),
|
|
|
|
PACKET_DATA_SIZE,
|
2021-04-20 11:06:13 -07:00
|
|
|
&ping_cache,
|
|
|
|
&mut pings,
|
2021-07-23 08:25:03 -07:00
|
|
|
&SocketAddrSpace::Unspecified,
|
2019-08-13 18:04:14 -07:00
|
|
|
);
|
2022-05-26 05:45:53 -07:00
|
|
|
let filters = req.unwrap().into_values().flatten();
|
2020-05-28 11:38:13 -07:00
|
|
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
2021-07-10 15:16:33 -07:00
|
|
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
2021-07-26 10:13:11 -07:00
|
|
|
&thread_pool,
|
2020-12-18 10:45:12 -08:00
|
|
|
&dest_crds,
|
|
|
|
&filters,
|
2021-07-26 10:13:11 -07:00
|
|
|
usize::MAX, // output_size_limit
|
|
|
|
0, // now
|
2022-04-18 16:14:59 -07:00
|
|
|
&GossipStats::default(),
|
2020-12-18 10:45:12 -08:00
|
|
|
);
|
2021-07-10 15:16:33 -07:00
|
|
|
CrdsGossipPull::process_pull_requests(
|
2021-07-14 15:27:17 -07:00
|
|
|
&dest_crds,
|
2020-10-28 10:03:02 -07:00
|
|
|
filters.into_iter().map(|(caller, _)| caller),
|
|
|
|
0,
|
|
|
|
);
|
2019-08-15 17:04:45 -07:00
|
|
|
// if there is a false positive this is empty
|
|
|
|
// prob should be around 0.1 per iteration
|
|
|
|
if rsp.is_empty() {
|
|
|
|
continue;
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
if rsp.is_empty() {
|
|
|
|
continue;
|
|
|
|
}
|
2021-05-23 09:50:19 -07:00
|
|
|
assert_eq!(rsp.len(), MIN_NUM_BLOOM_FILTERS);
|
2020-05-25 15:03:34 -07:00
|
|
|
let failed = node
|
|
|
|
.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-05-25 15:03:34 -07:00
|
|
|
&node_pubkey,
|
2021-05-21 08:55:22 -07:00
|
|
|
&node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
|
2021-05-21 06:59:26 -07:00
|
|
|
rsp.into_iter().flatten().collect(),
|
2020-05-25 15:03:34 -07:00
|
|
|
1,
|
|
|
|
)
|
|
|
|
.0;
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(failed, 0);
|
2021-07-14 15:27:17 -07:00
|
|
|
assert_eq!(1, {
|
|
|
|
let node_crds = node_crds.read().unwrap();
|
2021-07-21 05:16:26 -07:00
|
|
|
let entry: &VersionedCrdsValue = node_crds.get(&new.label()).unwrap();
|
|
|
|
entry.local_timestamp
|
2021-07-14 15:27:17 -07:00
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
// verify that the whole record was updated for dest since this is a response from dest
|
2021-07-14 15:27:17 -07:00
|
|
|
assert_eq!(1, {
|
|
|
|
let node_crds = node_crds.read().unwrap();
|
2021-07-21 05:16:26 -07:00
|
|
|
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
|
|
|
|
entry.local_timestamp
|
2021-07-14 15:27:17 -07:00
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
done = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
assert!(done);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_gossip_purge() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut node_crds = Crds::default();
|
2023-01-08 08:00:55 -08:00
|
|
|
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
|
|
|
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
|
|
|
|
));
|
2018-11-15 13:23:26 -08:00
|
|
|
let node_label = entry.label();
|
2019-05-23 23:20:04 -07:00
|
|
|
let node_pubkey = node_label.pubkey();
|
2021-07-10 15:16:33 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(entry, 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2023-01-08 08:00:55 -08:00
|
|
|
let old = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2021-10-26 06:02:30 -07:00
|
|
|
node_crds
|
|
|
|
.insert(old.clone(), 0, GossipRoute::LocalMessage)
|
|
|
|
.unwrap();
|
2021-07-21 05:16:26 -07:00
|
|
|
let value_hash = {
|
|
|
|
let entry: &VersionedCrdsValue = node_crds.get(&old.label()).unwrap();
|
|
|
|
entry.value_hash
|
|
|
|
};
|
2018-11-15 13:23:26 -08:00
|
|
|
//verify self is valid
|
2021-05-24 11:21:54 -07:00
|
|
|
assert_eq!(
|
2021-07-21 05:16:26 -07:00
|
|
|
node_crds.get::<&CrdsValue>(&node_label).unwrap().label(),
|
2021-05-24 11:21:54 -07:00
|
|
|
node_label
|
|
|
|
);
|
2018-11-15 13:23:26 -08:00
|
|
|
// purge
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::new(node_crds);
|
2021-05-21 08:55:22 -07:00
|
|
|
let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
|
2021-07-14 15:27:17 -07:00
|
|
|
CrdsGossipPull::purge_active(&thread_pool, &node_crds, node.crds_timeout, &timeouts);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
//verify self is still valid after purge
|
2021-07-14 15:27:17 -07:00
|
|
|
assert_eq!(node_label, {
|
|
|
|
let node_crds = node_crds.read().unwrap();
|
2021-07-21 05:16:26 -07:00
|
|
|
node_crds.get::<&CrdsValue>(&node_label).unwrap().label()
|
2021-07-14 15:27:17 -07:00
|
|
|
});
|
2021-07-21 05:16:26 -07:00
|
|
|
assert_eq!(
|
|
|
|
node_crds.read().unwrap().get::<&CrdsValue>(&old.label()),
|
|
|
|
None
|
|
|
|
);
|
2021-07-14 15:27:17 -07:00
|
|
|
assert_eq!(node_crds.read().unwrap().num_purged(), 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
for _ in 0..30 {
|
|
|
|
// there is a chance of a false positive with bloom filters
|
|
|
|
// assert that purged value is still in the set
|
|
|
|
// chance of 30 consecutive false positives is 0.1^30
|
2020-09-29 16:06:02 -07:00
|
|
|
let filters = node.build_crds_filters(&thread_pool, &node_crds, PACKET_DATA_SIZE);
|
2019-08-13 18:04:14 -07:00
|
|
|
assert!(filters.iter().any(|filter| filter.contains(&value_hash)));
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// purge the value
|
2021-07-14 15:27:17 -07:00
|
|
|
let mut node_crds = node_crds.write().unwrap();
|
2021-05-24 06:47:21 -07:00
|
|
|
node_crds.trim_purged(node.crds_timeout + 1);
|
|
|
|
assert_eq!(node_crds.num_purged(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-08-13 18:04:14 -07:00
|
|
|
#[test]
|
2020-05-15 09:35:43 -07:00
|
|
|
#[allow(clippy::float_cmp)]
|
2019-08-13 18:04:14 -07:00
|
|
|
fn test_crds_filter_mask() {
|
|
|
|
let filter = CrdsFilter::new_rand(1, 128);
|
|
|
|
assert_eq!(filter.mask, !0x0);
|
|
|
|
assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64);
|
|
|
|
//1000/9 = 111, so 7 bits are needed to mask it
|
|
|
|
assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32);
|
|
|
|
let filter = CrdsFilter::new_rand(1000, 10);
|
2020-05-15 09:35:43 -07:00
|
|
|
assert_eq!(filter.mask & 0x00_ffff_ffff, 0x00_ffff_ffff);
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_crds_filter_add_no_mask() {
|
|
|
|
let mut filter = CrdsFilter::new_rand(1, 128);
|
|
|
|
let h: Hash = hash(Hash::default().as_ref());
|
|
|
|
assert!(!filter.contains(&h));
|
|
|
|
filter.add(&h);
|
|
|
|
assert!(filter.contains(&h));
|
|
|
|
let h: Hash = hash(h.as_ref());
|
|
|
|
assert!(!filter.contains(&h));
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_crds_filter_add_mask() {
|
|
|
|
let mut filter = CrdsFilter::new_rand(1000, 10);
|
|
|
|
let mut h: Hash = Hash::default();
|
|
|
|
while !filter.test_mask(&h) {
|
|
|
|
h = hash(h.as_ref());
|
|
|
|
}
|
|
|
|
assert!(filter.test_mask(&h));
|
|
|
|
//if the mask succeeds, we want the guaranteed negative
|
|
|
|
assert!(!filter.contains(&h));
|
|
|
|
filter.add(&h);
|
|
|
|
assert!(filter.contains(&h));
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_crds_filter_complete_set_add_mask() {
|
2020-09-29 16:06:02 -07:00
|
|
|
let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into();
|
2019-08-13 18:04:14 -07:00
|
|
|
assert!(filters.iter().all(|f| f.mask_bits > 0));
|
|
|
|
let mut h: Hash = Hash::default();
|
|
|
|
// rev to make the hash::default() miss on the first few test_masks
|
|
|
|
while !filters.iter().rev().any(|f| f.test_mask(&h)) {
|
|
|
|
h = hash(h.as_ref());
|
|
|
|
}
|
|
|
|
let filter = filters.iter_mut().find(|f| f.test_mask(&h)).unwrap();
|
|
|
|
assert!(filter.test_mask(&h));
|
|
|
|
//if the mask succeeds, we want the guaranteed negative
|
|
|
|
assert!(!filter.contains(&h));
|
|
|
|
filter.add(&h);
|
|
|
|
assert!(filter.contains(&h));
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_crds_filter_contains_mask() {
|
|
|
|
let filter = CrdsFilter::new_rand(1000, 10);
|
|
|
|
assert!(filter.mask_bits > 0);
|
|
|
|
let mut h: Hash = Hash::default();
|
|
|
|
while filter.test_mask(&h) {
|
|
|
|
h = hash(h.as_ref());
|
|
|
|
}
|
|
|
|
assert!(!filter.test_mask(&h));
|
|
|
|
//if the mask fails, the hash is contained in the set, and can be treated as a false
|
|
|
|
//positive
|
|
|
|
assert!(filter.contains(&h));
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_mask() {
|
|
|
|
for i in 0..16 {
|
|
|
|
run_test_mask(i);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn run_test_mask(mask_bits: u32) {
|
2021-05-19 07:31:47 -07:00
|
|
|
assert_eq!(
|
|
|
|
(0..2u64.pow(mask_bits))
|
|
|
|
.map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
|
|
|
|
.dedup()
|
|
|
|
.count(),
|
|
|
|
2u64.pow(mask_bits) as usize
|
|
|
|
)
|
2019-08-13 18:04:14 -07:00
|
|
|
}
|
2020-02-07 12:38:24 -08:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_process_pull_response() {
|
2021-12-29 11:31:26 -08:00
|
|
|
let mut rng = rand::thread_rng();
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::<Crds>::default();
|
2021-07-11 08:32:10 -07:00
|
|
|
let node = CrdsGossipPull::default();
|
2020-02-07 12:38:24 -08:00
|
|
|
|
2020-10-19 12:12:08 -07:00
|
|
|
let peer_pubkey = solana_sdk::pubkey::new_rand();
|
2023-01-08 08:00:55 -08:00
|
|
|
let peer_entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
2020-02-07 12:38:24 -08:00
|
|
|
ContactInfo::new_localhost(&peer_pubkey, 0),
|
|
|
|
));
|
|
|
|
let mut timeouts = HashMap::new();
|
|
|
|
timeouts.insert(Pubkey::default(), node.crds_timeout);
|
|
|
|
timeouts.insert(peer_pubkey, node.msg_timeout + 1);
|
|
|
|
// inserting a fresh value should be fine.
|
|
|
|
assert_eq!(
|
|
|
|
node.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
&peer_pubkey,
|
|
|
|
&timeouts,
|
|
|
|
vec![peer_entry.clone()],
|
|
|
|
1,
|
2020-05-25 15:03:34 -07:00
|
|
|
)
|
|
|
|
.0,
|
2020-02-07 12:38:24 -08:00
|
|
|
0
|
|
|
|
);
|
|
|
|
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::<Crds>::default();
|
2023-01-08 08:00:55 -08:00
|
|
|
let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
|
2020-02-07 12:38:24 -08:00
|
|
|
ContactInfo::new_localhost(&peer_pubkey, 0),
|
|
|
|
));
|
|
|
|
// check that old contact infos fail if they are too old, regardless of "timeouts"
|
|
|
|
assert_eq!(
|
|
|
|
node.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
&peer_pubkey,
|
|
|
|
&timeouts,
|
|
|
|
vec![peer_entry.clone(), unstaked_peer_entry],
|
|
|
|
node.msg_timeout + 100,
|
2020-05-25 15:03:34 -07:00
|
|
|
)
|
|
|
|
.0,
|
2021-05-16 07:37:45 -07:00
|
|
|
4
|
2020-02-07 12:38:24 -08:00
|
|
|
);
|
|
|
|
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::<Crds>::default();
|
2020-02-07 12:38:24 -08:00
|
|
|
// check that old contact infos can still land as long as they have a "timeouts" entry
|
|
|
|
assert_eq!(
|
|
|
|
node.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
&peer_pubkey,
|
|
|
|
&timeouts,
|
2020-05-15 09:35:43 -07:00
|
|
|
vec![peer_entry],
|
2020-02-07 12:38:24 -08:00
|
|
|
node.msg_timeout + 1,
|
2020-05-25 15:03:34 -07:00
|
|
|
)
|
|
|
|
.0,
|
2020-02-07 12:38:24 -08:00
|
|
|
0
|
|
|
|
);
|
|
|
|
|
|
|
|
// construct something that's not a contact info
|
2021-12-29 11:31:26 -08:00
|
|
|
let peer_vote = Vote::new(peer_pubkey, new_test_vote_tx(&mut rng), 0).unwrap();
|
|
|
|
let peer_vote = CrdsValue::new_unsigned(CrdsData::Vote(0, peer_vote));
|
2020-02-07 12:38:24 -08:00
|
|
|
// check that older CrdsValues (non-ContactInfos) infos pass even if are too old,
|
|
|
|
// but a recent contact info (inserted above) exists
|
|
|
|
assert_eq!(
|
|
|
|
node.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
&peer_pubkey,
|
|
|
|
&timeouts,
|
|
|
|
vec![peer_vote.clone()],
|
|
|
|
node.msg_timeout + 1,
|
2020-05-25 15:03:34 -07:00
|
|
|
)
|
|
|
|
.0,
|
2020-02-07 12:38:24 -08:00
|
|
|
0
|
|
|
|
);
|
|
|
|
|
2021-07-14 15:27:17 -07:00
|
|
|
let node_crds = RwLock::<Crds>::default();
|
2020-02-07 12:38:24 -08:00
|
|
|
// without a contact info, inserting an old value should fail
|
|
|
|
assert_eq!(
|
|
|
|
node.process_pull_response(
|
2021-07-14 15:27:17 -07:00
|
|
|
&node_crds,
|
2020-02-07 12:38:24 -08:00
|
|
|
&peer_pubkey,
|
|
|
|
&timeouts,
|
2020-05-15 09:35:43 -07:00
|
|
|
vec![peer_vote],
|
2021-04-14 13:18:00 -07:00
|
|
|
node.msg_timeout + 2,
|
2020-05-25 15:03:34 -07:00
|
|
|
)
|
|
|
|
.0,
|
2021-05-16 07:37:45 -07:00
|
|
|
2
|
2020-02-07 12:38:24 -08:00
|
|
|
);
|
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|