From 8cfc010b8451584d36ef1fdaa417998e0af678a8 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Thu, 14 Apr 2022 22:20:34 -0400 Subject: [PATCH] Send async quic batch of txs (#24298) Add an interface send_wire_transaction_batch_async to TpuConnection to allow for sending batches without waiting for completion Co-authored-by: Anatoly Yakovenko --- client/src/connection_cache.rs | 19 +++++++++++++++ client/src/lib.rs | 6 ++--- client/src/quic_client.rs | 42 ++++++++++++++++++++++++---------- client/src/tpu_connection.rs | 6 +++++ client/src/udp_client.rs | 9 ++++++++ 5 files changed, 67 insertions(+), 15 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 4d37ffe73..4acf3d319 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -256,6 +256,25 @@ pub fn send_wire_transaction_async( r } +pub fn send_wire_transaction_batch_async( + packets: Vec>, + addr: &SocketAddr, +) -> Result<(), TransportError> { + let (conn, stats) = get_connection(addr); + let client_stats = Arc::new(ClientStats::default()); + let len = packets.len(); + let r = match conn { + Connection::Udp(conn) => { + conn.send_wire_transaction_batch_async(packets, client_stats.clone()) + } + Connection::Quic(conn) => { + conn.send_wire_transaction_batch_async(packets, client_stats.clone()) + } + }; + stats.add_client_stats(&client_stats, len, r.is_ok()); + r +} + pub fn send_wire_transaction( wire_transaction: &[u8], addr: &SocketAddr, diff --git a/client/src/lib.rs b/client/src/lib.rs index 9ffe55bfa..2cef3aa96 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,6 +2,9 @@ #[macro_use] extern crate serde_derive; +#[macro_use] +extern crate solana_metrics; + pub mod blockhash_query; pub mod client_error; pub mod connection_cache; @@ -27,9 +30,6 @@ pub mod tpu_connection; pub mod transaction_executor; pub mod udp_client; -#[macro_use] -extern crate solana_metrics; - pub mod mock_sender_for_cli { /// Magic `SIGNATURE` value used by `solana-cli` unit tests. /// Please don't use this constant. diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 0bcb97f4e..69efbbee0 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -119,16 +119,31 @@ impl TpuConnection for QuicTpuConnection { stats: Arc, ) -> TransportResult<()> { let _guard = RUNTIME.enter(); - //drop and detach the task let client = self.client.clone(); - inc_new_counter_info!("send_wire_transaction_async", 1); + //drop and detach the task let _ = RUNTIME.spawn(async move { let send_buffer = client.send_buffer(wire_transaction, &stats); if let Err(e) = send_buffer.await { - inc_new_counter_warn!("send_wire_transaction_async_fail", 1); warn!("Failed to send transaction async to {:?}", e); - } else { - inc_new_counter_info!("send_wire_transaction_async_pass", 1); + datapoint_warn!("send-wire-async", ("failure", 1, i64),); + } + }); + Ok(()) + } + + fn send_wire_transaction_batch_async( + &self, + buffers: Vec>, + stats: Arc, + ) -> TransportResult<()> { + let _guard = RUNTIME.enter(); + let client = self.client.clone(); + //drop and detach the task + let _ = RUNTIME.spawn(async move { + let send_batch = client.send_batch(&buffers, &stats); + if let Err(e) = send_batch.await { + warn!("Failed to send transaction batch async to {:?}", e); + datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),); } }); Ok(()) @@ -269,13 +284,16 @@ impl QuicClient { .iter() .chunks(QUIC_MAX_CONCURRENT_STREAMS); - let futures = chunks.into_iter().map(|buffs| { - join_all( - buffs - .into_iter() - .map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)), - ) - }); + let futures: Vec<_> = chunks + .into_iter() + .map(|buffs| { + join_all( + buffs + .into_iter() + .map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)), + ) + }) + .collect(); for f in futures { f.await.into_iter().try_for_each(|res| res)?; diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index eb76336c2..e3a7c013e 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -70,4 +70,10 @@ pub trait TpuConnection { ) -> TransportResult<()> where T: AsRef<[u8]>; + + fn send_wire_transaction_batch_async( + &self, + buffers: Vec>, + stats: Arc, + ) -> TransportResult<()>; } diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index b13b899f8..f6db67813 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -62,4 +62,13 @@ impl TpuConnection for UdpTpuConnection { batch_send(&self.socket, &pkts)?; Ok(()) } + fn send_wire_transaction_batch_async( + &self, + buffers: Vec>, + _stats: Arc, + ) -> TransportResult<()> { + let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); + batch_send(&self.socket, &pkts)?; + Ok(()) + } }