From ab85ab7a5bedcb4816fc97eabc689386be94f214 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 19 Oct 2022 10:42:14 -0700 Subject: [PATCH] handle semaphore locally without impacting the interface --- tpu-client/src/nonblocking/quic_client.rs | 22 ++-------- tpu-client/src/nonblocking/tpu_client.rs | 6 +-- tpu-client/src/nonblocking/tpu_connection.rs | 19 ++------ tpu-client/src/nonblocking/udp_client.rs | 28 +++--------- tpu-client/src/quic_client.rs | 46 ++++++++++---------- tpu-client/src/tpu_connection_cache.rs | 16 ++----- tpu-client/tests/quic_client.rs | 5 +-- 7 files changed, 42 insertions(+), 100 deletions(-) diff --git a/tpu-client/src/nonblocking/quic_client.rs b/tpu-client/src/nonblocking/quic_client.rs index 0646e19ad..e9339f5ef 100644 --- a/tpu-client/src/nonblocking/quic_client.rs +++ b/tpu-client/src/nonblocking/quic_client.rs @@ -3,8 +3,7 @@ //! server's flow control. use { crate::{ - connection_cache_stats::ConnectionCacheStats, - nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection}, + connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection, tpu_connection::ClientStats, }, async_mutex::Mutex, @@ -555,11 +554,7 @@ impl TpuConnection for QuicTpuConnection { self.client.tpu_addr() } - async fn send_wire_transaction_batch( - &self, - buffers: &[T], - callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { @@ -571,19 +566,11 @@ impl TpuConnection for QuicTpuConnection { .await; self.connection_stats .add_client_stats(&stats, len, res.is_ok()); - if let Some(callback) = callback { - callback(); - } - res?; Ok(()) } - async fn send_wire_transaction( - &self, - wire_transaction: T, - callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { @@ -602,9 +589,6 @@ impl TpuConnection for QuicTpuConnection { } else { self.connection_stats.add_client_stats(&stats, 1, true); } - if let Some(callback) = callback { - callback(); - } Ok(()) } } diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index ff6c671a1..6a4696904 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -228,8 +228,7 @@ async fn send_wire_transaction_to_addr( wire_transaction: Vec, ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction(wire_transaction.clone(), &mut None) - .await + conn.send_wire_transaction(wire_transaction.clone()).await } async fn send_wire_transaction_batch_to_addr( @@ -238,8 +237,7 @@ async fn send_wire_transaction_batch_to_addr( wire_transactions: &[Vec], ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction_batch(wire_transactions, &mut None) - .await + conn.send_wire_transaction_batch(wire_transactions).await } impl TpuClient { diff --git a/tpu-client/src/nonblocking/tpu_connection.rs b/tpu-client/src/nonblocking/tpu_connection.rs index 9a1d4bec9..9e819070b 100644 --- a/tpu-client/src/nonblocking/tpu_connection.rs +++ b/tpu-client/src/nonblocking/tpu_connection.rs @@ -17,9 +17,6 @@ pub enum NonblockingConnection { UdpTpuConnection, } -pub type SendTransactionCallback = Box; -pub type SendTransactionCallbackOption = Option; - #[async_trait] #[enum_dispatch(NonblockingConnection)] pub trait TpuConnection { @@ -28,27 +25,17 @@ pub trait TpuConnection { async fn serialize_and_send_transaction( &self, transaction: &VersionedTransaction, - callback: &mut SendTransactionCallbackOption, ) -> TransportResult<()> { let wire_transaction = bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction, callback) - .await + self.send_wire_transaction(&wire_transaction).await } - async fn send_wire_transaction( - &self, - wire_transaction: T, - callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync; - async fn send_wire_transaction_batch( - &self, - buffers: &[T], - callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync; } diff --git a/tpu-client/src/nonblocking/udp_client.rs b/tpu-client/src/nonblocking/udp_client.rs index f66afd463..1a4187650 100644 --- a/tpu-client/src/nonblocking/udp_client.rs +++ b/tpu-client/src/nonblocking/udp_client.rs @@ -2,12 +2,9 @@ //! an interface for sending transactions use { - crate::nonblocking::tpu_connection::{SendTransactionCallbackOption, TpuConnection}, - async_trait::async_trait, - core::iter::repeat, - solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, - std::net::SocketAddr, + crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait, + core::iter::repeat, solana_sdk::transport::Result as TransportResult, + solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, tokio::net::UdpSocket, }; @@ -33,11 +30,7 @@ impl TpuConnection for UdpTpuConnection { &self.addr } - async fn send_wire_transaction( - &self, - wire_transaction: T, - _callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { @@ -47,11 +40,7 @@ impl TpuConnection for UdpTpuConnection { Ok(()) } - async fn send_wire_transaction_batch( - &self, - buffers: &[T], - _callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { @@ -73,10 +62,7 @@ mod tests { async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) { let packet = vec![111u8; PACKET_DATA_SIZE]; - connection - .send_wire_transaction(&packet, &mut None) - .await - .unwrap(); + connection.send_wire_transaction(&packet).await.unwrap(); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap(); assert_eq!(1, recv); @@ -85,7 +71,7 @@ mod tests { async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) { let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); connection - .send_wire_transaction_batch(&packets, &mut None) + .send_wire_transaction_batch(&packets) .await .unwrap(); let mut packets = vec![Packet::default(); 32]; diff --git a/tpu-client/src/quic_client.rs b/tpu-client/src/quic_client.rs index b75ed01ea..d0a8cd1bd 100644 --- a/tpu-client/src/quic_client.rs +++ b/tpu-client/src/quic_client.rs @@ -104,6 +104,24 @@ impl QuicTpuConnection { } } +async fn send_wire_transaction_async( + connection: Arc, + wire_transaction: Vec, +) -> TransportResult<()> { + let result = connection.send_wire_transaction(wire_transaction).await; + ASYNC_TASK_SEMAPHORE.release(); + result +} + +async fn send_wire_transaction_batch_async( + connection: Arc, + buffers: Vec>, +) -> TransportResult<()> { + let result = connection.send_wire_transaction_batch(&buffers).await; + ASYNC_TASK_SEMAPHORE.release(); + result +} + impl TpuConnection for QuicTpuConnection { fn tpu_addr(&self) -> &SocketAddr { self.inner.tpu_addr() @@ -113,42 +131,24 @@ impl TpuConnection for QuicTpuConnection { where T: AsRef<[u8]> + Send + Sync, { - RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers, &mut None))?; + RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; Ok(()) } fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - //drop and detach the task - let _ = RUNTIME.spawn(async move { - inner - .send_wire_transaction( - wire_transaction, - &mut Some(Box::new(|| { - ASYNC_TASK_SEMAPHORE.release(); - })), - ) - .await - }); + let _ = RUNTIME + .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); Ok(()) } fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - - let _ = RUNTIME.spawn(async move { - inner - .send_wire_transaction_batch( - &buffers, - &mut Some(Box::new(|| { - ASYNC_TASK_SEMAPHORE.release(); - })), - ) - .await - }); + let _ = + RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); Ok(()) } } diff --git a/tpu-client/src/tpu_connection_cache.rs b/tpu-client/src/tpu_connection_cache.rs index 66ac2433e..669717cc8 100644 --- a/tpu-client/src/tpu_connection_cache.rs +++ b/tpu-client/src/tpu_connection_cache.rs @@ -341,9 +341,7 @@ mod tests { use { super::*, crate::{ - nonblocking::tpu_connection::{ - SendTransactionCallbackOption, TpuConnection as NonblockingTpuConnection, - }, + nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, tpu_connection::TpuConnection as BlockingTpuConnection, }, async_trait::async_trait, @@ -470,21 +468,13 @@ mod tests { fn tpu_addr(&self) -> &SocketAddr { &self.addr } - async fn send_wire_transaction( - &self, - _wire_transaction: T, - _callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction(&self, _wire_transaction: T) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { unimplemented!() } - async fn send_wire_transaction_batch( - &self, - _buffers: &[T], - _callback: &mut SendTransactionCallbackOption, - ) -> TransportResult<()> + async fn send_wire_transaction_batch(&self, _buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { diff --git a/tpu-client/tests/quic_client.rs b/tpu-client/tests/quic_client.rs index 4983bae82..31b8967dd 100644 --- a/tpu-client/tests/quic_client.rs +++ b/tpu-client/tests/quic_client.rs @@ -141,10 +141,7 @@ mod tests { let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(client - .send_wire_transaction_batch(&packets, &mut None) - .await - .is_ok()); + assert!(client.send_wire_transaction_batch(&packets).await.is_ok()); check_packets(receiver, num_bytes, num_expected_packets); exit.store(true, Ordering::Relaxed);