diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs
index f236825fdf..e3dc321a29 100644
--- a/banking-bench/src/main.rs
+++ b/banking-bench/src/main.rs
@@ -5,7 +5,7 @@ use {
     log::*,
     rand::{thread_rng, Rng},
     rayon::prelude::*,
-    solana_client::connection_cache::ConnectionCache,
+    solana_client::connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
     solana_core::banking_stage::BankingStage,
     solana_gossip::cluster_info::{ClusterInfo, Node},
     solana_ledger::{
@@ -352,7 +352,10 @@ fn main() {
         None,
         replay_vote_sender,
         Arc::new(RwLock::new(CostModel::default())),
-        Arc::new(ConnectionCache::new(tpu_use_quic)),
+        Arc::new(ConnectionCache::new(
+            tpu_use_quic,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
+        )),
     );
     poh_recorder.lock().unwrap().set_bank(&bank);
diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs
index 9e6a18ab03..e9b4faa848 100644
--- a/bench-tps/src/cli.rs
+++ b/bench-tps/src/cli.rs
@@ -2,7 +2,7 @@ use {
     clap::{crate_description, crate_name, App, Arg, ArgMatches},
     solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
     solana_cli_config::{ConfigInput, CONFIG_FILE},
-    solana_client::connection_cache::DEFAULT_TPU_USE_QUIC,
+    solana_client::connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
     solana_sdk::{
         fee_calculator::FeeRateGovernor,
         pubkey::Pubkey,
@@ -53,6 +53,7 @@ pub struct Config {
     pub target_node: Option<Pubkey>,
     pub external_client_type: ExternalClientType,
     pub use_quic: bool,
+    pub tpu_connection_pool_size: usize,
 }
 
 impl Default for Config {
@@ -79,6 +80,7 @@ impl Default for Config {
             target_node: None,
             external_client_type: ExternalClientType::default(),
             use_quic: DEFAULT_TPU_USE_QUIC,
+            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
         }
     }
 }
@@ -294,6 +296,13 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
                 .help("Submit transactions via QUIC; only affects ThinClient (default) \
                        or TpuClient sends"),
         )
+        .arg(
+            Arg::with_name("tpu_connection_pool_size")
+                .long("tpu-connection-pool-size")
+                .takes_value(true)
+                .help("Controls the connection pool size per remote address; only affects ThinClient (default) \
+                       or TpuClient sends"),
+        )
 }
 
 /// Parses a clap `ArgMatches` structure into a `Config`
@@ -343,6 +352,13 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
         args.use_quic = true;
     }
 
+    if let Some(v) = matches.value_of("tpu_connection_pool_size") {
+        args.tpu_connection_pool_size = v
+            .to_string()
+            .parse()
+            .expect("can't parse tpu_connection_pool_size");
+    }
+
     if let Some(addr) = matches.value_of("entrypoint") {
         args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
             eprintln!("failed to parse entrypoint address: {}", e);
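A side note on the `extract_args` hunk above: clap's `value_of` already returns a `&str`, and `str::parse` works on it directly, so the `to_string()` round-trip in the patch is redundant (though harmless). A minimal sketch; the `parse_pool_size` helper is hypothetical, not part of the patch:

```rust
/// Hypothetical helper mirroring the patch's parsing, without the extra allocation.
fn parse_pool_size(v: &str) -> usize {
    // `str::parse` accepts the `&str` that clap hands back; no `to_string()` needed.
    v.parse().expect("can't parse tpu_connection_pool_size")
}

fn main() {
    assert_eq!(parse_pool_size("4"), 4);
}
```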
diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs
index 572c185fa8..da1f015d1c 100644
--- a/bench-tps/src/main.rs
+++ b/bench-tps/src/main.rs
@@ -49,6 +49,7 @@ fn main() {
         target_node,
         external_client_type,
         use_quic,
+        tpu_connection_pool_size,
         ..
     } = &cli_config;
@@ -102,7 +103,9 @@ fn main() {
             do_bench_tps(client, cli_config, keypairs);
         }
         ExternalClientType::ThinClient => {
-            let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
+            let connection_cache =
+                Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size));
+
             let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) {
                 let rpc = rpc_addr.parse().unwrap_or_else(|e| {
                     eprintln!("RPC address should parse as socketaddr {:?}", e);
@@ -172,7 +175,9 @@ fn main() {
                 json_rpc_url.to_string(),
                 CommitmentConfig::confirmed(),
             ));
-            let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
+            let connection_cache =
+                Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size));
+
             let client = Arc::new(
                 TpuClient::new_with_connection_cache(
                     rpc_client,
diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs
index 63ab99e915..699f018475 100644
--- a/client/src/connection_cache.rs
+++ b/client/src/connection_cache.rs
@@ -1,17 +1,18 @@
 use {
     crate::{
+        nonblocking::quic_client::QuicLazyInitializedEndpoint,
         quic_client::QuicTpuConnection,
         tpu_connection::{ClientStats, Connection},
         udp_client::UdpTpuConnection,
     },
-    indexmap::map::IndexMap,
+    indexmap::map::{Entry, IndexMap},
     rand::{thread_rng, Rng},
     solana_measure::measure::Measure,
-    solana_sdk::timing::AtomicInterval,
+    solana_sdk::{quic::QUIC_PORT_OFFSET, timing::AtomicInterval},
     std::{
         net::SocketAddr,
         sync::{
-            atomic::{AtomicBool, AtomicU64, Ordering},
+            atomic::{AtomicU64, Ordering},
             Arc, RwLock,
         },
     },
@@ -24,6 +25,9 @@ static MAX_CONNECTIONS: usize = 1024;
 /// QUIC connections.
 pub const DEFAULT_TPU_USE_QUIC: bool = false;
 
+/// Default TPU connection pool size per remote address
+pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
+
 #[derive(Default)]
 pub struct ConnectionCacheStats {
     cache_hits: AtomicU64,
@@ -214,22 +218,144 @@ impl ConnectionCacheStats {
 }
 
 pub struct ConnectionCache {
-    map: RwLock<IndexMap<SocketAddr, Arc<Connection>>>,
+    map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
     stats: Arc<ConnectionCacheStats>,
     last_stats: AtomicInterval,
-    use_quic: AtomicBool,
+    use_quic: bool,
+    connection_pool_size: usize,
+}
+
+/// Models the pool of connections
+struct ConnectionPool {
+    /// The connections in the pool
+    connections: Vec<Arc<Connection>>,
+
+    /// Connections in this pool share the same endpoint
+    endpoint: Option<Arc<QuicLazyInitializedEndpoint>>,
+}
+
+impl ConnectionPool {
+    /// Get a connection from the pool. The pool must contain at least one
+    /// connection; one is picked at random.
+    fn borrow_connection(&self) -> Arc<Connection> {
+        let mut rng = thread_rng();
+        let n = rng.gen_range(0, self.connections.len());
+        self.connections[n].clone()
+    }
+
+    /// Check whether a new connection needs to be created: returns true if the
+    /// number of connections is smaller than the required pool size.
+    fn need_new_connection(&self, required_pool_size: usize) -> bool {
+        self.connections.len() < required_pool_size
+    }
 }
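A toy illustration of the two `ConnectionPool` methods above: `borrow_connection` picks uniformly at random to spread load, and the pool only grows while it is below the requested size. `FakeConnection` and `Pool` are stand-ins invented for this sketch, and the two-argument `gen_range` assumes the rand 0.7 API this patch uses elsewhere:

```rust
use rand::{thread_rng, Rng};
use std::sync::Arc;

// Stand-in for the crate's `Connection` enum.
struct FakeConnection(usize);

struct Pool {
    connections: Vec<Arc<FakeConnection>>,
}

impl Pool {
    // Mirrors `ConnectionPool::borrow_connection`: uniform random pick.
    fn borrow_connection(&self) -> Arc<FakeConnection> {
        let n = thread_rng().gen_range(0, self.connections.len());
        self.connections[n].clone()
    }

    // Mirrors `need_new_connection`: grow until the target size is reached.
    fn need_new_connection(&self, required_pool_size: usize) -> bool {
        self.connections.len() < required_pool_size
    }
}

fn main() {
    let pool = Pool {
        connections: (0..4).map(|i| Arc::new(FakeConnection(i))).collect(),
    };
    assert!(!pool.need_new_connection(4));
    let _conn = pool.borrow_connection();
}
```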
 
 impl ConnectionCache {
-    pub fn new(use_quic: bool) -> Self {
+    pub fn new(use_quic: bool, connection_pool_size: usize) -> Self {
+        // The minimum pool size is 1.
+        let connection_pool_size = 1.max(connection_pool_size);
         Self {
-            use_quic: AtomicBool::new(use_quic),
+            use_quic,
+            connection_pool_size,
             ..Self::default()
         }
     }
 
     pub fn get_use_quic(&self) -> bool {
-        self.use_quic.load(Ordering::Relaxed)
+        self.use_quic
+    }
+
+    fn create_endpoint(&self) -> Option<Arc<QuicLazyInitializedEndpoint>> {
+        if self.use_quic {
+            Some(Arc::new(QuicLazyInitializedEndpoint::new()))
+        } else {
+            None
+        }
+    }
+
+    /// Create a lazy connection object under the exclusive lock of the cache map if there are
+    /// not enough unused connections in the connection pool for the specified address.
+    /// Returns a CreateConnectionResult.
+    fn create_connection(
+        &self,
+        lock_timing_ms: &mut u64,
+        addr: &SocketAddr,
+    ) -> CreateConnectionResult {
+        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
+        let mut map = self.map.write().unwrap();
+        get_connection_map_lock_measure.stop();
+        *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
+
+        // Read again, as it is possible that between the read lock being dropped and the write
+        // lock being acquired, another thread could have set up the connection.
+        let (to_create_connection, endpoint) =
+            map.get(addr)
+                .map_or((true, self.create_endpoint()), |pool| {
+                    (
+                        pool.need_new_connection(self.connection_pool_size),
+                        pool.endpoint.clone(),
+                    )
+                });
+
+        let (cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
+            if to_create_connection {
+                let connection: Connection = if self.use_quic {
+                    QuicTpuConnection::new(
+                        endpoint.as_ref().unwrap().clone(),
+                        *addr,
+                        self.stats.clone(),
+                    )
+                    .into()
+                } else {
+                    UdpTpuConnection::new(*addr, self.stats.clone()).into()
+                };
+
+                let connection = Arc::new(connection);
+
+                // evict a connection if the cache is reaching upper bounds
+                let mut num_evictions = 0;
+                let mut get_connection_cache_eviction_measure =
+                    Measure::start("get_connection_cache_eviction_measure");
+                while map.len() >= MAX_CONNECTIONS {
+                    let mut rng = thread_rng();
+                    let n = rng.gen_range(0, MAX_CONNECTIONS);
+                    map.swap_remove_index(n);
+                    num_evictions += 1;
+                }
+                get_connection_cache_eviction_measure.stop();
+
+                match map.entry(*addr) {
+                    Entry::Occupied(mut entry) => {
+                        let pool = entry.get_mut();
+                        pool.connections.push(connection);
+                    }
+                    Entry::Vacant(entry) => {
+                        entry.insert(ConnectionPool {
+                            connections: vec![connection],
+                            endpoint,
+                        });
+                    }
+                }
+                (
+                    false,
+                    self.stats.clone(),
+                    num_evictions,
+                    get_connection_cache_eviction_measure.as_ms(),
+                )
+            } else {
+                (true, self.stats.clone(), 0, 0)
+            };
+
+        let pool = map.get(addr).unwrap();
+        let connection = pool.borrow_connection();
+
+        CreateConnectionResult {
+            connection,
+            cache_hit,
+            connection_cache_stats,
+            num_evictions,
+            eviction_timing_ms,
+        }
+    }
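`create_connection`, together with `get_or_add_connection` below, implements the classic read-then-write lock upgrade: take the read lock for the fast path, drop it, take the write lock, and re-check, because another thread may have filled the entry in between. A self-contained sketch of that pattern with `std::sync::RwLock`; the map contents here are invented:

```rust
use std::{collections::HashMap, sync::RwLock};

fn get_or_insert(map: &RwLock<HashMap<String, u64>>, key: &str) -> u64 {
    {
        let read = map.read().unwrap();
        if let Some(v) = read.get(key) {
            return *v; // fast path under the shared lock
        }
    } // read lock dropped here
    let mut write = map.write().unwrap();
    // Re-check: the entry may have appeared while we waited for the write lock.
    *write.entry(key.to_string()).or_insert(42)
}

fn main() {
    let map = RwLock::new(HashMap::new());
    assert_eq!(get_or_insert(&map, "tpu"), 42);
    assert_eq!(get_or_insert(&map, "tpu"), 42); // second call hits the fast path
}
```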
 
     fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
@@ -237,6 +363,10 @@ impl ConnectionCache {
         let map = self.map.read().unwrap();
         get_connection_map_lock_measure.stop();
 
+        let port_offset = if self.use_quic { QUIC_PORT_OFFSET } else { 0 };
+
+        let addr = SocketAddr::new(addr.ip(), addr.port() + port_offset);
+
         let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
 
         let report_stats = self
@@ -244,57 +374,35 @@ impl ConnectionCache {
             .last_stats
             .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
 
         let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
-        let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
-            match map.get(addr) {
-                Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0),
-                None => {
-                    // Upgrade to write access by dropping read lock and acquire write lock
+        let CreateConnectionResult {
+            connection,
+            cache_hit,
+            connection_cache_stats,
+            num_evictions,
+            eviction_timing_ms,
+        } = match map.get(&addr) {
+            Some(pool) => {
+                if pool.need_new_connection(self.connection_pool_size) {
+                    // create one more connection and put it in the pool
                     drop(map);
-                    let mut get_connection_map_lock_measure =
-                        Measure::start("get_connection_map_lock_measure");
-                    let mut map = self.map.write().unwrap();
-                    get_connection_map_lock_measure.stop();
-
-                    lock_timing_ms =
-                        lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
-
-                    // Read again, as it is possible that between read lock dropped and the write lock acquired
-                    // another thread could have setup the connection.
-                    match map.get(addr) {
-                        Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0),
-                        None => {
-                            let connection: Connection = if self.use_quic.load(Ordering::Relaxed) {
-                                QuicTpuConnection::new(*addr, self.stats.clone()).into()
-                            } else {
-                                UdpTpuConnection::new(*addr, self.stats.clone()).into()
-                            };
-
-                            let connection = Arc::new(connection);
-
-                            // evict a connection if the cache is reaching upper bounds
-                            let mut num_evictions = 0;
-                            let mut get_connection_cache_eviction_measure =
-                                Measure::start("get_connection_cache_eviction_measure");
-                            while map.len() >= MAX_CONNECTIONS {
-                                let mut rng = thread_rng();
-                                let n = rng.gen_range(0, MAX_CONNECTIONS);
-                                map.swap_remove_index(n);
-                                num_evictions += 1;
-                            }
-                            get_connection_cache_eviction_measure.stop();
-
-                            map.insert(*addr, connection.clone());
-                            (
-                                connection,
-                                false,
-                                self.stats.clone(),
-                                num_evictions,
-                                get_connection_cache_eviction_measure.as_ms(),
-                            )
-                        }
+                    self.create_connection(&mut lock_timing_ms, &addr)
+                } else {
+                    let connection = pool.borrow_connection();
+                    CreateConnectionResult {
+                        connection,
+                        cache_hit: true,
+                        connection_cache_stats: self.stats.clone(),
+                        num_evictions: 0,
+                        eviction_timing_ms: 0,
                     }
                 }
-            };
+            }
+            None => {
+                // Upgrade to write access by dropping read lock and acquire write lock
+                drop(map);
+                self.create_connection(&mut lock_timing_ms, &addr)
+            }
+        };
 
         get_connection_map_measure.stop();
 
         GetConnectionResult {
@@ -359,13 +467,15 @@ impl ConnectionCache {
         connection
     }
 }
+
 impl Default for ConnectionCache {
     fn default() -> Self {
         Self {
             map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
             stats: Arc::new(ConnectionCacheStats::default()),
             last_stats: AtomicInterval::default(),
-            use_quic: AtomicBool::new(DEFAULT_TPU_USE_QUIC),
+            use_quic: DEFAULT_TPU_USE_QUIC,
+            connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
         }
     }
 }
@@ -381,6 +491,14 @@ struct GetConnectionResult {
     eviction_timing_ms: u64,
 }
 
+struct CreateConnectionResult {
+    connection: Arc<Connection>,
+    cache_hit: bool,
+    connection_cache_stats: Arc<ConnectionCacheStats>,
+    num_evictions: u64,
+    eviction_timing_ms: u64,
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -432,7 +550,7 @@ mod tests {
         let map = connection_cache.map.read().unwrap();
         assert!(map.len() == MAX_CONNECTIONS);
         addrs.iter().for_each(|a| {
-            let conn = map.get(a).expect("Address not found");
+            let conn = &map.get(a).expect("Address not found").connections[0];
             assert!(a.ip() == conn.tpu_addr().ip());
         });
     }
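Also visible in the hunks above: the QUIC port offset is now applied once in `get_or_add_connection`, instead of inside `QuicTpuConnection::new`, which is why the client test further down stops subtracting `QUIC_PORT_OFFSET`. A small sketch of the address derivation; the constant's value here is an assumption meant to match `solana_sdk::quic` in this tree:

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

const QUIC_PORT_OFFSET: u16 = 6; // assumed value of solana_sdk::quic::QUIC_PORT_OFFSET

// The cache keys its map on the address it will actually dial.
fn cache_key_addr(addr: SocketAddr, use_quic: bool) -> SocketAddr {
    let port_offset = if use_quic { QUIC_PORT_OFFSET } else { 0 };
    SocketAddr::new(addr.ip(), addr.port() + port_offset)
}

fn main() {
    let tpu = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8003);
    assert_eq!(cache_key_addr(tpu, true).port(), 8009);
    assert_eq!(cache_key_addr(tpu, false).port(), 8003);
}
```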
diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs
index a091c4a25b..7252024578 100644
--- a/client/src/nonblocking/quic_client.rs
+++ b/client/src/nonblocking/quic_client.rs
@@ -9,6 +9,7 @@ use {
     async_mutex::Mutex,
     futures::future::join_all,
     itertools::Itertools,
+    log::*,
     quinn::{
         ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError,
     },
@@ -18,8 +19,10 @@ use {
     std::{
         net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
         sync::{atomic::Ordering, Arc},
+        thread,
         time::Duration,
     },
+    tokio::sync::RwLock,
 };
 
 struct SkipServerVerification;
@@ -44,18 +47,19 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
     }
 }
 
-/// A wrapper over NewConnection with additional capability to create the endpoint as part
-/// of creating a new connection.
-#[derive(Clone)]
-struct QuicNewConnection {
-    endpoint: Endpoint,
-    connection: Arc<NewConnection>,
+/// A lazy-initialized Quic Endpoint
+pub struct QuicLazyInitializedEndpoint {
+    endpoint: RwLock<Option<Arc<Endpoint>>>,
 }
 
-impl QuicNewConnection {
-    /// Create a QuicNewConnection given the remote address 'addr'.
-    async fn make_connection(addr: SocketAddr, stats: &ClientStats) -> Result<Self, WriteError> {
-        let mut make_connection_measure = Measure::start("make_connection_measure");
+impl QuicLazyInitializedEndpoint {
+    pub fn new() -> Self {
+        Self {
+            endpoint: RwLock::new(None),
+        }
+    }
+
+    fn create_endpoint() -> Endpoint {
         let (_, client_socket) = solana_net_utils::bind_in_range(
             IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
             VALIDATOR_PORT_RANGE,
         )
        .unwrap();
@@ -78,6 +82,56 @@ impl QuicLazyInitializedEndpoint {
         transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
 
         endpoint.set_default_client_config(config);
+
+        endpoint
+    }
+
+    async fn get_endpoint(&self) -> Arc<Endpoint> {
+        let lock = self.endpoint.read().await;
+        let endpoint = lock.as_ref();
+
+        match endpoint {
+            Some(endpoint) => endpoint.clone(),
+            None => {
+                drop(lock);
+                let mut lock = self.endpoint.write().await;
+                let endpoint = lock.as_ref();
+
+                match endpoint {
+                    Some(endpoint) => endpoint.clone(),
+                    None => {
+                        let connection = Arc::new(Self::create_endpoint());
+                        *lock = Some(connection.clone());
+                        connection
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl Default for QuicLazyInitializedEndpoint {
+    fn default() -> Self {
+        Self::new()
+    }
+}
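`get_endpoint` above is the async analogue of the double-checked locking sketched earlier, this time with `tokio::sync::RwLock` so the endpoint is built at most once no matter how many tasks race to use it. A toy version with a `String` standing in for the quinn `Endpoint`; assumes tokio with the `rt` and `macros` features:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Lazy {
    inner: RwLock<Option<Arc<String>>>,
}

impl Lazy {
    async fn get(&self) -> Arc<String> {
        // Fast path: already initialized, clone the Arc under the read lock.
        if let Some(v) = self.inner.read().await.as_ref() {
            return v.clone();
        }
        let mut lock = self.inner.write().await;
        match lock.as_ref() {
            Some(v) => v.clone(), // another task won the race
            None => {
                let v = Arc::new("endpoint".to_string());
                *lock = Some(v.clone());
                v
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let lazy = Lazy { inner: RwLock::new(None) };
    assert_eq!(*lazy.get().await, "endpoint");
}
```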
+
+/// A wrapper over NewConnection with additional capability to create the endpoint as part
+/// of creating a new connection.
+#[derive(Clone)]
+struct QuicNewConnection {
+    endpoint: Arc<QuicLazyInitializedEndpoint>,
+    connection: Arc<NewConnection>,
+}
+
+impl QuicNewConnection {
+    /// Create a QuicNewConnection given the remote address 'addr'.
+    async fn make_connection(
+        endpoint: Arc<QuicLazyInitializedEndpoint>,
+        addr: SocketAddr,
+        stats: &ClientStats,
+    ) -> Result<Self, WriteError> {
+        let mut make_connection_measure = Measure::start("make_connection_measure");
+        let endpoint = endpoint.get_endpoint().await;
         let connecting = endpoint.connect(addr, "connect").unwrap();
         stats.total_connections.fetch_add(1, Ordering::Relaxed);
@@ -132,14 +186,16 @@
 }
 
 pub struct QuicClient {
+    endpoint: Arc<QuicLazyInitializedEndpoint>,
     connection: Arc<Mutex<Option<QuicNewConnection>>>,
     addr: SocketAddr,
     stats: Arc<ClientStats>,
 }
 
 impl QuicClient {
-    pub fn new(addr: SocketAddr) -> Self {
+    pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
         Self {
+            endpoint,
             connection: Arc::new(Mutex::new(None)),
             addr,
             stats: Arc::new(ClientStats::default()),
@@ -165,63 +221,131 @@ impl QuicClient {
         stats: &ClientStats,
         connection_stats: Arc<ConnectionCacheStats>,
     ) -> Result<Arc<NewConnection>, WriteError> {
-        let connection = {
-            let mut conn_guard = self.connection.lock().await;
+        let mut connection_try_count = 0;
+        let mut last_connection_id = 0;
+        let mut last_error = None;
 
-            let maybe_conn = conn_guard.clone();
-            match maybe_conn {
-                Some(conn) => {
-                    stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
-                    conn.connection.clone()
+        while connection_try_count < 2 {
+            let connection = {
+                let mut conn_guard = self.connection.lock().await;
+
+                let maybe_conn = conn_guard.as_mut();
+                match maybe_conn {
+                    Some(conn) => {
+                        if conn.connection.connection.stable_id() == last_connection_id {
+                            // this is the problematic connection we used before; create a new one
+                            let conn = conn.make_connection_0rtt(self.addr, stats).await;
+                            match conn {
+                                Ok(conn) => {
+                                    info!(
+                                        "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
+                                        self.addr,
+                                        conn.connection.stable_id(),
+                                        connection_try_count,
+                                        last_connection_id,
+                                        last_error,
+                                    );
+                                    connection_try_count += 1;
+                                    conn
+                                }
+                                Err(err) => {
+                                    info!(
+                                        "Cannot make 0rtt connection to {}, error {}",
+                                        self.addr, err
+                                    );
+                                    return Err(err);
+                                }
+                            }
+                        } else {
+                            stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
+                            conn.connection.clone()
+                        }
+                    }
+                    None => {
+                        let conn = QuicNewConnection::make_connection(
+                            self.endpoint.clone(),
+                            self.addr,
+                            stats,
+                        )
+                        .await;
+                        match conn {
+                            Ok(conn) => {
+                                *conn_guard = Some(conn.clone());
+                                info!(
+                                    "Made connection to {} id {} try_count {}",
+                                    self.addr,
+                                    conn.connection.connection.stable_id(),
+                                    connection_try_count
+                                );
+                                connection_try_count += 1;
+                                conn.connection.clone()
+                            }
+                            Err(err) => {
+                                info!("Cannot make connection to {}, error {}", self.addr, err);
+                                return Err(err);
+                            }
+                        }
+                    }
                 }
-                None => {
-                    let conn = QuicNewConnection::make_connection(self.addr, stats).await?;
-                    *conn_guard = Some(conn.clone());
-                    conn.connection.clone()
+            };
+
+            let new_stats = connection.connection.stats();
+
+            connection_stats
+                .total_client_stats
+                .congestion_events
+                .update_stat(
+                    &self.stats.congestion_events,
+                    new_stats.path.congestion_events,
+                );
+
+            connection_stats
+                .total_client_stats
+                .tx_streams_blocked_uni
+                .update_stat(
+                    &self.stats.tx_streams_blocked_uni,
+                    new_stats.frame_tx.streams_blocked_uni,
+                );
+
+            connection_stats
+                .total_client_stats
+                .tx_data_blocked
+                .update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked);
+
+            connection_stats
+                .total_client_stats
+                .tx_acks
+                .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);
+
+            last_connection_id = connection.connection.stable_id();
+            match Self::_send_buffer_using_conn(data, &connection).await {
+                Ok(()) => {
+                    return Ok(connection);
                 }
-            }
-        };
-
-        let new_stats = connection.connection.stats();
-
-        connection_stats
-            .total_client_stats
-            .congestion_events
-            .update_stat(
-                &self.stats.congestion_events,
-                new_stats.path.congestion_events,
-            );
-
-        connection_stats
-            .total_client_stats
-            .tx_streams_blocked_uni
-            .update_stat(
-                &self.stats.tx_streams_blocked_uni,
-                new_stats.frame_tx.streams_blocked_uni,
-            );
-
-        connection_stats
-            .total_client_stats
-            .tx_data_blocked
-            .update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked);
-
-        connection_stats
-            .total_client_stats
-            .tx_acks
-            .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);
-
-        match Self::_send_buffer_using_conn(data, &connection).await {
-            Ok(()) => Ok(connection),
-            _ => {
-                let connection = {
-                    let mut conn_guard = self.connection.lock().await;
-                    let conn = conn_guard.as_mut().unwrap();
-                    conn.make_connection_0rtt(self.addr, stats).await?
-                };
-                Self::_send_buffer_using_conn(data, &connection).await?;
-                Ok(connection)
+                Err(err) => match err {
+                    WriteError::ConnectionLost(_) => {
+                        last_error = Some(err);
+                    }
+                    _ => {
+                        info!(
+                            "Error sending to {} with id {}, error {:?} thread: {:?}",
+                            self.addr,
+                            connection.connection.stable_id(),
+                            err,
+                            thread::current().id(),
+                        );
+                        return Err(err);
+                    }
+                },
             }
         }
+
+        // if we get here, we have exhausted the maximum number of retries; return the error
+        info!(
+            "Ran into an error sending transactions {:?}, exhausted retries to {}",
+            last_error, self.addr
+        );
+        Err(last_error.unwrap())
     }
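The rewritten `_send_buffer_prepared` retries at most once, and only when the failure is `WriteError::ConnectionLost`; remembering the failed connection's `stable_id` is what steers the second pass into the 0-RTT reconnect branch. A stripped-down sketch of that control flow, with invented types in place of quinn's:

```rust
#[derive(Debug)]
enum SendError {
    ConnectionLost, // retriable: the peer dropped us
    Fatal,          // anything else: bail out immediately
}

fn send_with_retry(mut attempt: impl FnMut() -> Result<(), SendError>) -> Result<(), SendError> {
    let mut last_error = None;
    let mut tries = 0;
    while tries < 2 {
        tries += 1;
        match attempt() {
            Ok(()) => return Ok(()),
            Err(SendError::ConnectionLost) => last_error = Some(SendError::ConnectionLost),
            Err(err) => return Err(err), // non-retriable
        }
    }
    // both attempts hit ConnectionLost; surface the last error
    Err(last_error.unwrap())
}

fn main() {
    let mut calls = 0;
    let result = send_with_retry(|| {
        calls += 1;
        if calls == 1 {
            Err(SendError::ConnectionLost) // first send fails, triggering one retry
        } else {
            Ok(())
        }
    });
    assert!(result.is_ok());
}
```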
 
     pub async fn send_buffer(
diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs
index 2f9e84c124..c91a20d993 100644
--- a/client/src/quic_client.rs
+++ b/client/src/quic_client.rs
@@ -4,12 +4,12 @@
 use {
     crate::{
         connection_cache::ConnectionCacheStats,
-        nonblocking::quic_client::QuicClient,
+        nonblocking::quic_client::{QuicClient, QuicLazyInitializedEndpoint},
         tpu_connection::{ClientStats, TpuConnection},
     },
     lazy_static::lazy_static,
     log::*,
-    solana_sdk::{quic::QUIC_PORT_OFFSET, transport::Result as TransportResult},
+    solana_sdk::transport::Result as TransportResult,
     std::{net::SocketAddr, sync::Arc},
     tokio::runtime::Runtime,
 };
@@ -31,9 +31,12 @@ impl QuicTpuConnection {
         self.client.stats()
     }
 
-    pub fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self {
-        let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
-        let client = Arc::new(QuicClient::new(tpu_addr));
+    pub fn new(
+        endpoint: Arc<QuicLazyInitializedEndpoint>,
+        tpu_addr: SocketAddr,
+        connection_stats: Arc<ConnectionCacheStats>,
+    ) -> Self {
+        let client = Arc::new(QuicClient::new(endpoint, tpu_addr));
 
         Self {
             client,
@@ -74,7 +77,11 @@ impl TpuConnection for QuicTpuConnection {
         let send_buffer = client.send_buffer(wire_transaction, &stats, connection_stats.clone());
         if let Err(e) = send_buffer.await {
-            warn!("Failed to send transaction async to {:?}", e);
+            warn!(
+                "Failed to send transaction async to {}, error: {:?}",
+                client.tpu_addr(),
+                e
+            );
             datapoint_warn!("send-wire-async", ("failure", 1, i64),);
             connection_stats.add_client_stats(&stats, 1, false);
         } else {
diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs
index 05fc34a7d1..7964737c78 100644
--- a/client/tests/quic_client.rs
+++ b/client/tests/quic_client.rs
@@ -3,10 +3,11 @@ mod tests {
     use {
         crossbeam_channel::unbounded,
         solana_client::{
-            connection_cache::ConnectionCacheStats, quic_client::QuicTpuConnection,
+            connection_cache::ConnectionCacheStats,
+            nonblocking::quic_client::QuicLazyInitializedEndpoint, quic_client::QuicTpuConnection,
             tpu_connection::TpuConnection,
         },
-        solana_sdk::{packet::PACKET_DATA_SIZE, quic::QUIC_PORT_OFFSET, signature::Keypair},
+        solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
         solana_streamer::quic::{spawn_server, StreamStats},
         std::{
             collections::HashMap,
@@ -44,10 +45,14 @@ mod tests {
             .unwrap();
 
         let addr = s.local_addr().unwrap().ip();
-        let port = s.local_addr().unwrap().port() - QUIC_PORT_OFFSET;
+        let port = s.local_addr().unwrap().port();
         let tpu_addr = SocketAddr::new(addr, port);
         let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
-        let client = QuicTpuConnection::new(tpu_addr, connection_cache_stats);
+        let client = QuicTpuConnection::new(
+            Arc::new(QuicLazyInitializedEndpoint::default()),
+            tpu_addr,
+            connection_cache_stats,
+        );
 
         // Send a full size packet with single byte writes.
         let num_bytes = PACKET_DATA_SIZE;
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 6fe17b2a83..600975dc85 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -381,6 +381,7 @@ impl Validator {
         start_progress: Arc<RwLock<ValidatorStartProgress>>,
         socket_addr_space: SocketAddrSpace,
         use_quic: bool,
+        tpu_connection_pool_size: usize,
     ) -> Self {
         let id = identity_keypair.pubkey();
         assert_eq!(id, node.info.id);
@@ -748,7 +749,7 @@ impl Validator {
         };
         let poh_recorder = Arc::new(Mutex::new(poh_recorder));
 
-        let connection_cache = Arc::new(ConnectionCache::new(use_quic));
+        let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size));
 
         let rpc_override_health_check = Arc::new(AtomicBool::new(false));
         let (
@@ -2047,7 +2048,7 @@ mod tests {
     use {
         super::*,
         crossbeam_channel::{bounded, RecvTimeoutError},
-        solana_client::connection_cache::DEFAULT_TPU_USE_QUIC,
+        solana_client::connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
         solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
         solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
         std::{fs::remove_dir_all, thread, time::Duration},
@@ -2084,6 +2085,7 @@ mod tests {
             start_progress.clone(),
             SocketAddrSpace::Unspecified,
             DEFAULT_TPU_USE_QUIC,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
         );
         assert_eq!(
             *start_progress.read().unwrap(),
@@ -2179,6 +2181,7 @@ mod tests {
                     Arc::new(RwLock::new(ValidatorStartProgress::default())),
                     SocketAddrSpace::Unspecified,
                     DEFAULT_TPU_USE_QUIC,
+                    DEFAULT_TPU_CONNECTION_POOL_SIZE,
                 )
             })
             .collect();
diff --git a/dos/src/main.rs b/dos/src/main.rs
index 733f4786ad..f2232e3738 100644
--- a/dos/src/main.rs
+++ b/dos/src/main.rs
@@ -44,7 +44,10 @@ use {
     log::*,
     rand::{thread_rng, Rng},
     solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
-    solana_client::{connection_cache::ConnectionCache, rpc_client::RpcClient},
+    solana_client::{
+        connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
+        rpc_client::RpcClient,
+    },
     solana_core::serve_repair::RepairProtocol,
     solana_dos::cli::*,
     solana_gossip::{
@@ -598,7 +601,10 @@ fn main() {
             exit(1);
         });
 
-        let connection_cache = Arc::new(ConnectionCache::new(cmd_params.tpu_use_quic));
+        let connection_cache = Arc::new(ConnectionCache::new(
+            cmd_params.tpu_use_quic,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
+        ));
         let (client, num_clients) =
             get_multi_client(&validators, &SocketAddrSpace::Unspecified, connection_cache);
         if validators.len() < num_clients {
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index 6528d191b6..974e4c6c5a 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -7,7 +7,9 @@ use {
     itertools::izip,
     log::*,
     solana_client::{
-        connection_cache::{ConnectionCache, DEFAULT_TPU_USE_QUIC},
+        connection_cache::{
+            ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
+        },
         thin_client::ThinClient,
     },
     solana_core::{
@@ -80,6 +82,7 @@ pub struct ClusterConfig {
     pub poh_config: PohConfig,
     pub additional_accounts: Vec<(Pubkey, AccountSharedData)>,
     pub tpu_use_quic: bool,
+    pub tpu_connection_pool_size: usize,
 }
 
 impl Default for ClusterConfig {
@@ -100,6 +103,7 @@ impl Default for ClusterConfig {
             skip_warmup_slots: false,
             additional_accounts: vec![],
             tpu_use_quic: DEFAULT_TPU_USE_QUIC,
+            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
         }
     }
 }
@@ -255,6 +259,7 @@ impl LocalCluster {
             Arc::new(RwLock::new(ValidatorStartProgress::default())),
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
         );
 
         let mut validators = HashMap::new();
@@ -277,7 +282,10 @@ impl LocalCluster {
             entry_point_info: leader_contact_info,
             validators,
             genesis_config,
-            connection_cache: Arc::new(ConnectionCache::new(config.tpu_use_quic)),
+            connection_cache: Arc::new(ConnectionCache::new(
+                config.tpu_use_quic,
+                config.tpu_connection_pool_size,
+            )),
         };
 
         let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
@@ -450,6 +458,7 @@ impl LocalCluster {
             Arc::new(RwLock::new(ValidatorStartProgress::default())),
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
        );
 
         let validator_pubkey = validator_keypair.pubkey();
@@ -797,6 +806,7 @@ impl Cluster for LocalCluster {
             Arc::new(RwLock::new(ValidatorStartProgress::default())),
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
         );
         cluster_validator_info.validator = Some(restarted_node);
         cluster_validator_info
diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs
index cd50ccf605..b73a6a4035 100644
--- a/rpc-test/tests/rpc.rs
+++ b/rpc-test/tests/rpc.rs
@@ -8,7 +8,7 @@ use {
     solana_account_decoder::UiAccount,
     solana_client::{
         client_error::{ClientErrorKind, Result as ClientResult},
-        connection_cache::ConnectionCache,
+        connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
         nonblocking::pubsub_client::PubsubClient,
         rpc_client::RpcClient,
         rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
@@ -420,7 +420,10 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) {
         test_validator.rpc_url(),
         CommitmentConfig::processed(),
     ));
-    let connection_cache = Arc::new(ConnectionCache::new(tpu_use_quic));
+    let connection_cache = Arc::new(ConnectionCache::new(
+        tpu_use_quic,
+        DEFAULT_TPU_CONNECTION_POOL_SIZE,
+    ));
     let tpu_client = TpuClient::new_with_connection_cache(
         rpc_client.clone(),
         &test_validator.rpc_pubsub_url(),
diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs
index 3a2bd587fb..5410f7024e 100644
--- a/test-validator/src/lib.rs
+++ b/test-validator/src/lib.rs
@@ -2,7 +2,11 @@
 use {
     log::*,
     solana_cli_output::CliAccount,
-    solana_client::{connection_cache::DEFAULT_TPU_USE_QUIC, nonblocking, rpc_client::RpcClient},
+    solana_client::{
+        connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
+        nonblocking,
+        rpc_client::RpcClient,
+    },
     solana_core::{
         tower_storage::TowerStorage,
         validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@@ -749,6 +753,7 @@ impl TestValidator {
             config.start_progress.clone(),
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
+            DEFAULT_TPU_CONNECTION_POOL_SIZE,
         ));
 
         // Needed to avoid panics in `solana-responder-gossip` in tests that create a number of
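For test authors, the new `ClusterConfig` field means the per-address fan-out can be tuned per cluster. A hypothetical usage sketch, assuming the `solana-local-cluster` and `solana-streamer` crates from this tree; the pool size of 8 is an arbitrary example:

```rust
use solana_local_cluster::local_cluster::{ClusterConfig, LocalCluster};
use solana_streamer::socket::SocketAddrSpace;

fn main() {
    // Twice the DEFAULT_TPU_CONNECTION_POOL_SIZE of 4 introduced by this patch.
    let mut config = ClusterConfig {
        tpu_use_quic: true,
        tpu_connection_pool_size: 8,
        ..ClusterConfig::default()
    };
    let _cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
}
```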
diff --git a/validator/src/main.rs b/validator/src/main.rs
index b0325a5094..34245b5fb8 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -19,8 +19,8 @@ use {
         keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
     },
     solana_client::{
-        rpc_client::RpcClient, rpc_config::RpcLeaderScheduleConfig,
-        rpc_request::MAX_MULTIPLE_ACCOUNTS,
+        connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, rpc_client::RpcClient,
+        rpc_config::RpcLeaderScheduleConfig, rpc_request::MAX_MULTIPLE_ACCOUNTS,
     },
     solana_core::{
         ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
@@ -468,6 +468,7 @@ pub fn main() {
     let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string();
     let default_rocksdb_fifo_shred_storage_size =
         &DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string();
+    let default_tpu_connection_pool_size = &DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string();
 
     let matches = App::new(crate_name!()).about(crate_description!())
         .version(solana_version::version!())
@@ -1209,6 +1210,14 @@ pub fn main() {
                 .takes_value(false)
                 .help("Use QUIC to send transactions."),
         )
+        .arg(
+            Arg::with_name("tpu_connection_pool_size")
+                .long("tpu-connection-pool-size")
+                .takes_value(true)
+                .default_value(default_tpu_connection_pool_size)
+                .validator(is_parsable::<usize>)
+                .help("Controls the TPU connection pool size per remote address"),
+        )
         .arg(
             Arg::with_name("rocksdb_max_compaction_jitter")
                 .long("rocksdb-max-compaction-jitter-slots")
@@ -2214,6 +2223,7 @@ pub fn main() {
     let accounts_shrink_optimize_total_space =
         value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
     let tpu_use_quic = matches.is_present("tpu_use_quic");
+    let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
 
     let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
     if !(0.0..=1.0).contains(&shrink_ratio) {
@@ -2973,6 +2983,7 @@ pub fn main() {
         start_progress,
         socket_addr_space,
         tpu_use_quic,
+        tpu_connection_pool_size,
     );
 
     *admin_service_post_init.write().unwrap() =
         Some(admin_rpc_service::AdminRpcRequestMetadataPostInit {