diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 7f318ef5a1..e3a910079d 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -5,7 +5,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_client::connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + solana_client::connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, solana_core::banking_stage::BankingStage, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -341,8 +341,11 @@ fn main() { SocketAddrSpace::Unspecified, ); let cluster_info = Arc::new(cluster_info); - let tpu_use_quic = UseQUIC::new(matches.is_present("tpu_use_quic")) - .expect("Failed to initialize QUIC flags"); + let tpu_use_quic = matches.is_present("tpu_use_quic"); + let connection_cache = match tpu_use_quic { + true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + }; let banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, @@ -353,10 +356,7 @@ fn main() { None, replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), - Arc::new(ConnectionCache::new( - tpu_use_quic, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - )), + Arc::new(connection_cache), ); poh_recorder.write().unwrap().set_bank(&bank, false); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 0e2e1e842f..5b96ffb029 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -8,7 +8,7 @@ use { keypairs::get_keypairs, }, solana_client::{ - connection_cache::{ConnectionCache, UseQUIC}, + connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, @@ -103,9 +103,10 @@ fn main() { do_bench_tps(client, cli_config, keypairs); } ExternalClientType::ThinClient => { - let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = - Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size)); + let connection_cache = match use_quic { + true => Arc::new(ConnectionCache::new(*tpu_connection_pool_size)), + false => Arc::new(ConnectionCache::with_udp(*tpu_connection_pool_size)), + }; let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) { let rpc = rpc_addr.parse().unwrap_or_else(|e| { @@ -176,16 +177,17 @@ fn main() { json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); - let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = - Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size)); + let connection_cache = match use_quic { + true => ConnectionCache::new(*tpu_connection_pool_size), + false => ConnectionCache::with_udp(*tpu_connection_pool_size), + }; let client = Arc::new( TpuClient::new_with_connection_cache( rpc_client, websocket_url, TpuClientConfig::default(), - connection_cache, + Arc::new(connection_cache), ) .unwrap_or_else(|err| { eprintln!("Could not create TpuClient {:?}", err); diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 98da0f85bb..34a56cd24a 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -17,7 +17,6 @@ use { Arc, RwLock, }, }, - tokio::io, }; // Should be non-zero @@ -219,29 +218,12 @@ impl ConnectionCacheStats { } } -pub enum UseQUIC { - Yes, - No(Arc), -} - -impl UseQUIC { - pub fn new(use_quic: bool) -> io::Result { - if use_quic { - Ok(UseQUIC::Yes) - } else { - let socket = - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))?; - Ok(UseQUIC::No(Arc::new(socket))) - } - } -} - pub struct ConnectionCache { map: RwLock>, stats: Arc, last_stats: AtomicInterval, - use_quic: UseQUIC, connection_pool_size: usize, + tpu_udp_socket: Option>, } /// Models the pool of connections @@ -270,25 +252,31 @@ impl ConnectionPool { } impl ConnectionCache { - pub fn new(use_quic: UseQUIC, connection_pool_size: usize) -> Self { + pub fn new(connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); Self { - use_quic, + tpu_udp_socket: None, connection_pool_size, ..Self::default() } } - pub fn get_use_quic(&self) -> bool { - match self.use_quic { - UseQUIC::Yes => true, - UseQUIC::No(_) => false, + pub fn with_udp(connection_pool_size: usize) -> Self { + // The minimum pool size is 1. + let connection_pool_size = 1.max(connection_pool_size); + Self { + connection_pool_size, + ..Self::default() } } + pub fn use_quic(&self) -> bool { + matches!(self.tpu_udp_socket, None) + } + fn create_endpoint(&self) -> Option> { - if self.get_use_quic() { + if self.use_quic() { Some(Arc::new(QuicLazyInitializedEndpoint::new())) } else { None @@ -320,12 +308,12 @@ impl ConnectionCache { }); let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection { - let connection = match &self.use_quic { - UseQUIC::Yes => BaseTpuConnection::Quic(Arc::new(QuicClient::new( + let connection = match &self.tpu_udp_socket { + Some(socket) => BaseTpuConnection::Udp(socket.clone()), + None => BaseTpuConnection::Quic(Arc::new(QuicClient::new( endpoint.as_ref().unwrap().clone(), *addr, ))), - UseQUIC::No(socket) => BaseTpuConnection::Udp(socket.clone()), }; let connection = Arc::new(connection); @@ -380,11 +368,7 @@ impl ConnectionCache { let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); - let port_offset = if self.get_use_quic() { - QUIC_PORT_OFFSET - } else { - 0 - }; + let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 }; let addr = SocketAddr::new(addr.ip(), addr.port() + port_offset); @@ -504,13 +488,17 @@ impl ConnectionCache { impl Default for ConnectionCache { fn default() -> Self { - let use_quic = UseQUIC::new(DEFAULT_TPU_USE_QUIC).expect("Failed to initialize QUIC flags"); Self { map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), stats: Arc::new(ConnectionCacheStats::default()), last_stats: AtomicInterval::default(), - use_quic, connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_udp_socket: (!DEFAULT_TPU_USE_QUIC).then(|| { + Arc::new( + solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) + .expect("Unable to bind to UDP socket"), + ) + }), } } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6e14d19794..3418bf84c7 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -227,7 +227,7 @@ impl Tvu { bank_forks.clone(), ); - let warm_quic_cache_service = if connection_cache.get_use_quic() { + let warm_quic_cache_service = if connection_cache.use_quic() { Some(WarmQuicCacheService::new( connection_cache.clone(), cluster_info.clone(), diff --git a/core/src/validator.rs b/core/src/validator.rs index 6e6b4b0791..684791ff30 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -25,7 +25,7 @@ use { }, crossbeam_channel::{bounded, unbounded, Receiver}, rand::{thread_rng, Rng}, - solana_client::connection_cache::{ConnectionCache, UseQUIC}, + solana_client::connection_cache::ConnectionCache, solana_entry::poh::compute_hash_time_ns, solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, solana_gossip::{ @@ -757,8 +757,10 @@ impl Validator { }; let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - let use_quic = UseQUIC::new(use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size)); + let connection_cache = match use_quic { + true => Arc::new(ConnectionCache::new(tpu_connection_pool_size)), + false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), + }; let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let ( diff --git a/dos/src/main.rs b/dos/src/main.rs index 9153356188..6fefd16af3 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,7 @@ use { rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, solana_client::{ - connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, rpc_client::RpcClient, tpu_connection::TpuConnection, }, @@ -423,8 +423,10 @@ fn run_dos_transactions( //let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); //let udp_client = UdpTpuConnection::new(target, connection_cache_stats); - let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = ConnectionCache::new(tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE); + let connection_cache = match tpu_use_quic { + true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + }; let connection = connection_cache.get_connection(&target); let mut count = 0; @@ -622,14 +624,15 @@ fn main() { exit(1); }); - let tpu_use_quic = - UseQUIC::new(cmd_params.tpu_use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = Arc::new(ConnectionCache::new( - tpu_use_quic, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - )); - let (client, num_clients) = - get_multi_client(&validators, &SocketAddrSpace::Unspecified, connection_cache); + let connection_cache = match cmd_params.tpu_use_quic { + true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + }; + let (client, num_clients) = get_multi_client( + &validators, + &SocketAddrSpace::Unspecified, + Arc::new(connection_cache), + ); if validators.len() < num_clients { eprintln!( "Error: Insufficient nodes discovered. Expecting {} or more", diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 30192a2d05..6f3b13e5e0 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -8,7 +8,7 @@ use { log::*, solana_client::{ connection_cache::{ - ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, + ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, }, thin_client::ThinClient, }, @@ -299,17 +299,15 @@ impl LocalCluster { validators.insert(leader_pubkey, cluster_leader); - let tpu_use_quic = - UseQUIC::new(config.tpu_use_quic).expect("Failed to initialize QUIC flags"); let mut cluster = Self { funding_keypair: mint_keypair, entry_point_info: leader_contact_info, validators, genesis_config, - connection_cache: Arc::new(ConnectionCache::new( - tpu_use_quic, - config.tpu_connection_pool_size, - )), + connection_cache: match config.tpu_use_quic { + true => Arc::new(ConnectionCache::new(config.tpu_connection_pool_size)), + false => Arc::new(ConnectionCache::with_udp(config.tpu_connection_pool_size)), + }, }; let node_pubkey_to_vote_key: HashMap> = keys_in_genesis diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index b5190c744a..72c8488e7a 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -8,7 +8,7 @@ use { solana_account_decoder::UiAccount, solana_client::{ client_error::{ClientErrorKind, Result as ClientResult}, - connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, nonblocking::pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig}, @@ -420,11 +420,10 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { test_validator.rpc_url(), CommitmentConfig::processed(), )); - let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags"); - let connection_cache = Arc::new(ConnectionCache::new( - tpu_use_quic, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - )); + let connection_cache = match tpu_use_quic { + true => Arc::new(ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE)), + false => Arc::new(ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE)), + }; let tpu_client = TpuClient::new_with_connection_cache( rpc_client.clone(), &test_validator.rpc_pubsub_url(),