2018-11-15 13:23:26 -08:00
|
|
|
//! Crds Gossip Push overlay
|
|
|
|
//! This module is used to propagate recently created CrdsValues across the network
|
|
|
|
//! Eager push strategy is based on Plumtree
|
|
|
|
//! http://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
|
|
|
|
//!
|
|
|
|
//! Main differences are:
|
|
|
|
//! 1. There is no `max hop`. Messages are signed with a local wallclock. If they are outside of
|
2020-06-10 17:00:17 -07:00
|
|
|
//! the local nodes wallclock window they are dropped silently.
|
2018-11-15 13:23:26 -08:00
|
|
|
//! 2. The prune set is stored in a Bloom filter.
|
|
|
|
|
2020-02-18 08:46:11 -08:00
|
|
|
use crate::{
|
2021-03-24 11:33:56 -07:00
|
|
|
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
|
2020-02-18 08:46:11 -08:00
|
|
|
contact_info::ContactInfo,
|
2021-04-30 09:57:19 -07:00
|
|
|
crds::{Crds, Cursor, VersionedCrdsValue},
|
2021-05-21 06:59:26 -07:00
|
|
|
crds_gossip::{get_stake, get_weight},
|
2020-02-18 08:46:11 -08:00
|
|
|
crds_gossip_error::CrdsGossipError,
|
2021-04-30 09:57:19 -07:00
|
|
|
crds_value::CrdsValue,
|
2020-02-18 08:46:11 -08:00
|
|
|
weighted_shuffle::weighted_shuffle,
|
|
|
|
};
|
2018-11-15 13:23:26 -08:00
|
|
|
use bincode::serialized_size;
|
|
|
|
use indexmap::map::IndexMap;
|
2021-03-24 11:33:56 -07:00
|
|
|
use lru::LruCache;
|
2020-10-06 06:48:32 -07:00
|
|
|
use rand::{seq::SliceRandom, Rng};
|
2020-11-05 07:42:00 -08:00
|
|
|
use solana_runtime::bloom::{AtomicBloom, Bloom};
|
2021-04-30 09:57:19 -07:00
|
|
|
use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
|
2020-02-18 08:46:11 -08:00
|
|
|
use std::{
|
|
|
|
cmp,
|
|
|
|
collections::{HashMap, HashSet},
|
2021-04-30 09:57:19 -07:00
|
|
|
ops::RangeBounds,
|
2020-02-18 08:46:11 -08:00
|
|
|
};
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
|
|
|
|
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
|
2020-02-07 12:38:24 -08:00
|
|
|
// With a fanout of 6, a 1000 node cluster should only take ~4 hops to converge.
|
|
|
|
// However since pushes are stake weighed, some trailing nodes
|
|
|
|
// might need more time to receive values. 30 seconds should be plenty.
|
|
|
|
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
|
2018-12-01 12:00:30 -08:00
|
|
|
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
|
2019-06-26 00:30:16 -07:00
|
|
|
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
|
2020-10-29 05:50:58 -07:00
|
|
|
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
|
2020-10-06 06:48:32 -07:00
|
|
|
// Do not push to peers which have not been updated for this long.
|
|
|
|
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
pub struct CrdsGossipPush {
|
|
|
|
/// max bytes per message
|
|
|
|
pub max_bytes: usize,
|
|
|
|
/// active set of validators for push
|
2020-11-05 07:42:00 -08:00
|
|
|
active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>,
|
2021-04-30 09:57:19 -07:00
|
|
|
/// Cursor into the crds table for values to push.
|
|
|
|
crds_cursor: Cursor,
|
2020-06-13 22:03:38 -07:00
|
|
|
/// Cache that tracks which validators a message was received from
|
2020-06-13 22:43:43 -07:00
|
|
|
/// bool indicates it has been pruned.
|
2020-06-13 22:03:38 -07:00
|
|
|
/// This cache represents a lagging view of which validators
|
|
|
|
/// currently have this node in their `active_set`
|
2021-05-13 06:50:16 -07:00
|
|
|
received_cache: HashMap<
|
|
|
|
Pubkey, // origin/owner
|
|
|
|
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
|
|
|
|
>,
|
2021-03-24 11:33:56 -07:00
|
|
|
last_pushed_to: LruCache<Pubkey, u64>,
|
2018-11-15 13:23:26 -08:00
|
|
|
pub num_active: usize,
|
|
|
|
pub push_fanout: usize,
|
|
|
|
pub msg_timeout: u64,
|
2018-12-01 12:00:30 -08:00
|
|
|
pub prune_timeout: u64,
|
2020-06-13 22:03:38 -07:00
|
|
|
pub num_total: usize,
|
|
|
|
pub num_old: usize,
|
|
|
|
pub num_pushes: usize,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for CrdsGossipPush {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
2019-11-14 10:24:53 -08:00
|
|
|
// Allow upto 64 Crds Values per PUSH
|
|
|
|
max_bytes: PACKET_DATA_SIZE * 64,
|
2018-11-15 13:23:26 -08:00
|
|
|
active_set: IndexMap::new(),
|
2021-04-30 09:57:19 -07:00
|
|
|
crds_cursor: Cursor::default(),
|
2019-06-26 00:30:16 -07:00
|
|
|
received_cache: HashMap::new(),
|
2021-03-24 11:33:56 -07:00
|
|
|
last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
|
2018-11-15 13:23:26 -08:00
|
|
|
num_active: CRDS_GOSSIP_NUM_ACTIVE,
|
|
|
|
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
|
|
|
|
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
|
2018-12-01 12:00:30 -08:00
|
|
|
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
|
2020-06-13 22:03:38 -07:00
|
|
|
num_total: 0,
|
|
|
|
num_old: 0,
|
|
|
|
num_pushes: 0,
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl CrdsGossipPush {
|
2021-04-30 09:57:19 -07:00
|
|
|
pub fn num_pending(&self, crds: &Crds) -> usize {
|
|
|
|
let mut cursor = self.crds_cursor;
|
|
|
|
crds.get_entries(&mut cursor).count()
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-06-26 00:30:16 -07:00
|
|
|
|
|
|
|
fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 {
|
|
|
|
let min_path_stake = self_stake.min(origin_stake);
|
2019-07-16 10:20:03 -07:00
|
|
|
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
|
2019-06-26 00:30:16 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn prune_received_cache(
|
|
|
|
&mut self,
|
|
|
|
self_pubkey: &Pubkey,
|
|
|
|
origin: &Pubkey,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
) -> Vec<Pubkey> {
|
|
|
|
let origin_stake = stakes.get(origin).unwrap_or(&0);
|
|
|
|
let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
|
2021-05-13 06:50:16 -07:00
|
|
|
let peers = match self.received_cache.get_mut(origin) {
|
|
|
|
None => return Vec::default(),
|
|
|
|
Some(peers) => peers,
|
|
|
|
};
|
2020-06-13 22:03:38 -07:00
|
|
|
let peer_stake_total: u64 = peers
|
|
|
|
.iter()
|
2021-05-13 06:50:16 -07:00
|
|
|
.filter(|(_, (pruned, _))| !pruned)
|
|
|
|
.filter_map(|(peer, _)| stakes.get(peer))
|
2020-06-13 22:03:38 -07:00
|
|
|
.sum();
|
2019-06-26 00:30:16 -07:00
|
|
|
let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
|
|
|
|
if peer_stake_total < prune_stake_threshold {
|
|
|
|
return Vec::new();
|
|
|
|
}
|
2021-05-13 06:50:16 -07:00
|
|
|
let shuffled_staked_peers = {
|
|
|
|
let peers: Vec<_> = peers
|
|
|
|
.iter()
|
|
|
|
.filter(|(_, (pruned, _))| !pruned)
|
|
|
|
.filter_map(|(peer, _)| Some((*peer, *stakes.get(peer)?)))
|
|
|
|
.filter(|(_, stake)| *stake > 0)
|
|
|
|
.collect();
|
|
|
|
let mut seed = [0; 32];
|
|
|
|
rand::thread_rng().fill(&mut seed[..]);
|
|
|
|
let weights: Vec<_> = peers.iter().map(|(_, stake)| *stake).collect();
|
|
|
|
weighted_shuffle(&weights, seed)
|
|
|
|
.into_iter()
|
|
|
|
.map(move |i| peers[i])
|
|
|
|
};
|
2019-06-26 00:30:16 -07:00
|
|
|
let mut keep = HashSet::new();
|
|
|
|
let mut peer_stake_sum = 0;
|
2020-10-29 05:50:58 -07:00
|
|
|
keep.insert(*origin);
|
2021-05-13 06:50:16 -07:00
|
|
|
for (peer, stake) in shuffled_staked_peers {
|
|
|
|
if peer == *origin {
|
2020-10-29 05:50:58 -07:00
|
|
|
continue;
|
|
|
|
}
|
2021-05-13 06:50:16 -07:00
|
|
|
keep.insert(peer);
|
|
|
|
peer_stake_sum += stake;
|
2020-06-13 22:03:38 -07:00
|
|
|
if peer_stake_sum >= prune_stake_threshold
|
|
|
|
&& keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
|
|
|
|
{
|
2019-06-26 00:30:16 -07:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2021-05-13 06:50:16 -07:00
|
|
|
for (peer, (pruned, _)) in peers.iter_mut() {
|
|
|
|
if !*pruned && !keep.contains(peer) {
|
|
|
|
*pruned = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
peers
|
2020-06-13 22:03:38 -07:00
|
|
|
.keys()
|
2021-05-13 06:50:16 -07:00
|
|
|
.filter(|peer| !keep.contains(peer))
|
|
|
|
.copied()
|
|
|
|
.collect()
|
2019-06-26 00:30:16 -07:00
|
|
|
}
|
|
|
|
|
2021-04-30 09:57:19 -07:00
|
|
|
fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> {
|
|
|
|
now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout)
|
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
/// process a push message to the network
|
|
|
|
pub fn process_push_message(
|
|
|
|
&mut self,
|
|
|
|
crds: &mut Crds,
|
2019-06-26 00:30:16 -07:00
|
|
|
from: &Pubkey,
|
2018-11-15 13:23:26 -08:00
|
|
|
value: CrdsValue,
|
|
|
|
now: u64,
|
|
|
|
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
|
2020-06-13 22:03:38 -07:00
|
|
|
self.num_total += 1;
|
2021-04-30 09:57:19 -07:00
|
|
|
if !self.wallclock_window(now).contains(&value.wallclock()) {
|
2018-11-15 13:23:26 -08:00
|
|
|
return Err(CrdsGossipError::PushMessageTimeout);
|
|
|
|
}
|
2021-04-30 09:57:19 -07:00
|
|
|
let origin = value.pubkey();
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
self.received_cache
|
2020-06-13 22:03:38 -07:00
|
|
|
.entry(origin)
|
removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79
Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298
For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_verioned (as the comment already says so):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129
However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392
Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.
To avoid such scenarios, this commit:
* removes Crds::new_versioned and Crd::insert_versioned.
* makes VersionedCrdsValue constructor private, only invoked in
Crds::insert, so that insert_timestamp is populated right before
insert.
This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stalled timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.
2021-04-28 04:56:13 -07:00
|
|
|
.or_default()
|
|
|
|
.entry(*from)
|
|
|
|
.and_modify(|(_pruned, timestamp)| *timestamp = now)
|
|
|
|
.or_insert((/*pruned:*/ false, now));
|
2021-04-30 09:57:19 -07:00
|
|
|
crds.insert(value, now).map_err(|_| {
|
|
|
|
self.num_old += 1;
|
|
|
|
CrdsGossipError::PushMessageOldVersion
|
|
|
|
})
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// New push message to broadcast to peers.
|
|
|
|
/// Returns a list of Pubkeys for the selected peers and a list of values to send to all the
|
|
|
|
/// peers.
|
|
|
|
/// The list of push messages is created such that all the randomly selected peers have not
|
|
|
|
/// pruned the source addresses.
|
2019-05-28 18:39:40 -07:00
|
|
|
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
2020-10-07 11:29:20 -07:00
|
|
|
let push_fanout = self.push_fanout.min(self.active_set.len());
|
|
|
|
if push_fanout == 0 {
|
|
|
|
return HashMap::default();
|
|
|
|
}
|
|
|
|
let mut num_pushes = 0;
|
|
|
|
let mut num_values = 0;
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut total_bytes: usize = 0;
|
2019-05-28 18:39:40 -07:00
|
|
|
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
|
2021-04-30 09:57:19 -07:00
|
|
|
let wallclock_window = self.wallclock_window(now);
|
|
|
|
let entries = crds
|
|
|
|
.get_entries(&mut self.crds_cursor)
|
|
|
|
.map(|entry| &entry.value)
|
|
|
|
.filter(|value| wallclock_window.contains(&value.wallclock()));
|
|
|
|
for value in entries {
|
|
|
|
let serialized_size = serialized_size(&value).unwrap();
|
|
|
|
total_bytes = total_bytes.saturating_add(serialized_size as usize);
|
|
|
|
if total_bytes > self.max_bytes {
|
|
|
|
break;
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2021-04-30 09:57:19 -07:00
|
|
|
num_values += 1;
|
|
|
|
let origin = value.pubkey();
|
|
|
|
// Use a consistent index for the same origin so the active set
|
|
|
|
// learns the MST for that origin.
|
|
|
|
let offset = origin.as_ref()[0] as usize;
|
|
|
|
for i in offset..offset + push_fanout {
|
2020-10-07 11:29:20 -07:00
|
|
|
let index = i % self.active_set.len();
|
|
|
|
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
2020-12-08 06:19:01 -08:00
|
|
|
if !filter.contains(&origin) || value.should_force_push(peer) {
|
2020-10-07 11:29:20 -07:00
|
|
|
trace!("new_push_messages insert {} {:?}", *peer, value);
|
|
|
|
push_messages.entry(*peer).or_default().push(value.clone());
|
|
|
|
num_pushes += 1;
|
|
|
|
}
|
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-10-07 11:29:20 -07:00
|
|
|
self.num_pushes += num_pushes;
|
|
|
|
trace!("new_push_messages {} {}", num_values, self.active_set.len());
|
2021-03-24 11:33:56 -07:00
|
|
|
for target_pubkey in push_messages.keys().copied() {
|
|
|
|
self.last_pushed_to.put(target_pubkey, now);
|
2020-10-02 13:57:26 -07:00
|
|
|
}
|
2019-05-28 18:39:40 -07:00
|
|
|
push_messages
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// add the `from` to the peer's filter of nodes
|
2020-11-05 07:42:00 -08:00
|
|
|
pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
|
|
|
|
if let Some(filter) = self.active_set.get(peer) {
|
2020-10-29 08:17:19 -07:00
|
|
|
for origin in origins {
|
|
|
|
if origin != self_pubkey {
|
2020-11-05 07:42:00 -08:00
|
|
|
filter.add(origin);
|
2020-10-29 08:17:19 -07:00
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn compute_need(num_active: usize, active_set_len: usize, ratio: usize) -> usize {
|
|
|
|
let num = active_set_len / ratio;
|
|
|
|
cmp::min(num_active, (num_active - active_set_len) + num)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// refresh the push active set
|
|
|
|
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
|
|
|
pub fn refresh_push_active_set(
|
|
|
|
&mut self,
|
|
|
|
crds: &Crds,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2020-09-11 12:00:16 -07:00
|
|
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
2019-03-09 19:28:43 -08:00
|
|
|
self_id: &Pubkey,
|
2020-05-05 20:15:19 -07:00
|
|
|
self_shred_version: u16,
|
2018-11-15 13:23:26 -08:00
|
|
|
network_size: usize,
|
|
|
|
ratio: usize,
|
|
|
|
) {
|
2021-05-21 06:59:26 -07:00
|
|
|
const BLOOM_FALSE_RATE: f64 = 0.1;
|
|
|
|
const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
|
2021-05-23 09:50:19 -07:00
|
|
|
#[cfg(debug_assertions)]
|
|
|
|
const MIN_NUM_BLOOM_ITEMS: usize = 512;
|
|
|
|
#[cfg(not(debug_assertions))]
|
|
|
|
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
|
2020-10-06 06:48:32 -07:00
|
|
|
let mut rng = rand::thread_rng();
|
2018-11-15 13:23:26 -08:00
|
|
|
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
|
|
|
let mut new_items = HashMap::new();
|
2021-05-21 06:59:26 -07:00
|
|
|
let (weights, peers): (Vec<_>, Vec<_>) = self
|
|
|
|
.push_options(
|
|
|
|
crds,
|
|
|
|
&self_id,
|
|
|
|
self_shred_version,
|
|
|
|
stakes,
|
|
|
|
gossip_validators,
|
|
|
|
)
|
|
|
|
.into_iter()
|
|
|
|
.unzip();
|
|
|
|
if peers.is_empty() {
|
2019-02-20 17:08:56 -08:00
|
|
|
return;
|
|
|
|
}
|
2021-05-23 09:50:19 -07:00
|
|
|
let num_bloom_items = MIN_NUM_BLOOM_ITEMS.max(network_size);
|
2021-05-21 06:59:26 -07:00
|
|
|
let shuffle = {
|
|
|
|
let mut seed = [0; 32];
|
|
|
|
rng.fill(&mut seed[..]);
|
|
|
|
weighted_shuffle(&weights, seed).into_iter()
|
|
|
|
};
|
|
|
|
for peer in shuffle.map(|i| peers[i].id) {
|
|
|
|
if new_items.len() >= need {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if self.active_set.contains_key(&peer) || new_items.contains_key(&peer) {
|
|
|
|
continue;
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2021-05-21 06:59:26 -07:00
|
|
|
let bloom = AtomicBloom::from(Bloom::random(
|
|
|
|
num_bloom_items,
|
|
|
|
BLOOM_FALSE_RATE,
|
|
|
|
BLOOM_MAX_BITS,
|
|
|
|
));
|
|
|
|
bloom.add(&peer);
|
|
|
|
new_items.insert(peer, bloom);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
|
2020-10-06 06:48:32 -07:00
|
|
|
keys.shuffle(&mut rng);
|
2018-11-15 13:23:26 -08:00
|
|
|
let num = keys.len() / ratio;
|
|
|
|
for k in &keys[..num] {
|
2019-10-17 14:19:27 -07:00
|
|
|
self.active_set.swap_remove(k);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
for (k, v) in new_items {
|
|
|
|
self.active_set.insert(k, v);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-26 11:45:10 -08:00
|
|
|
fn push_options<'a>(
|
|
|
|
&self,
|
|
|
|
crds: &'a Crds,
|
|
|
|
self_id: &Pubkey,
|
2020-05-05 20:15:19 -07:00
|
|
|
self_shred_version: u16,
|
2019-02-26 11:45:10 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2020-09-11 12:00:16 -07:00
|
|
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
2019-02-26 11:45:10 -08:00
|
|
|
) -> Vec<(f32, &'a ContactInfo)> {
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
let max_weight = u16::MAX as f32 - 1.0;
|
|
|
|
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
|
2020-11-15 08:38:04 -08:00
|
|
|
crds.get_nodes()
|
2020-10-06 06:48:32 -07:00
|
|
|
.filter_map(|value| {
|
2020-11-15 08:38:04 -08:00
|
|
|
let info = value.value.contact_info().unwrap();
|
2020-10-06 06:48:32 -07:00
|
|
|
// Stop pushing to nodes which have not been active recently.
|
|
|
|
if value.local_timestamp < active_cutoff {
|
|
|
|
// In order to mitigate eclipse attack, for staked nodes
|
|
|
|
// continue retrying periodically.
|
|
|
|
let stake = stakes.get(&info.id).unwrap_or(&0);
|
2020-11-12 08:09:37 -08:00
|
|
|
if *stake == 0 || !rng.gen_ratio(1, 16) {
|
2020-10-06 06:48:32 -07:00
|
|
|
return None;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Some(info)
|
|
|
|
})
|
|
|
|
.filter(|info| {
|
2020-05-05 20:15:19 -07:00
|
|
|
info.id != *self_id
|
|
|
|
&& ContactInfo::is_valid_address(&info.gossip)
|
2020-08-18 18:52:45 -07:00
|
|
|
&& self_shred_version == info.shred_version
|
2020-09-11 12:00:16 -07:00
|
|
|
&& gossip_validators.map_or(true, |gossip_validators| {
|
|
|
|
gossip_validators.contains(&info.id)
|
|
|
|
})
|
2020-05-05 20:15:19 -07:00
|
|
|
})
|
2020-10-06 06:48:32 -07:00
|
|
|
.map(|info| {
|
2021-03-24 11:33:56 -07:00
|
|
|
let last_pushed_to = self
|
|
|
|
.last_pushed_to
|
|
|
|
.peek(&info.id)
|
|
|
|
.copied()
|
|
|
|
.unwrap_or_default();
|
|
|
|
let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
|
2019-02-26 11:45:10 -08:00
|
|
|
let stake = get_stake(&info.id, stakes);
|
|
|
|
let weight = get_weight(max_weight, since, stake);
|
|
|
|
(weight, info)
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2019-06-26 00:30:16 -07:00
|
|
|
/// purge received push message cache
|
|
|
|
pub fn purge_old_received_cache(&mut self, min_time: u64) {
|
2020-09-28 14:59:59 -07:00
|
|
|
self.received_cache.retain(|_, v| {
|
|
|
|
v.retain(|_, (_, t)| *t > min_time);
|
|
|
|
!v.is_empty()
|
|
|
|
});
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-11-05 07:42:00 -08:00
|
|
|
|
|
|
|
// Only for tests and simulations.
|
|
|
|
pub(crate) fn mock_clone(&self) -> Self {
|
2021-03-24 11:33:56 -07:00
|
|
|
let active_set = self
|
|
|
|
.active_set
|
|
|
|
.iter()
|
|
|
|
.map(|(k, v)| (*k, v.mock_clone()))
|
|
|
|
.collect();
|
|
|
|
let mut last_pushed_to = LruCache::new(self.last_pushed_to.cap());
|
|
|
|
for (k, v) in self.last_pushed_to.iter().rev() {
|
|
|
|
last_pushed_to.put(*k, *v);
|
2020-11-05 07:42:00 -08:00
|
|
|
}
|
|
|
|
Self {
|
|
|
|
active_set,
|
|
|
|
received_cache: self.received_cache.clone(),
|
2021-03-24 11:33:56 -07:00
|
|
|
last_pushed_to,
|
2020-11-05 07:42:00 -08:00
|
|
|
..*self
|
|
|
|
}
|
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::contact_info::ContactInfo;
|
2019-11-03 10:07:51 -08:00
|
|
|
use crate::crds_value::CrdsData;
|
2019-02-20 17:08:56 -08:00
|
|
|
|
2019-06-26 00:30:16 -07:00
|
|
|
#[test]
|
|
|
|
fn test_prune() {
|
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
|
|
|
let mut stakes = HashMap::new();
|
|
|
|
|
2020-10-19 12:12:08 -07:00
|
|
|
let self_id = solana_sdk::pubkey::new_rand();
|
|
|
|
let origin = solana_sdk::pubkey::new_rand();
|
2019-06-26 00:30:16 -07:00
|
|
|
stakes.insert(self_id, 100);
|
|
|
|
stakes.insert(origin, 100);
|
|
|
|
|
2019-11-03 10:07:51 -08:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
|
|
|
&origin, 0,
|
|
|
|
)));
|
2020-10-19 12:12:08 -07:00
|
|
|
let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand());
|
2019-06-26 00:30:16 -07:00
|
|
|
let mut low_staked_set = HashSet::new();
|
|
|
|
low_staked_peers.for_each(|p| {
|
|
|
|
let _ = push.process_push_message(&mut crds, &p, value.clone(), 0);
|
|
|
|
low_staked_set.insert(p);
|
|
|
|
stakes.insert(p, 1);
|
|
|
|
});
|
|
|
|
|
2020-06-13 22:03:38 -07:00
|
|
|
let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
|
2019-06-26 00:30:16 -07:00
|
|
|
assert!(
|
|
|
|
pruned.is_empty(),
|
|
|
|
"should not prune if min threshold has not been reached"
|
|
|
|
);
|
|
|
|
|
2020-10-19 12:12:08 -07:00
|
|
|
let high_staked_peer = solana_sdk::pubkey::new_rand();
|
2019-06-26 00:30:16 -07:00
|
|
|
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
|
|
|
|
stakes.insert(high_staked_peer, high_stake);
|
2020-05-15 09:35:43 -07:00
|
|
|
let _ = push.process_push_message(&mut crds, &high_staked_peer, value, 0);
|
2019-06-26 00:30:16 -07:00
|
|
|
|
2020-06-13 22:03:38 -07:00
|
|
|
let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
|
2019-06-26 00:30:16 -07:00
|
|
|
assert!(
|
|
|
|
pruned.len() < low_staked_set.len() + 1,
|
|
|
|
"should not prune all peers"
|
|
|
|
);
|
|
|
|
pruned.iter().for_each(|p| {
|
|
|
|
assert!(
|
|
|
|
low_staked_set.contains(p),
|
|
|
|
"only low staked peers should be pruned"
|
|
|
|
);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
#[test]
|
2020-06-13 22:03:38 -07:00
|
|
|
fn test_process_push_one() {
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-11-03 10:07:51 -08:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2018-11-15 13:23:26 -08:00
|
|
|
let label = value.label();
|
|
|
|
// push a new message
|
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Ok(None)
|
|
|
|
);
|
|
|
|
assert_eq!(crds.lookup(&label), Some(&value));
|
|
|
|
|
|
|
|
// push it again
|
2020-06-13 22:03:38 -07:00
|
|
|
assert_matches!(
|
2020-05-15 09:35:43 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
2020-06-13 22:03:38 -07:00
|
|
|
Err(CrdsGossipError::PushMessageOldVersion)
|
2018-11-15 13:23:26 -08:00
|
|
|
);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_process_push_old_version() {
|
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2020-10-19 12:12:08 -07:00
|
|
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
ci.wallclock = 1;
|
2019-11-03 10:07:51 -08:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// push a new message
|
2019-06-26 00:30:16 -07:00
|
|
|
assert_eq!(
|
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
|
|
|
Ok(None)
|
|
|
|
);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// push an old version
|
|
|
|
ci.wallclock = 0;
|
2020-05-15 09:35:43 -07:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::PushMessageOldVersion)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_process_push_timeout() {
|
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
|
|
|
let timeout = push.msg_timeout;
|
2020-10-19 12:12:08 -07:00
|
|
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// push a version to far in the future
|
|
|
|
ci.wallclock = timeout + 1;
|
2019-11-03 10:07:51 -08:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::PushMessageTimeout)
|
|
|
|
);
|
|
|
|
|
|
|
|
// push a version to far in the past
|
|
|
|
ci.wallclock = 0;
|
2020-05-15 09:35:43 -07:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::PushMessageTimeout)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_process_push_update() {
|
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2020-10-19 12:12:08 -07:00
|
|
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
ci.wallclock = 0;
|
2019-11-03 10:07:51 -08:00
|
|
|
let value_old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// push a new message
|
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value_old.clone(), 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Ok(None)
|
|
|
|
);
|
|
|
|
|
|
|
|
// push an old version
|
|
|
|
ci.wallclock = 1;
|
2020-05-15 09:35:43 -07:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0)
|
2018-11-15 13:23:26 -08:00
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.value,
|
|
|
|
value_old
|
|
|
|
);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_compute_need() {
|
|
|
|
assert_eq!(CrdsGossipPush::compute_need(30, 0, 10), 30);
|
|
|
|
assert_eq!(CrdsGossipPush::compute_need(30, 1, 10), 29);
|
|
|
|
assert_eq!(CrdsGossipPush::compute_need(30, 30, 10), 3);
|
|
|
|
assert_eq!(CrdsGossipPush::compute_need(30, 29, 10), 3);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_refresh_active_set() {
|
2018-12-14 12:36:50 -08:00
|
|
|
solana_logger::setup();
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-11-03 10:07:51 -08:00
|
|
|
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-10-06 06:48:32 -07:00
|
|
|
assert_eq!(crds.insert(value1.clone(), now), Ok(None));
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
|
2019-11-03 10:07:51 -08:00
|
|
|
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
|
2020-10-06 06:48:32 -07:00
|
|
|
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
|
2018-11-15 13:23:26 -08:00
|
|
|
for _ in 0..30 {
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
if push.active_set.get(&value2.label().pubkey()).is_some() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert!(push.active_set.get(&value2.label().pubkey()).is_some());
|
|
|
|
|
|
|
|
for _ in 0..push.num_active {
|
2019-11-03 10:07:51 -08:00
|
|
|
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
2020-10-19 12:12:08 -07:00
|
|
|
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
|
2019-11-03 10:07:51 -08:00
|
|
|
));
|
2020-10-06 06:48:32 -07:00
|
|
|
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(push.active_set.len(), push.num_active);
|
|
|
|
}
|
|
|
|
#[test]
|
2019-02-20 17:08:56 -08:00
|
|
|
fn test_active_set_refresh_with_bank() {
|
2020-10-02 13:57:26 -07:00
|
|
|
solana_logger::setup();
|
2019-02-20 17:08:56 -08:00
|
|
|
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
|
|
|
|
let mut crds = Crds::default();
|
2020-10-02 13:57:26 -07:00
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-02-20 20:02:47 -08:00
|
|
|
let mut stakes = HashMap::new();
|
2019-02-20 17:08:56 -08:00
|
|
|
for i in 1..=100 {
|
2019-11-03 10:07:51 -08:00
|
|
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
time,
|
|
|
|
)));
|
2019-02-20 17:08:56 -08:00
|
|
|
let id = peer.label().pubkey();
|
|
|
|
crds.insert(peer.clone(), time).unwrap();
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes.insert(id, i * 100);
|
2021-03-24 11:33:56 -07:00
|
|
|
push.last_pushed_to.put(id, time);
|
2019-02-20 17:08:56 -08:00
|
|
|
}
|
2020-09-11 12:00:16 -07:00
|
|
|
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
|
2019-02-26 11:45:10 -08:00
|
|
|
assert!(!options.is_empty());
|
|
|
|
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
|
|
|
|
// check that the highest stake holder is also the heaviest weighted.
|
|
|
|
assert_eq!(
|
|
|
|
*stakes.get(&options.get(0).unwrap().1.id).unwrap(),
|
|
|
|
10_000_u64
|
2019-02-20 17:08:56 -08:00
|
|
|
);
|
|
|
|
}
|
2020-05-05 20:15:19 -07:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_no_pushes_to_from_different_shred_versions() {
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
2020-05-05 20:15:19 -07:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let stakes = HashMap::new();
|
|
|
|
let node = CrdsGossipPush::default();
|
|
|
|
|
|
|
|
let gossip = socketaddr!("127.0.0.1:1234");
|
|
|
|
|
|
|
|
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-05-05 20:15:19 -07:00
|
|
|
shred_version: 123,
|
2020-05-15 09:35:43 -07:00
|
|
|
gossip,
|
2020-05-05 20:15:19 -07:00
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-05-05 20:15:19 -07:00
|
|
|
shred_version: 0,
|
2020-05-15 09:35:43 -07:00
|
|
|
gossip,
|
2020-05-05 20:15:19 -07:00
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-05-05 20:15:19 -07:00
|
|
|
shred_version: 123,
|
2020-05-15 09:35:43 -07:00
|
|
|
gossip,
|
2020-05-05 20:15:19 -07:00
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-05-05 20:15:19 -07:00
|
|
|
shred_version: 456,
|
2020-05-15 09:35:43 -07:00
|
|
|
gossip,
|
2020-05-05 20:15:19 -07:00
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
|
2020-10-06 06:48:32 -07:00
|
|
|
crds.insert(me.clone(), now).unwrap();
|
|
|
|
crds.insert(spy.clone(), now).unwrap();
|
|
|
|
crds.insert(node_123.clone(), now).unwrap();
|
|
|
|
crds.insert(node_456, now).unwrap();
|
2020-05-05 20:15:19 -07:00
|
|
|
|
2020-08-18 18:52:45 -07:00
|
|
|
// shred version 123 should ignore nodes with versions 0 and 456
|
2020-05-05 20:15:19 -07:00
|
|
|
let options = node
|
2020-09-11 12:00:16 -07:00
|
|
|
.push_options(&crds, &me.label().pubkey(), 123, &stakes, None)
|
2020-05-05 20:15:19 -07:00
|
|
|
.iter()
|
|
|
|
.map(|(_, c)| c.id)
|
|
|
|
.collect::<Vec<_>>();
|
2020-08-18 18:52:45 -07:00
|
|
|
assert_eq!(options.len(), 1);
|
|
|
|
assert!(!options.contains(&spy.pubkey()));
|
2020-05-05 20:15:19 -07:00
|
|
|
assert!(options.contains(&node_123.pubkey()));
|
|
|
|
|
2020-08-18 18:52:45 -07:00
|
|
|
// spy nodes should not push to people on different shred versions
|
2020-05-05 20:15:19 -07:00
|
|
|
let options = node
|
2020-09-11 12:00:16 -07:00
|
|
|
.push_options(&crds, &spy.label().pubkey(), 0, &stakes, None)
|
2020-05-05 20:15:19 -07:00
|
|
|
.iter()
|
|
|
|
.map(|(_, c)| c.id)
|
|
|
|
.collect::<Vec<_>>();
|
2020-08-18 18:52:45 -07:00
|
|
|
assert!(options.is_empty());
|
2020-05-05 20:15:19 -07:00
|
|
|
}
|
2020-09-11 12:00:16 -07:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_pushes_only_to_allowed() {
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
2020-09-11 12:00:16 -07:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let stakes = HashMap::new();
|
|
|
|
let node = CrdsGossipPush::default();
|
|
|
|
let gossip = socketaddr!("127.0.0.1:1234");
|
|
|
|
|
|
|
|
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-09-11 12:00:16 -07:00
|
|
|
gossip,
|
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
2020-10-19 12:12:08 -07:00
|
|
|
id: solana_sdk::pubkey::new_rand(),
|
2020-09-11 12:00:16 -07:00
|
|
|
gossip,
|
|
|
|
..ContactInfo::default()
|
|
|
|
}));
|
|
|
|
|
|
|
|
crds.insert(me.clone(), 0).unwrap();
|
2020-10-06 06:48:32 -07:00
|
|
|
crds.insert(node_123.clone(), now).unwrap();
|
2020-09-11 12:00:16 -07:00
|
|
|
|
|
|
|
// Unknown pubkey in gossip_validators -- will push to nobody
|
|
|
|
let mut gossip_validators = HashSet::new();
|
|
|
|
let options = node.push_options(
|
|
|
|
&crds,
|
|
|
|
&me.label().pubkey(),
|
|
|
|
0,
|
|
|
|
&stakes,
|
|
|
|
Some(&gossip_validators),
|
|
|
|
);
|
|
|
|
|
|
|
|
assert!(options.is_empty());
|
|
|
|
|
|
|
|
// Unknown pubkey in gossip_validators -- will push to nobody
|
2020-10-19 12:12:08 -07:00
|
|
|
gossip_validators.insert(solana_sdk::pubkey::new_rand());
|
2020-09-11 12:00:16 -07:00
|
|
|
let options = node.push_options(
|
|
|
|
&crds,
|
|
|
|
&me.label().pubkey(),
|
|
|
|
0,
|
|
|
|
&stakes,
|
|
|
|
Some(&gossip_validators),
|
|
|
|
);
|
|
|
|
assert!(options.is_empty());
|
|
|
|
|
|
|
|
// node_123 pubkey in gossip_validators -- will push to it
|
|
|
|
gossip_validators.insert(node_123.pubkey());
|
|
|
|
let options = node.push_options(
|
|
|
|
&crds,
|
|
|
|
&me.label().pubkey(),
|
|
|
|
0,
|
|
|
|
&stakes,
|
|
|
|
Some(&gossip_validators),
|
|
|
|
);
|
|
|
|
|
|
|
|
assert_eq!(options.len(), 1);
|
|
|
|
assert_eq!(options[0].1.id, node_123.pubkey());
|
|
|
|
}
|
|
|
|
|
2019-02-20 17:08:56 -08:00
|
|
|
#[test]
|
2018-11-15 13:23:26 -08:00
|
|
|
fn test_new_push_messages() {
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-11-03 10:07:51 -08:00
|
|
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2020-10-06 06:48:32 -07:00
|
|
|
assert_eq!(crds.insert(peer.clone(), now), Ok(None));
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2019-11-03 10:07:51 -08:00
|
|
|
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2019-05-28 18:39:40 -07:00
|
|
|
let mut expected = HashMap::new();
|
|
|
|
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
|
2019-06-26 00:30:16 -07:00
|
|
|
assert_eq!(
|
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0),
|
|
|
|
Ok(None)
|
|
|
|
);
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(push.active_set.len(), 1);
|
2019-05-28 18:39:40 -07:00
|
|
|
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_personalized_push_messages() {
|
2020-10-06 06:48:32 -07:00
|
|
|
let now = timestamp();
|
2021-05-19 13:56:10 -07:00
|
|
|
let mut rng = rand::thread_rng();
|
2019-05-28 18:39:40 -07:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2021-05-19 13:56:10 -07:00
|
|
|
let peers: Vec<_> = vec![0, 0, now]
|
|
|
|
.into_iter()
|
|
|
|
.map(|wallclock| {
|
|
|
|
let mut peer = ContactInfo::new_rand(&mut rng, /*pubkey=*/ None);
|
|
|
|
peer.wallclock = wallclock;
|
|
|
|
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
assert_eq!(crds.insert(peers[0].clone(), now), Ok(None));
|
|
|
|
assert_eq!(crds.insert(peers[1].clone(), now), Ok(None));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2021-05-19 13:56:10 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now),
|
2019-05-28 18:39:40 -07:00
|
|
|
Ok(None)
|
2018-11-15 13:23:26 -08:00
|
|
|
);
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2019-05-28 18:39:40 -07:00
|
|
|
|
|
|
|
// push 3's contact info to 1 and 2 and 3
|
2021-05-19 13:56:10 -07:00
|
|
|
let expected: HashMap<_, _> = vec![
|
|
|
|
(peers[0].pubkey(), vec![peers[2].clone()]),
|
|
|
|
(peers[1].pubkey(), vec![peers[2].clone()]),
|
|
|
|
]
|
|
|
|
.into_iter()
|
|
|
|
.collect();
|
2019-05-28 18:39:40 -07:00
|
|
|
assert_eq!(push.active_set.len(), 3);
|
2020-10-06 06:48:32 -07:00
|
|
|
assert_eq!(push.new_push_messages(&crds, now), expected);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_process_prune() {
|
|
|
|
let mut crds = Crds::default();
|
2020-10-19 12:12:08 -07:00
|
|
|
let self_id = solana_sdk::pubkey::new_rand();
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-11-03 10:07:51 -08:00
|
|
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2019-11-03 10:07:51 -08:00
|
|
|
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2019-05-28 18:39:40 -07:00
|
|
|
let expected = HashMap::new();
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Ok(None)
|
|
|
|
);
|
2020-06-13 22:03:38 -07:00
|
|
|
push.process_prune_msg(
|
|
|
|
&self_id,
|
|
|
|
&peer.label().pubkey(),
|
|
|
|
&[new_msg.label().pubkey()],
|
|
|
|
);
|
2019-05-28 18:39:40 -07:00
|
|
|
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
#[test]
|
|
|
|
fn test_purge_old_pending_push_messages() {
|
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2019-11-03 10:07:51 -08:00
|
|
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
2020-10-19 12:12:08 -07:00
|
|
|
&solana_sdk::pubkey::new_rand(),
|
2019-11-03 10:07:51 -08:00
|
|
|
0,
|
|
|
|
)));
|
2020-05-15 09:35:43 -07:00
|
|
|
assert_eq!(crds.insert(peer, 0), Ok(None));
|
2020-09-11 12:00:16 -07:00
|
|
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-10-19 12:12:08 -07:00
|
|
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
ci.wallclock = 1;
|
2020-05-15 09:35:43 -07:00
|
|
|
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
2019-05-28 18:39:40 -07:00
|
|
|
let expected = HashMap::new();
|
2018-11-15 13:23:26 -08:00
|
|
|
assert_eq!(
|
2020-05-15 09:35:43 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1),
|
2018-11-15 13:23:26 -08:00
|
|
|
Ok(None)
|
|
|
|
);
|
2019-05-28 18:39:40 -07:00
|
|
|
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2019-06-26 00:30:16 -07:00
|
|
|
fn test_purge_old_received_cache() {
|
2018-11-15 13:23:26 -08:00
|
|
|
let mut crds = Crds::default();
|
|
|
|
let mut push = CrdsGossipPush::default();
|
2020-10-19 12:12:08 -07:00
|
|
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
2018-11-15 13:23:26 -08:00
|
|
|
ci.wallclock = 0;
|
2020-05-15 09:35:43 -07:00
|
|
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
2018-11-15 13:23:26 -08:00
|
|
|
let label = value.label();
|
|
|
|
// push a new message
|
|
|
|
assert_eq!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Ok(None)
|
|
|
|
);
|
|
|
|
assert_eq!(crds.lookup(&label), Some(&value));
|
|
|
|
|
|
|
|
// push it again
|
2020-06-13 22:03:38 -07:00
|
|
|
assert_matches!(
|
2019-06-26 00:30:16 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
2020-06-13 22:03:38 -07:00
|
|
|
Err(CrdsGossipError::PushMessageOldVersion)
|
2018-11-15 13:23:26 -08:00
|
|
|
);
|
|
|
|
|
|
|
|
// purge the old pushed
|
2019-06-26 00:30:16 -07:00
|
|
|
push.purge_old_received_cache(1);
|
2018-11-15 13:23:26 -08:00
|
|
|
|
|
|
|
// push it again
|
|
|
|
assert_eq!(
|
2020-05-15 09:35:43 -07:00
|
|
|
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
2018-11-15 13:23:26 -08:00
|
|
|
Err(CrdsGossipError::PushMessageOldVersion)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|