From d33c5486609a1dc7a6d4f30433f1f9d36ca11eff Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 6 Jul 2022 11:49:58 +0000 Subject: [PATCH] bypasses window-service stage before retransmitting shreds (#26291) With recent patches, window-service recv-window does not do much other than redirecting packets/shreds to downstream channels. The commit removes window-service recv-window and instead sends packets/shreds directly from sigverify to retransmit-stage and window-service insert thread. --- core/src/retransmit_stage.rs | 126 +++---------- core/src/sigverify_shreds.rs | 36 +++- core/src/tvu.rs | 75 +++++--- core/src/window_service.rs | 340 ++++++++++------------------------- 4 files changed, 209 insertions(+), 368 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 94fab21bc..c6d3855f7 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,17 +3,10 @@ use { crate::{ - ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, - cluster_info_vote_listener::VerifiedVoteReceiver, cluster_nodes::{ClusterNodes, ClusterNodesCache}, - cluster_slots::ClusterSlots, - cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver}, - completed_data_sets_service::CompletedDataSetsSender, packet_hasher::PacketHasher, - repair_service::{DuplicateSlotsResetSender, RepairInfo}, - window_service::WindowService, }, - crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, + crossbeam_channel::{Receiver, RecvTimeoutError}, itertools::{izip, Itertools}, lru::LruCache, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, @@ -23,27 +16,25 @@ use { contact_info::ContactInfo, }, solana_ledger::{ - blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, shred::{self, ShredId}, }, solana_measure::measure::Measure, - solana_perf::packet::PacketBatch, solana_rayon_threadlimit::get_thread_count, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}, + solana_sdk::{clock::Slot, pubkey::Pubkey, timing::timestamp}, solana_streamer::{ sendmmsg::{multi_target_send, SendPktsError}, socket::SocketAddrSpace, }, std::{ - collections::{HashMap, HashSet}, + collections::HashMap, iter::repeat, net::UdpSocket, ops::AddAssign, sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -378,122 +369,61 @@ pub fn retransmitter( .unwrap(); Builder::new() .name("solana-retransmitter".to_string()) - .spawn(move || { - trace!("retransmitter started"); - loop { - match retransmit( - &thread_pool, - &bank_forks, - &leader_schedule_cache, - &cluster_info, - &shreds_receiver, - &sockets, - &mut stats, - &cluster_nodes_cache, - &mut hasher_reset_ts, - &mut shreds_received, - &mut packet_hasher, - &max_slots, - rpc_subscriptions.as_deref(), - ) { - Ok(()) => (), - Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, - } + .spawn(move || loop { + match retransmit( + &thread_pool, + &bank_forks, + &leader_schedule_cache, + &cluster_info, + &shreds_receiver, + &sockets, + &mut stats, + &cluster_nodes_cache, + &mut hasher_reset_ts, + &mut shreds_received, + &mut packet_hasher, + &max_slots, + rpc_subscriptions.as_deref(), + ) { + Ok(()) => (), + Err(RecvTimeoutError::Timeout) => (), + Err(RecvTimeoutError::Disconnected) => break, } - 
trace!("exiting retransmitter"); }) .unwrap() } pub struct RetransmitStage { retransmit_thread_handle: JoinHandle<()>, - window_service: WindowService, - cluster_slots_service: ClusterSlotsService, } impl RetransmitStage { - #[allow(clippy::new_ret_no_self)] - #[allow(clippy::too_many_arguments)] pub(crate) fn new( bank_forks: Arc>, leader_schedule_cache: Arc, - blockstore: Arc, cluster_info: Arc, retransmit_sockets: Arc>, - repair_socket: Arc, - ancestor_hashes_socket: Arc, - verified_receiver: Receiver>, - exit: Arc, - cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, - epoch_schedule: EpochSchedule, - turbine_disabled: Arc, - cluster_slots: Arc, - duplicate_slots_reset_sender: DuplicateSlotsResetSender, - verified_vote_receiver: VerifiedVoteReceiver, - repair_validators: Option>, - completed_data_sets_sender: CompletedDataSetsSender, + retransmit_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, - duplicate_slots_sender: Sender, - ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { - let (retransmit_sender, retransmit_receiver) = unbounded(); - let retransmit_thread_handle = retransmitter( retransmit_sockets, - bank_forks.clone(), - leader_schedule_cache.clone(), - cluster_info.clone(), + bank_forks, + leader_schedule_cache, + cluster_info, retransmit_receiver, max_slots, rpc_subscriptions, ); - let cluster_slots_service = ClusterSlotsService::new( - blockstore.clone(), - cluster_slots.clone(), - bank_forks.clone(), - cluster_info.clone(), - cluster_slots_update_receiver, - exit.clone(), - ); - - let repair_info = RepairInfo { - bank_forks, - epoch_schedule, - duplicate_slots_reset_sender, - repair_validators, - cluster_info, - cluster_slots, - }; - let window_service = WindowService::new( - blockstore, - verified_receiver, - retransmit_sender, - repair_socket, - ancestor_hashes_socket, - exit, - repair_info, - leader_schedule_cache, - turbine_disabled, - verified_vote_receiver, - completed_data_sets_sender, - duplicate_slots_sender, - ancestor_hashes_replay_update_receiver, - ); - Self { retransmit_thread_handle, - window_service, - cluster_slots_service, } } pub(crate) fn join(self) -> thread::Result<()> { - self.retransmit_thread_handle.join()?; - self.window_service.join()?; - self.cluster_slots_service.join() + self.retransmit_thread_handle.join() } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 2da6e428c..b32d045bc 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -14,7 +14,10 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ collections::HashMap, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, }, }; @@ -24,7 +27,9 @@ pub struct ShredSigVerifier { bank_forks: Arc>, leader_schedule_cache: Arc, recycler_cache: RecyclerCache, + retransmit_sender: Sender>>, packet_sender: Sender>, + turbine_disabled: Arc, } impl ShredSigVerifier { @@ -32,7 +37,9 @@ impl ShredSigVerifier { pubkey: Pubkey, bank_forks: Arc>, leader_schedule_cache: Arc, + retransmit_sender: Sender>>, packet_sender: Sender>, + turbine_disabled: Arc, ) -> Self { sigverify::init(); Self { @@ -40,7 +47,9 @@ impl ShredSigVerifier { bank_forks, leader_schedule_cache, recycler_cache: RecyclerCache::warmed(), + retransmit_sender, packet_sender, + turbine_disabled, } } } @@ -52,6 +61,20 @@ impl SigVerifier for ShredSigVerifier { &mut self, packet_batches: Vec, ) -> Result<(), SigVerifyServiceError> { + if self.turbine_disabled.load(Ordering::Relaxed) { + return 
Ok(()); + } + // Exclude repair packets from retransmit. + // TODO: return the error here! + let _ = self.retransmit_sender.send( + packet_batches + .iter() + .flat_map(PacketBatch::iter) + .filter(|packet| !packet.meta.discard() && !packet.meta.repair()) + .filter_map(shred::layout::get_shred) + .map(<[u8]>::to_vec) + .collect(), + ); self.packet_sender.send(packet_batches)?; Ok(()) } @@ -140,8 +163,15 @@ pub mod tests { let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let bf = Arc::new(RwLock::new(BankForks::new(bank))); let (sender, receiver) = unbounded(); - let mut verifier = ShredSigVerifier::new(Pubkey::new_unique(), bf, cache, sender); - + let (retransmit_sender, _retransmit_receiver) = unbounded(); + let mut verifier = ShredSigVerifier::new( + Pubkey::new_unique(), + bf, + cache, + retransmit_sender, + sender, + Arc::::default(), // turbine_disabled + ); let batch_size = 2; let mut batch = PacketBatch::with_capacity(batch_size); batch.resize(batch_size, Packet::default()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3418bf84c..fe7d1d94b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -10,10 +10,12 @@ use { VerifiedVoteReceiver, VoteTracker, }, cluster_slots::ClusterSlots, + cluster_slots_service::ClusterSlotsService, completed_data_sets_service::CompletedDataSetsSender, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, ledger_cleanup_service::LedgerCleanupService, + repair_service::RepairInfo, replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, rewards_recorder_service::RewardsRecorderSender, @@ -24,6 +26,7 @@ use { validator::ProcessBlockStore, voting_service::VotingService, warm_quic_cache_service::WarmQuicCacheService, + window_service::WindowService, }, crossbeam_channel::{unbounded, Receiver}, solana_client::connection_cache::ConnectionCache, @@ -61,6 +64,8 @@ pub struct Tvu { fetch_stage: ShredFetchStage, sigverify_stage: SigVerifyStage, retransmit_stage: RetransmitStage, + window_service: WindowService, + cluster_slots_service: ClusterSlotsService, replay_stage: ReplayStage, ledger_cleanup_service: Option, cost_update_service: CostUpdateService, @@ -157,45 +162,69 @@ impl Tvu { ); let (verified_sender, verified_receiver) = unbounded(); + let (retransmit_sender, retransmit_receiver) = unbounded(); let sigverify_stage = SigVerifyStage::new( fetch_receiver, ShredSigVerifier::new( cluster_info.id(), bank_forks.clone(), leader_schedule_cache.clone(), + retransmit_sender.clone(), verified_sender, + turbine_disabled, ), "shred-verifier", ); + let retransmit_stage = RetransmitStage::new( + bank_forks.clone(), + leader_schedule_cache.clone(), + cluster_info.clone(), + Arc::new(retransmit_sockets), + retransmit_receiver, + max_slots.clone(), + Some(rpc_subscriptions.clone()), + ); + let cluster_slots = Arc::new(ClusterSlots::default()); let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded(); - let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = unbounded(); - let retransmit_stage = RetransmitStage::new( - bank_forks.clone(), - leader_schedule_cache.clone(), + let window_service = { + let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule(); + let repair_info = RepairInfo { + bank_forks: bank_forks.clone(), + epoch_schedule, + duplicate_slots_reset_sender, + 
repair_validators: tvu_config.repair_validators, + cluster_info: cluster_info.clone(), + cluster_slots: cluster_slots.clone(), + }; + WindowService::new( + blockstore.clone(), + verified_receiver, + retransmit_sender, + repair_socket, + ancestor_hashes_socket, + exit.clone(), + repair_info, + leader_schedule_cache.clone(), + verified_vote_receiver, + completed_data_sets_sender, + duplicate_slots_sender, + ancestor_hashes_replay_update_receiver, + ) + }; + + let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); + let cluster_slots_service = ClusterSlotsService::new( blockstore.clone(), - cluster_info.clone(), - Arc::new(retransmit_sockets), - repair_socket, - ancestor_hashes_socket, - verified_receiver, - exit.clone(), - cluster_slots_update_receiver, - *bank_forks.read().unwrap().working_bank().epoch_schedule(), - turbine_disabled, cluster_slots.clone(), - duplicate_slots_reset_sender, - verified_vote_receiver, - tvu_config.repair_validators, - completed_data_sets_sender, - max_slots.clone(), - Some(rpc_subscriptions.clone()), - duplicate_slots_sender, - ancestor_hashes_replay_update_receiver, + bank_forks.clone(), + cluster_info.clone(), + cluster_slots_update_receiver, + exit.clone(), ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded(); @@ -292,6 +321,8 @@ impl Tvu { fetch_stage, sigverify_stage, retransmit_stage, + window_service, + cluster_slots_service, replay_stage, ledger_cleanup_service, cost_update_service, @@ -304,6 +335,8 @@ impl Tvu { pub fn join(self) -> thread::Result<()> { self.retransmit_stage.join()?; + self.window_service.join()?; + self.cluster_slots_service.join()?; self.fetch_stage.join()?; self.sigverify_stage.join()?; if self.ledger_cleanup_service.is_some() { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 55e448df4..da4cbcb44 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -22,7 +22,7 @@ use { solana_metrics::inc_new_counter_error, solana_perf::packet::{Packet, PacketBatch}, solana_rayon_threadlimit::get_thread_count, - solana_sdk::{clock::Slot, pubkey::Pubkey}, + solana_sdk::clock::Slot, std::{ cmp::Reverse, collections::{HashMap, HashSet}, @@ -43,7 +43,10 @@ pub(crate) type DuplicateSlotReceiver = Receiver; #[derive(Default)] struct WindowServiceMetrics { run_insert_count: u64, - num_shreds_received: u64, + num_packets: usize, + num_repairs: usize, + num_shreds_received: usize, + handle_packets_elapsed_us: u64, shred_receiver_elapsed_us: u64, prune_shreds_elapsed_us: u64, num_shreds_pruned_invalid_repair: usize, @@ -52,14 +55,23 @@ struct WindowServiceMetrics { num_errors_cross_beam_recv_timeout: u64, num_errors_other: u64, num_errors_try_crossbeam_send: u64, + addrs: HashMap, } impl WindowServiceMetrics { fn report_metrics(&self, metric_name: &'static str) { + const MAX_NUM_ADDRS: usize = 5; datapoint_info!( metric_name, + ( + "handle_packets_elapsed_us", + self.handle_packets_elapsed_us, + i64 + ), ("run_insert_count", self.run_insert_count as i64, i64), - ("num_shreds_received", self.num_shreds_received as i64, i64), + ("num_packets", self.num_packets, i64), + ("num_repairs", self.num_repairs, i64), + ("num_shreds_received", self.num_shreds_received, i64), ( "shred_receiver_elapsed_us", self.shred_receiver_elapsed_us as i64, @@ -89,6 +101,19 @@ impl WindowServiceMetrics { i64 ), ); + + let mut addrs: Vec<_> = self.addrs.iter().collect(); + let reverse_count = |(_addr, count): &_| Reverse(*count); + if addrs.len() > MAX_NUM_ADDRS { + 
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count); + addrs.truncate(MAX_NUM_ADDRS); + } + addrs.sort_unstable_by_key(reverse_count); + info!( + "num addresses: {}, top packets by source: {:?}", + self.addrs.len(), + addrs + ); } fn record_error(&mut self, err: &Error) { @@ -105,52 +130,6 @@ impl WindowServiceMetrics { } } -#[derive(Default)] -struct ReceiveWindowStats { - num_iters: usize, - num_packets: usize, - num_repairs: usize, - num_shreds: usize, // num_discards: num_packets - num_shreds - elapsed: Duration, // excludes waiting time on the receiver channel. - addrs: HashMap, - since: Option, -} - -impl ReceiveWindowStats { - fn maybe_submit(&mut self) { - const MAX_NUM_ADDRS: usize = 5; - const SUBMIT_CADENCE: Duration = Duration::from_secs(2); - let elapsed = self.since.as_ref().map(Instant::elapsed); - if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE { - return; - } - datapoint_info!( - "receive_window_stats", - ("num_iters", self.num_iters, i64), - ("num_packets", self.num_packets, i64), - ("num_shreds", self.num_shreds, i64), - ("num_repairs", self.num_repairs, i64), - ("elapsed_micros", self.elapsed.as_micros(), i64), - ); - let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect(); - let reverse_count = |(_addr, count): &_| Reverse(*count); - if addrs.len() > MAX_NUM_ADDRS { - addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count); - addrs.truncate(MAX_NUM_ADDRS); - } - addrs.sort_unstable_by_key(reverse_count); - info!( - "num addresses: {}, top packets by source: {:?}", - self.addrs.len(), - addrs - ); - *self = Self { - since: Some(Instant::now()), - ..Self::default() - }; - } -} - fn run_check_duplicate( cluster_info: &ClusterInfo, blockstore: &Blockstore, @@ -229,8 +208,10 @@ fn prune_shreds_invalid_repair( assert_eq!(shreds.len(), repair_infos.len()); } +#[allow(clippy::too_many_arguments)] fn run_insert( - shred_receiver: &Receiver<(Vec, Vec>)>, + thread_pool: &ThreadPool, + verified_receiver: &Receiver>, blockstore: &Blockstore, leader_schedule_cache: &LeaderScheduleCache, handle_duplicate: F, @@ -243,26 +224,46 @@ fn run_insert( where F: Fn(Shred), { - ws_metrics.run_insert_count += 1; + const RECV_TIMEOUT: Duration = Duration::from_millis(200); let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed"); - let timer = Duration::from_millis(200); - let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?; - while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() { - shreds.extend(more_shreds); - repair_infos.extend(more_repair_infos); - } + let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?; + packets.extend(verified_receiver.try_iter().flatten()); shred_receiver_elapsed.stop(); ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us(); - ws_metrics.num_shreds_received += shreds.len() as u64; - // TODO: Consider using thread-pool here instead of recv_window. 
- let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = shreds - .into_iter() - .zip(repair_infos) - .filter_map(|(shred, repair_info)| { - let shred = Shred::new_from_serialized_shred(shred).ok()?; - Some((shred, repair_info)) - }) - .unzip(); + ws_metrics.run_insert_count += 1; + let handle_packet = |packet: &Packet| { + if packet.meta.discard() { + return None; + } + let shred = shred::layout::get_shred(packet)?; + let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?; + if packet.meta.repair() { + let repair_info = RepairMeta { + _from_addr: packet.meta.socket_addr(), + // If can't parse the nonce, dump the packet. + nonce: repair_response::nonce(packet)?, + }; + Some((shred, Some(repair_info))) + } else { + Some((shred, None)) + } + }; + let now = Instant::now(); + let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { + packets + .par_iter() + .flat_map_iter(|packets| packets.iter().filter_map(handle_packet)) + .unzip() + }); + ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64; + ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::(); + ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); + ws_metrics.num_shreds_received += shreds.len(); + for packet in packets.iter().flat_map(PacketBatch::iter) { + let addr = packet.meta.socket_addr(); + *ws_metrics.addrs.entry(addr).or_default() += 1; + } + let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed"); let num_shreds = shreds.len(); prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests); @@ -293,90 +294,12 @@ where Ok(()) } -fn recv_window( - insert_shred_sender: &Sender<(Vec, Vec>)>, - verified_receiver: &Receiver>, - retransmit_sender: &Sender>, - turbine_disabled: &AtomicBool, - thread_pool: &ThreadPool, - stats: &mut ReceiveWindowStats, -) -> Result<()> { - const RECV_TIMEOUT: Duration = Duration::from_millis(200); - let mut packet_batches = verified_receiver.recv_timeout(RECV_TIMEOUT)?; - packet_batches.extend(verified_receiver.try_iter().flatten()); - let now = Instant::now(); - let turbine_disabled = turbine_disabled.load(Ordering::Relaxed); - let handle_packet = |packet: &Packet| { - if turbine_disabled || packet.meta.discard() { - return None; - } - let shred = shred::layout::get_shred(packet)?; - if packet.meta.repair() { - let repair_info = RepairMeta { - _from_addr: packet.meta.socket_addr(), - // If can't parse the nonce, dump the packet. - nonce: repair_response::nonce(packet)?, - }; - Some((shred.to_vec(), Some(repair_info))) - } else { - Some((shred.to_vec(), None)) - } - }; - let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { - packet_batches - .par_iter() - .flat_map_iter(|packet_batch| packet_batch.iter().filter_map(handle_packet)) - .unzip() - }); - // Exclude repair packets from retransmit. 
- let _ = retransmit_sender.send( - shreds - .iter() - .zip(&repair_infos) - .filter(|(_, repair_info)| repair_info.is_none()) - .map(|(shred, _)| shred) - .cloned() - .collect(), - ); - stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); - stats.num_shreds += shreds.len(); - insert_shred_sender.send((shreds, repair_infos))?; - - stats.num_iters += 1; - stats.num_packets += packet_batches.iter().map(PacketBatch::len).sum::(); - for packet in packet_batches.iter().flat_map(PacketBatch::iter) { - let addr = packet.meta.socket_addr(); - *stats.addrs.entry(addr).or_default() += 1; - } - stats.elapsed += now.elapsed(); - Ok(()) -} - struct RepairMeta { _from_addr: SocketAddr, nonce: Nonce, } -// Implement a destructor for the window_service thread to signal it exited -// even on panics -struct Finalizer { - exit_sender: Arc, -} - -impl Finalizer { - fn new(exit_sender: Arc) -> Self { - Finalizer { exit_sender } - } -} -// Implement a destructor for Finalizer. -impl Drop for Finalizer { - fn drop(&mut self) { - self.exit_sender.clone().store(true, Ordering::Relaxed); - } -} - pub(crate) struct WindowService { - t_window: JoinHandle<()>, t_insert: JoinHandle<()>, t_check_duplicate: JoinHandle<()>, repair_service: RepairService, @@ -393,7 +316,6 @@ impl WindowService { exit: Arc, repair_info: RepairInfo, leader_schedule_cache: Arc, - turbine_disabled: Arc, verified_vote_receiver: VerifiedVoteReceiver, completed_data_sets_sender: CompletedDataSetsSender, duplicate_slots_sender: DuplicateSlotSender, @@ -402,7 +324,6 @@ impl WindowService { let outstanding_requests = Arc::>::default(); let cluster_info = repair_info.cluster_info.clone(); - let id = cluster_info.id(); let repair_service = RepairService::new( blockstore.clone(), @@ -415,7 +336,6 @@ impl WindowService { ancestor_hashes_replay_update_receiver, ); - let (insert_sender, insert_receiver) = unbounded(); let (duplicate_sender, duplicate_receiver) = unbounded(); let t_check_duplicate = Self::start_check_duplicate_thread( @@ -427,27 +347,17 @@ impl WindowService { ); let t_insert = Self::start_window_insert_thread( - exit.clone(), + exit, blockstore, leader_schedule_cache, - insert_receiver, + verified_receiver, duplicate_sender, completed_data_sets_sender, - retransmit_sender.clone(), + retransmit_sender, outstanding_requests, ); - let t_window = Self::start_recv_window_thread( - id, - exit, - insert_sender, - verified_receiver, - turbine_disabled, - retransmit_sender, - ); - WindowService { - t_window, t_insert, t_check_duplicate, repair_service, @@ -466,20 +376,17 @@ impl WindowService { }; Builder::new() .name("solana-check-duplicate".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - - let mut noop = || {}; - if let Err(e) = run_check_duplicate( - &cluster_info, - &blockstore, - &duplicate_receiver, - &duplicate_slots_sender, - ) { - if Self::should_exit_on_error(e, &mut noop, &handle_error) { - break; + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + if let Err(e) = run_check_duplicate( + &cluster_info, + &blockstore, + &duplicate_receiver, + &duplicate_slots_sender, + ) { + if Self::should_exit_on_error(e, &handle_error) { + break; + } } } }) @@ -490,17 +397,20 @@ impl WindowService { exit: Arc, blockstore: Arc, leader_schedule_cache: Arc, - insert_receiver: Receiver<(Vec, Vec>)>, + verified_receiver: Receiver>, check_duplicate_sender: Sender, completed_data_sets_sender: CompletedDataSetsSender, retransmit_sender: Sender>, outstanding_requests: Arc>, ) -> 
JoinHandle<()> { - let mut handle_timeout = || {}; let handle_error = || { inc_new_counter_error!("solana-window-insert-error", 1, 1); }; - + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(8)) + .thread_name(|i| format!("window-insert-{}", i)) + .build() + .unwrap(); Builder::new() .name("solana-window-insert".to_string()) .spawn(move || { @@ -510,13 +420,10 @@ impl WindowService { let mut metrics = BlockstoreInsertionMetrics::default(); let mut ws_metrics = WindowServiceMetrics::default(); let mut last_print = Instant::now(); - loop { - if exit.load(Ordering::Relaxed) { - break; - } - + while !exit.load(Ordering::Relaxed) { if let Err(e) = run_insert( - &insert_receiver, + &thread_pool, + &verified_receiver, &blockstore, &leader_schedule_cache, &handle_duplicate, @@ -527,7 +434,7 @@ impl WindowService { &outstanding_requests, ) { ws_metrics.record_error(&e); - if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { + if Self::should_exit_on_error(e, &handle_error) { break; } } @@ -544,71 +451,13 @@ impl WindowService { .unwrap() } - #[allow(clippy::too_many_arguments)] - fn start_recv_window_thread( - id: Pubkey, - exit: Arc, - insert_sender: Sender<(Vec, Vec>)>, - verified_receiver: Receiver>, - turbine_disabled: Arc, - retransmit_sender: Sender>, - ) -> JoinHandle<()> { - let mut stats = ReceiveWindowStats::default(); - Builder::new() - .name("solana-window".to_string()) - .spawn(move || { - let _exit = Finalizer::new(exit.clone()); - trace!("{}: RECV_WINDOW started", id); - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .build() - .unwrap(); - let mut now = Instant::now(); - let handle_error = || { - inc_new_counter_error!("solana-window-error", 1, 1); - }; - - while !exit.load(Ordering::Relaxed) { - let mut handle_timeout = || { - if now.elapsed() > Duration::from_secs(30) { - warn!( - "Window does not seem to be receiving data. \ - Ensure port configuration is correct..." - ); - now = Instant::now(); - } - }; - if let Err(e) = recv_window( - &insert_sender, - &verified_receiver, - &retransmit_sender, - &turbine_disabled, - &thread_pool, - &mut stats, - ) { - if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { - break; - } - } else { - now = Instant::now(); - } - stats.maybe_submit(); - } - }) - .unwrap() - } - - fn should_exit_on_error(e: Error, handle_timeout: &mut F, handle_error: &H) -> bool + fn should_exit_on_error(e: Error, handle_error: &H) -> bool where - F: FnMut(), H: Fn(), { match e { Error::RecvTimeout(RecvTimeoutError::Disconnected) => true, - Error::RecvTimeout(RecvTimeoutError::Timeout) => { - handle_timeout(); - false - } + Error::RecvTimeout(RecvTimeoutError::Timeout) => false, Error::Send => true, _ => { handle_error(); @@ -619,7 +468,6 @@ impl WindowService { } pub(crate) fn join(self) -> thread::Result<()> { - self.t_window.join()?; self.t_insert.join()?; self.t_check_duplicate.join()?; self.repair_service.join()
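
The net effect of the changes above: ShredSigVerifier::send_packets now fans verified shreds out directly to the retransmit channel and forwards the packet batches to the window-insert thread, so the former recv-window thread and its extra channel hop between sigverify and retransmit/insert are gone. Below is a minimal, standalone Rust sketch of that fan-out pattern, not code from the patch: it uses crossbeam channels and stand-in payload types (the PacketBatch and ShredPayload aliases here are placeholders for the real solana_perf/solana_ledger types, and the repair/discard filtering done in the real send_packets is elided).

```rust
// Standalone sketch (assumption-laden, not part of the patch) of the new data
// flow: sigverify fans out directly to the retransmit thread and the
// window-insert thread, with no recv-window stage in between.
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::thread;

type PacketBatch = Vec<Vec<u8>>; // stand-in for solana_perf::packet::PacketBatch
type ShredPayload = Vec<u8>;     // stand-in for a serialized shred payload

// Mirrors the shape of ShredSigVerifier::send_packets: shred payloads go to
// retransmit; the full packet batches go to the insert thread for blockstore
// writes. (The real code skips discarded and repair packets before retransmit.)
fn send_verified(
    batches: Vec<PacketBatch>,
    retransmit_sender: &Sender<Vec<ShredPayload>>,
    insert_sender: &Sender<Vec<PacketBatch>>,
) {
    let shreds: Vec<ShredPayload> = batches
        .iter()
        .flat_map(|batch| batch.iter().cloned())
        .collect();
    let _ = retransmit_sender.send(shreds);
    let _ = insert_sender.send(batches);
}

fn main() {
    let (retransmit_sender, retransmit_receiver): (Sender<Vec<ShredPayload>>, Receiver<_>) =
        unbounded();
    let (insert_sender, insert_receiver): (Sender<Vec<PacketBatch>>, Receiver<_>) = unbounded();

    // Retransmit thread: previously fed by recv-window, now fed by sigverify.
    let retransmit = thread::spawn(move || {
        for shreds in retransmit_receiver {
            println!("retransmit {} shreds", shreds.len());
        }
    });
    // Window-insert thread: deserializes shreds from the packets and stores them.
    let insert = thread::spawn(move || {
        for batches in insert_receiver {
            println!("insert {} packet batches", batches.len());
        }
    });

    send_verified(vec![vec![vec![0u8; 8]]], &retransmit_sender, &insert_sender);
    drop((retransmit_sender, insert_sender));
    retransmit.join().unwrap();
    insert.join().unwrap();
}
```

As the commit message notes, recv-window had become a pure forwarding stage, so folding its routing into sigverify (and its packet-to-shred deserialization into the insert thread's thread pool) removes one thread and one channel hop from the shred hot path without changing what ultimately reaches retransmit or the blockstore.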