diff --git a/Cargo.lock b/Cargo.lock index ab32674221..ea4d7e5a4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5181,6 +5181,7 @@ dependencies = [ "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -5248,6 +5249,28 @@ dependencies = [ "solana-sdk 1.16.0", ] +[[package]] +name = "solana-connection-cache" +version = "1.16.0" +dependencies = [ + "async-trait", + "bincode", + "futures-util", + "indexmap", + "indicatif", + "log", + "rand 0.7.3", + "rand_chacha 0.2.2", + "rayon", + "solana-logger 1.16.0", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-sdk 1.16.0", + "thiserror", + "tokio", +] + [[package]] name = "solana-core" version = "1.16.0" @@ -6190,6 +6213,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustls 0.20.6", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -6765,6 +6789,7 @@ dependencies = [ "bincode", "log", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-rpc-client", "solana-rpc-client-api", @@ -6817,6 +6842,7 @@ dependencies = [ "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -6885,6 +6911,7 @@ name = "solana-udp-client" version = "1.16.0" dependencies = [ "async-trait", + "solana-connection-cache", "solana-net-utils", "solana-sdk 1.16.0", "solana-streamer", diff --git a/Cargo.toml b/Cargo.toml index d65082e335..16b5dbb6e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "cli-output", "client", "client-test", + "connection-cache", "core", "dos", "download-utils", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 4fbe37400b..5d55c85b6e 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -32,7 +32,7 @@ use { transaction::Transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ sync::{atomic::Ordering, Arc, RwLock}, thread::sleep, diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 2195f697b6..81e7c0377f 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -8,9 +8,7 @@ use { pubkey::Pubkey, signature::{read_keypair_file, Keypair}, }, - solana_tpu_client::tpu_connection_cache::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, - }, + solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC}, std::{ net::{Ipv4Addr, SocketAddr}, process::exit, diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 59f38038a5..015375ecd6 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -31,7 +31,7 @@ use { stake::{instruction::LockupArgs, state::Lockup}, transaction::{TransactionError, VersionedTransaction}, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, solana_vote_program::vote_state::VoteAuthorize, std::{collections::HashMap, error, io::stdout, str::FromStr, sync::Arc, time::Duration}, thiserror::Error, diff --git a/cli/src/main.rs b/cli/src/main.rs index 5c459d4a8d..4b2de432ea 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -17,7 +17,7 @@ use { }, solana_remote_wallet::remote_wallet::RemoteWalletManager, solana_rpc_client_api::config::RpcSendTransactionConfig, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{collections::HashMap, error, path::PathBuf, sync::Arc, time::Duration}, }; diff --git a/client/Cargo.toml b/client/Cargo.toml index 2f1d7cc59c..fe45e65fc6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -21,6 +21,8 @@ log = "0.4.17" quinn = "0.9.3" rand = "0.7.0" rayon = "1.5.3" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index e8b1843c14..2fba4efc70 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,475 +1,136 @@ -pub use solana_tpu_client::tpu_connection_cache::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, -}; use { - crate::{ - nonblocking::tpu_connection::NonblockingConnection, tpu_connection::BlockingConnection, - }, - indexmap::map::{Entry, IndexMap}, quinn::Endpoint, - rand::{thread_rng, Rng}, - solana_measure::measure::Measure, - solana_quic_client::nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, - }, - solana_sdk::{ - pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval, - }, - solana_streamer::{ - nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, - streamer::StakedNodes, - tls_certificates::new_self_signed_tls_certificate, - }, - solana_tpu_client::{ - connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL}, - tpu_connection_cache::MAX_CONNECTIONS, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + ConnectionCache as BackendConnectionCache, NewConnectionConfig, ProtocolType, + }, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, + solana_quic_client::{QuicConfig, QuicConnectionManager}, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_streamer::streamer::StakedNodes, + solana_udp_client::UdpConnectionManager, std::{ error::Error, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, - sync::{atomic::Ordering, Arc, RwLock}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::{Arc, RwLock}, }, }; +pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; +pub const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true; +pub const MAX_CONNECTIONS: usize = 1024; + +/// A thin wrapper over connection-cache/ConnectionCache to ease +/// construction of the ConnectionCache for code dealing both with udp and quic. +/// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly. pub struct ConnectionCache { - map: RwLock>, - stats: Arc, - last_stats: AtomicInterval, - connection_pool_size: usize, - tpu_udp_socket: Arc, - client_certificate: Arc, - use_quic: bool, - maybe_staked_nodes: Option>>, - maybe_client_pubkey: Option, - - // The optional specified endpoint for the quic based client connections - // If not specified, the connection cache we create as needed. - client_endpoint: Option, -} - -/// Models the pool of connections -struct ConnectionPool { - /// The connections in the pool - connections: Vec>, - - /// Connections in this pool share the same endpoint - endpoint: Option>, -} - -impl ConnectionPool { - /// Get a connection from the pool. It must have at least one connection in the pool. - /// This randomly picks a connection in the pool. - fn borrow_connection(&self) -> Arc { - let mut rng = thread_rng(); - let n = rng.gen_range(0, self.connections.len()); - self.connections[n].clone() - } - - /// Check if we need to create a new connection. If the count of the connections - /// is smaller than the pool size. - fn need_new_connection(&self, required_pool_size: usize) -> bool { - self.connections.len() < required_pool_size - } + cache: BackendConnectionCache, } impl ConnectionCache { + /// Create a quic connection_cache pub fn new(connection_pool_size: usize) -> Self { - Self::_new_with_endpoint(connection_pool_size, None) + Self::new_with_client_options(connection_pool_size, None, None, None) } - /// Create a connection cache with a specific quic client endpoint. - pub fn new_with_endpoint(connection_pool_size: usize, client_endpoint: Endpoint) -> Self { - Self::_new_with_endpoint(connection_pool_size, Some(client_endpoint)) - } - - fn _new_with_endpoint(connection_pool_size: usize, client_endpoint: Option) -> Self { + /// Create a quic conneciton_cache with more client options + pub fn new_with_client_options( + connection_pool_size: usize, + client_endpoint: Option, + cert_info: Option<(&Keypair, IpAddr)>, + stake_info: Option<(&Arc>, &Pubkey)>, + ) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); - Self { - use_quic: true, - connection_pool_size, - client_endpoint, - ..Self::default() + let mut config = QuicConfig::new().unwrap(); + if let Some(client_endpoint) = client_endpoint { + config.update_client_endpoint(client_endpoint); } + if let Some(cert_info) = cert_info { + config + .update_client_certificate(cert_info.0, cert_info.1) + .unwrap(); + } + if let Some(stake_info) = stake_info { + config.set_staked_nodes(stake_info.0, stake_info.1); + } + let connection_manager = + Box::new(QuicConnectionManager::new_with_connection_config(config)); + let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + Self { cache } } + #[deprecated( + since = "1.15.0", + note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate." + )] pub fn update_client_certificate( &mut self, - keypair: &Keypair, - ipaddr: IpAddr, + _keypair: &Keypair, + _ipaddr: IpAddr, ) -> Result<(), Box> { - let (cert, priv_key) = new_self_signed_tls_certificate(keypair, ipaddr)?; - self.client_certificate = Arc::new(QuicClientCertificate { - certificate: cert, - key: priv_key, - }); Ok(()) } + #[deprecated( + since = "1.15.0", + note = "This method does not do anything. Please use `new_with_client_options` instead to set staked nodes information." + )] pub fn set_staked_nodes( &mut self, - staked_nodes: &Arc>, - client_pubkey: &Pubkey, + _staked_nodes: &Arc>, + _client_pubkey: &Pubkey, ) { - self.maybe_staked_nodes = Some(staked_nodes.clone()); - self.maybe_client_pubkey = Some(*client_pubkey); } pub fn with_udp(connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); - Self { - use_quic: false, - connection_pool_size, - ..Self::default() - } + let connection_manager = Box::::default(); + let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + Self { cache } } pub fn use_quic(&self) -> bool { - self.use_quic + matches!(self.cache.get_protocol_type(), ProtocolType::QUIC) } - fn create_endpoint(&self, force_use_udp: bool) -> Option> { - if self.use_quic() && !force_use_udp { - Some(Arc::new(QuicLazyInitializedEndpoint::new( - self.client_certificate.clone(), - self.client_endpoint.as_ref().cloned(), - ))) - } else { - None - } + pub fn get_connection(&self, addr: &SocketAddr) -> Arc { + self.cache.get_connection(addr) } - fn compute_max_parallel_streams(&self) -> usize { - let (client_type, stake, total_stake) = - self.maybe_client_pubkey - .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| { - self.maybe_staked_nodes.as_ref().map_or( - (ConnectionPeerType::Unstaked, 0, 0), - |stakes| { - let rstakes = stakes.read().unwrap(); - rstakes.pubkey_stake_map.get(&pubkey).map_or( - (ConnectionPeerType::Unstaked, 0, rstakes.total_stake), - |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake), - ) - }, - ) - }); - compute_max_allowed_uni_streams(client_type, stake, total_stake) - } - - /// Create a lazy connection object under the exclusive lock of the cache map if there is not - /// enough used connections in the connection pool for the specified address. - /// Returns CreateConnectionResult. - fn create_connection( - &self, - lock_timing_ms: &mut u64, - addr: &SocketAddr, - force_use_udp: bool, - ) -> CreateConnectionResult { - let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); - let mut map = self.map.write().unwrap(); - get_connection_map_lock_measure.stop(); - *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); - // Read again, as it is possible that between read lock dropped and the write lock acquired - // another thread could have setup the connection. - - let (to_create_connection, endpoint) = - map.get(addr) - .map_or((true, self.create_endpoint(force_use_udp)), |pool| { - ( - pool.need_new_connection(self.connection_pool_size), - pool.endpoint.clone(), - ) - }); - - let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection { - let connection = if !self.use_quic() || force_use_udp { - BaseTpuConnection::Udp(self.tpu_udp_socket.clone()) - } else { - BaseTpuConnection::Quic(Arc::new(QuicClient::new( - endpoint.as_ref().unwrap().clone(), - *addr, - self.compute_max_parallel_streams(), - ))) - }; - - let connection = Arc::new(connection); - - // evict a connection if the cache is reaching upper bounds - let mut num_evictions = 0; - let mut get_connection_cache_eviction_measure = - Measure::start("get_connection_cache_eviction_measure"); - while map.len() >= MAX_CONNECTIONS { - let mut rng = thread_rng(); - let n = rng.gen_range(0, MAX_CONNECTIONS); - map.swap_remove_index(n); - num_evictions += 1; - } - get_connection_cache_eviction_measure.stop(); - - match map.entry(*addr) { - Entry::Occupied(mut entry) => { - let pool = entry.get_mut(); - pool.connections.push(connection); - } - Entry::Vacant(entry) => { - entry.insert(ConnectionPool { - connections: vec![connection], - endpoint, - }); - } - } - ( - false, - num_evictions, - get_connection_cache_eviction_measure.as_ms(), - ) - } else { - (true, 0, 0) - }; - - let pool = map.get(addr).unwrap(); - let connection = pool.borrow_connection(); - - CreateConnectionResult { - connection, - cache_hit, - connection_cache_stats: self.stats.clone(), - num_evictions, - eviction_timing_ms, - } - } - - fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { - let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); - let map = self.map.read().unwrap(); - get_connection_map_lock_measure.stop(); - - let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 }; - - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); - let force_use_udp = port == addr.port(); - let addr = SocketAddr::new(addr.ip(), port); - - let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); - - let report_stats = self - .last_stats - .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); - - let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); - let CreateConnectionResult { - connection, - cache_hit, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } = match map.get(&addr) { - Some(pool) => { - if pool.need_new_connection(self.connection_pool_size) { - // create more connection and put it in the pool - drop(map); - self.create_connection(&mut lock_timing_ms, &addr, force_use_udp) - } else { - let connection = pool.borrow_connection(); - CreateConnectionResult { - connection, - cache_hit: true, - connection_cache_stats: self.stats.clone(), - num_evictions: 0, - eviction_timing_ms: 0, - } - } - } - None => { - // Upgrade to write access by dropping read lock and acquire write lock - drop(map); - self.create_connection(&mut lock_timing_ms, &addr, force_use_udp) - } - }; - get_connection_map_measure.stop(); - - GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms: get_connection_map_measure.as_ms(), - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } - } - - fn get_connection_and_log_stats( + pub fn get_nonblocking_connection( &self, addr: &SocketAddr, - ) -> (Arc, Arc) { - let mut get_connection_measure = Measure::start("get_connection_measure"); - let GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms, - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } = self.get_or_add_connection(addr); - - if report_stats { - connection_cache_stats.report(); - } - - if cache_hit { - connection_cache_stats - .cache_hits - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_hit_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - } else { - connection_cache_stats - .cache_misses - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_miss_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - connection_cache_stats - .cache_evictions - .fetch_add(num_evictions, Ordering::Relaxed); - connection_cache_stats - .eviction_time_ms - .fetch_add(eviction_timing_ms, Ordering::Relaxed); - } - - get_connection_measure.stop(); - connection_cache_stats - .get_connection_lock_ms - .fetch_add(lock_timing_ms, Ordering::Relaxed); - connection_cache_stats - .get_connection_ms - .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); - - (connection, connection_cache_stats) - } - - pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection { - let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); - connection.new_blocking_connection(*addr, connection_cache_stats) - } - - pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection { - let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); - connection.new_nonblocking_connection(*addr, connection_cache_stats) + ) -> Arc { + self.cache.get_nonblocking_connection(addr) } } impl Default for ConnectionCache { fn default() -> Self { - let (cert, priv_key) = - new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Failed to initialize QUIC client certificates"); - Self { - map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), - stats: Arc::new(ConnectionCacheStats::default()), - last_stats: AtomicInterval::default(), - connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Unable to bind to UDP socket"), - ), - client_certificate: Arc::new(QuicClientCertificate { - certificate: cert, - key: priv_key, - }), - use_quic: DEFAULT_TPU_USE_QUIC, - maybe_staked_nodes: None, - maybe_client_pubkey: None, - client_endpoint: None, + if DEFAULT_CONNECTION_CACHE_USE_QUIC { + let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + ConnectionCache::new_with_client_options( + DEFAULT_CONNECTION_POOL_SIZE, + None, + Some(cert_info), + None, + ) + } else { + ConnectionCache::with_udp(DEFAULT_CONNECTION_POOL_SIZE) } } } -enum BaseTpuConnection { - Udp(Arc), - Quic(Arc), -} -impl BaseTpuConnection { - fn new_blocking_connection( - &self, - addr: SocketAddr, - stats: Arc, - ) -> BlockingConnection { - use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection}; - match self { - BaseTpuConnection::Udp(udp_socket) => { - UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into() - } - BaseTpuConnection::Quic(quic_client) => { - QuicTpuConnection::new_with_client(quic_client.clone(), stats).into() - } - } - } - - fn new_nonblocking_connection( - &self, - addr: SocketAddr, - stats: Arc, - ) -> NonblockingConnection { - use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection}; - match self { - BaseTpuConnection::Udp(udp_socket) => { - UdpTpuConnection::new_from_addr(udp_socket.try_clone().unwrap(), addr).into() - } - BaseTpuConnection::Quic(quic_client) => { - QuicTpuConnection::new_with_client(quic_client.clone(), stats).into() - } - } - } -} - -struct GetConnectionResult { - connection: Arc, - cache_hit: bool, - report_stats: bool, - map_timing_ms: u64, - lock_timing_ms: u64, - connection_cache_stats: Arc, - num_evictions: u64, - eviction_timing_ms: u64, -} - -struct CreateConnectionResult { - connection: Arc, - cache_hit: bool, - connection_cache_stats: Arc, - num_evictions: u64, - eviction_timing_ms: u64, -} - #[cfg(test)] mod tests { use { - crate::{ - connection_cache::{ConnectionCache, MAX_CONNECTIONS}, - tpu_connection::TpuConnection, - }, + crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, - rand::{Rng, SeedableRng}, - rand_chacha::ChaChaRng, - solana_sdk::{ - pubkey::Pubkey, - quic::{ - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_PORT_OFFSET, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - }, - signature::Keypair, - }, + solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, streamer::StakedNodes, @@ -499,145 +160,6 @@ mod tests { ) } - fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { - let a = rng.gen_range(1, 255); - let b = rng.gen_range(1, 255); - let c = rng.gen_range(1, 255); - let d = rng.gen_range(1, 255); - - let addr_str = format!("{a}.{b}.{c}.{d}:80"); - - addr_str.parse().expect("Invalid address") - } - - #[test] - fn test_connection_cache() { - solana_logger::setup(); - // Allow the test to run deterministically - // with the same pseudorandom sequence between runs - // and on different platforms - the cryptographic security - // property isn't important here but ChaChaRng provides a way - // to get the same pseudorandom sequence on different platforms - let mut rng = ChaChaRng::seed_from_u64(42); - - // Generate a bunch of random addresses and create TPUConnections to them - // Since TPUConnection::new is infallible, it should't matter whether or not - // we can actually connect to those addresses - TPUConnection implementations should either - // be lazy and not connect until first use or handle connection errors somehow - // (without crashing, as would be required in a real practical validator) - let connection_cache = ConnectionCache::default(); - let port_offset = if connection_cache.use_quic() { - QUIC_PORT_OFFSET - } else { - 0 - }; - let addrs = (0..MAX_CONNECTIONS) - .map(|_| { - let addr = get_addr(&mut rng); - connection_cache.get_connection(&addr); - addr - }) - .collect::>(); - { - let map = connection_cache.map.read().unwrap(); - assert!(map.len() == MAX_CONNECTIONS); - addrs.iter().for_each(|a| { - let port = a - .port() - .checked_add(port_offset) - .unwrap_or_else(|| a.port()); - let addr = &SocketAddr::new(a.ip(), port); - - let conn = &map.get(addr).expect("Address not found").connections[0]; - let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); - assert!(addr.ip() == conn.tpu_addr().ip()); - }); - } - - let addr = &get_addr(&mut rng); - connection_cache.get_connection(addr); - - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); - let addr_with_quic_port = SocketAddr::new(addr.ip(), port); - let map = connection_cache.map.read().unwrap(); - assert!(map.len() == MAX_CONNECTIONS); - let _conn = map.get(&addr_with_quic_port).expect("Address not found"); - } - - #[test] - fn test_connection_cache_max_parallel_chunks() { - solana_logger::setup(); - let mut connection_cache = ConnectionCache::default(); - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let pubkey = Pubkey::new_unique(); - connection_cache.set_staked_nodes(&staked_nodes, &pubkey); - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - staked_nodes.write().unwrap().total_stake = 10000; - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1); - - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - assert_eq!( - connection_cache.compute_max_parallel_streams(), - (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize - ); - - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .remove(&pubkey); - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1000); - assert_ne!( - connection_cache.compute_max_parallel_streams(), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - } - - // Test that we can get_connection with a connection cache configured for quic - // on an address with a port that, if QUIC_PORT_OFFSET were added to it, it would overflow to - // an invalid port. - #[test] - fn test_overflow_address() { - let port = u16::MAX - QUIC_PORT_OFFSET + 1; - assert!(port.checked_add(QUIC_PORT_OFFSET).is_none()); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let connection_cache = ConnectionCache::new(1); - - let conn = connection_cache.get_connection(&addr); - // We (intentionally) don't have an interface that allows us to distinguish between - // UDP and Quic connections, so check instead that the port is valid (non-zero) - // and is the same as the input port (falling back on UDP) - assert!(conn.tpu_addr().port() != 0); - assert!(conn.tpu_addr().port() == port); - } - #[test] fn test_connection_with_specified_client_endpoint() { let port = u16::MAX - QUIC_PORT_OFFSET + 1; @@ -670,19 +192,20 @@ mod tests { ) .unwrap(); - let connection_cache = ConnectionCache::new_with_endpoint(1, response_recv_endpoint); + let connection_cache = + ConnectionCache::new_with_client_options(1, Some(response_recv_endpoint), None, None); // server port 1: let port1 = 9001; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.tpu_addr().port(), port1 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port1 + QUIC_PORT_OFFSET); // server port 2: let port2 = 9002; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.tpu_addr().port(), port2 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port2 + QUIC_PORT_OFFSET); response_recv_exit.store(true, Ordering::Relaxed); response_recv_thread.join().unwrap(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 23d4cab220..3703d3438e 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -9,7 +9,6 @@ pub mod tpu_connection; pub mod transaction_executor; pub mod udp_client; -#[macro_use] extern crate solana_metrics; pub use solana_rpc_client::mock_sender_for_cli; diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 2f615557b0..28b9649289 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -1,59 +1,8 @@ -//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol -//! and provides an interface for sending transactions which is restricted by the -//! server's flow control. +#[deprecated( + since = "1.15.0", + note = "Please use `solana_quic_client::nonblocking::quic_client::QuicClientConnection` instead." +)] +pub use solana_quic_client::nonblocking::quic_client::QuicClientConnection as QuicTpuConnection; pub use solana_quic_client::nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicError, QuicLazyInitializedEndpoint, QuicTpuConnection, + QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, }; -use { - crate::nonblocking::tpu_connection::TpuConnection, - async_trait::async_trait, - log::*, - solana_sdk::transport::Result as TransportResult, - solana_tpu_client::tpu_connection::ClientStats, - std::{net::SocketAddr, sync::Arc}, -}; - -#[async_trait] -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.client.tpu_addr() - } - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let stats = ClientStats::default(); - let len = buffers.len(); - let res = self - .client - .send_batch(buffers, &stats, self.connection_stats.clone()) - .await; - self.connection_stats - .add_client_stats(&stats, len, res.is_ok()); - res?; - Ok(()) - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let stats = Arc::new(ClientStats::default()); - let send_buffer = - self.client - .send_buffer(wire_transaction, &stats, self.connection_stats.clone()); - if let Err(e) = send_buffer.await { - warn!( - "Failed to send transaction async to {}, error: {:?} ", - self.tpu_addr(), - e - ); - datapoint_warn!("send-wire-async", ("failure", 1, i64),); - self.connection_stats.add_client_stats(&stats, 1, false); - } else { - self.connection_stats.add_client_stats(&stats, 1, true); - } - Ok(()) - } -} diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index e914a88180..5439350c63 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -2,7 +2,6 @@ pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSender use { crate::{ connection_cache::ConnectionCache, - nonblocking::tpu_connection::TpuConnection, tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS}, }, bincode::serialize, @@ -46,7 +45,7 @@ async fn send_wire_transaction_to_addr( wire_transaction: Vec, ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction(wire_transaction.clone()).await + conn.send_data(&wire_transaction).await } async fn send_wire_transaction_batch_to_addr( @@ -55,7 +54,7 @@ async fn send_wire_transaction_batch_to_addr( wire_transactions: &[Vec], ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction_batch(wire_transactions).await + conn.send_data_batch(wire_transactions).await } impl TpuClient { diff --git a/client/src/nonblocking/tpu_connection.rs b/client/src/nonblocking/tpu_connection.rs index af19da4f5a..b91a888533 100644 --- a/client/src/nonblocking/tpu_connection.rs +++ b/client/src/nonblocking/tpu_connection.rs @@ -1,42 +1,5 @@ -//! Trait defining async send functions, to be used for UDP or QUIC sending - -use { - async_trait::async_trait, - enum_dispatch::enum_dispatch, - solana_quic_client::nonblocking::quic_client::QuicTpuConnection, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - solana_udp_client::nonblocking::udp_client::UdpTpuConnection, - std::net::SocketAddr, -}; - -// Due to the existence of `crate::connection_cache::Connection`, if this is named -// `Connection`, enum_dispatch gets confused between the two and throws errors when -// trying to convert later. -#[enum_dispatch] -pub enum NonblockingConnection { - QuicTpuConnection, - UdpTpuConnection, -} - -#[async_trait] -#[enum_dispatch(NonblockingConnection)] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - async fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction).await - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_connection_cache::nonblocking::client_connection::ClientConnection` instead." +)] +pub use solana_connection_cache::nonblocking::client_connection::ClientConnection as TpuConnection; diff --git a/client/src/nonblocking/udp_client.rs b/client/src/nonblocking/udp_client.rs index 4e817e8aa7..e880b1fb10 100644 --- a/client/src/nonblocking/udp_client.rs +++ b/client/src/nonblocking/udp_client.rs @@ -1,35 +1,5 @@ -//! Simple UDP client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions - -pub use solana_udp_client::nonblocking::udp_client::UdpTpuConnection; -use { - crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait, - core::iter::repeat, solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, -}; - -#[async_trait] -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - &self.addr - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.socket - .send_to(wire_transaction.as_ref(), self.addr) - .await?; - Ok(()) - } - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts).await?; - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_udp_client::nonblocking::udp_client::UdpClientConnection` instead." +)] +pub use solana_udp_client::nonblocking::udp_client::UdpClientConnection as UdpTpuConnection; diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 1ce44feb8b..a32aa381cb 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -1,44 +1,5 @@ -//! Simple client that connects to a given UDP port with the QUIC protocol and provides -//! an interface for sending transactions which is restricted by the server's flow control. - -pub use solana_quic_client::quic_client::QuicTpuConnection; -use { - crate::{ - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection, - }, - solana_quic_client::quic_client::temporary_pub::*, - solana_sdk::transport::Result as TransportResult, - std::net::SocketAddr, -}; - -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.inner.tpu_addr() - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; - Ok(()) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - let _lock = ASYNC_TASK_SEMAPHORE.acquire(); - let inner = self.inner.clone(); - - let _handle = RUNTIME - .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); - Ok(()) - } - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let _lock = ASYNC_TASK_SEMAPHORE.acquire(); - let inner = self.inner.clone(); - let _handle = - RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_quic_client::quic_client::QuicClientConnection` instead." +)] +pub use solana_quic_client::quic_client::QuicClientConnection as QuicTpuConnection; diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 1c38f5f7a8..277fe83223 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -4,8 +4,9 @@ //! unstable and may change in future releases. use { - crate::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + crate::connection_cache::ConnectionCache, log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ @@ -144,7 +145,7 @@ impl ThinClient { let conn = self.connection_cache.get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. the first time) #[allow(clippy::needless_borrow)] - conn.send_wire_transaction(&wire_transaction)?; + conn.send_data(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -531,7 +532,9 @@ impl AsyncClient for ThinClient { transaction: VersionedTransaction, ) -> TransportResult { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.serialize_and_send_transaction(&transaction)?; + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + conn.send_data(&wire_transaction)?; Ok(transaction.signatures[0]) } @@ -540,7 +543,11 @@ impl AsyncClient for ThinClient { batch: Vec, ) -> TransportResult<()> { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.par_serialize_and_send_transaction_batch(&batch[..])?; + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + conn.send_data_batch(&buffers)?; Ok(()) } } diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 81a8b89f66..9e000612a5 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,56 +1,6 @@ -pub use solana_tpu_client::tpu_connection::ClientStats; -use { - enum_dispatch::enum_dispatch, - rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_quic_client::quic_client::QuicTpuConnection, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - solana_udp_client::udp_client::UdpTpuConnection, - std::net::SocketAddr, -}; - -#[enum_dispatch] -pub enum BlockingConnection { - UdpTpuConnection, - QuicTpuConnection, -} - -#[enum_dispatch(BlockingConnection)] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(wire_transaction) - } - - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.send_wire_transaction_batch(&[wire_transaction]) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; - - fn par_serialize_and_send_transaction_batch( - &self, - transactions: &[VersionedTransaction], - ) -> TransportResult<()> { - let buffers = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - - self.send_wire_transaction_batch(&buffers) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()>; -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_connection_cache::client_connection::ClientConnection` instead." +)] +pub use solana_connection_cache::client_connection::ClientConnection as TpuConnection; +pub use solana_connection_cache::client_connection::ClientStats; diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index 868849ee2f..c05b74b364 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -1,35 +1,5 @@ -//! Simple TPU client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions - -pub use solana_udp_client::udp_client::UdpTpuConnection; -use { - crate::tpu_connection::TpuConnection, core::iter::repeat, - solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, - std::net::SocketAddr, -}; - -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - &self.addr - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - self.socket.send_to(wire_transaction.as_ref(), self.addr)?; - Ok(()) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) - } - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_udp_client::udp_client::UdpClientConnection` instead." +)] +pub use solana_udp_client::udp_client::UdpClientConnection as UdpTpuConnection; diff --git a/connection-cache/Cargo.toml b/connection-cache/Cargo.toml new file mode 100644 index 0000000000..c61bee0bd7 --- /dev/null +++ b/connection-cache/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "solana-connection-cache" +version = "1.16.0" +description = "Solana Connection Cache" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-connection-cache" +edition = "2021" + +[dependencies] +async-trait = "0.1.57" +bincode = "1.3.3" +futures-util = "0.3.25" +indexmap = "1.9.1" +indicatif = { version = "0.17.1", optional = true } +log = "0.4.17" +rand = "0.7.0" +rayon = "1.5.3" +solana-measure = { path = "../measure", version = "=1.16.0" } +solana-metrics = { path = "../metrics", version = "=1.16.0" } +solana-net-utils = { path = "../net-utils", version = "=1.16.0" } +solana-sdk = { path = "../sdk", version = "=1.16.0" } +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } + +[dev-dependencies] +rand_chacha = "0.2.2" +solana-logger = { path = "../logger", version = "=1.16.0" } diff --git a/connection-cache/src/client_connection.rs b/connection-cache/src/client_connection.rs new file mode 100644 index 0000000000..a58f33d730 --- /dev/null +++ b/connection-cache/src/client_connection.rs @@ -0,0 +1,34 @@ +use { + solana_metrics::MovingStat, + solana_sdk::transport::Result as TransportResult, + std::{net::SocketAddr, sync::atomic::AtomicU64}, +}; + +#[derive(Default)] +pub struct ClientStats { + pub total_connections: AtomicU64, + pub connection_reuse: AtomicU64, + pub connection_errors: AtomicU64, + pub zero_rtt_accepts: AtomicU64, + pub zero_rtt_rejects: AtomicU64, + + // these will be the last values of these stats + pub congestion_events: MovingStat, + pub streams_blocked_uni: MovingStat, + pub data_blocked: MovingStat, + pub acks: MovingStat, + pub make_connection_ms: AtomicU64, + pub send_timeout: AtomicU64, +} + +pub trait ClientConnection: Sync + Send { + fn server_addr(&self) -> &SocketAddr; + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()>; + + fn send_data_async(&self, buffer: Vec) -> TransportResult<()>; + + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()>; + + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()>; +} diff --git a/tpu-client/src/tpu_connection_cache.rs b/connection-cache/src/connection_cache.rs similarity index 67% rename from tpu-client/src/tpu_connection_cache.rs rename to connection-cache/src/connection_cache.rs index 52c415c6f6..82e88b818d 100644 --- a/tpu-client/src/tpu_connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -1,14 +1,15 @@ use { crate::{ + client_connection::ClientConnection as BlockingClientConnection, connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL}, - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection as BlockingTpuConnection, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, indexmap::map::IndexMap, rand::{thread_rng, Rng}, solana_measure::measure::Measure, solana_sdk::timing::AtomicInterval, std::{ + any::Any, net::SocketAddr, sync::{atomic::Ordering, Arc, RwLock}, }, @@ -18,38 +19,56 @@ use { // Should be non-zero pub static MAX_CONNECTIONS: usize = 1024; -/// Used to decide whether the TPU and underlying connection cache should use -/// QUIC connections. -pub const DEFAULT_TPU_USE_QUIC: bool = true; +/// Default connection pool size per remote address +pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; -/// Default TPU connection pool size per remote address -pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4; +/// Defines the protocol types of an implementation supports. +pub enum ProtocolType { + UDP, + QUIC, +} -pub const DEFAULT_TPU_ENABLE_UDP: bool = false; +pub trait ConnectionManager: Sync + Send { + fn new_connection_pool(&self) -> Box; + fn new_connection_config(&self) -> Box; + fn get_port_offset(&self) -> u16; + fn get_protocol_type(&self) -> ProtocolType; +} -pub struct TpuConnectionCache { - pub map: RwLock>, +pub struct ConnectionCache { + pub map: RwLock>>, + pub connection_manager: Box, pub stats: Arc, pub last_stats: AtomicInterval, pub connection_pool_size: usize, - pub tpu_config: P::TpuConfig, + pub connection_config: Box, } -impl TpuConnectionCache

{ +impl ConnectionCache { pub fn new( + connection_manager: Box, connection_pool_size: usize, - ) -> Result::ClientError> { - let config = P::TpuConfig::new()?; - Ok(Self::new_with_config(connection_pool_size, config)) + ) -> Result { + let config = connection_manager.new_connection_config(); + Ok(Self::new_with_config( + connection_pool_size, + config, + connection_manager, + )) } - pub fn new_with_config(connection_pool_size: usize, tpu_config: P::TpuConfig) -> Self { + pub fn new_with_config( + connection_pool_size: usize, + connection_config: Box, + connection_manager: Box, + ) -> Self { Self { map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), stats: Arc::new(ConnectionCacheStats::default()), + connection_manager, last_stats: AtomicInterval::default(), connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1. - tpu_config, + connection_config, } } @@ -60,7 +79,7 @@ impl TpuConnectionCache

{ &self, lock_timing_ms: &mut u64, addr: &SocketAddr, - ) -> CreateConnectionResult { + ) -> CreateConnectionResult { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let mut map = self.map.write().unwrap(); get_connection_map_lock_measure.stop(); @@ -94,10 +113,13 @@ impl TpuConnectionCache

{ map.entry(*addr) .and_modify(|pool| { - pool.add_connection(&self.tpu_config, addr); + pool.add_connection(&*self.connection_config, addr); }) - .or_insert_with(|| P::new_with_connection(&self.tpu_config, addr)); - + .or_insert_with(|| { + let mut pool = self.connection_manager.new_connection_pool(); + pool.add_connection(&*self.connection_config, addr); + pool + }); ( false, num_evictions, @@ -119,15 +141,12 @@ impl TpuConnectionCache

{ } } - fn get_or_add_connection( - &self, - addr: &SocketAddr, - ) -> GetConnectionResult { + fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); - let port_offset = P::PORT_OFFSET; + let port_offset = self.connection_manager.get_port_offset(); let port = addr .port() @@ -188,7 +207,7 @@ impl TpuConnectionCache

{ fn get_connection_and_log_stats( &self, addr: &SocketAddr, - ) -> (Arc, Arc) { + ) -> (Arc, Arc) { let mut get_connection_measure = Measure::start("get_connection_measure"); let GetConnectionResult { connection, @@ -238,10 +257,7 @@ impl TpuConnectionCache

{ (connection, connection_cache_stats) } - pub fn get_connection( - &self, - addr: &SocketAddr, - ) -> ::BlockingConnectionType { + pub fn get_connection(&self, addr: &SocketAddr) -> Arc { let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_blocking_connection(*addr, connection_cache_stats) } @@ -249,10 +265,14 @@ impl TpuConnectionCache

{ pub fn get_nonblocking_connection( &self, addr: &SocketAddr, - ) -> ::NonblockingConnectionType { + ) -> Arc { let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_nonblocking_connection(*addr, connection_cache_stats) } + + pub fn get_protocol_type(&self) -> ProtocolType { + self.connection_manager.get_protocol_type() + } } #[derive(Error, Debug)] @@ -261,33 +281,37 @@ pub enum ConnectionPoolError { IndexOutOfRange, } -pub trait NewTpuConfig { - type ClientError: std::fmt::Debug; - fn new() -> Result - where - Self: Sized; +#[derive(Error, Debug)] +pub enum ClientError { + #[error("Certificate error: {0}")] + CertificateError(String), + + #[error("IO error: {0:?}")] + IoError(#[from] std::io::Error), } -pub trait ConnectionPool { - type PoolTpuConnection: BaseTpuConnection; - type TpuConfig: NewTpuConfig; - const PORT_OFFSET: u16 = 0; +pub trait NewConnectionConfig: Sync + Send { + fn new() -> Result + where + Self: Sized; + fn as_any(&self) -> &dyn Any; - /// Create a new connection pool based on protocol-specific configuration - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self; + fn as_mut_any(&mut self) -> &mut dyn Any; +} +pub trait ConnectionPool: Sync + Send { /// Add a connection to the pool - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr); /// Get the number of current connections in the pool fn num_connections(&self) -> usize; /// Get a connection based on its index in the pool, without checking if the - fn get(&self, index: usize) -> Result, ConnectionPoolError>; + fn get(&self, index: usize) -> Result, ConnectionPoolError>; /// Get a connection from the pool. It must have at least one connection in the pool. /// This randomly picks a connection in the pool. - fn borrow_connection(&self) -> Arc { + fn borrow_connection(&self) -> Arc { let mut rng = thread_rng(); let n = rng.gen_range(0, self.num_connections()); self.get(n).expect("index is within num_connections") @@ -300,30 +324,27 @@ pub trait ConnectionPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, addr: &SocketAddr, - ) -> Self::PoolTpuConnection; + ) -> Arc; } -pub trait BaseTpuConnection { - type BlockingConnectionType: BlockingTpuConnection; - type NonblockingConnectionType: NonblockingTpuConnection; - +pub trait BaseClientConnection: Sync + Send { fn new_blocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Self::BlockingConnectionType; + ) -> Arc; fn new_nonblocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Self::NonblockingConnectionType; + ) -> Arc; } -struct GetConnectionResult { - connection: Arc, +struct GetConnectionResult { + connection: Arc, cache_hit: bool, report_stats: bool, map_timing_ms: u64, @@ -333,8 +354,8 @@ struct GetConnectionResult { eviction_timing_ms: u64, } -struct CreateConnectionResult { - connection: Arc, +struct CreateConnectionResult { + connection: Arc, cache_hit: bool, connection_cache_stats: Arc, num_evictions: u64, @@ -346,8 +367,8 @@ mod tests { use { super::*, crate::{ - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection as BlockingTpuConnection, + client_connection::ClientConnection as BlockingClientConnection, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, async_trait::async_trait, rand::{Rng, SeedableRng}, @@ -362,23 +383,11 @@ mod tests { const MOCK_PORT_OFFSET: u16 = 42; pub struct MockUdpPool { - connections: Vec>, + connections: Vec>, } impl ConnectionPool for MockUdpPool { - type PoolTpuConnection = MockUdp; - type TpuConfig = MockUdpConfig; - const PORT_OFFSET: u16 = MOCK_PORT_OFFSET; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -386,7 +395,7 @@ mod tests { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -395,21 +404,26 @@ mod tests { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, _addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - MockUdp(config.tpu_udp_socket.clone()) + ) -> Arc { + let config: &MockUdpConfig = match config.as_any().downcast_ref::() { + Some(b) => b, + None => panic!("Expecting a MockUdpConfig!"), + }; + + Arc::new(MockUdp(config.udp_socket.clone())) } } pub struct MockUdpConfig { - tpu_udp_socket: Arc, + udp_socket: Arc, } impl Default for MockUdpConfig { fn default() -> Self { Self { - tpu_udp_socket: Arc::new( + udp_socket: Arc::new( solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) .expect("Unable to bind to UDP socket"), ), @@ -417,85 +431,105 @@ mod tests { } } - impl NewTpuConfig for MockUdpConfig { - type ClientError = String; - - fn new() -> Result { + impl NewConnectionConfig for MockUdpConfig { + fn new() -> Result { Ok(Self { - tpu_udp_socket: Arc::new( + udp_socket: Arc::new( solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(|_| "Unable to bind to UDP socket".to_string())?, + .map_err(Into::::into)?, ), }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } pub struct MockUdp(Arc); - impl BaseTpuConnection for MockUdp { - type BlockingConnectionType = MockUdpTpuConnection; - type NonblockingConnectionType = MockUdpTpuConnection; - + impl BaseClientConnection for MockUdp { fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> MockUdpTpuConnection { - MockUdpTpuConnection { + ) -> Arc { + Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, - } + }) } fn new_nonblocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> MockUdpTpuConnection { - MockUdpTpuConnection { + ) -> Arc { + Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, - } + }) } } - pub struct MockUdpTpuConnection { + pub struct MockUdpConnection { _socket: Arc, addr: SocketAddr, } - impl BlockingTpuConnection for MockUdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { + #[derive(Default)] + pub struct MockConnectionManager {} + + impl ConnectionManager for MockConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(MockUdpPool { + connections: Vec::default(), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(MockUdpConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + MOCK_PORT_OFFSET + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::UDP + } + } + + impl BlockingClientConnection for MockUdpConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - fn send_wire_transaction_async(&self, _wire_transaction: Vec) -> TransportResult<()> { + fn send_data(&self, _buffer: &[u8]) -> TransportResult<()> { unimplemented!() } - fn send_wire_transaction_batch(&self, _buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + fn send_data_async(&self, _data: Vec) -> TransportResult<()> { unimplemented!() } - fn send_wire_transaction_batch_async(&self, _buffers: Vec>) -> TransportResult<()> { + fn send_data_batch(&self, _buffers: &[Vec]) -> TransportResult<()> { + unimplemented!() + } + fn send_data_batch_async(&self, _buffers: Vec>) -> TransportResult<()> { unimplemented!() } } #[async_trait] - impl NonblockingTpuConnection for MockUdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { + impl NonblockingClientConnection for MockUdpConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - async fn send_wire_transaction(&self, _wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data(&self, _data: &[u8]) -> TransportResult<()> { unimplemented!() } - async fn send_wire_transaction_batch(&self, _buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data_batch(&self, _buffers: &[Vec]) -> TransportResult<()> { unimplemented!() } } @@ -512,7 +546,7 @@ mod tests { } #[test] - fn test_tpu_connection_cache() { + fn test_connection_cache() { solana_logger::setup(); // Allow the test to run deterministically // with the same pseudorandom sequence between runs @@ -521,13 +555,14 @@ mod tests { // to get the same pseudorandom sequence on different platforms let mut rng = ChaChaRng::seed_from_u64(42); - // Generate a bunch of random addresses and create TPUConnections to them - // Since TPUConnection::new is infallible, it should't matter whether or not - // we can actually connect to those addresses - TPUConnection implementations should either + // Generate a bunch of random addresses and create connections to them + // Since ClientConnection::new is infallible, it should't matter whether or not + // we can actually connect to those addresses - ClientConnection implementations should either // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) + let connection_manager = Box::::default(); let connection_cache = - TpuConnectionCache::::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap(); + ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); let port_offset = MOCK_PORT_OFFSET; let addrs = (0..MAX_CONNECTIONS) .map(|_| { @@ -546,9 +581,9 @@ mod tests { .unwrap_or_else(|| a.port()); let addr = &SocketAddr::new(a.ip(), port); - let conn = &map.get(addr).expect("Address not found").connections[0]; + let conn = &map.get(addr).expect("Address not found").get(0).unwrap(); let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); - assert!(addr.ip() == BlockingTpuConnection::tpu_addr(&conn).ip()); + assert!(addr.ip() == conn.server_addr().ip()); }); } @@ -573,13 +608,14 @@ mod tests { let port = u16::MAX - MOCK_PORT_OFFSET + 1; assert!(port.checked_add(MOCK_PORT_OFFSET).is_none()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let connection_cache = TpuConnectionCache::::new(1).unwrap(); + let connection_manager = Box::::default(); + let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); - let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr); + let conn = connection_cache.get_connection(&addr); // We (intentionally) don't have an interface that allows us to distinguish between // UDP and Quic connections, so check instead that the port is valid (non-zero) // and is the same as the input port (falling back on UDP) - assert!(BlockingTpuConnection::tpu_addr(&conn).port() != 0); - assert!(BlockingTpuConnection::tpu_addr(&conn).port() == port); + assert!(conn.server_addr().port() != 0); + assert!(conn.server_addr().port() == port); } } diff --git a/tpu-client/src/connection_cache_stats.rs b/connection-cache/src/connection_cache_stats.rs similarity index 92% rename from tpu-client/src/connection_cache_stats.rs rename to connection-cache/src/connection_cache_stats.rs index 9827cc125f..697757fd32 100644 --- a/tpu-client/src/connection_cache_stats.rs +++ b/connection-cache/src/connection_cache_stats.rs @@ -1,5 +1,5 @@ use { - crate::tpu_connection::ClientStats, + crate::client_connection::ClientStats, std::sync::atomic::{AtomicU64, Ordering}, }; @@ -161,22 +161,16 @@ impl ConnectionCacheStats { i64 ), ( - "tx_streams_blocked_uni", - self.total_client_stats - .tx_streams_blocked_uni - .load_and_reset(), + "streams_blocked_uni", + self.total_client_stats.streams_blocked_uni.load_and_reset(), i64 ), ( - "tx_data_blocked", - self.total_client_stats.tx_data_blocked.load_and_reset(), - i64 - ), - ( - "tx_acks", - self.total_client_stats.tx_acks.load_and_reset(), + "data_blocked", + self.total_client_stats.data_blocked.load_and_reset(), i64 ), + ("acks", self.total_client_stats.acks.load_and_reset(), i64), ( "num_packets", self.sent_packets.swap(0, Ordering::Relaxed), diff --git a/connection-cache/src/lib.rs b/connection-cache/src/lib.rs new file mode 100644 index 0000000000..cc595ff574 --- /dev/null +++ b/connection-cache/src/lib.rs @@ -0,0 +1,9 @@ +#![allow(clippy::integer_arithmetic)] + +pub mod client_connection; +pub mod connection_cache; +pub mod connection_cache_stats; +pub mod nonblocking; + +#[macro_use] +extern crate solana_metrics; diff --git a/connection-cache/src/nonblocking/client_connection.rs b/connection-cache/src/nonblocking/client_connection.rs new file mode 100644 index 0000000000..ef79674057 --- /dev/null +++ b/connection-cache/src/nonblocking/client_connection.rs @@ -0,0 +1,15 @@ +//! Trait defining async send functions, to be used for UDP or QUIC sending + +use { + async_trait::async_trait, solana_sdk::transport::Result as TransportResult, + std::net::SocketAddr, +}; + +#[async_trait] +pub trait ClientConnection { + fn server_addr(&self) -> &SocketAddr; + + async fn send_data(&self, buffer: &[u8]) -> TransportResult<()>; + + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()>; +} diff --git a/connection-cache/src/nonblocking/mod.rs b/connection-cache/src/nonblocking/mod.rs new file mode 100644 index 0000000000..a601ae46f3 --- /dev/null +++ b/connection-cache/src/nonblocking/mod.rs @@ -0,0 +1 @@ +pub mod client_connection; diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index aad0af9371..74343ef68e 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -7,7 +7,7 @@ use { tracer_packet_stats::TracerPacketStats, unprocessed_transaction_storage::UnprocessedTransactionStorage, }, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_perf::{data_budget::DataBudget, packet::Packet}, @@ -197,7 +197,7 @@ impl Forwarder { .forwarded_transaction_count .fetch_add(packet_vec_len, Ordering::Relaxed); let conn = connection_cache.get_connection(&addr); - conn.send_wire_transaction_batch_async(packet_vec) + conn.send_data_batch_async(packet_vec) }; measure.stop(); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 6ffeeaaacc..e5e2933fa2 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -13,7 +13,7 @@ use { solana_streamer::streamer::{ self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{ net::UdpSocket, sync::{ diff --git a/core/src/validator.rs b/core/src/validator.rs index fd47843457..414bdaba6d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -759,11 +759,12 @@ impl Validator { let connection_cache = match use_quic { true => { - let mut connection_cache = ConnectionCache::new(tpu_connection_pool_size); - connection_cache - .update_client_certificate(&identity_keypair, node.info.gossip.ip()) - .expect("Failed to update QUIC client certificates"); - connection_cache.set_staked_nodes(&staked_nodes, &identity_keypair.pubkey()); + let connection_cache = ConnectionCache::new_with_client_options( + tpu_connection_pool_size, + None, + Some((&identity_keypair, node.info.gossip.ip())), + Some((&staked_nodes, &identity_keypair.pubkey())), + ); Arc::new(connection_cache) } false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), @@ -2089,7 +2090,7 @@ mod tests { crossbeam_channel::{bounded, RecvTimeoutError}, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, std::{fs::remove_dir_all, thread, time::Duration}, diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index d685210dd7..0772eb65ad 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,7 +3,7 @@ use { rand::{thread_rng, Rng}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ @@ -50,7 +50,7 @@ impl WarmQuicCacheService { .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) { let conn = connection_cache.get_connection(&addr); - if let Err(err) = conn.send_wire_transaction([0u8]) { + if let Err(err) = conn.send_data(&[0u8]) { warn!( "Failed to warmup QUIC connection to the leader {:?}, Error {:?}", leader_pubkey, err diff --git a/dos/src/main.rs b/dos/src/main.rs index f72c86ba38..f3db52dd09 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ @@ -67,7 +67,7 @@ use { transaction::Transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ net::{SocketAddr, UdpSocket}, process::exit, @@ -285,7 +285,7 @@ fn create_sender_thread( Ok(tx_batch) => { let len = tx_batch.batch.len(); let mut measure_send_txs = Measure::start("measure_send_txs"); - let res = connection.send_wire_transaction_batch_async(tx_batch.batch); + let res = connection.send_data_batch_async(tx_batch.batch); measure_send_txs.stop(); time_send_ns += measure_send_txs.as_ns(); diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 1b759044c9..029ad46a5f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -44,7 +44,7 @@ use { }, solana_stake_program::{config::create_account as create_stake_config_account, stake_state}, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, solana_vote_program::{ diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9760d25595..d705ac4e54 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4495,6 +4495,7 @@ dependencies = [ "quinn", "rand 0.7.3", "rayon", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -4532,6 +4533,25 @@ dependencies = [ "solana-sdk 1.16.0", ] +[[package]] +name = "solana-connection-cache" +version = "1.16.0" +dependencies = [ + "async-trait", + "bincode", + "futures-util", + "indexmap", + "log", + "rand 0.7.3", + "rayon", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-sdk 1.16.0", + "thiserror", + "tokio", +] + [[package]] name = "solana-core" version = "1.16.0" @@ -5180,6 +5200,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustls 0.20.6", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -6047,6 +6068,8 @@ version = "1.16.0" dependencies = [ "bincode", "log", + "rayon", + "solana-connection-cache", "solana-rpc-client", "solana-rpc-client-api", "solana-sdk 1.16.0", @@ -6065,6 +6088,7 @@ dependencies = [ "log", "rand 0.7.3", "rayon", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -6105,6 +6129,7 @@ name = "solana-udp-client" version = "1.16.0" dependencies = [ "async-trait", + "solana-connection-cache", "solana-net-utils", "solana-sdk 1.16.0", "solana-streamer", diff --git a/quic-client/Cargo.toml b/quic-client/Cargo.toml index c3f6745900..3cede2c276 100644 --- a/quic-client/Cargo.toml +++ b/quic-client/Cargo.toml @@ -20,6 +20,8 @@ quinn = "0.9.3" quinn-proto = "0.9.2" quinn-udp = "0.3.2" rustls = { version = "0.20.6", default-features = false, features = ["dangerous_configuration", "logging"] } + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 6364366749..855eefeff0 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -9,10 +9,20 @@ extern crate solana_metrics; use { crate::{ nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, - QuicTpuConnection as NonblockingQuicTpuConnection, + QuicClient, QuicClientCertificate, + QuicClientConnection as NonblockingQuicClientConnection, QuicLazyInitializedEndpoint, }, - quic_client::QuicTpuConnection as BlockingQuicTpuConnection, + quic_client::QuicClientConnection as BlockingQuicClientConnection, + }, + quinn::Endpoint, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, + ConnectionPoolError, NewConnectionConfig, ProtocolType, + }, + connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ @@ -20,13 +30,8 @@ use { streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, - tpu_connection_cache::{ - BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig, - }, - }, std::{ + any::Any, error::Error, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{Arc, RwLock}, @@ -41,25 +46,12 @@ pub enum QuicClientError { } pub struct QuicPool { - connections: Vec>, + connections: Vec>, endpoint: Arc, } impl ConnectionPool for QuicPool { - type PoolTpuConnection = Quic; - type TpuConfig = QuicConfig; - const PORT_OFFSET: u16 = QUIC_PORT_OFFSET; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - endpoint: config.create_endpoint(), - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -67,7 +59,7 @@ impl ConnectionPool for QuicPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -76,14 +68,15 @@ impl ConnectionPool for QuicPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - Quic(Arc::new(QuicClient::new( + ) -> Arc { + let config = QuicConfig::downcast_ref(config); + Arc::new(Quic(Arc::new(QuicClient::new( self.endpoint.clone(), *addr, config.compute_max_parallel_streams(), - ))) + )))) } } @@ -91,15 +84,17 @@ pub struct QuicConfig { client_certificate: Arc, maybe_staked_nodes: Option>>, maybe_client_pubkey: Option, + + // The optional specified endpoint for the quic based client connections + // If not specified, the connection cache will create as needed. + client_endpoint: Option, } -impl NewTpuConfig for QuicConfig { - type ClientError = QuicClientError; - - fn new() -> Result { +impl NewConnectionConfig for QuicConfig { + fn new() -> Result { let (cert, priv_key) = new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(|err| QuicClientError::CertificateError(err.to_string()))?; + .map_err(|err| ClientError::CertificateError(err.to_string()))?; Ok(Self { client_certificate: Arc::new(QuicClientCertificate { certificate: cert, @@ -107,16 +102,25 @@ impl NewTpuConfig for QuicConfig { }), maybe_staked_nodes: None, maybe_client_pubkey: None, + client_endpoint: None, }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } impl QuicConfig { - fn create_endpoint(&self) -> Arc { - Arc::new(QuicLazyInitializedEndpoint::new( + fn create_endpoint(&self) -> QuicLazyInitializedEndpoint { + QuicLazyInitializedEndpoint::new( self.client_certificate.clone(), - None, - )) + self.client_endpoint.as_ref().cloned(), + ) } fn compute_max_parallel_streams(&self) -> usize { @@ -158,30 +162,84 @@ impl QuicConfig { self.maybe_staked_nodes = Some(staked_nodes.clone()); self.maybe_client_pubkey = Some(*client_pubkey); } + + pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) { + self.client_endpoint = Some(client_endpoint); + } + + /// Convenient function to downcast a generic NewConnectionConfig reference to QuicConfig + pub fn downcast_ref(config: &dyn NewConnectionConfig) -> &Self { + match config.as_any().downcast_ref::() { + Some(config) => config, + None => panic!("Expecting a QuicConfig!"), + } + } } pub struct Quic(Arc); -impl BaseTpuConnection for Quic { - type BlockingConnectionType = BlockingQuicTpuConnection; - type NonblockingConnectionType = NonblockingQuicTpuConnection; - +impl BaseClientConnection for Quic { fn new_blocking_connection( &self, _addr: SocketAddr, stats: Arc, - ) -> BlockingQuicTpuConnection { - BlockingQuicTpuConnection::new_with_client(self.0.clone(), stats) + ) -> Arc { + Arc::new(BlockingQuicClientConnection::new_with_client( + self.0.clone(), + stats, + )) } fn new_nonblocking_connection( &self, _addr: SocketAddr, stats: Arc, - ) -> NonblockingQuicTpuConnection { - NonblockingQuicTpuConnection::new_with_client(self.0.clone(), stats) + ) -> Arc { + Arc::new(NonblockingQuicClientConnection::new_with_client( + self.0.clone(), + stats, + )) } } +#[derive(Default)] +pub struct QuicConnectionManager { + connection_config: Option>, +} + +impl ConnectionManager for QuicConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(QuicPool { + connections: Vec::default(), + endpoint: Arc::new(self.connection_config.as_ref().map_or( + QuicLazyInitializedEndpoint::default(), + |config| { + let config = QuicConfig::downcast_ref(config.as_ref()); + config.create_endpoint() + }, + )), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(QuicConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + QUIC_PORT_OFFSET + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::QUIC + } +} + +impl QuicConnectionManager { + pub fn new_with_connection_config(config: QuicConfig) -> Self { + Self { + connection_config: Some(Box::new(config)), + } + } +} #[cfg(test)] mod tests { use { @@ -190,33 +248,29 @@ mod tests { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, - solana_tpu_client::tpu_connection_cache::{ - TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, - }, }; #[test] fn test_connection_cache_max_parallel_chunks() { solana_logger::setup(); - let connection_cache = - TpuConnectionCache::::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap(); - let mut tpu_config = connection_cache.tpu_config; + + let mut connection_config = QuicConfig::new().unwrap(); assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let pubkey = Pubkey::new_unique(); - tpu_config.set_staked_nodes(&staked_nodes, &pubkey); + connection_config.set_staked_nodes(&staked_nodes, &pubkey); assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); staked_nodes.write().unwrap().total_stake = 10000; assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); @@ -230,7 +284,7 @@ mod tests { (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize ); @@ -245,7 +299,7 @@ mod tests { .pubkey_stake_map .insert(pubkey, 1000); assert_ne!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); } diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 11d8a6a3b3..231b641bbb 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -1,5 +1,5 @@ //! Simple nonblocking client that connects to a given UDP port with the QUIC protocol -//! and provides an interface for sending transactions which is restricted by the +//! and provides an interface for sending data which is restricted by the //! server's flow control. use { async_mutex::Mutex, @@ -11,6 +11,10 @@ use { ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError, }, + solana_connection_cache::{ + client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection, + }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind, @@ -25,10 +29,6 @@ use { solana_streamer::{ nonblocking::quic::ALPN_TPU_PROTOCOL_ID, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection, - tpu_connection::ClientStats, - }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, @@ -397,21 +397,21 @@ impl QuicClient { connection_stats .total_client_stats - .tx_streams_blocked_uni + .streams_blocked_uni .update_stat( - &self.stats.tx_streams_blocked_uni, + &self.stats.streams_blocked_uni, new_stats.frame_tx.streams_blocked_uni, ); connection_stats .total_client_stats - .tx_data_blocked - .update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked); + .data_blocked + .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked); connection_stats .total_client_stats - .tx_acks - .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks); + .acks + .update_stat(&self.stats.acks, new_stats.frame_tx.acks); last_connection_id = connection.stable_id(); match Self::_send_buffer_using_conn(data, &connection).await { @@ -438,7 +438,7 @@ impl QuicClient { // if we come here, that means we have exhausted maximum retries, return the error info!( - "Ran into an error sending transactions {:?}, exhausted retries to {}", + "Ran into an error sending data {:?}, exhausted retries to {}", last_error, self.addr ); // If we get here but last_error is None, then we have a logic error @@ -470,12 +470,12 @@ impl QuicClient { where T: AsRef<[u8]>, { - // Start off by "testing" the connection by sending the first transaction + // Start off by "testing" the connection by sending the first buffer // This will also connect to the server if not already connected // and reconnect and retry if the first send attempt failed // (for example due to a timed out connection), returning an error - // or the connection that was used to successfully send the transaction. - // We will use the returned connection to send the rest of the transactions in the batch + // or the connection that was used to successfully send the buffer. + // We will use the returned connection to send the rest of the buffers in the batch // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way // since testing even in the ideal GCE environment has found no cases // where reconnecting and retrying in the middle of a batch send @@ -515,7 +515,7 @@ impl QuicClient { Ok(()) } - pub fn tpu_addr(&self) -> &SocketAddr { + pub fn server_addr(&self) -> &SocketAddr { &self.addr } @@ -524,12 +524,12 @@ impl QuicClient { } } -pub struct QuicTpuConnection { +pub struct QuicClientConnection { pub client: Arc, pub connection_stats: Arc, } -impl QuicTpuConnection { +impl QuicClientConnection { pub fn base_stats(&self) -> Arc { self.client.stats() } @@ -563,15 +563,12 @@ impl QuicTpuConnection { } #[async_trait] -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.client.tpu_addr() +impl ClientConnection for QuicClientConnection { + fn server_addr(&self) -> &SocketAddr { + self.client.server_addr() } - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { let stats = ClientStats::default(); let len = buffers.len(); let res = self @@ -584,18 +581,15 @@ impl TpuConnection for QuicTpuConnection { Ok(()) } - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data(&self, data: &[u8]) -> TransportResult<()> { let stats = Arc::new(ClientStats::default()); - let send_buffer = - self.client - .send_buffer(wire_transaction, &stats, self.connection_stats.clone()); + let send_buffer = self + .client + .send_buffer(data, &stats, self.connection_stats.clone()); if let Err(e) = send_buffer.await { warn!( - "Failed to send transaction async to {}, error: {:?} ", - self.tpu_addr(), + "Failed to send data async to {}, error: {:?} ", + self.server_addr(), e ); datapoint_warn!("send-wire-async", ("failure", 1, i64),); diff --git a/quic-client/src/quic_client.rs b/quic-client/src/quic_client.rs index 0cc5c9e353..39726deb1f 100644 --- a/quic-client/src/quic_client.rs +++ b/quic-client/src/quic_client.rs @@ -1,18 +1,18 @@ //! Simple client that connects to a given UDP port with the QUIC protocol and provides -//! an interface for sending transactions which is restricted by the server's flow control. +//! an interface for sending data which is restricted by the server's flow control. use { crate::nonblocking::quic_client::{ - QuicClient, QuicLazyInitializedEndpoint, QuicTpuConnection as NonblockingQuicTpuConnection, + QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint, }, lazy_static::lazy_static, log::*, - solana_sdk::transport::{Result as TransportResult, TransportError}, - solana_tpu_client::{ + solana_connection_cache::{ + client_connection::{ClientConnection, ClientStats}, connection_cache_stats::ConnectionCacheStats, - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::{ClientStats, TpuConnection}, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, + solana_sdk::transport::{Result as TransportResult, TransportError}, std::{ net::SocketAddr, sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard}, @@ -21,125 +21,119 @@ use { tokio::{runtime::Runtime, time::timeout}, }; -pub mod temporary_pub { - use super::*; +pub const MAX_OUTSTANDING_TASK: u64 = 2000; +pub const SEND_DATA_TIMEOUT_MS: u64 = 10000; - pub const MAX_OUTSTANDING_TASK: u64 = 2000; - pub const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000; +/// A semaphore used for limiting the number of asynchronous tasks spawn to the +/// runtime. Before spawnning a task, use acquire. After the task is done (be it +/// succsess or failure), call release. +struct AsyncTaskSemaphore { + /// Keep the counter info about the usage + counter: Mutex, + /// Conditional variable for signaling when counter is decremented + cond_var: Condvar, + /// The maximum usage allowed by this semaphore. + permits: u64, +} - /// A semaphore used for limiting the number of asynchronous tasks spawn to the - /// runtime. Before spawnning a task, use acquire. After the task is done (be it - /// succsess or failure), call release. - pub struct AsyncTaskSemaphore { - /// Keep the counter info about the usage - counter: Mutex, - /// Conditional variable for signaling when counter is decremented - cond_var: Condvar, - /// The maximum usage allowed by this semaphore. - permits: u64, - } - - impl AsyncTaskSemaphore { - pub fn new(permits: u64) -> Self { - Self { - counter: Mutex::new(0), - cond_var: Condvar::new(), - permits, - } - } - - /// When returned, the lock has been locked and usage count has been - /// incremented. When the returned MutexGuard is dropped the lock is dropped - /// without decrementing the usage count. - pub fn acquire(&self) -> MutexGuard { - let mut count = self.counter.lock().unwrap(); - *count += 1; - while *count > self.permits { - count = self.cond_var.wait(count).unwrap(); - } - count - } - - /// Acquire the lock and decrement the usage count - pub fn release(&self) { - let mut count = self.counter.lock().unwrap(); - *count -= 1; - self.cond_var.notify_one(); +impl AsyncTaskSemaphore { + pub fn new(permits: u64) -> Self { + Self { + counter: Mutex::new(0), + cond_var: Condvar::new(), + permits, } } - lazy_static! { - pub static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = - AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); - pub static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("quic-client") - .enable_all() - .build() - .unwrap(); + /// When returned, the lock has been locked and usage count has been + /// incremented. When the returned MutexGuard is dropped the lock is dropped + /// without decrementing the usage count. + pub fn acquire(&self) -> MutexGuard { + let mut count = self.counter.lock().unwrap(); + *count += 1; + while *count > self.permits { + count = self.cond_var.wait(count).unwrap(); + } + count } - pub async fn send_wire_transaction_async( - connection: Arc, - wire_transaction: Vec, - ) -> TransportResult<()> { - let result = timeout( - Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS), - connection.send_wire_transaction(wire_transaction), - ) - .await; - ASYNC_TASK_SEMAPHORE.release(); - handle_send_result(result, connection) + /// Acquire the lock and decrement the usage count + pub fn release(&self) { + let mut count = self.counter.lock().unwrap(); + *count -= 1; + self.cond_var.notify_one(); } +} - pub async fn send_wire_transaction_batch_async( - connection: Arc, - buffers: Vec>, - ) -> TransportResult<()> { - let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64; +lazy_static! { + static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = + AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); + static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("quic-client") + .enable_all() + .build() + .unwrap(); +} - let result = timeout( - Duration::from_millis(time_out), - connection.send_wire_transaction_batch(&buffers), - ) - .await; - ASYNC_TASK_SEMAPHORE.release(); - handle_send_result(result, connection) - } +async fn send_data_async( + connection: Arc, + buffer: Vec, +) -> TransportResult<()> { + let result = timeout( + Duration::from_millis(SEND_DATA_TIMEOUT_MS), + connection.send_data(&buffer), + ) + .await; + ASYNC_TASK_SEMAPHORE.release(); + handle_send_result(result, connection) +} - /// Check the send result and update stats if timedout. Returns the checked result. - pub fn handle_send_result( - result: Result, tokio::time::error::Elapsed>, - connection: Arc, - ) -> Result<(), TransportError> { - match result { - Ok(result) => result, - Err(_err) => { - let client_stats = ClientStats::default(); - client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); - let stats = connection.connection_stats(); - stats.add_client_stats(&client_stats, 0, false); - info!("Timedout sending transaction {:?}", connection.tpu_addr()); - Err(TransportError::Custom( - "Timedout sending transaction".to_string(), - )) - } +async fn send_data_batch_async( + connection: Arc, + buffers: Vec>, +) -> TransportResult<()> { + let time_out = SEND_DATA_TIMEOUT_MS * buffers.len() as u64; + + let result = timeout( + Duration::from_millis(time_out), + connection.send_data_batch(&buffers), + ) + .await; + ASYNC_TASK_SEMAPHORE.release(); + handle_send_result(result, connection) +} + +/// Check the send result and update stats if timedout. Returns the checked result. +fn handle_send_result( + result: Result, tokio::time::error::Elapsed>, + connection: Arc, +) -> Result<(), TransportError> { + match result { + Ok(result) => result, + Err(_err) => { + let client_stats = ClientStats::default(); + client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); + let stats = connection.connection_stats(); + stats.add_client_stats(&client_stats, 0, false); + info!("Timedout sending data {:?}", connection.server_addr()); + Err(TransportError::Custom("Timedout sending data".to_string())) } } } -use temporary_pub::*; -pub struct QuicTpuConnection { - pub inner: Arc, +pub struct QuicClientConnection { + pub inner: Arc, } -impl QuicTpuConnection { + +impl QuicClientConnection { pub fn new( endpoint: Arc, - tpu_addr: SocketAddr, + server_addr: SocketAddr, connection_stats: Arc, ) -> Self { - let inner = Arc::new(NonblockingQuicTpuConnection::new( + let inner = Arc::new(NonblockingQuicConnection::new( endpoint, - tpu_addr, + server_addr, connection_stats, )); Self { inner } @@ -149,7 +143,7 @@ impl QuicTpuConnection { client: Arc, connection_stats: Arc, ) -> Self { - let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client( + let inner = Arc::new(NonblockingQuicConnection::new_with_client( client, connection_stats, )); @@ -157,33 +151,33 @@ impl QuicTpuConnection { } } -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.inner.tpu_addr() +impl ClientConnection for QuicClientConnection { + fn server_addr(&self) -> &SocketAddr { + self.inner.server_addr() } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + RUNTIME.block_on(self.inner.send_data_batch(buffers))?; Ok(()) } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + fn send_data_async(&self, data: Vec) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - let _handle = RUNTIME - .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); + let _handle = RUNTIME.spawn(async move { send_data_async(inner, data).await }); Ok(()) } - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - let _handle = - RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); + let _handle = RUNTIME.spawn(async move { send_data_batch_async(inner, buffers).await }); + Ok(()) + } + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + RUNTIME.block_on(self.inner.send_data(buffer))?; Ok(()) } } diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index aaeff7c1b4..f48e8b369b 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -3,6 +3,7 @@ mod tests { use { crossbeam_channel::{unbounded, Receiver}, log::*, + solana_connection_cache::connection_cache_stats::ConnectionCacheStats, solana_perf::packet::PacketBatch, solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, @@ -12,7 +13,6 @@ mod tests { nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::connection_cache_stats::ConnectionCacheStats, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ @@ -67,8 +67,8 @@ mod tests { #[test] fn test_quic_client_multiple_writes() { use { - solana_quic_client::quic_client::QuicTpuConnection, - solana_tpu_client::tpu_connection::TpuConnection, + solana_connection_cache::client_connection::ClientConnection, + solana_quic_client::quic_client::QuicClientConnection, }; solana_logger::setup(); let (sender, receiver) = unbounded(); @@ -93,7 +93,7 @@ mod tests { let port = s.local_addr().unwrap().port(); let tpu_addr = SocketAddr::new(addr, port); let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); - let client = QuicTpuConnection::new( + let client = QuicClientConnection::new( Arc::new(QuicLazyInitializedEndpoint::default()), tpu_addr, connection_cache_stats, @@ -104,7 +104,7 @@ mod tests { let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(client.send_wire_transaction_batch_async(packets).is_ok()); + assert!(client.send_data_batch_async(packets).is_ok()); check_packets(receiver, num_bytes, num_expected_packets); exit.store(true, Ordering::Relaxed); @@ -114,8 +114,8 @@ mod tests { #[tokio::test] async fn test_nonblocking_quic_client_multiple_writes() { use { - solana_quic_client::nonblocking::quic_client::QuicTpuConnection, - solana_tpu_client::nonblocking::tpu_connection::TpuConnection, + solana_connection_cache::nonblocking::client_connection::ClientConnection, + solana_quic_client::nonblocking::quic_client::QuicClientConnection, }; solana_logger::setup(); let (sender, receiver) = unbounded(); @@ -140,7 +140,7 @@ mod tests { let port = s.local_addr().unwrap().port(); let tpu_addr = SocketAddr::new(addr, port); let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); - let client = QuicTpuConnection::new( + let client = QuicClientConnection::new( Arc::new(QuicLazyInitializedEndpoint::default()), tpu_addr, connection_cache_stats, @@ -150,8 +150,7 @@ mod tests { let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - - assert!(client.send_wire_transaction_batch(&packets).await.is_ok()); + assert!(client.send_data_batch(&packets).await.is_ok()); check_packets(receiver, num_bytes, num_expected_packets); exit.store(true, Ordering::Relaxed); @@ -168,8 +167,8 @@ mod tests { /// In this we demonstrate that the request sender and the response receiver use the /// same quic Endpoint, and the same UDP socket. use { - solana_quic_client::quic_client::QuicTpuConnection, - solana_tpu_client::tpu_connection::TpuConnection, + solana_connection_cache::client_connection::ClientConnection, + solana_quic_client::quic_client::QuicClientConnection, }; solana_logger::setup(); @@ -239,15 +238,13 @@ mod tests { let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = - QuicTpuConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats); + QuicClientConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats); // Send a full size packet with single byte writes as a request. let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(request_sender - .send_wire_transaction_batch_async(packets) - .is_ok()); + assert!(request_sender.send_data_batch_async(packets).is_ok()); check_packets(receiver, num_bytes, num_expected_packets); info!("Received requests!"); @@ -264,16 +261,14 @@ mod tests { let endpoint2 = QuicLazyInitializedEndpoint::new(client_certificate2, None); let connection_cache_stats2 = Arc::new(ConnectionCacheStats::default()); let response_sender = - QuicTpuConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2); + QuicClientConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2); // Send a full size packet with single byte writes. let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(response_sender - .send_wire_transaction_batch_async(packets) - .is_ok()); + assert!(response_sender.send_data_batch_async(packets).is_ok()); check_packets(receiver2, num_bytes, num_expected_packets); info!("Received responses!"); diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 0335f7e11f..54fbf26fd1 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -7,7 +7,7 @@ use { serde_json::{json, Value}, solana_account_decoder::UiAccount, solana_client::{ - connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::ConnectionCache, tpu_client::{TpuClient, TpuClientConfig}, }, solana_pubsub_client::nonblocking::pubsub_client::PubsubClient, @@ -29,6 +29,7 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_test_validator::TestValidator, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, solana_transaction_status::TransactionStatus, std::{ collections::HashSet, diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index fa693ae3c3..f96794b156 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,7 +2,7 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_measure::measure::Measure, solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -706,7 +706,7 @@ impl SendTransactionService { connection_cache: &Arc, ) -> Result<(), TransportError> { let conn = connection_cache.get_connection(tpu_address); - conn.send_wire_transaction_async(wire_transaction.to_vec()) + conn.send_data_async(wire_transaction.to_vec()) } fn send_transactions_with_metrics( @@ -716,7 +716,7 @@ impl SendTransactionService { ) -> Result<(), TransportError> { let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); let conn = connection_cache.get_connection(tpu_address); - conn.send_wire_transaction_batch_async(wire_transactions) + conn.send_data_batch_async(wire_transactions) } fn send_transactions( diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 7d28fce8cd..1d20306d6c 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -44,7 +44,7 @@ use { signature::{read_keypair_file, write_keypair_file, Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, std::{ diff --git a/thin-client/Cargo.toml b/thin-client/Cargo.toml index 99220f5c59..df89c43776 100644 --- a/thin-client/Cargo.toml +++ b/thin-client/Cargo.toml @@ -12,13 +12,14 @@ edition = "2021" [dependencies] bincode = "1.3.3" log = "0.4.17" +rayon = "1.5.3" +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false } solana-rpc-client = { path = "../rpc-client", version = "=1.16.0", default-features = false } solana-rpc-client-api = { path = "../rpc-client-api", version = "=1.16.0" } solana-sdk = { path = "../sdk", version = "=1.16.0" } solana-tpu-client = { path = "../tpu-client", version = "=1.16.0", default-features = false } [dev-dependencies] -rayon = "1.5.3" solana-logger = { path = "../logger", version = "=1.16.0" } [package.metadata.docs.rs] diff --git a/thin-client/src/thin_client.rs b/thin-client/src/thin_client.rs index 7a43b71d2e..7cde909b24 100644 --- a/thin-client/src/thin_client.rs +++ b/thin-client/src/thin_client.rs @@ -5,6 +5,8 @@ use { log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_connection_cache::connection_cache::ConnectionCache, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ @@ -25,10 +27,6 @@ use { transaction::{self, Transaction, VersionedTransaction}, transport::Result as TransportResult, }, - solana_tpu_client::{ - tpu_connection::TpuConnection, - tpu_connection_cache::{ConnectionPool, TpuConnectionCache}, - }, std::{ io, net::SocketAddr, @@ -113,21 +111,21 @@ pub mod temporary_pub { use temporary_pub::*; /// An object for querying and sending transactions to the network. -pub struct ThinClient { +pub struct ThinClient { rpc_clients: Vec, tpu_addrs: Vec, optimizer: ClientOptimizer, - connection_cache: Arc>, + connection_cache: Arc, } -impl ThinClient

{ +impl ThinClient { /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP /// (currently hardcoded to UDP) pub fn new( rpc_addr: SocketAddr, tpu_addr: SocketAddr, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache) } @@ -136,7 +134,7 @@ impl ThinClient

{ rpc_addr: SocketAddr, tpu_addr: SocketAddr, timeout: Duration, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); Self::new_from_client(rpc_client, tpu_addr, connection_cache) @@ -145,7 +143,7 @@ impl ThinClient

{ fn new_from_client( rpc_client: RpcClient, tpu_addr: SocketAddr, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { Self { rpc_clients: vec![rpc_client], @@ -158,7 +156,7 @@ impl ThinClient

{ pub fn new_from_addrs( rpc_addrs: Vec, tpu_addrs: Vec, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { assert!(!rpc_addrs.is_empty()); assert_eq!(rpc_addrs.len(), tpu_addrs.len()); @@ -221,7 +219,7 @@ impl ThinClient

{ let conn = self.connection_cache.get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. the first time) #[allow(clippy::needless_borrow)] - conn.send_wire_transaction(&wire_transaction)?; + conn.send_data(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -316,13 +314,13 @@ impl ThinClient

{ } } -impl Client for ThinClient

{ +impl Client for ThinClient { fn tpu_addr(&self) -> String { self.tpu_addr().to_string() } } -impl SyncClient for ThinClient

{ +impl SyncClient for ThinClient { fn send_and_confirm_message( &self, keypairs: &T, @@ -602,13 +600,15 @@ impl SyncClient for ThinClient

{ } } -impl AsyncClient for ThinClient

{ +impl AsyncClient for ThinClient { fn async_send_versioned_transaction( &self, transaction: VersionedTransaction, ) -> TransportResult { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.serialize_and_send_transaction(&transaction)?; + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + conn.send_data(&wire_transaction)?; Ok(transaction.signatures[0]) } @@ -617,7 +617,11 @@ impl AsyncClient for ThinClient

{ batch: Vec, ) -> TransportResult<()> { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.par_serialize_and_send_transaction_batch(&batch[..])?; + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + conn.send_data_batch(&buffers)?; Ok(()) } } @@ -636,7 +640,7 @@ fn min_index(array: &[u64]) -> (u64, usize) { #[cfg(test)] mod tests { - use {super::*, rayon::prelude::*}; + use super::*; #[test] fn test_client_optimizer() { diff --git a/tpu-client/Cargo.toml b/tpu-client/Cargo.toml index b51765d883..342108fd5f 100644 --- a/tpu-client/Cargo.toml +++ b/tpu-client/Cargo.toml @@ -18,6 +18,8 @@ indicatif = { version = "0.17.1", optional = true } log = "0.4.17" rand = "0.7.0" rayon = "1.5.3" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/tpu-client/src/lib.rs b/tpu-client/src/lib.rs index 5fec3e4eda..199a09091e 100644 --- a/tpu-client/src/lib.rs +++ b/tpu-client/src/lib.rs @@ -1,10 +1,6 @@ #![allow(clippy::integer_arithmetic)] -pub mod connection_cache_stats; pub mod nonblocking; pub mod tpu_client; -pub mod tpu_connection; -pub mod tpu_connection_cache; -#[macro_use] extern crate solana_metrics; diff --git a/tpu-client/src/nonblocking/mod.rs b/tpu-client/src/nonblocking/mod.rs index fe2549410a..f5461aefd2 100644 --- a/tpu-client/src/nonblocking/mod.rs +++ b/tpu-client/src/nonblocking/mod.rs @@ -1,2 +1 @@ pub mod tpu_client; -pub mod tpu_connection; diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 0edafb4f0b..626858fe31 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -7,16 +7,13 @@ use { solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, }; use { - crate::{ - nonblocking::tpu_connection::TpuConnection, - tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS}, - tpu_connection_cache::{ - ConnectionPool, TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, - }, - }, + crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS}, bincode::serialize, futures_util::{future::join_all, stream::StreamExt}, log::*, + solana_connection_cache::connection_cache::{ + ConnectionCache, ConnectionManager, DEFAULT_CONNECTION_POOL_SIZE, + }, solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError}, solana_rpc_client::nonblocking::rpc_client::RpcClient, solana_rpc_client_api::{ @@ -253,33 +250,33 @@ impl LeaderTpuCache { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient { fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, rpc_client: Arc, - connection_cache: Arc>, + connection_cache: Arc, } -async fn send_wire_transaction_to_addr( - connection_cache: &TpuConnectionCache

, +async fn send_wire_transaction_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transaction: Vec, ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction(wire_transaction.clone()).await + conn.send_data(&wire_transaction).await } -async fn send_wire_transaction_batch_to_addr( - connection_cache: &TpuConnectionCache

, +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transactions: &[Vec], ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction_batch(wire_transactions).await + conn.send_data_batch(wire_transactions).await } -impl TpuClient

{ +impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub async fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -394,9 +391,11 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, + connection_manager: Box, ) -> Result { - let connection_cache = - Arc::new(TpuConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap()); // TODO: Handle error properly, as the TpuConnectionCache ctor is now fallible. + let connection_cache = Arc::new( + ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(), + ); // TODO: Handle error properly, as the ConnectionCache ctor is now fallible. Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await } @@ -405,7 +404,7 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc>, + connection_cache: Arc, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); let leader_tpu_service = @@ -554,7 +553,7 @@ impl TpuClient

{ } } -impl Drop for TpuClient

{ +impl Drop for TpuClient { fn drop(&mut self) { self.exit.store(true, Ordering::Relaxed); } diff --git a/tpu-client/src/nonblocking/tpu_connection.rs b/tpu-client/src/nonblocking/tpu_connection.rs deleted file mode 100644 index 4a711ac7b8..0000000000 --- a/tpu-client/src/nonblocking/tpu_connection.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Trait defining async send functions, to be used for UDP or QUIC sending - -use { - async_trait::async_trait, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::net::SocketAddr, -}; - -#[async_trait] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - async fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction).await - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; -} diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 3a343fb301..887f2233e0 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -1,10 +1,8 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { - crate::{ - nonblocking::tpu_client::TpuClient as NonblockingTpuClient, - tpu_connection_cache::{ConnectionPool, TpuConnectionCache}, - }, + crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_connection_cache::connection_cache::{ConnectionCache, ConnectionManager}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, std::{ @@ -19,6 +17,10 @@ use { tokio::time::Duration, }; +pub const DEFAULT_TPU_ENABLE_UDP: bool = false; +pub const DEFAULT_TPU_USE_QUIC: bool = true; +pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4; + pub mod temporary_pub { use super::*; @@ -57,14 +59,14 @@ impl Default for TpuClientConfig { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient { _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket //todo: get rid of this field rpc_client: Arc, - tpu_client: Arc>, + tpu_client: Arc, } -impl TpuClient

{ +impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -108,9 +110,14 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, + connection_manager: Box, ) -> Result { - let create_tpu_client = - NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); + let create_tpu_client = NonblockingTpuClient::new( + rpc_client.get_inner_client().clone(), + websocket_url, + config, + connection_manager, + ); let tpu_client = tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; @@ -126,7 +133,7 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc>, + connection_cache: Arc, ) -> Result { let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( rpc_client.get_inner_client().clone(), diff --git a/tpu-client/src/tpu_connection.rs b/tpu-client/src/tpu_connection.rs deleted file mode 100644 index 44a35c899c..0000000000 --- a/tpu-client/src/tpu_connection.rs +++ /dev/null @@ -1,63 +0,0 @@ -use { - rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_metrics::MovingStat, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::{net::SocketAddr, sync::atomic::AtomicU64}, -}; - -#[derive(Default)] -pub struct ClientStats { - pub total_connections: AtomicU64, - pub connection_reuse: AtomicU64, - pub connection_errors: AtomicU64, - pub zero_rtt_accepts: AtomicU64, - pub zero_rtt_rejects: AtomicU64, - - // these will be the last values of these stats - pub congestion_events: MovingStat, - pub tx_streams_blocked_uni: MovingStat, - pub tx_data_blocked: MovingStat, - pub tx_acks: MovingStat, - pub make_connection_ms: AtomicU64, - pub send_timeout: AtomicU64, -} - -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(wire_transaction) - } - - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.send_wire_transaction_batch(&[wire_transaction]) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; - - fn par_serialize_and_send_transaction_batch( - &self, - transactions: &[VersionedTransaction], - ) -> TransportResult<()> { - let buffers = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - - self.send_wire_transaction_batch(&buffers) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()>; -} diff --git a/udp-client/Cargo.toml b/udp-client/Cargo.toml index 1a212d78d0..e86e55f581 100644 --- a/udp-client/Cargo.toml +++ b/udp-client/Cargo.toml @@ -11,6 +11,8 @@ edition = "2021" [dependencies] async-trait = "0.1.57" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } solana-sdk = { path = "../sdk", version = "=1.16.0" } solana-streamer = { path = "../streamer", version = "=1.16.0" } diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index ec8e383571..d634afef57 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -5,45 +5,31 @@ pub mod udp_client; use { crate::{ - nonblocking::udp_client::UdpTpuConnection as NonblockingUdpTpuConnection, - udp_client::UdpTpuConnection as BlockingUdpTpuConnection, + nonblocking::udp_client::UdpClientConnection as NonblockingUdpConnection, + udp_client::UdpClientConnection as BlockingUdpConnection, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, - tpu_connection_cache::{ - BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, + ConnectionPoolError, NewConnectionConfig, ProtocolType, }, + connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, std::{ + any::Any, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::Arc, }, - thiserror::Error, }; -#[derive(Error, Debug)] -pub enum UdpClientError { - #[error("IO error: {0:?}")] - IoError(#[from] std::io::Error), -} - pub struct UdpPool { - connections: Vec>, + connections: Vec>, } impl ConnectionPool for UdpPool { - type PoolTpuConnection = Udp; - type TpuConfig = UdpConfig; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -51,7 +37,7 @@ impl ConnectionPool for UdpPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -60,47 +46,80 @@ impl ConnectionPool for UdpPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, _addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - Udp(config.tpu_udp_socket.clone()) + ) -> Arc { + let config: &UdpConfig = match config.as_any().downcast_ref::() { + Some(b) => b, + None => panic!("Expecting a UdpConfig!"), + }; + Arc::new(Udp(config.udp_socket.clone())) } } pub struct UdpConfig { - tpu_udp_socket: Arc, + udp_socket: Arc, } -impl NewTpuConfig for UdpConfig { - type ClientError = UdpClientError; - - fn new() -> Result { +impl NewConnectionConfig for UdpConfig { + fn new() -> Result { let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?; + .map_err(Into::::into)?; Ok(Self { - tpu_udp_socket: Arc::new(socket), + udp_socket: Arc::new(socket), }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } pub struct Udp(Arc); -impl BaseTpuConnection for Udp { - type BlockingConnectionType = BlockingUdpTpuConnection; - type NonblockingConnectionType = NonblockingUdpTpuConnection; - +impl BaseClientConnection for Udp { fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> BlockingUdpTpuConnection { - BlockingUdpTpuConnection::new_from_addr(self.0.clone(), addr) + ) -> Arc { + Arc::new(BlockingUdpConnection::new_from_addr(self.0.clone(), addr)) } fn new_nonblocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> NonblockingUdpTpuConnection { - NonblockingUdpTpuConnection::new_from_addr(self.0.try_clone().unwrap(), addr) + ) -> Arc { + Arc::new(NonblockingUdpConnection::new_from_addr( + self.0.try_clone().unwrap(), + addr, + )) + } +} + +#[derive(Default)] +pub struct UdpConnectionManager {} + +impl ConnectionManager for UdpConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(UdpPool { + connections: Vec::default(), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(UdpConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + 0 + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::UDP } } diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 07c2900271..ab90491470 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -1,50 +1,43 @@ //! Simple UDP client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions +//! an interface for sending data use { - async_trait::async_trait, core::iter::repeat, solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, - solana_tpu_client::nonblocking::tpu_connection::TpuConnection, std::net::SocketAddr, + async_trait::async_trait, core::iter::repeat, + solana_connection_cache::nonblocking::client_connection::ClientConnection, + solana_sdk::transport::Result as TransportResult, + solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, tokio::net::UdpSocket, }; -pub struct UdpTpuConnection { +pub struct UdpClientConnection { pub socket: UdpSocket, pub addr: SocketAddr, } -impl UdpTpuConnection { - pub fn new_from_addr(socket: std::net::UdpSocket, tpu_addr: SocketAddr) -> Self { +impl UdpClientConnection { + pub fn new_from_addr(socket: std::net::UdpSocket, server_addr: SocketAddr) -> Self { socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); Self { socket, - addr: tpu_addr, + addr: server_addr, } } } #[async_trait] -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { +impl ClientConnection for UdpClientConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.socket - .send_to(wire_transaction.as_ref(), self.addr) - .await?; + async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + self.socket.send_to(buffer, self.addr).await?; Ok(()) } - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); batch_send(&self.socket, &pkts).await?; Ok(()) } @@ -60,20 +53,17 @@ mod tests { tokio::net::UdpSocket, }; - async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) { + async fn check_send_one(connection: &UdpClientConnection, reader: &UdpSocket) { let packet = vec![111u8; PACKET_DATA_SIZE]; - connection.send_wire_transaction(&packet).await.unwrap(); + connection.send_data(&packet).await.unwrap(); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap(); assert_eq!(1, recv); } - async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) { + async fn check_send_batch(connection: &UdpClientConnection, reader: &UdpSocket) { let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); - connection - .send_wire_transaction_batch(&packets) - .await - .unwrap(); + connection.send_data_batch(&packets).await.unwrap(); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap(); assert_eq!(32, recv); @@ -85,7 +75,7 @@ mod tests { let addr = addr_str.parse().unwrap(); let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); - let connection = UdpTpuConnection::new_from_addr(socket, addr); + let connection = UdpClientConnection::new_from_addr(socket, addr); let reader = UdpSocket::bind(addr_str).await.expect("bind"); check_send_one(&connection, &reader).await; check_send_batch(&connection, &reader).await; diff --git a/udp-client/src/udp_client.rs b/udp-client/src/udp_client.rs index e090b056d0..9ca7d23df9 100644 --- a/udp-client/src/udp_client.rs +++ b/udp-client/src/udp_client.rs @@ -1,63 +1,58 @@ -//! Simple TPU client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions +//! Simple client that communicates with the given UDP port with UDP and provides +//! an interface for sending data use { core::iter::repeat, + solana_connection_cache::client_connection::ClientConnection, solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, tpu_connection::TpuConnection, - }, std::{ net::{SocketAddr, UdpSocket}, sync::Arc, }, }; -pub struct UdpTpuConnection { +pub struct UdpClientConnection { pub socket: Arc, pub addr: SocketAddr, } -impl UdpTpuConnection { - pub fn new_from_addr(local_socket: Arc, tpu_addr: SocketAddr) -> Self { +impl UdpClientConnection { + pub fn new_from_addr(local_socket: Arc, server_addr: SocketAddr) -> Self { Self { socket: local_socket, - addr: tpu_addr, + addr: server_addr, } } - - pub fn new( - local_socket: Arc, - tpu_addr: SocketAddr, - _connection_stats: Arc, - ) -> Self { - Self::new_from_addr(local_socket, tpu_addr) - } } -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { +impl ClientConnection for UdpClientConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - self.socket.send_to(wire_transaction.as_ref(), self.addr)?; + fn send_data_async(&self, data: Vec) -> TransportResult<()> { + self.socket.send_to(data.as_ref(), self.addr)?; Ok(()) } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); batch_send(&self.socket, &pkts)?; Ok(()) } - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { + let pkts: Vec<_> = buffers + .into_iter() + .zip(repeat(self.server_addr())) + .collect(); batch_send(&self.socket, &pkts)?; Ok(()) } + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + self.socket.send_to(buffer, self.addr)?; + Ok(()) + } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 50f2b856ee..c7aef208e1 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -35,7 +35,7 @@ use { solana_send_transaction_service::send_transaction_service::{ self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{path::PathBuf, str::FromStr}, }; diff --git a/validator/src/main.rs b/validator/src/main.rs index 32f9e4df1e..5df496ebf8 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -51,7 +51,7 @@ use { }, solana_send_transaction_service::send_transaction_service::{self}, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, solana_validator::{ admin_rpc_service, admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides},