removes redundant (mutable) self receivers (#18574)
This commit is contained in:
parent
c810908e62
commit
918b5c28b2
|
@ -659,7 +659,7 @@ impl ClusterInfo {
|
||||||
self.my_contact_info.read().unwrap().shred_version
|
self.my_contact_info.read().unwrap().shred_version
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots {
|
fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots {
|
||||||
let self_pubkey = self.id();
|
let self_pubkey = self.id();
|
||||||
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
|
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
|
||||||
let gossip = self.gossip.read().unwrap();
|
let gossip = self.gossip.read().unwrap();
|
||||||
|
@ -1141,28 +1141,14 @@ impl ClusterInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
|
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
|
||||||
let version = self
|
let gossip = self.gossip.read().unwrap();
|
||||||
.gossip
|
let version = gossip.crds.get(&CrdsValueLabel::Version(*pubkey));
|
||||||
.read()
|
if let Some(version) = version.and_then(|v| v.value.version()) {
|
||||||
.unwrap()
|
return Some(version.version.clone());
|
||||||
.crds
|
|
||||||
.get(&CrdsValueLabel::Version(*pubkey))
|
|
||||||
.map(|x| x.value.version())
|
|
||||||
.flatten()
|
|
||||||
.map(|version| version.version.clone());
|
|
||||||
|
|
||||||
if version.is_none() {
|
|
||||||
self.gossip
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.crds
|
|
||||||
.get(&CrdsValueLabel::LegacyVersion(*pubkey))
|
|
||||||
.map(|x| x.value.legacy_version())
|
|
||||||
.flatten()
|
|
||||||
.map(|version| version.version.clone().into())
|
|
||||||
} else {
|
|
||||||
version
|
|
||||||
}
|
}
|
||||||
|
let version = gossip.crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?;
|
||||||
|
let version = version.value.legacy_version()?;
|
||||||
|
Some(version.version.clone().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// all validators that have a valid rpc port regardless of `shred_version`.
|
/// all validators that have a valid rpc port regardless of `shred_version`.
|
||||||
|
|
|
@ -238,8 +238,7 @@ impl CrdsGossip {
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = CrdsValue>,
|
I: IntoIterator<Item = CrdsValue>,
|
||||||
{
|
{
|
||||||
self.pull
|
CrdsGossipPull::process_pull_requests(&mut self.crds, callers, now);
|
||||||
.process_pull_requests(&mut self.crds, callers, now);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_pull_responses(
|
pub fn generate_pull_responses(
|
||||||
|
@ -248,8 +247,7 @@ impl CrdsGossip {
|
||||||
output_size_limit: usize, // Limit number of crds values returned.
|
output_size_limit: usize, // Limit number of crds values returned.
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.pull
|
CrdsGossipPull::generate_pull_responses(&self.crds, filters, output_size_limit, now)
|
||||||
.generate_pull_responses(&self.crds, filters, output_size_limit, now)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn filter_pull_responses(
|
pub fn filter_pull_responses(
|
||||||
|
@ -313,9 +311,7 @@ impl CrdsGossip {
|
||||||
//sanity check
|
//sanity check
|
||||||
assert_eq!(timeouts[self_pubkey], std::u64::MAX);
|
assert_eq!(timeouts[self_pubkey], std::u64::MAX);
|
||||||
assert!(timeouts.contains_key(&Pubkey::default()));
|
assert!(timeouts.contains_key(&Pubkey::default()));
|
||||||
rv = self
|
rv = CrdsGossipPull::purge_active(thread_pool, &mut self.crds, now, timeouts);
|
||||||
.pull
|
|
||||||
.purge_active(thread_pool, &mut self.crds, now, timeouts);
|
|
||||||
}
|
}
|
||||||
self.crds
|
self.crds
|
||||||
.trim_purged(now.saturating_sub(5 * self.pull.crds_timeout));
|
.trim_purged(now.saturating_sub(5 * self.pull.crds_timeout));
|
||||||
|
|
|
@ -75,7 +75,8 @@ impl solana_sdk::sanitize::Sanitize for CrdsFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CrdsFilter {
|
impl CrdsFilter {
|
||||||
pub fn new_rand(num_items: usize, max_bytes: usize) -> Self {
|
#[cfg(test)]
|
||||||
|
pub(crate) fn new_rand(num_items: usize, max_bytes: usize) -> Self {
|
||||||
let max_bits = (max_bytes * 8) as f64;
|
let max_bits = (max_bytes * 8) as f64;
|
||||||
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
|
||||||
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
|
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
|
||||||
|
@ -208,7 +209,7 @@ impl Default for CrdsGossipPull {
|
||||||
impl CrdsGossipPull {
|
impl CrdsGossipPull {
|
||||||
/// generate a random request
|
/// generate a random request
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new_pull_request(
|
pub(crate) fn new_pull_request(
|
||||||
&self,
|
&self,
|
||||||
thread_pool: &ThreadPool,
|
thread_pool: &ThreadPool,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
|
@ -315,12 +316,12 @@ impl CrdsGossipPull {
|
||||||
/// This is used for weighted random selection during `new_pull_request`
|
/// This is used for weighted random selection during `new_pull_request`
|
||||||
/// It's important to use the local nodes request creation time as the weight
|
/// It's important to use the local nodes request creation time as the weight
|
||||||
/// instead of the response received time otherwise failed nodes will increase their weight.
|
/// instead of the response received time otherwise failed nodes will increase their weight.
|
||||||
pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
|
pub(crate) fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
|
||||||
self.pull_request_time.put(from, now);
|
self.pull_request_time.put(from, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// process a pull request
|
/// process a pull request
|
||||||
pub fn process_pull_requests<I>(&mut self, crds: &mut Crds, callers: I, now: u64)
|
pub(crate) fn process_pull_requests<I>(crds: &mut Crds, callers: I, now: u64)
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = CrdsValue>,
|
I: IntoIterator<Item = CrdsValue>,
|
||||||
{
|
{
|
||||||
|
@ -332,14 +333,13 @@ impl CrdsGossipPull {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create gossip responses to pull requests
|
/// Create gossip responses to pull requests
|
||||||
pub fn generate_pull_responses(
|
pub(crate) fn generate_pull_responses(
|
||||||
&self,
|
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
requests: &[(CrdsValue, CrdsFilter)],
|
requests: &[(CrdsValue, CrdsFilter)],
|
||||||
output_size_limit: usize, // Limit number of crds values returned.
|
output_size_limit: usize, // Limit number of crds values returned.
|
||||||
now: u64,
|
now: u64,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.filter_crds_values(crds, requests, output_size_limit, now)
|
Self::filter_crds_values(crds, requests, output_size_limit, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if responses should be inserted and
|
// Checks if responses should be inserted and
|
||||||
|
@ -348,7 +348,7 @@ impl CrdsGossipPull {
|
||||||
// .0 => responses that update the owner timestamp
|
// .0 => responses that update the owner timestamp
|
||||||
// .1 => responses that do not update the owner timestamp
|
// .1 => responses that do not update the owner timestamp
|
||||||
// .2 => hash value of outdated values which will fail to insert.
|
// .2 => hash value of outdated values which will fail to insert.
|
||||||
pub fn filter_pull_responses(
|
pub(crate) fn filter_pull_responses(
|
||||||
&self,
|
&self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
timeouts: &HashMap<Pubkey, u64>,
|
timeouts: &HashMap<Pubkey, u64>,
|
||||||
|
@ -394,7 +394,7 @@ impl CrdsGossipPull {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// process a vec of pull responses
|
/// process a vec of pull responses
|
||||||
pub fn process_pull_responses(
|
pub(crate) fn process_pull_responses(
|
||||||
&mut self,
|
&mut self,
|
||||||
crds: &mut Crds,
|
crds: &mut Crds,
|
||||||
from: &Pubkey,
|
from: &Pubkey,
|
||||||
|
@ -426,7 +426,7 @@ impl CrdsGossipPull {
|
||||||
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
|
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn purge_failed_inserts(&mut self, now: u64) {
|
pub(crate) fn purge_failed_inserts(&mut self, now: u64) {
|
||||||
if FAILED_INSERTS_RETENTION_MS < now {
|
if FAILED_INSERTS_RETENTION_MS < now {
|
||||||
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
|
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
|
||||||
let outdated = self
|
let outdated = self
|
||||||
|
@ -472,7 +472,6 @@ impl CrdsGossipPull {
|
||||||
|
|
||||||
/// filter values that fail the bloom filter up to max_bytes
|
/// filter values that fail the bloom filter up to max_bytes
|
||||||
fn filter_crds_values(
|
fn filter_crds_values(
|
||||||
&self,
|
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
filters: &[(CrdsValue, CrdsFilter)],
|
filters: &[(CrdsValue, CrdsFilter)],
|
||||||
mut output_size_limit: usize, // Limit number of crds values returned.
|
mut output_size_limit: usize, // Limit number of crds values returned.
|
||||||
|
@ -553,8 +552,7 @@ impl CrdsGossipPull {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Purge values from the crds that are older then `active_timeout`
|
/// Purge values from the crds that are older then `active_timeout`
|
||||||
pub fn purge_active(
|
pub(crate) fn purge_active(
|
||||||
&mut self,
|
|
||||||
thread_pool: &ThreadPool,
|
thread_pool: &ThreadPool,
|
||||||
crds: &mut Crds,
|
crds: &mut Crds,
|
||||||
now: u64,
|
now: u64,
|
||||||
|
@ -569,7 +567,7 @@ impl CrdsGossipPull {
|
||||||
|
|
||||||
/// For legacy tests
|
/// For legacy tests
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn process_pull_response(
|
fn process_pull_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
crds: &mut Crds,
|
crds: &mut Crds,
|
||||||
from: &Pubkey,
|
from: &Pubkey,
|
||||||
|
@ -1123,10 +1121,9 @@ pub(crate) mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut dest_crds = Crds::default();
|
let mut dest_crds = Crds::default();
|
||||||
let dest = CrdsGossipPull::default();
|
|
||||||
let (_, filters) = req.unwrap();
|
let (_, filters) = req.unwrap();
|
||||||
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||||
&dest_crds,
|
&dest_crds,
|
||||||
&filters,
|
&filters,
|
||||||
/*output_size_limit=*/ usize::MAX,
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
@ -1144,7 +1141,7 @@ pub(crate) mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
//should skip new value since caller is to old
|
//should skip new value since caller is to old
|
||||||
let rsp = dest.generate_pull_responses(
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||||
&dest_crds,
|
&dest_crds,
|
||||||
&filters,
|
&filters,
|
||||||
/*output_size_limit=*/ usize::MAX,
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
@ -1162,7 +1159,7 @@ pub(crate) mod tests {
|
||||||
.map(|(_, filter)| (caller.clone(), filter.clone()))
|
.map(|(_, filter)| (caller.clone(), filter.clone()))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
});
|
});
|
||||||
let rsp = dest.generate_pull_responses(
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||||
&dest_crds,
|
&dest_crds,
|
||||||
&filters,
|
&filters,
|
||||||
/*output_size_limit=*/ usize::MAX,
|
/*output_size_limit=*/ usize::MAX,
|
||||||
|
@ -1211,16 +1208,15 @@ pub(crate) mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut dest_crds = Crds::default();
|
let mut dest_crds = Crds::default();
|
||||||
let mut dest = CrdsGossipPull::default();
|
|
||||||
let (_, filters) = req.unwrap();
|
let (_, filters) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||||
&dest_crds,
|
&dest_crds,
|
||||||
&filters,
|
&filters,
|
||||||
/*output_size_limit=*/ usize::MAX,
|
/*output_size_limit=*/ usize::MAX,
|
||||||
0,
|
0,
|
||||||
);
|
);
|
||||||
dest.process_pull_requests(
|
CrdsGossipPull::process_pull_requests(
|
||||||
&mut dest_crds,
|
&mut dest_crds,
|
||||||
filters.into_iter().map(|(caller, _)| caller),
|
filters.into_iter().map(|(caller, _)| caller),
|
||||||
1,
|
1,
|
||||||
|
@ -1251,7 +1247,6 @@ pub(crate) mod tests {
|
||||||
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
|
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
|
||||||
node_crds.insert(new, 0).unwrap();
|
node_crds.insert(new, 0).unwrap();
|
||||||
|
|
||||||
let mut dest = CrdsGossipPull::default();
|
|
||||||
let mut dest_crds = Crds::default();
|
let mut dest_crds = Crds::default();
|
||||||
let new_id = solana_sdk::pubkey::new_rand();
|
let new_id = solana_sdk::pubkey::new_rand();
|
||||||
let new = ContactInfo::new_localhost(&new_id, 1);
|
let new = ContactInfo::new_localhost(&new_id, 1);
|
||||||
|
@ -1286,13 +1281,13 @@ pub(crate) mod tests {
|
||||||
);
|
);
|
||||||
let (_, filters) = req.unwrap();
|
let (_, filters) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(
|
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||||
&dest_crds,
|
&dest_crds,
|
||||||
&filters,
|
&filters,
|
||||||
/*output_size_limit=*/ usize::MAX,
|
/*output_size_limit=*/ usize::MAX,
|
||||||
0,
|
0,
|
||||||
);
|
);
|
||||||
dest.process_pull_requests(
|
CrdsGossipPull::process_pull_requests(
|
||||||
&mut dest_crds,
|
&mut dest_crds,
|
||||||
filters.into_iter().map(|(caller, _)| caller),
|
filters.into_iter().map(|(caller, _)| caller),
|
||||||
0,
|
0,
|
||||||
|
@ -1335,7 +1330,7 @@ pub(crate) mod tests {
|
||||||
)));
|
)));
|
||||||
let node_label = entry.label();
|
let node_label = entry.label();
|
||||||
let node_pubkey = node_label.pubkey();
|
let node_pubkey = node_label.pubkey();
|
||||||
let mut node = CrdsGossipPull::default();
|
let node = CrdsGossipPull::default();
|
||||||
node_crds.insert(entry, 0).unwrap();
|
node_crds.insert(entry, 0).unwrap();
|
||||||
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
&solana_sdk::pubkey::new_rand(),
|
&solana_sdk::pubkey::new_rand(),
|
||||||
|
@ -1351,7 +1346,7 @@ pub(crate) mod tests {
|
||||||
);
|
);
|
||||||
// purge
|
// purge
|
||||||
let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
|
let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
|
||||||
node.purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts);
|
CrdsGossipPull::purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts);
|
||||||
|
|
||||||
//verify self is still valid after purge
|
//verify self is still valid after purge
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|
|
@ -98,7 +98,7 @@ impl CrdsGossipPush {
|
||||||
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
|
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn prune_received_cache(
|
pub(crate) fn prune_received_cache(
|
||||||
&mut self,
|
&mut self,
|
||||||
self_pubkey: &Pubkey,
|
self_pubkey: &Pubkey,
|
||||||
origin: &Pubkey,
|
origin: &Pubkey,
|
||||||
|
@ -254,7 +254,7 @@ impl CrdsGossipPush {
|
||||||
|
|
||||||
/// refresh the push active set
|
/// refresh the push active set
|
||||||
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
/// * ratio - active_set.len()/ratio is the number of actives to rotate
|
||||||
pub fn refresh_push_active_set(
|
pub(crate) fn refresh_push_active_set(
|
||||||
&mut self,
|
&mut self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
stakes: &HashMap<Pubkey, u64>,
|
stakes: &HashMap<Pubkey, u64>,
|
||||||
|
@ -359,7 +359,7 @@ impl CrdsGossipPush {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// purge received push message cache
|
/// purge received push message cache
|
||||||
pub fn purge_old_received_cache(&mut self, min_time: u64) {
|
pub(crate) fn purge_old_received_cache(&mut self, min_time: u64) {
|
||||||
self.received_cache.retain(|_, v| {
|
self.received_cache.retain(|_, v| {
|
||||||
v.retain(|_, (_, t)| *t > min_time);
|
v.retain(|_, (_, t)| *t > min_time);
|
||||||
!v.is_empty()
|
!v.is_empty()
|
||||||
|
|
Loading…
Reference in New Issue