From 145906123a7f39d77f408eaeb61b74b42d784af7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 10 Jul 2020 19:14:41 -0600 Subject: [PATCH] Simplify use of SendTransactionService (#10999) * Send transaction upon recv This will allow us to move the channel to the public interface * Use a channel, not a method, to communicate * Pipeline the services * Ignore unused return values * Fix clippy warning --- core/src/rpc.rs | 87 ++++++++++++++----------- core/src/rpc_service.rs | 18 ++--- runtime/src/send_transaction_service.rs | 84 +++++++++--------------- 3 files changed, 87 insertions(+), 102 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 443c44a1b9..861d89e0b4 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -29,7 +29,7 @@ use solana_runtime::{ bank_forks::BankForks, commitment::{BlockCommitmentArray, BlockCommitmentCache}, log_collector::LogCollector, - send_transaction_service::SendTransactionService, + send_transaction_service::{SendTransactionService, TransactionInfo}, }; use solana_sdk::{ account_utils::StateMut, @@ -58,7 +58,8 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, RwLock, }, }; @@ -97,7 +98,7 @@ pub struct JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, - send_transaction_service: Arc, + transaction_sender: Arc>>, } impl Metadata for JsonRpcRequestProcessor {} @@ -153,19 +154,22 @@ impl JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, - send_transaction_service: Arc, - ) -> Self { - Self { - config, - bank_forks, - block_commitment_cache, - blockstore, - validator_exit, - health, - cluster_info, - genesis_hash, - send_transaction_service, - } + ) -> (Self, Receiver) { + let (sender, receiver) = channel(); + ( + Self { + config, + bank_forks, + block_commitment_cache, + blockstore, + validator_exit, + health, + cluster_info, + genesis_hash, + transaction_sender: Arc::new(Mutex::new(sender)), + }, + receiver, + ) } // Useful for unit testing @@ -179,9 +183,12 @@ impl JsonRpcRequestProcessor { let exit = Arc::new(AtomicBool::new(false)); let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; + let (sender, receiver) = channel(); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + Self { config: JsonRpcConfig::default(), - bank_forks: bank_forks.clone(), + bank_forks, block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( HashMap::new(), 0, @@ -195,11 +202,7 @@ impl JsonRpcRequestProcessor { health: Arc::new(RpcHealth::new(cluster_info.clone(), None, 0, exit.clone())), cluster_info, genesis_hash, - send_transaction_service: Arc::new(SendTransactionService::new( - tpu_address, - &bank_forks, - &exit, - )), + transaction_sender: Arc::new(Mutex::new(sender)), } } @@ -1479,8 +1482,12 @@ impl RpcSol for RpcSolImpl { Error::internal_error() })?; - meta.send_transaction_service - .send(signature, wire_transaction, last_valid_slot); + let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot); + meta.transaction_sender + .lock() + .unwrap() + .send(transaction_info) + .unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err)); Ok(signature.to_string()) } @@ -1527,8 +1534,12 @@ impl RpcSol for RpcSolImpl { } } - meta.send_transaction_service - .send(signature, wire_transaction, last_valid_slot); + let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot); + meta.transaction_sender + .lock() + .unwrap() + .send(transaction_info) + .unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err)); Ok(signature.to_string()) } @@ -1883,7 +1894,7 @@ pub mod tests { &socketaddr!("127.0.0.1:1234"), )); - let meta = JsonRpcRequestProcessor::new( + let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig { enable_rpc_transaction_history: true, identity_pubkey: *pubkey, @@ -1896,8 +1907,8 @@ pub mod tests { RpcHealth::stub(), cluster_info.clone(), Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( &leader_pubkey, @@ -3029,7 +3040,7 @@ pub mod tests { let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; let bank_forks = new_bank_forks().0; - let meta = JsonRpcRequestProcessor::new( + let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), new_bank_forks().0, block_commitment_cache, @@ -3038,8 +3049,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let res = io.handle_request_sync(req, meta); @@ -3068,7 +3079,7 @@ pub mod tests { ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), )); let tpu_address = cluster_info.my_contact_info().tpu; - let meta = JsonRpcRequestProcessor::new( + let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), bank_forks.clone(), block_commitment_cache, @@ -3077,8 +3088,8 @@ pub mod tests { health.clone(), cluster_info, Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); let mut bad_transaction = system_transaction::transfer(&Keypair::new(), &Pubkey::default(), 42, Hash::default()); @@ -3215,7 +3226,7 @@ pub mod tests { let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; let bank_forks = new_bank_forks().0; - let request_processor = JsonRpcRequestProcessor::new( + let (request_processor, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), bank_forks.clone(), block_commitment_cache, @@ -3224,8 +3235,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!(request_processor.validator_exit(), false); assert_eq!(exit.load(Ordering::Relaxed), false); } @@ -3242,7 +3253,7 @@ pub mod tests { let bank_forks = new_bank_forks().0; let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; - let request_processor = JsonRpcRequestProcessor::new( + let (request_processor, receiver) = JsonRpcRequestProcessor::new( config, bank_forks.clone(), block_commitment_cache, @@ -3251,8 +3262,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!(request_processor.validator_exit(), true); assert_eq!(exit.load(Ordering::Relaxed), true); } @@ -3329,7 +3340,7 @@ pub mod tests { config.enable_validator_exit = true; let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; - let request_processor = JsonRpcRequestProcessor::new( + let (request_processor, receiver) = JsonRpcRequestProcessor::new( config, bank_forks.clone(), block_commitment_cache, @@ -3338,8 +3349,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), - Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)), ); + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!( request_processor.get_block_commitment(0), RpcBlockCommitment { diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 8438654a74..820f3f51eb 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -249,14 +249,7 @@ impl JsonRpcService { )); let tpu_address = cluster_info.my_contact_info().tpu; - let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); - let send_transaction_service = Arc::new(SendTransactionService::new( - tpu_address, - &bank_forks, - &exit_send_transaction_service, - )); - - let request_processor = JsonRpcRequestProcessor::new( + let (request_processor, receiver) = JsonRpcRequestProcessor::new( config, bank_forks.clone(), block_commitment_cache, @@ -265,9 +258,16 @@ impl JsonRpcService { health.clone(), cluster_info, genesis_hash, - send_transaction_service, ); + let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + let _send_transaction_service = Arc::new(SendTransactionService::new( + tpu_address, + &bank_forks, + &exit_send_transaction_service, + receiver, + )); + #[cfg(test)] let test_request_processor = request_processor.clone(); diff --git a/runtime/src/send_transaction_service.rs b/runtime/src/send_transaction_service.rs index 54f8e5d651..0f24fe8a77 100644 --- a/runtime/src/send_transaction_service.rs +++ b/runtime/src/send_transaction_service.rs @@ -7,8 +7,8 @@ use std::{ net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, RwLock, + mpsc::Receiver, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -19,17 +19,24 @@ const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but m pub struct SendTransactionService { thread: JoinHandle<()>, - sender: Mutex>, - send_socket: UdpSocket, - tpu_address: SocketAddr, } -struct TransactionInfo { +pub struct TransactionInfo { signature: Signature, wire_transaction: Vec, last_valid_slot: Slot, } +impl TransactionInfo { + pub fn new(signature: Signature, wire_transaction: Vec, last_valid_slot: Slot) -> Self { + Self { + signature, + wire_transaction, + last_valid_slot, + } + } +} + #[derive(Default, Debug, PartialEq)] struct ProcessTransactionsResult { rooted: u64, @@ -44,16 +51,10 @@ impl SendTransactionService { tpu_address: SocketAddr, bank_forks: &Arc>, exit: &Arc, + receiver: Receiver, ) -> Self { - let (sender, receiver) = channel::(); - let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); - Self { - thread, - sender: Mutex::new(sender), - send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), - tpu_address, - } + Self { thread } } fn retry_thread( @@ -74,6 +75,11 @@ impl SendTransactionService { } if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { + Self::send_transaction( + &send_socket, + &tpu_address, + &transaction_info.wire_transaction, + ); if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { transactions.insert(transaction_info.signature, transaction_info); } else { @@ -168,21 +174,6 @@ impl SendTransactionService { } } - pub fn send(&self, signature: Signature, wire_transaction: Vec, last_valid_slot: Slot) { - inc_new_counter_info!("send_transaction_service-enqueue", 1, 1); - Self::send_transaction(&self.send_socket, &self.tpu_address, &wire_transaction); - - self.sender - .lock() - .unwrap() - .send(TransactionInfo { - signature, - wire_transaction, - last_valid_slot, - }) - .unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err)); - } - pub fn join(self) -> thread::Result<()> { self.thread.join() } @@ -195,6 +186,7 @@ mod test { genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer, system_transaction, }; + use std::sync::mpsc::channel; #[test] fn service_exit() { @@ -202,8 +194,10 @@ mod test { let bank = Bank::default(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let exit = Arc::new(AtomicBool::new(false)); + let (_sender, receiver) = channel(); - let send_tranaction_service = SendTransactionService::new(tpu_address, &bank_forks, &exit); + let send_tranaction_service = + SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); exit.store(true, Ordering::Relaxed); send_tranaction_service.join().unwrap(); @@ -248,11 +242,7 @@ mod test { info!("Expired transactions are dropped.."); transactions.insert( Signature::default(), - TransactionInfo { - signature: Signature::default(), - wire_transaction: vec![], - last_valid_slot: root_bank.slot() - 1, - }, + TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -273,11 +263,7 @@ mod test { info!("Rooted transactions are dropped..."); transactions.insert( rooted_signature, - TransactionInfo { - signature: rooted_signature, - wire_transaction: vec![], - last_valid_slot: working_bank.slot(), - }, + TransactionInfo::new(rooted_signature, vec![], working_bank.slot()), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -298,11 +284,7 @@ mod test { info!("Failed transactions are dropped..."); transactions.insert( failed_signature, - TransactionInfo { - signature: failed_signature, - wire_transaction: vec![], - last_valid_slot: working_bank.slot(), - }, + TransactionInfo::new(failed_signature, vec![], working_bank.slot()), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -323,11 +305,7 @@ mod test { info!("Non-rooted transactions are kept..."); transactions.insert( non_rooted_signature, - TransactionInfo { - signature: non_rooted_signature, - wire_transaction: vec![], - last_valid_slot: working_bank.slot(), - }, + TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -349,11 +327,7 @@ mod test { info!("Unknown transactions are retried..."); transactions.insert( Signature::default(), - TransactionInfo { - signature: Signature::default(), - wire_transaction: vec![], - last_valid_slot: working_bank.slot(), - }, + TransactionInfo::new(Signature::default(), vec![], working_bank.slot()), ); let result = SendTransactionService::process_transactions( &working_bank,