From ded457cd733bc5c9cbcab08eddbfdcf5f827a0ee Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 10 Feb 2023 20:07:45 +0000 Subject: [PATCH] embeds the new gossip ContactInfo in ClusterInfo (#30022) Working towards replacing the legacy gossip contact-info with the new one, the commit updates the respective field in gossip cluster-info. --- accounts-cluster-bench/src/main.rs | 2 +- bench-tps/tests/bench_tps.rs | 4 +- core/benches/cluster_info.rs | 2 +- core/benches/retransmit_stage.rs | 14 +- core/src/accounts_hash_verifier.rs | 5 +- core/src/ancestor_hashes_service.rs | 20 +- .../broadcast_fake_shreds_run.rs | 2 +- core/src/cluster_nodes.rs | 25 +- core/src/repair_service.rs | 6 +- core/src/serve_repair.rs | 76 ++-- core/src/staked_nodes_updater_service.rs | 6 +- core/src/tpu.rs | 12 +- core/src/validator.rs | 64 +-- core/src/window_service.rs | 2 +- core/tests/epoch_accounts_hash.rs | 4 +- core/tests/snapshots.rs | 13 +- dos/src/main.rs | 43 +- gossip/src/cluster_info.rs | 372 +++++++++++------- gossip/src/gossip_error.rs | 4 +- gossip/src/gossip_service.rs | 7 +- gossip/src/legacy_contact_info.rs | 80 +--- gossip/tests/gossip.rs | 8 +- local-cluster/src/cluster.rs | 2 +- local-cluster/src/cluster_tests.rs | 51 ++- local-cluster/src/local_cluster.rs | 54 ++- .../src/local_cluster_snapshot_utils.rs | 2 +- local-cluster/tests/common.rs | 2 +- local-cluster/tests/local_cluster.rs | 75 ++-- local-cluster/tests/local_cluster_slow_1.rs | 9 +- local-cluster/tests/local_cluster_slow_2.rs | 10 +- rpc/src/cluster_tpu_info.rs | 2 +- rpc/src/rpc.rs | 41 +- rpc/src/rpc_service.rs | 5 +- streamer/src/socket.rs | 1 + test-validator/src/lib.rs | 20 +- transaction-dos/src/main.rs | 2 +- validator/src/admin_rpc_service.rs | 65 ++- validator/src/bootstrap.rs | 28 +- validator/src/main.rs | 32 +- 39 files changed, 648 insertions(+), 524 deletions(-) diff --git a/accounts-cluster-bench/src/main.rs b/accounts-cluster-bench/src/main.rs index 0a578e96e..526a8e29c 100644 --- a/accounts-cluster-bench/src/main.rs +++ b/accounts-cluster-bench/src/main.rs @@ -740,7 +740,7 @@ pub mod test { let num_instructions = 2; let mut start = Measure::start("total accounts run"); run_accounts_bench( - cluster.entry_point_info.rpc, + cluster.entry_point_info.rpc().unwrap(), faucet_addr, &[&cluster.funding_keypair], iterations, diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 3da677ed1..3b79c24a3 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -80,8 +80,8 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc, - cluster.entry_point_info.tpu, + cluster.entry_point_info.rpc().unwrap(), + cluster.entry_point_info.tpu().unwrap(), cluster.connection_cache.clone(), )); diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 4b732d360..04eb85c2d 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -12,7 +12,7 @@ use { }, solana_gossip::{ cluster_info::{ClusterInfo, Node}, - legacy_contact_info::LegacyContactInfo as ContactInfo, + contact_info::ContactInfo, }, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 1377ca085..e74cd815a 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -10,7 +10,7 @@ use { solana_entry::entry::Entry, solana_gossip::{ cluster_info::{ClusterInfo, Node}, - legacy_contact_info::LegacyContactInfo as ContactInfo, + contact_info::ContactInfo, }, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -29,7 +29,7 @@ use { solana_streamer::socket::SocketAddrSpace, std::{ iter::repeat_with, - net::UdpSocket, + net::{Ipv4Addr, UdpSocket}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, RwLock, @@ -60,10 +60,12 @@ fn bench_retransmitter(bencher: &mut Bencher) { let id = Pubkey::new_unique(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); - contact_info.tvu = socket.local_addr().unwrap(); - contact_info.tvu.set_ip("127.0.0.1".parse().unwrap()); - contact_info.tvu_forwards = contact_info.tvu; - info!("local: {:?}", contact_info.tvu); + let port = socket.local_addr().unwrap().port(); + contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap(); + contact_info + .set_tvu_forwards(contact_info.tvu().unwrap()) + .unwrap(); + info!("local: {:?}", contact_info.tvu().unwrap()); cluster_info.insert_info(contact_info); socket.set_nonblocking(true).unwrap(); socket diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index daa53bf65..03ff4ef31 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -464,10 +464,7 @@ mod tests { use { super::*, rand::seq::SliceRandom, - solana_gossip::{ - cluster_info::make_accounts_hashes_message, - legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}, solana_sdk::{ hash::hash, signature::{Keypair, Signer}, diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 6f134c89b..c571012bc 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -772,7 +772,7 @@ mod test { }, solana_gossip::{ cluster_info::{ClusterInfo, Node}, - legacy_contact_info::LegacyContactInfo as ContactInfo, + contact_info::ContactInfo, }, solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce}, solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, @@ -1151,13 +1151,13 @@ mod test { ) { let request_bytes = requester_serve_repair.ancestor_repair_request_bytes( &requester_cluster_info.keypair(), - &responder_info.id, + responder_info.pubkey(), dead_slot, nonce, ); if let Ok(request_bytes) = request_bytes { - let _ = - ancestor_hashes_request_socket.send_to(&request_bytes, responder_info.serve_repair); + let socket = responder_info.serve_repair().unwrap(); + let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket); } } @@ -1225,7 +1225,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair); + .set_socket_addr(&responder_info.serve_repair().unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1239,7 +1239,7 @@ mod test { assert_eq!(decision, None); // Add the responder to the eligible list for requests - let responder_id = responder_info.id; + let responder_id = *responder_info.pubkey(); cluster_slots.insert_node_id(dead_slot, responder_id); requester_cluster_info.insert_info(responder_info.clone()); // Now the request should actually be made @@ -1265,7 +1265,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair); + .set_socket_addr(&responder_info.serve_repair().unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1588,7 +1588,7 @@ mod test { let (dumped_slots_sender, _dumped_slots_receiver) = unbounded(); // Add the responder to the eligible list for requests - let responder_id = responder_info.id; + let responder_id = *responder_info.pubkey(); cluster_slots.insert_node_id(dead_slot, responder_id); requester_cluster_info.insert_info(responder_info.clone()); @@ -1608,7 +1608,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair); + .set_socket_addr(&responder_info.serve_repair().unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1670,7 +1670,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair); + .set_socket_addr(&responder_info.serve_repair().unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index ac32888bf..4a61ee378 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -159,7 +159,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { mod tests { use { super::*, - solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo, + solana_gossip::contact_info::ContactInfo, solana_sdk::signature::Signer, solana_streamer::socket::SocketAddrSpace, std::net::{IpAddr, Ipv4Addr, SocketAddr}, diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index a2daad10c..3d2dea96c 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -9,7 +9,7 @@ use { crds::GossipRoute, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::{CrdsData, CrdsValue}, - legacy_contact_info::LegacyContactInfo as ContactInfo, + legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo}, weighted_shuffle::WeightedShuffle, }, solana_ledger::shred::ShredId, @@ -316,7 +316,9 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec( HashMap, // stakes ClusterInfo, ) { + use solana_gossip::contact_info::ContactInfo; let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7)); - let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) - .take(num_nodes) - .collect(); + let mut nodes: Vec<_> = repeat_with(|| { + let pubkey = solana_sdk::pubkey::new_rand(); + ContactInfo::new_localhost(&pubkey, /*wallclock:*/ timestamp()) + }) + .take(num_nodes) + .collect(); nodes.shuffle(rng); let keypair = Arc::new(Keypair::new()); - nodes[0].id = keypair.pubkey(); + nodes[0].set_pubkey(keypair.pubkey()); let this_node = nodes[0].clone(); let mut stakes: HashMap = nodes .iter() @@ -472,13 +478,18 @@ pub fn make_test_cluster( if rng.gen_ratio(unstaked_numerator, unstaked_denominator) { None // No stake for some of the nodes. } else { - Some((node.id, rng.gen_range(0, 20))) + Some((*node.pubkey(), rng.gen_range(0, 20))) } }) .collect(); // Add some staked nodes with no contact-info. stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100)); let cluster_info = ClusterInfo::new(this_node, keypair, SocketAddrSpace::Unspecified); + let nodes: Vec<_> = nodes + .iter() + .map(LegacyContactInfo::try_from) + .collect::>() + .unwrap(); { let now = timestamp(); let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index cf0de22ee..8ae57bea3 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -803,9 +803,7 @@ pub(crate) fn post_shred_deferment_timestamp() -> u64 { mod test { use { super::*, - solana_gossip::{ - cluster_info::Node, legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_ledger::{ blockstore::{ make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, Blockstore, @@ -1277,7 +1275,7 @@ mod test { // a valid target for repair let dead_slot = 9; let cluster_slots = ClusterSlots::default(); - cluster_slots.insert_node_id(dead_slot, valid_repair_peer.id); + cluster_slots.insert_node_id(dead_slot, *valid_repair_peer.pubkey()); cluster_info.insert_info(valid_repair_peer); // Not enough time has passed, should not update the diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 41935b618..c6a111024 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1332,7 +1332,7 @@ mod tests { use { super::*, crate::{repair_response, result::Error}, - solana_gossip::{socketaddr, socketaddr_any}, + solana_gossip::{contact_info::ContactInfo, socketaddr, socketaddr_any}, solana_ledger::{ blockstore::make_many_slot_entries, blockstore_processor::fill_blockstore_slot_with_ticks, @@ -1810,21 +1810,21 @@ mod tests { assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers))); let serve_repair_addr = socketaddr!(Ipv4Addr::LOCALHOST, 1243); - let nxt = ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234), - tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235), - tvu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1236), - repair: socketaddr!(Ipv4Addr::LOCALHOST, 1237), - tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238), - tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239), - tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240), - rpc: socketaddr!(Ipv4Addr::LOCALHOST, 1241), - rpc_pubsub: socketaddr!(Ipv4Addr::LOCALHOST, 1242), - serve_repair: serve_repair_addr, - wallclock: 0, - shred_version: 0, - }; + let mut nxt = ContactInfo::new( + solana_sdk::pubkey::new_rand(), + timestamp(), // wallclock + 0u16, // shred_version + ); + nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap(); + nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap(); + nxt.set_tvu_forwards((Ipv4Addr::LOCALHOST, 1236)).unwrap(); + nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap(); + nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap(); + nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap(); + nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap(); + nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap(); + nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap(); + nxt.set_serve_repair(serve_repair_addr).unwrap(); cluster_info.insert_info(nxt.clone()); let rv = serve_repair .repair_request( @@ -1837,25 +1837,25 @@ mod tests { &identity_keypair, ) .unwrap(); - assert_eq!(nxt.serve_repair, serve_repair_addr); - assert_eq!(rv.0, nxt.serve_repair); + assert_eq!(nxt.serve_repair().unwrap(), serve_repair_addr); + assert_eq!(rv.0, nxt.serve_repair().unwrap()); let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243); - let nxt = ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234), - tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235), - tvu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1236), - repair: socketaddr!(Ipv4Addr::LOCALHOST, 1237), - tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238), - tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239), - tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240), - rpc: socketaddr!(Ipv4Addr::LOCALHOST, 1241), - rpc_pubsub: socketaddr!(Ipv4Addr::LOCALHOST, 1242), - serve_repair: serve_repair_addr2, - wallclock: 0, - shred_version: 0, - }; + let mut nxt = ContactInfo::new( + solana_sdk::pubkey::new_rand(), + timestamp(), // wallclock + 0u16, // shred_version + ); + nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap(); + nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap(); + nxt.set_tvu_forwards((Ipv4Addr::LOCALHOST, 1236)).unwrap(); + nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap(); + nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap(); + nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap(); + nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap(); + nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap(); + nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap(); + nxt.set_serve_repair(serve_repair_addr2).unwrap(); cluster_info.insert_info(nxt); let mut one = false; let mut two = false; @@ -2136,7 +2136,7 @@ mod tests { // 1) repair validator set doesn't exist in gossip // 2) repair validator set only includes our own id // then no repairs should be generated - for pubkey in &[solana_sdk::pubkey::new_rand(), me.id] { + for pubkey in &[solana_sdk::pubkey::new_rand(), *me.pubkey()] { let known_validators = Some(vec![*pubkey].into_iter().collect()); assert!(serve_repair.repair_peers(&known_validators, 1).is_empty()); assert!(serve_repair @@ -2153,10 +2153,10 @@ mod tests { } // If known validator exists in gossip, should return repair successfully - let known_validators = Some(vec![contact_info2.id].into_iter().collect()); + let known_validators = Some(vec![*contact_info2.pubkey()].into_iter().collect()); let repair_peers = serve_repair.repair_peers(&known_validators, 1); assert_eq!(repair_peers.len(), 1); - assert_eq!(repair_peers[0].id, contact_info2.id); + assert_eq!(&repair_peers[0].id, contact_info2.pubkey()); assert!(serve_repair .repair_request( &cluster_slots, @@ -2177,8 +2177,8 @@ mod tests { .map(|c| c.id) .collect(); assert_eq!(repair_peers.len(), 2); - assert!(repair_peers.contains(&contact_info2.id)); - assert!(repair_peers.contains(&contact_info3.id)); + assert!(repair_peers.contains(contact_info2.pubkey())); + assert!(repair_peers.contains(contact_info3.pubkey())); assert!(serve_repair .repair_request( &cluster_slots, diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 495bd0bf0..db5116a6a 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -100,10 +100,12 @@ impl StakedNodesUpdaterService { Some((node.tvu.ip(), *stake)) }) .collect(); - let my_pubkey = cluster_info.my_contact_info().id; + let my_pubkey = *cluster_info.my_contact_info().pubkey(); if let Some(stake) = staked_nodes.get(&my_pubkey) { id_to_stake.insert(my_pubkey, *stake); - ip_to_stake.insert(cluster_info.my_contact_info().tvu.ip(), *stake); + if let Ok(tvu) = cluster_info.my_contact_info().tvu() { + ip_to_stake.insert(tvu.ip(), *stake); + } } Self::override_stake( cluster_info, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index fca209c7d..4d8daedf3 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -164,7 +164,11 @@ impl Tpu { let (_, tpu_quic_t) = spawn_server( transactions_quic_sockets, keypair, - cluster_info.my_contact_info().tpu.ip(), + cluster_info + .my_contact_info() + .tpu_quic() + .expect("Operator must spin up node with valid (QUIC) TPU address") + .ip(), packet_sender, exit.clone(), MAX_QUIC_CONNECTIONS_PER_PEER, @@ -179,7 +183,11 @@ impl Tpu { let (_, tpu_forwards_quic_t) = spawn_server( transactions_forwards_quic_sockets, keypair, - cluster_info.my_contact_info().tpu_forwards.ip(), + cluster_info + .my_contact_info() + .tpu_forwards_quic() + .expect("Operator must spin up node with valid (QUIC) TPU-forwards address") + .ip(), forwarded_packet_sender, exit.clone(), MAX_QUIC_CONNECTIONS_PER_PEER, diff --git a/core/src/validator.rs b/core/src/validator.rs index 26980f5c2..750697005 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -398,7 +398,7 @@ impl Validator { tpu_enable_udp: bool, ) -> Result { let id = identity_keypair.pubkey(); - assert_eq!(id, node.info.id); + assert_eq!(&id, node.info.pubkey()); warn!("identity: {}", id); warn!("vote account: {}", vote_account); @@ -555,8 +555,8 @@ impl Validator { Some(poh_timing_point_sender.clone()), )?; - node.info.wallclock = timestamp(); - node.info.shred_version = compute_shred_version( + node.info.set_wallclock(timestamp()); + node.info.set_shred_version(compute_shred_version( &genesis_config.hash(), Some( &bank_forks @@ -567,15 +567,16 @@ impl Validator { .read() .unwrap(), ), - ); + )); Self::print_node_info(&node); if let Some(expected_shred_version) = config.expected_shred_version { - if expected_shred_version != node.info.shred_version { + if expected_shred_version != node.info.shred_version() { return Err(format!( "shred version mismatch: expected {} found: {}", - expected_shred_version, node.info.shred_version, + expected_shred_version, + node.info.shred_version(), )); } } @@ -762,7 +763,13 @@ impl Validator { let connection_cache = ConnectionCache::new_with_client_options( tpu_connection_pool_size, None, - Some((&identity_keypair, node.info.tpu.ip())), + Some(( + &identity_keypair, + node.info + .tpu() + .expect("Operator must spin up node with valid TPU address") + .ip(), + )), Some((&staked_nodes, &identity_keypair.pubkey())), ); Arc::new(connection_cache) @@ -781,18 +788,16 @@ impl Validator { optimistically_confirmed_bank_tracker, bank_notification_sender, ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs { - if ContactInfo::is_valid_address(&node.info.rpc, &socket_addr_space) { - assert!(ContactInfo::is_valid_address( - &node.info.rpc_pubsub, - &socket_addr_space - )); - } else { - assert!(!ContactInfo::is_valid_address( - &node.info.rpc_pubsub, - &socket_addr_space - )); - } - + assert_eq!( + node.info + .rpc() + .map(|addr| socket_addr_space.check(&addr)) + .ok(), + node.info + .rpc_pubsub() + .map(|addr| socket_addr_space.check(&addr)) + .ok() + ); let (bank_notification_sender, bank_notification_receiver) = unbounded(); let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() { Some(Arc::new(RwLock::new(bank_notification_senders))) @@ -873,7 +878,7 @@ impl Validator { None => None, Some(tcp_listener) => Some(solana_net_utils::ip_echo_server( tcp_listener, - Some(node.info.shred_version), + Some(node.info.shred_version()), )), }; @@ -1002,7 +1007,7 @@ impl Validator { cluster_confirmed_slot_receiver, TvuConfig { max_ledger_shreds: config.max_ledger_shreds, - shred_version: node.info.shred_version, + shred_version: node.info.shred_version(), repair_validators: config.repair_validators.clone(), repair_whitelist: config.repair_whitelist.clone(), wait_for_vote_to_start_leader, @@ -1036,7 +1041,7 @@ impl Validator { &blockstore, &config.broadcast_stage_type, &exit, - node.info.shred_version, + node.info.shred_version(), vote_tracker, bank_forks.clone(), verified_vote_sender, @@ -2088,6 +2093,7 @@ mod tests { use { super::*, crossbeam_channel::{bounded, RecvTimeoutError}, + solana_gossip::contact_info::{ContactInfo, LegacyContactInfo}, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, solana_tpu_client::tpu_client::{ @@ -2111,7 +2117,10 @@ mod tests { let voting_keypair = Arc::new(Keypair::new()); let config = ValidatorConfig { - rpc_addrs: Some((validator_node.info.rpc, validator_node.info.rpc_pubsub)), + rpc_addrs: Some(( + validator_node.info.rpc().unwrap(), + validator_node.info.rpc_pubsub().unwrap(), + )), ..ValidatorConfig::default_for_test() }; let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default())); @@ -2121,7 +2130,7 @@ mod tests { &validator_ledger_path, &voting_keypair.pubkey(), Arc::new(RwLock::new(vec![voting_keypair.clone()])), - vec![leader_node.info], + vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], &config, true, // should_check_duplicate_instance start_progress.clone(), @@ -2204,7 +2213,10 @@ mod tests { ledger_paths.push(validator_ledger_path.clone()); let vote_account_keypair = Keypair::new(); let config = ValidatorConfig { - rpc_addrs: Some((validator_node.info.rpc, validator_node.info.rpc_pubsub)), + rpc_addrs: Some(( + validator_node.info.rpc().unwrap(), + validator_node.info.rpc_pubsub().unwrap(), + )), ..ValidatorConfig::default_for_test() }; Validator::new( @@ -2213,7 +2225,7 @@ mod tests { &validator_ledger_path, &vote_account_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])), - vec![leader_node.info.clone()], + vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], &config, true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 92d2924c1..e68edd60a 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -480,7 +480,7 @@ mod test { use { super::*, solana_entry::entry::{create_ticks, Entry}, - solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo, + solana_gossip::contact_info::ContactInfo, solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, get_tmp_ledger_path, diff --git a/core/tests/epoch_accounts_hash.rs b/core/tests/epoch_accounts_hash.rs index 466442317..fda98516b 100755 --- a/core/tests/epoch_accounts_hash.rs +++ b/core/tests/epoch_accounts_hash.rs @@ -6,9 +6,7 @@ use { accounts_hash_verifier::AccountsHashVerifier, snapshot_packager_service::SnapshotPackagerService, }, - solana_gossip::{ - cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_runtime::{ accounts_background_service::{ AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index b8c21bac5..b2c2e017c 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -10,9 +10,7 @@ use { accounts_hash_verifier::AccountsHashVerifier, snapshot_packager_service::SnapshotPackagerService, }, - solana_gossip::{ - cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_runtime::{ accounts_background_service::{ AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, @@ -516,10 +514,11 @@ fn test_concurrent_snapshot_packaging( let cluster_info = Arc::new({ let keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo { - id: keypair.pubkey(), - ..ContactInfo::default() - }; + let contact_info = ContactInfo::new( + keypair.pubkey(), + timestamp(), // wallclock + 0u16, // shred_version + ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); diff --git a/dos/src/main.rs b/dos/src/main.rs index 1f14c059b..5537b468b 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -789,6 +789,7 @@ pub mod test { solana_client::thin_client::ThinClient, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, + solana_gossip::contact_info::LegacyContactInfo, solana_local_cluster::{ cluster::Cluster, local_cluster::{ClusterConfig, LocalCluster}, @@ -896,7 +897,11 @@ pub mod test { assert_eq!(cluster.validators.len(), num_nodes); let nodes = cluster.get_node_pubkeys(); - let node = cluster.get_contact_info(&nodes[0]).unwrap().clone(); + let node = cluster + .get_contact_info(&nodes[0]) + .map(LegacyContactInfo::try_from) + .unwrap() + .unwrap(); let nodes_slice = [node]; // send random transactions to TPU @@ -905,7 +910,7 @@ pub mod test { &nodes_slice, 10, DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 1024, data_type: DataType::Random, @@ -929,12 +934,16 @@ pub mod test { assert_eq!(cluster.validators.len(), num_nodes); let nodes = cluster.get_node_pubkeys(); - let node = cluster.get_contact_info(&nodes[0]).unwrap().clone(); + let node = cluster + .get_contact_info(&nodes[0]) + .map(LegacyContactInfo::try_from) + .unwrap() + .unwrap(); let nodes_slice = [node]; let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc, - cluster.entry_point_info.tpu, + cluster.entry_point_info.rpc().unwrap(), + cluster.entry_point_info.tpu().unwrap(), cluster.connection_cache.clone(), )); @@ -944,7 +953,7 @@ pub mod test { 10, Some(client.clone()), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant data_type: DataType::Transaction, @@ -971,7 +980,7 @@ pub mod test { 10, Some(client.clone()), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant data_type: DataType::Transaction, @@ -998,7 +1007,7 @@ pub mod test { 10, Some(client), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant data_type: DataType::Transaction, @@ -1061,12 +1070,16 @@ pub mod test { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); let nodes = cluster.get_node_pubkeys(); - let node = cluster.get_contact_info(&nodes[0]).unwrap().clone(); + let node = cluster + .get_contact_info(&nodes[0]) + .map(LegacyContactInfo::try_from) + .unwrap() + .unwrap(); let nodes_slice = [node]; let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc, - cluster.entry_point_info.tpu, + cluster.entry_point_info.rpc().unwrap(), + cluster.entry_point_info.tpu().unwrap(), cluster.connection_cache.clone(), )); @@ -1077,7 +1090,7 @@ pub mod test { 10, Some(client.clone()), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant if not random data_type: DataType::Transaction, @@ -1106,7 +1119,7 @@ pub mod test { 10, Some(client.clone()), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant if not random data_type: DataType::Transaction, @@ -1134,7 +1147,7 @@ pub mod test { 10, Some(client.clone()), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant if not random data_type: DataType::Transaction, @@ -1162,7 +1175,7 @@ pub mod test { 10, Some(client), DosClientParameters { - entrypoint_addr: cluster.entry_point_info.gossip, + entrypoint_addr: cluster.entry_point_info.gossip().unwrap(), mode: Mode::Tpu, data_size: 0, // irrelevant if not random data_type: DataType::Transaction, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index e372cfe1f..d52712b7e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -24,6 +24,7 @@ use { cluster_info_metrics::{ submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard, }, + contact_info::{ContactInfo, LegacyContactInfo}, crds::{Crds, Cursor, GossipRoute}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, @@ -35,7 +36,6 @@ use { duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, gossip_error::GossipError, - legacy_contact_info::LegacyContactInfo as ContactInfo, ping_pong::{self, PingCache, Pong}, socketaddr, socketaddr_any, weighted_shuffle::WeightedShuffle, @@ -161,7 +161,7 @@ pub struct ClusterInfo { /// set the keypair that will be used to sign crds values generated. It is unset only in tests. keypair: RwLock>, /// Network entrypoints - entrypoints: RwLock>, + entrypoints: RwLock>, outbound_budget: DataBudget, my_contact_info: RwLock, ping_cache: Mutex, @@ -407,8 +407,8 @@ impl ClusterInfo { keypair: Arc, socket_addr_space: SocketAddrSpace, ) -> Self { - assert_eq!(contact_info.id, keypair.pubkey()); - let id = contact_info.id; + assert_eq!(contact_info.pubkey(), &keypair.pubkey()); + let id = *contact_info.pubkey(); let me = Self { gossip: CrdsGossip::default(), keypair: RwLock::new(keypair), @@ -444,9 +444,16 @@ impl ClusterInfo { fn push_self(&self) { let now = timestamp(); - self.my_contact_info.write().unwrap().wallclock = now; - let entries: Vec<_> = vec![ - CrdsData::LegacyContactInfo(self.my_contact_info()), + let node = { + let mut node = self.my_contact_info.write().unwrap(); + node.set_wallclock(now); + node.clone() + }; + let entries: Vec<_> = [ + LegacyContactInfo::try_from(&node) + .map(CrdsData::LegacyContactInfo) + .expect("Operator must spin up node with valid contact-info"), + CrdsData::ContactInfo(node), CrdsData::NodeInstance(self.instance.read().unwrap().with_wallclock(now)), ] .into_iter() @@ -465,7 +472,7 @@ impl ClusterInfo { gossip_validators: Option<&HashSet>, sender: &PacketBatchSender, ) { - let ContactInfo { shred_version, .. } = *self.my_contact_info.read().unwrap(); + let shred_version = self.my_contact_info.read().unwrap().shred_version(); let self_keypair: Arc = self.keypair().clone(); let mut pings = Vec::new(); self.gossip.refresh_push_active_set( @@ -498,18 +505,34 @@ impl ClusterInfo { } // TODO kill insert_info, only used by tests - pub fn insert_info(&self, contact_info: ContactInfo) { + pub fn insert_legacy_info(&self, contact_info: LegacyContactInfo) { let value = CrdsValue::new_signed(CrdsData::LegacyContactInfo(contact_info), &self.keypair()); let mut gossip_crds = self.gossip.crds.write().unwrap(); let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage); } - pub fn set_entrypoint(&self, entrypoint: ContactInfo) { + pub fn insert_info(&self, node: ContactInfo) { + let entries: Vec<_> = [ + LegacyContactInfo::try_from(&node) + .map(CrdsData::LegacyContactInfo) + .expect("Operator must spin up node with valid contact-info"), + CrdsData::ContactInfo(node), + ] + .into_iter() + .map(|entry| CrdsValue::new_signed(entry, &self.keypair())) + .collect(); + let mut gossip_crds = self.gossip.crds.write().unwrap(); + for entry in entries { + let _ = gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage); + } + } + + pub fn set_entrypoint(&self, entrypoint: LegacyContactInfo) { self.set_entrypoints(vec![entrypoint]); } - pub fn set_entrypoints(&self, entrypoints: Vec) { + pub fn set_entrypoints(&self, entrypoints: Vec) { *self.entrypoints.write().unwrap() = entrypoints; } @@ -638,7 +661,7 @@ impl ClusterInfo { *instance = NodeInstance::new(&mut thread_rng(), id, timestamp()); } *self.keypair.write().unwrap() = new_keypair; - self.my_contact_info.write().unwrap().id = id; + self.my_contact_info.write().unwrap().set_pubkey(id); self.insert_self(); self.push_message(CrdsValue::new_signed( @@ -650,7 +673,7 @@ impl ClusterInfo { pub fn lookup_contact_info(&self, id: &Pubkey, map: F) -> Option where - F: FnOnce(&ContactInfo) -> Y, + F: FnOnce(&LegacyContactInfo) -> Y, { let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds.get(*id).map(map) @@ -659,7 +682,7 @@ impl ClusterInfo { pub fn lookup_contact_info_by_gossip_addr( &self, gossip_addr: &SocketAddr, - ) -> Option { + ) -> Option { let gossip_crds = self.gossip.crds.read().unwrap(); let mut nodes = gossip_crds.get_nodes_contact_info(); nodes.find(|node| node.gossip == *gossip_addr).cloned() @@ -670,7 +693,7 @@ impl ClusterInfo { } pub fn my_shred_version(&self) -> u16 { - self.my_contact_info.read().unwrap().shred_version + self.my_contact_info.read().unwrap().shred_version() } fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { @@ -1095,7 +1118,9 @@ impl ClusterInfo { transaction: &Transaction, tpu: Option, ) -> Result<(), GossipError> { - let tpu = tpu.unwrap_or_else(|| self.my_contact_info().tpu); + let tpu = tpu + .map(Ok) + .unwrap_or_else(|| self.my_contact_info().tpu())?; let buf = serialize(transaction)?; self.socket.send_to(&buf, tpu)?; Ok(()) @@ -1224,7 +1249,7 @@ impl ClusterInfo { } /// all validators that have a valid rpc port regardless of `shred_version`. - pub fn all_rpc_peers(&self) -> Vec { + pub fn all_rpc_peers(&self) -> Vec { let self_pubkey = self.id(); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds @@ -1238,7 +1263,7 @@ impl ClusterInfo { } // All nodes in gossip (including spy nodes) and the last time we heard about them - pub fn all_peers(&self) -> Vec<(ContactInfo, u64)> { + pub fn all_peers(&self) -> Vec<(LegacyContactInfo, u64)> { let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get_nodes() @@ -1246,7 +1271,7 @@ impl ClusterInfo { .collect() } - pub fn gossip_peers(&self) -> Vec { + pub fn gossip_peers(&self) -> Vec { let me = self.id(); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds @@ -1260,7 +1285,7 @@ impl ClusterInfo { } /// all validators that have a valid tvu port regardless of `shred_version`. - pub fn all_tvu_peers(&self) -> Vec { + pub fn all_tvu_peers(&self) -> Vec { let self_pubkey = self.id(); self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) .get_nodes_contact_info() @@ -1273,7 +1298,7 @@ impl ClusterInfo { } /// all validators that have a valid tvu port and are on the same `shred_version`. - pub fn tvu_peers(&self) -> Vec { + pub fn tvu_peers(&self) -> Vec { let self_pubkey = self.id(); let self_shred_version = self.my_shred_version(); self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers) @@ -1288,7 +1313,7 @@ impl ClusterInfo { } /// all tvu peers with valid gossip addrs that likely have the slot being requested - pub fn repair_peers(&self, slot: Slot) -> Vec { + pub fn repair_peers(&self, slot: Slot) -> Vec { let _st = ScopedTimer::from(&self.stats.repair_peers); let self_pubkey = self.id(); let self_shred_version = self.my_shred_version(); @@ -1309,14 +1334,14 @@ impl ClusterInfo { .collect() } - fn is_spy_node(contact_info: &ContactInfo, socket_addr_space: &SocketAddrSpace) -> bool { + fn is_spy_node(contact_info: &LegacyContactInfo, socket_addr_space: &SocketAddrSpace) -> bool { !ContactInfo::is_valid_address(&contact_info.tpu, socket_addr_space) || !ContactInfo::is_valid_address(&contact_info.gossip, socket_addr_space) || !ContactInfo::is_valid_address(&contact_info.tvu, socket_addr_space) } /// compute broadcast table - pub fn tpu_peers(&self) -> Vec { + pub fn tpu_peers(&self) -> Vec { let self_pubkey = self.id(); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds @@ -1330,19 +1355,29 @@ impl ClusterInfo { } fn insert_self(&self) { - let value = CrdsValue::new_signed( - CrdsData::LegacyContactInfo(self.my_contact_info()), - &self.keypair(), - ); + let node = self.my_contact_info(); + let entries: Vec<_> = [ + LegacyContactInfo::try_from(&node) + .map(CrdsData::LegacyContactInfo) + .expect("Operator must spin up node with valid contact-info"), + CrdsData::ContactInfo(node), + ] + .into_iter() + .map(|entry| CrdsValue::new_signed(entry, &self.keypair())) + .collect(); let mut gossip_crds = self.gossip.crds.write().unwrap(); - let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage); + for entry in entries { + if let Err(err) = gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage) { + error!("Insert self failed: {err:?}"); + } + } } // If the network entrypoint hasn't been discovered yet, add it to the crds table fn append_entrypoint_to_pulls( &self, thread_pool: &ThreadPool, - pulls: &mut HashMap>, + pulls: &mut HashMap>, ) { const THROTTLE_DELAY: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2; let entrypoint = { @@ -1461,7 +1496,9 @@ impl ClusterInfo { self.append_entrypoint_to_pulls(thread_pool, &mut pulls); let num_requests = pulls.values().map(Vec::len).sum::() as u64; self.stats.new_pull_requests_count.add_relaxed(num_requests); - let self_info = CrdsData::LegacyContactInfo(self.my_contact_info()); + let self_info = LegacyContactInfo::try_from(&self.my_contact_info()) + .map(CrdsData::LegacyContactInfo) + .expect("Operator must spin up node with valid contact-info"); let self_info = CrdsValue::new_signed(self_info, &self.keypair()); let pulls = pulls .into_iter() @@ -1514,7 +1551,7 @@ impl ClusterInfo { push_messages .into_iter() .filter_map(|(pubkey, messages)| { - let peer: &ContactInfo = gossip_crds.get(pubkey)?; + let peer: &LegacyContactInfo = gossip_crds.get(pubkey)?; Some((peer.gossip, messages)) }) .collect() @@ -1625,7 +1662,10 @@ impl ClusterInfo { "Setting shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id ); - self.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version; + self.my_contact_info + .write() + .unwrap() + .set_shred_version(entrypoint.shred_version); } } self.my_shred_version() != 0 @@ -2276,7 +2316,7 @@ impl ClusterInfo { .into_par_iter() .with_min_len(256) .filter_map(|(from, prunes)| { - let peer: &ContactInfo = gossip_crds.get(from)?; + let peer: &LegacyContactInfo = gossip_crds.get(from)?; let mut prune_data = PruneData { pubkey: self_pubkey, prunes, @@ -2665,13 +2705,9 @@ impl ClusterInfo { } pub fn gossip_contact_info(id: Pubkey, gossip: SocketAddr, shred_version: u16) -> ContactInfo { - ContactInfo { - id, - gossip, - wallclock: timestamp(), - shred_version, - ..ContactInfo::default() - } + let mut node = ContactInfo::new(id, /*wallclock:*/ timestamp(), shred_version); + let _ = node.set_gossip(gossip); + node } /// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip. @@ -2806,21 +2842,41 @@ impl Node { let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let ancestor_hashes_requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let info = ContactInfo { - id: *pubkey, - gossip: gossip_addr, - tvu: tvu.local_addr().unwrap(), - tvu_forwards: tvu_forwards.local_addr().unwrap(), - repair: repair.local_addr().unwrap(), - tpu: tpu.local_addr().unwrap(), - tpu_forwards: tpu_forwards.local_addr().unwrap(), - tpu_vote: tpu_vote.local_addr().unwrap(), - rpc: rpc_addr, - rpc_pubsub: rpc_pubsub_addr, - serve_repair: serve_repair.local_addr().unwrap(), - wallclock: timestamp(), - shred_version: 0, - }; + let mut info = ContactInfo::new( + *pubkey, + timestamp(), // wallclock + 0u16, // shred_version + ); + macro_rules! set_socket { + ($method:ident, $addr:expr, $name:literal) => { + info.$method($addr).expect(&format!( + "Operator must spin up node with valid {} address", + $name + )) + }; + } + set_socket!(set_gossip, gossip_addr, "gossip"); + set_socket!(set_tvu, tvu.local_addr().unwrap(), "TVU"); + set_socket!( + set_tvu_forwards, + tvu_forwards.local_addr().unwrap(), + "TVU-forwards" + ); + set_socket!(set_repair, repair.local_addr().unwrap(), "repair"); + set_socket!(set_tpu, tpu.local_addr().unwrap(), "TPU"); + set_socket!( + set_tpu_forwards, + tpu_forwards.local_addr().unwrap(), + "TPU-forwards" + ); + set_socket!(set_tpu_vote, tpu_vote.local_addr().unwrap(), "TPU-vote"); + set_socket!(set_rpc, rpc_addr, "RPC"); + set_socket!(set_rpc_pubsub, rpc_pubsub_addr, "RPC-pubsub"); + set_socket!( + set_serve_repair, + serve_repair.local_addr().unwrap(), + "serve-repair" + ); Node { info, sockets: Sockets { @@ -2886,21 +2942,30 @@ impl Node { let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); - let info = ContactInfo { - id: *pubkey, - gossip: SocketAddr::new(gossip_addr.ip(), gossip_port), - tvu: SocketAddr::new(gossip_addr.ip(), tvu_port), - tvu_forwards: SocketAddr::new(gossip_addr.ip(), tvu_forwards_port), - repair: SocketAddr::new(gossip_addr.ip(), repair_port), - tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), - tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), - rpc: SocketAddr::new(gossip_addr.ip(), rpc_port), - rpc_pubsub: SocketAddr::new(gossip_addr.ip(), rpc_pubsub_port), - serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), - wallclock: timestamp(), - shred_version: 0, - }; + let addr = gossip_addr.ip(); + let mut info = ContactInfo::new( + *pubkey, + timestamp(), // wallclock + 0u16, // shred_version + ); + macro_rules! set_socket { + ($method:ident, $port:ident, $name:literal) => { + info.$method((addr, $port)).expect(&format!( + "Operator must spin up node with valid {} address", + $name + )) + }; + } + set_socket!(set_gossip, gossip_port, "gossip"); + set_socket!(set_tvu, tvu_port, "TVU"); + set_socket!(set_tvu_forwards, tvu_forwards_port, "TVU-forwards"); + set_socket!(set_repair, repair_port, "repair"); + set_socket!(set_tpu, tpu_port, "TPU"); + set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards"); + set_socket!(set_tpu_vote, tpu_vote_port, "TPU-vote"); + set_socket!(set_rpc, rpc_port, "RPC"); + set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub"); + set_socket!(set_serve_repair, serve_repair_port, "serve-repair"); trace!("new ContactInfo: {:?}", info); Node { @@ -2973,21 +3038,20 @@ impl Node { let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); - let info = ContactInfo { - id: *pubkey, - gossip: SocketAddr::new(gossip_addr.ip(), gossip_port), - tvu: SocketAddr::new(gossip_addr.ip(), tvu_port), - tvu_forwards: SocketAddr::new(gossip_addr.ip(), tvu_forwards_port), - repair: SocketAddr::new(gossip_addr.ip(), repair_port), - tpu: overwrite_tpu_addr.unwrap_or_else(|| SocketAddr::new(gossip_addr.ip(), tpu_port)), - tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - tpu_vote: SocketAddr::new(gossip_addr.ip(), tpu_vote_port), - rpc: socketaddr_any!(), - rpc_pubsub: socketaddr_any!(), - serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), - wallclock: 0, - shred_version: 0, - }; + let mut info = ContactInfo::new( + *pubkey, + timestamp(), // wallclock + 0u16, // shred_version + ); + let addr = gossip_addr.ip(); + let _ = info.set_gossip((addr, gossip_port)); + let _ = info.set_tvu((addr, tvu_port)); + let _ = info.set_tvu_forwards((addr, tvu_forwards_port)); + let _ = info.set_repair((addr, repair_port)); + let _ = info.set_tpu(overwrite_tpu_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_port))); + let _ = info.set_tpu_forwards((addr, tpu_forwards_port)); + let _ = info.set_tpu_vote((addr, tpu_vote_port)); + let _ = info.set_serve_repair((addr, serve_repair_port)); trace!("new ContactInfo: {:?}", info); Node { @@ -3130,7 +3194,7 @@ mod tests { //check that a gossip nodes always show up as spies let (node, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0); assert!(ClusterInfo::is_spy_node( - &node, + &LegacyContactInfo::try_from(&node).unwrap(), &SocketAddrSpace::Unspecified )); let (node, _, _) = ClusterInfo::gossip_node( @@ -3139,7 +3203,7 @@ mod tests { 0, ); assert!(ClusterInfo::is_spy_node( - &node, + &LegacyContactInfo::try_from(&node).unwrap(), &SocketAddrSpace::Unspecified )); } @@ -3165,21 +3229,21 @@ mod tests { let serve_repair = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8910); - let info = ContactInfo { - id: keypair.pubkey(), - gossip, - tvu, - tvu_forwards, - repair, - tpu, - tpu_forwards, - tpu_vote, - rpc, - rpc_pubsub, - serve_repair, - wallclock: timestamp(), - shred_version: 0, - }; + let mut info = ContactInfo::new( + keypair.pubkey(), + timestamp(), // wallclock + 0u16, // shred_version + ); + info.set_gossip(gossip).unwrap(); + info.set_tvu(tvu).unwrap(); + info.set_tvu_forwards(tvu_forwards).unwrap(); + info.set_repair(repair).unwrap(); + info.set_tpu(tpu).unwrap(); + info.set_tpu_forwards(tpu_forwards).unwrap(); + info.set_tpu_vote(tpu_vote).unwrap(); + info.set_rpc(rpc).unwrap(); + info.set_rpc_pubsub(rpc_pubsub).unwrap(); + info.set_serve_repair(serve_repair).unwrap(); Node { info, sockets: Sockets { @@ -3387,7 +3451,7 @@ RPC Enabled Nodes: 1"#; } fn test_crds_values(pubkey: Pubkey) -> Vec { - let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); + let entrypoint = LegacyContactInfo::new_localhost(&pubkey, timestamp()); let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(entrypoint)); vec![entrypoint_crdsvalue] } @@ -3577,7 +3641,7 @@ RPC Enabled Nodes: 1"#; let keypair = Arc::new(Keypair::new()); let d = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); let cluster_info = ClusterInfo::new(d.clone(), keypair, SocketAddrSpace::Unspecified); - assert_eq!(d.id, cluster_info.id()); + assert_eq!(d.pubkey(), &cluster_info.id()); } #[test] @@ -3586,7 +3650,7 @@ RPC Enabled Nodes: 1"#; let d = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); let cluster_info = ClusterInfo::new(d, keypair, SocketAddrSpace::Unspecified); let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); - let label = CrdsValueLabel::LegacyContactInfo(d.id); + let label = CrdsValueLabel::LegacyContactInfo(*d.pubkey()); cluster_info.insert_info(d); let gossip_crds = cluster_info.gossip.crds.read().unwrap(); assert!(gossip_crds.get::<&CrdsValue>(&label).is_some()); @@ -3622,7 +3686,7 @@ RPC Enabled Nodes: 1"#; #[test] fn new_with_external_ip_test_random() { - let ip = Ipv4Addr::UNSPECIFIED; + let ip = Ipv4Addr::LOCALHOST; let node = Node::new_with_external_ip( &solana_sdk::pubkey::new_rand(), &socketaddr!(ip, 0), @@ -3643,11 +3707,11 @@ RPC Enabled Nodes: 1"#; VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH), ); - let ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = bind_in_range(ip, port_range).expect("Failed to bind").0; let node = Node::new_with_external_ip( &solana_sdk::pubkey::new_rand(), - &socketaddr!(Ipv4Addr::UNSPECIFIED, port), + &socketaddr!(Ipv4Addr::LOCALHOST, port), port_range, ip, None, @@ -3674,11 +3738,11 @@ RPC Enabled Nodes: 1"#; SocketAddrSpace::Unspecified, ); let stakes = HashMap::::default(); - cluster_info - .ping_cache - .lock() - .unwrap() - .mock_pong(peer.id, peer.gossip, Instant::now()); + cluster_info.ping_cache.lock().unwrap().mock_pong( + *peer.pubkey(), + peer.gossip().unwrap(), + Instant::now(), + ); cluster_info.insert_info(peer); cluster_info.gossip.refresh_push_active_set( &cluster_info.keypair(), @@ -3950,7 +4014,7 @@ RPC Enabled Nodes: 1"#; // Test with different shred versions. let mut rng = rand::thread_rng(); let node_pubkey = Pubkey::new_unique(); - let mut node = ContactInfo::new_rand(&mut rng, Some(node_pubkey)); + let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); node.shred_version = 42; let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey)); let entries = vec![ @@ -3973,7 +4037,7 @@ RPC Enabled Nodes: 1"#; // Match shred versions. { let mut node = cluster_info.my_contact_info.write().unwrap(); - node.shred_version = 42; + node.set_shred_version(42); } cluster_info.push_self(); cluster_info.flush_push_queue(); @@ -3994,7 +4058,7 @@ RPC Enabled Nodes: 1"#; SocketAddrSpace::Unspecified, ); let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); - let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); + let entrypoint = LegacyContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); assert!(pings.is_empty()); @@ -4032,7 +4096,8 @@ RPC Enabled Nodes: 1"#; #[test] fn test_split_messages_small() { - let value = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())); + let value = + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(LegacyContactInfo::default())); test_split_messages(value); } @@ -4134,7 +4199,8 @@ RPC Enabled Nodes: 1"#; } fn check_pull_request_size(filter: CrdsFilter) { - let value = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())); + let value = + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(LegacyContactInfo::default())); let protocol = Protocol::PullRequest(filter, value); assert!(serialized_size(&protocol).unwrap() <= PACKET_DATA_SIZE as u64); } @@ -4158,21 +4224,21 @@ RPC Enabled Nodes: 1"#; stakes.insert(id2, 10); // duplicate - contact_info.wallclock = timestamp() + 1; + contact_info.set_wallclock(timestamp() + 1); cluster_info.insert_info(contact_info); // no tvu let id3 = Pubkey::from([3u8; 32]); let mut contact_info = ContactInfo::new_localhost(&id3, timestamp()); - contact_info.tvu = "0.0.0.0:0".parse().unwrap(); + contact_info.remove_tvu(); cluster_info.insert_info(contact_info); stakes.insert(id3, 10); // normal but with different shred version let id4 = Pubkey::from([4u8; 32]); let mut contact_info = ContactInfo::new_localhost(&id4, timestamp()); - contact_info.shred_version = 1; - assert_ne!(contact_info.shred_version, d.shred_version); + contact_info.set_shred_version(1); + assert_ne!(contact_info.shred_version(), d.shred_version()); cluster_info.insert_info(contact_info); stakes.insert(id4, 10); } @@ -4188,17 +4254,19 @@ RPC Enabled Nodes: 1"#; ); let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); let mut entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); - entrypoint.gossip = socketaddr!("127.0.0.2:1234"); - cluster_info.set_entrypoint(entrypoint.clone()); + entrypoint + .set_gossip(socketaddr!("127.0.0.2:1234")) + .unwrap(); + cluster_info.set_entrypoint(LegacyContactInfo::try_from(&entrypoint).unwrap()); let mut stakes = HashMap::new(); let other_node_pubkey = solana_sdk::pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); - assert_ne!(other_node.gossip, entrypoint.gossip); + assert_ne!(other_node.gossip().unwrap(), entrypoint.gossip().unwrap()); cluster_info.ping_cache.lock().unwrap().mock_pong( - other_node.id, - other_node.gossip, + *other_node.pubkey(), + other_node.gossip().unwrap(), Instant::now(), ); cluster_info.insert_info(other_node.clone()); @@ -4209,7 +4277,9 @@ RPC Enabled Nodes: 1"#; let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); - assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip)); + assert!(pulls + .into_iter() + .all(|(addr, _)| addr == other_node.gossip().unwrap())); // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests @@ -4221,7 +4291,7 @@ RPC Enabled Nodes: 1"#; assert_eq!( pulls .iter() - .filter(|(addr, _)| *addr == node.gossip) + .filter(|(addr, _)| *addr == node.gossip().unwrap()) .count(), MIN_NUM_BLOOM_FILTERS ); @@ -4231,7 +4301,9 @@ RPC Enabled Nodes: 1"#; let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); - assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip)); + assert!(pulls + .into_iter() + .all(|(addr, _)| addr == other_node.gossip().unwrap())); } #[test] @@ -4298,7 +4370,7 @@ RPC Enabled Nodes: 1"#; .expect("unable to serialize default filter") as usize; let protocol = Protocol::PullRequest( CrdsFilter::default(), - CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())), + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(LegacyContactInfo::default())), ); let protocol_size = serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; @@ -4370,12 +4442,12 @@ RPC Enabled Nodes: 1"#; // Simulating starting up with two entrypoints, no known id, only a gossip // address let entrypoint1_gossip_addr = socketaddr!("127.0.0.2:1234"); - let mut entrypoint1 = ContactInfo::new_localhost(&Pubkey::default(), timestamp()); + let mut entrypoint1 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); entrypoint1.gossip = entrypoint1_gossip_addr; assert_eq!(entrypoint1.shred_version, 0); let entrypoint2_gossip_addr = socketaddr!("127.0.0.2:5678"); - let mut entrypoint2 = ContactInfo::new_localhost(&Pubkey::default(), timestamp()); + let mut entrypoint2 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); entrypoint2.gossip = entrypoint2_gossip_addr; assert_eq!(entrypoint2.shred_version, 0); cluster_info.set_entrypoints(vec![entrypoint1, entrypoint2]); @@ -4384,9 +4456,13 @@ RPC Enabled Nodes: 1"#; // 0 let mut gossiped_entrypoint1_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); - gossiped_entrypoint1_info.gossip = entrypoint1_gossip_addr; - gossiped_entrypoint1_info.shred_version = 0; + gossiped_entrypoint1_info + .set_gossip(entrypoint1_gossip_addr) + .unwrap(); + gossiped_entrypoint1_info.set_shred_version(0); cluster_info.insert_info(gossiped_entrypoint1_info.clone()); + let gossiped_entrypoint1_info = + LegacyContactInfo::try_from(&gossiped_entrypoint1_info).unwrap(); assert!(!cluster_info .entrypoints .read() @@ -4411,9 +4487,13 @@ RPC Enabled Nodes: 1"#; // !0 let mut gossiped_entrypoint2_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); - gossiped_entrypoint2_info.gossip = entrypoint2_gossip_addr; - gossiped_entrypoint2_info.shred_version = 1; + gossiped_entrypoint2_info + .set_gossip(entrypoint2_gossip_addr) + .unwrap(); + gossiped_entrypoint2_info.set_shred_version(1); cluster_info.insert_info(gossiped_entrypoint2_info.clone()); + let gossiped_entrypoint2_info = + LegacyContactInfo::try_from(&gossiped_entrypoint2_info).unwrap(); assert!(!cluster_info .entrypoints .read() @@ -4443,7 +4523,7 @@ RPC Enabled Nodes: 1"#; { let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()); - contact_info.shred_version = 2; + contact_info.set_shred_version(2); contact_info }, node_keypair, @@ -4454,7 +4534,7 @@ RPC Enabled Nodes: 1"#; // Simulating starting up with default entrypoint, no known id, only a gossip // address let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234"); - let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp()); + let mut entrypoint = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); entrypoint.gossip = entrypoint_gossip_addr; assert_eq!(entrypoint.shred_version, 0); cluster_info.set_entrypoint(entrypoint); @@ -4462,8 +4542,10 @@ RPC Enabled Nodes: 1"#; // Simulate getting entrypoint ContactInfo from gossip let mut gossiped_entrypoint_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); - gossiped_entrypoint_info.gossip = entrypoint_gossip_addr; - gossiped_entrypoint_info.shred_version = 1; + gossiped_entrypoint_info + .set_gossip(entrypoint_gossip_addr) + .unwrap(); + gossiped_entrypoint_info.set_shred_version(1); cluster_info.insert_info(gossiped_entrypoint_info.clone()); // Adopt the entrypoint's gossiped contact info and verify @@ -4471,7 +4553,7 @@ RPC Enabled Nodes: 1"#; assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 1); assert_eq!( cluster_info.entrypoints.read().unwrap()[0], - gossiped_entrypoint_info + LegacyContactInfo::try_from(&gossiped_entrypoint_info).unwrap(), ); assert!(entrypoints_processed); assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version @@ -4625,7 +4707,7 @@ RPC Enabled Nodes: 1"#; SocketAddrSpace::Unspecified, )); let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); - let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); + let entrypoint = LegacyContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint); let mut rng = rand::thread_rng(); @@ -4636,7 +4718,7 @@ RPC Enabled Nodes: 1"#; let data: Vec<_> = repeat_with(|| { let keypair = Keypair::new(); peers.push(keypair.pubkey()); - let mut rand_ci = ContactInfo::new_rand(&mut rng, Some(keypair.pubkey())); + let mut rand_ci = LegacyContactInfo::new_rand(&mut rng, Some(keypair.pubkey())); rand_ci.shred_version = shred_version; rand_ci.wallclock = timestamp(); CrdsValue::new_signed(CrdsData::LegacyContactInfo(rand_ci), &keypair) diff --git a/gossip/src/gossip_error.rs b/gossip/src/gossip_error.rs index 87db5ccfa..72dc1cc3a 100644 --- a/gossip/src/gossip_error.rs +++ b/gossip/src/gossip_error.rs @@ -1,5 +1,5 @@ use { - crate::duplicate_shred, + crate::{contact_info, duplicate_shred}, crossbeam_channel::{RecvTimeoutError, SendError}, std::io, thiserror::Error, @@ -12,6 +12,8 @@ pub enum GossipError { #[error(transparent)] DuplicateShredError(#[from] duplicate_shred::Error), #[error(transparent)] + InvalidContactInfo(#[from] contact_info::Error), + #[error(transparent)] Io(#[from] io::Error), #[error(transparent)] RecvTimeoutError(#[from] RecvTimeoutError), diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 85f9e0166..bff493d20 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -331,7 +331,10 @@ pub fn make_gossip_node( mod tests { use { super::*, - crate::cluster_info::{ClusterInfo, Node}, + crate::{ + cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, + }, std::sync::{atomic::AtomicBool, Arc}, }; @@ -422,7 +425,7 @@ mod tests { None, TIMEOUT, None, - Some(&peer0_info.gossip), + Some(&peer0_info.gossip().unwrap()), ); assert!(met_criteria); diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index ba4755dcb..aa4a15cd0 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -2,7 +2,6 @@ use { crate::crds_value::MAX_WALLCLOCK, solana_sdk::{ pubkey::Pubkey, - rpc_port, sanitize::{Sanitize, SanitizeError}, timing::timestamp, }, @@ -117,41 +116,6 @@ impl LegacyContactInfo { node } - // Used in tests - pub fn new_with_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self { - fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { - let mut nxt_addr = *addr; - nxt_addr.set_port(addr.port() + nxt); - nxt_addr - } - - let tpu = *bind_addr; - let gossip = next_port(bind_addr, 1); - let tvu = next_port(bind_addr, 2); - let tpu_forwards = next_port(bind_addr, 3); - let tvu_forwards = next_port(bind_addr, 4); - let repair = next_port(bind_addr, 5); - let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); - let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - let serve_repair = next_port(bind_addr, 6); - let tpu_vote = next_port(bind_addr, 7); - Self { - id: *pubkey, - gossip, - tvu, - tvu_forwards, - repair, - tpu, - tpu_forwards, - tpu_vote, - rpc, - rpc_pubsub, - serve_repair, - wallclock: timestamp(), - shred_version: 0, - } - } - // Construct a LegacyContactInfo that's only usable for gossip pub fn new_gossip_entry_point(gossip_addr: &SocketAddr) -> Self { Self { @@ -196,10 +160,7 @@ impl LegacyContactInfo { #[cfg(test)] mod tests { - use { - super::*, - solana_sdk::signature::{Keypair, Signer}, - }; + use super::*; #[test] fn test_is_valid_address() { @@ -252,45 +213,6 @@ mod tests { assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } - #[test] - fn test_socketaddr() { - let addr = socketaddr!(Ipv4Addr::LOCALHOST, 10); - let ci = LegacyContactInfo::new_with_socketaddr(&Keypair::new().pubkey(), &addr); - assert_eq!(ci.tpu, addr); - assert_eq!(ci.tpu_vote.port(), 17); - assert_eq!(ci.gossip.port(), 11); - assert_eq!(ci.tvu.port(), 12); - assert_eq!(ci.tpu_forwards.port(), 13); - assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); - assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert_eq!(ci.serve_repair.port(), 16); - } - - #[test] - fn replayed_data_new_with_socketaddr_with_pubkey() { - let keypair = Keypair::new(); - let d1 = LegacyContactInfo::new_with_socketaddr( - &keypair.pubkey(), - &socketaddr!(Ipv4Addr::LOCALHOST, 1234), - ); - assert_eq!(d1.id, keypair.pubkey()); - assert_eq!(d1.gossip, socketaddr!(Ipv4Addr::LOCALHOST, 1235)); - assert_eq!(d1.tvu, socketaddr!(Ipv4Addr::LOCALHOST, 1236)); - assert_eq!(d1.tpu_forwards, socketaddr!(Ipv4Addr::LOCALHOST, 1237)); - assert_eq!(d1.tpu, socketaddr!(Ipv4Addr::LOCALHOST, 1234)); - assert_eq!( - d1.rpc, - socketaddr!(Ipv4Addr::LOCALHOST, rpc_port::DEFAULT_RPC_PORT) - ); - assert_eq!( - d1.rpc_pubsub, - socketaddr!(Ipv4Addr::LOCALHOST, rpc_port::DEFAULT_RPC_PUBSUB_PORT) - ); - assert_eq!(d1.tvu_forwards, socketaddr!(Ipv4Addr::LOCALHOST, 1238)); - assert_eq!(d1.repair, socketaddr!(Ipv4Addr::LOCALHOST, 1239)); - assert_eq!(d1.serve_repair, socketaddr!(Ipv4Addr::LOCALHOST, 1240)); - assert_eq!(d1.tpu_vote, socketaddr!(Ipv4Addr::LOCALHOST, 1241)); - } #[test] fn test_valid_client_facing() { diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 7813e54bb..914b22242 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -163,7 +163,7 @@ fn gossip_ring() { let yv = &listen[y].0; let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.wallclock = timestamp(); - listen[x].0.insert_info(d); + listen[x].0.insert_legacy_info(d); } }); } @@ -181,7 +181,7 @@ fn gossip_ring_large() { let yv = &listen[y].0; let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.wallclock = timestamp(); - listen[x].0.insert_info(d); + listen[x].0.insert_legacy_info(d); } }); } @@ -198,7 +198,7 @@ fn gossip_star() { let mut yd = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); yd.wallclock = timestamp(); let xv = &listen[x].0; - xv.insert_info(yd); + xv.insert_legacy_info(yd); trace!("star leader {}", &xv.id()); } }); @@ -218,7 +218,7 @@ fn gossip_rstar() { for n in 0..(num - 1) { let y = (n + 1) % listen.len(); let yv = &listen[y].0; - yv.insert_info(xd.clone()); + yv.insert_legacy_info(xd.clone()); trace!("rstar insert {} into {}", xd.id, yv.id()); } }); diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 0cee27e6c..b5bcf658f 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,7 +1,7 @@ use { solana_client::thin_client::ThinClient, solana_core::validator::{Validator, ValidatorConfig}, - solana_gossip::{cluster_info::Node, legacy_contact_info::LegacyContactInfo as ContactInfo}, + solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::socket::SocketAddrSpace, std::{path::PathBuf, sync::Arc}, diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index f0448eb84..3d3d95020 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -11,11 +11,11 @@ use { solana_entry::entry::{Entry, EntrySlice}, solana_gossip::{ cluster_info::{self, ClusterInfo}, + contact_info::{ContactInfo, LegacyContactInfo}, crds::Cursor, crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel}, gossip_error::GossipError, gossip_service::{self, discover_cluster, GossipService}, - legacy_contact_info::LegacyContactInfo as ContactInfo, }, solana_ledger::blockstore::Blockstore, solana_runtime::vote_transaction::VoteTransaction, @@ -37,6 +37,7 @@ use { solana_streamer::socket::SocketAddrSpace, solana_vote_program::vote_transaction, std::{ + borrow::Borrow, collections::{HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, path::Path, @@ -49,8 +50,10 @@ use { }, }; -pub fn get_client_facing_addr(contact_info: &ContactInfo) -> (SocketAddr, SocketAddr) { - let (rpc, mut tpu) = contact_info.client_facing_addr(); +pub fn get_client_facing_addr>( + contact_info: T, +) -> (SocketAddr, SocketAddr) { + let (rpc, mut tpu) = contact_info.borrow().client_facing_addr(); // QUIC certificate authentication requires the IP Address to match. ContactInfo might have // 0.0.0.0 as the IP instead of 127.0.0.1. tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)); @@ -66,8 +69,12 @@ pub fn spend_and_verify_all_nodes( socket_addr_space: SocketAddrSpace, connection_cache: &Arc, ) { - let cluster_nodes = - discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap(); + let cluster_nodes = discover_cluster( + &entry_point_info.gossip().unwrap(), + nodes, + socket_addr_space, + ) + .unwrap(); assert!(cluster_nodes.len() >= nodes); let ignore_nodes = Arc::new(ignore_nodes); cluster_nodes.par_iter().for_each(|ingress_node| { @@ -109,7 +116,9 @@ pub fn verify_balances( node: &ContactInfo, connection_cache: Arc, ) { - let (rpc, tpu) = get_client_facing_addr(node); + let (rpc, tpu) = LegacyContactInfo::try_from(node) + .map(get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache); for (pk, b) in expected_balances { let bal = client @@ -120,7 +129,7 @@ pub fn verify_balances( } pub fn send_many_transactions( - node: &ContactInfo, + node: &LegacyContactInfo, funding_keypair: &Keypair, connection_cache: &Arc, max_tokens_per_transfer: u64, @@ -216,10 +225,16 @@ pub fn kill_entry_and_spend_and_verify_rest( socket_addr_space: SocketAddrSpace, ) { info!("kill_entry_and_spend_and_verify_rest..."); - let cluster_nodes = - discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap(); + let cluster_nodes = discover_cluster( + &entry_point_info.gossip().unwrap(), + nodes, + socket_addr_space, + ) + .unwrap(); assert!(cluster_nodes.len() >= nodes); - let (rpc, tpu) = get_client_facing_addr(entry_point_info); + let (rpc, tpu) = LegacyContactInfo::try_from(entry_point_info) + .map(get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); // sleep long enough to make sure we are in epoch 3 @@ -234,7 +249,7 @@ pub fn kill_entry_and_spend_and_verify_rest( info!("sleeping for 2 leader fortnights"); sleep(Duration::from_millis(slot_millis * first_two_epoch_slots)); info!("done sleeping for first 2 warmup epochs"); - info!("killing entry point: {}", entry_point_info.id); + info!("killing entry point: {}", entry_point_info.pubkey()); entry_point_validator_exit.write().unwrap().exit(); info!("sleeping for some time"); sleep(Duration::from_millis( @@ -242,7 +257,7 @@ pub fn kill_entry_and_spend_and_verify_rest( )); info!("done sleeping for 2 fortnights"); for ingress_node in &cluster_nodes { - if ingress_node.id == entry_point_info.id { + if &ingress_node.id == entry_point_info.pubkey() { info!("ingress_node.id == entry_point_info.id, continuing..."); continue; } @@ -330,13 +345,15 @@ pub fn check_for_new_roots( assert!(loop_start.elapsed() < loop_timeout); for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(ingress_node); + let (rpc, tpu) = LegacyContactInfo::try_from(ingress_node) + .map(get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let root_slot = client .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); roots[i].insert(root_slot); - num_roots_map.insert(ingress_node.id, roots[i].len()); + num_roots_map.insert(*ingress_node.pubkey(), roots[i].len()); let num_roots = roots.iter().map(|r| r.len()).min().unwrap(); done = num_roots >= num_new_roots; if done || last_print.elapsed().as_secs() > 3 { @@ -353,7 +370,7 @@ pub fn check_for_new_roots( pub fn check_no_new_roots( num_slots_to_wait: usize, - contact_infos: &[ContactInfo], + contact_infos: &[LegacyContactInfo], connection_cache: &Arc, test_name: &str, ) { @@ -418,13 +435,13 @@ pub fn check_no_new_roots( fn poll_all_nodes_for_signature( entry_point_info: &ContactInfo, - cluster_nodes: &[ContactInfo], + cluster_nodes: &[LegacyContactInfo], connection_cache: &Arc, sig: &Signature, confs: usize, ) -> Result<(), TransportError> { for validator in cluster_nodes { - if validator.id == entry_point_info.id { + if &validator.id == entry_point_info.pubkey() { continue; } let (rpc, tpu) = get_client_facing_addr(validator); diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 029ad46a5..51be49d7e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -12,8 +12,9 @@ use { validator::{Validator, ValidatorConfig, ValidatorStartProgress}, }, solana_gossip::{ - cluster_info::Node, gossip_service::discover_cluster, - legacy_contact_info::LegacyContactInfo as ContactInfo, + cluster_info::Node, + contact_info::{ContactInfo, LegacyContactInfo}, + gossip_service::discover_cluster, }, solana_ledger::create_new_tmp_ledger, solana_runtime::{ @@ -265,7 +266,10 @@ impl LocalCluster { let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config); let leader_contact_info = leader_node.info.clone(); let mut leader_config = safe_clone_config(&config.validator_configs[0]); - leader_config.rpc_addrs = Some((leader_node.info.rpc, leader_node.info.rpc_pubsub)); + leader_config.rpc_addrs = Some(( + leader_node.info.rpc().unwrap(), + leader_node.info.rpc_pubsub().unwrap(), + )); Self::sync_ledger_path_across_nested_config_fields(&mut leader_config, &leader_ledger_path); let leader_keypair = Arc::new(leader_keypair.insecure_clone()); let leader_vote_keypair = Arc::new(leader_vote_keypair.insecure_clone()); @@ -349,14 +353,14 @@ impl LocalCluster { }); discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), config.node_stakes.len() + config.num_listeners as usize, socket_addr_space, ) .unwrap(); discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), config.node_stakes.len(), socket_addr_space, ) @@ -429,7 +433,9 @@ impl LocalCluster { mut voting_keypair: Option>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { - let (rpc, tpu) = cluster_tests::get_client_facing_addr(&self.entry_point_info); + let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); // Must have enough tokens to fund vote account and set delegate @@ -467,7 +473,10 @@ impl LocalCluster { } let mut config = safe_clone_config(validator_config); - config.rpc_addrs = Some((validator_node.info.rpc, validator_node.info.rpc_pubsub)); + config.rpc_addrs = Some(( + validator_node.info.rpc().unwrap(), + validator_node.info.rpc_pubsub().unwrap(), + )); Self::sync_ledger_path_across_nested_config_fields(&mut config, &ledger_path); let voting_keypair = voting_keypair.unwrap(); let validator_server = Validator::new( @@ -476,7 +485,7 @@ impl LocalCluster { &ledger_path, &voting_keypair.pubkey(), Arc::new(RwLock::new(vec![voting_keypair.clone()])), - vec![self.entry_point_info.clone()], + vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()], &config, true, // should_check_duplicate_instance Arc::new(RwLock::new(ValidatorStartProgress::default())), @@ -517,7 +526,9 @@ impl LocalCluster { } pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { - let (rpc, tpu) = cluster_tests::get_client_facing_addr(&self.entry_point_info); + let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) } @@ -536,7 +547,7 @@ impl LocalCluster { assert!(!alive_node_contact_infos.is_empty()); info!("{} discovering nodes", test_name); let cluster_nodes = discover_cluster( - &alive_node_contact_infos[0].gossip, + &alive_node_contact_infos[0].gossip().unwrap(), alive_node_contact_infos.len(), socket_addr_space, ) @@ -561,12 +572,12 @@ impl LocalCluster { let alive_node_contact_infos: Vec<_> = self .validators .values() - .map(|v| v.info.contact_info.clone()) + .map(|node| &node.info.contact_info) .collect(); assert!(!alive_node_contact_infos.is_empty()); info!("{} discovering nodes", test_name); let cluster_nodes = discover_cluster( - &alive_node_contact_infos[0].gossip, + &alive_node_contact_infos[0].gossip().unwrap(), alive_node_contact_infos.len(), socket_addr_space, ) @@ -575,7 +586,11 @@ impl LocalCluster { info!("{} making sure no new roots on any nodes", test_name); cluster_tests::check_no_new_roots( num_slots_to_wait, - &alive_node_contact_infos, + &alive_node_contact_infos + .into_iter() + .map(LegacyContactInfo::try_from) + .collect::, _>>() + .unwrap(), &self.connection_cache, test_name, ); @@ -762,7 +777,9 @@ impl Cluster for LocalCluster { fn get_validator_client(&self, pubkey: &Pubkey) -> Option { self.validators.get(pubkey).map(|f| { - let (rpc, tpu) = cluster_tests::get_client_facing_addr(&f.info.contact_info); + let (rpc, tpu) = LegacyContactInfo::try_from(&f.info.contact_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); ThinClient::new(rpc, tpu, self.connection_cache.clone()) }) } @@ -785,10 +802,11 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(pubkey); cluster_validator_info.info.contact_info = node.info.clone(); - cluster_validator_info.config.rpc_addrs = Some((node.info.rpc, node.info.rpc_pubsub)); + cluster_validator_info.config.rpc_addrs = + Some((node.info.rpc().unwrap(), node.info.rpc_pubsub().unwrap())); let entry_point_info = { - if *pubkey == self.entry_point_info.id { + if pubkey == self.entry_point_info.pubkey() { self.entry_point_info = node.info.clone(); None } else { @@ -836,7 +854,9 @@ impl Cluster for LocalCluster { &validator_info.voting_keypair.pubkey(), Arc::new(RwLock::new(vec![validator_info.voting_keypair.clone()])), entry_point_info - .map(|entry_point_info| vec![entry_point_info]) + .map(|entry_point_info| { + vec![LegacyContactInfo::try_from(&entry_point_info).unwrap()] + }) .unwrap_or_default(), &safe_clone_config(&cluster_validator_info.config), true, // should_check_duplicate_instance diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 55e6d1a9e..259b9e155 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -73,7 +73,7 @@ impl LocalCluster { ) -> NextSnapshotResult { // Get slot after which this was generated let client = self - .get_validator_client(&self.entry_point_info.id) + .get_validator_client(self.entry_point_info.pubkey()) .unwrap(); let last_slot = client .get_slot_with_commitment(CommitmentConfig::processed()) diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index 667ca2fc6..54e4a6228 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -343,7 +343,7 @@ pub fn run_cluster_partition( ); let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), num_nodes, SocketAddrSpace::Unspecified, ) diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index e5e7e4747..c3124683e 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -16,7 +16,7 @@ use { validator::ValidatorConfig, }, solana_download_utils::download_snapshot_archive, - solana_gossip::gossip_service::discover_cluster, + solana_gossip::{contact_info::LegacyContactInfo, gossip_service::discover_cluster}, solana_ledger::{ ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore, blockstore_processor::ProcessOptions, @@ -195,11 +195,13 @@ fn test_local_cluster_signature_subscribe() { // Get non leader let non_bootstrap_id = nodes .into_iter() - .find(|id| *id != cluster.entry_point_info.id) + .find(|id| id != cluster.entry_point_info.pubkey()) .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let (rpc, tpu) = cluster_tests::get_client_facing_addr(non_bootstrap_info); + let (rpc, tpu) = LegacyContactInfo::try_from(non_bootstrap_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let (blockhash, _) = tx_client @@ -214,7 +216,10 @@ fn test_local_cluster_signature_subscribe() { ); let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe( - &format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()), + &format!( + "ws://{}", + &non_bootstrap_info.rpc_pubsub().unwrap().to_string() + ), &transaction.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::processed()), @@ -312,7 +317,7 @@ fn test_two_unbalanced_stakes() { num_slots_per_epoch, ); cluster.close_preserve_ledgers(); - let leader_pubkey = cluster.entry_point_info.id; + let leader_pubkey = *cluster.entry_point_info.pubkey(); let leader_ledger = cluster.validators[&leader_pubkey].info.ledger_path.clone(); cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize); } @@ -335,14 +340,14 @@ fn test_forwarding() { let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 2, SocketAddrSpace::Unspecified, ) .unwrap(); assert!(cluster_nodes.len() >= 2); - let leader_pubkey = cluster.entry_point_info.id; + let leader_pubkey = *cluster.entry_point_info.pubkey(); let validator_info = cluster_nodes .iter() @@ -394,7 +399,7 @@ fn test_restart_node() { slots_per_epoch, ); cluster_tests::send_many_transactions( - &cluster.entry_point_info, + &LegacyContactInfo::try_from(&cluster.entry_point_info).unwrap(), &cluster.funding_keypair, &cluster.connection_cache, 10, @@ -419,14 +424,16 @@ fn test_mainnet_beta_cluster_type() { }; let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 1, SocketAddrSpace::Unspecified, ) .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let (rpc, tpu) = cluster_tests::get_client_facing_addr(&cluster.entry_point_info); + let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); // Programs that are available at epoch 0 @@ -506,7 +513,7 @@ fn test_snapshot_download() { // Download the snapshot, then boot a validator from it. download_snapshot_archive( - &cluster.entry_point_info.rpc, + &cluster.entry_point_info.rpc().unwrap(), &validator_snapshot_test_config .validator_config .snapshot_config @@ -637,7 +644,7 @@ fn test_incremental_snapshot_download() { // Download the snapshots, then boot a validator from them. download_snapshot_archive( - &cluster.entry_point_info.rpc, + &cluster.entry_point_info.rpc().unwrap(), &validator_snapshot_test_config .validator_config .snapshot_config @@ -665,7 +672,7 @@ fn test_incremental_snapshot_download() { .unwrap(); download_snapshot_archive( - &cluster.entry_point_info.rpc, + &cluster.entry_point_info.rpc().unwrap(), &validator_snapshot_test_config .validator_config .snapshot_config @@ -813,7 +820,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st // Download the snapshots, then boot a validator from them. info!("Downloading full snapshot to validator..."); download_snapshot_archive( - &cluster.entry_point_info.rpc, + &cluster.entry_point_info.rpc().unwrap(), validator_snapshot_test_config .full_snapshot_archives_dir .path(), @@ -847,7 +854,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st info!("Downloading incremental snapshot to validator..."); download_snapshot_archive( - &cluster.entry_point_info.rpc, + &cluster.entry_point_info.rpc().unwrap(), validator_snapshot_test_config .full_snapshot_archives_dir .path(), @@ -1244,7 +1251,7 @@ fn test_snapshot_restart_tower() { let all_pubkeys = cluster.get_node_pubkeys(); let validator_id = all_pubkeys .into_iter() - .find(|x| *x != cluster.entry_point_info.id) + .find(|x| x != cluster.entry_point_info.pubkey()) .unwrap(); let validator_info = cluster.exit_node(&validator_id); @@ -1344,7 +1351,7 @@ fn test_snapshots_blockstore_floor() { // Start up a new node from a snapshot let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 1, SocketAddrSpace::Unspecified, ) @@ -1365,7 +1372,7 @@ fn test_snapshots_blockstore_floor() { let all_pubkeys = cluster.get_node_pubkeys(); let validator_id = all_pubkeys .into_iter() - .find(|x| *x != cluster.entry_point_info.id) + .find(|x| x != cluster.entry_point_info.pubkey()) .unwrap(); let validator_client = cluster.get_validator_client(&validator_id).unwrap(); let mut current_slot = 0; @@ -1430,7 +1437,7 @@ fn test_snapshots_restart_validity() { // forwarded to and processed. trace!("Sending transactions"); let new_balances = cluster_tests::send_many_transactions( - &cluster.entry_point_info, + &LegacyContactInfo::try_from(&cluster.entry_point_info).unwrap(), &cluster.funding_keypair, &cluster.connection_cache, 10, @@ -1525,7 +1532,7 @@ fn test_wait_for_max_stake() { ..ClusterConfig::default() }; let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - let client = RpcClient::new_socket(cluster.entry_point_info.rpc); + let client = RpcClient::new_socket(cluster.entry_point_info.rpc().unwrap()); assert!(client .wait_for_max_stake(CommitmentConfig::default(), 33.0f32) @@ -1550,7 +1557,7 @@ fn test_no_voting() { }; let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let client = cluster - .get_validator_client(&cluster.entry_point_info.id) + .get_validator_client(cluster.entry_point_info.pubkey()) .unwrap(); loop { let last_slot = client @@ -1563,7 +1570,7 @@ fn test_no_voting() { } cluster.close_preserve_ledgers(); - let leader_pubkey = cluster.entry_point_info.id; + let leader_pubkey = *cluster.entry_point_info.pubkey(); let ledger_path = cluster.validators[&leader_pubkey].info.ledger_path.clone(); let ledger = Blockstore::open(&ledger_path).unwrap(); for i in 0..2 * VOTE_THRESHOLD_DEPTH { @@ -2481,11 +2488,11 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| { let update_client = cluster - .get_validator_client(&cluster.entry_point_info.id) + .get_validator_client(cluster.entry_point_info.pubkey()) .unwrap(); update_client_sender.send(update_client).unwrap(); let scan_client = cluster - .get_validator_client(&cluster.entry_point_info.id) + .get_validator_client(cluster.entry_point_info.pubkey()) .unwrap(); scan_client_sender.send(scan_client).unwrap(); }; @@ -2542,7 +2549,10 @@ fn test_rpc_block_subscribe() { }; let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe( - &format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()), + &format!( + "ws://{}", + &cluster.entry_point_info.rpc_pubsub().unwrap().to_string() + ), RpcBlockSubscribeFilter::All, Some(RpcBlockSubscribeConfig { commitment: Some(CommitmentConfig::confirmed()), @@ -2628,13 +2638,15 @@ fn test_oc_bad_signatures() { ); // 3) Start up a spy to listen for and push votes to leader TPU - let (rpc, tpu) = cluster_tests::get_client_facing_addr(&cluster.entry_point_info); + let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) + .map(cluster_tests::get_client_facing_addr) + .unwrap(); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); let gossip_voter = cluster_tests::start_gossip_voter( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), &node_keypair, |(_label, leader_vote_tx)| { let vote = vote_parser::parse_vote_transaction(&leader_vote_tx) @@ -2686,7 +2698,10 @@ fn test_oc_bad_signatures() { ); let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe( - &format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()), + &format!( + "ws://{}", + &cluster.entry_point_info.rpc_pubsub().unwrap().to_string() + ), RpcBlockSubscribeFilter::All, Some(RpcBlockSubscribeConfig { commitment: Some(CommitmentConfig::confirmed()), @@ -2982,10 +2997,10 @@ fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) { let all_pubkeys = cluster.get_node_pubkeys(); let other_validator_id = all_pubkeys .into_iter() - .find(|x| *x != cluster.entry_point_info.id) + .find(|x| x != cluster.entry_point_info.pubkey()) .unwrap(); let client = cluster - .get_validator_client(&cluster.entry_point_info.id) + .get_validator_client(cluster.entry_point_info.pubkey()) .unwrap(); update_client_sender.send(client).unwrap(); let scan_client = cluster.get_validator_client(&other_validator_id).unwrap(); diff --git a/local-cluster/tests/local_cluster_slow_1.rs b/local-cluster/tests/local_cluster_slow_1.rs index 8a2f036f6..7788d33f4 100644 --- a/local-cluster/tests/local_cluster_slow_1.rs +++ b/local-cluster/tests/local_cluster_slow_1.rs @@ -448,14 +448,14 @@ fn test_duplicate_shreds_broadcast_leader() { let our_info = cluster.exit_node(&our_id); let node_keypair = our_info.info.keypair; let vote_keypair = our_info.info.voting_keypair; - let bad_leader_id = cluster.entry_point_info.id; + let bad_leader_id = *cluster.entry_point_info.pubkey(); let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone(); info!("our node id: {}", node_keypair.pubkey()); // 3) Start up a gossip instance to listen for and push votes let voter_thread_sleep_ms = 100; let gossip_voter = cluster_tests::start_gossip_voter( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), &node_keypair, move |(label, leader_vote_tx)| { // Filter out votes not from the bad leader @@ -721,7 +721,8 @@ fn test_switch_threshold_uses_gossip_votes() { cluster .get_contact_info(&context.heaviest_validator_key) .unwrap() - .gossip, + .gossip() + .unwrap(), &SocketAddrSpace::Unspecified, ) .unwrap(); @@ -798,7 +799,7 @@ fn test_listener_startup() { }; let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 4, SocketAddrSpace::Unspecified, ) diff --git a/local-cluster/tests/local_cluster_slow_2.rs b/local-cluster/tests/local_cluster_slow_2.rs index cd1465cd6..a2b24857f 100644 --- a/local-cluster/tests/local_cluster_slow_2.rs +++ b/local-cluster/tests/local_cluster_slow_2.rs @@ -90,7 +90,7 @@ fn test_consistency_halt() { sleep(Duration::from_millis(5000)); let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 1, SocketAddrSpace::Unspecified, ) @@ -123,7 +123,7 @@ fn test_consistency_halt() { let num_nodes = 2; assert_eq!( discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), num_nodes, SocketAddrSpace::Unspecified ) @@ -136,7 +136,7 @@ fn test_consistency_halt() { let mut encountered_error = false; loop { let discover = discover_cluster( - &cluster.entry_point_info.gossip, + &cluster.entry_point_info.gossip().unwrap(), 2, SocketAddrSpace::Unspecified, ); @@ -154,7 +154,7 @@ fn test_consistency_halt() { } } let client = cluster - .get_validator_client(&cluster.entry_point_info.id) + .get_validator_client(cluster.entry_point_info.pubkey()) .unwrap(); if let Ok(slot) = client.get_slot() { if slot > 210 { @@ -187,7 +187,7 @@ fn test_leader_failure_4() { &local.entry_point_info, &local .validators - .get(&local.entry_point_info.id) + .get(local.entry_point_info.pubkey()) .unwrap() .config .validator_exit, diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index 6e8b7f3b5..3c4195913 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -59,7 +59,7 @@ impl TpuInfo for ClusterTpuInfo { mod test { use { super::*, - solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo, + solana_gossip::contact_info::ContactInfo, solana_ledger::{ blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, }, diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index d624621b6..742e6a0a0 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -16,9 +16,7 @@ use { solana_client::connection_cache::ConnectionCache, solana_entry::entry::Entry, solana_faucet::faucet::request_airdrop_transaction, - solana_gossip::{ - cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ blockstore::{Blockstore, SignatureInfosForAddress}, blockstore_db::BlockstoreError, @@ -348,13 +346,13 @@ impl JsonRpcRequestProcessor { let exit = Arc::new(AtomicBool::new(false)); let cluster_info = Arc::new({ let keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo { - id: keypair.pubkey(), - ..ContactInfo::default() - }; + let contact_info = ContactInfo::new_localhost( + &keypair.pubkey(), + solana_sdk::timing::timestamp(), // wallclock + ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = cluster_info.my_contact_info().tpu; + let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); let (sender, receiver) = unbounded(); SendTransactionService::new::( tpu_address, @@ -3444,11 +3442,7 @@ pub mod rpc_full { let cluster_info = &meta.cluster_info; let socket_addr_space = cluster_info.socket_addr_space(); let valid_address_or_none = |addr: &SocketAddr| -> Option { - if ContactInfo::is_valid_address(addr, socket_addr_space) { - Some(*addr) - } else { - None - } + ContactInfo::is_valid_address(addr, socket_addr_space).then_some(*addr) }; let my_shred_version = cluster_info.my_shred_version(); Ok(cluster_info @@ -4686,10 +4680,10 @@ pub mod tests { pub(crate) fn new_test_cluster_info() -> ClusterInfo { let keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo { - id: keypair.pubkey(), - ..ContactInfo::default() - }; + let contact_info = ContactInfo::new_localhost( + &keypair.pubkey(), + solana_sdk::timing::timestamp(), // wallclock + ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) } @@ -5123,6 +5117,15 @@ pub mod tests { let request = create_test_request("getClusterNodes", None); let result: Value = parse_success_result(rpc.handle_request_sync(request)); let expected = json!([{ + "pubkey": rpc.identity.to_string(), + "gossip": "127.0.0.1:8000", + "shredVersion": 0u16, + "tpu": "127.0.0.1:8003", + "rpc": format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PORT), + "pubsub": format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT), + "version": null, + "featureSet": null, + }, { "pubkey": rpc.leader_pubkey().to_string(), "gossip": "127.0.0.1:1235", "shredVersion": 0u16, @@ -6390,7 +6393,7 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let tpu_address = cluster_info.my_contact_info().tpu; + let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6657,7 +6660,7 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let tpu_address = cluster_info.my_contact_info().tpu; + let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); let (request_processor, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index b9dac452d..0e147eded 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -377,7 +377,10 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = cluster_info.my_contact_info().tpu; + let tpu_address = cluster_info + .my_contact_info() + .tpu() + .map_err(|err| format!("{err}"))?; // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs index 38589d534..b5aa8157a 100644 --- a/streamer/src/socket.rs +++ b/streamer/src/socket.rs @@ -16,6 +16,7 @@ impl SocketAddrSpace { } /// Returns true if the IP address is valid. + #[must_use] pub fn check(&self, addr: &SocketAddr) -> bool { if self == &SocketAddrSpace::Unspecified { return true; diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 700f4469b..1e3b34f25 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -750,15 +750,16 @@ impl TestValidator { config.node_config.bind_ip_addr, ); if let Some((rpc, rpc_pubsub)) = config.rpc_ports { - node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc); - node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub); + let addr = node.info.gossip().unwrap().ip(); + node.info.set_rpc((addr, rpc)).unwrap(); + node.info.set_rpc_pubsub((addr, rpc_pubsub)).unwrap(); } let vote_account_address = validator_vote_account.pubkey(); - let rpc_url = format!("http://{}", node.info.rpc); - let rpc_pubsub_url = format!("ws://{}/", node.info.rpc_pubsub); - let tpu = node.info.tpu; - let gossip = node.info.gossip; + let rpc_url = format!("http://{}", node.info.rpc().unwrap()); + let rpc_pubsub_url = format!("ws://{}/", node.info.rpc_pubsub().unwrap()); + let tpu = node.info.tpu().unwrap(); + let gossip = node.info.gossip().unwrap(); { let mut authorized_voter_keypairs = config.authorized_voter_keypairs.write().unwrap(); @@ -793,10 +794,13 @@ impl TestValidator { let mut validator_config = ValidatorConfig { geyser_plugin_config_files: config.geyser_plugin_config_files.clone(), rpc_addrs: Some(( - SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), node.info.rpc.port()), SocketAddr::new( IpAddr::V4(Ipv4Addr::UNSPECIFIED), - node.info.rpc_pubsub.port(), + node.info.rpc().unwrap().port(), + ), + SocketAddr::new( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + node.info.rpc_pubsub().unwrap().port(), ), )), rpc_config: config.rpc_config.clone(), diff --git a/transaction-dos/src/main.rs b/transaction-dos/src/main.rs index 4d308afda..603ecefb6 100644 --- a/transaction-dos/src/main.rs +++ b/transaction-dos/src/main.rs @@ -707,7 +707,7 @@ pub mod test { let account_keypair_refs: Vec<_> = account_keypairs.iter().collect(); let mut start = Measure::start("total accounts run"); run_transactions_dos( - cluster.entry_point_info.rpc, + cluster.entry_point_info.rpc().unwrap(), faucet_addr, &[&cluster.funding_keypair], iterations, diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 54430bdec..b667928e6 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -9,9 +9,7 @@ use { solana_core::{ consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress, }, - solana_gossip::{ - cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo, - }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_rpc::rpc::verify_pubkey, solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError}, solana_runtime::{accounts_index::AccountIndex, bank_forks::BankForks}, @@ -24,7 +22,7 @@ use { collections::{HashMap, HashSet}, error, fmt::{self, Display}, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, sync::{Arc, RwLock}, thread::{self, Builder}, @@ -91,36 +89,28 @@ pub struct AdminRpcRepairWhitelist { } impl From for AdminRpcContactInfo { - fn from(contact_info: ContactInfo) -> Self { - let ContactInfo { - id, - gossip, - tvu, - tvu_forwards, - repair, - tpu, - tpu_forwards, - tpu_vote, - rpc, - rpc_pubsub, - serve_repair, - wallclock, - shred_version, - } = contact_info; + fn from(node: ContactInfo) -> Self { + macro_rules! unwrap_socket { + ($name:ident) => { + node.$name().unwrap_or_else(|_| { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) + }) + }; + } Self { - id: id.to_string(), - last_updated_timestamp: wallclock, - gossip, - tvu, - tvu_forwards, - repair, - tpu, - tpu_forwards, - tpu_vote, - rpc, - rpc_pubsub, - serve_repair, - shred_version, + id: node.pubkey().to_string(), + last_updated_timestamp: node.wallclock(), + gossip: unwrap_socket!(gossip), + tvu: unwrap_socket!(tvu), + tvu_forwards: unwrap_socket!(tvu_forwards), + repair: unwrap_socket!(repair), + tpu: unwrap_socket!(tpu), + tpu_forwards: unwrap_socket!(tpu_forwards), + tpu_vote: unwrap_socket!(tpu_vote), + rpc: unwrap_socket!(rpc), + rpc_pubsub: unwrap_socket!(rpc_pubsub), + serve_repair: unwrap_socket!(serve_repair), + shred_version: node.shred_version(), } } } @@ -693,10 +683,11 @@ mod tests { fn start_with_config(config: TestConfig) -> Self { let keypair = Arc::new(Keypair::new()); let cluster_info = Arc::new(ClusterInfo::new( - ContactInfo { - id: keypair.pubkey(), - ..ContactInfo::default() - }, + ContactInfo::new( + keypair.pubkey(), + solana_sdk::timing::timestamp(), // wallclock + 0u16, // shred_version + ), keypair, SocketAddrSpace::Unspecified, )); diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index be5182751..bcb0482ce 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -60,38 +60,43 @@ fn verify_reachable_ports( validator_config: &ValidatorConfig, socket_addr_space: &SocketAddrSpace, ) -> bool { + let verify_address = |addr: &Option| -> bool { + addr.as_ref() + .map(|addr| socket_addr_space.check(addr)) + .unwrap_or_default() + }; let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; - if ContactInfo::is_valid_address(&node.info.serve_repair, socket_addr_space) { + if verify_address(&node.info.serve_repair().ok()) { udp_sockets.push(&node.sockets.serve_repair); } - if ContactInfo::is_valid_address(&node.info.tpu, socket_addr_space) { + if verify_address(&node.info.tpu().ok()) { udp_sockets.extend(node.sockets.tpu.iter()); udp_sockets.push(&node.sockets.tpu_quic); } - if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) { + if verify_address(&node.info.tpu_forwards().ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); udp_sockets.push(&node.sockets.tpu_forwards_quic); } - if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) { + if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter()); } - if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) { + if verify_address(&node.info.tvu().ok()) { udp_sockets.extend(node.sockets.tvu.iter()); udp_sockets.extend(node.sockets.broadcast.iter()); udp_sockets.extend(node.sockets.retransmit_sockets.iter()); } - if ContactInfo::is_valid_address(&node.info.tvu_forwards, socket_addr_space) { + if verify_address(&node.info.tvu_forwards().ok()) { udp_sockets.extend(node.sockets.tvu_forwards.iter()); } let mut tcp_listeners = vec![]; if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs { for (purpose, bind_addr, public_addr) in &[ - ("RPC", rpc_addr, &node.info.rpc), - ("RPC pubsub", rpc_pubsub_addr, &node.info.rpc_pubsub), + ("RPC", rpc_addr, node.info.rpc()), + ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()), ] { - if ContactInfo::is_valid_address(public_addr, socket_addr_space) { + if verify_address(&public_addr.as_ref().ok().copied()) { tcp_listeners.push(( bind_addr.port(), TcpListener::bind(bind_addr).unwrap_or_else(|err| { @@ -501,7 +506,10 @@ pub fn rpc_bootstrap( identity_keypair.clone(), cluster_entrypoints, ledger_path, - &node.info.gossip, + &node + .info + .gossip() + .expect("Operator must spin up node with valid gossip address"), node.sockets.gossip.try_clone().unwrap(), validator_config.expected_shred_version, validator_config.gossip_validators.clone(), diff --git a/validator/src/main.rs b/validator/src/main.rs index 5df496ebf..ece5a4509 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1599,27 +1599,39 @@ pub fn main() { ); if restricted_repair_only_mode { - let any = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); // When in --restricted_repair_only_mode is enabled only the gossip and repair ports // need to be reachable by the entrypoint to respond to gossip pull requests and repair // requests initiated by the node. All other ports are unused. - node.info.tpu = any; - node.info.tpu_forwards = any; - node.info.tvu = any; - node.info.tvu_forwards = any; - node.info.serve_repair = any; + node.info.remove_tpu(); + node.info.remove_tpu_forwards(); + node.info.remove_tvu(); + node.info.remove_tvu_forwards(); + node.info.remove_serve_repair(); // A node in this configuration shouldn't be an entrypoint to other nodes node.sockets.ip_echo = None; } if !private_rpc { + macro_rules! set_socket { + ($method:ident, $addr:expr, $name:literal) => { + node.info.$method($addr).expect(&format!( + "Operator must spin up node with valid {} address", + $name + )) + }; + } if let Some(public_rpc_addr) = public_rpc_addr { - node.info.rpc = public_rpc_addr; - node.info.rpc_pubsub = public_rpc_addr; + set_socket!(set_rpc, public_rpc_addr, "RPC"); + set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub"); } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs { - node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_addr.port()); - node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_addr.port()); + let addr = node + .info + .gossip() + .expect("Operator must spin up node with valid gossip address") + .ip(); + set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC"); + set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub"); } }