diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0890def4c5..52711867da 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -36,7 +36,7 @@ use core::cmp; use itertools::Itertools; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; -use rayon::ThreadPool; +use rayon::{ThreadPool, ThreadPoolBuilder}; use solana_ledger::staking_utils; use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; @@ -246,6 +246,7 @@ struct GossipStats { } pub struct ClusterInfo { + thread_pool: ThreadPool, /// The network pub gossip: RwLock, /// set the keypair that will be used to sign crds values generated. It is unset only in tests. @@ -403,6 +404,11 @@ impl ClusterInfo { pub fn new(contact_info: ContactInfo, keypair: Arc) -> Self { let id = contact_info.id; let me = Self { + thread_pool: ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(8)) + .thread_name(|i| format!("sol-gossip-work-{}", i)) + .build() + .unwrap(), gossip: RwLock::new(CrdsGossip::default()), keypair, entrypoint: RwLock::new(None), @@ -432,6 +438,11 @@ impl ClusterInfo { let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); my_contact_info.id = *new_id; ClusterInfo { + thread_pool: ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(2)) + .thread_name(|i| format!("sol-gossip-work-{}", i)) + .build() + .unwrap(), gossip: RwLock::new(gossip), keypair: self.keypair.clone(), entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), @@ -2067,14 +2078,13 @@ impl ClusterInfo { fn process_packets( &self, requests: Vec, - thread_pool: &ThreadPool, recycler: &PacketsRecycler, response_sender: &PacketSender, stakes: HashMap, epoch_time_ms: u64, ) { let sender = response_sender.clone(); - thread_pool.install(|| { + self.thread_pool.install(|| { requests.into_par_iter().for_each_with(sender, |s, reqs| { self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms) }); @@ -2088,7 +2098,6 @@ impl ClusterInfo { bank_forks: Option<&Arc>>, requests_receiver: &PacketReceiver, response_sender: &PacketSender, - thread_pool: &ThreadPool, last_print: &mut Instant, ) -> Result<()> { let timeout = Duration::new(1, 0); @@ -2111,14 +2120,7 @@ impl ClusterInfo { let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks); - self.process_packets( - requests, - thread_pool, - recycler, - response_sender, - stakes, - epoch_time_ms, - ); + self.process_packets(requests, recycler, response_sender, stakes, epoch_time_ms); self.print_reset_stats(last_print); @@ -2327,11 +2329,6 @@ impl ClusterInfo { Builder::new() .name("solana-listen".to_string()) .spawn(move || { - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(std::cmp::min(get_thread_count(), 8)) - .thread_name(|i| format!("sol-gossip-work-{}", i)) - .build() - .unwrap(); let mut last_print = Instant::now(); loop { let e = self.run_listen( @@ -2339,7 +2336,6 @@ impl ClusterInfo { bank_forks.as_ref(), &requests_receiver, &response_sender, - &thread_pool, &mut last_print, ); if exit.load(Ordering::Relaxed) {