simplifies pull-responses handling (#33743)

Following:
https://github.com/solana-labs/solana/pull/33722
from pubkey in PullResponse is no longer used in processing
pull-responses and so the code can be simplified.
This commit is contained in:
behzad nouri 2023-10-18 18:06:14 +00:00 committed by GitHub
parent 84c2f9de55
commit 2465abce5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 76 deletions

View File

@ -80,7 +80,7 @@ use {
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
std::{ std::{
borrow::Cow, borrow::Cow,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
fmt::Debug, fmt::Debug,
fs::{self, File}, fs::{self, File},
io::BufReader, io::BufReader,
@ -2101,77 +2101,27 @@ impl ClusterInfo {
fn handle_batch_pull_responses( fn handle_batch_pull_responses(
&self, &self,
responses: Vec<(Pubkey, Vec<CrdsValue>)>, responses: Vec<CrdsValue>,
thread_pool: &ThreadPool,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
epoch_duration: Duration, epoch_duration: Duration,
) { ) {
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time); let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time);
if responses.is_empty() {
return;
}
fn extend<K, V>(hash_map: &mut HashMap<K, Vec<V>>, (key, mut value): (K, Vec<V>))
where
K: Eq + std::hash::Hash,
{
match hash_map.entry(key) {
Entry::Occupied(mut entry) => {
let entry_value = entry.get_mut();
if entry_value.len() < value.len() {
std::mem::swap(entry_value, &mut value);
}
entry_value.extend(value);
}
Entry::Vacant(entry) => {
entry.insert(value);
}
}
}
fn merge<K, V>(
mut hash_map: HashMap<K, Vec<V>>,
other: HashMap<K, Vec<V>>,
) -> HashMap<K, Vec<V>>
where
K: Eq + std::hash::Hash,
{
if hash_map.len() < other.len() {
return merge(other, hash_map);
}
for kv in other {
extend(&mut hash_map, kv);
}
hash_map
}
let responses = thread_pool.install(|| {
responses
.into_par_iter()
.with_min_len(1024)
.fold(HashMap::new, |mut hash_map, kv| {
extend(&mut hash_map, kv);
hash_map
})
.reduce(HashMap::new, merge)
});
if !responses.is_empty() { if !responses.is_empty() {
let self_pubkey = self.id(); let self_pubkey = self.id();
let timeouts = self let timeouts = self
.gossip .gossip
.make_timeouts(self_pubkey, stakes, epoch_duration); .make_timeouts(self_pubkey, stakes, epoch_duration);
for (from, data) in responses { self.handle_pull_response(responses, &timeouts);
self.handle_pull_response(&from, data, &timeouts);
}
} }
} }
// Returns (failed, timeout, success) // Returns (failed, timeout, success)
fn handle_pull_response( fn handle_pull_response(
&self, &self,
from: &Pubkey,
crds_values: Vec<CrdsValue>, crds_values: Vec<CrdsValue>,
timeouts: &CrdsTimeouts, timeouts: &CrdsTimeouts,
) -> (usize, usize, usize) { ) -> (usize, usize, usize) {
let len = crds_values.len(); let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
let mut pull_stats = ProcessPullStats::default(); let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = { let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = {
let _st = ScopedTimer::from(&self.stats.filter_pull_response); let _st = ScopedTimer::from(&self.stats.filter_pull_response);
@ -2446,9 +2396,9 @@ impl ClusterInfo {
Protocol::PullRequest(filter, caller) => { Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller)) pull_requests.push((from_addr, filter, caller))
} }
Protocol::PullResponse(from, data) => { Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?; check_duplicate_instance(&data)?;
pull_responses.push((from, data)); pull_responses.append(&mut data);
} }
Protocol::PushMessage(from, data) => { Protocol::PushMessage(from, data) => {
check_duplicate_instance(&data)?; check_duplicate_instance(&data)?;
@ -2460,13 +2410,10 @@ impl ClusterInfo {
} }
} }
if self.require_stake_for_gossip(stakes) { if self.require_stake_for_gossip(stakes) {
for (_, data) in &mut pull_responses { retain_staked(&mut pull_responses, stakes);
retain_staked(data, stakes);
}
for (_, data) in &mut push_messages { for (_, data) in &mut push_messages {
retain_staked(data, stakes); retain_staked(data, stakes);
} }
pull_responses.retain(|(_, data)| !data.is_empty());
push_messages.retain(|(_, data)| !data.is_empty()); push_messages.retain(|(_, data)| !data.is_empty());
} }
self.handle_batch_ping_messages(ping_messages, recycler, response_sender); self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
@ -2478,7 +2425,7 @@ impl ClusterInfo {
stakes, stakes,
response_sender, response_sender,
); );
self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration); self.handle_batch_pull_responses(pull_responses, stakes, epoch_duration);
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes); self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
self.handle_batch_pong_messages(pong_messages, Instant::now()); self.handle_batch_pong_messages(pong_messages, Instant::now());
self.handle_batch_pull_requests( self.handle_batch_pull_requests(
@ -3212,18 +3159,11 @@ mod tests {
); );
assert_eq!( assert_eq!(
(0, 0, 1), (0, 0, 1),
ClusterInfo::handle_pull_response( cluster_info.handle_pull_response(data.clone(), &timeouts)
&cluster_info,
&entrypoint_pubkey,
data.clone(),
&timeouts
)
); );
let entrypoint_pubkey2 = solana_sdk::pubkey::new_rand();
assert_eq!( assert_eq!(
(1, 0, 0), (1, 0, 0),
ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts) cluster_info.handle_pull_response(data, &timeouts)
); );
} }
@ -3981,12 +3921,7 @@ mod tests {
&stakes, &stakes,
Duration::from_millis(cluster_info.gossip.pull.crds_timeout), Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
); );
ClusterInfo::handle_pull_response( cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts);
&cluster_info,
&entrypoint_pubkey,
vec![entrypoint_crdsvalue],
&timeouts,
);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1); assert_eq!(pings.len(), 1);
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
@ -4495,7 +4430,7 @@ mod tests {
); );
assert_eq!( assert_eq!(
(0, 0, NO_ENTRIES), (0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts) cluster_info.handle_pull_response(data, &timeouts)
); );
} }