From 5178d4d49b347ed615b124b9d1527ff3198a8e65 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 15 May 2023 15:13:21 +0000 Subject: [PATCH] adds quic tvu port to contact-info (#31614) Working towards migrating turbine to QUIC. --- core/benches/retransmit_stage.rs | 6 ++-- core/src/broadcast_stage.rs | 7 +++-- .../broadcast_duplicates_run.rs | 7 ++--- core/src/cluster_nodes.rs | 8 ++--- dos/src/main.rs | 2 +- gossip/src/cluster_info.rs | 11 +++---- gossip/src/contact_info.rs | 29 ++++++++++++++----- gossip/src/legacy_contact_info.rs | 4 +-- gossip/tests/gossip.rs | 4 +-- validator/src/admin_rpc_service.rs | 2 +- validator/src/bootstrap.rs | 2 +- 11 files changed, 50 insertions(+), 32 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 3751b9f1ee..9af88ae880 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -10,7 +10,7 @@ use { solana_entry::entry::Entry, solana_gossip::{ cluster_info::{ClusterInfo, Node}, - contact_info::ContactInfo, + contact_info::{ContactInfo, Protocol}, }, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -63,9 +63,9 @@ fn bench_retransmitter(bencher: &mut Bencher) { let port = socket.local_addr().unwrap().port(); contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap(); contact_info - .set_tvu_forwards(contact_info.tvu().unwrap()) + .set_tvu_forwards(contact_info.tvu(Protocol::UDP).unwrap()) .unwrap(); - info!("local: {:?}", contact_info.tvu().unwrap()); + info!("local: {:?}", contact_info.tvu(Protocol::UDP).unwrap()); cluster_info.insert_info(contact_info); socket.set_nonblocking(true).unwrap(); socket diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index a27949bdc7..d8b86a8971 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -14,7 +14,10 @@ use { }, crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, itertools::Itertools, - solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}, + solana_gossip::{ + cluster_info::{ClusterInfo, ClusterInfoError}, + contact_info::Protocol, + }, solana_ledger::{blockstore::Blockstore, shred::Shred}, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, @@ -412,7 +415,7 @@ pub fn broadcast_shreds( shreds.filter_map(move |shred| { cluster_nodes .get_broadcast_peer(&shred.id())? - .tvu() + .tvu(Protocol::UDP) .ok() .filter(|addr| socket_addr_space.check(addr)) .map(|addr| (shred.payload(), addr)) diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 8b94c1f3ed..08eec89838 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -3,7 +3,6 @@ use { crate::cluster_nodes::ClusterNodesCache, itertools::Itertools, solana_entry::entry::Entry, - solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo, solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{ hash::Hash, @@ -305,7 +304,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .iter() .filter_map(|shred| { let node = cluster_nodes.get_broadcast_peer(&shred.id())?; - if !socket_addr_space.check(&node.tvu().ok()?) { + if !socket_addr_space.check(&node.tvu(Protocol::UDP).ok()?) { return None; } if self @@ -338,7 +337,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .iter() .filter_map(|pubkey| { let tvu = cluster_info - .lookup_contact_info(pubkey, ContactInfo::tvu)? + .lookup_contact_info(pubkey, |node| node.tvu(Protocol::UDP))? .ok()?; Some((shred.payload(), tvu)) }) @@ -346,7 +345,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { ); } - Some(vec![(shred.payload(), node.tvu().ok()?)]) + Some(vec![(shred.payload(), node.tvu(Protocol::UDP).ok()?)]) }) .flatten() .collect(); diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 9487cf5c55..28e9396f5f 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -6,10 +6,10 @@ use { rand_chacha::ChaChaRng, solana_gossip::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, + contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol}, crds::GossipRoute, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::{CrdsData, CrdsValue}, - legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo}, weighted_shuffle::WeightedShuffle, }, solana_ledger::shred::ShredId, @@ -179,7 +179,7 @@ impl ClusterNodes { if neighbors.is_empty() { let peers = children.into_iter().filter_map(|node| { node.contact_info()? - .tvu() + .tvu(Protocol::UDP) .ok() .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) }); @@ -209,7 +209,7 @@ impl ClusterNodes { }) .chain(children.into_iter().filter_map(|node| { node.contact_info()? - .tvu() + .tvu(Protocol::UDP) .ok() .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) })); @@ -244,7 +244,7 @@ impl ClusterNodes { .map(|index| &self.nodes[index]) .inspect(|node| { if let Some(node) = node.contact_info() { - if let Ok(addr) = node.tvu() { + if let Ok(addr) = node.tvu(Protocol::UDP) { addrs.entry(addr).or_insert(*node.pubkey()); } if !drop_redundant_turbine_path { diff --git a/dos/src/main.rs b/dos/src/main.rs index 3669e0df46..5834607999 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -438,7 +438,7 @@ fn get_target( info!("{:?}", node.gossip()); target = match mode { Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())), - Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())), + Mode::Tvu => Some((*node.pubkey(), node.tvu(Protocol::UDP).unwrap())), Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())), Mode::Tpu => Some((*node.pubkey(), node.tpu(protocol).unwrap())), Mode::TpuForwards => { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2e590620f3..f086fe2394 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -847,7 +847,7 @@ impl ClusterInfo { self.addr_to_string(&ip_addr, &node.tpu_vote().ok()), self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()), self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()), - self.addr_to_string(&ip_addr, &node.tvu().ok()), + self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()), self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()), self.addr_to_string(&ip_addr, &node.repair().ok()), self.addr_to_string(&ip_addr, &node.serve_repair().ok()), @@ -1314,7 +1314,8 @@ impl ClusterInfo { self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) .get_nodes_contact_info() .filter(|node| { - node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.tvu()) + node.pubkey() != &self_pubkey + && self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP)) }) .cloned() .collect() @@ -1329,7 +1330,7 @@ impl ClusterInfo { .filter(|node| { node.pubkey() != &self_pubkey && node.shred_version() == self_shred_version - && self.check_socket_addr_space(&node.tvu()) + && self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP)) }) .cloned() .collect() @@ -1346,7 +1347,7 @@ impl ClusterInfo { .filter(|node| { node.pubkey() != &self_pubkey && node.shred_version() == self_shred_version - && self.check_socket_addr_space(&node.tvu()) + && self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP)) && self.check_socket_addr_space(&node.serve_repair()) && match gossip_crds.get::<&LowestSlot>(*node.pubkey()) { None => true, // fallback to legacy behavior @@ -1361,7 +1362,7 @@ impl ClusterInfo { ![ node.tpu(contact_info::Protocol::UDP), node.gossip(), - node.tvu(), + node.tvu(contact_info::Protocol::UDP), ] .into_iter() .all(|addr| { diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 5bb7a53f45..12d3e95a61 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -34,8 +34,9 @@ const SOCKET_TAG_TPU_QUIC: u8 = 8; const SOCKET_TAG_TPU_VOTE: u8 = 9; const SOCKET_TAG_TVU: u8 = 10; const SOCKET_TAG_TVU_FORWARDS: u8 = 11; -const_assert_eq!(SOCKET_CACHE_SIZE, 12); -const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TVU_FORWARDS as usize + 1usize; +const SOCKET_TAG_TVU_QUIC: u8 = 12; +const_assert_eq!(SOCKET_CACHE_SIZE, 13); +const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TVU_QUIC as usize + 1usize; #[derive(Debug, Error)] pub enum Error { @@ -223,7 +224,7 @@ impl ContactInfo { SOCKET_TAG_TPU_FORWARDS_QUIC ); get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE); - get_socket!(tvu, SOCKET_TAG_TVU); + get_socket!(tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC); get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS); set_socket!(set_gossip, SOCKET_TAG_GOSSIP); @@ -238,7 +239,7 @@ impl ContactInfo { SOCKET_TAG_TPU_FORWARDS_QUIC ); set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE); - set_socket!(set_tvu, SOCKET_TAG_TVU); + set_socket!(set_tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC); set_socket!(set_tvu_forwards, SOCKET_TAG_TVU_FORWARDS); remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR); @@ -248,7 +249,7 @@ impl ContactInfo { SOCKET_TAG_TPU_FORWARDS, SOCKET_TAG_TPU_FORWARDS_QUIC ); - remove_socket!(remove_tvu, SOCKET_TAG_TVU); + remove_socket!(remove_tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC); remove_socket!(remove_tvu_forwards, SOCKET_TAG_TVU_FORWARDS); #[cfg(test)] @@ -740,7 +741,14 @@ mod tests { node.tpu_vote().ok().as_ref(), sockets.get(&SOCKET_TAG_TPU_VOTE) ); - assert_eq!(node.tvu().ok().as_ref(), sockets.get(&SOCKET_TAG_TVU)); + assert_eq!( + node.tvu(Protocol::UDP).ok().as_ref(), + sockets.get(&SOCKET_TAG_TVU) + ); + assert_eq!( + node.tvu(Protocol::QUIC).ok().as_ref(), + sockets.get(&SOCKET_TAG_TVU_QUIC) + ); assert_eq!( node.tvu_forwards().ok().as_ref(), sockets.get(&SOCKET_TAG_TVU_FORWARDS) @@ -827,7 +835,14 @@ mod tests { ) ); assert_eq!(old.tpu_vote().unwrap(), node.tpu_vote().unwrap()); - assert_eq!(old.tvu().unwrap(), node.tvu().unwrap()); + assert_eq!( + old.tvu(Protocol::QUIC).unwrap(), + node.tvu(Protocol::QUIC).unwrap() + ); + assert_eq!( + old.tvu(Protocol::UDP).unwrap(), + node.tvu(Protocol::UDP).unwrap() + ); assert_eq!(old.tvu_forwards().unwrap(), node.tvu_forwards().unwrap()); } diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index 5d27d4c10d..25e0b3624a 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -195,7 +195,7 @@ impl LegacyContactInfo { } get_socket!(gossip); - get_socket!(tvu); + get_socket!(@quic tvu); get_socket!(tvu_forwards); get_socket!(repair); get_socket!(@quic tpu); @@ -263,7 +263,7 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo { Ok(Self { id: *node.pubkey(), gossip: unwrap_socket!(gossip), - tvu: unwrap_socket!(tvu), + tvu: unwrap_socket!(tvu, Protocol::UDP), tvu_forwards: unwrap_socket!(tvu_forwards), repair: unwrap_socket!(repair), tpu: unwrap_socket!(tpu, Protocol::UDP), diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index ff6261ac6a..aab7bdee6b 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -6,9 +6,9 @@ use { rayon::iter::*, solana_gossip::{ cluster_info::{ClusterInfo, Node}, + contact_info::{LegacyContactInfo as ContactInfo, Protocol}, crds::Cursor, gossip_service::GossipService, - legacy_contact_info::LegacyContactInfo as ContactInfo, }, solana_perf::packet::Packet, solana_runtime::bank_forks::BankForks, @@ -136,7 +136,7 @@ fn retransmit_to( } else { peers .iter() - .filter_map(|peer| peer.tvu().ok()) + .filter_map(|peer| peer.tvu(Protocol::UDP).ok()) .filter(|addr| socket_addr_space.check(addr)) .collect() }; diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index d3133ab323..32b77fd394 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -105,7 +105,7 @@ impl From for AdminRpcContactInfo { id: node.pubkey().to_string(), last_updated_timestamp: node.wallclock(), gossip: unwrap_socket!(gossip), - tvu: unwrap_socket!(tvu), + tvu: unwrap_socket!(tvu, Protocol::UDP), tvu_forwards: unwrap_socket!(tvu_forwards), repair: unwrap_socket!(repair), tpu: unwrap_socket!(tpu, Protocol::UDP), diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 2248965f9a..c2777189db 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -95,7 +95,7 @@ fn verify_reachable_ports( if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter()); } - if verify_address(&node.info.tvu().ok()) { + if verify_address(&node.info.tvu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tvu.iter()); udp_sockets.extend(node.sockets.broadcast.iter()); udp_sockets.extend(node.sockets.retransmit_sockets.iter());