makes CrdsGossipPush thread-safe (#18581)
This commit is contained in:
parent
12a93a9951
commit
90f8cf0920
|
@ -3294,7 +3294,7 @@ mod tests {
|
||||||
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
|
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
|
||||||
cluster_info.insert_info(spy);
|
cluster_info.insert_info(spy);
|
||||||
{
|
{
|
||||||
let mut gossip = cluster_info.gossip.write().unwrap();
|
let gossip = cluster_info.gossip.read().unwrap();
|
||||||
gossip.refresh_push_active_set(
|
gossip.refresh_push_active_set(
|
||||||
&cluster_info.id(),
|
&cluster_info.id(),
|
||||||
cluster_info.my_shred_version(),
|
cluster_info.my_shred_version(),
|
||||||
|
@ -3414,7 +3414,7 @@ mod tests {
|
||||||
.mock_pong(peer.id, peer.gossip, Instant::now());
|
.mock_pong(peer.id, peer.gossip, Instant::now());
|
||||||
cluster_info.insert_info(peer);
|
cluster_info.insert_info(peer);
|
||||||
{
|
{
|
||||||
let mut gossip = cluster_info.gossip.write().unwrap();
|
let gossip = cluster_info.gossip.read().unwrap();
|
||||||
gossip.refresh_push_active_set(
|
gossip.refresh_push_active_set(
|
||||||
&cluster_info.id(),
|
&cluster_info.id(),
|
||||||
cluster_info.my_shred_version(),
|
cluster_info.my_shred_version(),
|
||||||
|
|
|
@ -15,7 +15,6 @@ use {
|
||||||
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
|
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
|
||||||
ping_pong::PingCache,
|
ping_pong::PingCache,
|
||||||
},
|
},
|
||||||
itertools::Itertools,
|
|
||||||
rayon::ThreadPool,
|
rayon::ThreadPool,
|
||||||
solana_ledger::shred::Shred,
|
solana_ledger::shred::Shred,
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
|
@ -48,21 +47,16 @@ impl CrdsGossip {
|
||||||
values: Vec<CrdsValue>,
|
values: Vec<CrdsValue>,
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> HashSet<Pubkey> {
|
) -> HashSet<Pubkey> {
|
||||||
values
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|val| {
|
|
||||||
let origin = val.pubkey();
|
|
||||||
self.push
|
self.push
|
||||||
.process_push_message(&mut self.crds, from, val, now)
|
.process_push_message(&mut self.crds, from, values, now)
|
||||||
.ok()?;
|
.into_iter()
|
||||||
Some(origin)
|
.filter_map(Result::ok)
|
||||||
})
|
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// remove redundant paths in the network
|
/// remove redundant paths in the network
|
||||||
pub fn prune_received_cache<I>(
|
pub fn prune_received_cache<I>(
|
||||||
&mut self,
|
&self,
|
||||||
self_pubkey: &Pubkey,
|
self_pubkey: &Pubkey,
|
||||||
origins: I, // Unique pubkeys of crds values' owners.
|
origins: I, // Unique pubkeys of crds values' owners.
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
@ -70,15 +64,8 @@ impl CrdsGossip {
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = Pubkey>,
|
I: IntoIterator<Item = Pubkey>,
|
||||||
{
|
{
|
||||||
origins
|
|
||||||
.into_iter()
|
|
||||||
.flat_map(|origin| {
|
|
||||||
self.push
|
self.push
|
||||||
.prune_received_cache(self_pubkey, &origin, stakes)
|
.prune_received_cache_many(self_pubkey, origins, stakes)
|
||||||
.into_iter()
|
|
||||||
.zip(std::iter::repeat(origin))
|
|
||||||
})
|
|
||||||
.into_group_map()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_push_messages(
|
pub fn new_push_messages(
|
||||||
|
@ -181,7 +168,7 @@ impl CrdsGossip {
|
||||||
/// refresh the push active set
|
/// refresh the push active set
|
||||||
/// * ratio - number of actives to rotate
|
/// * ratio - number of actives to rotate
|
||||||
pub fn refresh_push_active_set(
|
pub fn refresh_push_active_set(
|
||||||
&mut self,
|
&self,
|
||||||
self_pubkey: &Pubkey,
|
self_pubkey: &Pubkey,
|
||||||
self_shred_version: u16,
|
self_shred_version: u16,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
|
|
@ -20,6 +20,7 @@ use {
|
||||||
},
|
},
|
||||||
bincode::serialized_size,
|
bincode::serialized_size,
|
||||||
indexmap::map::IndexMap,
|
indexmap::map::IndexMap,
|
||||||
|
itertools::Itertools,
|
||||||
lru::LruCache,
|
lru::LruCache,
|
||||||
rand::{seq::SliceRandom, Rng},
|
rand::{seq::SliceRandom, Rng},
|
||||||
solana_runtime::bloom::{AtomicBloom, Bloom},
|
solana_runtime::bloom::{AtomicBloom, Bloom},
|
||||||
|
@ -27,7 +28,12 @@ use {
|
||||||
std::{
|
std::{
|
||||||
cmp,
|
cmp,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
ops::RangeBounds,
|
iter::repeat,
|
||||||
|
ops::{DerefMut, RangeBounds},
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
Mutex, RwLock,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -45,27 +51,30 @@ const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
|
||||||
|
|
||||||
pub struct CrdsGossipPush {
|
pub struct CrdsGossipPush {
|
||||||
/// max bytes per message
|
/// max bytes per message
|
||||||
pub max_bytes: usize,
|
max_bytes: usize,
|
||||||
/// active set of validators for push
|
/// active set of validators for push
|
||||||
active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>,
|
active_set: RwLock<IndexMap<Pubkey, AtomicBloom<Pubkey>>>,
|
||||||
/// Cursor into the crds table for values to push.
|
/// Cursor into the crds table for values to push.
|
||||||
crds_cursor: Cursor,
|
crds_cursor: Mutex<Cursor>,
|
||||||
/// Cache that tracks which validators a message was received from
|
/// Cache that tracks which validators a message was received from
|
||||||
/// bool indicates it has been pruned.
|
/// bool indicates it has been pruned.
|
||||||
/// This cache represents a lagging view of which validators
|
/// This cache represents a lagging view of which validators
|
||||||
/// currently have this node in their `active_set`
|
/// currently have this node in their `active_set`
|
||||||
received_cache: HashMap<
|
#[allow(clippy::type_complexity)]
|
||||||
|
received_cache: Mutex<
|
||||||
|
HashMap<
|
||||||
Pubkey, // origin/owner
|
Pubkey, // origin/owner
|
||||||
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
|
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
|
||||||
>,
|
>,
|
||||||
last_pushed_to: LruCache<Pubkey, u64>,
|
>,
|
||||||
pub num_active: usize,
|
last_pushed_to: RwLock<LruCache</*node:*/ Pubkey, /*timestamp:*/ u64>>,
|
||||||
pub push_fanout: usize,
|
num_active: usize,
|
||||||
pub msg_timeout: u64,
|
push_fanout: usize,
|
||||||
|
pub(crate) msg_timeout: u64,
|
||||||
pub prune_timeout: u64,
|
pub prune_timeout: u64,
|
||||||
pub num_total: usize,
|
pub num_total: AtomicUsize,
|
||||||
pub num_old: usize,
|
pub num_old: AtomicUsize,
|
||||||
pub num_pushes: usize,
|
pub num_pushes: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CrdsGossipPush {
|
impl Default for CrdsGossipPush {
|
||||||
|
@ -73,23 +82,23 @@ impl Default for CrdsGossipPush {
|
||||||
Self {
|
Self {
|
||||||
// Allow upto 64 Crds Values per PUSH
|
// Allow upto 64 Crds Values per PUSH
|
||||||
max_bytes: PACKET_DATA_SIZE * 64,
|
max_bytes: PACKET_DATA_SIZE * 64,
|
||||||
active_set: IndexMap::new(),
|
active_set: RwLock::default(),
|
||||||
crds_cursor: Cursor::default(),
|
crds_cursor: Mutex::default(),
|
||||||
received_cache: HashMap::new(),
|
received_cache: Mutex::default(),
|
||||||
last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
|
last_pushed_to: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)),
|
||||||
num_active: CRDS_GOSSIP_NUM_ACTIVE,
|
num_active: CRDS_GOSSIP_NUM_ACTIVE,
|
||||||
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
|
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
|
||||||
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
|
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
|
||||||
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
|
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
|
||||||
num_total: 0,
|
num_total: AtomicUsize::default(),
|
||||||
num_old: 0,
|
num_old: AtomicUsize::default(),
|
||||||
num_pushes: 0,
|
num_pushes: AtomicUsize::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl CrdsGossipPush {
|
impl CrdsGossipPush {
|
||||||
pub fn num_pending(&self, crds: &Crds) -> usize {
|
pub fn num_pending(&self, crds: &Crds) -> usize {
|
||||||
let mut cursor = self.crds_cursor;
|
let mut cursor: Cursor = *self.crds_cursor.lock().unwrap();
|
||||||
crds.get_entries(&mut cursor).count()
|
crds.get_entries(&mut cursor).count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,15 +107,42 @@ impl CrdsGossipPush {
|
||||||
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
|
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn prune_received_cache(
|
pub(crate) fn prune_received_cache_many<I>(
|
||||||
&mut self,
|
&self,
|
||||||
|
self_pubkey: &Pubkey,
|
||||||
|
origins: I, // Unique pubkeys of crds values' owners.
|
||||||
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
) -> HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = Pubkey>,
|
||||||
|
{
|
||||||
|
let mut received_cache = self.received_cache.lock().unwrap();
|
||||||
|
origins
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|origin| {
|
||||||
|
let peers = Self::prune_received_cache(
|
||||||
|
self_pubkey,
|
||||||
|
&origin,
|
||||||
|
stakes,
|
||||||
|
received_cache.deref_mut(),
|
||||||
|
);
|
||||||
|
peers.into_iter().zip(repeat(origin))
|
||||||
|
})
|
||||||
|
.into_group_map()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn prune_received_cache(
|
||||||
self_pubkey: &Pubkey,
|
self_pubkey: &Pubkey,
|
||||||
origin: &Pubkey,
|
origin: &Pubkey,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
received_cache: &mut HashMap<
|
||||||
|
Pubkey, // origin/owner
|
||||||
|
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
|
||||||
|
>,
|
||||||
) -> Vec<Pubkey> {
|
) -> Vec<Pubkey> {
|
||||||
let origin_stake = stakes.get(origin).unwrap_or(&0);
|
let origin_stake = stakes.get(origin).unwrap_or(&0);
|
||||||
let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
|
let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
|
||||||
let peers = match self.received_cache.get_mut(origin) {
|
let peers = match received_cache.get_mut(origin) {
|
||||||
None => return Vec::default(),
|
None => return Vec::default(),
|
||||||
Some(peers) => peers,
|
Some(peers) => peers,
|
||||||
};
|
};
|
||||||
|
@ -164,28 +200,48 @@ impl CrdsGossipPush {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// process a push message to the network
|
/// process a push message to the network
|
||||||
|
/// Returns origins' pubkeys of upserted values.
|
||||||
pub(crate) fn process_push_message(
|
pub(crate) fn process_push_message(
|
||||||
&mut self,
|
&self,
|
||||||
crds: &mut Crds,
|
crds: &mut Crds,
|
||||||
from: &Pubkey,
|
from: &Pubkey,
|
||||||
value: CrdsValue,
|
values: Vec<CrdsValue>,
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Result<(), CrdsGossipError> {
|
) -> Vec<Result<Pubkey, CrdsGossipError>> {
|
||||||
self.num_total += 1;
|
self.num_total.fetch_add(values.len(), Ordering::Relaxed);
|
||||||
if !self.wallclock_window(now).contains(&value.wallclock()) {
|
let values: Vec<_> = {
|
||||||
|
let wallclock_window = self.wallclock_window(now);
|
||||||
|
let mut received_cache = self.received_cache.lock().unwrap();
|
||||||
|
values
|
||||||
|
.into_iter()
|
||||||
|
.map(|value| {
|
||||||
|
if !wallclock_window.contains(&value.wallclock()) {
|
||||||
return Err(CrdsGossipError::PushMessageTimeout);
|
return Err(CrdsGossipError::PushMessageTimeout);
|
||||||
}
|
}
|
||||||
let origin = value.pubkey();
|
let origin = value.pubkey();
|
||||||
self.received_cache
|
let peers = received_cache.entry(origin).or_default();
|
||||||
.entry(origin)
|
peers
|
||||||
.or_default()
|
|
||||||
.entry(*from)
|
.entry(*from)
|
||||||
.and_modify(|(_pruned, timestamp)| *timestamp = now)
|
.and_modify(|(_pruned, timestamp)| *timestamp = now)
|
||||||
.or_insert((/*pruned:*/ false, now));
|
.or_insert((/*pruned:*/ false, now));
|
||||||
crds.insert(value, now).map_err(|_| {
|
Ok(value)
|
||||||
self.num_old += 1;
|
|
||||||
CrdsGossipError::PushMessageOldVersion
|
|
||||||
})
|
})
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
values
|
||||||
|
.into_iter()
|
||||||
|
.map(|value| {
|
||||||
|
let value = value?;
|
||||||
|
let origin = value.pubkey();
|
||||||
|
match crds.insert(value, now) {
|
||||||
|
Ok(()) => Ok(origin),
|
||||||
|
Err(_) => {
|
||||||
|
self.num_old.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Err(CrdsGossipError::PushMessageOldVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// New push message to broadcast to peers.
|
/// New push message to broadcast to peers.
|
||||||
|
@ -193,8 +249,10 @@ impl CrdsGossipPush {
|
||||||
/// peers.
|
/// peers.
|
||||||
/// The list of push messages is created such that all the randomly selected peers have not
|
/// The list of push messages is created such that all the randomly selected peers have not
|
||||||
/// pruned the source addresses.
|
/// pruned the source addresses.
|
||||||
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
pub fn new_push_messages(&self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
||||||
let push_fanout = self.push_fanout.min(self.active_set.len());
|
let active_set = self.active_set.read().unwrap();
|
||||||
|
let active_set_len = active_set.len();
|
||||||
|
let push_fanout = self.push_fanout.min(active_set_len);
|
||||||
if push_fanout == 0 {
|
if push_fanout == 0 {
|
||||||
return HashMap::default();
|
return HashMap::default();
|
||||||
}
|
}
|
||||||
|
@ -203,8 +261,9 @@ impl CrdsGossipPush {
|
||||||
let mut total_bytes: usize = 0;
|
let mut total_bytes: usize = 0;
|
||||||
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
|
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
|
||||||
let wallclock_window = self.wallclock_window(now);
|
let wallclock_window = self.wallclock_window(now);
|
||||||
|
let mut crds_cursor = self.crds_cursor.lock().unwrap();
|
||||||
let entries = crds
|
let entries = crds
|
||||||
.get_entries(&mut self.crds_cursor)
|
.get_entries(crds_cursor.deref_mut())
|
||||||
.map(|entry| &entry.value)
|
.map(|entry| &entry.value)
|
||||||
.filter(|value| wallclock_window.contains(&value.wallclock()));
|
.filter(|value| wallclock_window.contains(&value.wallclock()));
|
||||||
for value in entries {
|
for value in entries {
|
||||||
|
@ -219,8 +278,8 @@ impl CrdsGossipPush {
|
||||||
// learns the MST for that origin.
|
// learns the MST for that origin.
|
||||||
let offset = origin.as_ref()[0] as usize;
|
let offset = origin.as_ref()[0] as usize;
|
||||||
for i in offset..offset + push_fanout {
|
for i in offset..offset + push_fanout {
|
||||||
let index = i % self.active_set.len();
|
let index = i % active_set_len;
|
||||||
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
let (peer, filter) = active_set.get_index(index).unwrap();
|
||||||
if !filter.contains(&origin) || value.should_force_push(peer) {
|
if !filter.contains(&origin) || value.should_force_push(peer) {
|
||||||
trace!("new_push_messages insert {} {:?}", *peer, value);
|
trace!("new_push_messages insert {} {:?}", *peer, value);
|
||||||
push_messages.entry(*peer).or_default().push(value.clone());
|
push_messages.entry(*peer).or_default().push(value.clone());
|
||||||
|
@ -228,17 +287,20 @@ impl CrdsGossipPush {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.num_pushes += num_pushes;
|
drop(crds_cursor);
|
||||||
trace!("new_push_messages {} {}", num_values, self.active_set.len());
|
drop(active_set);
|
||||||
|
self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed);
|
||||||
|
trace!("new_push_messages {} {}", num_values, active_set_len);
|
||||||
|
let mut last_pushed_to = self.last_pushed_to.write().unwrap();
|
||||||
for target_pubkey in push_messages.keys().copied() {
|
for target_pubkey in push_messages.keys().copied() {
|
||||||
self.last_pushed_to.put(target_pubkey, now);
|
last_pushed_to.put(target_pubkey, now);
|
||||||
}
|
}
|
||||||
push_messages
|
push_messages
|
||||||
}
|
}
|
||||||
|
|
||||||
/// add the `from` to the peer's filter of nodes
|
/// add the `from` to the peer's filter of nodes
|
||||||
pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
|
pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
|
||||||
if let Some(filter) = self.active_set.get(peer) {
|
if let Some(filter) = self.active_set.read().unwrap().get(peer) {
|
||||||
for origin in origins {
|
for origin in origins {
|
||||||
if origin != self_pubkey {
|
if origin != self_pubkey {
|
||||||
filter.add(origin);
|
filter.add(origin);
|
||||||
|
@ -255,7 +317,7 @@ impl CrdsGossipPush {
|
||||||
/// refresh the push active set
|
/// refresh the push active set
|
||||||
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
||||||
pub(crate) fn refresh_push_active_set(
|
pub(crate) fn refresh_push_active_set(
|
||||||
&mut self,
|
&self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
gossip_validators: Option<&HashSet<Pubkey>>,
|
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||||
|
@ -271,7 +333,8 @@ impl CrdsGossipPush {
|
||||||
#[cfg(not(debug_assertions))]
|
#[cfg(not(debug_assertions))]
|
||||||
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
|
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
let mut active_set = self.active_set.write().unwrap();
|
||||||
|
let need = Self::compute_need(self.num_active, active_set.len(), ratio);
|
||||||
let mut new_items = HashMap::new();
|
let mut new_items = HashMap::new();
|
||||||
let (weights, peers): (Vec<_>, Vec<_>) = self
|
let (weights, peers): (Vec<_>, Vec<_>) = self
|
||||||
.push_options(crds, self_id, self_shred_version, stakes, gossip_validators)
|
.push_options(crds, self_id, self_shred_version, stakes, gossip_validators)
|
||||||
|
@ -286,7 +349,7 @@ impl CrdsGossipPush {
|
||||||
if new_items.len() >= need {
|
if new_items.len() >= need {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if self.active_set.contains_key(&peer) || new_items.contains_key(&peer) {
|
if active_set.contains_key(&peer) || new_items.contains_key(&peer) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let bloom = AtomicBloom::from(Bloom::random(
|
let bloom = AtomicBloom::from(Bloom::random(
|
||||||
|
@ -297,14 +360,14 @@ impl CrdsGossipPush {
|
||||||
bloom.add(&peer);
|
bloom.add(&peer);
|
||||||
new_items.insert(peer, bloom);
|
new_items.insert(peer, bloom);
|
||||||
}
|
}
|
||||||
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
|
let mut keys: Vec<Pubkey> = active_set.keys().cloned().collect();
|
||||||
keys.shuffle(&mut rng);
|
keys.shuffle(&mut rng);
|
||||||
let num = keys.len() / ratio;
|
let num = keys.len() / ratio;
|
||||||
for k in &keys[..num] {
|
for k in &keys[..num] {
|
||||||
self.active_set.swap_remove(k);
|
active_set.swap_remove(k);
|
||||||
}
|
}
|
||||||
for (k, v) in new_items {
|
for (k, v) in new_items {
|
||||||
self.active_set.insert(k, v);
|
active_set.insert(k, v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,6 +383,7 @@ impl CrdsGossipPush {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let max_weight = u16::MAX as f32 - 1.0;
|
let max_weight = u16::MAX as f32 - 1.0;
|
||||||
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
|
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
|
||||||
|
let last_pushed_to = self.last_pushed_to.read().unwrap();
|
||||||
crds.get_nodes()
|
crds.get_nodes()
|
||||||
.filter_map(|value| {
|
.filter_map(|value| {
|
||||||
let info = value.value.contact_info().unwrap();
|
let info = value.value.contact_info().unwrap();
|
||||||
|
@ -343,11 +407,7 @@ impl CrdsGossipPush {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.map(|info| {
|
.map(|info| {
|
||||||
let last_pushed_to = self
|
let last_pushed_to = last_pushed_to.peek(&info.id).copied().unwrap_or_default();
|
||||||
.last_pushed_to
|
|
||||||
.peek(&info.id)
|
|
||||||
.copied()
|
|
||||||
.unwrap_or_default();
|
|
||||||
let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
|
let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
|
||||||
let stake = get_stake(&info.id, stakes);
|
let stake = get_stake(&info.id, stakes);
|
||||||
let weight = get_weight(max_weight, since, stake);
|
let weight = get_weight(max_weight, since, stake);
|
||||||
|
@ -359,8 +419,8 @@ impl CrdsGossipPush {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// purge received push message cache
|
/// purge received push message cache
|
||||||
pub(crate) fn purge_old_received_cache(&mut self, min_time: u64) {
|
pub(crate) fn purge_old_received_cache(&self, min_time: u64) {
|
||||||
self.received_cache.retain(|_, v| {
|
self.received_cache.lock().unwrap().retain(|_, v| {
|
||||||
v.retain(|_, (_, t)| *t > min_time);
|
v.retain(|_, (_, t)| *t > min_time);
|
||||||
!v.is_empty()
|
!v.is_empty()
|
||||||
});
|
});
|
||||||
|
@ -368,19 +428,31 @@ impl CrdsGossipPush {
|
||||||
|
|
||||||
// Only for tests and simulations.
|
// Only for tests and simulations.
|
||||||
pub(crate) fn mock_clone(&self) -> Self {
|
pub(crate) fn mock_clone(&self) -> Self {
|
||||||
let active_set = self
|
let active_set = {
|
||||||
.active_set
|
let active_set = self.active_set.read().unwrap();
|
||||||
|
active_set
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(k, v)| (*k, v.mock_clone()))
|
.map(|(k, v)| (*k, v.mock_clone()))
|
||||||
.collect();
|
.collect()
|
||||||
let mut last_pushed_to = LruCache::new(self.last_pushed_to.cap());
|
};
|
||||||
for (k, v) in self.last_pushed_to.iter().rev() {
|
let last_pushed_to = {
|
||||||
last_pushed_to.put(*k, *v);
|
let last_pushed_to = self.last_pushed_to.read().unwrap();
|
||||||
|
let mut clone = LruCache::new(last_pushed_to.cap());
|
||||||
|
for (k, v) in last_pushed_to.iter().rev() {
|
||||||
|
clone.put(*k, *v);
|
||||||
}
|
}
|
||||||
|
clone
|
||||||
|
};
|
||||||
|
let received_cache = self.received_cache.lock().unwrap().clone();
|
||||||
|
let crds_cursor = *self.crds_cursor.lock().unwrap();
|
||||||
Self {
|
Self {
|
||||||
active_set,
|
active_set: RwLock::new(active_set),
|
||||||
received_cache: self.received_cache.clone(),
|
received_cache: Mutex::new(received_cache),
|
||||||
last_pushed_to,
|
last_pushed_to: RwLock::new(last_pushed_to),
|
||||||
|
crds_cursor: Mutex::new(crds_cursor),
|
||||||
|
num_total: AtomicUsize::new(self.num_total.load(Ordering::Relaxed)),
|
||||||
|
num_old: AtomicUsize::new(self.num_old.load(Ordering::Relaxed)),
|
||||||
|
num_pushes: AtomicUsize::new(self.num_pushes.load(Ordering::Relaxed)),
|
||||||
..*self
|
..*self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -396,7 +468,7 @@ mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_prune() {
|
fn test_prune() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let mut stakes = HashMap::new();
|
let mut stakes = HashMap::new();
|
||||||
|
|
||||||
let self_id = solana_sdk::pubkey::new_rand();
|
let self_id = solana_sdk::pubkey::new_rand();
|
||||||
|
@ -410,12 +482,20 @@ mod test {
|
||||||
let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand());
|
let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand());
|
||||||
let mut low_staked_set = HashSet::new();
|
let mut low_staked_set = HashSet::new();
|
||||||
low_staked_peers.for_each(|p| {
|
low_staked_peers.for_each(|p| {
|
||||||
let _ = push.process_push_message(&mut crds, &p, value.clone(), 0);
|
push.process_push_message(&mut crds, &p, vec![value.clone()], 0);
|
||||||
low_staked_set.insert(p);
|
low_staked_set.insert(p);
|
||||||
stakes.insert(p, 1);
|
stakes.insert(p, 1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
|
let pruned = {
|
||||||
|
let mut received_cache = push.received_cache.lock().unwrap();
|
||||||
|
CrdsGossipPush::prune_received_cache(
|
||||||
|
&self_id,
|
||||||
|
&origin,
|
||||||
|
&stakes,
|
||||||
|
received_cache.deref_mut(),
|
||||||
|
)
|
||||||
|
};
|
||||||
assert!(
|
assert!(
|
||||||
pruned.is_empty(),
|
pruned.is_empty(),
|
||||||
"should not prune if min threshold has not been reached"
|
"should not prune if min threshold has not been reached"
|
||||||
|
@ -424,9 +504,17 @@ mod test {
|
||||||
let high_staked_peer = solana_sdk::pubkey::new_rand();
|
let high_staked_peer = solana_sdk::pubkey::new_rand();
|
||||||
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
|
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
|
||||||
stakes.insert(high_staked_peer, high_stake);
|
stakes.insert(high_staked_peer, high_stake);
|
||||||
let _ = push.process_push_message(&mut crds, &high_staked_peer, value, 0);
|
push.process_push_message(&mut crds, &high_staked_peer, vec![value], 0);
|
||||||
|
|
||||||
let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
|
let pruned = {
|
||||||
|
let mut received_cache = push.received_cache.lock().unwrap();
|
||||||
|
CrdsGossipPush::prune_received_cache(
|
||||||
|
&self_id,
|
||||||
|
&origin,
|
||||||
|
&stakes,
|
||||||
|
received_cache.deref_mut(),
|
||||||
|
)
|
||||||
|
};
|
||||||
assert!(
|
assert!(
|
||||||
pruned.len() < low_staked_set.len() + 1,
|
pruned.len() < low_staked_set.len() + 1,
|
||||||
"should not prune all peers"
|
"should not prune all peers"
|
||||||
|
@ -442,7 +530,7 @@ mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_one() {
|
fn test_process_push_one() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
|
@ -450,43 +538,43 @@ mod test {
|
||||||
let label = value.label();
|
let label = value.label();
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0),
|
||||||
Ok(())
|
[Ok(label.pubkey())],
|
||||||
);
|
);
|
||||||
assert_eq!(crds.get(&label).unwrap().value, value);
|
assert_eq!(crds.get(&label).unwrap().value, value);
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_matches!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
[Err(CrdsGossipError::PushMessageOldVersion)],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_old_version() {
|
fn test_process_push_old_version() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
ci.wallclock = 1;
|
ci.wallclock = 1;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
||||||
|
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Ok(())
|
[Ok(ci.id)],
|
||||||
);
|
);
|
||||||
|
|
||||||
// push an old version
|
// push an old version
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
[Err(CrdsGossipError::PushMessageOldVersion)],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_timeout() {
|
fn test_process_push_timeout() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let timeout = push.msg_timeout;
|
let timeout = push.msg_timeout;
|
||||||
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
|
|
||||||
|
@ -494,38 +582,39 @@ mod test {
|
||||||
ci.wallclock = timeout + 1;
|
ci.wallclock = timeout + 1;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Err(CrdsGossipError::PushMessageTimeout)
|
[Err(CrdsGossipError::PushMessageTimeout)],
|
||||||
);
|
);
|
||||||
|
|
||||||
// push a version to far in the past
|
// push a version to far in the past
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], timeout + 1),
|
||||||
Err(CrdsGossipError::PushMessageTimeout)
|
[Err(CrdsGossipError::PushMessageTimeout)]
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_push_update() {
|
fn test_process_push_update() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
|
let origin = ci.id;
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value_old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
let value_old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
|
||||||
|
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value_old, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value_old], 0),
|
||||||
Ok(())
|
[Ok(origin)],
|
||||||
);
|
);
|
||||||
|
|
||||||
// push an old version
|
// push an old version
|
||||||
ci.wallclock = 1;
|
ci.wallclock = 1;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Ok(())
|
[Ok(origin)],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -540,7 +629,7 @@ mod test {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
|
@ -549,21 +638,26 @@ mod test {
|
||||||
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
|
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
|
||||||
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
||||||
|
|
||||||
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
|
let active_set = push.active_set.read().unwrap();
|
||||||
|
assert!(active_set.get(&value1.label().pubkey()).is_some());
|
||||||
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
)));
|
)));
|
||||||
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
|
assert!(active_set.get(&value2.label().pubkey()).is_none());
|
||||||
|
drop(active_set);
|
||||||
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
|
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
|
||||||
for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
||||||
if push.active_set.get(&value2.label().pubkey()).is_some() {
|
let active_set = push.active_set.read().unwrap();
|
||||||
|
if active_set.get(&value2.label().pubkey()).is_some() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert!(push.active_set.get(&value2.label().pubkey()).is_some());
|
{
|
||||||
|
let active_set = push.active_set.read().unwrap();
|
||||||
|
assert!(active_set.get(&value2.label().pubkey()).is_some());
|
||||||
|
}
|
||||||
for _ in 0..push.num_active {
|
for _ in 0..push.num_active {
|
||||||
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
||||||
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
|
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
|
||||||
|
@ -571,14 +665,14 @@ mod test {
|
||||||
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
|
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
|
||||||
}
|
}
|
||||||
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
||||||
assert_eq!(push.active_set.len(), push.num_active);
|
assert_eq!(push.active_set.read().unwrap().len(), push.num_active);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_active_set_refresh_with_bank() {
|
fn test_active_set_refresh_with_bank() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
|
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let mut stakes = HashMap::new();
|
let mut stakes = HashMap::new();
|
||||||
for i in 1..=100 {
|
for i in 1..=100 {
|
||||||
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
|
@ -588,7 +682,7 @@ mod test {
|
||||||
let id = peer.label().pubkey();
|
let id = peer.label().pubkey();
|
||||||
crds.insert(peer.clone(), time).unwrap();
|
crds.insert(peer.clone(), time).unwrap();
|
||||||
stakes.insert(id, i * 100);
|
stakes.insert(id, i * 100);
|
||||||
push.last_pushed_to.put(id, time);
|
push.last_pushed_to.write().unwrap().put(id, time);
|
||||||
}
|
}
|
||||||
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
|
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
|
||||||
assert!(!options.is_empty());
|
assert!(!options.is_empty());
|
||||||
|
@ -721,7 +815,7 @@ mod test {
|
||||||
fn test_new_push_messages() {
|
fn test_new_push_messages() {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
|
@ -735,11 +829,12 @@ mod test {
|
||||||
)));
|
)));
|
||||||
let mut expected = HashMap::new();
|
let mut expected = HashMap::new();
|
||||||
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
|
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
|
||||||
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 0),
|
||||||
Ok(())
|
[Ok(origin)]
|
||||||
);
|
);
|
||||||
assert_eq!(push.active_set.len(), 1);
|
assert_eq!(push.active_set.read().unwrap().len(), 1);
|
||||||
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -747,7 +842,7 @@ mod test {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let peers: Vec<_> = vec![0, 0, now]
|
let peers: Vec<_> = vec![0, 0, now]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|wallclock| {
|
.map(|wallclock| {
|
||||||
|
@ -756,11 +851,12 @@ mod test {
|
||||||
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
|
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect();
|
||||||
assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
|
assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
|
||||||
assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
|
assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![peers[2].clone()], now),
|
||||||
Ok(())
|
[Ok(origin[2])],
|
||||||
);
|
);
|
||||||
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
|
||||||
|
|
||||||
|
@ -771,14 +867,14 @@ mod test {
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(push.active_set.len(), 3);
|
assert_eq!(push.active_set.read().unwrap().len(), 3);
|
||||||
assert_eq!(push.new_push_messages(&crds, now), expected);
|
assert_eq!(push.new_push_messages(&crds, now), expected);
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_process_prune() {
|
fn test_process_prune() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let self_id = solana_sdk::pubkey::new_rand();
|
let self_id = solana_sdk::pubkey::new_rand();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
|
@ -791,9 +887,10 @@ mod test {
|
||||||
0,
|
0,
|
||||||
)));
|
)));
|
||||||
let expected = HashMap::new();
|
let expected = HashMap::new();
|
||||||
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg.clone()], 0),
|
||||||
Ok(())
|
[Ok(origin)],
|
||||||
);
|
);
|
||||||
push.process_prune_msg(
|
push.process_prune_msg(
|
||||||
&self_id,
|
&self_id,
|
||||||
|
@ -805,7 +902,7 @@ mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_purge_old_pending_push_messages() {
|
fn test_purge_old_pending_push_messages() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
0,
|
0,
|
||||||
|
@ -817,9 +914,10 @@ mod test {
|
||||||
ci.wallclock = 1;
|
ci.wallclock = 1;
|
||||||
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
let expected = HashMap::new();
|
let expected = HashMap::new();
|
||||||
|
let origin = new_msg.pubkey();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 1),
|
||||||
Ok(())
|
[Ok(origin)],
|
||||||
);
|
);
|
||||||
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
assert_eq!(push.new_push_messages(&crds, 0), expected);
|
||||||
}
|
}
|
||||||
|
@ -827,22 +925,22 @@ mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_purge_old_received_cache() {
|
fn test_purge_old_received_cache() {
|
||||||
let mut crds = Crds::default();
|
let mut crds = Crds::default();
|
||||||
let mut push = CrdsGossipPush::default();
|
let push = CrdsGossipPush::default();
|
||||||
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
ci.wallclock = 0;
|
ci.wallclock = 0;
|
||||||
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
|
||||||
let label = value.label();
|
let label = value.label();
|
||||||
// push a new message
|
// push a new message
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0),
|
||||||
Ok(())
|
[Ok(label.pubkey())]
|
||||||
);
|
);
|
||||||
assert_eq!(crds.get(&label).unwrap().value, value);
|
assert_eq!(crds.get(&label).unwrap().value, value);
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_matches!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0),
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
[Err(CrdsGossipError::PushMessageOldVersion)],
|
||||||
);
|
);
|
||||||
|
|
||||||
// purge the old pushed
|
// purge the old pushed
|
||||||
|
@ -850,8 +948,8 @@ mod test {
|
||||||
|
|
||||||
// push it again
|
// push it again
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
|
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0),
|
||||||
Err(CrdsGossipError::PushMessageOldVersion)
|
[Err(CrdsGossipError::PushMessageOldVersion)],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -322,9 +322,9 @@ pub fn cluster_info_scale() {
|
||||||
.filter(|v| v.message.account_keys == tx.message.account_keys)
|
.filter(|v| v.message.account_keys == tx.message.account_keys)
|
||||||
.count();
|
.count();
|
||||||
let gossip = node.gossip.read().unwrap();
|
let gossip = node.gossip.read().unwrap();
|
||||||
num_old += gossip.push.num_old;
|
num_old += gossip.push.num_old.load(Ordering::Relaxed);
|
||||||
num_push_total += gossip.push.num_total;
|
num_push_total += gossip.push.num_total.load(Ordering::Relaxed);
|
||||||
num_pushes += gossip.push.num_pushes;
|
num_pushes += gossip.push.num_pushes.load(Ordering::Relaxed);
|
||||||
num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed);
|
num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed);
|
||||||
if has_tx == 0 {
|
if has_tx == 0 {
|
||||||
not_done += 1;
|
not_done += 1;
|
||||||
|
@ -348,10 +348,10 @@ pub fn cluster_info_scale() {
|
||||||
);
|
);
|
||||||
sleep(Duration::from_millis(200));
|
sleep(Duration::from_millis(200));
|
||||||
for (node, _, _) in nodes.iter() {
|
for (node, _, _) in nodes.iter() {
|
||||||
let mut gossip = node.gossip.write().unwrap();
|
let gossip = node.gossip.read().unwrap();
|
||||||
gossip.push.num_old = 0;
|
gossip.push.num_old.store(0, Ordering::Relaxed);
|
||||||
gossip.push.num_total = 0;
|
gossip.push.num_total.store(0, Ordering::Relaxed);
|
||||||
gossip.push.num_pushes = 0;
|
gossip.push.num_pushes.store(0, Ordering::Relaxed);
|
||||||
gossip.pull.num_pulls.store(0, Ordering::Relaxed);
|
gossip.pull.num_pulls.store(0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue