diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index a2de272157..c6173c6809 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -437,8 +437,14 @@ fn main() { let cluster_info = Arc::new(cluster_info); let tpu_disable_quic = matches.is_present("tpu_disable_quic"); let connection_cache = match tpu_disable_quic { - false => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), - true => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::new_quic( + "connection_cache_banking_bench_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ), + true => ConnectionCache::with_udp( + "connection_cache_banking_bench_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ), }; let banking_stage = BankingStage::new_num_threads( &cluster_info, diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index da1e82c2b0..0788181e85 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -83,10 +83,16 @@ fn create_connection_cache( client_node_id: Option<&Keypair>, ) -> ConnectionCache { if !use_quic { - return ConnectionCache::with_udp(tpu_connection_pool_size); + return ConnectionCache::with_udp( + "bench-tps-connection_cache_udp", + tpu_connection_pool_size, + ); } if client_node_id.is_none() { - return ConnectionCache::new(tpu_connection_pool_size); + return ConnectionCache::new_quic( + "bench-tps-connection_cache_quic", + tpu_connection_pool_size, + ); } let rpc_client = Arc::new(RpcClient::new_with_commitment( @@ -107,6 +113,7 @@ fn create_connection_cache( HashMap::::default(), // overrides ))); ConnectionCache::new_with_client_options( + "bench-tps-connection_cache_quic", tpu_connection_pool_size, None, Some((client_node_id, bind_address)), diff --git a/cli/src/program.rs b/cli/src/program.rs index 70bf00ba8e..f912f1e6ef 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -2163,9 +2163,9 @@ fn send_deploy_messages( if let Some(write_signer) = write_signer { trace!("Writing program data"); let connection_cache = if config.use_quic { - ConnectionCache::new(1) + ConnectionCache::new_quic("connection_cache_cli_program_quic", 1) } else { - ConnectionCache::with_udp(1) + ConnectionCache::with_udp("connection_cache_cli_program_udp", 1) }; let transaction_errors = match connection_cache { ConnectionCache::Udp(cache) => TpuClient::new_with_connection_cache( diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index ee2c70b193..4cdcfcd97b 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -44,13 +44,29 @@ pub enum NonblockingClientConnection { } impl ConnectionCache { + pub fn new(name: &'static str) -> Self { + if DEFAULT_CONNECTION_CACHE_USE_QUIC { + let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + ConnectionCache::new_with_client_options( + name, + DEFAULT_CONNECTION_POOL_SIZE, + None, // client_endpoint + Some(cert_info), + None, // stake_info + ) + } else { + ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE) + } + } + /// Create a quic connection_cache - pub fn new(connection_pool_size: usize) -> Self { - Self::new_with_client_options(connection_pool_size, None, None, None) + pub fn new_quic(name: &'static str, connection_pool_size: usize) -> Self { + Self::new_with_client_options(name, connection_pool_size, None, None, None) } /// Create a quic conneciton_cache with more client options pub fn new_with_client_options( + name: &'static str, connection_pool_size: usize, client_endpoint: Option, cert_info: Option<(&Keypair, IpAddr)>, @@ -71,7 +87,8 @@ impl ConnectionCache { config.set_staked_nodes(stake_info.0, stake_info.1); } let connection_manager = QuicConnectionManager::new_with_connection_config(config); - let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + let cache = + BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap(); Self::Quic(Arc::new(cache)) } @@ -106,11 +123,12 @@ impl ConnectionCache { ) { } - pub fn with_udp(connection_pool_size: usize) -> Self { + pub fn with_udp(name: &'static str, connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); let connection_manager = UdpConnectionManager::default(); - let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap(); + let cache = + BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap(); Self::Udp(Arc::new(cache)) } @@ -137,22 +155,6 @@ impl ConnectionCache { } } -impl Default for ConnectionCache { - fn default() -> Self { - if DEFAULT_CONNECTION_CACHE_USE_QUIC { - let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); - ConnectionCache::new_with_client_options( - DEFAULT_CONNECTION_POOL_SIZE, - None, - Some(cert_info), - None, - ) - } else { - ConnectionCache::with_udp(DEFAULT_CONNECTION_POOL_SIZE) - } - } -} - macro_rules! dispatch { ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => { #[inline] @@ -273,8 +275,13 @@ mod tests { ) .unwrap(); - let connection_cache = - ConnectionCache::new_with_client_options(1, Some(response_recv_endpoint), None, None); + let connection_cache = ConnectionCache::new_with_client_options( + "connection_cache_test", + 1, // connection_pool_size + Some(response_recv_endpoint), // client_endpoint + None, // cert_info + None, // stake_info + ); // server port 1: let port1 = 9001; diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index 21c7c55a04..5e71eae36b 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -80,11 +80,12 @@ where impl TpuClient { /// Create a new client that disconnects when dropped pub async fn new( + name: &'static str, rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, ) -> Result { - let connection_cache = match ConnectionCache::default() { + let connection_cache = match ConnectionCache::new(name) { ConnectionCache::Quic(cache) => cache, ConnectionCache::Udp(_) => { return Err(TpuSenderError::Custom(String::from( diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 7d32381445..4539415134 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -76,7 +76,7 @@ impl TpuClient { websocket_url: &str, config: TpuClientConfig, ) -> Result { - let connection_cache = match ConnectionCache::default() { + let connection_cache = match ConnectionCache::new("connection_cache_tpu_client") { ConnectionCache::Quic(cache) => cache, ConnectionCache::Udp(_) => { return Err(TpuSenderError::Custom(String::from( diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 5337063997..fe1c2dedaf 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -42,6 +42,7 @@ pub struct ConnectionCache< S, // ConnectionManager T, // NewConnectionConfig > { + name: &'static str, map: RwLock>, connection_manager: S, stats: Arc, @@ -56,9 +57,14 @@ where M: ConnectionManager, C: NewConnectionConfig, { - pub fn new(connection_manager: M, connection_pool_size: usize) -> Result { + pub fn new( + name: &'static str, + connection_manager: M, + connection_pool_size: usize, + ) -> Result { let config = connection_manager.new_connection_config(); Ok(Self::new_with_config( + name, connection_pool_size, config, connection_manager, @@ -66,11 +72,13 @@ where } pub fn new_with_config( + name: &'static str, connection_pool_size: usize, connection_config: C, connection_manager: M, ) -> Self { Self { + name, map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)), stats: Arc::new(ConnectionCacheStats::default()), connection_manager, @@ -227,7 +235,7 @@ where } = self.get_or_add_connection(addr); if report_stats { - connection_cache_stats.report(); + connection_cache_stats.report(self.name); } if cache_hit { @@ -555,8 +563,12 @@ mod tests { // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) let connection_manager = MockConnectionManager::default(); - let connection_cache = - ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); + let connection_cache = ConnectionCache::new( + "connection_cache_test", + connection_manager, + DEFAULT_CONNECTION_POOL_SIZE, + ) + .unwrap(); let addrs = (0..MAX_CONNECTIONS) .map(|_| { let addr = get_addr(&mut rng); @@ -599,7 +611,8 @@ mod tests { let port = u16::MAX; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let connection_manager = MockConnectionManager::default(); - let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); + let connection_cache = + ConnectionCache::new("connection_cache_test", connection_manager, 1).unwrap(); let conn = connection_cache.get_connection(&addr); // We (intentionally) don't have an interface that allows us to distinguish between diff --git a/connection-cache/src/connection_cache_stats.rs b/connection-cache/src/connection_cache_stats.rs index 697757fd32..e6d7d4cabe 100644 --- a/connection-cache/src/connection_cache_stats.rs +++ b/connection-cache/src/connection_cache_stats.rs @@ -70,9 +70,9 @@ impl ConnectionCacheStats { } } - pub fn report(&self) { + pub(super) fn report(&self, name: &'static str) { datapoint_info!( - "quic-client-connection-stats", + name, ( "cache_hits", self.cache_hits.swap(0, Ordering::Relaxed), diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 23483ca208..87735334f4 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -297,7 +297,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, s, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0aa35d3ae9..9173395a68 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -660,7 +660,7 @@ mod tests { None, replay_vote_sender, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); @@ -716,7 +716,7 @@ mod tests { None, replay_vote_sender, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); @@ -797,7 +797,7 @@ mod tests { None, replay_vote_sender, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); @@ -959,7 +959,7 @@ mod tests { None, replay_vote_sender, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); @@ -1153,7 +1153,7 @@ mod tests { None, replay_vote_sender, None, - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0d69ec6ce6..f563d82217 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -370,7 +370,7 @@ mod tests { poh_recorder.clone(), bank_forks.clone(), cluster_info.clone(), - Arc::new(ConnectionCache::default()), + Arc::new(ConnectionCache::new("connection_cache_test")), Arc::new(data_budget), ); let unprocessed_packet_batches: UnprocessedPacketBatches = @@ -445,7 +445,7 @@ mod tests { ), ThreadType::Transactions, ); - let connection_cache = ConnectionCache::default(); + let connection_cache = ConnectionCache::new("connection_cache_test"); let test_cases = vec![ ("fwd-normal", true, vec![normal_block_hash], 2), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 624acc59c3..a5a7fba451 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -480,7 +480,7 @@ pub mod tests { None, AbsRequestSender::default(), None, - &Arc::new(ConnectionCache::default()), + &Arc::new(ConnectionCache::new("connection_cache_test")), &ignored_prioritization_fee_cache, BankingTracer::new_disabled(), ) diff --git a/core/src/validator.rs b/core/src/validator.rs index 878ab11e65..72b6ec1d9b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -879,6 +879,7 @@ impl Validator { let connection_cache = match use_quic { true => { let connection_cache = ConnectionCache::new_with_client_options( + "connection_cache_tpu_quic", tpu_connection_pool_size, None, Some(( @@ -892,7 +893,10 @@ impl Validator { ); Arc::new(connection_cache) } - false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), + false => Arc::new(ConnectionCache::with_udp( + "connection_cache_tpu_udp", + tpu_connection_pool_size, + )), }; // block min prioritization fee cache should be readable by RPC, and writable by validator diff --git a/dos/src/main.rs b/dos/src/main.rs index 5834607999..e0b4331125 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -256,8 +256,13 @@ fn create_sender_thread( ) -> thread::JoinHandle<()> { // ConnectionCache is used instead of client because it gives ~6% higher pps let connection_cache = match tpu_use_quic { - true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), - false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + true => ConnectionCache::new_quic( + "connection_cache_dos_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ), + false => { + ConnectionCache::with_udp("connection_cache_dos_udp", DEFAULT_TPU_CONNECTION_POOL_SIZE) + } }; let connection = connection_cache.get_connection(target); @@ -770,8 +775,14 @@ fn main() { }); let connection_cache = match cmd_params.tpu_use_quic { - true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), - false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + true => ConnectionCache::new_quic( + "connection_cache_dos_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ), + false => ConnectionCache::with_udp( + "connection_cache_dos_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ), }; let (client, num_clients) = get_multi_client( &validators, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 53adc011d7..b3ae79a11e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -315,8 +315,14 @@ impl LocalCluster { validators, genesis_config, connection_cache: match config.tpu_use_quic { - true => Arc::new(ConnectionCache::new(config.tpu_connection_pool_size)), - false => Arc::new(ConnectionCache::with_udp(config.tpu_connection_pool_size)), + true => Arc::new(ConnectionCache::new_quic( + "connection_cache_local_cluster_quic", + config.tpu_connection_pool_size, + )), + false => Arc::new(ConnectionCache::with_udp( + "connection_cache_local_cluster_udp", + config.tpu_connection_pool_size, + )), }, }; diff --git a/rpc-test/tests/nonblocking.rs b/rpc-test/tests/nonblocking.rs index a7b7dc6197..45b1c9e8bb 100644 --- a/rpc-test/tests/nonblocking.rs +++ b/rpc-test/tests/nonblocking.rs @@ -18,6 +18,7 @@ async fn test_tpu_send_transaction() { let (test_validator, mint_keypair) = TestValidatorGenesis::default().start_async().await; let rpc_client = Arc::new(test_validator.get_async_rpc_client()); let mut tpu_client = TpuClient::new( + "tpu_client_test", rpc_client.clone(), &test_validator.rpc_pubsub_url(), TpuClientConfig::default(), diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 58b0a73b69..f1c2d4acb9 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -461,8 +461,12 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { CommitmentConfig::processed(), )); let connection_cache = match tpu_use_quic { - true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), - false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + true => { + ConnectionCache::new_quic("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) + } + false => { + ConnectionCache::with_udp("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) + } }; let recent_blockhash = rpc_client.get_latest_blockhash().unwrap(); let tx = diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 188199969c..70db285563 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -5057,7 +5057,7 @@ pub mod tests { let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); bank.transfer(20, &genesis.mint_keypair, &bob_pubkey) .unwrap(); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let request_processor = JsonRpcRequestProcessor::new_from_bank( &bank, SocketAddrSpace::Unspecified, @@ -5076,7 +5076,7 @@ pub mod tests { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let meta = JsonRpcRequestProcessor::new_from_bank( &bank, SocketAddrSpace::Unspecified, @@ -5108,7 +5108,7 @@ pub mod tests { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let meta = JsonRpcRequestProcessor::new_from_bank( &bank, SocketAddrSpace::Unspecified, @@ -5235,7 +5235,7 @@ pub mod tests { bank.transfer(4, &genesis.mint_keypair, &bob_pubkey) .unwrap(); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let meta = JsonRpcRequestProcessor::new_from_bank( &bank, SocketAddrSpace::Unspecified, @@ -6381,7 +6381,7 @@ pub mod tests { fn test_rpc_send_bad_tx() { let genesis = create_genesis_config(100); let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let meta = JsonRpcRequestProcessor::new_from_bank( &bank, SocketAddrSpace::Unspecified, @@ -6421,7 +6421,7 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let connection_cache = Arc::::default(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let tpu_address = cluster_info .my_contact_info() .tpu(connection_cache.protocol()) @@ -6693,7 +6693,7 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let connection_cache = Arc::::default(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let tpu_address = cluster_info .my_contact_info() .tpu(connection_cache.protocol()) diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 5a40365f3d..01fd39ad96 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -629,7 +629,7 @@ mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index f49953fe51..629432ff8f 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -794,7 +794,7 @@ mod test { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let (sender, receiver) = unbounded(); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let send_transaction_service = SendTransactionService::new::( tpu_address, &bank_forks, @@ -828,7 +828,7 @@ mod test { }; let exit = Arc::new(AtomicBool::new(false)); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let _send_transaction_service = SendTransactionService::new::( tpu_address, &bank_forks, @@ -905,7 +905,7 @@ mod test { Some(Instant::now()), ), ); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, @@ -1177,7 +1177,7 @@ mod test { ); let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); - let connection_cache = Arc::new(ConnectionCache::default()); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index de8cee0455..b543a63700 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -417,13 +417,14 @@ where /// Create a new client that disconnects when dropped pub async fn new( + name: &'static str, rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, connection_manager: M, ) -> Result { let connection_cache = Arc::new( - ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(), + ConnectionCache::new(name, connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(), ); // TODO: Handle error properly, as the ConnectionCache ctor is now fallible. Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await } diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index a620d0e0ce..9d5a159686 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -113,12 +113,14 @@ where /// Create a new client that disconnects when dropped pub fn new( + name: &'static str, rpc_client: Arc, websocket_url: &str, config: TpuClientConfig, connection_manager: M, ) -> Result { let create_tpu_client = NonblockingTpuClient::new( + name, rpc_client.get_inner_client().clone(), websocket_url, config,