filters crds values in parallel when responding to gossip pull-requests (#18877)
When responding to gossip pull-requests, filter_crds_values takes a lot of time while holding onto read-lock: https://github.com/solana-labs/solana/blob/f51d64868/gossip/src/crds_gossip_pull.rs#L509-L566 This commit will filter-crds-values in parallel using rayon thread-pools.
This commit is contained in:
parent
f51d648681
commit
f1198fc6d5
|
@ -1635,9 +1635,8 @@ impl ClusterInfo {
|
|||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
sender: PacketSender,
|
||||
gossip_validators: Option<HashSet<Pubkey>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let thread_pool = ThreadPoolBuilder::new()
|
||||
.num_threads(std::cmp::min(get_thread_count(), 8))
|
||||
.thread_name(|i| format!("ClusterInfo::gossip-{}", i))
|
||||
|
@ -1812,8 +1811,13 @@ impl ClusterInfo {
|
|||
self.stats
|
||||
.pull_requests_count
|
||||
.add_relaxed(requests.len() as u64);
|
||||
let response =
|
||||
self.handle_pull_requests(recycler, requests, stakes, require_stake_for_gossip);
|
||||
let response = self.handle_pull_requests(
|
||||
thread_pool,
|
||||
recycler,
|
||||
requests,
|
||||
stakes,
|
||||
require_stake_for_gossip,
|
||||
);
|
||||
if !response.is_empty() {
|
||||
self.stats
|
||||
.packets_sent_pull_responses_count
|
||||
|
@ -1883,6 +1887,7 @@ impl ClusterInfo {
|
|||
// and tries to send back to them the values it detects are missing.
|
||||
fn handle_pull_requests(
|
||||
&self,
|
||||
thread_pool: &ThreadPool,
|
||||
recycler: &PacketsRecycler,
|
||||
requests: Vec<PullData>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
|
@ -1914,8 +1919,12 @@ impl ClusterInfo {
|
|||
let self_id = self.id();
|
||||
let mut pull_responses = {
|
||||
let _st = ScopedTimer::from(&self.stats.generate_pull_responses);
|
||||
self.gossip
|
||||
.generate_pull_responses(&caller_and_filters, output_size_limit, now)
|
||||
self.gossip.generate_pull_responses(
|
||||
thread_pool,
|
||||
&caller_and_filters,
|
||||
output_size_limit,
|
||||
now,
|
||||
)
|
||||
};
|
||||
if require_stake_for_gossip {
|
||||
for resp in &mut pull_responses {
|
||||
|
@ -2516,6 +2525,9 @@ impl ClusterInfo {
|
|||
match self.run_socket_consume(&receiver, &sender, &thread_pool) {
|
||||
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
|
||||
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
||||
// A send operation can only fail if the receiving end of a
|
||||
// channel is disconnected.
|
||||
Err(GossipError::SendError) => break,
|
||||
Err(err) => error!("gossip consume: {}", err),
|
||||
Ok(()) => (),
|
||||
}
|
||||
|
@ -2531,19 +2543,18 @@ impl ClusterInfo {
|
|||
requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
|
||||
response_sender: PacketSender,
|
||||
should_check_duplicate_instance: bool,
|
||||
exit: &Arc<AtomicBool>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let mut last_print = Instant::now();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let thread_pool = ThreadPoolBuilder::new()
|
||||
.num_threads(get_thread_count().min(8))
|
||||
.thread_name(|i| format!("sol-gossip-work-{}", i))
|
||||
.build()
|
||||
.unwrap();
|
||||
Builder::new()
|
||||
.name("solana-listen".to_string())
|
||||
.spawn(move || {
|
||||
let thread_pool = ThreadPoolBuilder::new()
|
||||
.num_threads(std::cmp::min(get_thread_count(), 8))
|
||||
.thread_name(|i| format!("sol-gossip-work-{}", i))
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut last_print = Instant::now();
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
if let Err(err) = self.run_listen(
|
||||
&recycler,
|
||||
|
|
|
@ -240,11 +240,18 @@ impl CrdsGossip {
|
|||
|
||||
pub fn generate_pull_responses(
|
||||
&self,
|
||||
thread_pool: &ThreadPool,
|
||||
filters: &[(CrdsValue, CrdsFilter)],
|
||||
output_size_limit: usize, // Limit number of crds values returned.
|
||||
now: u64,
|
||||
) -> Vec<Vec<CrdsValue>> {
|
||||
CrdsGossipPull::generate_pull_responses(&self.crds, filters, output_size_limit, now)
|
||||
CrdsGossipPull::generate_pull_responses(
|
||||
thread_pool,
|
||||
&self.crds,
|
||||
filters,
|
||||
output_size_limit,
|
||||
now,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn filter_pull_responses(
|
||||
|
|
|
@ -22,7 +22,6 @@ use {
|
|||
ping_pong::PingCache,
|
||||
weighted_shuffle::WeightedShuffle,
|
||||
},
|
||||
itertools::Itertools,
|
||||
lru::LruCache,
|
||||
rand::Rng,
|
||||
rayon::{prelude::*, ThreadPool},
|
||||
|
@ -39,7 +38,7 @@ use {
|
|||
iter::{repeat, repeat_with},
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
atomic::{AtomicI64, AtomicUsize, Ordering},
|
||||
Mutex, RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
|
@ -356,12 +355,13 @@ impl CrdsGossipPull {
|
|||
|
||||
/// Create gossip responses to pull requests
|
||||
pub(crate) fn generate_pull_responses(
|
||||
thread_pool: &ThreadPool,
|
||||
crds: &RwLock<Crds>,
|
||||
requests: &[(CrdsValue, CrdsFilter)],
|
||||
output_size_limit: usize, // Limit number of crds values returned.
|
||||
now: u64,
|
||||
) -> Vec<Vec<CrdsValue>> {
|
||||
Self::filter_crds_values(crds, requests, output_size_limit, now)
|
||||
Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now)
|
||||
}
|
||||
|
||||
// Checks if responses should be inserted and
|
||||
|
@ -508,9 +508,10 @@ impl CrdsGossipPull {
|
|||
|
||||
/// Filter values that fail the bloom filter up to `max_bytes`.
|
||||
fn filter_crds_values(
|
||||
thread_pool: &ThreadPool,
|
||||
crds: &RwLock<Crds>,
|
||||
filters: &[(CrdsValue, CrdsFilter)],
|
||||
mut output_size_limit: usize, // Limit number of crds values returned.
|
||||
output_size_limit: usize, // Limit number of crds values returned.
|
||||
now: u64,
|
||||
) -> Vec<Vec<CrdsValue>> {
|
||||
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
||||
|
@ -518,50 +519,57 @@ impl CrdsGossipPull {
|
|||
//skip filters from callers that are too old
|
||||
let caller_wallclock_window =
|
||||
now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
|
||||
let mut dropped_requests = 0;
|
||||
let mut total_skipped = 0;
|
||||
let dropped_requests = AtomicUsize::default();
|
||||
let total_skipped = AtomicUsize::default();
|
||||
let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
|
||||
let output_size_limit = AtomicI64::new(output_size_limit);
|
||||
let crds = crds.read().unwrap();
|
||||
let ret: Vec<_> = filters
|
||||
.iter()
|
||||
.map(|(caller, filter)| {
|
||||
if output_size_limit == 0 {
|
||||
return None;
|
||||
let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
|
||||
if output_size_limit.load(Ordering::Relaxed) <= 0 {
|
||||
return Vec::default();
|
||||
}
|
||||
let caller_wallclock = caller.wallclock();
|
||||
if !caller_wallclock_window.contains(&caller_wallclock) {
|
||||
dropped_requests.fetch_add(1, Ordering::Relaxed);
|
||||
return Vec::default();
|
||||
}
|
||||
let caller_pubkey = caller.pubkey();
|
||||
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
||||
let pred = |entry: &&VersionedCrdsValue| {
|
||||
debug_assert!(filter.test_mask(&entry.value_hash));
|
||||
// Skip values that are too new.
|
||||
if entry.value.wallclock() > caller_wallclock {
|
||||
total_skipped.fetch_add(1, Ordering::Relaxed);
|
||||
false
|
||||
} else {
|
||||
!filter.filter_contains(&entry.value_hash)
|
||||
&& (entry.value.pubkey() != caller_pubkey
|
||||
|| entry.value.should_force_push(&caller_pubkey))
|
||||
}
|
||||
let caller_wallclock = caller.wallclock();
|
||||
if !caller_wallclock_window.contains(&caller_wallclock) {
|
||||
dropped_requests += 1;
|
||||
return Some(vec![]);
|
||||
}
|
||||
let caller_pubkey = caller.pubkey();
|
||||
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
|
||||
let pred = |entry: &&VersionedCrdsValue| {
|
||||
debug_assert!(filter.test_mask(&entry.value_hash));
|
||||
// Skip values that are too new.
|
||||
if entry.value.wallclock() > caller_wallclock {
|
||||
total_skipped += 1;
|
||||
false
|
||||
} else {
|
||||
!filter.filter_contains(&entry.value_hash)
|
||||
&& (entry.value.pubkey() != caller_pubkey
|
||||
|| entry.value.should_force_push(&caller_pubkey))
|
||||
}
|
||||
};
|
||||
let out: Vec<_> = crds
|
||||
.filter_bitmask(filter.mask, filter.mask_bits)
|
||||
.filter(pred)
|
||||
.map(|entry| entry.value.clone())
|
||||
.take(output_size_limit)
|
||||
.collect();
|
||||
output_size_limit -= out.len();
|
||||
Some(out)
|
||||
})
|
||||
.while_some()
|
||||
.collect();
|
||||
};
|
||||
let out: Vec<_> = crds
|
||||
.filter_bitmask(filter.mask, filter.mask_bits)
|
||||
.filter(pred)
|
||||
.map(|entry| entry.value.clone())
|
||||
.take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
|
||||
.collect();
|
||||
output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
|
||||
out
|
||||
};
|
||||
let ret: Vec<_> = thread_pool.install(|| {
|
||||
filters
|
||||
.par_iter()
|
||||
.map(|(caller, filter)| apply_filter(caller, filter))
|
||||
.collect()
|
||||
});
|
||||
inc_new_counter_info!(
|
||||
"gossip_filter_crds_values-dropped_requests",
|
||||
dropped_requests + filters.len() - ret.len()
|
||||
dropped_requests.into_inner()
|
||||
);
|
||||
inc_new_counter_info!(
|
||||
"gossip_filter_crds_values-dropped_values",
|
||||
total_skipped.into_inner()
|
||||
);
|
||||
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
|
||||
ret
|
||||
}
|
||||
|
||||
|
@ -1213,10 +1221,11 @@ pub(crate) mod tests {
|
|||
let (_, filters) = req.unwrap();
|
||||
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||
&thread_pool,
|
||||
&dest_crds,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
0,
|
||||
usize::MAX, // output_size_limit
|
||||
0, // now
|
||||
);
|
||||
|
||||
assert_eq!(rsp[0].len(), 0);
|
||||
|
@ -1233,10 +1242,11 @@ pub(crate) mod tests {
|
|||
|
||||
//should skip new value since caller is to old
|
||||
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||
&thread_pool,
|
||||
&dest_crds,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
||||
usize::MAX, // output_size_limit
|
||||
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now
|
||||
);
|
||||
assert_eq!(rsp[0].len(), 0);
|
||||
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
|
||||
|
@ -1251,9 +1261,10 @@ pub(crate) mod tests {
|
|||
.collect::<Vec<_>>()
|
||||
});
|
||||
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||
&thread_pool,
|
||||
&dest_crds,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
usize::MAX, // output_size_limit
|
||||
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
|
||||
);
|
||||
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
|
||||
|
@ -1304,10 +1315,11 @@ pub(crate) mod tests {
|
|||
let (_, filters) = req.unwrap();
|
||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||
&thread_pool,
|
||||
&dest_crds,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
0,
|
||||
usize::MAX, // output_size_limit
|
||||
0, // now
|
||||
);
|
||||
let callers = filters.into_iter().map(|(caller, _)| caller);
|
||||
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
|
||||
|
@ -1382,10 +1394,11 @@ pub(crate) mod tests {
|
|||
let (_, filters) = req.unwrap();
|
||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||
let rsp = CrdsGossipPull::generate_pull_responses(
|
||||
&thread_pool,
|
||||
&dest_crds,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
0,
|
||||
usize::MAX, // output_size_limit
|
||||
0, // now
|
||||
);
|
||||
CrdsGossipPull::process_pull_requests(
|
||||
&dest_crds,
|
||||
|
|
|
@ -58,27 +58,27 @@ impl GossipService {
|
|||
1,
|
||||
false,
|
||||
);
|
||||
let (response_sender, response_receiver) = channel();
|
||||
let (consume_sender, listen_receiver) = channel();
|
||||
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
|
||||
let _consume_sender = consume_sender.clone();
|
||||
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
|
||||
request_receiver,
|
||||
consume_sender,
|
||||
exit.clone(),
|
||||
);
|
||||
let t_listen = ClusterInfo::listen(
|
||||
cluster_info.clone(),
|
||||
let (response_sender, response_receiver) = channel();
|
||||
let t_listen = cluster_info.clone().listen(
|
||||
bank_forks.clone(),
|
||||
listen_receiver,
|
||||
response_sender.clone(),
|
||||
should_check_duplicate_instance,
|
||||
exit,
|
||||
exit.clone(),
|
||||
);
|
||||
let t_gossip = ClusterInfo::gossip(
|
||||
cluster_info.clone(),
|
||||
let t_gossip = cluster_info.clone().gossip(
|
||||
bank_forks,
|
||||
response_sender,
|
||||
gossip_validators,
|
||||
exit,
|
||||
exit.clone(),
|
||||
);
|
||||
// To work around:
|
||||
// https://github.com/rust-lang/rust/issues/54267
|
||||
|
|
|
@ -505,7 +505,7 @@ fn network_run_pull(
|
|||
.collect()
|
||||
};
|
||||
let transfered: Vec<_> = requests
|
||||
.into_par_iter()
|
||||
.into_iter()
|
||||
.map(|(to, filters, caller_info)| {
|
||||
let mut bytes: usize = 0;
|
||||
let mut msgs: usize = 0;
|
||||
|
@ -527,8 +527,9 @@ fn network_run_pull(
|
|||
let rsp = node
|
||||
.gossip
|
||||
.generate_pull_responses(
|
||||
thread_pool,
|
||||
&filters,
|
||||
/*output_size_limit=*/ usize::MAX,
|
||||
usize::MAX, // output_size_limit
|
||||
now,
|
||||
)
|
||||
.into_iter()
|
||||
|
|
Loading…
Reference in New Issue