Refactor tpu_client -- remove duplicate code (#30104)

Removed implementation in client/tpu_client and make it a thin wrapper to forward calls to the backend TpuClient. There is some minor change to coerce the client/ConnectionCache to connection-cache/ConnectionCache.
This commit is contained in:
Lijun Wang 2023-02-03 17:47:50 -08:00 committed by GitHub
parent 151585e596
commit a24363bb04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 46 additions and 70 deletions

View File

@ -116,7 +116,7 @@ fn create_client(
rpc_client, rpc_client,
websocket_url, websocket_url,
TpuClientConfig::default(), TpuClientConfig::default(),
Arc::new(connection_cache), Arc::new(connection_cache.into()),
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}"); eprintln!("Could not create TpuClient {err:?}");

View File

@ -131,14 +131,14 @@ fn test_bench_tps_test_validator(config: Config) {
CommitmentConfig::processed(), CommitmentConfig::processed(),
)); ));
let websocket_url = test_validator.rpc_pubsub_url(); let websocket_url = test_validator.rpc_pubsub_url();
let connection_cache = Arc::new(ConnectionCache::default()); let connection_cache = ConnectionCache::default();
let client = Arc::new( let client = Arc::new(
TpuClient::new_with_connection_cache( TpuClient::new_with_connection_cache(
rpc_client, rpc_client,
&websocket_url, &websocket_url,
TpuClientConfig::default(), TpuClientConfig::default(),
connection_cache, Arc::new(connection_cache.into()),
) )
.unwrap(), .unwrap(),
); );

View File

@ -2159,15 +2159,15 @@ fn send_deploy_messages(
if let Some(write_signer) = write_signer { if let Some(write_signer) = write_signer {
trace!("Writing program data"); trace!("Writing program data");
let connection_cache = if config.use_quic { let connection_cache = if config.use_quic {
Arc::new(ConnectionCache::new(1)) ConnectionCache::new(1)
} else { } else {
Arc::new(ConnectionCache::with_udp(1)) ConnectionCache::with_udp(1)
}; };
let tpu_client = TpuClient::new_with_connection_cache( let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(), rpc_client.clone(),
&config.websocket_url, &config.websocket_url,
TpuClientConfig::default(), TpuClientConfig::default(),
connection_cache, Arc::new(connection_cache.into()),
)?; )?;
let transaction_errors = tpu_client let transaction_errors = tpu_client
.send_and_confirm_messages_with_spinner( .send_and_confirm_messages_with_spinner(

View File

@ -109,6 +109,12 @@ impl ConnectionCache {
} }
} }
impl From<ConnectionCache> for BackendConnectionCache {
fn from(cache: ConnectionCache) -> Self {
cache.cache
}
}
impl Default for ConnectionCache { impl Default for ConnectionCache {
fn default() -> Self { fn default() -> Self {
if DEFAULT_CONNECTION_CACHE_USE_QUIC { if DEFAULT_CONNECTION_CACHE_USE_QUIC {

View File

@ -1,13 +1,6 @@
pub use {
crate::nonblocking::tpu_client::TpuSenderError,
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};
use { use {
crate::{ crate::connection_cache::ConnectionCache,
connection_cache::ConnectionCache, solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
},
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_rpc_client::rpc_client::RpcClient, solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{ solana_sdk::{
message::Message, message::Message,
@ -15,56 +8,51 @@ use {
transaction::{Transaction, TransactionError}, transaction::{Transaction, TransactionError},
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
solana_tpu_client::tpu_client::temporary_pub::Result, solana_tpu_client::tpu_client::{temporary_pub::Result, TpuClient as BackendTpuClient},
std::{net::UdpSocket, sync::Arc}, std::sync::Arc,
};
pub use {
crate::nonblocking::tpu_client::TpuSenderError,
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
}; };
/// Client which sends transactions directly to the current leader's TPU port over UDP. /// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info /// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
pub struct TpuClient { pub struct TpuClient {
_deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket tpu_client: BackendTpuClient,
//todo: get rid of this field
rpc_client: Arc<RpcClient>,
tpu_client: Arc<NonblockingTpuClient>,
} }
impl TpuClient { impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size /// size
pub fn send_transaction(&self, transaction: &Transaction) -> bool { pub fn send_transaction(&self, transaction: &Transaction) -> bool {
self.invoke(self.tpu_client.send_transaction(transaction)) self.tpu_client.send_transaction(transaction)
} }
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool { pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
self.invoke(self.tpu_client.send_wire_transaction(wire_transaction)) self.tpu_client.send_wire_transaction(wire_transaction)
} }
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size /// size
/// Returns the last error if all sends fail /// Returns the last error if all sends fail
pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
self.invoke(self.tpu_client.try_send_transaction(transaction)) self.tpu_client.try_send_transaction(transaction)
} }
/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
/// to fanout size /// to fanout size
/// Returns the last error if all sends fail /// Returns the last error if all sends fail
pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
let wire_transactions = transactions self.tpu_client.try_send_transaction_batch(transactions)
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.invoke(
self.tpu_client
.try_send_wire_transaction_batch(wire_transactions),
)
} }
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail /// Returns the last error if all sends fail
pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> { pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) self.tpu_client.try_send_wire_transaction(wire_transaction)
} }
/// Create a new client that disconnects when dropped /// Create a new client that disconnects when dropped
@ -73,16 +61,13 @@ impl TpuClient {
websocket_url: &str, websocket_url: &str,
config: TpuClientConfig, config: TpuClientConfig,
) -> Result<Self> { ) -> Result<Self> {
let create_tpu_client = let connection_cache = ConnectionCache::default();
NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); Self::new_with_connection_cache(
let tpu_client =
tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
Ok(Self {
_deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
rpc_client, rpc_client,
tpu_client: Arc::new(tpu_client), websocket_url,
}) config,
Arc::new(connection_cache.into()),
)
} }
/// Create a new client that disconnects when dropped /// Create a new client that disconnects when dropped
@ -90,21 +75,15 @@ impl TpuClient {
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
websocket_url: &str, websocket_url: &str,
config: TpuClientConfig, config: TpuClientConfig,
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<BackendConnectionCache>,
) -> Result<Self> { ) -> Result<Self> {
let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( Ok(Self {
rpc_client.get_inner_client().clone(), tpu_client: BackendTpuClient::new_with_connection_cache(
rpc_client,
websocket_url, websocket_url,
config, config,
connection_cache, connection_cache,
); )?,
let tpu_client =
tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
Ok(Self {
_deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
rpc_client,
tpu_client: Arc::new(tpu_client),
}) })
} }
@ -113,20 +92,11 @@ impl TpuClient {
messages: &[Message], messages: &[Message],
signers: &T, signers: &T,
) -> Result<Vec<Option<TransactionError>>> { ) -> Result<Vec<Option<TransactionError>>> {
self.invoke(
self.tpu_client self.tpu_client
.send_and_confirm_messages_with_spinner(messages, signers), .send_and_confirm_messages_with_spinner(messages, signers)
)
} }
pub fn rpc_client(&self) -> &RpcClient { pub fn rpc_client(&self) -> &RpcClient {
&self.rpc_client self.tpu_client.rpc_client()
}
fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
// `block_on()` panics if called within an asynchronous execution context. Whereas
// `block_in_place()` only panics if called from a current_thread runtime, which is the
// lesser evil.
tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
} }
} }

View File

@ -461,14 +461,14 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) {
CommitmentConfig::processed(), CommitmentConfig::processed(),
)); ));
let connection_cache = match tpu_use_quic { let connection_cache = match tpu_use_quic {
true => Arc::new(ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE)), true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE),
false => Arc::new(ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE)), false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE),
}; };
let tpu_client = TpuClient::new_with_connection_cache( let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(), rpc_client.clone(),
&test_validator.rpc_pubsub_url(), &test_validator.rpc_pubsub_url(),
TpuClientConfig::default(), TpuClientConfig::default(),
connection_cache, Arc::new(connection_cache.into()),
) )
.unwrap(); .unwrap();