From 03c1744e1d03cb8c7cac6a10aff930007ed536ac Mon Sep 17 00:00:00 2001 From: Tyera Date: Thu, 20 Apr 2023 19:23:14 -0500 Subject: [PATCH] Register SendTransactionService exit (#31261) * Pass exit into SendTransactionService * Abort SendTransactionService with BanksService * Register SendTransactionService exit as part of RpcService validator Exit * Improve test, ensure receiver has been dropped --- banks-server/src/banks_server.rs | 4 +- banks-server/src/rpc_banks_service.rs | 1 + rpc/src/rpc.rs | 3 ++ rpc/src/rpc_service.rs | 7 ++- .../src/send_transaction_service.rs | 49 ++++++++++++++++++- 5 files changed, 60 insertions(+), 4 deletions(-) diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index 049353647..1137c3eea 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -33,7 +33,7 @@ use { convert::TryFrom, io, net::{Ipv4Addr, SocketAddr}, - sync::{Arc, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, thread::Builder, time::Duration, }, @@ -433,6 +433,7 @@ pub async fn start_tcp_server( bank_forks: Arc>, block_commitment_cache: Arc>, connection_cache: Arc, + exit: Arc, ) -> io::Result<()> { // Note: These settings are copied straight from the tarpc example. let server = tcp::listen(listen_addr, Bincode::default) @@ -460,6 +461,7 @@ pub async fn start_tcp_server( &connection_cache, 5_000, 0, + exit.clone(), ); let server = BanksServer::new( diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs index 8a2f48156..f3224fb64 100644 --- a/banks-server/src/rpc_banks_service.rs +++ b/banks-server/src/rpc_banks_service.rs @@ -39,6 +39,7 @@ async fn start_abortable_tcp_server( bank_forks.clone(), block_commitment_cache.clone(), connection_cache, + exit.clone(), ) .fuse(); let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse(); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 95059025f..edf8843a2 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -365,6 +365,7 @@ impl JsonRpcRequestProcessor { &connection_cache, 1000, 1, + exit.clone(), ); Self { @@ -6429,6 +6430,7 @@ pub mod tests { &connection_cache, 1000, 1, + exit, ); let mut bad_transaction = system_transaction::transfer( @@ -6697,6 +6699,7 @@ pub mod tests { &connection_cache, 1000, 1, + exit, ); assert_eq!( request_processor.get_block_commitment(0), diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 111fd6143..8153b9ae0 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -476,6 +476,7 @@ impl JsonRpcService { prioritization_fee_cache, ); + let exit = Arc::new(AtomicBool::new(false)); let leader_info = poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); let _send_transaction_service = Arc::new(SendTransactionService::new_with_config( @@ -485,6 +486,7 @@ impl JsonRpcService { receiver, &connection_cache, send_transaction_service_config, + exit.clone(), )); #[cfg(test)] @@ -556,7 +558,10 @@ impl JsonRpcService { validator_exit .write() .unwrap() - .register_exit(Box::new(move || close_handle_.close())); + .register_exit(Box::new(move || { + close_handle_.close(); + exit.store(true, Ordering::Relaxed); + })); Ok(Self { thread_hdl, #[cfg(test)] diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 45db7fe0c..03ce336b2 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -332,6 +332,7 @@ impl SendTransactionService { connection_cache: &Arc, retry_rate_ms: u64, leader_forward_count: u64, + exit: Arc, ) -> Self { let config = Config { retry_rate_ms, @@ -345,6 +346,7 @@ impl SendTransactionService { receiver, connection_cache, config, + exit, ) } @@ -355,6 +357,7 @@ impl SendTransactionService { receiver: Receiver, connection_cache: &Arc, config: Config, + exit: Arc, ) -> Self { let stats_report = Arc::new(SendTransactionServiceStatsReport::default()); @@ -362,7 +365,6 @@ impl SendTransactionService { let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); - let exit = Arc::new(AtomicBool::new(false)); let receive_txn_thread = Self::receive_txn_thread( tpu_address, receiver, @@ -776,7 +778,7 @@ mod test { use { super::*, crate::tpu_info::NullTpuInfo, - crossbeam_channel::unbounded, + crossbeam_channel::{bounded, unbounded}, solana_sdk::{ account::AccountSharedData, genesis_config::create_genesis_config, @@ -804,12 +806,55 @@ mod test { &connection_cache, 1000, 1, + Arc::new(AtomicBool::new(false)), ); drop(sender); send_transaction_service.join().unwrap(); } + #[test] + fn validator_exit() { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let bank = Bank::default_for_tests(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let (sender, receiver) = bounded(0); + + let dummy_tx_info = || TransactionInfo { + signature: Signature::default(), + wire_transaction: vec![0; 128], + last_valid_block_height: 0, + durable_nonce_info: None, + max_retries: None, + retries: 0, + last_sent_time: None, + }; + + let exit = Arc::new(AtomicBool::new(false)); + let connection_cache = Arc::new(ConnectionCache::default()); + let _send_transaction_service = SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + &connection_cache, + 1000, + 1, + exit.clone(), + ); + + sender.send(dummy_tx_info()).unwrap(); + + thread::spawn(move || { + exit.store(true, Ordering::Relaxed); + }); + + let mut option = Ok(()); + while option.is_ok() { + option = sender.send(dummy_tx_info()); + } + } + #[test] fn process_transactions() { solana_logger::setup();