diff --git a/Cargo.lock b/Cargo.lock index e242c734bf..d9721b090d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5617,7 +5617,9 @@ version = "1.11.0" dependencies = [ "crossbeam-channel", "log", + "solana-client", "solana-logger", + "solana-measure", "solana-metrics", "solana-runtime", "solana-sdk", diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 3846bb76a3..3e9ed4f78e 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3595,6 +3595,8 @@ version = "1.11.0" dependencies = [ "crossbeam-channel", "log", + "solana-client", + "solana-measure", "solana-metrics", "solana-runtime", "solana-sdk", diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index 4073b56f28..a1cf512e0f 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -12,10 +12,13 @@ edition = "2021" [dependencies] crossbeam-channel = "0.5" log = "0.4.14" +solana-client = { path = "../client", version = "=1.11.0" } +solana-measure = { path = "../measure", version = "=1.11.0" } solana-metrics = { path = "../metrics", version = "=1.11.0" } solana-runtime = { path = "../runtime", version = "=1.11.0" } solana-sdk = { path = "../sdk", version = "=1.11.0" } + [dev-dependencies] solana-logger = { path = "../logger", version = "=1.11.0" } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 36ad86e8b1..99b3f2fc85 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,12 +2,14 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, + solana_client::connection_cache, + solana_measure::measure::Measure, solana_metrics::{datapoint_warn, inc_new_counter_info}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, std::{ collections::hash_map::{Entry, HashMap}, - net::{SocketAddr, UdpSocket}, + net::SocketAddr, sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -134,12 +136,11 @@ impl SendTransactionService { let mut last_status_check = Instant::now(); let mut last_leader_refresh = Instant::now(); let mut transactions = HashMap::new(); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); if let Some(leader_info) = leader_info.as_mut() { leader_info.refresh_recent_peers(); } - + connection_cache::set_use_quic(config.use_quic); Builder::new() .name("send-tx-sv2".to_string()) .spawn(move || loop { @@ -164,11 +165,7 @@ impl SendTransactionService { }) .unwrap_or_else(|| vec![&tpu_address]); for address in addresses { - Self::send_transaction( - &send_socket, - address, - &transaction_info.wire_transaction, - ); + Self::send_transaction(address, &transaction_info.wire_transaction); } if transactions_len < MAX_TRANSACTION_QUEUE_SIZE { inc_new_counter_info!("send_transaction_service-insert-tx", 1); @@ -199,7 +196,6 @@ impl SendTransactionService { let _result = Self::process_transactions( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &leader_info, @@ -221,7 +217,6 @@ impl SendTransactionService { fn process_transactions( working_bank: &Arc, root_bank: &Arc, - send_socket: &UdpSocket, tpu_address: &SocketAddr, transactions: &mut HashMap, leader_info: &Option, @@ -292,11 +287,7 @@ impl SendTransactionService { }) .unwrap_or_else(|| vec![tpu_address]); for address in addresses { - Self::send_transaction( - send_socket, - address, - &transaction_info.wire_transaction, - ); + Self::send_transaction(address, &transaction_info.wire_transaction); } true } @@ -317,14 +308,20 @@ impl SendTransactionService { result } - fn send_transaction( - send_socket: &UdpSocket, - tpu_address: &SocketAddr, - wire_transaction: &[u8], - ) { - if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { + fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) { + let mut measure = Measure::start("send_transaction_service-us"); + let connection = connection_cache::get_connection(tpu_address); + + if let Err(err) = connection.send_wire_transaction(wire_transaction) { warn!("Failed to send transaction to {}: {:?}", tpu_address, err); } + measure.stop(); + inc_new_counter_info!( + "send_transaction_service-us", + measure.as_us() as usize, + 1000, + 1000 + ); } pub fn join(self) -> thread::Result<()> { @@ -372,7 +369,6 @@ mod test { let (genesis_config, mint_keypair) = create_genesis_config(4); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, @@ -419,7 +415,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -448,7 +443,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -477,7 +471,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -506,7 +499,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -536,7 +528,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -576,7 +567,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -594,7 +584,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -617,7 +606,6 @@ mod test { let (genesis_config, mint_keypair) = create_genesis_config(4); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, @@ -674,7 +662,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -702,7 +689,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -732,7 +718,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -760,7 +745,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -789,7 +773,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -818,7 +801,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -848,7 +830,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None, @@ -873,7 +854,6 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &send_socket, &tpu_address, &mut transactions, &None,