moves gossip-work thread pool cons to ClusterInfo::new (#12402)
This commit is contained in:
parent
a5c3fc14b3
commit
42f1ef8acb
|
@ -36,7 +36,7 @@ use core::cmp;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rayon::iter::IntoParallelIterator;
|
use rayon::iter::IntoParallelIterator;
|
||||||
use rayon::iter::ParallelIterator;
|
use rayon::iter::ParallelIterator;
|
||||||
use rayon::ThreadPool;
|
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||||
use solana_ledger::staking_utils;
|
use solana_ledger::staking_utils;
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_measure::thread_mem_usage;
|
use solana_measure::thread_mem_usage;
|
||||||
|
@ -246,6 +246,7 @@ struct GossipStats {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ClusterInfo {
|
pub struct ClusterInfo {
|
||||||
|
thread_pool: ThreadPool,
|
||||||
/// The network
|
/// The network
|
||||||
pub gossip: RwLock<CrdsGossip>,
|
pub gossip: RwLock<CrdsGossip>,
|
||||||
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
|
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
|
||||||
|
@ -403,6 +404,11 @@ impl ClusterInfo {
|
||||||
pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
|
pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
|
||||||
let id = contact_info.id;
|
let id = contact_info.id;
|
||||||
let me = Self {
|
let me = Self {
|
||||||
|
thread_pool: ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_thread_count().min(8))
|
||||||
|
.thread_name(|i| format!("sol-gossip-work-{}", i))
|
||||||
|
.build()
|
||||||
|
.unwrap(),
|
||||||
gossip: RwLock::new(CrdsGossip::default()),
|
gossip: RwLock::new(CrdsGossip::default()),
|
||||||
keypair,
|
keypair,
|
||||||
entrypoint: RwLock::new(None),
|
entrypoint: RwLock::new(None),
|
||||||
|
@ -432,6 +438,11 @@ impl ClusterInfo {
|
||||||
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
||||||
my_contact_info.id = *new_id;
|
my_contact_info.id = *new_id;
|
||||||
ClusterInfo {
|
ClusterInfo {
|
||||||
|
thread_pool: ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_thread_count().min(2))
|
||||||
|
.thread_name(|i| format!("sol-gossip-work-{}", i))
|
||||||
|
.build()
|
||||||
|
.unwrap(),
|
||||||
gossip: RwLock::new(gossip),
|
gossip: RwLock::new(gossip),
|
||||||
keypair: self.keypair.clone(),
|
keypair: self.keypair.clone(),
|
||||||
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
|
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
|
||||||
|
@ -2067,14 +2078,13 @@ impl ClusterInfo {
|
||||||
fn process_packets(
|
fn process_packets(
|
||||||
&self,
|
&self,
|
||||||
requests: Vec<Packets>,
|
requests: Vec<Packets>,
|
||||||
thread_pool: &ThreadPool,
|
|
||||||
recycler: &PacketsRecycler,
|
recycler: &PacketsRecycler,
|
||||||
response_sender: &PacketSender,
|
response_sender: &PacketSender,
|
||||||
stakes: HashMap<Pubkey, u64>,
|
stakes: HashMap<Pubkey, u64>,
|
||||||
epoch_time_ms: u64,
|
epoch_time_ms: u64,
|
||||||
) {
|
) {
|
||||||
let sender = response_sender.clone();
|
let sender = response_sender.clone();
|
||||||
thread_pool.install(|| {
|
self.thread_pool.install(|| {
|
||||||
requests.into_par_iter().for_each_with(sender, |s, reqs| {
|
requests.into_par_iter().for_each_with(sender, |s, reqs| {
|
||||||
self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms)
|
self.handle_packets(&recycler, &stakes, reqs, s, epoch_time_ms)
|
||||||
});
|
});
|
||||||
|
@ -2088,7 +2098,6 @@ impl ClusterInfo {
|
||||||
bank_forks: Option<&Arc<RwLock<BankForks>>>,
|
bank_forks: Option<&Arc<RwLock<BankForks>>>,
|
||||||
requests_receiver: &PacketReceiver,
|
requests_receiver: &PacketReceiver,
|
||||||
response_sender: &PacketSender,
|
response_sender: &PacketSender,
|
||||||
thread_pool: &ThreadPool,
|
|
||||||
last_print: &mut Instant,
|
last_print: &mut Instant,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
|
@ -2111,14 +2120,7 @@ impl ClusterInfo {
|
||||||
|
|
||||||
let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
|
let (stakes, epoch_time_ms) = Self::get_stakes_and_epoch_time(bank_forks);
|
||||||
|
|
||||||
self.process_packets(
|
self.process_packets(requests, recycler, response_sender, stakes, epoch_time_ms);
|
||||||
requests,
|
|
||||||
thread_pool,
|
|
||||||
recycler,
|
|
||||||
response_sender,
|
|
||||||
stakes,
|
|
||||||
epoch_time_ms,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.print_reset_stats(last_print);
|
self.print_reset_stats(last_print);
|
||||||
|
|
||||||
|
@ -2327,11 +2329,6 @@ impl ClusterInfo {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-listen".to_string())
|
.name("solana-listen".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
|
||||||
.num_threads(std::cmp::min(get_thread_count(), 8))
|
|
||||||
.thread_name(|i| format!("sol-gossip-work-{}", i))
|
|
||||||
.build()
|
|
||||||
.unwrap();
|
|
||||||
let mut last_print = Instant::now();
|
let mut last_print = Instant::now();
|
||||||
loop {
|
loop {
|
||||||
let e = self.run_listen(
|
let e = self.run_listen(
|
||||||
|
@ -2339,7 +2336,6 @@ impl ClusterInfo {
|
||||||
bank_forks.as_ref(),
|
bank_forks.as_ref(),
|
||||||
&requests_receiver,
|
&requests_receiver,
|
||||||
&response_sender,
|
&response_sender,
|
||||||
&thread_pool,
|
|
||||||
&mut last_print,
|
&mut last_print,
|
||||||
);
|
);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
|
Loading…
Reference in New Issue