diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 534f661d7d..a225f30e89 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -116,7 +116,7 @@ fn create_client( rpc_client, websocket_url, TpuClientConfig::default(), - Arc::new(connection_cache), + Arc::new(connection_cache.into()), ) .unwrap_or_else(|err| { eprintln!("Could not create TpuClient {err:?}"); diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index f8098fbe7f..c33300608a 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -131,14 +131,14 @@ fn test_bench_tps_test_validator(config: Config) { CommitmentConfig::processed(), )); let websocket_url = test_validator.rpc_pubsub_url(); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = ConnectionCache::default(); let client = Arc::new( TpuClient::new_with_connection_cache( rpc_client, &websocket_url, TpuClientConfig::default(), - connection_cache, + Arc::new(connection_cache.into()), ) .unwrap(), ); diff --git a/cli/src/program.rs b/cli/src/program.rs index 8503e40f2e..ec9138fdec 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -2159,15 +2159,15 @@ fn send_deploy_messages( if let Some(write_signer) = write_signer { trace!("Writing program data"); let connection_cache = if config.use_quic { - Arc::new(ConnectionCache::new(1)) + ConnectionCache::new(1) } else { - Arc::new(ConnectionCache::with_udp(1)) + ConnectionCache::with_udp(1) }; let tpu_client = TpuClient::new_with_connection_cache( rpc_client.clone(), &config.websocket_url, TpuClientConfig::default(), - connection_cache, + Arc::new(connection_cache.into()), )?; let transaction_errors = tpu_client .send_and_confirm_messages_with_spinner( diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 2fba4efc70..e7d060d5b5 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -109,6 +109,12 @@ impl ConnectionCache { } } +impl From for BackendConnectionCache { + fn from(cache: ConnectionCache) -> Self { + cache.cache + } +} + impl Default for ConnectionCache { fn default() -> Self { if DEFAULT_CONNECTION_CACHE_USE_QUIC { diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 8beccdca51..6dfd342854 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,13 +1,6 @@ -pub use { - crate::nonblocking::tpu_client::TpuSenderError, - solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, -}; use { - crate::{ - connection_cache::ConnectionCache, - nonblocking::tpu_client::TpuClient as NonblockingTpuClient, - }, - rayon::iter::{IntoParallelIterator, ParallelIterator}, + crate::connection_cache::ConnectionCache, + solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ message::Message, @@ -15,56 +8,51 @@ use { transaction::{Transaction, TransactionError}, transport::Result as TransportResult, }, - solana_tpu_client::tpu_client::temporary_pub::Result, - std::{net::UdpSocket, sync::Arc}, + solana_tpu_client::tpu_client::{temporary_pub::Result, TpuClient as BackendTpuClient}, + std::sync::Arc, +}; +pub use { + crate::nonblocking::tpu_client::TpuSenderError, + solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, }; /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info +/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency. pub struct TpuClient { - _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket - //todo: get rid of this field - rpc_client: Arc, - tpu_client: Arc, + tpu_client: BackendTpuClient, } impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { - self.invoke(self.tpu_client.send_transaction(transaction)) + self.tpu_client.send_transaction(transaction) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { - self.invoke(self.tpu_client.send_wire_transaction(wire_transaction)) + self.tpu_client.send_wire_transaction(wire_transaction) } /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size /// Returns the last error if all sends fail pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { - self.invoke(self.tpu_client.try_send_transaction(transaction)) + self.tpu_client.try_send_transaction(transaction) } /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according /// to fanout size /// Returns the last error if all sends fail pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { - let wire_transactions = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - self.invoke( - self.tpu_client - .try_send_wire_transaction_batch(wire_transactions), - ) + self.tpu_client.try_send_transaction_batch(transactions) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { - self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) + self.tpu_client.try_send_wire_transaction(wire_transaction) } /// Create a new client that disconnects when dropped @@ -73,16 +61,13 @@ impl TpuClient { websocket_url: &str, config: TpuClientConfig, ) -> Result { - let create_tpu_client = - NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); - let tpu_client = - tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; - - Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + let connection_cache = ConnectionCache::default(); + Self::new_with_connection_cache( rpc_client, - tpu_client: Arc::new(tpu_client), - }) + websocket_url, + config, + Arc::new(connection_cache.into()), + ) } /// Create a new client that disconnects when dropped @@ -90,21 +75,15 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc, + connection_cache: Arc, ) -> Result { - let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( - rpc_client.get_inner_client().clone(), - websocket_url, - config, - connection_cache, - ); - let tpu_client = - tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; - Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), - rpc_client, - tpu_client: Arc::new(tpu_client), + tpu_client: BackendTpuClient::new_with_connection_cache( + rpc_client, + websocket_url, + config, + connection_cache, + )?, }) } @@ -113,20 +92,11 @@ impl TpuClient { messages: &[Message], signers: &T, ) -> Result>> { - self.invoke( - self.tpu_client - .send_and_confirm_messages_with_spinner(messages, signers), - ) + self.tpu_client + .send_and_confirm_messages_with_spinner(messages, signers) } pub fn rpc_client(&self) -> &RpcClient { - &self.rpc_client - } - - fn invoke>(&self, f: F) -> T { - // `block_on()` panics if called within an asynchronous execution context. Whereas - // `block_in_place()` only panics if called from a current_thread runtime, which is the - // lesser evil. - tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f)) + self.tpu_client.rpc_client() } } diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 54fbf26fd1..bb30622574 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -461,14 +461,14 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { CommitmentConfig::processed(), )); let connection_cache = match tpu_use_quic { - true => Arc::new(ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE)), - false => Arc::new(ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE)), + true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), }; let tpu_client = TpuClient::new_with_connection_cache( rpc_client.clone(), &test_validator.rpc_pubsub_url(), TpuClientConfig::default(), - connection_cache, + Arc::new(connection_cache.into()), ) .unwrap();