diff --git a/Cargo.lock b/Cargo.lock index d05c19559..0e62d1696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4904,6 +4904,7 @@ dependencies = [ "solana-clap-utils", "solana-cli-config", "solana-client", + "solana-connection-cache", "solana-core", "solana-faucet", "solana-genesis", diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index c8ede2899..27580f6b8 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -19,6 +19,7 @@ serde_yaml = "0.9.13" solana-clap-utils = { path = "../clap-utils", version = "=1.16.0" } solana-cli-config = { path = "../cli-config", version = "=1.16.0" } solana-client = { path = "../client", version = "=1.16.0" } +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-core = { path = "../core", version = "=1.16.0" } solana-faucet = { path = "../faucet", version = "=1.16.0" } solana-genesis = { path = "../genesis", version = "=1.16.0" } diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs index 0f348837a..c56da2ae6 100644 --- a/bench-tps/src/bench_tps_client/tpu_client.rs +++ b/bench-tps/src/bench_tps_client/tpu_client.rs @@ -1,13 +1,21 @@ use { crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result}, solana_client::tpu_client::TpuClient, + solana_connection_cache::connection_cache::{ + ConnectionManager, ConnectionPool, NewConnectionConfig, + }, solana_sdk::{ account::Account, commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash, message::Message, pubkey::Pubkey, signature::Signature, transaction::Transaction, }, }; -impl BenchTpsClient for TpuClient { +impl BenchTpsClient for TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ fn send_transaction(&self, transaction: Transaction) -> Result { let signature = transaction.signatures[0]; self.try_send_transaction(&transaction)?; diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index a225f30e8..682585076 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -110,19 +110,32 @@ fn create_client( true => ConnectionCache::new(tpu_connection_pool_size), false => ConnectionCache::with_udp(tpu_connection_pool_size), }; - - Arc::new( - TpuClient::new_with_connection_cache( - rpc_client, - websocket_url, - TpuClientConfig::default(), - Arc::new(connection_cache.into()), - ) - .unwrap_or_else(|err| { - eprintln!("Could not create TpuClient {err:?}"); - exit(1); - }), - ) + match connection_cache { + ConnectionCache::Udp(cache) => Arc::new( + TpuClient::new_with_connection_cache( + rpc_client, + websocket_url, + TpuClientConfig::default(), + Arc::new(cache), + ) + .unwrap_or_else(|err| { + eprintln!("Could not create TpuClient {err:?}"); + exit(1); + }), + ), + ConnectionCache::Quic(cache) => Arc::new( + TpuClient::new_with_connection_cache( + rpc_client, + websocket_url, + TpuClientConfig::default(), + Arc::new(cache), + ) + .unwrap_or_else(|err| { + eprintln!("Could not create TpuClient {err:?}"); + exit(1); + }), + ), + } } } } diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index c33300608..3da677ed1 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -9,7 +9,6 @@ use { spl_convert::FromOtherSolana, }, solana_client::{ - connection_cache::ConnectionCache, thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -131,17 +130,8 @@ fn test_bench_tps_test_validator(config: Config) { CommitmentConfig::processed(), )); let websocket_url = test_validator.rpc_pubsub_url(); - let 
connection_cache = ConnectionCache::default(); - - let client = Arc::new( - TpuClient::new_with_connection_cache( - rpc_client, - &websocket_url, - TpuClientConfig::default(), - Arc::new(connection_cache.into()), - ) - .unwrap(), - ); + let client = + Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap()); let lamports_per_account = 1000; diff --git a/cli/src/program.rs b/cli/src/program.rs index ec9138fde..aba183388 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -2163,21 +2163,32 @@ fn send_deploy_messages( } else { ConnectionCache::with_udp(1) }; - let tpu_client = TpuClient::new_with_connection_cache( - rpc_client.clone(), - &config.websocket_url, - TpuClientConfig::default(), - Arc::new(connection_cache.into()), - )?; - let transaction_errors = tpu_client + let transaction_errors = match connection_cache { + ConnectionCache::Udp(cache) => TpuClient::new_with_connection_cache( + rpc_client.clone(), + &config.websocket_url, + TpuClientConfig::default(), + Arc::new(cache), + )? .send_and_confirm_messages_with_spinner( write_messages, &[payer_signer, write_signer], - ) - .map_err(|err| format!("Data writes to account failed: {err}"))? - .into_iter() - .flatten() - .collect::>(); + ), + ConnectionCache::Quic(cache) => TpuClient::new_with_connection_cache( + rpc_client.clone(), + &config.websocket_url, + TpuClientConfig::default(), + Arc::new(cache), + )? + .send_and_confirm_messages_with_spinner( + write_messages, + &[payer_signer, write_signer], + ), + } + .map_err(|err| format!("Data writes to account failed: {err}"))? + .into_iter() + .flatten() + .collect::>(); if !transaction_errors.is_empty() { for transaction_error in &transaction_errors { diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index e7d060d5b..72af2678d 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,16 +1,16 @@ use { quinn::Endpoint, solana_connection_cache::{ - client_connection::ClientConnection as BlockingClientConnection, + client_connection::ClientConnection, connection_cache::{ - ConnectionCache as BackendConnectionCache, NewConnectionConfig, ProtocolType, + BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool, + NewConnectionConfig, }, - nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, - solana_quic_client::{QuicConfig, QuicConnectionManager}, - solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, + solana_sdk::{pubkey::Pubkey, signature::Keypair, transport::Result as TransportResult}, solana_streamer::streamer::StakedNodes, - solana_udp_client::UdpConnectionManager, + solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool}, std::{ error::Error, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -18,15 +18,28 @@ use { }, }; -pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; -pub const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true; -pub const MAX_CONNECTIONS: usize = 1024; +const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; +const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true; /// A thin wrapper over connection-cache/ConnectionCache to ease /// construction of the ConnectionCache for code dealing both with udp and quic. /// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly. 
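A minimal usage sketch of the enum-based wrapper introduced below, mirroring the updated call sites in bench-tps/src/main.rs; the helper name `create_tpu_client`, the pool size of 4, and the argument names are illustrative and not part of this change.

```rust
use {
    solana_client::{
        connection_cache::ConnectionCache,
        tpu_client::{TpuClient, TpuClientConfig},
    },
    solana_rpc_client::rpc_client::RpcClient,
    std::sync::Arc,
};

// Because the QUIC and UDP caches are now distinct generic instantiations,
// callers match on the enum variant and build a concrete client per arm
// instead of passing a type-erased cache around.
fn create_tpu_client(use_quic: bool, rpc_client: Arc<RpcClient>, websocket_url: &str) {
    let connection_cache = if use_quic {
        ConnectionCache::new(4) // QUIC-backed cache
    } else {
        ConnectionCache::with_udp(4) // UDP-backed cache
    };
    match connection_cache {
        ConnectionCache::Quic(cache) => {
            let _client = TpuClient::new_with_connection_cache(
                rpc_client,
                websocket_url,
                TpuClientConfig::default(),
                Arc::new(cache),
            )
            .expect("quic TpuClient");
        }
        ConnectionCache::Udp(cache) => {
            let _client = TpuClient::new_with_connection_cache(
                rpc_client,
                websocket_url,
                TpuClientConfig::default(),
                Arc::new(cache),
            )
            .expect("udp TpuClient");
        }
    }
}
```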
-pub struct ConnectionCache { - cache: BackendConnectionCache, +pub enum ConnectionCache { + Quic(BackendConnectionCache), + Udp(BackendConnectionCache), +} + +type QuicBaseClientConnection = ::BaseClientConnection; +type UdpBaseClientConnection = ::BaseClientConnection; + +pub enum BlockingClientConnection { + Quic(Arc<::BlockingClientConnection>), + Udp(Arc<::BlockingClientConnection>), +} + +pub enum NonblockingClientConnection { + Quic(Arc<::NonblockingClientConnection>), + Udp(Arc<::NonblockingClientConnection>), } impl ConnectionCache { @@ -56,10 +69,9 @@ impl ConnectionCache { if let Some(stake_info) = stake_info { config.set_staked_nodes(stake_info.0, stake_info.1); } - let connection_manager = - Box::new(QuicConnectionManager::new_with_connection_config(config)); + let connection_manager = QuicConnectionManager::new_with_connection_config(config); let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); - Self { cache } + Self::Quic(cache) } #[deprecated( @@ -88,30 +100,31 @@ impl ConnectionCache { pub fn with_udp(connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); - let connection_manager = Box::::default(); + let connection_manager = UdpConnectionManager::default(); let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); - Self { cache } + Self::Udp(cache) } pub fn use_quic(&self) -> bool { - matches!(self.cache.get_protocol_type(), ProtocolType::QUIC) + matches!(self, Self::Quic(_)) } - pub fn get_connection(&self, addr: &SocketAddr) -> Arc { - self.cache.get_connection(addr) + pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection { + match self { + Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)), + Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)), + } } - pub fn get_nonblocking_connection( - &self, - addr: &SocketAddr, - ) -> Arc { - self.cache.get_nonblocking_connection(addr) - } -} - -impl From for BackendConnectionCache { - fn from(cache: ConnectionCache) -> Self { - cache.cache + pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection { + match self { + Self::Quic(cache) => { + NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr)) + } + Self::Udp(cache) => { + NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr)) + } + } } } @@ -131,9 +144,51 @@ impl Default for ConnectionCache { } } +macro_rules! dispatch { + ($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => { + #[inline] + $vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? 
{ + match self { + Self::Quic(this) => this.$name($($arg, )?), + Self::Udp(this) => this.$name($($arg, )?), + } + } + }; +} + +impl ClientConnection for BlockingClientConnection { + dispatch!(fn server_addr(&self) -> &SocketAddr); + dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>); + dispatch!(fn send_data_async(&self, buffer: Vec) -> TransportResult<()>); + dispatch!(fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()>); + dispatch!(fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()>); +} + +#[async_trait::async_trait] +impl solana_connection_cache::nonblocking::client_connection::ClientConnection + for NonblockingClientConnection +{ + dispatch!(fn server_addr(&self) -> &SocketAddr); + + async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + match self { + Self::Quic(cache) => Ok(cache.send_data(buffer).await?), + Self::Udp(cache) => Ok(cache.send_data(buffer).await?), + } + } + + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + match self { + Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?), + Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?), + } + } +} + #[cfg(test)] mod tests { use { + super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair}, diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index 09912d76f..b8a5f4143 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -1,7 +1,11 @@ pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError}; use { crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig}, - solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache, + solana_connection_cache::connection_cache::{ + ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool, + NewConnectionConfig, + }, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ message::Message, @@ -15,11 +19,20 @@ use { /// Client which sends transactions directly to the current leader's TPU port over UDP. 
/// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { - tpu_client: BackendTpuClient, +pub struct TpuClient< + P, // ConnectionPool + M, // ConnectionManager + C, // NewConnectionConfig +> { + tpu_client: BackendTpuClient, } -impl TpuClient { +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub async fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -62,23 +75,39 @@ impl TpuClient { .try_send_wire_transaction_batch(wire_transactions) .await } +} +impl TpuClient { /// Create a new client that disconnects when dropped pub async fn new( rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, ) -> Result { - let connection_cache = Arc::new(ConnectionCache::default().into()); + let connection_cache = match ConnectionCache::default() { + ConnectionCache::Quic(cache) => Arc::new(cache), + ConnectionCache::Udp(_) => { + return Err(TpuSenderError::Custom(String::from( + "Invalid default connection cache", + ))) + } + }; Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await } +} +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Create a new client that disconnects when dropped pub async fn new_with_connection_cache( rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc, + connection_cache: Arc>, ) -> Result { Ok(Self { tpu_client: BackendTpuClient::new_with_connection_cache( diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 277fe8322..2d8b4cd3b 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -7,6 +7,7 @@ use { crate::connection_cache::ConnectionCache, log::*, rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_connection_cache::client_connection::ClientConnection, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 6dfd34285..f9cdf1ddf 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,6 +1,10 @@ use { crate::connection_cache::ConnectionCache, - solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache, + solana_connection_cache::connection_cache::{ + ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool, + NewConnectionConfig, + }, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ message::Message, @@ -19,11 +23,20 @@ pub use { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info /// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency. 
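A sketch of how downstream code adapts to the now-generic blocking `TpuClient` defined just below, in the same way the `BenchTpsClient` impl earlier in this diff does. `broadcast` is a hypothetical helper; the bounds are written as they appear in this diff, and if the final traits also tie the associated types together (e.g. `M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>`), those constraints would need to be repeated here.

```rust
use {
    solana_client::tpu_client::TpuClient,
    solana_connection_cache::connection_cache::{
        ConnectionManager, ConnectionPool, NewConnectionConfig,
    },
    solana_sdk::transaction::Transaction,
};

// Callers that previously took the non-generic `TpuClient` now carry the
// three type parameters and repeat the trait bounds from the impl blocks.
fn broadcast<P, M, C>(client: &TpuClient<P, M, C>, transactions: &[Transaction]) -> bool
where
    P: ConnectionPool,
    M: ConnectionManager,
    C: NewConnectionConfig,
{
    transactions
        .iter()
        .all(|transaction| client.send_transaction(transaction))
}
```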
-pub struct TpuClient { - tpu_client: BackendTpuClient, +pub struct TpuClient< + P, // ConnectionPool + M, // ConnectionManager + C, // NewConnectionConfig +> { + tpu_client: BackendTpuClient, } -impl TpuClient { +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -54,28 +67,39 @@ impl TpuClient { pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { self.tpu_client.try_send_wire_transaction(wire_transaction) } +} +impl TpuClient { /// Create a new client that disconnects when dropped pub fn new( rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, ) -> Result { - let connection_cache = ConnectionCache::default(); - Self::new_with_connection_cache( - rpc_client, - websocket_url, - config, - Arc::new(connection_cache.into()), - ) + let connection_cache = match ConnectionCache::default() { + ConnectionCache::Quic(cache) => Arc::new(cache), + ConnectionCache::Udp(_) => { + return Err(TpuSenderError::Custom(String::from( + "Invalid default connection cache", + ))) + } + }; + Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache) } +} +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Create a new client that disconnects when dropped pub fn new_with_connection_cache( rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc, + connection_cache: Arc>, ) -> Result { Ok(Self { tpu_client: BackendTpuClient::new_with_connection_cache( diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 82e88b818..75e09c0ce 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -9,7 +9,6 @@ use { solana_measure::measure::Measure, solana_sdk::timing::AtomicInterval, std::{ - any::Any, net::SocketAddr, sync::{atomic::Ordering, Arc, RwLock}, }, @@ -17,38 +16,40 @@ use { }; // Should be non-zero -pub static MAX_CONNECTIONS: usize = 1024; +const MAX_CONNECTIONS: usize = 1024; /// Default connection pool size per remote address pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; -/// Defines the protocol types of an implementation supports. 
-pub enum ProtocolType { - UDP, - QUIC, -} +pub trait ConnectionManager { + type ConnectionPool: ConnectionPool; + type NewConnectionConfig: NewConnectionConfig; -pub trait ConnectionManager: Sync + Send { - fn new_connection_pool(&self) -> Box; - fn new_connection_config(&self) -> Box; + fn new_connection_pool(&self) -> Self::ConnectionPool; + fn new_connection_config(&self) -> Self::NewConnectionConfig; fn get_port_offset(&self) -> u16; - fn get_protocol_type(&self) -> ProtocolType; } -pub struct ConnectionCache { - pub map: RwLock>>, - pub connection_manager: Box, - pub stats: Arc, - pub last_stats: AtomicInterval, - pub connection_pool_size: usize, - pub connection_config: Box, +pub struct ConnectionCache< + R, // ConnectionPool + S, // ConnectionManager + T, // NewConnectionConfig +> { + map: RwLock>, + connection_manager: S, + stats: Arc, + last_stats: AtomicInterval, + connection_pool_size: usize, + connection_config: T, } -impl ConnectionCache { - pub fn new( - connection_manager: Box, - connection_pool_size: usize, - ) -> Result { +impl ConnectionCache +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ + pub fn new(connection_manager: M, connection_pool_size: usize) -> Result { let config = connection_manager.new_connection_config(); Ok(Self::new_with_config( connection_pool_size, @@ -59,8 +60,8 @@ impl ConnectionCache { pub fn new_with_config( connection_pool_size: usize, - connection_config: Box, - connection_manager: Box, + connection_config: C, + connection_manager: M, ) -> Self { Self { map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), @@ -79,7 +80,7 @@ impl ConnectionCache { &self, lock_timing_ms: &mut u64, addr: &SocketAddr, - ) -> CreateConnectionResult { + ) -> CreateConnectionResult<
<P as ConnectionPool>
::BaseClientConnection> { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let mut map = self.map.write().unwrap(); get_connection_map_lock_measure.stop(); @@ -113,11 +114,11 @@ impl ConnectionCache { map.entry(*addr) .and_modify(|pool| { - pool.add_connection(&*self.connection_config, addr); + pool.add_connection(&self.connection_config, addr); }) .or_insert_with(|| { let mut pool = self.connection_manager.new_connection_pool(); - pool.add_connection(&*self.connection_config, addr); + pool.add_connection(&self.connection_config, addr); pool }); ( @@ -141,7 +142,10 @@ impl ConnectionCache { } } - fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { + fn get_or_add_connection( + &self, + addr: &SocketAddr, + ) -> GetConnectionResult<
<P as ConnectionPool>
::BaseClientConnection> { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); @@ -207,7 +211,10 @@ impl ConnectionCache { fn get_connection_and_log_stats( &self, addr: &SocketAddr, - ) -> (Arc, Arc) { + ) -> ( + Arc<
<P as ConnectionPool>
::BaseClientConnection>, + Arc, + ) { let mut get_connection_measure = Measure::start("get_connection_measure"); let GetConnectionResult { connection, @@ -257,7 +264,7 @@ impl ConnectionCache { (connection, connection_cache_stats) } - pub fn get_connection(&self, addr: &SocketAddr) -> Arc { + pub fn get_connection(&self, addr: &SocketAddr) -> Arc<<
<P as ConnectionPool>
::BaseClientConnection as BaseClientConnection>::BlockingClientConnection>{ let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_blocking_connection(*addr, connection_cache_stats) } @@ -265,14 +272,10 @@ impl ConnectionCache { pub fn get_nonblocking_connection( &self, addr: &SocketAddr, - ) -> Arc { + ) -> Arc<<
<P as ConnectionPool>
::BaseClientConnection as BaseClientConnection>::NonblockingClientConnection>{ let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_nonblocking_connection(*addr, connection_cache_stats) } - - pub fn get_protocol_type(&self) -> ProtocolType { - self.connection_manager.get_protocol_type() - } } #[derive(Error, Debug)] @@ -290,28 +293,26 @@ pub enum ClientError { IoError(#[from] std::io::Error), } -pub trait NewConnectionConfig: Sync + Send { - fn new() -> Result - where - Self: Sized; - fn as_any(&self) -> &dyn Any; - - fn as_mut_any(&mut self) -> &mut dyn Any; +pub trait NewConnectionConfig: Sized { + fn new() -> Result; } -pub trait ConnectionPool: Sync + Send { +pub trait ConnectionPool { + type NewConnectionConfig: NewConnectionConfig; + type BaseClientConnection: BaseClientConnection; + /// Add a connection to the pool - fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr); + fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr); /// Get the number of current connections in the pool fn num_connections(&self) -> usize; /// Get a connection based on its index in the pool, without checking if the - fn get(&self, index: usize) -> Result, ConnectionPoolError>; + fn get(&self, index: usize) -> Result, ConnectionPoolError>; /// Get a connection from the pool. It must have at least one connection in the pool. /// This randomly picks a connection in the pool. - fn borrow_connection(&self) -> Arc { + fn borrow_connection(&self) -> Arc { let mut rng = thread_rng(); let n = rng.gen_range(0, self.num_connections()); self.get(n).expect("index is within num_connections") @@ -324,27 +325,30 @@ pub trait ConnectionPool: Sync + Send { fn create_pool_entry( &self, - config: &dyn NewConnectionConfig, + config: &Self::NewConnectionConfig, addr: &SocketAddr, - ) -> Arc; + ) -> Arc; } -pub trait BaseClientConnection: Sync + Send { +pub trait BaseClientConnection { + type BlockingClientConnection: BlockingClientConnection; + type NonblockingClientConnection: NonblockingClientConnection; + fn new_blocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Arc; + ) -> Arc; fn new_nonblocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Arc; + ) -> Arc; } -struct GetConnectionResult { - connection: Arc, +struct GetConnectionResult { + connection: Arc, cache_hit: bool, report_stats: bool, map_timing_ms: u64, @@ -354,8 +358,8 @@ struct GetConnectionResult { eviction_timing_ms: u64, } -struct CreateConnectionResult { - connection: Arc, +struct CreateConnectionResult { + connection: Arc, cache_hit: bool, connection_cache_stats: Arc, num_evictions: u64, @@ -382,11 +386,14 @@ mod tests { const MOCK_PORT_OFFSET: u16 = 42; - pub struct MockUdpPool { - connections: Vec>, + struct MockUdpPool { + connections: Vec>, } impl ConnectionPool for MockUdpPool { - fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + type NewConnectionConfig = MockUdpConfig; + type BaseClientConnection = MockUdp; + + fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) { let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -395,7 +402,10 @@ mod tests { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get( + &self, + index: usize, + ) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -404,19 +414,14 @@ mod tests { fn create_pool_entry( &self, - config: &dyn 
NewConnectionConfig, + config: &Self::NewConnectionConfig, _addr: &SocketAddr, - ) -> Arc { - let config: &MockUdpConfig = match config.as_any().downcast_ref::() { - Some(b) => b, - None => panic!("Expecting a MockUdpConfig!"), - }; - + ) -> Arc { Arc::new(MockUdp(config.udp_socket.clone())) } } - pub struct MockUdpConfig { + struct MockUdpConfig { udp_socket: Arc, } @@ -440,23 +445,18 @@ mod tests { ), }) } - - fn as_any(&self) -> &dyn Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn Any { - self - } } - pub struct MockUdp(Arc); + struct MockUdp(Arc); impl BaseClientConnection for MockUdp { + type BlockingClientConnection = MockUdpConnection; + type NonblockingClientConnection = MockUdpConnection; + fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, @@ -467,7 +467,7 @@ mod tests { &self, addr: SocketAddr, _stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, @@ -475,32 +475,31 @@ mod tests { } } - pub struct MockUdpConnection { + struct MockUdpConnection { _socket: Arc, addr: SocketAddr, } #[derive(Default)] - pub struct MockConnectionManager {} + struct MockConnectionManager {} impl ConnectionManager for MockConnectionManager { - fn new_connection_pool(&self) -> Box { - Box::new(MockUdpPool { + type ConnectionPool = MockUdpPool; + type NewConnectionConfig = MockUdpConfig; + + fn new_connection_pool(&self) -> Self::ConnectionPool { + MockUdpPool { connections: Vec::default(), - }) + } } - fn new_connection_config(&self) -> Box { - Box::new(MockUdpConfig::new().unwrap()) + fn new_connection_config(&self) -> Self::NewConnectionConfig { + MockUdpConfig::new().unwrap() } fn get_port_offset(&self) -> u16 { MOCK_PORT_OFFSET } - - fn get_protocol_type(&self) -> ProtocolType { - ProtocolType::UDP - } } impl BlockingClientConnection for MockUdpConnection { @@ -560,7 +559,7 @@ mod tests { // we can actually connect to those addresses - ClientConnection implementations should either // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) - let connection_manager = Box::::default(); + let connection_manager = MockConnectionManager::default(); let connection_cache = ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); let port_offset = MOCK_PORT_OFFSET; @@ -583,7 +582,14 @@ mod tests { let conn = &map.get(addr).expect("Address not found").get(0).unwrap(); let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); - assert!(addr.ip() == conn.server_addr().ip()); + assert_eq!( + BlockingClientConnection::server_addr(&*conn).ip(), + addr.ip(), + ); + assert_eq!( + NonblockingClientConnection::server_addr(&*conn).ip(), + addr.ip(), + ); }); } @@ -608,14 +614,18 @@ mod tests { let port = u16::MAX - MOCK_PORT_OFFSET + 1; assert!(port.checked_add(MOCK_PORT_OFFSET).is_none()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let connection_manager = Box::::default(); + let connection_manager = MockConnectionManager::default(); let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); let conn = connection_cache.get_connection(&addr); // We (intentionally) don't have an interface that allows us to distinguish between // UDP and Quic connections, so check instead that the port is valid (non-zero) // and is the same as the input port (falling back on UDP) - 
assert!(conn.server_addr().port() != 0); - assert!(conn.server_addr().port() == port); + assert_ne!(port, 0u16); + assert_eq!(BlockingClientConnection::server_addr(&*conn).port(), port); + assert_eq!( + NonblockingClientConnection::server_addr(&*conn).port(), + port + ); } } diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 38165d35f..c41aad097 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -7,7 +7,7 @@ use { tracer_packet_stats::TracerPacketStats, unprocessed_transaction_storage::UnprocessedTransactionStorage, }, - solana_client::connection_cache::ConnectionCache, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_perf::{data_budget::DataBudget, packet::Packet}, diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 0772eb65a..ecf8cde42 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,7 +3,7 @@ use { rand::{thread_rng, Rng}, - solana_client::connection_cache::ConnectionCache, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ diff --git a/dos/src/main.rs b/dos/src/main.rs index f3db52dd0..1f14c059b 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, - solana_client::connection_cache::ConnectionCache, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 855eefeff..182c4ed6b 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -16,13 +16,11 @@ use { }, quinn::Endpoint, solana_connection_cache::{ - client_connection::ClientConnection as BlockingClientConnection, connection_cache::{ BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, - ConnectionPoolError, NewConnectionConfig, ProtocolType, + ConnectionPoolError, NewConnectionConfig, }, connection_cache_stats::ConnectionCacheStats, - nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ @@ -31,7 +29,6 @@ use { tls_certificates::new_self_signed_tls_certificate, }, std::{ - any::Any, error::Error, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{Arc, RwLock}, @@ -46,11 +43,14 @@ pub enum QuicClientError { } pub struct QuicPool { - connections: Vec>, + connections: Vec>, endpoint: Arc, } impl ConnectionPool for QuicPool { - fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + type BaseClientConnection = Quic; + type NewConnectionConfig = QuicConfig; + + fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) { let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -59,7 +59,7 @@ impl ConnectionPool for QuicPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -68,10 +68,9 @@ impl 
ConnectionPool for QuicPool { fn create_pool_entry( &self, - config: &dyn NewConnectionConfig, + config: &Self::NewConnectionConfig, addr: &SocketAddr, - ) -> Arc { - let config = QuicConfig::downcast_ref(config); + ) -> Arc { Arc::new(Quic(Arc::new(QuicClient::new( self.endpoint.clone(), *addr, @@ -105,14 +104,6 @@ impl NewConnectionConfig for QuicConfig { client_endpoint: None, }) } - - fn as_any(&self) -> &dyn Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn Any { - self - } } impl QuicConfig { @@ -166,23 +157,18 @@ impl QuicConfig { pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) { self.client_endpoint = Some(client_endpoint); } - - /// Convenient function to downcast a generic NewConnectionConfig reference to QuicConfig - pub fn downcast_ref(config: &dyn NewConnectionConfig) -> &Self { - match config.as_any().downcast_ref::() { - Some(config) => config, - None => panic!("Expecting a QuicConfig!"), - } - } } pub struct Quic(Arc); impl BaseClientConnection for Quic { + type BlockingClientConnection = BlockingQuicClientConnection; + type NonblockingClientConnection = NonblockingQuicClientConnection; + fn new_blocking_connection( &self, _addr: SocketAddr, stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(BlockingQuicClientConnection::new_with_client( self.0.clone(), stats, @@ -193,7 +179,7 @@ impl BaseClientConnection for Quic { &self, _addr: SocketAddr, stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(NonblockingQuicClientConnection::new_with_client( self.0.clone(), stats, @@ -203,40 +189,39 @@ impl BaseClientConnection for Quic { #[derive(Default)] pub struct QuicConnectionManager { - connection_config: Option>, + connection_config: Option, } impl ConnectionManager for QuicConnectionManager { - fn new_connection_pool(&self) -> Box { - Box::new(QuicPool { + type ConnectionPool = QuicPool; + type NewConnectionConfig = QuicConfig; + + fn new_connection_pool(&self) -> Self::ConnectionPool { + QuicPool { connections: Vec::default(), - endpoint: Arc::new(self.connection_config.as_ref().map_or( - QuicLazyInitializedEndpoint::default(), - |config| { - let config = QuicConfig::downcast_ref(config.as_ref()); - config.create_endpoint() - }, - )), - }) + endpoint: Arc::new( + self.connection_config + .as_ref() + .map_or(QuicLazyInitializedEndpoint::default(), |config| { + config.create_endpoint() + }), + ), + } } - fn new_connection_config(&self) -> Box { - Box::new(QuicConfig::new().unwrap()) + fn new_connection_config(&self) -> QuicConfig { + QuicConfig::new().unwrap() } fn get_port_offset(&self) -> u16 { QUIC_PORT_OFFSET } - - fn get_protocol_type(&self) -> ProtocolType { - ProtocolType::QUIC - } } impl QuicConnectionManager { pub fn new_with_connection_config(config: QuicConfig) -> Self { Self { - connection_config: Some(Box::new(config)), + connection_config: Some(config), } } } diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index bb3062257..885f8ac7f 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -464,19 +464,28 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), }; - let tpu_client = TpuClient::new_with_connection_cache( - rpc_client.clone(), - &test_validator.rpc_pubsub_url(), - TpuClientConfig::default(), - Arc::new(connection_cache.into()), - ) - .unwrap(); - let recent_blockhash = rpc_client.get_latest_blockhash().unwrap(); let tx = system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 
42, recent_blockhash); - assert!(tpu_client.send_transaction(&tx)); - + let success = match connection_cache { + ConnectionCache::Quic(cache) => TpuClient::new_with_connection_cache( + rpc_client.clone(), + &test_validator.rpc_pubsub_url(), + TpuClientConfig::default(), + Arc::new(cache), + ) + .unwrap() + .send_transaction(&tx), + ConnectionCache::Udp(cache) => TpuClient::new_with_connection_cache( + rpc_client.clone(), + &test_validator.rpc_pubsub_url(), + TpuClientConfig::default(), + Arc::new(cache), + ) + .unwrap() + .send_transaction(&tx), + }; + assert!(success); let timeout = Duration::from_secs(5); let now = Instant::now(); let signatures = vec![tx.signatures[0]]; diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index f96794b15..45db7fe0c 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,7 +2,7 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, - solana_client::connection_cache::ConnectionCache, + solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_measure::measure::Measure, solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, diff --git a/thin-client/src/thin_client.rs b/thin-client/src/thin_client.rs index 7cde909b2..72e3c68a4 100644 --- a/thin-client/src/thin_client.rs +++ b/thin-client/src/thin_client.rs @@ -6,7 +6,12 @@ use { log::*, rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_connection_cache::connection_cache::ConnectionCache, + solana_connection_cache::{ + client_connection::ClientConnection, + connection_cache::{ + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + }, + }, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ @@ -111,21 +116,30 @@ pub mod temporary_pub { use temporary_pub::*; /// An object for querying and sending transactions to the network. 
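For comparison, a sketch of constructing the now-generic `ThinClient` (defined below) directly from a protocol-specific backend cache, here UDP; it assumes the usual crate paths `solana_thin_client::thin_client` and `solana_udp_client`, the socket addresses come from the caller, and the type alias is illustrative.

```rust
use {
    solana_connection_cache::connection_cache::{ConnectionCache, DEFAULT_CONNECTION_POOL_SIZE},
    solana_thin_client::thin_client::ThinClient,
    solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
    std::{net::SocketAddr, sync::Arc},
};

// With the type-erased backend gone, the protocol is fixed in the type:
type UdpThinClient = ThinClient<UdpPool, UdpConnectionManager, UdpConfig>;

fn udp_thin_client(rpc_addr: SocketAddr, tpu_addr: SocketAddr) -> UdpThinClient {
    // Build a UDP-backed generic ConnectionCache and hand it to ThinClient::new,
    // matching the new `Arc<ConnectionCache<P, M, C>>` parameter.
    let connection_cache = Arc::new(
        ConnectionCache::new(UdpConnectionManager::default(), DEFAULT_CONNECTION_POOL_SIZE)
            .expect("create UDP connection cache"),
    );
    ThinClient::new(rpc_addr, tpu_addr, connection_cache)
}
```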
-pub struct ThinClient { +pub struct ThinClient< + P, // ConnectionPool + M, // ConnectionManager + C, // NewConnectionConfig +> { rpc_clients: Vec, tpu_addrs: Vec, optimizer: ClientOptimizer, - connection_cache: Arc, + connection_cache: Arc>, } -impl ThinClient { +impl ThinClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP /// (currently hardcoded to UDP) pub fn new( rpc_addr: SocketAddr, tpu_addr: SocketAddr, - connection_cache: Arc, + connection_cache: Arc>, ) -> Self { Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache) } @@ -134,7 +148,7 @@ impl ThinClient { rpc_addr: SocketAddr, tpu_addr: SocketAddr, timeout: Duration, - connection_cache: Arc, + connection_cache: Arc>, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); Self::new_from_client(rpc_client, tpu_addr, connection_cache) @@ -143,7 +157,7 @@ impl ThinClient { fn new_from_client( rpc_client: RpcClient, tpu_addr: SocketAddr, - connection_cache: Arc, + connection_cache: Arc>, ) -> Self { Self { rpc_clients: vec![rpc_client], @@ -156,7 +170,7 @@ impl ThinClient { pub fn new_from_addrs( rpc_addrs: Vec, tpu_addrs: Vec, - connection_cache: Arc, + connection_cache: Arc>, ) -> Self { assert!(!rpc_addrs.is_empty()); assert_eq!(rpc_addrs.len(), tpu_addrs.len()); @@ -314,13 +328,23 @@ impl ThinClient { } } -impl Client for ThinClient { +impl Client for ThinClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ fn tpu_addr(&self) -> String { self.tpu_addr().to_string() } } -impl SyncClient for ThinClient { +impl SyncClient for ThinClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ fn send_and_confirm_message( &self, keypairs: &T, @@ -600,7 +624,12 @@ impl SyncClient for ThinClient { } } -impl AsyncClient for ThinClient { +impl AsyncClient for ThinClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ fn async_send_versioned_transaction( &self, transaction: VersionedTransaction, diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 626858fe3..8786f947f 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -11,8 +11,12 @@ use { bincode::serialize, futures_util::{future::join_all, stream::StreamExt}, log::*, - solana_connection_cache::connection_cache::{ - ConnectionCache, ConnectionManager, DEFAULT_CONNECTION_POOL_SIZE, + solana_connection_cache::{ + connection_cache::{ + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + DEFAULT_CONNECTION_POOL_SIZE, + }, + nonblocking::client_connection::ClientConnection, }, solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError}, solana_rpc_client::nonblocking::rpc_client::RpcClient, @@ -250,33 +254,52 @@ impl LeaderTpuCache { /// Client which sends transactions directly to the current leader's TPU port over UDP. 
/// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient< + P, // ConnectionPool + M, // ConnectionManager + C, // NewConnectionConfig +> { fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, rpc_client: Arc, - connection_cache: Arc, + connection_cache: Arc>, } -async fn send_wire_transaction_to_addr( - connection_cache: &ConnectionCache, +async fn send_wire_transaction_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transaction: Vec, -) -> TransportResult<()> { +) -> TransportResult<()> +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ let conn = connection_cache.get_nonblocking_connection(addr); conn.send_data(&wire_transaction).await } -async fn send_wire_transaction_batch_to_addr( - connection_cache: &ConnectionCache, +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transactions: &[Vec], -) -> TransportResult<()> { +) -> TransportResult<()> +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ let conn = connection_cache.get_nonblocking_connection(addr); conn.send_data_batch(wire_transactions).await } -impl TpuClient { +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub async fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -391,7 +414,7 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_manager: Box, + connection_manager: M, ) -> Result { let connection_cache = Arc::new( ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(), @@ -404,7 +427,7 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc, + connection_cache: Arc>, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); let leader_tpu_service = @@ -553,7 +576,7 @@ impl TpuClient { } } -impl Drop for TpuClient { +impl Drop for TpuClient { fn drop(&mut self) { self.exit.store(true, Ordering::Relaxed); } diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 887f2233e..a927faf2c 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -2,7 +2,9 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_connection_cache::connection_cache::{ConnectionCache, ConnectionManager}, + solana_connection_cache::connection_cache::{ + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + }, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, std::{ @@ -59,14 +61,23 @@ impl Default for TpuClientConfig { /// Client which sends transactions directly to the current leader's TPU port over UDP. 
/// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient< + P, // ConnectionPool + M, // ConnectionManager + C, // NewConnectionConfig +> { _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket //todo: get rid of this field rpc_client: Arc, - tpu_client: Arc, + tpu_client: Arc>, } -impl TpuClient { +impl TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -110,7 +121,7 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_manager: Box, + connection_manager: M, ) -> Result { let create_tpu_client = NonblockingTpuClient::new( rpc_client.get_inner_client().clone(), @@ -133,7 +144,7 @@ impl TpuClient { rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc, + connection_cache: Arc>, ) -> Result { let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( rpc_client.get_inner_client().clone(), diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index d634afef5..995aff178 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -9,26 +9,26 @@ use { udp_client::UdpClientConnection as BlockingUdpConnection, }, solana_connection_cache::{ - client_connection::ClientConnection as BlockingClientConnection, connection_cache::{ BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, - ConnectionPoolError, NewConnectionConfig, ProtocolType, + ConnectionPoolError, NewConnectionConfig, }, connection_cache_stats::ConnectionCacheStats, - nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, std::{ - any::Any, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::Arc, }, }; pub struct UdpPool { - connections: Vec>, + connections: Vec>, } impl ConnectionPool for UdpPool { - fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + type BaseClientConnection = Udp; + type NewConnectionConfig = UdpConfig; + + fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) { let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -37,7 +37,7 @@ impl ConnectionPool for UdpPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -46,13 +46,9 @@ impl ConnectionPool for UdpPool { fn create_pool_entry( &self, - config: &dyn NewConnectionConfig, + config: &Self::NewConnectionConfig, _addr: &SocketAddr, - ) -> Arc { - let config: &UdpConfig = match config.as_any().downcast_ref::() { - Some(b) => b, - None => panic!("Expecting a UdpConfig!"), - }; + ) -> Arc { Arc::new(Udp(config.udp_socket.clone())) } } @@ -69,23 +65,18 @@ impl NewConnectionConfig for UdpConfig { udp_socket: Arc::new(socket), }) } - - fn as_any(&self) -> &dyn Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn Any { - self - } } pub struct Udp(Arc); impl BaseClientConnection for Udp { + type BlockingClientConnection = BlockingUdpConnection; + type NonblockingClientConnection = NonblockingUdpConnection; + fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(BlockingUdpConnection::new_from_addr(self.0.clone(), 
addr)) } @@ -93,7 +84,7 @@ impl BaseClientConnection for Udp { &self, addr: SocketAddr, _stats: Arc, - ) -> Arc { + ) -> Arc { Arc::new(NonblockingUdpConnection::new_from_addr( self.0.try_clone().unwrap(), addr, @@ -105,21 +96,19 @@ impl BaseClientConnection for Udp { pub struct UdpConnectionManager {} impl ConnectionManager for UdpConnectionManager { - fn new_connection_pool(&self) -> Box { - Box::new(UdpPool { + type ConnectionPool = UdpPool; + type NewConnectionConfig = UdpConfig; + fn new_connection_pool(&self) -> Self::ConnectionPool { + UdpPool { connections: Vec::default(), - }) + } } - fn new_connection_config(&self) -> Box { - Box::new(UdpConfig::new().unwrap()) + fn new_connection_config(&self) -> Self::NewConnectionConfig { + UdpConfig::new().unwrap() } fn get_port_offset(&self) -> u16 { 0 } - - fn get_protocol_type(&self) -> ProtocolType { - ProtocolType::UDP - } }
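Finally, a sketch of driving the nonblocking backend `TpuClient` (changed above) with a concrete `QuicConnectionManager`, now that `ConnectionManager` carries its pool and config as associated types; the RPC and websocket URLs are placeholders and the helper function is hypothetical.

```rust
use {
    solana_connection_cache::connection_cache::{
        ConnectionCache, DEFAULT_CONNECTION_POOL_SIZE, NewConnectionConfig,
    },
    solana_quic_client::{QuicConfig, QuicConnectionManager},
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_sdk::transaction::Transaction,
    solana_tpu_client::{nonblocking::tpu_client::TpuClient, tpu_client::TpuClientConfig},
    std::sync::Arc,
};

// The manager is passed by value (no more Box<dyn ConnectionManager>), and the
// resulting cache and client are concrete QUIC instantiations of the generic types.
async fn send_over_quic(transaction: &Transaction) -> bool {
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let connection_manager = QuicConnectionManager::new_with_connection_config(
        QuicConfig::new().expect("create QUIC config"),
    );
    let connection_cache = Arc::new(
        ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE)
            .expect("create QUIC connection cache"),
    );
    let tpu_client = TpuClient::new_with_connection_cache(
        rpc_client,
        "ws://127.0.0.1:8900",
        TpuClientConfig::default(),
        connection_cache,
    )
    .await
    .expect("create TpuClient");
    tpu_client.send_transaction(transaction).await
}
```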