From 51b37f0184b15c02306d3047770d00755ab9c764 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Thu, 31 Mar 2022 16:44:23 -0500 Subject: [PATCH] Modify rpc_completed_slot_service to be non-blocking (#24007) * timeout for validator exits * clippy * print backtrace when panic * add backtrace package * increase time out to 30s * debug logging * make rpc complete service non blocking * reduce log level * remove logging * recv_timeout * remove backtrace * remove sleep * remove unused variable * add comments * Update core/src/validator.rs Co-authored-by: Trent Nelson * Update core/src/validator.rs Co-authored-by: Trent Nelson * whitespace * more whitespace * fix build Co-authored-by: Trent Nelson --- core/src/validator.rs | 28 +++++++++++++++++++------- rpc/src/rpc_completed_slots_service.rs | 20 +++++++++++++++--- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 29f50b9ab..d8a0ab536 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -872,8 +872,11 @@ impl Validator { let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); - let rpc_completed_slots_service = - RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone()); + let rpc_completed_slots_service = RpcCompletedSlotsService::spawn( + completed_slots_receiver, + rpc_subscriptions.clone(), + exit.clone(), + ); let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( @@ -1806,9 +1809,10 @@ pub fn is_snapshot_config_valid( mod tests { use { super::*, + crossbeam_channel::{bounded, RecvTimeoutError}, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, - std::fs::remove_dir_all, + std::{fs::remove_dir_all, thread, time::Duration}, }; fn validator_exit() { @@ -1926,12 +1930,22 @@ mod tests { // Each validator can exit in parallel to speed many sequential calls to join` validators.iter_mut().for_each(|v| v.exit()); - // While join is called sequentially, the above exit call notified all the - // validators to exit from all their threads - validators.into_iter().for_each(|validator| { - validator.join(); + + // spawn a new thread to wait for the join of the validator + let (sender, receiver) = bounded(0); + let _ = thread::spawn(move || { + validators.into_iter().for_each(|validator| { + validator.join(); + }); + sender.send(()).unwrap(); }); + // timeout of 30s for shutting down the validators + let timeout = Duration::from_secs(30); + if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) { + panic!("timeout for shutting down validators",); + } + for path in ledger_paths { remove_dir_all(path).unwrap(); } diff --git a/rpc/src/rpc_completed_slots_service.rs b/rpc/src/rpc_completed_slots_service.rs index 0f9d393e6..61eabba75 100644 --- a/rpc/src/rpc_completed_slots_service.rs +++ b/rpc/src/rpc_completed_slots_service.rs @@ -4,21 +4,35 @@ use { solana_ledger::blockstore::CompletedSlotsReceiver, solana_sdk::timing::timestamp, std::{ - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, thread::{Builder, JoinHandle}, + time::Duration, }, }; +pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100; + pub struct RpcCompletedSlotsService; impl RpcCompletedSlotsService { pub fn spawn( completed_slots_receiver: CompletedSlotsReceiver, rpc_subscriptions: Arc, + exit: Arc, ) -> JoinHandle<()> { Builder::new() .name("solana-rpc-completed-slots-service".to_string()) - .spawn(move || { - for slots in completed_slots_receiver.iter() { + .spawn(move || loop { + // shutdown the service + if exit.load(Ordering::Relaxed) { + break; + } + + if let Ok(slots) = completed_slots_receiver + .recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS)) + { for slot in slots { rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { slot,