tvu and tpu timeout on joining its microservices (#24111)
* panic when test timeout * nonblocking send when when droping banks * debug log * timeout for tvu * unused varaible * timeout for tpu * Revert "debug log" This reverts commit da780a3301a51d7c496141a85fcd35014fe6dff5. * add timeout const * fix typo * Revert "nonblocking send when when droping banks". I will create another pull request for this. This reverts commit 088c98ec0facf825b5eca058fb860deba6d28888. * Update core/src/tpu.rs Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com> * Update core/src/tpu.rs Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com> * Update core/src/tvu.rs Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com> * Update core/src/tvu.rs Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com> * Update core/src/validator.rs Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com> Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
This commit is contained in:
parent
fbe5e51a16
commit
e105547c14
|
@ -14,7 +14,7 @@ use {
|
||||||
sigverify::TransactionSigVerifier,
|
sigverify::TransactionSigVerifier,
|
||||||
sigverify_stage::SigVerifyStage,
|
sigverify_stage::SigVerifyStage,
|
||||||
},
|
},
|
||||||
crossbeam_channel::{unbounded, Receiver},
|
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError},
|
||||||
solana_gossip::cluster_info::ClusterInfo,
|
solana_gossip::cluster_info::ClusterInfo,
|
||||||
solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender},
|
solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender},
|
||||||
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
|
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
|
||||||
|
@ -32,11 +32,15 @@ use {
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
||||||
thread,
|
thread,
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
|
pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;
|
||||||
|
|
||||||
|
/// Timeout interval when joining threads during TPU close
|
||||||
|
const TPU_THREADS_JOIN_TIMEOUT_SECONDS: u64 = 10;
|
||||||
|
|
||||||
// allow multiple connections for NAT and any open/close overlap
|
// allow multiple connections for NAT and any open/close overlap
|
||||||
pub const MAX_QUIC_CONNECTIONS_PER_IP: usize = 8;
|
pub const MAX_QUIC_CONNECTIONS_PER_IP: usize = 8;
|
||||||
|
|
||||||
|
@ -208,6 +212,22 @@ impl Tpu {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
// spawn a new thread to wait for tpu close
|
||||||
|
let (sender, receiver) = bounded(0);
|
||||||
|
let _ = thread::spawn(move || {
|
||||||
|
let _ = self.do_join();
|
||||||
|
sender.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// exit can deadlock. put an upper-bound on how long we wait for it
|
||||||
|
let timeout = Duration::from_secs(TPU_THREADS_JOIN_TIMEOUT_SECONDS);
|
||||||
|
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
|
||||||
|
error!("timeout for closing tvu");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_join(self) -> thread::Result<()> {
|
||||||
let results = vec![
|
let results = vec![
|
||||||
self.fetch_stage.join(),
|
self.fetch_stage.join(),
|
||||||
self.sigverify_stage.join(),
|
self.sigverify_stage.join(),
|
||||||
|
|
|
@ -26,7 +26,7 @@ use {
|
||||||
tower_storage::TowerStorage,
|
tower_storage::TowerStorage,
|
||||||
voting_service::VotingService,
|
voting_service::VotingService,
|
||||||
},
|
},
|
||||||
crossbeam_channel::{unbounded, Receiver},
|
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError},
|
||||||
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||||
solana_gossip::cluster_info::ClusterInfo,
|
solana_gossip::cluster_info::ClusterInfo,
|
||||||
solana_ledger::{
|
solana_ledger::{
|
||||||
|
@ -60,9 +60,13 @@ use {
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
||||||
thread,
|
thread,
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Timeout interval when joining threads during TVU close
|
||||||
|
const TVU_THREADS_JOIN_TIMEOUT_SECONDS: u64 = 10;
|
||||||
|
|
||||||
pub struct Tvu {
|
pub struct Tvu {
|
||||||
fetch_stage: ShredFetchStage,
|
fetch_stage: ShredFetchStage,
|
||||||
sigverify_stage: SigVerifyStage,
|
sigverify_stage: SigVerifyStage,
|
||||||
|
@ -358,6 +362,22 @@ impl Tvu {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
// spawn a new thread to wait for tvu close
|
||||||
|
let (sender, receiver) = bounded(0);
|
||||||
|
let _ = thread::spawn(move || {
|
||||||
|
let _ = self.do_join();
|
||||||
|
sender.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// exit can deadlock. put an upper-bound on how long we wait for it
|
||||||
|
let timeout = Duration::from_secs(TVU_THREADS_JOIN_TIMEOUT_SECONDS);
|
||||||
|
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
|
||||||
|
error!("timeout for closing tvu");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_join(self) -> thread::Result<()> {
|
||||||
self.retransmit_stage.join()?;
|
self.retransmit_stage.join()?;
|
||||||
self.fetch_stage.join()?;
|
self.fetch_stage.join()?;
|
||||||
self.sigverify_stage.join()?;
|
self.sigverify_stage.join()?;
|
||||||
|
|
|
@ -1844,7 +1844,20 @@ mod tests {
|
||||||
*start_progress.read().unwrap(),
|
*start_progress.read().unwrap(),
|
||||||
ValidatorStartProgress::Running
|
ValidatorStartProgress::Running
|
||||||
);
|
);
|
||||||
validator.close();
|
|
||||||
|
// spawn a new thread to wait for validator close
|
||||||
|
let (sender, receiver) = bounded(0);
|
||||||
|
let _ = thread::spawn(move || {
|
||||||
|
validator.close();
|
||||||
|
sender.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// exit can deadlock. put an upper-bound on how long we wait for it
|
||||||
|
let timeout = Duration::from_secs(30);
|
||||||
|
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
|
||||||
|
panic!("timeout for closing validator");
|
||||||
|
}
|
||||||
|
|
||||||
remove_dir_all(validator_ledger_path).unwrap();
|
remove_dir_all(validator_ledger_path).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue