From ada6136a6c666c6733d69b8aca85e9f88865bc7e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 1 Feb 2023 18:10:06 -0800 Subject: [PATCH] Refactor connection cache to support generic msgs (#29774) tpu-client/tpu_connection_cache is refactored out the module and moved to connection-cache/connection_cache and the logic in client/connection_cache is consolidated to connection-cache/connection_cache as well. client/connection_cache only has a thin wrapper which forward calls to connection-cache/connection_cache and deal with constructions of quic/udp connection cache for clients using them both.2. The TpuConnection is refactored to ClientConnection to make it generic and functions renamed to be proper for other workflows. eg. tpu_addr -> server_addr, send_transaction --> send_data and etc... The enum dispatch is removed so that we can make the bulk of code of quic and udp agnostic of each other. The client is possible to load quic or udp only into its runtime. The generic type parameter in the tpu-client/tpu_connection_cache is removed in order to create both quic and udp connection cache and use the object to send transactions with multiple branching when sending data. The generic type parameters and associated types are dropped in other types in order to make the trait "object safe" for this purpose. I have annotated the code explaining the reasoning and the refactoring source -> destination. There is no functional changes bench-tps has been performed for rpc-client, thin-client and tpu-client. And it is found the performance number largely match the ones before the refactoring. --- Cargo.lock | 27 + Cargo.toml | 1 + banking-bench/src/main.rs | 2 +- bench-tps/src/cli.rs | 4 +- cli/src/cli.rs | 2 +- cli/src/main.rs | 2 +- client/Cargo.toml | 2 + client/src/connection_cache.rs | 639 +++--------------- client/src/lib.rs | 1 - client/src/nonblocking/quic_client.rs | 63 +- client/src/nonblocking/tpu_client.rs | 5 +- client/src/nonblocking/tpu_connection.rs | 47 +- client/src/nonblocking/udp_client.rs | 40 +- client/src/quic_client.rs | 49 +- client/src/thin_client.rs | 15 +- client/src/tpu_connection.rs | 62 +- client/src/udp_client.rs | 40 +- connection-cache/Cargo.toml | 30 + connection-cache/src/client_connection.rs | 34 + .../src/connection_cache.rs | 290 ++++---- .../src/connection_cache_stats.rs | 18 +- connection-cache/src/lib.rs | 9 + .../src/nonblocking/client_connection.rs | 15 + connection-cache/src/nonblocking/mod.rs | 1 + core/src/banking_stage/forwarder.rs | 4 +- core/src/fetch_stage.rs | 2 +- core/src/validator.rs | 13 +- core/src/warm_quic_cache_service.rs | 4 +- dos/src/main.rs | 6 +- local-cluster/src/local_cluster.rs | 2 +- programs/sbf/Cargo.lock | 25 + quic-client/Cargo.toml | 2 + quic-client/src/lib.rs | 172 +++-- quic-client/src/nonblocking/quic_client.rs | 62 +- quic-client/src/quic_client.rs | 232 ++++--- quic-client/tests/quic_client.rs | 35 +- rpc-test/tests/rpc.rs | 3 +- .../src/send_transaction_service.rs | 6 +- test-validator/src/lib.rs | 2 +- thin-client/Cargo.toml | 3 +- thin-client/src/thin_client.rs | 40 +- tpu-client/Cargo.toml | 2 + tpu-client/src/lib.rs | 4 - tpu-client/src/nonblocking/mod.rs | 1 - tpu-client/src/nonblocking/tpu_client.rs | 39 +- tpu-client/src/nonblocking/tpu_connection.rs | 29 - tpu-client/src/tpu_client.rs | 27 +- tpu-client/src/tpu_connection.rs | 63 -- udp-client/Cargo.toml | 2 + udp-client/src/lib.rs | 111 +-- udp-client/src/nonblocking/udp_client.rs | 50 +- udp-client/src/udp_client.rs | 51 +- validator/src/cli.rs | 2 +- validator/src/main.rs | 2 +- 54 files changed, 910 insertions(+), 1484 deletions(-) create mode 100644 connection-cache/Cargo.toml create mode 100644 connection-cache/src/client_connection.rs rename tpu-client/src/tpu_connection_cache.rs => connection-cache/src/connection_cache.rs (67%) rename {tpu-client => connection-cache}/src/connection_cache_stats.rs (92%) create mode 100644 connection-cache/src/lib.rs create mode 100644 connection-cache/src/nonblocking/client_connection.rs create mode 100644 connection-cache/src/nonblocking/mod.rs delete mode 100644 tpu-client/src/nonblocking/tpu_connection.rs delete mode 100644 tpu-client/src/tpu_connection.rs diff --git a/Cargo.lock b/Cargo.lock index ab32674221..ea4d7e5a4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5181,6 +5181,7 @@ dependencies = [ "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -5248,6 +5249,28 @@ dependencies = [ "solana-sdk 1.16.0", ] +[[package]] +name = "solana-connection-cache" +version = "1.16.0" +dependencies = [ + "async-trait", + "bincode", + "futures-util", + "indexmap", + "indicatif", + "log", + "rand 0.7.3", + "rand_chacha 0.2.2", + "rayon", + "solana-logger 1.16.0", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-sdk 1.16.0", + "thiserror", + "tokio", +] + [[package]] name = "solana-core" version = "1.16.0" @@ -6190,6 +6213,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustls 0.20.6", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -6765,6 +6789,7 @@ dependencies = [ "bincode", "log", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-rpc-client", "solana-rpc-client-api", @@ -6817,6 +6842,7 @@ dependencies = [ "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "solana-connection-cache", "solana-logger 1.16.0", "solana-measure", "solana-metrics", @@ -6885,6 +6911,7 @@ name = "solana-udp-client" version = "1.16.0" dependencies = [ "async-trait", + "solana-connection-cache", "solana-net-utils", "solana-sdk 1.16.0", "solana-streamer", diff --git a/Cargo.toml b/Cargo.toml index d65082e335..16b5dbb6e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "cli-output", "client", "client-test", + "connection-cache", "core", "dos", "download-utils", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 4fbe37400b..5d55c85b6e 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -32,7 +32,7 @@ use { transaction::Transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ sync::{atomic::Ordering, Arc, RwLock}, thread::sleep, diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 2195f697b6..81e7c0377f 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -8,9 +8,7 @@ use { pubkey::Pubkey, signature::{read_keypair_file, Keypair}, }, - solana_tpu_client::tpu_connection_cache::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, - }, + solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC}, std::{ net::{Ipv4Addr, SocketAddr}, process::exit, diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 59f38038a5..015375ecd6 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -31,7 +31,7 @@ use { stake::{instruction::LockupArgs, state::Lockup}, transaction::{TransactionError, VersionedTransaction}, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, solana_vote_program::vote_state::VoteAuthorize, std::{collections::HashMap, error, io::stdout, str::FromStr, sync::Arc, time::Duration}, thiserror::Error, diff --git a/cli/src/main.rs b/cli/src/main.rs index 5c459d4a8d..4b2de432ea 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -17,7 +17,7 @@ use { }, solana_remote_wallet::remote_wallet::RemoteWalletManager, solana_rpc_client_api::config::RpcSendTransactionConfig, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{collections::HashMap, error, path::PathBuf, sync::Arc, time::Duration}, }; diff --git a/client/Cargo.toml b/client/Cargo.toml index 2f1d7cc59c..fe45e65fc6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -21,6 +21,8 @@ log = "0.4.17" quinn = "0.9.3" rand = "0.7.0" rayon = "1.5.3" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index e8b1843c14..2fba4efc70 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,475 +1,136 @@ -pub use solana_tpu_client::tpu_connection_cache::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, -}; use { - crate::{ - nonblocking::tpu_connection::NonblockingConnection, tpu_connection::BlockingConnection, - }, - indexmap::map::{Entry, IndexMap}, quinn::Endpoint, - rand::{thread_rng, Rng}, - solana_measure::measure::Measure, - solana_quic_client::nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, - }, - solana_sdk::{ - pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval, - }, - solana_streamer::{ - nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, - streamer::StakedNodes, - tls_certificates::new_self_signed_tls_certificate, - }, - solana_tpu_client::{ - connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL}, - tpu_connection_cache::MAX_CONNECTIONS, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + ConnectionCache as BackendConnectionCache, NewConnectionConfig, ProtocolType, + }, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, + solana_quic_client::{QuicConfig, QuicConnectionManager}, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_streamer::streamer::StakedNodes, + solana_udp_client::UdpConnectionManager, std::{ error::Error, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, - sync::{atomic::Ordering, Arc, RwLock}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::{Arc, RwLock}, }, }; +pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; +pub const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true; +pub const MAX_CONNECTIONS: usize = 1024; + +/// A thin wrapper over connection-cache/ConnectionCache to ease +/// construction of the ConnectionCache for code dealing both with udp and quic. +/// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly. pub struct ConnectionCache { - map: RwLock>, - stats: Arc, - last_stats: AtomicInterval, - connection_pool_size: usize, - tpu_udp_socket: Arc, - client_certificate: Arc, - use_quic: bool, - maybe_staked_nodes: Option>>, - maybe_client_pubkey: Option, - - // The optional specified endpoint for the quic based client connections - // If not specified, the connection cache we create as needed. - client_endpoint: Option, -} - -/// Models the pool of connections -struct ConnectionPool { - /// The connections in the pool - connections: Vec>, - - /// Connections in this pool share the same endpoint - endpoint: Option>, -} - -impl ConnectionPool { - /// Get a connection from the pool. It must have at least one connection in the pool. - /// This randomly picks a connection in the pool. - fn borrow_connection(&self) -> Arc { - let mut rng = thread_rng(); - let n = rng.gen_range(0, self.connections.len()); - self.connections[n].clone() - } - - /// Check if we need to create a new connection. If the count of the connections - /// is smaller than the pool size. - fn need_new_connection(&self, required_pool_size: usize) -> bool { - self.connections.len() < required_pool_size - } + cache: BackendConnectionCache, } impl ConnectionCache { + /// Create a quic connection_cache pub fn new(connection_pool_size: usize) -> Self { - Self::_new_with_endpoint(connection_pool_size, None) + Self::new_with_client_options(connection_pool_size, None, None, None) } - /// Create a connection cache with a specific quic client endpoint. - pub fn new_with_endpoint(connection_pool_size: usize, client_endpoint: Endpoint) -> Self { - Self::_new_with_endpoint(connection_pool_size, Some(client_endpoint)) - } - - fn _new_with_endpoint(connection_pool_size: usize, client_endpoint: Option) -> Self { + /// Create a quic conneciton_cache with more client options + pub fn new_with_client_options( + connection_pool_size: usize, + client_endpoint: Option, + cert_info: Option<(&Keypair, IpAddr)>, + stake_info: Option<(&Arc>, &Pubkey)>, + ) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); - Self { - use_quic: true, - connection_pool_size, - client_endpoint, - ..Self::default() + let mut config = QuicConfig::new().unwrap(); + if let Some(client_endpoint) = client_endpoint { + config.update_client_endpoint(client_endpoint); } + if let Some(cert_info) = cert_info { + config + .update_client_certificate(cert_info.0, cert_info.1) + .unwrap(); + } + if let Some(stake_info) = stake_info { + config.set_staked_nodes(stake_info.0, stake_info.1); + } + let connection_manager = + Box::new(QuicConnectionManager::new_with_connection_config(config)); + let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + Self { cache } } + #[deprecated( + since = "1.15.0", + note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate." + )] pub fn update_client_certificate( &mut self, - keypair: &Keypair, - ipaddr: IpAddr, + _keypair: &Keypair, + _ipaddr: IpAddr, ) -> Result<(), Box> { - let (cert, priv_key) = new_self_signed_tls_certificate(keypair, ipaddr)?; - self.client_certificate = Arc::new(QuicClientCertificate { - certificate: cert, - key: priv_key, - }); Ok(()) } + #[deprecated( + since = "1.15.0", + note = "This method does not do anything. Please use `new_with_client_options` instead to set staked nodes information." + )] pub fn set_staked_nodes( &mut self, - staked_nodes: &Arc>, - client_pubkey: &Pubkey, + _staked_nodes: &Arc>, + _client_pubkey: &Pubkey, ) { - self.maybe_staked_nodes = Some(staked_nodes.clone()); - self.maybe_client_pubkey = Some(*client_pubkey); } pub fn with_udp(connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); - Self { - use_quic: false, - connection_pool_size, - ..Self::default() - } + let connection_manager = Box::::default(); + let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + Self { cache } } pub fn use_quic(&self) -> bool { - self.use_quic + matches!(self.cache.get_protocol_type(), ProtocolType::QUIC) } - fn create_endpoint(&self, force_use_udp: bool) -> Option> { - if self.use_quic() && !force_use_udp { - Some(Arc::new(QuicLazyInitializedEndpoint::new( - self.client_certificate.clone(), - self.client_endpoint.as_ref().cloned(), - ))) - } else { - None - } + pub fn get_connection(&self, addr: &SocketAddr) -> Arc { + self.cache.get_connection(addr) } - fn compute_max_parallel_streams(&self) -> usize { - let (client_type, stake, total_stake) = - self.maybe_client_pubkey - .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| { - self.maybe_staked_nodes.as_ref().map_or( - (ConnectionPeerType::Unstaked, 0, 0), - |stakes| { - let rstakes = stakes.read().unwrap(); - rstakes.pubkey_stake_map.get(&pubkey).map_or( - (ConnectionPeerType::Unstaked, 0, rstakes.total_stake), - |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake), - ) - }, - ) - }); - compute_max_allowed_uni_streams(client_type, stake, total_stake) - } - - /// Create a lazy connection object under the exclusive lock of the cache map if there is not - /// enough used connections in the connection pool for the specified address. - /// Returns CreateConnectionResult. - fn create_connection( - &self, - lock_timing_ms: &mut u64, - addr: &SocketAddr, - force_use_udp: bool, - ) -> CreateConnectionResult { - let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); - let mut map = self.map.write().unwrap(); - get_connection_map_lock_measure.stop(); - *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); - // Read again, as it is possible that between read lock dropped and the write lock acquired - // another thread could have setup the connection. - - let (to_create_connection, endpoint) = - map.get(addr) - .map_or((true, self.create_endpoint(force_use_udp)), |pool| { - ( - pool.need_new_connection(self.connection_pool_size), - pool.endpoint.clone(), - ) - }); - - let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection { - let connection = if !self.use_quic() || force_use_udp { - BaseTpuConnection::Udp(self.tpu_udp_socket.clone()) - } else { - BaseTpuConnection::Quic(Arc::new(QuicClient::new( - endpoint.as_ref().unwrap().clone(), - *addr, - self.compute_max_parallel_streams(), - ))) - }; - - let connection = Arc::new(connection); - - // evict a connection if the cache is reaching upper bounds - let mut num_evictions = 0; - let mut get_connection_cache_eviction_measure = - Measure::start("get_connection_cache_eviction_measure"); - while map.len() >= MAX_CONNECTIONS { - let mut rng = thread_rng(); - let n = rng.gen_range(0, MAX_CONNECTIONS); - map.swap_remove_index(n); - num_evictions += 1; - } - get_connection_cache_eviction_measure.stop(); - - match map.entry(*addr) { - Entry::Occupied(mut entry) => { - let pool = entry.get_mut(); - pool.connections.push(connection); - } - Entry::Vacant(entry) => { - entry.insert(ConnectionPool { - connections: vec![connection], - endpoint, - }); - } - } - ( - false, - num_evictions, - get_connection_cache_eviction_measure.as_ms(), - ) - } else { - (true, 0, 0) - }; - - let pool = map.get(addr).unwrap(); - let connection = pool.borrow_connection(); - - CreateConnectionResult { - connection, - cache_hit, - connection_cache_stats: self.stats.clone(), - num_evictions, - eviction_timing_ms, - } - } - - fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { - let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); - let map = self.map.read().unwrap(); - get_connection_map_lock_measure.stop(); - - let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 }; - - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); - let force_use_udp = port == addr.port(); - let addr = SocketAddr::new(addr.ip(), port); - - let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); - - let report_stats = self - .last_stats - .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); - - let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); - let CreateConnectionResult { - connection, - cache_hit, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } = match map.get(&addr) { - Some(pool) => { - if pool.need_new_connection(self.connection_pool_size) { - // create more connection and put it in the pool - drop(map); - self.create_connection(&mut lock_timing_ms, &addr, force_use_udp) - } else { - let connection = pool.borrow_connection(); - CreateConnectionResult { - connection, - cache_hit: true, - connection_cache_stats: self.stats.clone(), - num_evictions: 0, - eviction_timing_ms: 0, - } - } - } - None => { - // Upgrade to write access by dropping read lock and acquire write lock - drop(map); - self.create_connection(&mut lock_timing_ms, &addr, force_use_udp) - } - }; - get_connection_map_measure.stop(); - - GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms: get_connection_map_measure.as_ms(), - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } - } - - fn get_connection_and_log_stats( + pub fn get_nonblocking_connection( &self, addr: &SocketAddr, - ) -> (Arc, Arc) { - let mut get_connection_measure = Measure::start("get_connection_measure"); - let GetConnectionResult { - connection, - cache_hit, - report_stats, - map_timing_ms, - lock_timing_ms, - connection_cache_stats, - num_evictions, - eviction_timing_ms, - } = self.get_or_add_connection(addr); - - if report_stats { - connection_cache_stats.report(); - } - - if cache_hit { - connection_cache_stats - .cache_hits - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_hit_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - } else { - connection_cache_stats - .cache_misses - .fetch_add(1, Ordering::Relaxed); - connection_cache_stats - .get_connection_miss_ms - .fetch_add(map_timing_ms, Ordering::Relaxed); - connection_cache_stats - .cache_evictions - .fetch_add(num_evictions, Ordering::Relaxed); - connection_cache_stats - .eviction_time_ms - .fetch_add(eviction_timing_ms, Ordering::Relaxed); - } - - get_connection_measure.stop(); - connection_cache_stats - .get_connection_lock_ms - .fetch_add(lock_timing_ms, Ordering::Relaxed); - connection_cache_stats - .get_connection_ms - .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); - - (connection, connection_cache_stats) - } - - pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection { - let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); - connection.new_blocking_connection(*addr, connection_cache_stats) - } - - pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection { - let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); - connection.new_nonblocking_connection(*addr, connection_cache_stats) + ) -> Arc { + self.cache.get_nonblocking_connection(addr) } } impl Default for ConnectionCache { fn default() -> Self { - let (cert, priv_key) = - new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Failed to initialize QUIC client certificates"); - Self { - map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), - stats: Arc::new(ConnectionCacheStats::default()), - last_stats: AtomicInterval::default(), - connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Unable to bind to UDP socket"), - ), - client_certificate: Arc::new(QuicClientCertificate { - certificate: cert, - key: priv_key, - }), - use_quic: DEFAULT_TPU_USE_QUIC, - maybe_staked_nodes: None, - maybe_client_pubkey: None, - client_endpoint: None, + if DEFAULT_CONNECTION_CACHE_USE_QUIC { + let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + ConnectionCache::new_with_client_options( + DEFAULT_CONNECTION_POOL_SIZE, + None, + Some(cert_info), + None, + ) + } else { + ConnectionCache::with_udp(DEFAULT_CONNECTION_POOL_SIZE) } } } -enum BaseTpuConnection { - Udp(Arc), - Quic(Arc), -} -impl BaseTpuConnection { - fn new_blocking_connection( - &self, - addr: SocketAddr, - stats: Arc, - ) -> BlockingConnection { - use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection}; - match self { - BaseTpuConnection::Udp(udp_socket) => { - UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into() - } - BaseTpuConnection::Quic(quic_client) => { - QuicTpuConnection::new_with_client(quic_client.clone(), stats).into() - } - } - } - - fn new_nonblocking_connection( - &self, - addr: SocketAddr, - stats: Arc, - ) -> NonblockingConnection { - use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection}; - match self { - BaseTpuConnection::Udp(udp_socket) => { - UdpTpuConnection::new_from_addr(udp_socket.try_clone().unwrap(), addr).into() - } - BaseTpuConnection::Quic(quic_client) => { - QuicTpuConnection::new_with_client(quic_client.clone(), stats).into() - } - } - } -} - -struct GetConnectionResult { - connection: Arc, - cache_hit: bool, - report_stats: bool, - map_timing_ms: u64, - lock_timing_ms: u64, - connection_cache_stats: Arc, - num_evictions: u64, - eviction_timing_ms: u64, -} - -struct CreateConnectionResult { - connection: Arc, - cache_hit: bool, - connection_cache_stats: Arc, - num_evictions: u64, - eviction_timing_ms: u64, -} - #[cfg(test)] mod tests { use { - crate::{ - connection_cache::{ConnectionCache, MAX_CONNECTIONS}, - tpu_connection::TpuConnection, - }, + crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, - rand::{Rng, SeedableRng}, - rand_chacha::ChaChaRng, - solana_sdk::{ - pubkey::Pubkey, - quic::{ - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_PORT_OFFSET, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - }, - signature::Keypair, - }, + solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, streamer::StakedNodes, @@ -499,145 +160,6 @@ mod tests { ) } - fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { - let a = rng.gen_range(1, 255); - let b = rng.gen_range(1, 255); - let c = rng.gen_range(1, 255); - let d = rng.gen_range(1, 255); - - let addr_str = format!("{a}.{b}.{c}.{d}:80"); - - addr_str.parse().expect("Invalid address") - } - - #[test] - fn test_connection_cache() { - solana_logger::setup(); - // Allow the test to run deterministically - // with the same pseudorandom sequence between runs - // and on different platforms - the cryptographic security - // property isn't important here but ChaChaRng provides a way - // to get the same pseudorandom sequence on different platforms - let mut rng = ChaChaRng::seed_from_u64(42); - - // Generate a bunch of random addresses and create TPUConnections to them - // Since TPUConnection::new is infallible, it should't matter whether or not - // we can actually connect to those addresses - TPUConnection implementations should either - // be lazy and not connect until first use or handle connection errors somehow - // (without crashing, as would be required in a real practical validator) - let connection_cache = ConnectionCache::default(); - let port_offset = if connection_cache.use_quic() { - QUIC_PORT_OFFSET - } else { - 0 - }; - let addrs = (0..MAX_CONNECTIONS) - .map(|_| { - let addr = get_addr(&mut rng); - connection_cache.get_connection(&addr); - addr - }) - .collect::>(); - { - let map = connection_cache.map.read().unwrap(); - assert!(map.len() == MAX_CONNECTIONS); - addrs.iter().for_each(|a| { - let port = a - .port() - .checked_add(port_offset) - .unwrap_or_else(|| a.port()); - let addr = &SocketAddr::new(a.ip(), port); - - let conn = &map.get(addr).expect("Address not found").connections[0]; - let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); - assert!(addr.ip() == conn.tpu_addr().ip()); - }); - } - - let addr = &get_addr(&mut rng); - connection_cache.get_connection(addr); - - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); - let addr_with_quic_port = SocketAddr::new(addr.ip(), port); - let map = connection_cache.map.read().unwrap(); - assert!(map.len() == MAX_CONNECTIONS); - let _conn = map.get(&addr_with_quic_port).expect("Address not found"); - } - - #[test] - fn test_connection_cache_max_parallel_chunks() { - solana_logger::setup(); - let mut connection_cache = ConnectionCache::default(); - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let pubkey = Pubkey::new_unique(); - connection_cache.set_staked_nodes(&staked_nodes, &pubkey); - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - staked_nodes.write().unwrap().total_stake = 10000; - assert_eq!( - connection_cache.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1); - - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - assert_eq!( - connection_cache.compute_max_parallel_streams(), - (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize - ); - - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .remove(&pubkey); - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1000); - assert_ne!( - connection_cache.compute_max_parallel_streams(), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - } - - // Test that we can get_connection with a connection cache configured for quic - // on an address with a port that, if QUIC_PORT_OFFSET were added to it, it would overflow to - // an invalid port. - #[test] - fn test_overflow_address() { - let port = u16::MAX - QUIC_PORT_OFFSET + 1; - assert!(port.checked_add(QUIC_PORT_OFFSET).is_none()); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let connection_cache = ConnectionCache::new(1); - - let conn = connection_cache.get_connection(&addr); - // We (intentionally) don't have an interface that allows us to distinguish between - // UDP and Quic connections, so check instead that the port is valid (non-zero) - // and is the same as the input port (falling back on UDP) - assert!(conn.tpu_addr().port() != 0); - assert!(conn.tpu_addr().port() == port); - } - #[test] fn test_connection_with_specified_client_endpoint() { let port = u16::MAX - QUIC_PORT_OFFSET + 1; @@ -670,19 +192,20 @@ mod tests { ) .unwrap(); - let connection_cache = ConnectionCache::new_with_endpoint(1, response_recv_endpoint); + let connection_cache = + ConnectionCache::new_with_client_options(1, Some(response_recv_endpoint), None, None); // server port 1: let port1 = 9001; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.tpu_addr().port(), port1 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port1 + QUIC_PORT_OFFSET); // server port 2: let port2 = 9002; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.tpu_addr().port(), port2 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port2 + QUIC_PORT_OFFSET); response_recv_exit.store(true, Ordering::Relaxed); response_recv_thread.join().unwrap(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 23d4cab220..3703d3438e 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -9,7 +9,6 @@ pub mod tpu_connection; pub mod transaction_executor; pub mod udp_client; -#[macro_use] extern crate solana_metrics; pub use solana_rpc_client::mock_sender_for_cli; diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 2f615557b0..28b9649289 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -1,59 +1,8 @@ -//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol -//! and provides an interface for sending transactions which is restricted by the -//! server's flow control. +#[deprecated( + since = "1.15.0", + note = "Please use `solana_quic_client::nonblocking::quic_client::QuicClientConnection` instead." +)] +pub use solana_quic_client::nonblocking::quic_client::QuicClientConnection as QuicTpuConnection; pub use solana_quic_client::nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicError, QuicLazyInitializedEndpoint, QuicTpuConnection, + QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, }; -use { - crate::nonblocking::tpu_connection::TpuConnection, - async_trait::async_trait, - log::*, - solana_sdk::transport::Result as TransportResult, - solana_tpu_client::tpu_connection::ClientStats, - std::{net::SocketAddr, sync::Arc}, -}; - -#[async_trait] -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.client.tpu_addr() - } - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let stats = ClientStats::default(); - let len = buffers.len(); - let res = self - .client - .send_batch(buffers, &stats, self.connection_stats.clone()) - .await; - self.connection_stats - .add_client_stats(&stats, len, res.is_ok()); - res?; - Ok(()) - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let stats = Arc::new(ClientStats::default()); - let send_buffer = - self.client - .send_buffer(wire_transaction, &stats, self.connection_stats.clone()); - if let Err(e) = send_buffer.await { - warn!( - "Failed to send transaction async to {}, error: {:?} ", - self.tpu_addr(), - e - ); - datapoint_warn!("send-wire-async", ("failure", 1, i64),); - self.connection_stats.add_client_stats(&stats, 1, false); - } else { - self.connection_stats.add_client_stats(&stats, 1, true); - } - Ok(()) - } -} diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index e914a88180..5439350c63 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -2,7 +2,6 @@ pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSender use { crate::{ connection_cache::ConnectionCache, - nonblocking::tpu_connection::TpuConnection, tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS}, }, bincode::serialize, @@ -46,7 +45,7 @@ async fn send_wire_transaction_to_addr( wire_transaction: Vec, ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction(wire_transaction.clone()).await + conn.send_data(&wire_transaction).await } async fn send_wire_transaction_batch_to_addr( @@ -55,7 +54,7 @@ async fn send_wire_transaction_batch_to_addr( wire_transactions: &[Vec], ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction_batch(wire_transactions).await + conn.send_data_batch(wire_transactions).await } impl TpuClient { diff --git a/client/src/nonblocking/tpu_connection.rs b/client/src/nonblocking/tpu_connection.rs index af19da4f5a..b91a888533 100644 --- a/client/src/nonblocking/tpu_connection.rs +++ b/client/src/nonblocking/tpu_connection.rs @@ -1,42 +1,5 @@ -//! Trait defining async send functions, to be used for UDP or QUIC sending - -use { - async_trait::async_trait, - enum_dispatch::enum_dispatch, - solana_quic_client::nonblocking::quic_client::QuicTpuConnection, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - solana_udp_client::nonblocking::udp_client::UdpTpuConnection, - std::net::SocketAddr, -}; - -// Due to the existence of `crate::connection_cache::Connection`, if this is named -// `Connection`, enum_dispatch gets confused between the two and throws errors when -// trying to convert later. -#[enum_dispatch] -pub enum NonblockingConnection { - QuicTpuConnection, - UdpTpuConnection, -} - -#[async_trait] -#[enum_dispatch(NonblockingConnection)] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - async fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction).await - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_connection_cache::nonblocking::client_connection::ClientConnection` instead." +)] +pub use solana_connection_cache::nonblocking::client_connection::ClientConnection as TpuConnection; diff --git a/client/src/nonblocking/udp_client.rs b/client/src/nonblocking/udp_client.rs index 4e817e8aa7..e880b1fb10 100644 --- a/client/src/nonblocking/udp_client.rs +++ b/client/src/nonblocking/udp_client.rs @@ -1,35 +1,5 @@ -//! Simple UDP client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions - -pub use solana_udp_client::nonblocking::udp_client::UdpTpuConnection; -use { - crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait, - core::iter::repeat, solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, -}; - -#[async_trait] -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - &self.addr - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.socket - .send_to(wire_transaction.as_ref(), self.addr) - .await?; - Ok(()) - } - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts).await?; - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_udp_client::nonblocking::udp_client::UdpClientConnection` instead." +)] +pub use solana_udp_client::nonblocking::udp_client::UdpClientConnection as UdpTpuConnection; diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 1ce44feb8b..a32aa381cb 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -1,44 +1,5 @@ -//! Simple client that connects to a given UDP port with the QUIC protocol and provides -//! an interface for sending transactions which is restricted by the server's flow control. - -pub use solana_quic_client::quic_client::QuicTpuConnection; -use { - crate::{ - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection, - }, - solana_quic_client::quic_client::temporary_pub::*, - solana_sdk::transport::Result as TransportResult, - std::net::SocketAddr, -}; - -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.inner.tpu_addr() - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; - Ok(()) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - let _lock = ASYNC_TASK_SEMAPHORE.acquire(); - let inner = self.inner.clone(); - - let _handle = RUNTIME - .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); - Ok(()) - } - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let _lock = ASYNC_TASK_SEMAPHORE.acquire(); - let inner = self.inner.clone(); - let _handle = - RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_quic_client::quic_client::QuicClientConnection` instead." +)] +pub use solana_quic_client::quic_client::QuicClientConnection as QuicTpuConnection; diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 1c38f5f7a8..277fe83223 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -4,8 +4,9 @@ //! unstable and may change in future releases. use { - crate::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + crate::connection_cache::ConnectionCache, log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ @@ -144,7 +145,7 @@ impl ThinClient { let conn = self.connection_cache.get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. the first time) #[allow(clippy::needless_borrow)] - conn.send_wire_transaction(&wire_transaction)?; + conn.send_data(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -531,7 +532,9 @@ impl AsyncClient for ThinClient { transaction: VersionedTransaction, ) -> TransportResult { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.serialize_and_send_transaction(&transaction)?; + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + conn.send_data(&wire_transaction)?; Ok(transaction.signatures[0]) } @@ -540,7 +543,11 @@ impl AsyncClient for ThinClient { batch: Vec, ) -> TransportResult<()> { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.par_serialize_and_send_transaction_batch(&batch[..])?; + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + conn.send_data_batch(&buffers)?; Ok(()) } } diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 81a8b89f66..9e000612a5 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,56 +1,6 @@ -pub use solana_tpu_client::tpu_connection::ClientStats; -use { - enum_dispatch::enum_dispatch, - rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_quic_client::quic_client::QuicTpuConnection, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - solana_udp_client::udp_client::UdpTpuConnection, - std::net::SocketAddr, -}; - -#[enum_dispatch] -pub enum BlockingConnection { - UdpTpuConnection, - QuicTpuConnection, -} - -#[enum_dispatch(BlockingConnection)] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(wire_transaction) - } - - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.send_wire_transaction_batch(&[wire_transaction]) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; - - fn par_serialize_and_send_transaction_batch( - &self, - transactions: &[VersionedTransaction], - ) -> TransportResult<()> { - let buffers = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - - self.send_wire_transaction_batch(&buffers) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()>; -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_connection_cache::client_connection::ClientConnection` instead." +)] +pub use solana_connection_cache::client_connection::ClientConnection as TpuConnection; +pub use solana_connection_cache::client_connection::ClientStats; diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index 868849ee2f..c05b74b364 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -1,35 +1,5 @@ -//! Simple TPU client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions - -pub use solana_udp_client::udp_client::UdpTpuConnection; -use { - crate::tpu_connection::TpuConnection, core::iter::repeat, - solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, - std::net::SocketAddr, -}; - -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - &self.addr - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - self.socket.send_to(wire_transaction.as_ref(), self.addr)?; - Ok(()) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) - } - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) - } -} +#[deprecated( + since = "1.15.0", + note = "Please use `solana_udp_client::udp_client::UdpClientConnection` instead." +)] +pub use solana_udp_client::udp_client::UdpClientConnection as UdpTpuConnection; diff --git a/connection-cache/Cargo.toml b/connection-cache/Cargo.toml new file mode 100644 index 0000000000..c61bee0bd7 --- /dev/null +++ b/connection-cache/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "solana-connection-cache" +version = "1.16.0" +description = "Solana Connection Cache" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-connection-cache" +edition = "2021" + +[dependencies] +async-trait = "0.1.57" +bincode = "1.3.3" +futures-util = "0.3.25" +indexmap = "1.9.1" +indicatif = { version = "0.17.1", optional = true } +log = "0.4.17" +rand = "0.7.0" +rayon = "1.5.3" +solana-measure = { path = "../measure", version = "=1.16.0" } +solana-metrics = { path = "../metrics", version = "=1.16.0" } +solana-net-utils = { path = "../net-utils", version = "=1.16.0" } +solana-sdk = { path = "../sdk", version = "=1.16.0" } +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } + +[dev-dependencies] +rand_chacha = "0.2.2" +solana-logger = { path = "../logger", version = "=1.16.0" } diff --git a/connection-cache/src/client_connection.rs b/connection-cache/src/client_connection.rs new file mode 100644 index 0000000000..a58f33d730 --- /dev/null +++ b/connection-cache/src/client_connection.rs @@ -0,0 +1,34 @@ +use { + solana_metrics::MovingStat, + solana_sdk::transport::Result as TransportResult, + std::{net::SocketAddr, sync::atomic::AtomicU64}, +}; + +#[derive(Default)] +pub struct ClientStats { + pub total_connections: AtomicU64, + pub connection_reuse: AtomicU64, + pub connection_errors: AtomicU64, + pub zero_rtt_accepts: AtomicU64, + pub zero_rtt_rejects: AtomicU64, + + // these will be the last values of these stats + pub congestion_events: MovingStat, + pub streams_blocked_uni: MovingStat, + pub data_blocked: MovingStat, + pub acks: MovingStat, + pub make_connection_ms: AtomicU64, + pub send_timeout: AtomicU64, +} + +pub trait ClientConnection: Sync + Send { + fn server_addr(&self) -> &SocketAddr; + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()>; + + fn send_data_async(&self, buffer: Vec) -> TransportResult<()>; + + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()>; + + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()>; +} diff --git a/tpu-client/src/tpu_connection_cache.rs b/connection-cache/src/connection_cache.rs similarity index 67% rename from tpu-client/src/tpu_connection_cache.rs rename to connection-cache/src/connection_cache.rs index 52c415c6f6..82e88b818d 100644 --- a/tpu-client/src/tpu_connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -1,14 +1,15 @@ use { crate::{ + client_connection::ClientConnection as BlockingClientConnection, connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL}, - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection as BlockingTpuConnection, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, indexmap::map::IndexMap, rand::{thread_rng, Rng}, solana_measure::measure::Measure, solana_sdk::timing::AtomicInterval, std::{ + any::Any, net::SocketAddr, sync::{atomic::Ordering, Arc, RwLock}, }, @@ -18,38 +19,56 @@ use { // Should be non-zero pub static MAX_CONNECTIONS: usize = 1024; -/// Used to decide whether the TPU and underlying connection cache should use -/// QUIC connections. -pub const DEFAULT_TPU_USE_QUIC: bool = true; +/// Default connection pool size per remote address +pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; -/// Default TPU connection pool size per remote address -pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4; +/// Defines the protocol types of an implementation supports. +pub enum ProtocolType { + UDP, + QUIC, +} -pub const DEFAULT_TPU_ENABLE_UDP: bool = false; +pub trait ConnectionManager: Sync + Send { + fn new_connection_pool(&self) -> Box; + fn new_connection_config(&self) -> Box; + fn get_port_offset(&self) -> u16; + fn get_protocol_type(&self) -> ProtocolType; +} -pub struct TpuConnectionCache { - pub map: RwLock>, +pub struct ConnectionCache { + pub map: RwLock>>, + pub connection_manager: Box, pub stats: Arc, pub last_stats: AtomicInterval, pub connection_pool_size: usize, - pub tpu_config: P::TpuConfig, + pub connection_config: Box, } -impl TpuConnectionCache

{ +impl ConnectionCache { pub fn new( + connection_manager: Box, connection_pool_size: usize, - ) -> Result::ClientError> { - let config = P::TpuConfig::new()?; - Ok(Self::new_with_config(connection_pool_size, config)) + ) -> Result { + let config = connection_manager.new_connection_config(); + Ok(Self::new_with_config( + connection_pool_size, + config, + connection_manager, + )) } - pub fn new_with_config(connection_pool_size: usize, tpu_config: P::TpuConfig) -> Self { + pub fn new_with_config( + connection_pool_size: usize, + connection_config: Box, + connection_manager: Box, + ) -> Self { Self { map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), stats: Arc::new(ConnectionCacheStats::default()), + connection_manager, last_stats: AtomicInterval::default(), connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1. - tpu_config, + connection_config, } } @@ -60,7 +79,7 @@ impl TpuConnectionCache

{ &self, lock_timing_ms: &mut u64, addr: &SocketAddr, - ) -> CreateConnectionResult { + ) -> CreateConnectionResult { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let mut map = self.map.write().unwrap(); get_connection_map_lock_measure.stop(); @@ -94,10 +113,13 @@ impl TpuConnectionCache

{ map.entry(*addr) .and_modify(|pool| { - pool.add_connection(&self.tpu_config, addr); + pool.add_connection(&*self.connection_config, addr); }) - .or_insert_with(|| P::new_with_connection(&self.tpu_config, addr)); - + .or_insert_with(|| { + let mut pool = self.connection_manager.new_connection_pool(); + pool.add_connection(&*self.connection_config, addr); + pool + }); ( false, num_evictions, @@ -119,15 +141,12 @@ impl TpuConnectionCache

{ } } - fn get_or_add_connection( - &self, - addr: &SocketAddr, - ) -> GetConnectionResult { + fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult { let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure"); let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); - let port_offset = P::PORT_OFFSET; + let port_offset = self.connection_manager.get_port_offset(); let port = addr .port() @@ -188,7 +207,7 @@ impl TpuConnectionCache

{ fn get_connection_and_log_stats( &self, addr: &SocketAddr, - ) -> (Arc, Arc) { + ) -> (Arc, Arc) { let mut get_connection_measure = Measure::start("get_connection_measure"); let GetConnectionResult { connection, @@ -238,10 +257,7 @@ impl TpuConnectionCache

{ (connection, connection_cache_stats) } - pub fn get_connection( - &self, - addr: &SocketAddr, - ) -> ::BlockingConnectionType { + pub fn get_connection(&self, addr: &SocketAddr) -> Arc { let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_blocking_connection(*addr, connection_cache_stats) } @@ -249,10 +265,14 @@ impl TpuConnectionCache

{ pub fn get_nonblocking_connection( &self, addr: &SocketAddr, - ) -> ::NonblockingConnectionType { + ) -> Arc { let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr); connection.new_nonblocking_connection(*addr, connection_cache_stats) } + + pub fn get_protocol_type(&self) -> ProtocolType { + self.connection_manager.get_protocol_type() + } } #[derive(Error, Debug)] @@ -261,33 +281,37 @@ pub enum ConnectionPoolError { IndexOutOfRange, } -pub trait NewTpuConfig { - type ClientError: std::fmt::Debug; - fn new() -> Result - where - Self: Sized; +#[derive(Error, Debug)] +pub enum ClientError { + #[error("Certificate error: {0}")] + CertificateError(String), + + #[error("IO error: {0:?}")] + IoError(#[from] std::io::Error), } -pub trait ConnectionPool { - type PoolTpuConnection: BaseTpuConnection; - type TpuConfig: NewTpuConfig; - const PORT_OFFSET: u16 = 0; +pub trait NewConnectionConfig: Sync + Send { + fn new() -> Result + where + Self: Sized; + fn as_any(&self) -> &dyn Any; - /// Create a new connection pool based on protocol-specific configuration - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self; + fn as_mut_any(&mut self) -> &mut dyn Any; +} +pub trait ConnectionPool: Sync + Send { /// Add a connection to the pool - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr); /// Get the number of current connections in the pool fn num_connections(&self) -> usize; /// Get a connection based on its index in the pool, without checking if the - fn get(&self, index: usize) -> Result, ConnectionPoolError>; + fn get(&self, index: usize) -> Result, ConnectionPoolError>; /// Get a connection from the pool. It must have at least one connection in the pool. /// This randomly picks a connection in the pool. - fn borrow_connection(&self) -> Arc { + fn borrow_connection(&self) -> Arc { let mut rng = thread_rng(); let n = rng.gen_range(0, self.num_connections()); self.get(n).expect("index is within num_connections") @@ -300,30 +324,27 @@ pub trait ConnectionPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, addr: &SocketAddr, - ) -> Self::PoolTpuConnection; + ) -> Arc; } -pub trait BaseTpuConnection { - type BlockingConnectionType: BlockingTpuConnection; - type NonblockingConnectionType: NonblockingTpuConnection; - +pub trait BaseClientConnection: Sync + Send { fn new_blocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Self::BlockingConnectionType; + ) -> Arc; fn new_nonblocking_connection( &self, addr: SocketAddr, stats: Arc, - ) -> Self::NonblockingConnectionType; + ) -> Arc; } -struct GetConnectionResult { - connection: Arc, +struct GetConnectionResult { + connection: Arc, cache_hit: bool, report_stats: bool, map_timing_ms: u64, @@ -333,8 +354,8 @@ struct GetConnectionResult { eviction_timing_ms: u64, } -struct CreateConnectionResult { - connection: Arc, +struct CreateConnectionResult { + connection: Arc, cache_hit: bool, connection_cache_stats: Arc, num_evictions: u64, @@ -346,8 +367,8 @@ mod tests { use { super::*, crate::{ - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::TpuConnection as BlockingTpuConnection, + client_connection::ClientConnection as BlockingClientConnection, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, async_trait::async_trait, rand::{Rng, SeedableRng}, @@ -362,23 +383,11 @@ mod tests { const MOCK_PORT_OFFSET: u16 = 42; pub struct MockUdpPool { - connections: Vec>, + connections: Vec>, } impl ConnectionPool for MockUdpPool { - type PoolTpuConnection = MockUdp; - type TpuConfig = MockUdpConfig; - const PORT_OFFSET: u16 = MOCK_PORT_OFFSET; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -386,7 +395,7 @@ mod tests { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -395,21 +404,26 @@ mod tests { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, _addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - MockUdp(config.tpu_udp_socket.clone()) + ) -> Arc { + let config: &MockUdpConfig = match config.as_any().downcast_ref::() { + Some(b) => b, + None => panic!("Expecting a MockUdpConfig!"), + }; + + Arc::new(MockUdp(config.udp_socket.clone())) } } pub struct MockUdpConfig { - tpu_udp_socket: Arc, + udp_socket: Arc, } impl Default for MockUdpConfig { fn default() -> Self { Self { - tpu_udp_socket: Arc::new( + udp_socket: Arc::new( solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) .expect("Unable to bind to UDP socket"), ), @@ -417,85 +431,105 @@ mod tests { } } - impl NewTpuConfig for MockUdpConfig { - type ClientError = String; - - fn new() -> Result { + impl NewConnectionConfig for MockUdpConfig { + fn new() -> Result { Ok(Self { - tpu_udp_socket: Arc::new( + udp_socket: Arc::new( solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(|_| "Unable to bind to UDP socket".to_string())?, + .map_err(Into::::into)?, ), }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } pub struct MockUdp(Arc); - impl BaseTpuConnection for MockUdp { - type BlockingConnectionType = MockUdpTpuConnection; - type NonblockingConnectionType = MockUdpTpuConnection; - + impl BaseClientConnection for MockUdp { fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> MockUdpTpuConnection { - MockUdpTpuConnection { + ) -> Arc { + Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, - } + }) } fn new_nonblocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> MockUdpTpuConnection { - MockUdpTpuConnection { + ) -> Arc { + Arc::new(MockUdpConnection { _socket: self.0.clone(), addr, - } + }) } } - pub struct MockUdpTpuConnection { + pub struct MockUdpConnection { _socket: Arc, addr: SocketAddr, } - impl BlockingTpuConnection for MockUdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { + #[derive(Default)] + pub struct MockConnectionManager {} + + impl ConnectionManager for MockConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(MockUdpPool { + connections: Vec::default(), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(MockUdpConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + MOCK_PORT_OFFSET + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::UDP + } + } + + impl BlockingClientConnection for MockUdpConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - fn send_wire_transaction_async(&self, _wire_transaction: Vec) -> TransportResult<()> { + fn send_data(&self, _buffer: &[u8]) -> TransportResult<()> { unimplemented!() } - fn send_wire_transaction_batch(&self, _buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + fn send_data_async(&self, _data: Vec) -> TransportResult<()> { unimplemented!() } - fn send_wire_transaction_batch_async(&self, _buffers: Vec>) -> TransportResult<()> { + fn send_data_batch(&self, _buffers: &[Vec]) -> TransportResult<()> { + unimplemented!() + } + fn send_data_batch_async(&self, _buffers: Vec>) -> TransportResult<()> { unimplemented!() } } #[async_trait] - impl NonblockingTpuConnection for MockUdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { + impl NonblockingClientConnection for MockUdpConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - async fn send_wire_transaction(&self, _wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data(&self, _data: &[u8]) -> TransportResult<()> { unimplemented!() } - async fn send_wire_transaction_batch(&self, _buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data_batch(&self, _buffers: &[Vec]) -> TransportResult<()> { unimplemented!() } } @@ -512,7 +546,7 @@ mod tests { } #[test] - fn test_tpu_connection_cache() { + fn test_connection_cache() { solana_logger::setup(); // Allow the test to run deterministically // with the same pseudorandom sequence between runs @@ -521,13 +555,14 @@ mod tests { // to get the same pseudorandom sequence on different platforms let mut rng = ChaChaRng::seed_from_u64(42); - // Generate a bunch of random addresses and create TPUConnections to them - // Since TPUConnection::new is infallible, it should't matter whether or not - // we can actually connect to those addresses - TPUConnection implementations should either + // Generate a bunch of random addresses and create connections to them + // Since ClientConnection::new is infallible, it should't matter whether or not + // we can actually connect to those addresses - ClientConnection implementations should either // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) + let connection_manager = Box::::default(); let connection_cache = - TpuConnectionCache::::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap(); + ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); let port_offset = MOCK_PORT_OFFSET; let addrs = (0..MAX_CONNECTIONS) .map(|_| { @@ -546,9 +581,9 @@ mod tests { .unwrap_or_else(|| a.port()); let addr = &SocketAddr::new(a.ip(), port); - let conn = &map.get(addr).expect("Address not found").connections[0]; + let conn = &map.get(addr).expect("Address not found").get(0).unwrap(); let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); - assert!(addr.ip() == BlockingTpuConnection::tpu_addr(&conn).ip()); + assert!(addr.ip() == conn.server_addr().ip()); }); } @@ -573,13 +608,14 @@ mod tests { let port = u16::MAX - MOCK_PORT_OFFSET + 1; assert!(port.checked_add(MOCK_PORT_OFFSET).is_none()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let connection_cache = TpuConnectionCache::::new(1).unwrap(); + let connection_manager = Box::::default(); + let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); - let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr); + let conn = connection_cache.get_connection(&addr); // We (intentionally) don't have an interface that allows us to distinguish between // UDP and Quic connections, so check instead that the port is valid (non-zero) // and is the same as the input port (falling back on UDP) - assert!(BlockingTpuConnection::tpu_addr(&conn).port() != 0); - assert!(BlockingTpuConnection::tpu_addr(&conn).port() == port); + assert!(conn.server_addr().port() != 0); + assert!(conn.server_addr().port() == port); } } diff --git a/tpu-client/src/connection_cache_stats.rs b/connection-cache/src/connection_cache_stats.rs similarity index 92% rename from tpu-client/src/connection_cache_stats.rs rename to connection-cache/src/connection_cache_stats.rs index 9827cc125f..697757fd32 100644 --- a/tpu-client/src/connection_cache_stats.rs +++ b/connection-cache/src/connection_cache_stats.rs @@ -1,5 +1,5 @@ use { - crate::tpu_connection::ClientStats, + crate::client_connection::ClientStats, std::sync::atomic::{AtomicU64, Ordering}, }; @@ -161,22 +161,16 @@ impl ConnectionCacheStats { i64 ), ( - "tx_streams_blocked_uni", - self.total_client_stats - .tx_streams_blocked_uni - .load_and_reset(), + "streams_blocked_uni", + self.total_client_stats.streams_blocked_uni.load_and_reset(), i64 ), ( - "tx_data_blocked", - self.total_client_stats.tx_data_blocked.load_and_reset(), - i64 - ), - ( - "tx_acks", - self.total_client_stats.tx_acks.load_and_reset(), + "data_blocked", + self.total_client_stats.data_blocked.load_and_reset(), i64 ), + ("acks", self.total_client_stats.acks.load_and_reset(), i64), ( "num_packets", self.sent_packets.swap(0, Ordering::Relaxed), diff --git a/connection-cache/src/lib.rs b/connection-cache/src/lib.rs new file mode 100644 index 0000000000..cc595ff574 --- /dev/null +++ b/connection-cache/src/lib.rs @@ -0,0 +1,9 @@ +#![allow(clippy::integer_arithmetic)] + +pub mod client_connection; +pub mod connection_cache; +pub mod connection_cache_stats; +pub mod nonblocking; + +#[macro_use] +extern crate solana_metrics; diff --git a/connection-cache/src/nonblocking/client_connection.rs b/connection-cache/src/nonblocking/client_connection.rs new file mode 100644 index 0000000000..ef79674057 --- /dev/null +++ b/connection-cache/src/nonblocking/client_connection.rs @@ -0,0 +1,15 @@ +//! Trait defining async send functions, to be used for UDP or QUIC sending + +use { + async_trait::async_trait, solana_sdk::transport::Result as TransportResult, + std::net::SocketAddr, +}; + +#[async_trait] +pub trait ClientConnection { + fn server_addr(&self) -> &SocketAddr; + + async fn send_data(&self, buffer: &[u8]) -> TransportResult<()>; + + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()>; +} diff --git a/connection-cache/src/nonblocking/mod.rs b/connection-cache/src/nonblocking/mod.rs new file mode 100644 index 0000000000..a601ae46f3 --- /dev/null +++ b/connection-cache/src/nonblocking/mod.rs @@ -0,0 +1 @@ +pub mod client_connection; diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index aad0af9371..74343ef68e 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -7,7 +7,7 @@ use { tracer_packet_stats::TracerPacketStats, unprocessed_transaction_storage::UnprocessedTransactionStorage, }, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_perf::{data_budget::DataBudget, packet::Packet}, @@ -197,7 +197,7 @@ impl Forwarder { .forwarded_transaction_count .fetch_add(packet_vec_len, Ordering::Relaxed); let conn = connection_cache.get_connection(&addr); - conn.send_wire_transaction_batch_async(packet_vec) + conn.send_data_batch_async(packet_vec) }; measure.stop(); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 6ffeeaaacc..e5e2933fa2 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -13,7 +13,7 @@ use { solana_streamer::streamer::{ self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{ net::UdpSocket, sync::{ diff --git a/core/src/validator.rs b/core/src/validator.rs index fd47843457..414bdaba6d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -759,11 +759,12 @@ impl Validator { let connection_cache = match use_quic { true => { - let mut connection_cache = ConnectionCache::new(tpu_connection_pool_size); - connection_cache - .update_client_certificate(&identity_keypair, node.info.gossip.ip()) - .expect("Failed to update QUIC client certificates"); - connection_cache.set_staked_nodes(&staked_nodes, &identity_keypair.pubkey()); + let connection_cache = ConnectionCache::new_with_client_options( + tpu_connection_pool_size, + None, + Some((&identity_keypair, node.info.gossip.ip())), + Some((&staked_nodes, &identity_keypair.pubkey())), + ); Arc::new(connection_cache) } false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), @@ -2089,7 +2090,7 @@ mod tests { crossbeam_channel::{bounded, RecvTimeoutError}, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, std::{fs::remove_dir_all, thread, time::Duration}, diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index d685210dd7..0772eb65ad 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,7 +3,7 @@ use { rand::{thread_rng, Rng}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ @@ -50,7 +50,7 @@ impl WarmQuicCacheService { .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) { let conn = connection_cache.get_connection(&addr); - if let Err(err) = conn.send_wire_transaction([0u8]) { + if let Err(err) = conn.send_data(&[0u8]) { warn!( "Failed to warmup QUIC connection to the leader {:?}, Error {:?}", leader_pubkey, err diff --git a/dos/src/main.rs b/dos/src/main.rs index f72c86ba38..f3db52dd09 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ @@ -67,7 +67,7 @@ use { transaction::Transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ net::{SocketAddr, UdpSocket}, process::exit, @@ -285,7 +285,7 @@ fn create_sender_thread( Ok(tx_batch) => { let len = tx_batch.batch.len(); let mut measure_send_txs = Measure::start("measure_send_txs"); - let res = connection.send_wire_transaction_batch_async(tx_batch.batch); + let res = connection.send_data_batch_async(tx_batch.batch); measure_send_txs.stop(); time_send_ns += measure_send_txs.as_ns(); diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 1b759044c9..029ad46a5f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -44,7 +44,7 @@ use { }, solana_stake_program::{config::create_account as create_stake_config_account, stake_state}, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, solana_vote_program::{ diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9760d25595..d705ac4e54 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4495,6 +4495,7 @@ dependencies = [ "quinn", "rand 0.7.3", "rayon", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -4532,6 +4533,25 @@ dependencies = [ "solana-sdk 1.16.0", ] +[[package]] +name = "solana-connection-cache" +version = "1.16.0" +dependencies = [ + "async-trait", + "bincode", + "futures-util", + "indexmap", + "log", + "rand 0.7.3", + "rayon", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-sdk 1.16.0", + "thiserror", + "tokio", +] + [[package]] name = "solana-core" version = "1.16.0" @@ -5180,6 +5200,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustls 0.20.6", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -6047,6 +6068,8 @@ version = "1.16.0" dependencies = [ "bincode", "log", + "rayon", + "solana-connection-cache", "solana-rpc-client", "solana-rpc-client-api", "solana-sdk 1.16.0", @@ -6065,6 +6088,7 @@ dependencies = [ "log", "rand 0.7.3", "rayon", + "solana-connection-cache", "solana-measure", "solana-metrics", "solana-net-utils", @@ -6105,6 +6129,7 @@ name = "solana-udp-client" version = "1.16.0" dependencies = [ "async-trait", + "solana-connection-cache", "solana-net-utils", "solana-sdk 1.16.0", "solana-streamer", diff --git a/quic-client/Cargo.toml b/quic-client/Cargo.toml index c3f6745900..3cede2c276 100644 --- a/quic-client/Cargo.toml +++ b/quic-client/Cargo.toml @@ -20,6 +20,8 @@ quinn = "0.9.3" quinn-proto = "0.9.2" quinn-udp = "0.3.2" rustls = { version = "0.20.6", default-features = false, features = ["dangerous_configuration", "logging"] } + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 6364366749..855eefeff0 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -9,10 +9,20 @@ extern crate solana_metrics; use { crate::{ nonblocking::quic_client::{ - QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint, - QuicTpuConnection as NonblockingQuicTpuConnection, + QuicClient, QuicClientCertificate, + QuicClientConnection as NonblockingQuicClientConnection, QuicLazyInitializedEndpoint, }, - quic_client::QuicTpuConnection as BlockingQuicTpuConnection, + quic_client::QuicClientConnection as BlockingQuicClientConnection, + }, + quinn::Endpoint, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, + ConnectionPoolError, NewConnectionConfig, ProtocolType, + }, + connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::{ @@ -20,13 +30,8 @@ use { streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, - tpu_connection_cache::{ - BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig, - }, - }, std::{ + any::Any, error::Error, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{Arc, RwLock}, @@ -41,25 +46,12 @@ pub enum QuicClientError { } pub struct QuicPool { - connections: Vec>, + connections: Vec>, endpoint: Arc, } impl ConnectionPool for QuicPool { - type PoolTpuConnection = Quic; - type TpuConfig = QuicConfig; - const PORT_OFFSET: u16 = QUIC_PORT_OFFSET; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - endpoint: config.create_endpoint(), - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -67,7 +59,7 @@ impl ConnectionPool for QuicPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -76,14 +68,15 @@ impl ConnectionPool for QuicPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - Quic(Arc::new(QuicClient::new( + ) -> Arc { + let config = QuicConfig::downcast_ref(config); + Arc::new(Quic(Arc::new(QuicClient::new( self.endpoint.clone(), *addr, config.compute_max_parallel_streams(), - ))) + )))) } } @@ -91,15 +84,17 @@ pub struct QuicConfig { client_certificate: Arc, maybe_staked_nodes: Option>>, maybe_client_pubkey: Option, + + // The optional specified endpoint for the quic based client connections + // If not specified, the connection cache will create as needed. + client_endpoint: Option, } -impl NewTpuConfig for QuicConfig { - type ClientError = QuicClientError; - - fn new() -> Result { +impl NewConnectionConfig for QuicConfig { + fn new() -> Result { let (cert, priv_key) = new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(|err| QuicClientError::CertificateError(err.to_string()))?; + .map_err(|err| ClientError::CertificateError(err.to_string()))?; Ok(Self { client_certificate: Arc::new(QuicClientCertificate { certificate: cert, @@ -107,16 +102,25 @@ impl NewTpuConfig for QuicConfig { }), maybe_staked_nodes: None, maybe_client_pubkey: None, + client_endpoint: None, }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } impl QuicConfig { - fn create_endpoint(&self) -> Arc { - Arc::new(QuicLazyInitializedEndpoint::new( + fn create_endpoint(&self) -> QuicLazyInitializedEndpoint { + QuicLazyInitializedEndpoint::new( self.client_certificate.clone(), - None, - )) + self.client_endpoint.as_ref().cloned(), + ) } fn compute_max_parallel_streams(&self) -> usize { @@ -158,30 +162,84 @@ impl QuicConfig { self.maybe_staked_nodes = Some(staked_nodes.clone()); self.maybe_client_pubkey = Some(*client_pubkey); } + + pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) { + self.client_endpoint = Some(client_endpoint); + } + + /// Convenient function to downcast a generic NewConnectionConfig reference to QuicConfig + pub fn downcast_ref(config: &dyn NewConnectionConfig) -> &Self { + match config.as_any().downcast_ref::() { + Some(config) => config, + None => panic!("Expecting a QuicConfig!"), + } + } } pub struct Quic(Arc); -impl BaseTpuConnection for Quic { - type BlockingConnectionType = BlockingQuicTpuConnection; - type NonblockingConnectionType = NonblockingQuicTpuConnection; - +impl BaseClientConnection for Quic { fn new_blocking_connection( &self, _addr: SocketAddr, stats: Arc, - ) -> BlockingQuicTpuConnection { - BlockingQuicTpuConnection::new_with_client(self.0.clone(), stats) + ) -> Arc { + Arc::new(BlockingQuicClientConnection::new_with_client( + self.0.clone(), + stats, + )) } fn new_nonblocking_connection( &self, _addr: SocketAddr, stats: Arc, - ) -> NonblockingQuicTpuConnection { - NonblockingQuicTpuConnection::new_with_client(self.0.clone(), stats) + ) -> Arc { + Arc::new(NonblockingQuicClientConnection::new_with_client( + self.0.clone(), + stats, + )) } } +#[derive(Default)] +pub struct QuicConnectionManager { + connection_config: Option>, +} + +impl ConnectionManager for QuicConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(QuicPool { + connections: Vec::default(), + endpoint: Arc::new(self.connection_config.as_ref().map_or( + QuicLazyInitializedEndpoint::default(), + |config| { + let config = QuicConfig::downcast_ref(config.as_ref()); + config.create_endpoint() + }, + )), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(QuicConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + QUIC_PORT_OFFSET + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::QUIC + } +} + +impl QuicConnectionManager { + pub fn new_with_connection_config(config: QuicConfig) -> Self { + Self { + connection_config: Some(Box::new(config)), + } + } +} #[cfg(test)] mod tests { use { @@ -190,33 +248,29 @@ mod tests { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, - solana_tpu_client::tpu_connection_cache::{ - TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, - }, }; #[test] fn test_connection_cache_max_parallel_chunks() { solana_logger::setup(); - let connection_cache = - TpuConnectionCache::::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap(); - let mut tpu_config = connection_cache.tpu_config; + + let mut connection_config = QuicConfig::new().unwrap(); assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let pubkey = Pubkey::new_unique(); - tpu_config.set_staked_nodes(&staked_nodes, &pubkey); + connection_config.set_staked_nodes(&staked_nodes, &pubkey); assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); staked_nodes.write().unwrap().total_stake = 10000; assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); @@ -230,7 +284,7 @@ mod tests { (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize ); @@ -245,7 +299,7 @@ mod tests { .pubkey_stake_map .insert(pubkey, 1000); assert_ne!( - tpu_config.compute_max_parallel_streams(), + connection_config.compute_max_parallel_streams(), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); } diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 11d8a6a3b3..231b641bbb 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -1,5 +1,5 @@ //! Simple nonblocking client that connects to a given UDP port with the QUIC protocol -//! and provides an interface for sending transactions which is restricted by the +//! and provides an interface for sending data which is restricted by the //! server's flow control. use { async_mutex::Mutex, @@ -11,6 +11,10 @@ use { ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError, }, + solana_connection_cache::{ + client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection, + }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind, @@ -25,10 +29,6 @@ use { solana_streamer::{ nonblocking::quic::ALPN_TPU_PROTOCOL_ID, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection, - tpu_connection::ClientStats, - }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, @@ -397,21 +397,21 @@ impl QuicClient { connection_stats .total_client_stats - .tx_streams_blocked_uni + .streams_blocked_uni .update_stat( - &self.stats.tx_streams_blocked_uni, + &self.stats.streams_blocked_uni, new_stats.frame_tx.streams_blocked_uni, ); connection_stats .total_client_stats - .tx_data_blocked - .update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked); + .data_blocked + .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked); connection_stats .total_client_stats - .tx_acks - .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks); + .acks + .update_stat(&self.stats.acks, new_stats.frame_tx.acks); last_connection_id = connection.stable_id(); match Self::_send_buffer_using_conn(data, &connection).await { @@ -438,7 +438,7 @@ impl QuicClient { // if we come here, that means we have exhausted maximum retries, return the error info!( - "Ran into an error sending transactions {:?}, exhausted retries to {}", + "Ran into an error sending data {:?}, exhausted retries to {}", last_error, self.addr ); // If we get here but last_error is None, then we have a logic error @@ -470,12 +470,12 @@ impl QuicClient { where T: AsRef<[u8]>, { - // Start off by "testing" the connection by sending the first transaction + // Start off by "testing" the connection by sending the first buffer // This will also connect to the server if not already connected // and reconnect and retry if the first send attempt failed // (for example due to a timed out connection), returning an error - // or the connection that was used to successfully send the transaction. - // We will use the returned connection to send the rest of the transactions in the batch + // or the connection that was used to successfully send the buffer. + // We will use the returned connection to send the rest of the buffers in the batch // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way // since testing even in the ideal GCE environment has found no cases // where reconnecting and retrying in the middle of a batch send @@ -515,7 +515,7 @@ impl QuicClient { Ok(()) } - pub fn tpu_addr(&self) -> &SocketAddr { + pub fn server_addr(&self) -> &SocketAddr { &self.addr } @@ -524,12 +524,12 @@ impl QuicClient { } } -pub struct QuicTpuConnection { +pub struct QuicClientConnection { pub client: Arc, pub connection_stats: Arc, } -impl QuicTpuConnection { +impl QuicClientConnection { pub fn base_stats(&self) -> Arc { self.client.stats() } @@ -563,15 +563,12 @@ impl QuicTpuConnection { } #[async_trait] -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.client.tpu_addr() +impl ClientConnection for QuicClientConnection { + fn server_addr(&self) -> &SocketAddr { + self.client.server_addr() } - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { let stats = ClientStats::default(); let len = buffers.len(); let res = self @@ -584,18 +581,15 @@ impl TpuConnection for QuicTpuConnection { Ok(()) } - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { + async fn send_data(&self, data: &[u8]) -> TransportResult<()> { let stats = Arc::new(ClientStats::default()); - let send_buffer = - self.client - .send_buffer(wire_transaction, &stats, self.connection_stats.clone()); + let send_buffer = self + .client + .send_buffer(data, &stats, self.connection_stats.clone()); if let Err(e) = send_buffer.await { warn!( - "Failed to send transaction async to {}, error: {:?} ", - self.tpu_addr(), + "Failed to send data async to {}, error: {:?} ", + self.server_addr(), e ); datapoint_warn!("send-wire-async", ("failure", 1, i64),); diff --git a/quic-client/src/quic_client.rs b/quic-client/src/quic_client.rs index 0cc5c9e353..39726deb1f 100644 --- a/quic-client/src/quic_client.rs +++ b/quic-client/src/quic_client.rs @@ -1,18 +1,18 @@ //! Simple client that connects to a given UDP port with the QUIC protocol and provides -//! an interface for sending transactions which is restricted by the server's flow control. +//! an interface for sending data which is restricted by the server's flow control. use { crate::nonblocking::quic_client::{ - QuicClient, QuicLazyInitializedEndpoint, QuicTpuConnection as NonblockingQuicTpuConnection, + QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint, }, lazy_static::lazy_static, log::*, - solana_sdk::transport::{Result as TransportResult, TransportError}, - solana_tpu_client::{ + solana_connection_cache::{ + client_connection::{ClientConnection, ClientStats}, connection_cache_stats::ConnectionCacheStats, - nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection, - tpu_connection::{ClientStats, TpuConnection}, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, + solana_sdk::transport::{Result as TransportResult, TransportError}, std::{ net::SocketAddr, sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard}, @@ -21,125 +21,119 @@ use { tokio::{runtime::Runtime, time::timeout}, }; -pub mod temporary_pub { - use super::*; +pub const MAX_OUTSTANDING_TASK: u64 = 2000; +pub const SEND_DATA_TIMEOUT_MS: u64 = 10000; - pub const MAX_OUTSTANDING_TASK: u64 = 2000; - pub const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000; +/// A semaphore used for limiting the number of asynchronous tasks spawn to the +/// runtime. Before spawnning a task, use acquire. After the task is done (be it +/// succsess or failure), call release. +struct AsyncTaskSemaphore { + /// Keep the counter info about the usage + counter: Mutex, + /// Conditional variable for signaling when counter is decremented + cond_var: Condvar, + /// The maximum usage allowed by this semaphore. + permits: u64, +} - /// A semaphore used for limiting the number of asynchronous tasks spawn to the - /// runtime. Before spawnning a task, use acquire. After the task is done (be it - /// succsess or failure), call release. - pub struct AsyncTaskSemaphore { - /// Keep the counter info about the usage - counter: Mutex, - /// Conditional variable for signaling when counter is decremented - cond_var: Condvar, - /// The maximum usage allowed by this semaphore. - permits: u64, - } - - impl AsyncTaskSemaphore { - pub fn new(permits: u64) -> Self { - Self { - counter: Mutex::new(0), - cond_var: Condvar::new(), - permits, - } - } - - /// When returned, the lock has been locked and usage count has been - /// incremented. When the returned MutexGuard is dropped the lock is dropped - /// without decrementing the usage count. - pub fn acquire(&self) -> MutexGuard { - let mut count = self.counter.lock().unwrap(); - *count += 1; - while *count > self.permits { - count = self.cond_var.wait(count).unwrap(); - } - count - } - - /// Acquire the lock and decrement the usage count - pub fn release(&self) { - let mut count = self.counter.lock().unwrap(); - *count -= 1; - self.cond_var.notify_one(); +impl AsyncTaskSemaphore { + pub fn new(permits: u64) -> Self { + Self { + counter: Mutex::new(0), + cond_var: Condvar::new(), + permits, } } - lazy_static! { - pub static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = - AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); - pub static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("quic-client") - .enable_all() - .build() - .unwrap(); + /// When returned, the lock has been locked and usage count has been + /// incremented. When the returned MutexGuard is dropped the lock is dropped + /// without decrementing the usage count. + pub fn acquire(&self) -> MutexGuard { + let mut count = self.counter.lock().unwrap(); + *count += 1; + while *count > self.permits { + count = self.cond_var.wait(count).unwrap(); + } + count } - pub async fn send_wire_transaction_async( - connection: Arc, - wire_transaction: Vec, - ) -> TransportResult<()> { - let result = timeout( - Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS), - connection.send_wire_transaction(wire_transaction), - ) - .await; - ASYNC_TASK_SEMAPHORE.release(); - handle_send_result(result, connection) + /// Acquire the lock and decrement the usage count + pub fn release(&self) { + let mut count = self.counter.lock().unwrap(); + *count -= 1; + self.cond_var.notify_one(); } +} - pub async fn send_wire_transaction_batch_async( - connection: Arc, - buffers: Vec>, - ) -> TransportResult<()> { - let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64; +lazy_static! { + static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = + AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); + static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("quic-client") + .enable_all() + .build() + .unwrap(); +} - let result = timeout( - Duration::from_millis(time_out), - connection.send_wire_transaction_batch(&buffers), - ) - .await; - ASYNC_TASK_SEMAPHORE.release(); - handle_send_result(result, connection) - } +async fn send_data_async( + connection: Arc, + buffer: Vec, +) -> TransportResult<()> { + let result = timeout( + Duration::from_millis(SEND_DATA_TIMEOUT_MS), + connection.send_data(&buffer), + ) + .await; + ASYNC_TASK_SEMAPHORE.release(); + handle_send_result(result, connection) +} - /// Check the send result and update stats if timedout. Returns the checked result. - pub fn handle_send_result( - result: Result, tokio::time::error::Elapsed>, - connection: Arc, - ) -> Result<(), TransportError> { - match result { - Ok(result) => result, - Err(_err) => { - let client_stats = ClientStats::default(); - client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); - let stats = connection.connection_stats(); - stats.add_client_stats(&client_stats, 0, false); - info!("Timedout sending transaction {:?}", connection.tpu_addr()); - Err(TransportError::Custom( - "Timedout sending transaction".to_string(), - )) - } +async fn send_data_batch_async( + connection: Arc, + buffers: Vec>, +) -> TransportResult<()> { + let time_out = SEND_DATA_TIMEOUT_MS * buffers.len() as u64; + + let result = timeout( + Duration::from_millis(time_out), + connection.send_data_batch(&buffers), + ) + .await; + ASYNC_TASK_SEMAPHORE.release(); + handle_send_result(result, connection) +} + +/// Check the send result and update stats if timedout. Returns the checked result. +fn handle_send_result( + result: Result, tokio::time::error::Elapsed>, + connection: Arc, +) -> Result<(), TransportError> { + match result { + Ok(result) => result, + Err(_err) => { + let client_stats = ClientStats::default(); + client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); + let stats = connection.connection_stats(); + stats.add_client_stats(&client_stats, 0, false); + info!("Timedout sending data {:?}", connection.server_addr()); + Err(TransportError::Custom("Timedout sending data".to_string())) } } } -use temporary_pub::*; -pub struct QuicTpuConnection { - pub inner: Arc, +pub struct QuicClientConnection { + pub inner: Arc, } -impl QuicTpuConnection { + +impl QuicClientConnection { pub fn new( endpoint: Arc, - tpu_addr: SocketAddr, + server_addr: SocketAddr, connection_stats: Arc, ) -> Self { - let inner = Arc::new(NonblockingQuicTpuConnection::new( + let inner = Arc::new(NonblockingQuicConnection::new( endpoint, - tpu_addr, + server_addr, connection_stats, )); Self { inner } @@ -149,7 +143,7 @@ impl QuicTpuConnection { client: Arc, connection_stats: Arc, ) -> Self { - let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client( + let inner = Arc::new(NonblockingQuicConnection::new_with_client( client, connection_stats, )); @@ -157,33 +151,33 @@ impl QuicTpuConnection { } } -impl TpuConnection for QuicTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { - self.inner.tpu_addr() +impl ClientConnection for QuicClientConnection { + fn server_addr(&self) -> &SocketAddr { + self.inner.server_addr() } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + RUNTIME.block_on(self.inner.send_data_batch(buffers))?; Ok(()) } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + fn send_data_async(&self, data: Vec) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - let _handle = RUNTIME - .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); + let _handle = RUNTIME.spawn(async move { send_data_async(inner, data).await }); Ok(()) } - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); - let _handle = - RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); + let _handle = RUNTIME.spawn(async move { send_data_batch_async(inner, buffers).await }); + Ok(()) + } + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + RUNTIME.block_on(self.inner.send_data(buffer))?; Ok(()) } } diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index aaeff7c1b4..f48e8b369b 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -3,6 +3,7 @@ mod tests { use { crossbeam_channel::{unbounded, Receiver}, log::*, + solana_connection_cache::connection_cache_stats::ConnectionCacheStats, solana_perf::packet::PacketBatch, solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, @@ -12,7 +13,6 @@ mod tests { nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats, streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, - solana_tpu_client::connection_cache_stats::ConnectionCacheStats, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ @@ -67,8 +67,8 @@ mod tests { #[test] fn test_quic_client_multiple_writes() { use { - solana_quic_client::quic_client::QuicTpuConnection, - solana_tpu_client::tpu_connection::TpuConnection, + solana_connection_cache::client_connection::ClientConnection, + solana_quic_client::quic_client::QuicClientConnection, }; solana_logger::setup(); let (sender, receiver) = unbounded(); @@ -93,7 +93,7 @@ mod tests { let port = s.local_addr().unwrap().port(); let tpu_addr = SocketAddr::new(addr, port); let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); - let client = QuicTpuConnection::new( + let client = QuicClientConnection::new( Arc::new(QuicLazyInitializedEndpoint::default()), tpu_addr, connection_cache_stats, @@ -104,7 +104,7 @@ mod tests { let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(client.send_wire_transaction_batch_async(packets).is_ok()); + assert!(client.send_data_batch_async(packets).is_ok()); check_packets(receiver, num_bytes, num_expected_packets); exit.store(true, Ordering::Relaxed); @@ -114,8 +114,8 @@ mod tests { #[tokio::test] async fn test_nonblocking_quic_client_multiple_writes() { use { - solana_quic_client::nonblocking::quic_client::QuicTpuConnection, - solana_tpu_client::nonblocking::tpu_connection::TpuConnection, + solana_connection_cache::nonblocking::client_connection::ClientConnection, + solana_quic_client::nonblocking::quic_client::QuicClientConnection, }; solana_logger::setup(); let (sender, receiver) = unbounded(); @@ -140,7 +140,7 @@ mod tests { let port = s.local_addr().unwrap().port(); let tpu_addr = SocketAddr::new(addr, port); let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); - let client = QuicTpuConnection::new( + let client = QuicClientConnection::new( Arc::new(QuicLazyInitializedEndpoint::default()), tpu_addr, connection_cache_stats, @@ -150,8 +150,7 @@ mod tests { let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - - assert!(client.send_wire_transaction_batch(&packets).await.is_ok()); + assert!(client.send_data_batch(&packets).await.is_ok()); check_packets(receiver, num_bytes, num_expected_packets); exit.store(true, Ordering::Relaxed); @@ -168,8 +167,8 @@ mod tests { /// In this we demonstrate that the request sender and the response receiver use the /// same quic Endpoint, and the same UDP socket. use { - solana_quic_client::quic_client::QuicTpuConnection, - solana_tpu_client::tpu_connection::TpuConnection, + solana_connection_cache::client_connection::ClientConnection, + solana_quic_client::quic_client::QuicClientConnection, }; solana_logger::setup(); @@ -239,15 +238,13 @@ mod tests { let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = - QuicTpuConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats); + QuicClientConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats); // Send a full size packet with single byte writes as a request. let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(request_sender - .send_wire_transaction_batch_async(packets) - .is_ok()); + assert!(request_sender.send_data_batch_async(packets).is_ok()); check_packets(receiver, num_bytes, num_expected_packets); info!("Received requests!"); @@ -264,16 +261,14 @@ mod tests { let endpoint2 = QuicLazyInitializedEndpoint::new(client_certificate2, None); let connection_cache_stats2 = Arc::new(ConnectionCacheStats::default()); let response_sender = - QuicTpuConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2); + QuicClientConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2); // Send a full size packet with single byte writes. let num_bytes = PACKET_DATA_SIZE; let num_expected_packets: usize = 3000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - assert!(response_sender - .send_wire_transaction_batch_async(packets) - .is_ok()); + assert!(response_sender.send_data_batch_async(packets).is_ok()); check_packets(receiver2, num_bytes, num_expected_packets); info!("Received responses!"); diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 0335f7e11f..54fbf26fd1 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -7,7 +7,7 @@ use { serde_json::{json, Value}, solana_account_decoder::UiAccount, solana_client::{ - connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::ConnectionCache, tpu_client::{TpuClient, TpuClientConfig}, }, solana_pubsub_client::nonblocking::pubsub_client::PubsubClient, @@ -29,6 +29,7 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_test_validator::TestValidator, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, solana_transaction_status::TransactionStatus, std::{ collections::HashSet, diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index fa693ae3c3..f96794b156 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,7 +2,7 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::connection_cache::ConnectionCache, solana_measure::measure::Measure, solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -706,7 +706,7 @@ impl SendTransactionService { connection_cache: &Arc, ) -> Result<(), TransportError> { let conn = connection_cache.get_connection(tpu_address); - conn.send_wire_transaction_async(wire_transaction.to_vec()) + conn.send_data_async(wire_transaction.to_vec()) } fn send_transactions_with_metrics( @@ -716,7 +716,7 @@ impl SendTransactionService { ) -> Result<(), TransportError> { let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); let conn = connection_cache.get_connection(tpu_address); - conn.send_wire_transaction_batch_async(wire_transactions) + conn.send_data_batch_async(wire_transactions) } fn send_transactions( diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 7d28fce8cd..1d20306d6c 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -44,7 +44,7 @@ use { signature::{read_keypair_file, write_keypair_file, Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::{ + solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, std::{ diff --git a/thin-client/Cargo.toml b/thin-client/Cargo.toml index 99220f5c59..df89c43776 100644 --- a/thin-client/Cargo.toml +++ b/thin-client/Cargo.toml @@ -12,13 +12,14 @@ edition = "2021" [dependencies] bincode = "1.3.3" log = "0.4.17" +rayon = "1.5.3" +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false } solana-rpc-client = { path = "../rpc-client", version = "=1.16.0", default-features = false } solana-rpc-client-api = { path = "../rpc-client-api", version = "=1.16.0" } solana-sdk = { path = "../sdk", version = "=1.16.0" } solana-tpu-client = { path = "../tpu-client", version = "=1.16.0", default-features = false } [dev-dependencies] -rayon = "1.5.3" solana-logger = { path = "../logger", version = "=1.16.0" } [package.metadata.docs.rs] diff --git a/thin-client/src/thin_client.rs b/thin-client/src/thin_client.rs index 7a43b71d2e..7cde909b24 100644 --- a/thin-client/src/thin_client.rs +++ b/thin-client/src/thin_client.rs @@ -5,6 +5,8 @@ use { log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_connection_cache::connection_cache::ConnectionCache, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response}, solana_sdk::{ @@ -25,10 +27,6 @@ use { transaction::{self, Transaction, VersionedTransaction}, transport::Result as TransportResult, }, - solana_tpu_client::{ - tpu_connection::TpuConnection, - tpu_connection_cache::{ConnectionPool, TpuConnectionCache}, - }, std::{ io, net::SocketAddr, @@ -113,21 +111,21 @@ pub mod temporary_pub { use temporary_pub::*; /// An object for querying and sending transactions to the network. -pub struct ThinClient { +pub struct ThinClient { rpc_clients: Vec, tpu_addrs: Vec, optimizer: ClientOptimizer, - connection_cache: Arc>, + connection_cache: Arc, } -impl ThinClient

{ +impl ThinClient { /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP /// (currently hardcoded to UDP) pub fn new( rpc_addr: SocketAddr, tpu_addr: SocketAddr, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache) } @@ -136,7 +134,7 @@ impl ThinClient

{ rpc_addr: SocketAddr, tpu_addr: SocketAddr, timeout: Duration, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); Self::new_from_client(rpc_client, tpu_addr, connection_cache) @@ -145,7 +143,7 @@ impl ThinClient

{ fn new_from_client( rpc_client: RpcClient, tpu_addr: SocketAddr, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { Self { rpc_clients: vec![rpc_client], @@ -158,7 +156,7 @@ impl ThinClient

{ pub fn new_from_addrs( rpc_addrs: Vec, tpu_addrs: Vec, - connection_cache: Arc>, + connection_cache: Arc, ) -> Self { assert!(!rpc_addrs.is_empty()); assert_eq!(rpc_addrs.len(), tpu_addrs.len()); @@ -221,7 +219,7 @@ impl ThinClient

{ let conn = self.connection_cache.get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. the first time) #[allow(clippy::needless_borrow)] - conn.send_wire_transaction(&wire_transaction)?; + conn.send_data(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -316,13 +314,13 @@ impl ThinClient

{ } } -impl Client for ThinClient

{ +impl Client for ThinClient { fn tpu_addr(&self) -> String { self.tpu_addr().to_string() } } -impl SyncClient for ThinClient

{ +impl SyncClient for ThinClient { fn send_and_confirm_message( &self, keypairs: &T, @@ -602,13 +600,15 @@ impl SyncClient for ThinClient

{ } } -impl AsyncClient for ThinClient

{ +impl AsyncClient for ThinClient { fn async_send_versioned_transaction( &self, transaction: VersionedTransaction, ) -> TransportResult { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.serialize_and_send_transaction(&transaction)?; + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + conn.send_data(&wire_transaction)?; Ok(transaction.signatures[0]) } @@ -617,7 +617,11 @@ impl AsyncClient for ThinClient

{ batch: Vec, ) -> TransportResult<()> { let conn = self.connection_cache.get_connection(self.tpu_addr()); - conn.par_serialize_and_send_transaction_batch(&batch[..])?; + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + conn.send_data_batch(&buffers)?; Ok(()) } } @@ -636,7 +640,7 @@ fn min_index(array: &[u64]) -> (u64, usize) { #[cfg(test)] mod tests { - use {super::*, rayon::prelude::*}; + use super::*; #[test] fn test_client_optimizer() { diff --git a/tpu-client/Cargo.toml b/tpu-client/Cargo.toml index b51765d883..342108fd5f 100644 --- a/tpu-client/Cargo.toml +++ b/tpu-client/Cargo.toml @@ -18,6 +18,8 @@ indicatif = { version = "0.17.1", optional = true } log = "0.4.17" rand = "0.7.0" rayon = "1.5.3" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" } solana-metrics = { path = "../metrics", version = "=1.16.0" } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } diff --git a/tpu-client/src/lib.rs b/tpu-client/src/lib.rs index 5fec3e4eda..199a09091e 100644 --- a/tpu-client/src/lib.rs +++ b/tpu-client/src/lib.rs @@ -1,10 +1,6 @@ #![allow(clippy::integer_arithmetic)] -pub mod connection_cache_stats; pub mod nonblocking; pub mod tpu_client; -pub mod tpu_connection; -pub mod tpu_connection_cache; -#[macro_use] extern crate solana_metrics; diff --git a/tpu-client/src/nonblocking/mod.rs b/tpu-client/src/nonblocking/mod.rs index fe2549410a..f5461aefd2 100644 --- a/tpu-client/src/nonblocking/mod.rs +++ b/tpu-client/src/nonblocking/mod.rs @@ -1,2 +1 @@ pub mod tpu_client; -pub mod tpu_connection; diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 0edafb4f0b..626858fe31 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -7,16 +7,13 @@ use { solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, }; use { - crate::{ - nonblocking::tpu_connection::TpuConnection, - tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS}, - tpu_connection_cache::{ - ConnectionPool, TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, - }, - }, + crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS}, bincode::serialize, futures_util::{future::join_all, stream::StreamExt}, log::*, + solana_connection_cache::connection_cache::{ + ConnectionCache, ConnectionManager, DEFAULT_CONNECTION_POOL_SIZE, + }, solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError}, solana_rpc_client::nonblocking::rpc_client::RpcClient, solana_rpc_client_api::{ @@ -253,33 +250,33 @@ impl LeaderTpuCache { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient { fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, rpc_client: Arc, - connection_cache: Arc>, + connection_cache: Arc, } -async fn send_wire_transaction_to_addr( - connection_cache: &TpuConnectionCache

, +async fn send_wire_transaction_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transaction: Vec, ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction(wire_transaction.clone()).await + conn.send_data(&wire_transaction).await } -async fn send_wire_transaction_batch_to_addr( - connection_cache: &TpuConnectionCache

, +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, addr: &SocketAddr, wire_transactions: &[Vec], ) -> TransportResult<()> { let conn = connection_cache.get_nonblocking_connection(addr); - conn.send_wire_transaction_batch(wire_transactions).await + conn.send_data_batch(wire_transactions).await } -impl TpuClient

{ +impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub async fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -394,9 +391,11 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, + connection_manager: Box, ) -> Result { - let connection_cache = - Arc::new(TpuConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap()); // TODO: Handle error properly, as the TpuConnectionCache ctor is now fallible. + let connection_cache = Arc::new( + ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(), + ); // TODO: Handle error properly, as the ConnectionCache ctor is now fallible. Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await } @@ -405,7 +404,7 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc>, + connection_cache: Arc, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); let leader_tpu_service = @@ -554,7 +553,7 @@ impl TpuClient

{ } } -impl Drop for TpuClient

{ +impl Drop for TpuClient { fn drop(&mut self) { self.exit.store(true, Ordering::Relaxed); } diff --git a/tpu-client/src/nonblocking/tpu_connection.rs b/tpu-client/src/nonblocking/tpu_connection.rs deleted file mode 100644 index 4a711ac7b8..0000000000 --- a/tpu-client/src/nonblocking/tpu_connection.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Trait defining async send functions, to be used for UDP or QUIC sending - -use { - async_trait::async_trait, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::net::SocketAddr, -}; - -#[async_trait] -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - async fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction).await - } - - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; -} diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 3a343fb301..887f2233e0 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -1,10 +1,8 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { - crate::{ - nonblocking::tpu_client::TpuClient as NonblockingTpuClient, - tpu_connection_cache::{ConnectionPool, TpuConnectionCache}, - }, + crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_connection_cache::connection_cache::{ConnectionCache, ConnectionManager}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, std::{ @@ -19,6 +17,10 @@ use { tokio::time::Duration, }; +pub const DEFAULT_TPU_ENABLE_UDP: bool = false; +pub const DEFAULT_TPU_USE_QUIC: bool = true; +pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4; + pub mod temporary_pub { use super::*; @@ -57,14 +59,14 @@ impl Default for TpuClientConfig { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { +pub struct TpuClient { _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket //todo: get rid of this field rpc_client: Arc, - tpu_client: Arc>, + tpu_client: Arc, } -impl TpuClient

{ +impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { @@ -108,9 +110,14 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, + connection_manager: Box, ) -> Result { - let create_tpu_client = - NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); + let create_tpu_client = NonblockingTpuClient::new( + rpc_client.get_inner_client().clone(), + websocket_url, + config, + connection_manager, + ); let tpu_client = tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; @@ -126,7 +133,7 @@ impl TpuClient

{ rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, - connection_cache: Arc>, + connection_cache: Arc, ) -> Result { let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( rpc_client.get_inner_client().clone(), diff --git a/tpu-client/src/tpu_connection.rs b/tpu-client/src/tpu_connection.rs deleted file mode 100644 index 44a35c899c..0000000000 --- a/tpu-client/src/tpu_connection.rs +++ /dev/null @@ -1,63 +0,0 @@ -use { - rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_metrics::MovingStat, - solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::{net::SocketAddr, sync::atomic::AtomicU64}, -}; - -#[derive(Default)] -pub struct ClientStats { - pub total_connections: AtomicU64, - pub connection_reuse: AtomicU64, - pub connection_errors: AtomicU64, - pub zero_rtt_accepts: AtomicU64, - pub zero_rtt_rejects: AtomicU64, - - // these will be the last values of these stats - pub congestion_events: MovingStat, - pub tx_streams_blocked_uni: MovingStat, - pub tx_data_blocked: MovingStat, - pub tx_acks: MovingStat, - pub make_connection_ms: AtomicU64, - pub send_timeout: AtomicU64, -} - -pub trait TpuConnection { - fn tpu_addr(&self) -> &SocketAddr; - - fn serialize_and_send_transaction( - &self, - transaction: &VersionedTransaction, - ) -> TransportResult<()> { - let wire_transaction = - bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(wire_transaction) - } - - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.send_wire_transaction_batch(&[wire_transaction]) - } - - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; - - fn par_serialize_and_send_transaction_batch( - &self, - transactions: &[VersionedTransaction], - ) -> TransportResult<()> { - let buffers = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - - self.send_wire_transaction_batch(&buffers) - } - - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync; - - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()>; -} diff --git a/udp-client/Cargo.toml b/udp-client/Cargo.toml index 1a212d78d0..e86e55f581 100644 --- a/udp-client/Cargo.toml +++ b/udp-client/Cargo.toml @@ -11,6 +11,8 @@ edition = "2021" [dependencies] async-trait = "0.1.57" + +solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false } solana-net-utils = { path = "../net-utils", version = "=1.16.0" } solana-sdk = { path = "../sdk", version = "=1.16.0" } solana-streamer = { path = "../streamer", version = "=1.16.0" } diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index ec8e383571..d634afef57 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -5,45 +5,31 @@ pub mod udp_client; use { crate::{ - nonblocking::udp_client::UdpTpuConnection as NonblockingUdpTpuConnection, - udp_client::UdpTpuConnection as BlockingUdpTpuConnection, + nonblocking::udp_client::UdpClientConnection as NonblockingUdpConnection, + udp_client::UdpClientConnection as BlockingUdpConnection, }, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, - tpu_connection_cache::{ - BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig, + solana_connection_cache::{ + client_connection::ClientConnection as BlockingClientConnection, + connection_cache::{ + BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, + ConnectionPoolError, NewConnectionConfig, ProtocolType, }, + connection_cache_stats::ConnectionCacheStats, + nonblocking::client_connection::ClientConnection as NonblockingClientConnection, }, std::{ + any::Any, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::Arc, }, - thiserror::Error, }; -#[derive(Error, Debug)] -pub enum UdpClientError { - #[error("IO error: {0:?}")] - IoError(#[from] std::io::Error), -} - pub struct UdpPool { - connections: Vec>, + connections: Vec>, } impl ConnectionPool for UdpPool { - type PoolTpuConnection = Udp; - type TpuConfig = UdpConfig; - - fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self { - let mut pool = Self { - connections: vec![], - }; - pool.add_connection(config, addr); - pool - } - - fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) { - let connection = Arc::new(self.create_pool_entry(config, addr)); + fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) { + let connection = self.create_pool_entry(config, addr); self.connections.push(connection); } @@ -51,7 +37,7 @@ impl ConnectionPool for UdpPool { self.connections.len() } - fn get(&self, index: usize) -> Result, ConnectionPoolError> { + fn get(&self, index: usize) -> Result, ConnectionPoolError> { self.connections .get(index) .cloned() @@ -60,47 +46,80 @@ impl ConnectionPool for UdpPool { fn create_pool_entry( &self, - config: &Self::TpuConfig, + config: &dyn NewConnectionConfig, _addr: &SocketAddr, - ) -> Self::PoolTpuConnection { - Udp(config.tpu_udp_socket.clone()) + ) -> Arc { + let config: &UdpConfig = match config.as_any().downcast_ref::() { + Some(b) => b, + None => panic!("Expecting a UdpConfig!"), + }; + Arc::new(Udp(config.udp_socket.clone())) } } pub struct UdpConfig { - tpu_udp_socket: Arc, + udp_socket: Arc, } -impl NewTpuConfig for UdpConfig { - type ClientError = UdpClientError; - - fn new() -> Result { +impl NewConnectionConfig for UdpConfig { + fn new() -> Result { let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?; + .map_err(Into::::into)?; Ok(Self { - tpu_udp_socket: Arc::new(socket), + udp_socket: Arc::new(socket), }) } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } pub struct Udp(Arc); -impl BaseTpuConnection for Udp { - type BlockingConnectionType = BlockingUdpTpuConnection; - type NonblockingConnectionType = NonblockingUdpTpuConnection; - +impl BaseClientConnection for Udp { fn new_blocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> BlockingUdpTpuConnection { - BlockingUdpTpuConnection::new_from_addr(self.0.clone(), addr) + ) -> Arc { + Arc::new(BlockingUdpConnection::new_from_addr(self.0.clone(), addr)) } fn new_nonblocking_connection( &self, addr: SocketAddr, _stats: Arc, - ) -> NonblockingUdpTpuConnection { - NonblockingUdpTpuConnection::new_from_addr(self.0.try_clone().unwrap(), addr) + ) -> Arc { + Arc::new(NonblockingUdpConnection::new_from_addr( + self.0.try_clone().unwrap(), + addr, + )) + } +} + +#[derive(Default)] +pub struct UdpConnectionManager {} + +impl ConnectionManager for UdpConnectionManager { + fn new_connection_pool(&self) -> Box { + Box::new(UdpPool { + connections: Vec::default(), + }) + } + + fn new_connection_config(&self) -> Box { + Box::new(UdpConfig::new().unwrap()) + } + + fn get_port_offset(&self) -> u16 { + 0 + } + + fn get_protocol_type(&self) -> ProtocolType { + ProtocolType::UDP } } diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 07c2900271..ab90491470 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -1,50 +1,43 @@ //! Simple UDP client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions +//! an interface for sending data use { - async_trait::async_trait, core::iter::repeat, solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, - solana_tpu_client::nonblocking::tpu_connection::TpuConnection, std::net::SocketAddr, + async_trait::async_trait, core::iter::repeat, + solana_connection_cache::nonblocking::client_connection::ClientConnection, + solana_sdk::transport::Result as TransportResult, + solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, tokio::net::UdpSocket, }; -pub struct UdpTpuConnection { +pub struct UdpClientConnection { pub socket: UdpSocket, pub addr: SocketAddr, } -impl UdpTpuConnection { - pub fn new_from_addr(socket: std::net::UdpSocket, tpu_addr: SocketAddr) -> Self { +impl UdpClientConnection { + pub fn new_from_addr(socket: std::net::UdpSocket, server_addr: SocketAddr) -> Self { socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); Self { socket, - addr: tpu_addr, + addr: server_addr, } } } #[async_trait] -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { +impl ClientConnection for UdpClientConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - async fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - self.socket - .send_to(wire_transaction.as_ref(), self.addr) - .await?; + async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + self.socket.send_to(buffer, self.addr).await?; Ok(()) } - async fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); + async fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); batch_send(&self.socket, &pkts).await?; Ok(()) } @@ -60,20 +53,17 @@ mod tests { tokio::net::UdpSocket, }; - async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) { + async fn check_send_one(connection: &UdpClientConnection, reader: &UdpSocket) { let packet = vec![111u8; PACKET_DATA_SIZE]; - connection.send_wire_transaction(&packet).await.unwrap(); + connection.send_data(&packet).await.unwrap(); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap(); assert_eq!(1, recv); } - async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) { + async fn check_send_batch(connection: &UdpClientConnection, reader: &UdpSocket) { let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); - connection - .send_wire_transaction_batch(&packets) - .await - .unwrap(); + connection.send_data_batch(&packets).await.unwrap(); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap(); assert_eq!(32, recv); @@ -85,7 +75,7 @@ mod tests { let addr = addr_str.parse().unwrap(); let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); - let connection = UdpTpuConnection::new_from_addr(socket, addr); + let connection = UdpClientConnection::new_from_addr(socket, addr); let reader = UdpSocket::bind(addr_str).await.expect("bind"); check_send_one(&connection, &reader).await; check_send_batch(&connection, &reader).await; diff --git a/udp-client/src/udp_client.rs b/udp-client/src/udp_client.rs index e090b056d0..9ca7d23df9 100644 --- a/udp-client/src/udp_client.rs +++ b/udp-client/src/udp_client.rs @@ -1,63 +1,58 @@ -//! Simple TPU client that communicates with the given UDP port with UDP and provides -//! an interface for sending transactions +//! Simple client that communicates with the given UDP port with UDP and provides +//! an interface for sending data use { core::iter::repeat, + solana_connection_cache::client_connection::ClientConnection, solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, - solana_tpu_client::{ - connection_cache_stats::ConnectionCacheStats, tpu_connection::TpuConnection, - }, std::{ net::{SocketAddr, UdpSocket}, sync::Arc, }, }; -pub struct UdpTpuConnection { +pub struct UdpClientConnection { pub socket: Arc, pub addr: SocketAddr, } -impl UdpTpuConnection { - pub fn new_from_addr(local_socket: Arc, tpu_addr: SocketAddr) -> Self { +impl UdpClientConnection { + pub fn new_from_addr(local_socket: Arc, server_addr: SocketAddr) -> Self { Self { socket: local_socket, - addr: tpu_addr, + addr: server_addr, } } - - pub fn new( - local_socket: Arc, - tpu_addr: SocketAddr, - _connection_stats: Arc, - ) -> Self { - Self::new_from_addr(local_socket, tpu_addr) - } } -impl TpuConnection for UdpTpuConnection { - fn tpu_addr(&self) -> &SocketAddr { +impl ClientConnection for UdpClientConnection { + fn server_addr(&self) -> &SocketAddr { &self.addr } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { - self.socket.send_to(wire_transaction.as_ref(), self.addr)?; + fn send_data_async(&self, data: Vec) -> TransportResult<()> { + self.socket.send_to(data.as_ref(), self.addr)?; Ok(()) } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> - where - T: AsRef<[u8]> + Send + Sync, - { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); + fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { + let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); batch_send(&self.socket, &pkts)?; Ok(()) } - fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); + fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { + let pkts: Vec<_> = buffers + .into_iter() + .zip(repeat(self.server_addr())) + .collect(); batch_send(&self.socket, &pkts)?; Ok(()) } + + fn send_data(&self, buffer: &[u8]) -> TransportResult<()> { + self.socket.send_to(buffer, self.addr)?; + Ok(()) + } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 50f2b856ee..c7aef208e1 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -35,7 +35,7 @@ use { solana_send_transaction_service::send_transaction_service::{ self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{path::PathBuf, str::FromStr}, }; diff --git a/validator/src/main.rs b/validator/src/main.rs index 32f9e4df1e..5df496ebf8 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -51,7 +51,7 @@ use { }, solana_send_transaction_service::send_transaction_service::{self}, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, solana_validator::{ admin_rpc_service, admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides},