From cb65a785bcf3132111b8b52ad7c2f46af59d4072 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 21 Apr 2023 15:39:16 +0000 Subject: [PATCH] makes sockets in LegacyContactInfo private (#31248) Working towards LegacyContactInfo => ContactInfo migration, the commit hides some implementation details of LegacyContactInfo and expands API parity with the new ContactInfo. --- accounts-cluster-bench/src/main.rs | 2 +- core/src/broadcast_stage.rs | 16 +- .../broadcast_duplicates_run.rs | 7 +- .../broadcast_fake_shreds_run.rs | 8 +- core/src/cluster_nodes.rs | 55 +++--- core/src/next_leader.rs | 23 ++- core/src/result.rs | 4 +- core/src/serve_repair.rs | 27 +-- core/src/warm_quic_cache_service.rs | 6 +- dos/src/main.rs | 26 +-- gossip/src/cluster_info.rs | 169 +++++++++--------- gossip/src/contact_info.rs | 64 ++----- gossip/src/crds.rs | 7 +- gossip/src/crds_gossip.rs | 16 +- gossip/src/crds_gossip_pull.rs | 16 +- gossip/src/crds_gossip_push.rs | 4 +- gossip/src/gossip_service.rs | 4 +- gossip/src/legacy_contact_info.rs | 94 ++++++++-- gossip/src/main.rs | 15 +- gossip/tests/crds_gossip.rs | 2 +- gossip/tests/gossip.rs | 6 +- local-cluster/tests/common.rs | 2 +- rpc/src/cluster_tpu_info.rs | 2 +- rpc/src/rpc.rs | 25 ++- streamer/src/socket.rs | 4 +- transaction-dos/src/main.rs | 2 +- validator/src/bootstrap.rs | 40 ++--- 27 files changed, 356 insertions(+), 290 deletions(-) diff --git a/accounts-cluster-bench/src/main.rs b/accounts-cluster-bench/src/main.rs index f6f282fa4a..db58cdb71f 100644 --- a/accounts-cluster-bench/src/main.rs +++ b/accounts-cluster-bench/src/main.rs @@ -661,7 +661,7 @@ fn main() { }); info!("done found {} nodes", gossip_nodes.len()); - gossip_nodes[0].rpc + gossip_nodes[0].rpc().unwrap() } else { info!("Using {:?} as the RPC address", entrypoint_addr); entrypoint_addr diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 89c05457b6..a27949bdc7 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -14,10 +14,7 @@ use { }, crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, itertools::Itertools, - solana_gossip::{ - cluster_info::{ClusterInfo, ClusterInfoError}, - legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}, solana_ledger::{blockstore::Blockstore, shred::Shred}, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, @@ -412,10 +409,13 @@ pub fn broadcast_shreds( let cluster_nodes = cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); update_peer_stats(&cluster_nodes, last_datapoint_submit); - shreds.flat_map(move |shred| { - let node = cluster_nodes.get_broadcast_peer(&shred.id())?; - ContactInfo::is_valid_address(&node.tvu, socket_addr_space) - .then(|| (shred.payload(), node.tvu)) + shreds.filter_map(move |shred| { + cluster_nodes + .get_broadcast_peer(&shred.id())? + .tvu() + .ok() + .filter(|addr| socket_addr_space.check(addr)) + .map(|addr| (shred.payload(), addr)) }) }) .collect(); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 29f901c3ad..abe09756d4 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -305,7 +305,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .iter() .filter_map(|shred| { let node = cluster_nodes.get_broadcast_peer(&shred.id())?; - if !ContactInfo::is_valid_address(&node.tvu, socket_addr_space) { + if !socket_addr_space.check(&node.tvu().ok()?) { return None; } if self @@ -338,14 +338,15 @@ impl BroadcastRun for BroadcastDuplicatesRun { .iter() .filter_map(|pubkey| { let tvu = cluster_info - .lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?; + .lookup_contact_info(pubkey, ContactInfo::tvu)? + .ok()?; Some((shred.payload(), tvu)) }) .collect(), ); } - Some(vec![(shred.payload(), node.tvu)]) + Some(vec![(shred.payload(), node.tvu().ok()?)]) }) .flatten() .collect(); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 4a61ee3780..56507ea5a8 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -139,9 +139,11 @@ impl BroadcastRun for BroadcastFakeShredsRun { peers.iter().enumerate().for_each(|(i, peer)| { if fake == (i <= self.partition) { // Send fake shreds to the first N peers - data_shreds.iter().for_each(|b| { - sock.send_to(b.payload(), peer.tvu_forwards).unwrap(); - }); + if let Ok(addr) = peer.tvu_forwards() { + data_shreds.iter().for_each(|b| { + sock.send_to(b.payload(), addr).unwrap(); + }); + } } }); } diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 474eff05d4..02e2d70321 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -177,39 +177,42 @@ impl ClusterNodes { frwds, } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?; if neighbors.is_empty() { - let peers = children - .into_iter() - .filter_map(Node::contact_info) - .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) - .map(|node| node.tvu) - .collect(); - return Ok((root_distance, peers)); + let peers = children.into_iter().filter_map(|node| { + node.contact_info()? + .tvu() + .ok() + .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) + }); + return Ok((root_distance, peers.collect())); } // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its // children and also tvu_forward socket of its neighbors. Otherwise it // should only forward to tvu_forwards socket of its children. if neighbors[0].pubkey() != self.pubkey { - let peers = children - .into_iter() - .filter_map(Node::contact_info) - .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id)) - .map(|node| node.tvu_forwards); + let peers = children.into_iter().filter_map(|node| { + node.contact_info()? + .tvu_forwards() + .ok() + .filter(|addr| frwds.get(addr) == Some(&node.pubkey())) + }); return Ok((root_distance, peers.collect())); } // First neighbor is this node itself, so skip it. let peers = neighbors[1..] .iter() - .filter_map(|node| node.contact_info()) - .filter(|node| frwds.get(&node.tvu_forwards) == Some(&node.id)) - .map(|node| node.tvu_forwards) - .chain( - children - .into_iter() - .filter_map(Node::contact_info) - .filter(|node| addrs.get(&node.tvu) == Some(&node.id)) - .map(|node| node.tvu), - ); + .filter_map(|node| { + node.contact_info()? + .tvu_forwards() + .ok() + .filter(|addr| frwds.get(addr) == Some(&node.pubkey())) + }) + .chain(children.into_iter().filter_map(|node| { + node.contact_info()? + .tvu() + .ok() + .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) + })); Ok((root_distance, peers.collect())) } @@ -241,9 +244,13 @@ impl ClusterNodes { .map(|index| &self.nodes[index]) .inspect(|node| { if let Some(node) = node.contact_info() { - addrs.entry(node.tvu).or_insert(node.id); + if let Ok(addr) = node.tvu() { + addrs.entry(addr).or_insert(node.id); + } if !drop_redundant_turbine_path { - frwds.entry(node.tvu_forwards).or_insert(node.id); + if let Ok(addr) = node.tvu_forwards() { + frwds.entry(addr).or_insert(node.id); + } } } }) diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index 045cc6969f..cff8454df5 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -11,40 +11,37 @@ pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, poh_recorder: &RwLock, ) -> Option<(Pubkey, SocketAddr)> { - next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) + next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu) } pub(crate) fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, poh_recorder: &RwLock, ) -> Option<(Pubkey, SocketAddr)> { - next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) + next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_forwards) } pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, poh_recorder: &RwLock, ) -> Option<(Pubkey, SocketAddr)> { - next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) + next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_vote) } -fn next_leader_x( +fn next_leader_x( cluster_info: &ClusterInfo, poh_recorder: &RwLock, port_selector: F, ) -> Option<(Pubkey, SocketAddr)> where - F: FnOnce(&ContactInfo) -> SocketAddr, + F: FnOnce(&ContactInfo) -> Result, { let leader_pubkey = poh_recorder .read() .unwrap() - .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); - if let Some(leader_pubkey) = leader_pubkey { - cluster_info - .lookup_contact_info(&leader_pubkey, port_selector) - .map(|addr| (leader_pubkey, addr)) - } else { - None - } + .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)?; + cluster_info + .lookup_contact_info(&leader_pubkey, port_selector)? + .map(|addr| (leader_pubkey, addr)) + .ok() } diff --git a/core/src/result.rs b/core/src/result.rs index e17705c5c8..3491a5dab4 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -2,7 +2,7 @@ use { crate::serve_repair::RepairVerifyError, - solana_gossip::{cluster_info, gossip_error::GossipError}, + solana_gossip::{cluster_info, contact_info, gossip_error::GossipError}, solana_ledger::blockstore, thiserror::Error, }; @@ -16,6 +16,8 @@ pub enum Error { #[error(transparent)] Gossip(#[from] GossipError), #[error(transparent)] + InvalidContactInfo(#[from] contact_info::Error), + #[error(transparent)] Io(#[from] std::io::Error), #[error("ReadyTimeout")] ReadyTimeout, diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 121d5c4f19..82100ce3f5 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -338,17 +338,21 @@ pub(crate) struct RepairPeers { impl RepairPeers { fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result { - if peers.is_empty() { - return Err(Error::from(ClusterInfoError::NoPeers)); - } if peers.len() != weights.len() { return Err(Error::from(WeightedError::InvalidWeight)); } - let weighted_index = WeightedIndex::new(weights)?; - let peers = peers + let (peers, weights): (Vec<_>, Vec) = peers .iter() - .map(|peer| (peer.id, peer.serve_repair)) - .collect(); + .zip(weights) + .filter_map(|(peer, &weight)| { + let addr = peer.serve_repair().ok()?; + Some(((peer.id, addr), weight)) + }) + .unzip(); + if peers.is_empty() { + return Err(Error::from(ClusterInfoError::NoPeers)); + } + let weighted_index = WeightedIndex::new(weights)?; Ok(Self { asof, peers, @@ -1070,9 +1074,12 @@ impl ServeRepair { .unzip(); let peers = WeightedShuffle::new("repair_request_ancestor_hashes", &weights) .shuffle(&mut rand::thread_rng()) - .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) .map(|i| index[i]) - .map(|i| (repair_peers[i].id, repair_peers[i].serve_repair)) + .filter_map(|i| { + let addr = repair_peers[i].serve_repair().ok()?; + Some((repair_peers[i].id, addr)) + }) + .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) .collect(); Ok(peers) } @@ -1093,7 +1100,7 @@ impl ServeRepair { .unzip(); let k = WeightedIndex::new(weights)?.sample(&mut rand::thread_rng()); let n = index[k]; - Ok((repair_peers[n].id, repair_peers[n].serve_repair)) + Ok((repair_peers[n].id, repair_peers[n].serve_repair()?)) } pub(crate) fn map_repair_request( diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index ecf8cde423..380313bc99 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -4,7 +4,7 @@ use { rand::{thread_rng, Rng}, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo}, solana_poh::poh_recorder::PohRecorder, std::{ sync::{ @@ -46,8 +46,8 @@ impl WarmQuicCacheService { .map_or(true, |last_leader| last_leader != leader_pubkey) { maybe_last_leader = Some(leader_pubkey); - if let Some(addr) = cluster_info - .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) + if let Some(Ok(addr)) = cluster_info + .lookup_contact_info(&leader_pubkey, ContactInfo::tpu) { let conn = connection_cache.get_connection(&addr); if let Err(err) = conn.send_data(&[0u8]) { diff --git a/dos/src/main.rs b/dos/src/main.rs index 5537b468bb..69d8b2f10f 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -427,16 +427,16 @@ fn get_target( info!("ADDR = {}", entrypoint_addr); for node in nodes { - if node.gossip == entrypoint_addr { - info!("{}", node.gossip); + if node.gossip().ok() == Some(entrypoint_addr) { + info!("{:?}", node.gossip()); target = match mode { - Mode::Gossip => Some((node.id, node.gossip)), - Mode::Tvu => Some((node.id, node.tvu)), - Mode::TvuForwards => Some((node.id, node.tvu_forwards)), - Mode::Tpu => Some((node.id, node.tpu)), - Mode::TpuForwards => Some((node.id, node.tpu_forwards)), - Mode::Repair => Some((node.id, node.repair)), - Mode::ServeRepair => Some((node.id, node.serve_repair)), + Mode::Gossip => Some((node.id, node.gossip().unwrap())), + Mode::Tvu => Some((node.id, node.tvu().unwrap())), + Mode::TvuForwards => Some((node.id, node.tvu_forwards().unwrap())), + Mode::Tpu => Some((node.id, node.tpu().unwrap())), + Mode::TpuForwards => Some((node.id, node.tpu_forwards().unwrap())), + Mode::Repair => Some((node.id, node.repair().unwrap())), + Mode::ServeRepair => Some((node.id, node.serve_repair().unwrap())), Mode::Rpc => None, }; break; @@ -457,9 +457,9 @@ fn get_rpc_client( // find target node for node in nodes { - if node.gossip == entrypoint_addr { - info!("{}", node.gossip); - return Ok(RpcClient::new_socket(node.rpc)); + if node.gossip().ok() == Some(entrypoint_addr) { + info!("{:?}", node.gossip()); + return Ok(RpcClient::new_socket(node.rpc().unwrap())); } } Err("Node with entrypoint_addr was not found") @@ -813,7 +813,7 @@ pub mod test { &solana_sdk::pubkey::new_rand(), timestamp(), )]; - let entrypoint_addr = nodes[0].gossip; + let entrypoint_addr = nodes[0].gossip().unwrap(); run_dos_no_client( &nodes, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index feed93e680..1b74532c67 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -549,7 +549,7 @@ impl ClusterInfo { .read() .unwrap() .iter() - .map(|contact_info| contact_info.gossip) + .filter_map(|node| node.gossip().ok()) .collect::>(); let self_pubkey = self.id(); let gossip_crds = self.gossip.crds.read().unwrap(); @@ -564,7 +564,10 @@ impl ClusterInfo { // definition that information is already available let contact_info = v.value.contact_info().unwrap(); if contact_info.id != self_pubkey - && !entrypoint_gossip_addrs.contains(&contact_info.gossip) + && contact_info + .gossip() + .map(|addr| !entrypoint_gossip_addrs.contains(&addr)) + .unwrap_or_default() { return Some(v.value.clone()); } @@ -698,7 +701,9 @@ impl ClusterInfo { ) -> Option { let gossip_crds = self.gossip.crds.read().unwrap(); let mut nodes = gossip_crds.get_nodes_contact_info(); - nodes.find(|node| node.gossip == *gossip_addr).cloned() + nodes + .find(|node| node.gossip().ok() == Some(*gossip_addr)) + .cloned() } pub fn my_contact_info(&self) -> ContactInfo { @@ -720,6 +725,18 @@ impl ClusterInfo { .unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp())) } + fn addr_to_string(&self, default_ip: &Option, addr: &Option) -> String { + addr.filter(|addr| self.socket_addr_space.check(addr)) + .map(|addr| { + if &Some(addr.ip()) == default_ip { + addr.port().to_string() + } else { + addr.to_string() + } + }) + .unwrap_or_else(|| String::from("none")) + } + pub fn rpc_info_trace(&self) -> String { let now = timestamp(); let my_pubkey = self.id(); @@ -728,30 +745,17 @@ impl ClusterInfo { .all_peers() .into_iter() .filter_map(|(node, last_updated)| { - if !ContactInfo::is_valid_address(&node.rpc, &self.socket_addr_space) { - return None; - } - + let node_rpc = node + .rpc() + .ok() + .filter(|addr| self.socket_addr_space.check(addr))?; let node_version = self.get_node_version(&node.id); if my_shred_version != 0 && (node.shred_version != 0 && node.shred_version != my_shred_version) { return None; } - - let addr_to_string = |default_ip: &IpAddr, addr: &SocketAddr| -> String { - if ContactInfo::is_valid_address(addr, &self.socket_addr_space) { - if &addr.ip() == default_ip { - addr.port().to_string() - } else { - addr.to_string() - } - } else { - "none".to_string() - } - }; - - let rpc_addr = node.rpc.ip(); + let rpc_addr = node_rpc.ip(); Some(format!( "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {}\n", rpc_addr.to_string(), @@ -763,8 +767,8 @@ impl ClusterInfo { } else { "-".to_string() }, - addr_to_string(&rpc_addr, &node.rpc), - addr_to_string(&rpc_addr, &node.rpc_pubsub), + self.addr_to_string(&Some(rpc_addr), &node.rpc().ok()), + self.addr_to_string(&Some(rpc_addr), &node.rpc_pubsub().ok()), node.shred_version, )) }) @@ -806,25 +810,17 @@ impl ClusterInfo { if is_spy_node { shred_spy_nodes = shred_spy_nodes.saturating_add(1); } - let addr_to_string = |default_ip: &IpAddr, addr: &SocketAddr| -> String { - if ContactInfo::is_valid_address(addr, &self.socket_addr_space) { - if &addr.ip() == default_ip { - addr.port().to_string() - } else { - addr.to_string() - } - } else { - "none".to_string() - } - }; - let ip_addr = node.gossip.ip(); + let ip_addr = node.gossip().as_ref().map(SocketAddr::ip).ok(); Some(format!( "{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", - if ContactInfo::is_valid_address(&node.gossip, &self.socket_addr_space) { - ip_addr.to_string() - } else { - "none".to_string() - }, + node.gossip() + .ok() + .filter(|addr| self.socket_addr_space.check(addr)) + .as_ref() + .map(SocketAddr::ip) + .as_ref() + .map(IpAddr::to_string) + .unwrap_or_else(|| String::from("none")), if node.id == my_pubkey { "me" } else { "" }, now.saturating_sub(last_updated), node.id, @@ -833,14 +829,14 @@ impl ClusterInfo { } else { "-".to_string() }, - addr_to_string(&ip_addr, &node.gossip), - addr_to_string(&ip_addr, &node.tpu_vote), - addr_to_string(&ip_addr, &node.tpu), - addr_to_string(&ip_addr, &node.tpu_forwards), - addr_to_string(&ip_addr, &node.tvu), - addr_to_string(&ip_addr, &node.tvu_forwards), - addr_to_string(&ip_addr, &node.repair), - addr_to_string(&ip_addr, &node.serve_repair), + self.addr_to_string(&ip_addr, &node.gossip().ok()), + self.addr_to_string(&ip_addr, &node.tpu_vote().ok()), + self.addr_to_string(&ip_addr, &node.tpu().ok()), + self.addr_to_string(&ip_addr, &node.tpu_forwards().ok()), + self.addr_to_string(&ip_addr, &node.tvu().ok()), + self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()), + self.addr_to_string(&ip_addr, &node.repair().ok()), + self.addr_to_string(&ip_addr, &node.serve_repair().ok()), node.shred_version, )) } @@ -1259,16 +1255,19 @@ impl ClusterInfo { Some(version.version.clone().into()) } + fn check_socket_addr_space(&self, addr: &Result) -> bool { + addr.as_ref() + .map(|addr| self.socket_addr_space.check(addr)) + .unwrap_or_default() + } + /// all validators that have a valid rpc port regardless of `shred_version`. pub fn all_rpc_peers(&self) -> Vec { let self_pubkey = self.id(); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get_nodes_contact_info() - .filter(|x| { - x.id != self_pubkey - && ContactInfo::is_valid_address(&x.rpc, &self.socket_addr_space) - }) + .filter(|node| node.id != self_pubkey && self.check_socket_addr_space(&node.rpc())) .cloned() .collect() } @@ -1288,9 +1287,7 @@ impl ClusterInfo { gossip_crds .get_nodes_contact_info() // shred_version not considered for gossip peers (ie, spy nodes do not set shred_version) - .filter(|x| { - x.id != me && ContactInfo::is_valid_address(&x.gossip, &self.socket_addr_space) - }) + .filter(|node| node.id != me && self.check_socket_addr_space(&node.gossip())) .cloned() .collect() } @@ -1300,10 +1297,7 @@ impl ClusterInfo { let self_pubkey = self.id(); self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) .get_nodes_contact_info() - .filter(|x| { - ContactInfo::is_valid_address(&x.tvu, &self.socket_addr_space) - && x.id != self_pubkey - }) + .filter(|node| node.id != self_pubkey && self.check_socket_addr_space(&node.tvu())) .cloned() .collect() } @@ -1317,7 +1311,7 @@ impl ClusterInfo { .filter(|node| { node.id != self_pubkey && node.shred_version == self_shred_version - && ContactInfo::is_valid_address(&node.tvu, &self.socket_addr_space) + && self.check_socket_addr_space(&node.tvu()) }) .cloned() .collect() @@ -1334,8 +1328,8 @@ impl ClusterInfo { .filter(|node| { node.id != self_pubkey && node.shred_version == self_shred_version - && ContactInfo::is_valid_address(&node.tvu, &self.socket_addr_space) - && ContactInfo::is_valid_address(&node.serve_repair, &self.socket_addr_space) + && self.check_socket_addr_space(&node.tvu()) + && self.check_socket_addr_space(&node.serve_repair()) && match gossip_crds.get::<&LowestSlot>(node.id) { None => true, // fallback to legacy behavior Some(lowest_slot) => lowest_slot.lowest <= slot, @@ -1345,10 +1339,13 @@ impl ClusterInfo { .collect() } - fn is_spy_node(contact_info: &LegacyContactInfo, socket_addr_space: &SocketAddrSpace) -> bool { - !ContactInfo::is_valid_address(&contact_info.tpu, socket_addr_space) - || !ContactInfo::is_valid_address(&contact_info.gossip, socket_addr_space) - || !ContactInfo::is_valid_address(&contact_info.tvu, socket_addr_space) + fn is_spy_node(node: &LegacyContactInfo, socket_addr_space: &SocketAddrSpace) -> bool { + ![node.tpu(), node.gossip(), node.tvu()] + .into_iter() + .all(|addr| { + addr.map(|addr| socket_addr_space.check(&addr)) + .unwrap_or_default() + }) } /// compute broadcast table @@ -1357,10 +1354,7 @@ impl ClusterInfo { let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get_nodes_contact_info() - .filter(|x| { - x.id != self_pubkey - && ContactInfo::is_valid_address(&x.tpu, &self.socket_addr_space) - }) + .filter(|node| node.id != self_pubkey && self.check_socket_addr_space(&node.tpu())) .cloned() .collect() } @@ -1403,12 +1397,14 @@ impl ClusterInfo { return; } entrypoint.wallclock = now; - if self - .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) - .get_nodes_contact_info() - .any(|node| node.gossip == entrypoint.gossip) - { - return; // Found the entrypoint, no need to pull from it + if let Ok(entrypoint_gossip) = entrypoint.gossip() { + if self + .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) + .get_nodes_contact_info() + .any(|node| node.gossip().ok() == Some(entrypoint_gossip)) + { + return; // Found the entrypoint, no need to pull from it + } } } entrypoint.clone() @@ -1513,7 +1509,8 @@ impl ClusterInfo { let self_info = CrdsValue::new_signed(self_info, &self.keypair()); let pulls = pulls .into_iter() - .flat_map(|(peer, filters)| repeat(peer.gossip).zip(filters)) + .filter_map(|(peer, filters)| Some((peer.gossip().ok()?, filters))) + .flat_map(|(addr, filters)| repeat(addr).zip(filters)) .map(|(gossip_addr, filter)| { let request = Protocol::PullRequest(filter, self_info.clone()); (gossip_addr, request) @@ -1563,7 +1560,7 @@ impl ClusterInfo { .into_iter() .filter_map(|(pubkey, messages)| { let peer: &LegacyContactInfo = gossip_crds.get(pubkey)?; - Some((peer.gossip, messages)) + Some((peer.gossip().ok()?, messages)) }) .collect() }; @@ -1655,8 +1652,10 @@ impl ClusterInfo { for entrypoint in entrypoints.iter_mut() { if entrypoint.id == Pubkey::default() { // If a pull from the entrypoint was successful it should exist in the CRDS table - if let Some(entrypoint_from_gossip) = - self.lookup_contact_info_by_gossip_addr(&entrypoint.gossip) + if let Some(entrypoint_from_gossip) = entrypoint + .gossip() + .ok() + .and_then(|addr| self.lookup_contact_info_by_gossip_addr(&addr)) { // Update the entrypoint's id so future entrypoint pulls correctly reference it *entrypoint = entrypoint_from_gossip; @@ -2337,7 +2336,7 @@ impl ClusterInfo { }; prune_data.sign(&self.keypair()); let prune_message = Protocol::PruneMessage(self_pubkey, prune_data); - Some((peer.gossip, prune_message)) + Some((peer.gossip().ok()?, prune_message)) }) .collect() }) @@ -4109,7 +4108,7 @@ RPC Enabled Nodes: 1"#; assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); for (addr, msg) in pulls { - assert_eq!(addr, entrypoint.gossip); + assert_eq!(addr, entrypoint.gossip().unwrap()); match msg { Protocol::PullRequest(_, value) => { assert!(value.verify()); @@ -4490,12 +4489,12 @@ RPC Enabled Nodes: 1"#; // address let entrypoint1_gossip_addr = socketaddr!("127.0.0.2:1234"); let mut entrypoint1 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); - entrypoint1.gossip = entrypoint1_gossip_addr; + entrypoint1.set_gossip(entrypoint1_gossip_addr).unwrap(); assert_eq!(entrypoint1.shred_version, 0); let entrypoint2_gossip_addr = socketaddr!("127.0.0.2:5678"); let mut entrypoint2 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); - entrypoint2.gossip = entrypoint2_gossip_addr; + entrypoint2.set_gossip(entrypoint2_gossip_addr).unwrap(); assert_eq!(entrypoint2.shred_version, 0); cluster_info.set_entrypoints(vec![entrypoint1, entrypoint2]); @@ -4582,7 +4581,7 @@ RPC Enabled Nodes: 1"#; // address let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234"); let mut entrypoint = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); - entrypoint.gossip = entrypoint_gossip_addr; + entrypoint.set_gossip(entrypoint_gossip_addr).unwrap(); assert_eq!(entrypoint.shred_version, 0); cluster_info.set_entrypoint(entrypoint); diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index a0e7c65578..092a12e9fd 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -441,41 +441,12 @@ impl Sanitize for ContactInfo { } } -impl TryFrom<&ContactInfo> for LegacyContactInfo { - type Error = Error; - - fn try_from(node: &ContactInfo) -> Result { - macro_rules! unwrap_socket { - ($name:ident) => { - node.$name().ok().unwrap_or_else(socket_addr_unspecified) - }; - } - sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?; - sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?; - Ok(Self { - id: *node.pubkey(), - gossip: unwrap_socket!(gossip), - tvu: unwrap_socket!(tvu), - tvu_forwards: unwrap_socket!(tvu_forwards), - repair: unwrap_socket!(repair), - tpu: unwrap_socket!(tpu), - tpu_forwards: unwrap_socket!(tpu_forwards), - tpu_vote: unwrap_socket!(tpu_vote), - rpc: unwrap_socket!(rpc), - rpc_pubsub: unwrap_socket!(rpc_pubsub), - serve_repair: unwrap_socket!(serve_repair), - wallclock: node.wallclock(), - shred_version: node.shred_version(), - }) - } -} - // Workaround until feature(const_socketaddr) is stable. -fn socket_addr_unspecified() -> SocketAddr { +pub(crate) fn socket_addr_unspecified() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) } -fn sanitize_socket(socket: &SocketAddr) -> Result<(), Error> { +pub(crate) fn sanitize_socket(socket: &SocketAddr) -> Result<(), Error> { if socket.port() == 0u16 { return Err(Error::InvalidPort(socket.port())); } @@ -540,7 +511,7 @@ fn sanitize_entries(addrs: &[IpAddr], sockets: &[SocketEntry]) -> Result<(), Err } // Verifies that the other socket is at QUIC_PORT_OFFSET from the first one. -fn sanitize_quic_offset( +pub(crate) fn sanitize_quic_offset( socket: &Option, // udp other: &Option, // quic: udp + QUIC_PORT_OFFSET ) -> Result<(), Error> { @@ -802,27 +773,30 @@ mod tests { fn cross_verify_with_legacy(node: &ContactInfo) { let old = LegacyContactInfo::try_from(node).unwrap(); - assert_eq!(old.gossip, node.gossip().unwrap()); - assert_eq!(old.repair, node.repair().unwrap()); - assert_eq!(old.rpc, node.rpc().unwrap()); - assert_eq!(old.rpc_pubsub, node.rpc_pubsub().unwrap()); - assert_eq!(old.serve_repair, node.serve_repair().unwrap()); - assert_eq!(old.tpu, node.tpu().unwrap()); - assert_eq!(old.tpu_forwards, node.tpu_forwards().unwrap()); + assert_eq!(old.gossip().unwrap(), node.gossip().unwrap()); + assert_eq!(old.repair().unwrap(), node.repair().unwrap()); + assert_eq!(old.rpc().unwrap(), node.rpc().unwrap()); + assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap()); + assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap()); + assert_eq!(old.tpu().unwrap(), node.tpu().unwrap()); + assert_eq!(old.tpu_forwards().unwrap(), node.tpu_forwards().unwrap()); assert_eq!( node.tpu_forwards_quic().unwrap(), SocketAddr::new( - old.tpu_forwards.ip(), - old.tpu_forwards.port() + QUIC_PORT_OFFSET + old.tpu_forwards().unwrap().ip(), + old.tpu_forwards().unwrap().port() + QUIC_PORT_OFFSET ) ); assert_eq!( node.tpu_quic().unwrap(), - SocketAddr::new(old.tpu.ip(), old.tpu.port() + QUIC_PORT_OFFSET) + SocketAddr::new( + old.tpu().unwrap().ip(), + old.tpu().unwrap().port() + QUIC_PORT_OFFSET + ) ); - assert_eq!(old.tpu_vote, node.tpu_vote().unwrap()); - assert_eq!(old.tvu, node.tvu().unwrap()); - assert_eq!(old.tvu_forwards, node.tvu_forwards().unwrap()); + assert_eq!(old.tpu_vote().unwrap(), node.tpu_vote().unwrap()); + assert_eq!(old.tvu().unwrap(), node.tvu().unwrap()); + assert_eq!(old.tvu_forwards().unwrap(), node.tvu_forwards().unwrap()); } #[test] diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 3c8c433f8a..7e370e65e2 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -719,10 +719,7 @@ impl CrdsStats { mod tests { use { super::*, - crate::{ - crds_value::{new_rand_timestamp, LegacySnapshotHashes, NodeInstance}, - socketaddr, - }, + crate::crds_value::{new_rand_timestamp, LegacySnapshotHashes, NodeInstance}, rand::{thread_rng, Rng, SeedableRng}, rand_chacha::ChaChaRng, rayon::ThreadPoolBuilder, @@ -1447,7 +1444,7 @@ mod tests { let v2 = VersionedCrdsValue::new( { let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0); - contact_info.rpc = socketaddr!(Ipv4Addr::UNSPECIFIED, 0); + contact_info.set_rpc((Ipv4Addr::LOCALHOST, 1244)).unwrap(); CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info)) }, Cursor::default(), diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index ee0edb1b6e..d87ad1fd9b 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -358,7 +358,10 @@ pub(crate) fn get_gossip_nodes( .filter(|node| { &node.id != pubkey && verify_shred_version(node.shred_version) - && ContactInfo::is_valid_address(&node.gossip, socket_addr_space) + && node + .gossip() + .map(|addr| socket_addr_space.check(&addr)) + .unwrap_or_default() && match gossip_validators { Some(nodes) => nodes.contains(&node.id), None => true, @@ -375,7 +378,8 @@ pub(crate) fn dedup_gossip_addresses( ) -> HashMap { nodes .into_iter() - .into_grouping_map_by(|node| node.gossip) + .filter_map(|node| Some((node.gossip().ok()?, node))) + .into_grouping_map() .aggregate(|acc, _node_gossip, node| { let stake = stakes.get(&node.id).copied().unwrap_or_default(); match acc { @@ -401,12 +405,16 @@ pub(crate) fn maybe_ping_gossip_addresses( nodes .into_iter() .filter(|node| { + let node_gossip = match node.gossip() { + Err(_) => return false, + Ok(addr) => addr, + }; let (check, ping) = { - let node = (node.id, node.gossip); + let node = (node.id, node_gossip); ping_cache.check(now, node, &mut pingf) }; if let Some(ping) = ping { - pings.push((node.gossip, ping)); + pings.push((node_gossip, ping)); } check }) diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index b7be36282d..63ce378792 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -829,7 +829,7 @@ pub(crate) mod tests { ping_cache .lock() .unwrap() - .mock_pong(new.id, new.gossip, Instant::now()); + .mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); crds.write() .unwrap() @@ -893,12 +893,12 @@ pub(crate) mod tests { let node = CrdsGossipPull::default(); crds.insert(entry, now, GossipRoute::LocalMessage).unwrap(); let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - ping_cache.mock_pong(old.id, old.gossip, Instant::now()); + ping_cache.mock_pong(old.id, old.gossip().unwrap(), Instant::now()); let old = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(old)); crds.insert(old.clone(), now, GossipRoute::LocalMessage) .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + ping_cache.mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); crds.insert(new, now, GossipRoute::LocalMessage).unwrap(); let crds = RwLock::new(crds); @@ -956,7 +956,7 @@ pub(crate) mod tests { .insert(entry, now, GossipRoute::LocalMessage) .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); - ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + ping_cache.mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); node_crds .insert(new, now, GossipRoute::LocalMessage) @@ -1058,7 +1058,7 @@ pub(crate) mod tests { 128, // capacity ); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + ping_cache.mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let node_crds = RwLock::new(node_crds); @@ -1118,14 +1118,14 @@ pub(crate) mod tests { 128, // capacity ); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); - ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + ping_cache.mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let mut dest_crds = Crds::default(); let new_id = solana_sdk::pubkey::new_rand(); let new = ContactInfo::new_localhost(&new_id, 1); - ping_cache.mock_pong(new.id, new.gossip, Instant::now()); + ping_cache.mock_pong(new.id, new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); dest_crds .insert(new.clone(), 0, GossipRoute::LocalMessage) @@ -1134,7 +1134,7 @@ pub(crate) mod tests { // node contains a key from the dest node, but at an older local timestamp let same_key = ContactInfo::new_localhost(&new_id, 0); - ping_cache.mock_pong(same_key.id, same_key.gossip, Instant::now()); + ping_cache.mock_pong(same_key.id, same_key.gossip().unwrap(), Instant::now()); let same_key = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(same_key)); assert_eq!(same_key.label(), new.label()); assert!(same_key.wallclock() < new.wallclock()); diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 6ee9375fc5..cd772d576a 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -398,7 +398,7 @@ mod tests { let push = CrdsGossipPush::default(); let mut ping_cache = new_ping_cache(); let peer = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - ping_cache.mock_pong(peer.id, peer.gossip, Instant::now()); + ping_cache.mock_pong(peer.id, peer.gossip().unwrap(), Instant::now()); let peer = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(peer)); assert_eq!( crds.insert(peer.clone(), now, GossipRoute::LocalMessage), @@ -450,7 +450,7 @@ mod tests { .map(|wallclock| { let mut peer = ContactInfo::new_rand(&mut rng, /*pubkey=*/ None); peer.wallclock = wallclock; - ping_cache.mock_pong(peer.id, peer.gossip, Instant::now()); + ping_cache.mock_pong(peer.id, peer.gossip().unwrap(), Instant::now()); CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(peer)) }) .collect(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 65f4f5e1a4..6845b67cc3 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -260,7 +260,9 @@ fn spy( }; let found_node_by_gossip_addr = if let Some(gossip_addr) = find_node_by_gossip_addr { - all_peers.iter().any(|x| x.gossip == *gossip_addr) + all_peers + .iter() + .any(|node| node.gossip().ok() == Some(*gossip_addr)) } else { false }; diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index aa4a15cd0d..503b6ef831 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -1,5 +1,10 @@ use { - crate::crds_value::MAX_WALLCLOCK, + crate::{ + contact_info::{ + sanitize_quic_offset, sanitize_socket, socket_addr_unspecified, ContactInfo, Error, + }, + crds_value::MAX_WALLCLOCK, + }, solana_sdk::{ pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, @@ -16,25 +21,25 @@ use { pub struct LegacyContactInfo { pub id: Pubkey, /// gossip address - pub gossip: SocketAddr, + gossip: SocketAddr, /// address to connect to for replication - pub tvu: SocketAddr, + tvu: SocketAddr, /// address to forward shreds to - pub tvu_forwards: SocketAddr, + tvu_forwards: SocketAddr, /// address to send repair responses to - pub repair: SocketAddr, + repair: SocketAddr, /// transactions address - pub tpu: SocketAddr, + tpu: SocketAddr, /// address to forward unprocessed transactions to - pub tpu_forwards: SocketAddr, + tpu_forwards: SocketAddr, /// address to which to send bank state requests - pub tpu_vote: SocketAddr, + tpu_vote: SocketAddr, /// address to which to send JSON-RPC requests - pub rpc: SocketAddr, + rpc: SocketAddr, /// websocket for JSON-RPC push notifications - pub rpc_pubsub: SocketAddr, + rpc_pubsub: SocketAddr, /// address to send repair requests to - pub serve_repair: SocketAddr, + serve_repair: SocketAddr, /// latest wallclock picked pub wallclock: u64, /// node shred version @@ -50,6 +55,30 @@ impl Sanitize for LegacyContactInfo { } } +macro_rules! get_socket { + ($name:ident) => { + pub fn $name(&self) -> Result { + let socket = &self.$name; + sanitize_socket(socket)?; + Ok(socket).copied() + } + }; +} + +macro_rules! set_socket { + ($name:ident, $key:ident) => { + pub fn $name(&mut self, socket: T) -> Result<(), Error> + where + SocketAddr: From, + { + let socket = SocketAddr::from(socket); + sanitize_socket(&socket)?; + self.$key = socket; + Ok(()) + } + }; +} + #[macro_export] macro_rules! socketaddr { ($ip:expr, $port:expr) => { @@ -126,6 +155,20 @@ impl LegacyContactInfo { } } + get_socket!(gossip); + get_socket!(tvu); + get_socket!(tvu_forwards); + get_socket!(repair); + get_socket!(tpu); + get_socket!(tpu_forwards); + get_socket!(tpu_vote); + get_socket!(rpc); + get_socket!(rpc_pubsub); + get_socket!(serve_repair); + + set_socket!(set_gossip, gossip); + set_socket!(set_rpc, rpc); + fn is_valid_ip(addr: IpAddr) -> bool { !(addr.is_unspecified() || addr.is_multicast()) // || (addr.is_loopback() && !cfg_test)) @@ -158,6 +201,35 @@ impl LegacyContactInfo { } } +impl TryFrom<&ContactInfo> for LegacyContactInfo { + type Error = Error; + + fn try_from(node: &ContactInfo) -> Result { + macro_rules! unwrap_socket { + ($name:ident) => { + node.$name().ok().unwrap_or_else(socket_addr_unspecified) + }; + } + sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?; + sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?; + Ok(Self { + id: *node.pubkey(), + gossip: unwrap_socket!(gossip), + tvu: unwrap_socket!(tvu), + tvu_forwards: unwrap_socket!(tvu_forwards), + repair: unwrap_socket!(repair), + tpu: unwrap_socket!(tpu), + tpu_forwards: unwrap_socket!(tpu_forwards), + tpu_vote: unwrap_socket!(tpu_vote), + rpc: unwrap_socket!(rpc), + rpc_pubsub: unwrap_socket!(rpc_pubsub), + serve_repair: unwrap_socket!(serve_repair), + wallclock: node.wallclock(), + shred_version: node.shred_version(), + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/gossip/src/main.rs b/gossip/src/main.rs index bd12a0695c..6ce8b724b8 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -292,14 +292,15 @@ fn process_rpc_url( let rpc_addrs: Vec<_> = validators .iter() - .filter_map(|contact_info| { - if (any || all || Some(contact_info.gossip) == entrypoint_addr) - && ContactInfo::is_valid_address(&contact_info.rpc, &socket_addr_space) - { - return Some(contact_info.rpc); - } - None + .filter(|node| { + any || all + || node + .gossip() + .map(|addr| Some(addr) == entrypoint_addr) + .unwrap_or_default() }) + .filter_map(|node| node.rpc().ok()) + .filter(|addr| socket_addr_space.check(addr)) .collect(); if rpc_addrs.is_empty() { diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index c77c94c598..8e0c3f1084 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -484,7 +484,7 @@ fn network_run_pull( if node.keypair.pubkey() != other.keypair.pubkey() { ping_cache.mock_pong( other.keypair.pubkey(), - other.contact_info.gossip, + other.contact_info.gossip().unwrap(), Instant::now(), ); } diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 914b222425..65baeb51c8 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -130,13 +130,13 @@ fn retransmit_to( let dests: Vec<_> = if forwarded { peers .iter() - .map(|peer| peer.tvu_forwards) - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .filter_map(|peer| peer.tvu_forwards().ok()) + .filter(|addr| socket_addr_space.check(addr)) .collect() } else { peers .iter() - .map(|peer| peer.tvu) + .filter_map(|peer| peer.tvu().ok()) .filter(|addr| socket_addr_space.check(addr)) .collect() }; diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index ba0fcdc14a..1570d62610 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -353,7 +353,7 @@ pub fn run_cluster_partition( // Check epochs have correct number of slots info!("PARTITION_TEST sleeping until partition starting condition",); for node in &cluster_nodes { - let node_client = RpcClient::new_socket(node.rpc); + let node_client = RpcClient::new_socket(node.rpc().unwrap()); let epoch_info = node_client.get_epoch_info().unwrap(); assert_eq!(epoch_info.slots_in_epoch, slots_per_epoch); } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index b3f98f1cdb..ce75dda07c 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -33,7 +33,7 @@ impl TpuInfo for ClusterTpuInfo { .cluster_info .tpu_peers() .into_iter() - .map(|ci| (ci.id, ci.tpu)) + .filter_map(|node| Some((node.id, node.tpu().ok()?))) .collect(); } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index edf8843a27..59b71946e7 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -3447,16 +3447,16 @@ pub mod rpc_full { debug!("get_cluster_nodes rpc request received"); let cluster_info = &meta.cluster_info; let socket_addr_space = cluster_info.socket_addr_space(); - let valid_address_or_none = |addr: &SocketAddr| -> Option { - ContactInfo::is_valid_address(addr, socket_addr_space).then_some(*addr) - }; let my_shred_version = cluster_info.my_shred_version(); Ok(cluster_info .all_peers() .iter() .filter_map(|(contact_info, _)| { if my_shred_version == contact_info.shred_version - && ContactInfo::is_valid_address(&contact_info.gossip, socket_addr_space) + && contact_info + .gossip() + .map(|addr| socket_addr_space.check(&addr)) + .unwrap_or_default() { let (version, feature_set) = if let Some(version) = cluster_info.get_node_version(&contact_info.id) @@ -3467,10 +3467,19 @@ pub mod rpc_full { }; Some(RpcContactInfo { pubkey: contact_info.id.to_string(), - gossip: Some(contact_info.gossip), - tpu: valid_address_or_none(&contact_info.tpu), - rpc: valid_address_or_none(&contact_info.rpc), - pubsub: valid_address_or_none(&contact_info.rpc_pubsub), + gossip: contact_info.gossip().ok(), + tpu: contact_info + .tpu() + .ok() + .filter(|addr| socket_addr_space.check(addr)), + rpc: contact_info + .rpc() + .ok() + .filter(|addr| socket_addr_space.check(addr)), + pubsub: contact_info + .rpc_pubsub() + .ok() + .filter(|addr| socket_addr_space.check(addr)), version, feature_set, shred_version: Some(my_shred_version), diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs index b5aa8157a6..fe86f84319 100644 --- a/streamer/src/socket.rs +++ b/streamer/src/socket.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, SocketAddr}; -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy)] pub enum SocketAddrSpace { Unspecified, Global, @@ -18,7 +18,7 @@ impl SocketAddrSpace { /// Returns true if the IP address is valid. #[must_use] pub fn check(&self, addr: &SocketAddr) -> bool { - if self == &SocketAddrSpace::Unspecified { + if matches!(self, SocketAddrSpace::Unspecified) { return true; } // TODO: remove these once IpAddr::is_global is stable. diff --git a/transaction-dos/src/main.rs b/transaction-dos/src/main.rs index 603ecefb65..832a943e31 100644 --- a/transaction-dos/src/main.rs +++ b/transaction-dos/src/main.rs @@ -606,7 +606,7 @@ fn main() { }); info!("done found {} nodes", gossip_nodes.len()); - gossip_nodes[0].rpc + gossip_nodes[0].rpc().unwrap() } else { info!("Using {:?} as the RPC address", entrypoint_addr); entrypoint_addr diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 79b39ecb52..f781fe5550 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -124,7 +124,7 @@ fn verify_reachable_ports( } solana_net_utils::verify_reachable_ports( - &cluster_entrypoint.gossip, + &cluster_entrypoint.gossip().unwrap(), tcp_listeners, &udp_sockets, ) @@ -186,8 +186,10 @@ fn get_rpc_peers( .unwrap_or_else(|| cluster_info.my_shred_version()); if shred_version == 0 { let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| { - cluster_info - .lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip) + cluster_entrypoint + .gossip() + .ok() + .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr)) .map_or(false, |entrypoint| entrypoint.shred_version == 0) }); @@ -391,7 +393,7 @@ pub fn attempt_download_genesis_and_snapshot( authorized_voter_keypairs: Arc>>>, ) -> Result<(), String> { download_then_check_genesis_hash( - &rpc_contact_info.rpc, + &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?, ledger_path, &mut validator_config.expected_genesis_hash, bootstrap_config.max_genesis_archive_unpacked_size, @@ -485,7 +487,7 @@ fn get_vetted_rpc_nodes( vetted_rpc_nodes.extend( rpc_node_details .into_par_iter() - .map(|rpc_node_details| { + .filter_map(|rpc_node_details| { let GetRpcNodeResult { rpc_contact_info, snapshot_hash, @@ -493,14 +495,15 @@ fn get_vetted_rpc_nodes( info!( "Using RPC service from node {}: {:?}", - rpc_contact_info.id, rpc_contact_info.rpc + rpc_contact_info.id, + rpc_contact_info.rpc() ); let rpc_client = RpcClient::new_socket_with_timeout( - rpc_contact_info.rpc, + rpc_contact_info.rpc().ok()?, Duration::from_secs(5), ); - (rpc_contact_info, snapshot_hash, rpc_client) + Some((rpc_contact_info, snapshot_hash, rpc_client)) }) .filter(|(rpc_contact_info, _snapshot_hash, rpc_client)| { match rpc_client.get_version() { @@ -1194,14 +1197,14 @@ fn download_snapshot( *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot { slot: desired_snapshot_hash.0, - rpc_addr: rpc_contact_info.rpc, + rpc_addr: rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?, }; let desired_snapshot_hash = ( desired_snapshot_hash.0, solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1), ); download_snapshot_archive( - &rpc_contact_info.rpc, + &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?, full_snapshot_archives_dir, incremental_snapshot_archives_dir, desired_snapshot_hash, @@ -1326,22 +1329,7 @@ mod tests { } fn default_contact_info_for_tests() -> ContactInfo { - let sock_addr = SocketAddr::from(([1, 1, 1, 1], 11_111)); - ContactInfo { - id: Pubkey::default(), - gossip: sock_addr, - tvu: sock_addr, - tvu_forwards: sock_addr, - repair: sock_addr, - tpu: sock_addr, - tpu_forwards: sock_addr, - tpu_vote: sock_addr, - rpc: sock_addr, - rpc_pubsub: sock_addr, - serve_repair: sock_addr, - wallclock: 123456789, - shred_version: 1, - } + ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321) } #[test]