diff --git a/Cargo.lock b/Cargo.lock index ffc82e27e4..c5f6a984b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4669,6 +4669,7 @@ dependencies = [ "solana-faucet", "solana-logger 1.11.0", "solana-measure", + "solana-metrics", "solana-net-utils", "solana-sdk", "solana-streamer", diff --git a/client/Cargo.toml b/client/Cargo.toml index bd30ff138d..ebe822557c 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -40,6 +40,7 @@ solana-account-decoder = { path = "../account-decoder", version = "=1.11.0" } solana-clap-utils = { path = "../clap-utils", version = "=1.11.0" } solana-faucet = { path = "../faucet", version = "=1.11.0" } solana-measure = { path = "../measure", version = "=1.11.0" } +solana-metrics = { path = "../metrics", version = "=1.11.0" } solana-net-utils = { path = "../net-utils", version = "=1.11.0" } solana-sdk = { path = "../sdk", version = "=1.11.0" } solana-streamer = { path = "../streamer", version = "=1.11.0" } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 424de6a100..fa4a1925ed 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -93,6 +93,17 @@ pub fn send_wire_transaction_batch( } } +pub fn send_wire_transaction_async( + packets: Vec, + addr: &SocketAddr, +) -> Result<(), TransportError> { + let conn = get_connection(addr); + match conn { + Connection::Udp(conn) => conn.send_wire_transaction_async(packets), + Connection::Quic(conn) => conn.send_wire_transaction_async(packets), + } +} + pub fn send_wire_transaction( wire_transaction: &[u8], addr: &SocketAddr, diff --git a/client/src/lib.rs b/client/src/lib.rs index 6a169b1f38..9ffe55bfa1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -27,6 +27,9 @@ pub mod tpu_connection; pub mod transaction_executor; pub mod udp_client; +#[macro_use] +extern crate solana_metrics; + pub mod mock_sender_for_cli { /// Magic `SIGNATURE` value used by `solana-cli` unit tests. /// Please don't use this constant. diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index aa7135d44f..08d3af8a9b 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -6,6 +6,8 @@ use { async_mutex::Mutex, futures::future::join_all, itertools::Itertools, + lazy_static::lazy_static, + log::*, quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, solana_sdk::{ quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, @@ -39,9 +41,14 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification { Ok(rustls::client::ServerCertVerified::assertion()) } } +lazy_static! { + static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); +} struct QuicClient { - runtime: Runtime, endpoint: Endpoint, connection: Arc>>>, addr: SocketAddr, @@ -67,9 +74,9 @@ impl TpuConnection for QuicTpuConnection { where T: AsRef<[u8]>, { - let _guard = self.client.runtime.enter(); + let _guard = RUNTIME.enter(); let send_buffer = self.client.send_buffer(wire_transaction); - self.client.runtime.block_on(send_buffer)?; + RUNTIME.block_on(send_buffer)?; Ok(()) } @@ -77,21 +84,33 @@ impl TpuConnection for QuicTpuConnection { where T: AsRef<[u8]>, { - let _guard = self.client.runtime.enter(); + let _guard = RUNTIME.enter(); let send_batch = self.client.send_batch(buffers); - self.client.runtime.block_on(send_batch)?; + RUNTIME.block_on(send_batch)?; + Ok(()) + } + + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + let _guard = RUNTIME.enter(); + //drop and detach the task + let client = self.client.clone(); + inc_new_counter_info!("send_wire_transaction_async", 1); + let _ = RUNTIME.spawn(async move { + let send_buffer = client.send_buffer(wire_transaction); + if let Err(e) = send_buffer.await { + inc_new_counter_warn!("send_wire_transaction_async_fail", 1); + warn!("Failed to send transaction async to {:?}", e); + } else { + inc_new_counter_info!("send_wire_transaction_async_pass", 1); + } + }); Ok(()) } } impl QuicClient { pub fn new(client_socket: UdpSocket, addr: SocketAddr) -> Self { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - - let _guard = runtime.enter(); + let _guard = RUNTIME.enter(); let crypto = rustls::ClientConfig::builder() .with_safe_defaults() @@ -100,12 +119,11 @@ impl QuicClient { let create_endpoint = QuicClient::create_endpoint(EndpointConfig::default(), client_socket); - let mut endpoint = runtime.block_on(create_endpoint); + let mut endpoint = RUNTIME.block_on(create_endpoint); endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto))); Self { - runtime, endpoint, connection: Arc::new(Mutex::new(None)), addr, diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 5228a54536..0c11e204fe 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -22,6 +22,8 @@ pub trait TpuConnection { where T: AsRef<[u8]>; + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; + fn par_serialize_and_send_transaction_batch( &self, transactions: &[VersionedTransaction], diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index 7838bc7c14..c9a2747bff 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -34,6 +34,11 @@ impl TpuConnection for UdpTpuConnection { Ok(()) } + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + self.socket.send_to(wire_transaction.as_ref(), self.addr)?; + Ok(()) + } + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]>, diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 8f211ed3c2..818d0b23b4 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3376,6 +3376,7 @@ dependencies = [ "solana-clap-utils", "solana-faucet", "solana-measure", + "solana-metrics", "solana-net-utils", "solana-sdk", "solana-streamer", diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index ce467b5d2c..31b2660728 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -311,7 +311,9 @@ impl SendTransactionService { fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) { let mut measure = Measure::start("send_transaction_service-us"); - if let Err(err) = connection_cache::send_wire_transaction(wire_transaction, tpu_address) { + if let Err(err) = + connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address) + { warn!("Failed to send transaction to {}: {:?}", tpu_address, err); } measure.stop();