diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index 6b1cafb66..482062b06 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -25,7 +25,6 @@ use std::{ io, net::{Ipv4Addr, SocketAddr}, sync::{ - atomic::AtomicBool, mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, @@ -250,14 +249,8 @@ pub async fn start_tcp_server( // the generated Banks trait. .map(move |chan| { let (sender, receiver) = channel(); - let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); - SendTransactionService::new( - tpu_addr, - &bank_forks, - &exit_send_transaction_service, - receiver, - ); + SendTransactionService::new(tpu_addr, &bank_forks, receiver); let server = BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); diff --git a/banks-server/src/send_transaction_service.rs b/banks-server/src/send_transaction_service.rs index 89f9f80e4..54eb6b3f4 100644 --- a/banks-server/src/send_transaction_service.rs +++ b/banks-server/src/send_transaction_service.rs @@ -1,3 +1,4 @@ +// TODO: Merge this implementation with the one at `core/src/send_transaction_service.rs` use log::*; use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_runtime::{bank::Bank, bank_forks::BankForks}; @@ -6,8 +7,7 @@ use std::{ collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::Receiver, + mpsc::{Receiver, RecvTimeoutError}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -50,10 +50,9 @@ impl SendTransactionService { pub fn new( tpu_address: SocketAddr, bank_forks: &Arc>, - exit: &Arc, receiver: Receiver, ) -> Self { - let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); + let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address); Self { thread } } @@ -61,7 +60,6 @@ impl SendTransactionService { receiver: Receiver, bank_forks: Arc>, tpu_address: SocketAddr, - exit: Arc, ) -> JoinHandle<()> { let mut last_status_check = Instant::now(); let mut transactions = HashMap::new(); @@ -70,20 +68,20 @@ impl SendTransactionService { Builder::new() .name("send-tx-svc".to_string()) .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - - if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { - Self::send_transaction( - &send_socket, - &tpu_address, - &transaction_info.wire_transaction, - ); - if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { - transactions.insert(transaction_info.signature, transaction_info); - } else { - datapoint_warn!("send_transaction_service-queue-overflow"); + match receiver.recv_timeout(Duration::from_secs(1)) { + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => {} + Ok(transaction_info) => { + Self::send_transaction( + &send_socket, + &tpu_address, + &transaction_info.wire_transaction, + ); + if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { + transactions.insert(transaction_info.signature, transaction_info); + } else { + datapoint_warn!("send_transaction_service-queue-overflow"); + } } } @@ -193,13 +191,12 @@ mod test { let tpu_address = "127.0.0.1:0".parse().unwrap(); let bank = Bank::default(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let exit = Arc::new(AtomicBool::new(false)); - let (_sender, receiver) = channel(); + let (sender, receiver) = channel(); let send_tranaction_service = - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, receiver); - exit.store(true, Ordering::Relaxed); + drop(sender); send_tranaction_service.join().unwrap(); } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index e5b0a55ee..5ed960b62 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -218,7 +218,7 @@ impl JsonRpcRequestProcessor { let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; let (sender, receiver) = channel(); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); Self { config: JsonRpcConfig::default(), @@ -2686,7 +2686,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( &leader_pubkey, @@ -3998,7 +3998,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let res = io.handle_request_sync(req, meta); @@ -4039,7 +4039,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); let mut bad_transaction = system_transaction::transfer(&mint_keypair, &Pubkey::new_rand(), 42, Hash::default()); @@ -4221,7 +4221,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); assert_eq!(request_processor.validator_exit(), false); assert_eq!(exit.load(Ordering::Relaxed), false); } @@ -4250,7 +4250,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); assert_eq!(request_processor.validator_exit(), true); assert_eq!(exit.load(Ordering::Relaxed), true); } @@ -4336,7 +4336,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); assert_eq!( request_processor.get_block_commitment(0), RpcBlockCommitment { diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 443be89a1..d4b24db7c 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -313,14 +313,12 @@ impl JsonRpcService { bigtable_ledger_storage, ); - let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); let leader_info = poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); let _send_transaction_service = Arc::new(SendTransactionService::new( tpu_address, &bank_forks, leader_info, - &exit_send_transaction_service, receiver, )); @@ -369,7 +367,6 @@ impl JsonRpcService { let server = server.unwrap(); close_handle_sender.send(server.close_handle()).unwrap(); server.wait(); - exit_send_transaction_service.store(true, Ordering::Relaxed); exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed); }) .unwrap(); diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs index d897c372b..79f9a268d 100644 --- a/core/src/send_transaction_service.rs +++ b/core/src/send_transaction_service.rs @@ -1,3 +1,4 @@ +// TODO: Merge this implementation with the one at `banks-server/src/send_transaction_service.rs` use crate::cluster_info::ClusterInfo; use crate::poh_recorder::PohRecorder; use log::*; @@ -9,8 +10,7 @@ use std::{ collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::Receiver, + mpsc::{Receiver, RecvTimeoutError}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -87,16 +87,9 @@ impl SendTransactionService { tpu_address: SocketAddr, bank_forks: &Arc>, leader_info: Option, - exit: &Arc, receiver: Receiver, ) -> Self { - let thread = Self::retry_thread( - tpu_address, - receiver, - bank_forks.clone(), - leader_info, - exit.clone(), - ); + let thread = Self::retry_thread(tpu_address, receiver, bank_forks.clone(), leader_info); Self { thread } } @@ -105,7 +98,6 @@ impl SendTransactionService { receiver: Receiver, bank_forks: Arc>, mut leader_info: Option, - exit: Arc, ) -> JoinHandle<()> { let mut last_status_check = Instant::now(); let mut transactions = HashMap::new(); @@ -118,24 +110,24 @@ impl SendTransactionService { Builder::new() .name("send-tx-sv2".to_string()) .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - - if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { - let address = leader_info - .as_ref() - .and_then(|leader_info| leader_info.get_leader_tpu()) - .unwrap_or(&tpu_address); - Self::send_transaction( - &send_socket, - address, - &transaction_info.wire_transaction, - ); - if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { - transactions.insert(transaction_info.signature, transaction_info); - } else { - datapoint_warn!("send_transaction_service-queue-overflow"); + match receiver.recv_timeout(Duration::from_secs(1)) { + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => {} + Ok(transaction_info) => { + let address = leader_info + .as_ref() + .and_then(|leader_info| leader_info.get_leader_tpu()) + .unwrap_or(&tpu_address); + Self::send_transaction( + &send_socket, + address, + &transaction_info.wire_transaction, + ); + if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { + transactions.insert(transaction_info.signature, transaction_info); + } else { + datapoint_warn!("send_transaction_service-queue-overflow"); + } } } @@ -253,13 +245,12 @@ mod test { let tpu_address = "127.0.0.1:0".parse().unwrap(); let bank = Bank::default(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let exit = Arc::new(AtomicBool::new(false)); - let (_sender, receiver) = channel(); + let (sender, receiver) = channel(); let send_tranaction_service = - SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, receiver); - exit.store(true, Ordering::Relaxed); + drop(sender); send_tranaction_service.join().unwrap(); }