diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 929bbb4d25..d85ec9dc2e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -14,7 +14,7 @@ use { sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, - crossbeam_channel::{unbounded, Receiver}, + crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, @@ -32,11 +32,15 @@ use { net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, + time::Duration, }, }; pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; +/// Timeout interval when joining threads during TPU close +const TPU_THREADS_JOIN_TIMEOUT_SECONDS: u64 = 10; + // allow multiple connections for NAT and any open/close overlap pub const MAX_QUIC_CONNECTIONS_PER_IP: usize = 8; @@ -208,6 +212,22 @@ impl Tpu { } pub fn join(self) -> thread::Result<()> { + // spawn a new thread to wait for tpu close + let (sender, receiver) = bounded(0); + let _ = thread::spawn(move || { + let _ = self.do_join(); + sender.send(()).unwrap(); + }); + + // exit can deadlock. put an upper-bound on how long we wait for it + let timeout = Duration::from_secs(TPU_THREADS_JOIN_TIMEOUT_SECONDS); + if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) { + error!("timeout for closing tvu"); + } + Ok(()) + } + + fn do_join(self) -> thread::Result<()> { let results = vec![ self.fetch_stage.join(), self.sigverify_stage.join(), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e3021abd88..649b027019 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -26,7 +26,7 @@ use { tower_storage::TowerStorage, voting_service::VotingService, }, - crossbeam_channel::{unbounded, Receiver}, + crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -60,9 +60,13 @@ use { net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, + time::Duration, }, }; +/// Timeout interval when joining threads during TVU close +const TVU_THREADS_JOIN_TIMEOUT_SECONDS: u64 = 10; + pub struct Tvu { fetch_stage: ShredFetchStage, sigverify_stage: SigVerifyStage, @@ -358,6 +362,22 @@ impl Tvu { } pub fn join(self) -> thread::Result<()> { + // spawn a new thread to wait for tvu close + let (sender, receiver) = bounded(0); + let _ = thread::spawn(move || { + let _ = self.do_join(); + sender.send(()).unwrap(); + }); + + // exit can deadlock. put an upper-bound on how long we wait for it + let timeout = Duration::from_secs(TVU_THREADS_JOIN_TIMEOUT_SECONDS); + if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) { + error!("timeout for closing tvu"); + } + Ok(()) + } + + fn do_join(self) -> thread::Result<()> { self.retransmit_stage.join()?; self.fetch_stage.join()?; self.sigverify_stage.join()?; diff --git a/core/src/validator.rs b/core/src/validator.rs index eacb54f6f3..568a61ed53 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1844,7 +1844,20 @@ mod tests { *start_progress.read().unwrap(), ValidatorStartProgress::Running ); - validator.close(); + + // spawn a new thread to wait for validator close + let (sender, receiver) = bounded(0); + let _ = thread::spawn(move || { + validator.close(); + sender.send(()).unwrap(); + }); + + // exit can deadlock. put an upper-bound on how long we wait for it + let timeout = Duration::from_secs(30); + if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) { + panic!("timeout for closing validator"); + } + remove_dir_all(validator_ledger_path).unwrap(); }