From 4e34abbf3da48c3bce90f432a5fa177c2a8109b1 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 12 May 2023 16:16:20 +0000 Subject: [PATCH] specifies protocol in contact-info get-socket api (#31602) --- bench-tps/tests/bench_tps.rs | 9 ++- core/src/banking_stage/forwarder.rs | 16 ++--- core/src/tpu.rs | 6 +- core/src/validator.rs | 4 +- core/src/warm_quic_cache_service.rs | 9 ++- dos/src/main.rs | 41 ++++++------- gossip/src/cluster_info.rs | 27 +++++---- gossip/src/contact_info.rs | 91 ++++++++++++++++++++--------- gossip/src/legacy_contact_info.rs | 63 +++++++++++--------- local-cluster/src/cluster_tests.rs | 5 +- rpc/src/cluster_tpu_info.rs | 4 +- rpc/src/rpc.rs | 29 +++++---- rpc/src/rpc_service.rs | 11 ++-- test-validator/src/lib.rs | 3 +- validator/src/admin_rpc_service.rs | 23 +++++--- validator/src/bootstrap.rs | 5 +- 16 files changed, 198 insertions(+), 148 deletions(-) diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 184ae321c..4555d5182 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -9,7 +9,6 @@ use { spl_convert::FromOtherSolana, }, solana_client::{ - connection_cache::ConnectionCache, thin_client::ThinClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -82,10 +81,10 @@ fn test_bench_tps_local_cluster(config: Config) { let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc().unwrap(), - match *cluster.connection_cache { - ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(), - ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(), - }, + cluster + .entry_point_info + .tpu(cluster.connection_cache.protocol()) + .unwrap(), cluster.connection_cache.clone(), )); diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 20e06120b..0d69ec6ce 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -8,7 +8,7 @@ use { unprocessed_transaction_storage::UnprocessedTransactionStorage, }, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, - solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo}, + solana_gossip::cluster_info::ClusterInfo, solana_measure::measure_us, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, @@ -218,15 +218,11 @@ impl Forwarder { fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> { match forward_option { ForwardOption::NotForward => None, - ForwardOption::ForwardTransaction => next_leader( - &self.cluster_info, - &self.poh_recorder, - match *self.connection_cache { - ConnectionCache::Quic(_) => ContactInfo::tpu_forwards_quic, - ConnectionCache::Udp(_) => ContactInfo::tpu_forwards, - }, - ), - + ForwardOption::ForwardTransaction => { + next_leader(&self.cluster_info, &self.poh_recorder, |node| { + node.tpu_forwards(self.connection_cache.protocol()) + }) + } ForwardOption::ForwardTpuVote => { next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder) } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index dd65e9907..1c453100b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -17,7 +17,7 @@ use { staked_nodes_updater_service::StakedNodesUpdaterService, }, crossbeam_channel::{unbounded, Receiver}, - solana_client::connection_cache::ConnectionCache, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ blockstore::Blockstore, blockstore_processor::TransactionStatusSender, @@ -149,7 +149,7 @@ impl Tpu { keypair, cluster_info .my_contact_info() - .tpu_quic() + .tpu(Protocol::QUIC) .expect("Operator must spin up node with valid (QUIC) TPU address") .ip(), packet_sender, @@ -169,7 +169,7 @@ impl Tpu { keypair, cluster_info .my_contact_info() - .tpu_forwards_quic() + .tpu_forwards(Protocol::QUIC) .expect("Operator must spin up node with valid (QUIC) TPU-forwards address") .ip(), forwarded_packet_sender, diff --git a/core/src/validator.rs b/core/src/validator.rs index b3a3e35d9..460a443c6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -30,7 +30,7 @@ use { crossbeam_channel::{bounded, unbounded, Receiver}, lazy_static::lazy_static, rand::{thread_rng, Rng}, - solana_client::connection_cache::ConnectionCache, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_entry::poh::compute_hash_time_ns, solana_geyser_plugin_manager::{ geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest, @@ -874,7 +874,7 @@ impl Validator { Some(( &identity_keypair, node.info - .tpu() + .tpu(Protocol::UDP) .expect("Operator must spin up node with valid TPU address") .ip(), )), diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 32e932930..c317ff5d5 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,8 +3,11 @@ use { rand::{thread_rng, Rng}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, - solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo}, + solana_client::{ + connection_cache::{ConnectionCache, Protocol}, + tpu_connection::TpuConnection, + }, + solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ sync::{ @@ -48,7 +51,7 @@ impl WarmQuicCacheService { { maybe_last_leader = Some(leader_pubkey); if let Some(Ok(addr)) = cluster_info - .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_quic) + .lookup_contact_info(&leader_pubkey, |node| node.tpu(Protocol::QUIC)) { let conn = connection_cache.get_connection(&addr); if let Err(err) = conn.send_data(&[0u8]) { diff --git a/dos/src/main.rs b/dos/src/main.rs index a10221908..3669e0df4 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -49,6 +49,7 @@ use { solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ + contact_info::Protocol, gossip_service::{discover, get_multi_client}, legacy_contact_info::LegacyContactInfo as ContactInfo, }, @@ -416,6 +417,11 @@ fn get_target( entrypoint_addr: SocketAddr, tpu_use_quic: bool, ) -> Option<(Pubkey, SocketAddr)> { + let protocol = if tpu_use_quic { + Protocol::QUIC + } else { + Protocol::UDP + }; let mut target = None; if nodes.is_empty() { // skip-gossip case @@ -434,24 +440,10 @@ fn get_target( Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())), Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())), Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())), - Mode::Tpu => Some(( - *node.pubkey(), - if tpu_use_quic { - node.tpu_quic() - } else { - node.tpu() - } - .unwrap(), - )), - Mode::TpuForwards => Some(( - *node.pubkey(), - if tpu_use_quic { - node.tpu_forwards_quic() - } else { - node.tpu_forwards() - } - .unwrap(), - )), + Mode::Tpu => Some((*node.pubkey(), node.tpu(protocol).unwrap())), + Mode::TpuForwards => { + Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap())) + } Mode::Repair => Some((*node.pubkey(), node.repair().unwrap())), Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())), Mode::Rpc => None, @@ -964,7 +956,10 @@ pub mod test { let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc().unwrap(), - cluster.entry_point_info.tpu().unwrap(), + cluster + .entry_point_info + .tpu(cluster.connection_cache.protocol()) + .unwrap(), cluster.connection_cache.clone(), )); @@ -1100,10 +1095,10 @@ pub mod test { let client = Arc::new(ThinClient::new( cluster.entry_point_info.rpc().unwrap(), - match *cluster.connection_cache { - ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(), - ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(), - }, + cluster + .entry_point_info + .tpu(cluster.connection_cache.protocol()) + .unwrap(), cluster.connection_cache.clone(), )); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 1700228f8..2e590620f 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -24,7 +24,7 @@ use { cluster_info_metrics::{ submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard, }, - contact_info::{ContactInfo, Error as ContactInfoError, LegacyContactInfo}, + contact_info::{self, ContactInfo, Error as ContactInfoError, LegacyContactInfo}, crds::{Crds, Cursor, GossipRoute}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, @@ -845,8 +845,8 @@ impl ClusterInfo { }, self.addr_to_string(&ip_addr, &node.gossip().ok()), self.addr_to_string(&ip_addr, &node.tpu_vote().ok()), - self.addr_to_string(&ip_addr, &node.tpu().ok()), - self.addr_to_string(&ip_addr, &node.tpu_forwards().ok()), + self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()), + self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()), self.addr_to_string(&ip_addr, &node.tvu().ok()), self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()), self.addr_to_string(&ip_addr, &node.repair().ok()), @@ -1144,7 +1144,7 @@ impl ClusterInfo { ) -> Result<(), GossipError> { let tpu = tpu .map(Ok) - .unwrap_or_else(|| self.my_contact_info().tpu())?; + .unwrap_or_else(|| self.my_contact_info().tpu(contact_info::Protocol::UDP))?; let buf = serialize(transaction)?; self.socket.send_to(&buf, tpu)?; Ok(()) @@ -1358,12 +1358,16 @@ impl ClusterInfo { } fn is_spy_node(node: &LegacyContactInfo, socket_addr_space: &SocketAddrSpace) -> bool { - ![node.tpu(), node.gossip(), node.tvu()] - .into_iter() - .all(|addr| { - addr.map(|addr| socket_addr_space.check(&addr)) - .unwrap_or_default() - }) + ![ + node.tpu(contact_info::Protocol::UDP), + node.gossip(), + node.tvu(), + ] + .into_iter() + .all(|addr| { + addr.map(|addr| socket_addr_space.check(&addr)) + .unwrap_or_default() + }) } /// compute broadcast table @@ -1373,7 +1377,8 @@ impl ClusterInfo { gossip_crds .get_nodes_contact_info() .filter(|node| { - node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.tpu()) + node.pubkey() != &self_pubkey + && self.check_socket_addr_space(&node.tpu(contact_info::Protocol::UDP)) }) .cloned() .collect() diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index fcf5b2938..5bb7a53f4 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -1,4 +1,3 @@ -pub use crate::legacy_contact_info::LegacyContactInfo; use { crate::crds_value::MAX_WALLCLOCK, matches::{assert_matches, debug_assert_matches}, @@ -19,6 +18,9 @@ use { }, thiserror::Error, }; +pub use { + crate::legacy_contact_info::LegacyContactInfo, solana_client::connection_cache::Protocol, +}; const SOCKET_TAG_GOSSIP: u8 = 0; const SOCKET_TAG_REPAIR: u8 = 1; @@ -115,6 +117,17 @@ macro_rules! get_socket { Ok(socket) } }; + ($name:ident, $udp:ident, $quic:ident) => { + pub fn $name(&self, protocol: Protocol) -> Result { + let key = match protocol { + Protocol::QUIC => $quic, + Protocol::UDP => $udp, + }; + let socket = self.cache[usize::from(key)]; + sanitize_socket(&socket)?; + Ok(socket) + } + }; } macro_rules! set_socket { @@ -203,10 +216,12 @@ impl ContactInfo { get_socket!(rpc, SOCKET_TAG_RPC); get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR); - get_socket!(tpu, SOCKET_TAG_TPU); - get_socket!(tpu_forwards, SOCKET_TAG_TPU_FORWARDS); - get_socket!(tpu_forwards_quic, SOCKET_TAG_TPU_FORWARDS_QUIC); - get_socket!(tpu_quic, SOCKET_TAG_TPU_QUIC); + get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); + get_socket!( + tpu_forwards, + SOCKET_TAG_TPU_FORWARDS, + SOCKET_TAG_TPU_FORWARDS_QUIC + ); get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE); get_socket!(tvu, SOCKET_TAG_TVU); get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS); @@ -705,19 +720,22 @@ mod tests { node.serve_repair().ok().as_ref(), sockets.get(&SOCKET_TAG_SERVE_REPAIR) ); - assert_eq!(node.tpu().ok().as_ref(), sockets.get(&SOCKET_TAG_TPU)); assert_eq!( - node.tpu_forwards().ok().as_ref(), + node.tpu(Protocol::UDP).ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU) + ); + assert_eq!( + node.tpu(Protocol::QUIC).ok().as_ref(), + sockets.get(&SOCKET_TAG_TPU_QUIC) + ); + assert_eq!( + node.tpu_forwards(Protocol::UDP).ok().as_ref(), sockets.get(&SOCKET_TAG_TPU_FORWARDS) ); assert_eq!( - node.tpu_forwards_quic().ok().as_ref(), + node.tpu_forwards(Protocol::QUIC).ok().as_ref(), sockets.get(&SOCKET_TAG_TPU_FORWARDS_QUIC) ); - assert_eq!( - node.tpu_quic().ok().as_ref(), - sockets.get(&SOCKET_TAG_TPU_QUIC) - ); assert_eq!( node.tpu_vote().ok().as_ref(), sockets.get(&SOCKET_TAG_TPU_VOTE) @@ -778,20 +796,34 @@ mod tests { assert_eq!(old.rpc().unwrap(), node.rpc().unwrap()); assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap()); assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap()); - assert_eq!(old.tpu().unwrap(), node.tpu().unwrap()); - assert_eq!(old.tpu_forwards().unwrap(), node.tpu_forwards().unwrap()); assert_eq!( - node.tpu_forwards_quic().unwrap(), + old.tpu(Protocol::QUIC).unwrap(), + node.tpu(Protocol::QUIC).unwrap() + ); + assert_eq!( + old.tpu(Protocol::UDP).unwrap(), + node.tpu(Protocol::UDP).unwrap() + ); + assert_eq!( + old.tpu_forwards(Protocol::QUIC).unwrap(), + node.tpu_forwards(Protocol::QUIC).unwrap() + ); + assert_eq!( + old.tpu_forwards(Protocol::UDP).unwrap(), + node.tpu_forwards(Protocol::UDP).unwrap() + ); + assert_eq!( + node.tpu_forwards(Protocol::QUIC).unwrap(), SocketAddr::new( - old.tpu_forwards().unwrap().ip(), - old.tpu_forwards().unwrap().port() + QUIC_PORT_OFFSET + old.tpu_forwards(Protocol::UDP).unwrap().ip(), + old.tpu_forwards(Protocol::UDP).unwrap().port() + QUIC_PORT_OFFSET ) ); assert_eq!( - node.tpu_quic().unwrap(), + node.tpu(Protocol::QUIC).unwrap(), SocketAddr::new( - old.tpu().unwrap().ip(), - old.tpu().unwrap().port() + QUIC_PORT_OFFSET + old.tpu(Protocol::UDP).unwrap().ip(), + old.tpu(Protocol::UDP).unwrap().port() + QUIC_PORT_OFFSET ) ); assert_eq!(old.tpu_vote().unwrap(), node.tpu_vote().unwrap()); @@ -860,23 +892,26 @@ mod tests { .unwrap(); // TPU socket. node.set_tpu(socket).unwrap(); - assert_eq!(node.tpu().unwrap(), socket); + assert_eq!(node.tpu(Protocol::UDP).unwrap(), socket); assert_eq!( - node.tpu_quic().unwrap(), + node.tpu(Protocol::QUIC).unwrap(), SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET) ); node.remove_tpu(); - assert_matches!(node.tpu(), Err(Error::InvalidPort(0))); - assert_matches!(node.tpu_quic(), Err(Error::InvalidPort(0))); + assert_matches!(node.tpu(Protocol::UDP), Err(Error::InvalidPort(0))); + assert_matches!(node.tpu(Protocol::QUIC), Err(Error::InvalidPort(0))); // TPU forwards socket. node.set_tpu_forwards(socket).unwrap(); - assert_eq!(node.tpu_forwards().unwrap(), socket); + assert_eq!(node.tpu_forwards(Protocol::UDP).unwrap(), socket); assert_eq!( - node.tpu_forwards_quic().unwrap(), + node.tpu_forwards(Protocol::QUIC).unwrap(), SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET) ); node.remove_tpu_forwards(); - assert_matches!(node.tpu_forwards(), Err(Error::InvalidPort(0))); - assert_matches!(node.tpu_forwards_quic(), Err(Error::InvalidPort(0))); + assert_matches!(node.tpu_forwards(Protocol::UDP), Err(Error::InvalidPort(0))); + assert_matches!( + node.tpu_forwards(Protocol::QUIC), + Err(Error::InvalidPort(0)) + ); } } diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index e91856977..5d27d4c10 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -2,11 +2,10 @@ use { crate::{ contact_info::{ get_quic_socket, sanitize_quic_offset, sanitize_socket, socket_addr_unspecified, - ContactInfo, Error, + ContactInfo, Error, Protocol, }, crds_value::MAX_WALLCLOCK, }, - solana_client::connection_cache::Protocol, solana_sdk::{ pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, @@ -65,6 +64,16 @@ macro_rules! get_socket { Ok(socket).copied() } }; + (@quic $name:ident) => { + pub fn $name(&self, protocol: Protocol) -> Result { + let socket = &self.$name; + sanitize_socket(socket)?; + match protocol { + Protocol::QUIC => get_quic_socket(socket), + Protocol::UDP => Ok(socket).copied(), + } + } + }; } macro_rules! set_socket { @@ -189,8 +198,8 @@ impl LegacyContactInfo { get_socket!(tvu); get_socket!(tvu_forwards); get_socket!(repair); - get_socket!(tpu); - get_socket!(tpu_forwards); + get_socket!(@quic tpu); + get_socket!(@quic tpu_forwards); get_socket!(tpu_vote); get_socket!(rpc); get_socket!(rpc_pubsub); @@ -199,14 +208,6 @@ impl LegacyContactInfo { set_socket!(set_gossip, gossip); set_socket!(set_rpc, rpc); - pub fn tpu_quic(&self) -> Result { - self.tpu().and_then(|addr| get_quic_socket(&addr)) - } - - pub fn tpu_forwards_quic(&self) -> Result { - self.tpu_forwards().and_then(|addr| get_quic_socket(&addr)) - } - fn is_valid_ip(addr: IpAddr) -> bool { !(addr.is_unspecified() || addr.is_multicast()) // || (addr.is_loopback() && !cfg_test)) @@ -226,17 +227,14 @@ impl LegacyContactInfo { protocol: Protocol, socket_addr_space: &SocketAddrSpace, ) -> Option<(SocketAddr, SocketAddr)> { - let rpc = self - .rpc() - .ok() - .filter(|addr| socket_addr_space.check(addr))?; - let tpu = match protocol { - Protocol::QUIC => self.tpu_quic(), - Protocol::UDP => self.tpu(), - } - .ok() - .filter(|addr| socket_addr_space.check(addr))?; - Some((rpc, tpu)) + Some(( + self.rpc() + .ok() + .filter(|addr| socket_addr_space.check(addr))?, + self.tpu(protocol) + .ok() + .filter(|addr| socket_addr_space.check(addr))?, + )) } } @@ -248,17 +246,28 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo { ($name:ident) => { node.$name().ok().unwrap_or_else(socket_addr_unspecified) }; + ($name:ident, $protocol:expr) => { + node.$name($protocol) + .ok() + .unwrap_or_else(socket_addr_unspecified) + }; } - sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?; - sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?; + sanitize_quic_offset( + &node.tpu(Protocol::UDP).ok(), + &node.tpu(Protocol::QUIC).ok(), + )?; + sanitize_quic_offset( + &node.tpu_forwards(Protocol::UDP).ok(), + &node.tpu_forwards(Protocol::QUIC).ok(), + )?; Ok(Self { id: *node.pubkey(), gossip: unwrap_socket!(gossip), tvu: unwrap_socket!(tvu), tvu_forwards: unwrap_socket!(tvu_forwards), repair: unwrap_socket!(repair), - tpu: unwrap_socket!(tpu), - tpu_forwards: unwrap_socket!(tpu_forwards), + tpu: unwrap_socket!(tpu, Protocol::UDP), + tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP), tpu_vote: unwrap_socket!(tpu_vote), rpc: unwrap_socket!(rpc), rpc_pubsub: unwrap_socket!(rpc_pubsub), diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 2edff64ca..7ed844350 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -59,10 +59,7 @@ pub fn get_client_facing_addr>( ) -> (SocketAddr, SocketAddr) { let contact_info = contact_info.borrow(); let rpc = contact_info.rpc().unwrap(); - let mut tpu = match protocol { - Protocol::QUIC => contact_info.tpu_quic().unwrap(), - Protocol::UDP => contact_info.tpu().unwrap(), - }; + let mut tpu = contact_info.tpu(protocol).unwrap(); // QUIC certificate authentication requires the IP Address to match. ContactInfo might have // 0.0.0.0 as the IP instead of 127.0.0.1. tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)); diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index c23a80d10..80abbe749 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -1,5 +1,5 @@ use { - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_poh::poh_recorder::PohRecorder, solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey}, solana_send_transaction_service::tpu_info::TpuInfo, @@ -33,7 +33,7 @@ impl TpuInfo for ClusterTpuInfo { .cluster_info .tpu_peers() .into_iter() - .filter_map(|node| Some((*node.pubkey(), node.tpu().ok()?))) + .filter_map(|node| Some((*node.pubkey(), node.tpu(Protocol::UDP).ok()?))) .collect(); } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 8854af9f9..0a2e382b9 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -14,7 +14,7 @@ use { parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount}, UiAccount, UiAccountEncoding, UiDataSliceConfig, MAX_BASE58_BYTES, }, - solana_client::connection_cache::ConnectionCache, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_entry::entry::Entry, solana_faucet::faucet::request_airdrop_transaction, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, @@ -356,11 +356,10 @@ impl JsonRpcRequestProcessor { ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = match *connection_cache { - ConnectionCache::Quic(_) => ContactInfo::tpu_quic, - ConnectionCache::Udp(_) => ContactInfo::tpu, - }(&cluster_info.my_contact_info()) - .unwrap(); + let tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .unwrap(); let (sender, receiver) = unbounded(); SendTransactionService::new::( tpu_address, @@ -3474,11 +3473,11 @@ pub mod rpc_full { pubkey: contact_info.pubkey().to_string(), gossip: contact_info.gossip().ok(), tpu: contact_info - .tpu() + .tpu(Protocol::UDP) .ok() .filter(|addr| socket_addr_space.check(addr)), tpu_quic: contact_info - .tpu_quic() + .tpu(Protocol::QUIC) .ok() .filter(|addr| socket_addr_space.check(addr)), rpc: contact_info @@ -6422,7 +6421,11 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); + let connection_cache = Arc::::default(); + let tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .unwrap(); let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6442,7 +6445,6 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let connection_cache = Arc::new(ConnectionCache::default()); SendTransactionService::new::( tpu_address, &bank_forks, @@ -6691,7 +6693,11 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); + let connection_cache = Arc::::default(); + let tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .unwrap(); let (request_processor, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6711,7 +6717,6 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let connection_cache = Arc::new(ConnectionCache::default()); SendTransactionService::new::( tpu_address, &bank_forks, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 7dff1c99e..e6b5a2db0 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -20,7 +20,7 @@ use { }, regex::Regex, solana_client::connection_cache::ConnectionCache, - solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ bigtable_upload::ConfirmedBlockUploadConfig, bigtable_upload_service::BigTableUploadService, blockstore::Blockstore, @@ -378,11 +378,10 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = match *connection_cache { - ConnectionCache::Quic(_) => ContactInfo::tpu_quic, - ConnectionCache::Udp(_) => ContactInfo::tpu, - }(&cluster_info.my_contact_info()) - .map_err(|err| format!("{err}"))?; + let tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .map_err(|err| format!("{err}"))?; // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 9a54d855b..ef3faca4e 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -15,6 +15,7 @@ use { }, solana_gossip::{ cluster_info::{ClusterInfo, Node}, + contact_info::Protocol, gossip_service::discover_cluster, socketaddr, }, @@ -883,7 +884,7 @@ impl TestValidator { let vote_account_address = validator_vote_account.pubkey(); let rpc_url = format!("http://{}", node.info.rpc().unwrap()); let rpc_pubsub_url = format!("ws://{}/", node.info.rpc_pubsub().unwrap()); - let tpu = node.info.tpu().unwrap(); + let tpu = node.info.tpu(Protocol::UDP).unwrap(); let gossip = node.info.gossip().unwrap(); { diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 874dd2edb..d3133ab32 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -14,7 +14,7 @@ use { tower_storage::TowerStorage, validator::ValidatorStartProgress, }, solana_geyser_plugin_manager::GeyserPluginManagerRequest, - solana_gossip::contact_info::ContactInfo, + solana_gossip::contact_info::{ContactInfo, Protocol}, solana_rpc::rpc::verify_pubkey, solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError}, solana_runtime::accounts_index::AccountIndex, @@ -95,6 +95,11 @@ impl From for AdminRpcContactInfo { SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) }) }; + ($name:ident, $protocol:expr) => { + node.$name($protocol).unwrap_or_else(|_| { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) + }) + }; } Self { id: node.pubkey().to_string(), @@ -103,8 +108,8 @@ impl From for AdminRpcContactInfo { tvu: unwrap_socket!(tvu), tvu_forwards: unwrap_socket!(tvu_forwards), repair: unwrap_socket!(repair), - tpu: unwrap_socket!(tpu), - tpu_forwards: unwrap_socket!(tpu_forwards), + tpu: unwrap_socket!(tpu, Protocol::UDP), + tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP), tpu_vote: unwrap_socket!(tpu_vote), rpc: unwrap_socket!(rpc), rpc_pubsub: unwrap_socket!(rpc_pubsub), @@ -614,7 +619,7 @@ impl AdminRpc for AdminRpcImpl { post_init .cluster_info .my_contact_info() - .tpu() + .tpu(Protocol::UDP) .map_err(|err| { error!( "The public TPU address isn't being published. \ @@ -634,8 +639,8 @@ impl AdminRpc for AdminRpcImpl { let my_contact_info = post_init.cluster_info.my_contact_info(); warn!( "Public TPU addresses set to {:?} (udp) and {:?} (quic)", - my_contact_info.tpu(), - my_contact_info.tpu_quic(), + my_contact_info.tpu(Protocol::UDP), + my_contact_info.tpu(Protocol::QUIC), ); Ok(()) }) @@ -652,7 +657,7 @@ impl AdminRpc for AdminRpcImpl { post_init .cluster_info .my_contact_info() - .tpu_forwards() + .tpu_forwards(Protocol::UDP) .map_err(|err| { error!( "The public TPU Forwards address isn't being published. \ @@ -672,8 +677,8 @@ impl AdminRpc for AdminRpcImpl { let my_contact_info = post_init.cluster_info.my_contact_info(); warn!( "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)", - my_contact_info.tpu_forwards(), - my_contact_info.tpu_forwards_quic(), + my_contact_info.tpu_forwards(Protocol::UDP), + my_contact_info.tpu_forwards(Protocol::QUIC), ); Ok(()) }) diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 55d1d59e5..2248965f9 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -8,6 +8,7 @@ use { solana_genesis_utils::download_then_check_genesis_hash, solana_gossip::{ cluster_info::{ClusterInfo, Node}, + contact_info::Protocol, crds_value, gossip_service::GossipService, legacy_contact_info::LegacyContactInfo as ContactInfo, @@ -83,11 +84,11 @@ fn verify_reachable_ports( if verify_address(&node.info.serve_repair().ok()) { udp_sockets.push(&node.sockets.serve_repair); } - if verify_address(&node.info.tpu().ok()) { + if verify_address(&node.info.tpu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu.iter()); udp_sockets.push(&node.sockets.tpu_quic); } - if verify_address(&node.info.tpu_forwards().ok()) { + if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); udp_sockets.push(&node.sockets.tpu_forwards_quic); }