processes pull-request callers only once per unique caller (#13750)
process_pull_requests acquires a write lock on crds table to update records timestamp for each of the pull-request callers: https://github.com/solana-labs/solana/blob/3087c9049/core/src/crds_gossip_pull.rs#L287-L300 However, pull-requests overlap a lot in callers and this function ends up doing a lot of redundant duplicate work. This commit obtains unique callers before acquiring an exclusive lock on crds table.
This commit is contained in:
parent
a13083aa65
commit
26bf2b7e45
|
@ -2023,8 +2023,9 @@ impl ClusterInfo {
|
||||||
feature_set: Option<&FeatureSet>,
|
feature_set: Option<&FeatureSet>,
|
||||||
) -> Packets {
|
) -> Packets {
|
||||||
let mut time = Measure::start("handle_pull_requests");
|
let mut time = Measure::start("handle_pull_requests");
|
||||||
|
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
|
||||||
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
|
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
|
||||||
.process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp());
|
.process_pull_requests(callers.cloned(), timestamp());
|
||||||
self.update_data_budget(stakes.len());
|
self.update_data_budget(stakes.len());
|
||||||
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
|
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
|
||||||
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
|
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
|
||||||
|
|
|
@ -240,8 +240,8 @@ impl Crds {
|
||||||
|
|
||||||
/// Update the timestamp's of all the labels that are associated with Pubkey
|
/// Update the timestamp's of all the labels that are associated with Pubkey
|
||||||
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
|
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
|
||||||
for label in &CrdsValue::record_labels(pubkey) {
|
for label in CrdsValue::record_labels(*pubkey) {
|
||||||
self.update_label_timestamp(label, now);
|
self.update_label_timestamp(&label, now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ use solana_sdk::{
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
borrow::{Borrow, Cow},
|
borrow::{Borrow, Cow},
|
||||||
collections::{BTreeSet, HashSet},
|
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
|
||||||
fmt,
|
fmt,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -498,18 +498,20 @@ impl CrdsValue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return all the possible labels for a record identified by Pubkey.
|
/// Return all the possible labels for a record identified by Pubkey.
|
||||||
pub fn record_labels(key: &Pubkey) -> Vec<CrdsValueLabel> {
|
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
|
||||||
let mut labels = vec![
|
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
|
||||||
CrdsValueLabel::ContactInfo(*key),
|
CrdsValueLabel::ContactInfo,
|
||||||
CrdsValueLabel::LowestSlot(*key),
|
CrdsValueLabel::LowestSlot,
|
||||||
CrdsValueLabel::SnapshotHashes(*key),
|
CrdsValueLabel::SnapshotHashes,
|
||||||
CrdsValueLabel::AccountsHashes(*key),
|
CrdsValueLabel::AccountsHashes,
|
||||||
CrdsValueLabel::LegacyVersion(*key),
|
CrdsValueLabel::LegacyVersion,
|
||||||
CrdsValueLabel::Version(*key),
|
CrdsValueLabel::Version,
|
||||||
];
|
];
|
||||||
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
|
CRDS_VALUE_LABEL_STUBS
|
||||||
labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key)));
|
.iter()
|
||||||
labels
|
.map(move |f| (f)(key))
|
||||||
|
.chain((0..MAX_VOTES).map(move |ix| CrdsValueLabel::Vote(ix, key)))
|
||||||
|
.chain((0..MAX_EPOCH_SLOTS).map(move |ix| CrdsValueLabel::EpochSlots(ix, key)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the size (in bytes) of a CrdsValue
|
/// Returns the size (in bytes) of a CrdsValue
|
||||||
|
@ -545,6 +547,30 @@ impl CrdsValue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Filters out an iterator of crds values, returning
|
||||||
|
/// the unique ones with the most recent wallclock.
|
||||||
|
pub(crate) fn filter_current<'a, I>(values: I) -> impl Iterator<Item = &'a CrdsValue>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = &'a CrdsValue>,
|
||||||
|
{
|
||||||
|
let mut out = HashMap::new();
|
||||||
|
for value in values {
|
||||||
|
match out.entry(value.label()) {
|
||||||
|
Entry::Vacant(entry) => {
|
||||||
|
entry.insert((value, value.wallclock()));
|
||||||
|
}
|
||||||
|
Entry::Occupied(mut entry) => {
|
||||||
|
let value_wallclock = value.wallclock();
|
||||||
|
let (_, entry_wallclock) = entry.get();
|
||||||
|
if *entry_wallclock < value_wallclock {
|
||||||
|
entry.insert((value, value_wallclock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.into_iter().map(|(_, (v, _))| v)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -553,13 +579,15 @@ mod test {
|
||||||
use solana_perf::test_tx::test_tx;
|
use solana_perf::test_tx::test_tx;
|
||||||
use solana_sdk::signature::{Keypair, Signer};
|
use solana_sdk::signature::{Keypair, Signer};
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
|
use std::cmp::Ordering;
|
||||||
|
use std::iter::repeat_with;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_labels() {
|
fn test_labels() {
|
||||||
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||||
// this method should cover all the possible labels
|
// this method should cover all the possible labels
|
||||||
for v in &CrdsValue::record_labels(&Pubkey::default()) {
|
for v in CrdsValue::record_labels(Pubkey::default()) {
|
||||||
match v {
|
match &v {
|
||||||
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
|
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
|
||||||
CrdsValueLabel::LowestSlot(_) => hits[1] = true,
|
CrdsValueLabel::LowestSlot(_) => hits[1] = true,
|
||||||
CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
|
CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
|
||||||
|
@ -743,4 +771,39 @@ mod test {
|
||||||
assert!(!value.verify());
|
assert!(!value.verify());
|
||||||
serialize_deserialize_value(value, correct_keypair);
|
serialize_deserialize_value(value, correct_keypair);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_filter_current() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let keys: Vec<_> = repeat_with(Keypair::new).take(16).collect();
|
||||||
|
let values: Vec<_> = repeat_with(|| {
|
||||||
|
let index = rng.gen_range(0, keys.len());
|
||||||
|
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
|
||||||
|
})
|
||||||
|
.take(256)
|
||||||
|
.collect();
|
||||||
|
let mut currents = HashMap::new();
|
||||||
|
for value in filter_current(&values) {
|
||||||
|
// Assert that filtered values have unique labels.
|
||||||
|
assert!(currents.insert(value.label(), value).is_none());
|
||||||
|
}
|
||||||
|
// Assert that currents are the most recent version of each value.
|
||||||
|
let mut count = 0;
|
||||||
|
for value in &values {
|
||||||
|
let current_value = currents.get(&value.label()).unwrap();
|
||||||
|
match value.wallclock().cmp(¤t_value.wallclock()) {
|
||||||
|
Ordering::Less => (),
|
||||||
|
Ordering::Equal => {
|
||||||
|
assert_eq!(value, *current_value);
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
Ordering::Greater => panic!("this should not happen!"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(count, currents.len());
|
||||||
|
// Currently CrdsData::new_rand is only implemented for 5 different
|
||||||
|
// kinds and excludes Vote and EpochSlots, and so the unique labels
|
||||||
|
// cannot be more than 5 times number of keys.
|
||||||
|
assert!(currents.len() <= keys.len() * 5);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue