From 6a4a0418a614d4d13177fe2a725bdc1fa35fdcf7 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 9 May 2023 13:46:17 +0000 Subject: [PATCH] removes hard-coded QUIC_PORT_OFFSET from connection-cache (#31541) New ContactInfo has api identifying QUIC vs UDP ports; no need to hard-code port-offset deep in connection-cache. --- bench-tps/tests/bench_tps.rs | 6 ++- client/src/connection_cache.rs | 18 ++++++--- connection-cache/src/connection_cache.rs | 48 ++++++++---------------- core/src/banking_stage/forwarder.rs | 17 +++++---- core/src/next_leader.rs | 11 +----- core/src/warm_quic_cache_service.rs | 3 +- dos/src/main.rs | 34 ++++++++++++++--- gossip/src/gossip_service.rs | 13 +++---- gossip/src/legacy_contact_info.rs | 32 +++++++++------- local-cluster/src/cluster_tests.rs | 35 ++++++++++------- local-cluster/src/local_cluster.rs | 12 ++++-- local-cluster/tests/local_cluster.rs | 12 ++++-- quic-client/src/lib.rs | 10 ++--- rpc-test/tests/nonblocking.rs | 13 +++++-- rpc/src/rpc.rs | 6 ++- rpc/src/rpc_service.rs | 11 +++--- tpu-client/src/nonblocking/tpu_client.rs | 37 ++++++++++++------ udp-client/src/lib.rs | 9 ++--- 18 files changed, 195 insertions(+), 132 deletions(-) diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 3b79c24a38..184ae321cf 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -9,6 +9,7 @@ use { spl_convert::FromOtherSolana, }, solana_client::{ + connection_cache::ConnectionCache, thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -81,7 +82,10 @@ fn test_bench_tps_local_cluster(config: Config) { let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc().unwrap(), - cluster.entry_point_info.tpu().unwrap(), + match *cluster.connection_cache { + ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(), + ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(), + }, cluster.connection_cache.clone(), )); diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 680300a39e..ee2c70b193 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,3 +1,4 @@ +pub use solana_connection_cache::connection_cache::Protocol; use { quinn::Endpoint, solana_connection_cache::{ @@ -74,6 +75,14 @@ impl ConnectionCache { Self::Quic(Arc::new(cache)) } + #[inline] + pub fn protocol(&self) -> Protocol { + match self { + Self::Quic(_) => Protocol::QUIC, + Self::Udp(_) => Protocol::UDP, + } + } + #[deprecated( since = "1.15.0", note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate." @@ -204,7 +213,7 @@ mod tests { super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, - solana_sdk::{net::DEFAULT_TPU_COALESCE, quic::QUIC_PORT_OFFSET, signature::Keypair}, + solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, streamer::StakedNodes, @@ -236,9 +245,6 @@ mod tests { #[test] fn test_connection_with_specified_client_endpoint() { - let port = u16::MAX - QUIC_PORT_OFFSET + 1; - assert!(port.checked_add(QUIC_PORT_OFFSET).is_none()); - // Start a response receiver: let ( response_recv_socket, @@ -274,13 +280,13 @@ mod tests { let port1 = 9001; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.server_addr().port(), port1 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port1); // server port 2: let port2 = 9002; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2); let conn = connection_cache.get_connection(&addr); - assert_eq!(conn.server_addr().port(), port2 + QUIC_PORT_OFFSET); + assert_eq!(conn.server_addr().port(), port2); response_recv_exit.store(true, Ordering::Relaxed); response_recv_thread.join().unwrap(); diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 2b086c2d9b..5337063997 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -21,13 +21,20 @@ const MAX_CONNECTIONS: usize = 1024; /// Default connection pool size per remote address pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; +#[derive(Clone, Copy, Eq, Hash, PartialEq)] +pub enum Protocol { + UDP, + QUIC, +} + pub trait ConnectionManager { type ConnectionPool: ConnectionPool; type NewConnectionConfig: NewConnectionConfig; + const PROTOCOL: Protocol; + fn new_connection_pool(&self) -> Self::ConnectionPool; fn new_connection_config(&self) -> Self::NewConnectionConfig; - fn get_port_offset(&self) -> u16; } pub struct ConnectionCache< @@ -150,14 +157,6 @@ where let map = self.map.read().unwrap(); get_connection_map_lock_measure.stop(); - let port_offset = self.connection_manager.get_port_offset(); - - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); - let addr = SocketAddr::new(addr.ip(), port); - let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); let report_stats = self @@ -171,12 +170,12 @@ where connection_cache_stats, num_evictions, eviction_timing_ms, - } = match map.get(&addr) { + } = match map.get(addr) { Some(pool) => { if pool.need_new_connection(self.connection_pool_size) { // create more connection and put it in the pool drop(map); - self.create_connection(&mut lock_timing_ms, &addr) + self.create_connection(&mut lock_timing_ms, addr) } else { let connection = pool.borrow_connection(); CreateConnectionResult { @@ -191,7 +190,7 @@ where None => { // Upgrade to write access by dropping read lock and acquire write lock drop(map); - self.create_connection(&mut lock_timing_ms, &addr) + self.create_connection(&mut lock_timing_ms, addr) } }; get_connection_map_measure.stop(); @@ -384,8 +383,6 @@ mod tests { }, }; - const MOCK_PORT_OFFSET: u16 = 42; - struct MockUdpPool { connections: Vec>, } @@ -487,6 +484,8 @@ mod tests { type ConnectionPool = MockUdpPool; type NewConnectionConfig = MockUdpConfig; + const PROTOCOL: Protocol = Protocol::QUIC; + fn new_connection_pool(&self) -> Self::ConnectionPool { MockUdpPool { connections: Vec::default(), @@ -496,10 +495,6 @@ mod tests { fn new_connection_config(&self) -> Self::NewConnectionConfig { MockUdpConfig::new().unwrap() } - - fn get_port_offset(&self) -> u16 { - MOCK_PORT_OFFSET - } } impl BlockingClientConnection for MockUdpConnection { @@ -562,7 +557,6 @@ mod tests { let connection_manager = MockConnectionManager::default(); let connection_cache = ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); - let port_offset = MOCK_PORT_OFFSET; let addrs = (0..MAX_CONNECTIONS) .map(|_| { let addr = get_addr(&mut rng); @@ -573,13 +567,7 @@ mod tests { { let map = connection_cache.map.read().unwrap(); assert!(map.len() == MAX_CONNECTIONS); - addrs.iter().for_each(|a| { - let port = a - .port() - .checked_add(port_offset) - .unwrap_or_else(|| a.port()); - let addr = &SocketAddr::new(a.ip(), port); - + addrs.iter().for_each(|addr| { let conn = &map.get(addr).expect("Address not found").get(0).unwrap(); let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); assert_eq!( @@ -596,10 +584,7 @@ mod tests { let addr = &get_addr(&mut rng); connection_cache.get_connection(addr); - let port = addr - .port() - .checked_add(port_offset) - .unwrap_or_else(|| addr.port()); + let port = addr.port(); let addr_with_quic_port = SocketAddr::new(addr.ip(), port); let map = connection_cache.map.read().unwrap(); assert!(map.len() == MAX_CONNECTIONS); @@ -611,8 +596,7 @@ mod tests { // an invalid port. #[test] fn test_overflow_address() { - let port = u16::MAX - MOCK_PORT_OFFSET + 1; - assert!(port.checked_add(MOCK_PORT_OFFSET).is_none()); + let port = u16::MAX; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let connection_manager = MockConnectionManager::default(); let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index f278ad6a42..e6863ee8ce 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -3,12 +3,12 @@ use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, - next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote}, + next_leader::{next_leader, next_leader_tpu_vote}, tracer_packet_stats::TracerPacketStats, unprocessed_transaction_storage::UnprocessedTransactionStorage, }, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo}, solana_measure::measure_us, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, @@ -214,9 +214,14 @@ impl Forwarder { fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> { match forward_option { ForwardOption::NotForward => None, - ForwardOption::ForwardTransaction => { - next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder) - } + ForwardOption::ForwardTransaction => next_leader( + &self.cluster_info, + &self.poh_recorder, + match *self.connection_cache { + ConnectionCache::Quic(_) => ContactInfo::tpu_forwards_quic, + ConnectionCache::Udp(_) => ContactInfo::tpu_forwards, + }, + ), ForwardOption::ForwardTpuVote => { next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder) @@ -252,8 +257,6 @@ impl Forwarder { batch_send(&self.socket, &pkts).map_err(|err| err.into()) } ForwardOption::ForwardTransaction => { - // All other transactions can be forwarded using QUIC, get_connection() will use - // system wide setting to pick the correct connection object. let conn = self.connection_cache.get_connection(addr); conn.send_data_batch_async(packet_vec) } diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index 2957562edf..0a2796c415 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -7,21 +7,14 @@ use { std::{net::SocketAddr, sync::RwLock}, }; -pub(crate) fn next_leader_tpu_forwards( - cluster_info: &ClusterInfo, - poh_recorder: &RwLock, -) -> Option<(Pubkey, SocketAddr)> { - next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_forwards) -} - pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, poh_recorder: &RwLock, ) -> Option<(Pubkey, SocketAddr)> { - next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_vote) + next_leader(cluster_info, poh_recorder, ContactInfo::tpu_vote) } -fn next_leader_x( +pub(crate) fn next_leader( cluster_info: &ClusterInfo, poh_recorder: &RwLock, port_selector: F, diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 380313bc99..32e9329306 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -31,6 +31,7 @@ impl WarmQuicCacheService { poh_recorder: Arc>, exit: Arc, ) -> Self { + assert!(matches!(*connection_cache, ConnectionCache::Quic(_))); let thread_hdl = Builder::new() .name("solWarmQuicSvc".to_string()) .spawn(move || { @@ -47,7 +48,7 @@ impl WarmQuicCacheService { { maybe_last_leader = Some(leader_pubkey); if let Some(Ok(addr)) = cluster_info - .lookup_contact_info(&leader_pubkey, ContactInfo::tpu) + .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_quic) { let conn = connection_cache.get_connection(&addr); if let Err(err) = conn.send_data(&[0u8]) { diff --git a/dos/src/main.rs b/dos/src/main.rs index 85c4e3691f..5d53a88255 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -414,6 +414,7 @@ fn get_target( nodes: &[ContactInfo], mode: Mode, entrypoint_addr: SocketAddr, + tpu_use_quic: bool, ) -> Option<(Pubkey, SocketAddr)> { let mut target = None; if nodes.is_empty() { @@ -433,8 +434,24 @@ fn get_target( Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())), Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())), Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())), - Mode::Tpu => Some((*node.pubkey(), node.tpu().unwrap())), - Mode::TpuForwards => Some((*node.pubkey(), node.tpu_forwards().unwrap())), + Mode::Tpu => Some(( + *node.pubkey(), + if tpu_use_quic { + node.tpu_quic() + } else { + node.tpu() + } + .unwrap(), + )), + Mode::TpuForwards => Some(( + *node.pubkey(), + if tpu_use_quic { + node.tpu_forwards_quic() + } else { + node.tpu_forwards() + } + .unwrap(), + )), Mode::Repair => Some((*node.pubkey(), node.repair().unwrap())), Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())), Mode::Rpc => None, @@ -606,8 +623,12 @@ fn run_dos( client: Option>, params: DosClientParameters, ) { - let target = get_target(nodes, params.mode, params.entrypoint_addr); - + let target = get_target( + nodes, + params.mode, + params.entrypoint_addr, + params.tpu_use_quic, + ); if params.mode == Mode::Rpc { // creating rpc_client because get_account, get_program_accounts are not implemented for BenchTpsClient let rpc_client = @@ -1079,7 +1100,10 @@ pub mod test { let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc().unwrap(), - cluster.entry_point_info.tpu().unwrap(), + match *cluster.connection_cache { + ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(), + ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(), + }, cluster.connection_cache.clone(), )); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index f12c4bc9ea..b8d8247901 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -200,9 +200,10 @@ pub fn get_client( socket_addr_space: &SocketAddrSpace, connection_cache: Arc, ) -> ThinClient { + let protocol = connection_cache.protocol(); let nodes: Vec<_> = nodes .iter() - .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) + .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) .collect(); let select = thread_rng().gen_range(0, nodes.len()); let (rpc, tpu) = nodes[select]; @@ -214,13 +215,11 @@ pub fn get_multi_client( socket_addr_space: &SocketAddrSpace, connection_cache: Arc, ) -> (ThinClient, usize) { - let addrs: Vec<_> = nodes + let protocol = connection_cache.protocol(); + let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes .iter() - .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) - .collect(); - let rpc_addrs: Vec<_> = addrs.iter().map(|addr| addr.0).collect(); - let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); - + .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) + .unzip(); let num_nodes = tpu_addrs.len(); ( ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache), diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index 5af12b77d6..e918569774 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -6,6 +6,7 @@ use { }, crds_value::MAX_WALLCLOCK, }, + solana_client::connection_cache::Protocol, solana_sdk::{ pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, @@ -202,6 +203,10 @@ impl LegacyContactInfo { self.tpu().and_then(|addr| get_quic_socket(&addr)) } + pub fn tpu_forwards_quic(&self) -> Result { + self.tpu_forwards().and_then(|addr| get_quic_socket(&addr)) + } + fn is_valid_ip(addr: IpAddr) -> bool { !(addr.is_unspecified() || addr.is_multicast()) // || (addr.is_loopback() && !cfg_test)) @@ -216,21 +221,22 @@ impl LegacyContactInfo { addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr) } - pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) { - (self.rpc, self.tpu) - } - pub(crate) fn valid_client_facing_addr( &self, + protocol: Protocol, socket_addr_space: &SocketAddrSpace, ) -> Option<(SocketAddr, SocketAddr)> { - if LegacyContactInfo::is_valid_address(&self.rpc, socket_addr_space) - && LegacyContactInfo::is_valid_address(&self.tpu, socket_addr_space) - { - Some((self.rpc, self.tpu)) - } else { - None + let rpc = self + .rpc() + .ok() + .filter(|addr| socket_addr_space.check(addr))?; + let tpu = match protocol { + Protocol::QUIC => self.tpu_quic(), + Protocol::UDP => self.tpu(), } + .ok() + .filter(|addr| socket_addr_space.check(addr))?; + Some((rpc, tpu)) } } @@ -323,17 +329,17 @@ mod tests { fn test_valid_client_facing() { let mut ci = LegacyContactInfo::default(); assert_eq!( - ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified), + ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified), None ); ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123); assert_eq!( - ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified), + ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified), None ); ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234); assert!(ci - .valid_client_facing_addr(&SocketAddrSpace::Unspecified) + .valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified) .is_some()); } diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 8e37cb452d..2edff64ca6 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -6,7 +6,10 @@ use log::*; use { rand::{thread_rng, Rng}, rayon::prelude::*, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::{ + connection_cache::{ConnectionCache, Protocol}, + thin_client::ThinClient, + }, solana_core::consensus::VOTE_THRESHOLD_DEPTH, solana_entry::entry::{Entry, EntrySlice}, solana_gossip::{ @@ -51,9 +54,15 @@ use { }; pub fn get_client_facing_addr>( + protocol: Protocol, contact_info: T, ) -> (SocketAddr, SocketAddr) { - let (rpc, mut tpu) = contact_info.borrow().client_facing_addr(); + let contact_info = contact_info.borrow(); + let rpc = contact_info.rpc().unwrap(); + let mut tpu = match protocol { + Protocol::QUIC => contact_info.tpu_quic().unwrap(), + Protocol::UDP => contact_info.tpu().unwrap(), + }; // QUIC certificate authentication requires the IP Address to match. ContactInfo might have // 0.0.0.0 as the IP instead of 127.0.0.1. tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)); @@ -82,7 +91,7 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let (rpc, tpu) = get_client_facing_addr(ingress_node); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let bal = client .poll_get_balance_with_commitment( @@ -104,7 +113,7 @@ pub fn spend_and_verify_all_nodes( if ignore_nodes.contains(validator.pubkey()) { continue; } - let (rpc, tpu) = get_client_facing_addr(validator); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); client.poll_for_signature_confirmation(&sig, confs).unwrap(); } @@ -117,7 +126,7 @@ pub fn verify_balances( connection_cache: Arc, ) { let (rpc, tpu) = LegacyContactInfo::try_from(node) - .map(get_client_facing_addr) + .map(|node| get_client_facing_addr(connection_cache.protocol(), node)) .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache); for (pk, b) in expected_balances { @@ -135,7 +144,7 @@ pub fn send_many_transactions( max_tokens_per_transfer: u64, num_txs: u64, ) -> HashMap { - let (rpc, tpu) = get_client_facing_addr(node); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let mut expected_balances = HashMap::new(); for _ in 0..num_txs { @@ -233,7 +242,7 @@ pub fn kill_entry_and_spend_and_verify_rest( .unwrap(); assert!(cluster_nodes.len() >= nodes); let (rpc, tpu) = LegacyContactInfo::try_from(entry_point_info) - .map(get_client_facing_addr) + .map(|node| get_client_facing_addr(connection_cache.protocol(), node)) .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); @@ -262,7 +271,7 @@ pub fn kill_entry_and_spend_and_verify_rest( continue; } - let (rpc, tpu) = get_client_facing_addr(ingress_node); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let balance = client .poll_get_balance_with_commitment( @@ -346,7 +355,7 @@ pub fn check_for_new_roots( for (i, ingress_node) in contact_infos.iter().enumerate() { let (rpc, tpu) = LegacyContactInfo::try_from(ingress_node) - .map(get_client_facing_addr) + .map(|node| get_client_facing_addr(connection_cache.protocol(), node)) .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let root_slot = client @@ -380,7 +389,7 @@ pub fn check_no_new_roots( .iter() .enumerate() .map(|(i, ingress_node)| { - let (rpc, tpu) = get_client_facing_addr(ingress_node); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let initial_root = client .get_slot() @@ -399,7 +408,7 @@ pub fn check_no_new_roots( let mut reached_end_slot = false; loop { for contact_info in contact_infos { - let (rpc, tpu) = get_client_facing_addr(contact_info); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); current_slot = client .get_slot_with_commitment(CommitmentConfig::processed()) @@ -425,7 +434,7 @@ pub fn check_no_new_roots( } for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(ingress_node); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); assert_eq!( client @@ -447,7 +456,7 @@ fn poll_all_nodes_for_signature( if validator.pubkey() == entry_point_info.pubkey() { continue; } - let (rpc, tpu) = get_client_facing_addr(validator); + let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); client.poll_for_signature_confirmation(sig, confs)?; } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 3bdc785934..a9b40fe924 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -436,7 +436,9 @@ impl LocalCluster { socket_addr_space: SocketAddrSpace, ) -> Pubkey { let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node) + }) .unwrap(); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); @@ -531,7 +533,9 @@ impl LocalCluster { pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node) + }) .unwrap(); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) @@ -786,7 +790,9 @@ impl Cluster for LocalCluster { fn get_validator_client(&self, pubkey: &Pubkey) -> Option { self.validators.get(pubkey).map(|f| { let (rpc, tpu) = LegacyContactInfo::try_from(&f.info.contact_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node) + }) .unwrap(); ThinClient::new(rpc, tpu, self.connection_cache.clone()) }) diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index d75f2aeb6c..ab3bd7457b 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -201,7 +201,9 @@ fn test_local_cluster_signature_subscribe() { let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); let (rpc, tpu) = LegacyContactInfo::try_from(non_bootstrap_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node) + }) .unwrap(); let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); @@ -433,7 +435,9 @@ fn test_mainnet_beta_cluster_type() { assert_eq!(cluster_nodes.len(), 1); let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node) + }) .unwrap(); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); @@ -2641,7 +2645,9 @@ fn test_oc_bad_signatures() { // 3) Start up a spy to listen for and push votes to leader TPU let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) - .map(cluster_tests::get_client_facing_addr) + .map(|node| { + cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node) + }) .unwrap(); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index e9a6335bd9..1df6fc705a 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -19,11 +19,11 @@ use { solana_connection_cache::{ connection_cache::{ BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, - ConnectionPoolError, NewConnectionConfig, + ConnectionPoolError, NewConnectionConfig, Protocol, }, connection_cache_stats::ConnectionCacheStats, }, - solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair}, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::{ nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, streamer::StakedNodes, @@ -195,6 +195,8 @@ impl ConnectionManager for QuicConnectionManager { type ConnectionPool = QuicPool; type NewConnectionConfig = QuicConfig; + const PROTOCOL: Protocol = Protocol::QUIC; + fn new_connection_pool(&self) -> Self::ConnectionPool { QuicPool { connections: Vec::default(), @@ -211,10 +213,6 @@ impl ConnectionManager for QuicConnectionManager { fn new_connection_config(&self) -> QuicConfig { QuicConfig::new().unwrap() } - - fn get_port_offset(&self) -> u16 { - QUIC_PORT_OFFSET - } } impl QuicConnectionManager { diff --git a/rpc-test/tests/nonblocking.rs b/rpc-test/tests/nonblocking.rs index ecb9ee5199..a7b7dc6197 100644 --- a/rpc-test/tests/nonblocking.rs +++ b/rpc-test/tests/nonblocking.rs @@ -1,5 +1,6 @@ use { solana_client::{ + connection_cache::Protocol, nonblocking::tpu_client::{LeaderTpuService, TpuClient}, tpu_client::TpuClientConfig, }, @@ -50,10 +51,14 @@ async fn test_tpu_cache_slot_updates() { let (test_validator, _) = TestValidatorGenesis::default().start_async().await; let rpc_client = Arc::new(test_validator.get_async_rpc_client()); let exit = Arc::new(AtomicBool::new(false)); - let mut leader_tpu_service = - LeaderTpuService::new(rpc_client, &test_validator.rpc_pubsub_url(), exit.clone()) - .await - .unwrap(); + let mut leader_tpu_service = LeaderTpuService::new( + rpc_client, + &test_validator.rpc_pubsub_url(), + Protocol::QUIC, + exit.clone(), + ) + .await + .unwrap(); let start_slot = leader_tpu_service.estimated_current_slot(); let timeout = Duration::from_secs(5); let sleep_time = Duration::from_millis(DEFAULT_MS_PER_SLOT); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9aae8a3188..f555d87cbd 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -355,7 +355,11 @@ impl JsonRpcRequestProcessor { ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); + let tpu_address = match *connection_cache { + ConnectionCache::Quic(_) => ContactInfo::tpu_quic, + ConnectionCache::Udp(_) => ContactInfo::tpu, + }(&cluster_info.my_contact_info()) + .unwrap(); let (sender, receiver) = unbounded(); SendTransactionService::new::( tpu_address, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 8153b9ae0f..7dff1c99e5 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -20,7 +20,7 @@ use { }, regex::Regex, solana_client::connection_cache::ConnectionCache, - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ bigtable_upload::ConfirmedBlockUploadConfig, bigtable_upload_service::BigTableUploadService, blockstore::Blockstore, @@ -378,10 +378,11 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = cluster_info - .my_contact_info() - .tpu() - .map_err(|err| format!("{err}"))?; + let tpu_address = match *connection_cache { + ConnectionCache::Quic(_) => ContactInfo::tpu_quic, + ConnectionCache::Udp(_) => ContactInfo::tpu, + }(&cluster_info.my_contact_info()) + .map_err(|err| format!("{err}"))?; // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 9967553de2..de8cee0455 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -6,7 +6,7 @@ use { log::*, solana_connection_cache::{ connection_cache::{ - ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, Protocol, DEFAULT_CONNECTION_POOL_SIZE, }, nonblocking::client_connection::ClientConnection, @@ -22,6 +22,7 @@ use { commitment_config::CommitmentConfig, epoch_info::EpochInfo, pubkey::Pubkey, + quic::QUIC_PORT_OFFSET, signature::SignerError, transaction::Transaction, transport::{Result as TransportResult, TransportError}, @@ -102,6 +103,7 @@ impl LeaderTpuCacheUpdateInfo { } struct LeaderTpuCache { + protocol: Protocol, first_slot: Slot, leaders: Vec, leader_tpu_map: HashMap, @@ -115,9 +117,11 @@ impl LeaderTpuCache { slots_in_epoch: Slot, leaders: Vec, cluster_nodes: Vec, + protocol: Protocol, ) -> Self { - let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes); + let leader_tpu_map = Self::extract_cluster_tpu_sockets(protocol, cluster_nodes); Self { + protocol, first_slot, leaders, leader_tpu_map, @@ -183,16 +187,24 @@ impl LeaderTpuCache { } } - pub fn extract_cluster_tpu_sockets( + fn extract_cluster_tpu_sockets( + protocol: Protocol, cluster_contact_info: Vec, ) -> HashMap { cluster_contact_info .into_iter() .filter_map(|contact_info| { - Some(( - Pubkey::from_str(&contact_info.pubkey).ok()?, - contact_info.tpu?, - )) + let pubkey = Pubkey::from_str(&contact_info.pubkey).ok()?; + let socket = match protocol { + Protocol::QUIC => contact_info.tpu_quic.or_else(|| { + let mut socket = contact_info.tpu?; + let port = socket.port().checked_add(QUIC_PORT_OFFSET)?; + socket.set_port(port); + Some(socket) + }), + Protocol::UDP => contact_info.tpu, + }?; + Some((pubkey, socket)) }) .collect() } @@ -211,8 +223,8 @@ impl LeaderTpuCache { if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes { match cluster_nodes { Ok(cluster_nodes) => { - let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes); - self.leader_tpu_map = leader_tpu_map; + self.leader_tpu_map = + Self::extract_cluster_tpu_sockets(self.protocol, cluster_nodes); cluster_refreshed = true; } Err(err) => { @@ -425,7 +437,8 @@ where ) -> Result { let exit = Arc::new(AtomicBool::new(false)); let leader_tpu_service = - LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?; + LeaderTpuService::new(rpc_client.clone(), websocket_url, M::PROTOCOL, exit.clone()) + .await?; Ok(Self { fanout_slots: config.fanout_slots.clamp(1, MAX_FANOUT_SLOTS), @@ -586,6 +599,7 @@ impl LeaderTpuService { pub async fn new( rpc_client: Arc, websocket_url: &str, + protocol: Protocol, exit: Arc, ) -> Result { let start_slot = rpc_client @@ -603,6 +617,7 @@ impl LeaderTpuService { slots_in_epoch, leaders, cluster_nodes, + protocol, ))); let pubsub_client = if !websocket_url.is_empty() { @@ -640,7 +655,7 @@ impl LeaderTpuService { self.recent_slots.estimated_current_slot() } - pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { + fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { let current_slot = self.recent_slots.estimated_current_slot(); self.leader_tpu_cache .read() diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index 995aff178b..00fb093da5 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -11,7 +11,7 @@ use { solana_connection_cache::{ connection_cache::{ BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, - ConnectionPoolError, NewConnectionConfig, + ConnectionPoolError, NewConnectionConfig, Protocol, }, connection_cache_stats::ConnectionCacheStats, }, @@ -98,6 +98,9 @@ pub struct UdpConnectionManager {} impl ConnectionManager for UdpConnectionManager { type ConnectionPool = UdpPool; type NewConnectionConfig = UdpConfig; + + const PROTOCOL: Protocol = Protocol::UDP; + fn new_connection_pool(&self) -> Self::ConnectionPool { UdpPool { connections: Vec::default(), @@ -107,8 +110,4 @@ impl ConnectionManager for UdpConnectionManager { fn new_connection_config(&self) -> Self::NewConnectionConfig { UdpConfig::new().unwrap() } - - fn get_port_offset(&self) -> u16 { - 0 - } }