diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index e3dc321a29..bad140ac4f 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -5,7 +5,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_client::connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + solana_client::connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, solana_core::banking_stage::BankingStage, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -341,7 +341,8 @@ fn main() { SocketAddrSpace::Unspecified, ); let cluster_info = Arc::new(cluster_info); - let tpu_use_quic = matches.is_present("tpu_use_quic"); + let tpu_use_quic = UseQUIC::new(matches.is_present("tpu_use_quic")) + .expect("Failed to initialize QUIC flags"); let banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index da1f015d1c..0e2e1e842f 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -8,7 +8,7 @@ use { keypairs::get_keypairs, }, solana_client::{ - connection_cache::ConnectionCache, + connection_cache::{ConnectionCache, UseQUIC}, rpc_client::RpcClient, thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, @@ -103,8 +103,9 @@ fn main() { do_bench_tps(client, cli_config, keypairs); } ExternalClientType::ThinClient => { + let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = - Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size)); + Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size)); let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) { let rpc = rpc_addr.parse().unwrap_or_else(|e| { @@ -175,8 +176,9 @@ fn main() { json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); + let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = - Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size)); + Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size)); let client = Arc::new( TpuClient::new_with_connection_cache( diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 699f018475..547e8e2a8d 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -10,12 +10,13 @@ use { solana_measure::measure::Measure, solana_sdk::{quic::QUIC_PORT_OFFSET, timing::AtomicInterval}, std::{ - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, }, }, + tokio::io, }; // Should be non-zero @@ -217,11 +218,28 @@ impl ConnectionCacheStats { } } +pub enum UseQUIC { + Yes, + No(Arc), +} + +impl UseQUIC { + pub fn new(use_quic: bool) -> io::Result { + if use_quic { + Ok(UseQUIC::Yes) + } else { + let socket = + solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))?; + Ok(UseQUIC::No(Arc::new(socket))) + } + } +} + pub struct ConnectionCache { map: RwLock>, stats: Arc, last_stats: AtomicInterval, - use_quic: bool, + use_quic: UseQUIC, connection_pool_size: usize, } @@ -251,7 +269,7 @@ impl ConnectionPool { } impl ConnectionCache { - pub fn new(use_quic: bool, connection_pool_size: usize) -> Self { + pub fn new(use_quic: UseQUIC, connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); Self { @@ -262,11 +280,14 @@ impl ConnectionCache { } pub fn get_use_quic(&self) -> bool { - self.use_quic + match self.use_quic { + UseQUIC::Yes => true, + UseQUIC::No(_) => false, + } } fn create_endpoint(&self) -> Option> { - if self.use_quic { + if self.get_use_quic() { Some(Arc::new(QuicLazyInitializedEndpoint::new())) } else { None @@ -299,15 +320,16 @@ impl ConnectionCache { let (cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) = if to_create_connection { - let connection: Connection = if self.use_quic { - QuicTpuConnection::new( + let connection: Connection = match &self.use_quic { + UseQUIC::Yes => QuicTpuConnection::new( endpoint.as_ref().unwrap().clone(), *addr, self.stats.clone(), ) - .into() - } else { - UdpTpuConnection::new(*addr, self.stats.clone()).into() + .into(), + UseQUIC::No(socket) => { + UdpTpuConnection::new(socket.clone(), *addr, self.stats.clone()).into() + } }; let connection = Arc::new(connection); @@ -363,7 +385,11 @@ impl ConnectionCache { let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); - let port_offset = if self.use_quic { QUIC_PORT_OFFSET } else { 0 }; + let port_offset = if self.get_use_quic() { + QUIC_PORT_OFFSET + } else { + 0 + }; let addr = SocketAddr::new(addr.ip(), addr.port() + port_offset); @@ -470,11 +496,12 @@ impl ConnectionCache { impl Default for ConnectionCache { fn default() -> Self { + let use_quic = UseQUIC::new(DEFAULT_TPU_USE_QUIC).expect("Failed to initialize QUIC flags"); Self { map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), stats: Arc::new(ConnectionCacheStats::default()), last_stats: AtomicInterval::default(), - use_quic: DEFAULT_TPU_USE_QUIC, + use_quic, connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, } } diff --git a/client/src/nonblocking/udp_client.rs b/client/src/nonblocking/udp_client.rs index 7fc50e593c..1f607430e8 100644 --- a/client/src/nonblocking/udp_client.rs +++ b/client/src/nonblocking/udp_client.rs @@ -2,12 +2,9 @@ //! an interface for sending transactions use { - crate::nonblocking::tpu_connection::TpuConnection, - async_trait::async_trait, - core::iter::repeat, - solana_sdk::transport::Result as TransportResult, - solana_streamer::nonblocking::sendmmsg::batch_send, - std::net::{IpAddr, Ipv4Addr, SocketAddr}, + crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait, + core::iter::repeat, solana_sdk::transport::Result as TransportResult, + solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr, tokio::net::UdpSocket, }; @@ -17,14 +14,8 @@ pub struct UdpTpuConnection { } impl UdpTpuConnection { - pub fn new(tpu_addr: SocketAddr) -> Self { - let socket = - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap(); + pub fn new(tpu_addr: SocketAddr, socket: std::net::UdpSocket) -> Self { socket.set_nonblocking(true).unwrap(); - Self::new_with_std_socket(tpu_addr, socket) - } - - pub fn new_with_std_socket(tpu_addr: SocketAddr, socket: std::net::UdpSocket) -> Self { let socket = UdpSocket::from_std(socket).unwrap(); Self { socket, @@ -92,20 +83,9 @@ mod tests { async fn test_send_from_addr() { let addr_str = "0.0.0.0:50100"; let addr = addr_str.parse().unwrap(); - let connection = UdpTpuConnection::new(addr); - let reader = UdpSocket::bind(addr_str).await.expect("bind"); - check_send_one(&connection, &reader).await; - check_send_batch(&connection, &reader).await; - } - - #[tokio::test] - async fn test_send_from_socket() { - let addr_str = "0.0.0.0:50101"; - let addr = addr_str.parse().unwrap(); let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap(); - socket.set_nonblocking(true).unwrap(); - let connection = UdpTpuConnection::new_with_std_socket(addr, socket); + let connection = UdpTpuConnection::new(addr, socket); let reader = UdpSocket::bind(addr_str).await.expect("bind"); check_send_one(&connection, &reader).await; check_send_batch(&connection, &reader).await; diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index 24f256f304..b0fa879f02 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -7,28 +7,30 @@ use { solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, std::{ - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + net::{SocketAddr, UdpSocket}, sync::Arc, }, }; pub struct UdpTpuConnection { - socket: UdpSocket, + socket: Arc, addr: SocketAddr, } impl UdpTpuConnection { - pub fn new_from_addr(tpu_addr: SocketAddr) -> Self { - let socket = - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap(); + pub fn new_from_addr(local_socket: Arc, tpu_addr: SocketAddr) -> Self { Self { - socket, + socket: local_socket, addr: tpu_addr, } } - pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc) -> Self { - Self::new_from_addr(tpu_addr) + pub fn new( + local_socket: Arc, + tpu_addr: SocketAddr, + _connection_stats: Arc, + ) -> Self { + Self::new_from_addr(local_socket, tpu_addr) } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 649b8d68a2..2adcfcb71d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -25,7 +25,7 @@ use { }, crossbeam_channel::{bounded, unbounded, Receiver}, rand::{thread_rng, Rng}, - solana_client::connection_cache::ConnectionCache, + solana_client::connection_cache::{ConnectionCache, UseQUIC}, solana_entry::poh::compute_hash_time_ns, solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, solana_gossip::{ @@ -753,6 +753,7 @@ impl Validator { }; let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let use_quic = UseQUIC::new(use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size)); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); diff --git a/dos/src/main.rs b/dos/src/main.rs index ad451e7fa7..9153356188 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,7 @@ use { rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, solana_client::{ - connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, rpc_client::RpcClient, tpu_connection::TpuConnection, }, @@ -423,6 +423,7 @@ fn run_dos_transactions( //let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); //let udp_client = UdpTpuConnection::new(target, connection_cache_stats); + let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = ConnectionCache::new(tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE); let connection = connection_cache.get_connection(&target); @@ -621,8 +622,10 @@ fn main() { exit(1); }); + let tpu_use_quic = + UseQUIC::new(cmd_params.tpu_use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = Arc::new(ConnectionCache::new( - cmd_params.tpu_use_quic, + tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE, )); let (client, num_clients) = diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 974e4c6c5a..8d1a872c10 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -8,7 +8,7 @@ use { log::*, solana_client::{ connection_cache::{ - ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, + ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, }, thin_client::ThinClient, }, @@ -277,13 +277,15 @@ impl LocalCluster { validators.insert(leader_pubkey, cluster_leader); + let tpu_use_quic = + UseQUIC::new(config.tpu_use_quic).expect("Failed to initialize QUIC flags"); let mut cluster = Self { funding_keypair: mint_keypair, entry_point_info: leader_contact_info, validators, genesis_config, connection_cache: Arc::new(ConnectionCache::new( - config.tpu_use_quic, + tpu_use_quic, config.tpu_connection_pool_size, )), }; diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index b73a6a4035..b5190c744a 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -8,7 +8,7 @@ use { solana_account_decoder::UiAccount, solana_client::{ client_error::{ClientErrorKind, Result as ClientResult}, - connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, + connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE}, nonblocking::pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig}, @@ -420,6 +420,7 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { test_validator.rpc_url(), CommitmentConfig::processed(), )); + let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = Arc::new(ConnectionCache::new( tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE,