From 0de8ccfda93e3849a6be7b577ef8a1bf3f9cb817 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 15 Aug 2023 17:09:09 +0000 Subject: [PATCH] adds socket address for repair service over QUIC (#32834) Working towards migrating repair to QUIC. --- core/src/repair/ancestor_hashes_service.rs | 14 +++++----- core/src/repair/serve_repair.rs | 17 +++++++----- dos/src/main.rs | 4 ++- gossip/src/cluster_info.rs | 24 ++++++++++++++-- gossip/src/contact_info.rs | 32 +++++++++++++++++++--- gossip/src/legacy_contact_info.rs | 13 +++++---- validator/src/admin_rpc_service.rs | 6 ++-- validator/src/bootstrap.rs | 2 +- 8 files changed, 80 insertions(+), 32 deletions(-) diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index e5d24df75..639a86845 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -869,7 +869,7 @@ mod test { }, solana_gossip::{ cluster_info::{ClusterInfo, Node}, - contact_info::ContactInfo, + contact_info::{ContactInfo, Protocol}, }, solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce}, solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, @@ -1400,7 +1400,7 @@ mod test { nonce, ); if let Ok(request_bytes) = request_bytes { - let socket = responder_info.serve_repair().unwrap(); + let socket = responder_info.serve_repair(Protocol::UDP).unwrap(); let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket); } } @@ -1470,7 +1470,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair().unwrap()); + .set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1511,7 +1511,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair().unwrap()); + .set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap()); let AncestorRequestDecision { slot, request_type, @@ -1571,7 +1571,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair().unwrap()); + .set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap()); let AncestorRequestDecision { slot, request_type, @@ -1947,7 +1947,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair().unwrap()); + .set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap()); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -2010,7 +2010,7 @@ mod test { let packet = &mut response_packet[0]; packet .meta_mut() - .set_socket_addr(&responder_info.serve_repair().unwrap()); + .set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap()); let AncestorRequestDecision { slot, request_type, diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 7862012f7..b7196575f 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -18,7 +18,7 @@ use { }, solana_gossip::{ cluster_info::{ClusterInfo, ClusterInfoError}, - legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo}, + contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol}, ping_pong::{self, PingCache, Pong}, weighted_shuffle::WeightedShuffle, }, @@ -214,7 +214,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>; /// Window protocol messages #[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)] -#[frozen_abi(digest = "7vZyACjc13qQYWUsqWbdidLXR3uNXpmqUZaKeV3gKuY2")] +#[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")] pub enum RepairProtocol { LegacyWindowIndex(LegacyContactInfo, Slot, u64), LegacyHighestWindowIndex(LegacyContactInfo, Slot, u64), @@ -350,7 +350,7 @@ impl RepairPeers { .iter() .zip(weights) .filter_map(|(peer, &weight)| { - let addr = peer.serve_repair().ok()?; + let addr = peer.serve_repair(Protocol::UDP).ok()?; Some(((*peer.pubkey(), addr), weight)) }) .unzip(); @@ -1078,7 +1078,7 @@ impl ServeRepair { .shuffle(&mut rand::thread_rng()) .map(|i| index[i]) .filter_map(|i| { - let addr = repair_peers[i].serve_repair().ok()?; + let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?; Some((*repair_peers[i].pubkey(), addr)) }) .take(get_ancestor_hash_repair_sample_size()) @@ -1102,7 +1102,10 @@ impl ServeRepair { .unzip(); let k = WeightedIndex::new(weights)?.sample(&mut rand::thread_rng()); let n = index[k]; - Ok((*repair_peers[n].pubkey(), repair_peers[n].serve_repair()?)) + Ok(( + *repair_peers[n].pubkey(), + repair_peers[n].serve_repair(Protocol::UDP)?, + )) } pub(crate) fn map_repair_request( @@ -1930,8 +1933,8 @@ mod tests { &identity_keypair, ) .unwrap(); - assert_eq!(nxt.serve_repair().unwrap(), serve_repair_addr); - assert_eq!(rv.0, nxt.serve_repair().unwrap()); + assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr); + assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap()); let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243); let mut nxt = ContactInfo::new( diff --git a/dos/src/main.rs b/dos/src/main.rs index 20ccd4eed..2d663636d 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -449,7 +449,9 @@ fn get_target( Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap())) } Mode::Repair => todo!("repair socket is not gossiped anymore!"), - Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())), + Mode::ServeRepair => { + Some((*node.pubkey(), node.serve_repair(Protocol::UDP).unwrap())) + } Mode::Rpc => None, }; break; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index f33362d69..e91f85796 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "4jtxvWyeFwfDQTTGh4yJLyukALzRNVJ9WNnCbFeJUmaS")] +#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -847,7 +847,7 @@ impl ClusterInfo { self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()), self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()), self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC).ok()), - self.addr_to_string(&ip_addr, &node.serve_repair().ok()), + self.addr_to_string(&ip_addr, &node.serve_repair(contact_info::Protocol::UDP).ok()), node.shred_version(), )) } @@ -1345,7 +1345,7 @@ impl ClusterInfo { node.pubkey() != &self_pubkey && node.shred_version() == self_shred_version && self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP)) - && self.check_socket_addr_space(&node.serve_repair()) + && self.check_socket_addr_space(&node.serve_repair(contact_info::Protocol::UDP)) && match gossip_crds.get::<&LowestSlot>(*node.pubkey()) { None => true, // fallback to legacy behavior Some(lowest_slot) => lowest_slot.lowest <= slot, @@ -2799,6 +2799,7 @@ pub struct Sockets { pub repair: UdpSocket, pub retransmit_sockets: Vec, pub serve_repair: UdpSocket, + pub serve_repair_quic: UdpSocket, pub ancestor_hashes_requests: UdpSocket, pub tpu_quic: UdpSocket, pub tpu_forwards_quic: UdpSocket, @@ -2839,6 +2840,7 @@ impl Node { let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()]; let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap(); let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap(); let mut info = ContactInfo::new( @@ -2871,6 +2873,11 @@ impl Node { serve_repair.local_addr().unwrap(), "serve-repair" ); + set_socket!( + set_serve_repair_quic, + serve_repair_quic.local_addr().unwrap(), + "serve-repair QUIC" + ); Node { info, sockets: Sockets { @@ -2885,6 +2892,7 @@ impl Node { repair, retransmit_sockets: vec![retransmit_socket], serve_repair, + serve_repair_quic, ancestor_hashes_requests, tpu_quic, tpu_forwards_quic, @@ -2930,6 +2938,7 @@ impl Node { let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); + let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); let (_, broadcast) = Self::bind(bind_ip_addr, port_range); let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); @@ -2959,6 +2968,11 @@ impl Node { set_socket!(set_rpc, rpc_port, "RPC"); set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub"); set_socket!(set_serve_repair, serve_repair_port, "serve-repair"); + set_socket!( + set_serve_repair_quic, + serve_repair_quic_port, + "serve-repair QUIC" + ); trace!("new ContactInfo: {:?}", info); Node { @@ -2975,6 +2989,7 @@ impl Node { repair, retransmit_sockets: vec![retransmit_socket], serve_repair, + serve_repair_quic, ancestor_hashes_requests, tpu_quic, tpu_forwards_quic, @@ -3023,6 +3038,7 @@ impl Node { let (_, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); + let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); let (_, broadcast) = multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind"); @@ -3044,6 +3060,7 @@ impl Node { ); let _ = info.set_tpu_vote((addr, tpu_vote_port)); let _ = info.set_serve_repair((addr, serve_repair_port)); + let _ = info.set_serve_repair((addr, serve_repair_quic_port)); trace!("new ContactInfo: {:?}", info); Node { @@ -3059,6 +3076,7 @@ impl Node { repair, retransmit_sockets, serve_repair, + serve_repair_quic, ip_echo: Some(ip_echo), ancestor_hashes_requests, tpu_quic, diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 5833e737e..3c9860b49 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -29,6 +29,7 @@ const SOCKET_TAG_GOSSIP: u8 = 0; const SOCKET_TAG_RPC: u8 = 2; const SOCKET_TAG_RPC_PUBSUB: u8 = 3; const SOCKET_TAG_SERVE_REPAIR: u8 = 4; +const SOCKET_TAG_SERVE_REPAIR_QUIC: u8 = 1; const SOCKET_TAG_TPU: u8 = 5; const SOCKET_TAG_TPU_FORWARDS: u8 = 6; const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7; @@ -224,7 +225,11 @@ impl ContactInfo { get_socket!(gossip, SOCKET_TAG_GOSSIP); get_socket!(rpc, SOCKET_TAG_RPC); get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); - get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR); + get_socket!( + serve_repair, + SOCKET_TAG_SERVE_REPAIR, + SOCKET_TAG_SERVE_REPAIR_QUIC + ); get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); get_socket!( tpu_forwards, @@ -238,6 +243,7 @@ impl ContactInfo { set_socket!(set_rpc, SOCKET_TAG_RPC); set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR); + set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC); set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); set_socket!( set_tpu_forwards, @@ -248,7 +254,11 @@ impl ContactInfo { set_socket!(set_tvu, SOCKET_TAG_TVU); set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC); - remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR); + remove_socket!( + remove_serve_repair, + SOCKET_TAG_SERVE_REPAIR, + SOCKET_TAG_SERVE_REPAIR_QUIC + ); remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); remove_socket!( remove_tpu_forwards, @@ -370,6 +380,8 @@ impl ContactInfo { node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT)) .unwrap(); node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap(); + node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006)) + .unwrap(); node } @@ -392,6 +404,7 @@ impl ContactInfo { node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT)) .unwrap(); node.set_serve_repair((addr, port + 8)).unwrap(); + node.set_serve_repair_quic((addr, port + 4)).unwrap(); node } } @@ -733,9 +746,13 @@ mod tests { sockets.get(&SOCKET_TAG_RPC_PUBSUB) ); assert_eq!( - node.serve_repair().ok().as_ref(), + node.serve_repair(Protocol::UDP).ok().as_ref(), sockets.get(&SOCKET_TAG_SERVE_REPAIR) ); + assert_eq!( + node.serve_repair(Protocol::QUIC).ok().as_ref(), + sockets.get(&SOCKET_TAG_SERVE_REPAIR_QUIC) + ); assert_eq!( node.tpu(Protocol::UDP).ok().as_ref(), sockets.get(&SOCKET_TAG_TPU) @@ -813,7 +830,14 @@ mod tests { assert_eq!(old.gossip().unwrap(), node.gossip().unwrap()); assert_eq!(old.rpc().unwrap(), node.rpc().unwrap()); assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap()); - assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap()); + assert_eq!( + old.serve_repair(Protocol::QUIC).unwrap(), + node.serve_repair(Protocol::QUIC).unwrap() + ); + assert_eq!( + old.serve_repair(Protocol::UDP).unwrap(), + node.serve_repair(Protocol::UDP).unwrap() + ); assert_eq!( old.tpu(Protocol::QUIC).unwrap(), node.tpu(Protocol::QUIC).unwrap() diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index 11df5f918..94b992367 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -27,7 +27,8 @@ pub struct LegacyContactInfo { tvu: SocketAddr, /// TVU over QUIC protocol. tvu_quic: SocketAddr, - unused: SocketAddr, + /// repair service over QUIC protocol. + serve_repair_quic: SocketAddr, /// transactions address tpu: SocketAddr, /// address to forward unprocessed transactions to @@ -123,7 +124,7 @@ impl Default for LegacyContactInfo { gossip: socketaddr_any!(), tvu: socketaddr_any!(), tvu_quic: socketaddr_any!(), - unused: socketaddr_any!(), + serve_repair_quic: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), tpu_vote: socketaddr_any!(), @@ -143,7 +144,7 @@ impl LegacyContactInfo { gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234), tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235), tvu_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1236), - unused: socketaddr!(Ipv4Addr::LOCALHOST, 1237), + serve_repair_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1237), tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238), tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239), tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240), @@ -210,7 +211,7 @@ impl LegacyContactInfo { get_socket!(tpu_vote); get_socket!(rpc); get_socket!(rpc_pubsub); - get_socket!(serve_repair); + get_socket!(serve_repair, serve_repair_quic); set_socket!(set_gossip, gossip); set_socket!(set_rpc, rpc); @@ -272,13 +273,13 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo { gossip: unwrap_socket!(gossip), tvu: unwrap_socket!(tvu, Protocol::UDP), tvu_quic: unwrap_socket!(tvu, Protocol::QUIC), - unused: SOCKET_ADDR_UNSPECIFIED, + serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC), tpu: unwrap_socket!(tpu, Protocol::UDP), tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP), tpu_vote: unwrap_socket!(tpu_vote), rpc: unwrap_socket!(rpc), rpc_pubsub: unwrap_socket!(rpc_pubsub), - serve_repair: unwrap_socket!(serve_repair), + serve_repair: unwrap_socket!(serve_repair, Protocol::UDP), wallclock: node.wallclock(), shred_version: node.shred_version(), }) diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 32f899ef3..106af6715 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -72,7 +72,7 @@ pub struct AdminRpcContactInfo { pub gossip: SocketAddr, pub tvu: SocketAddr, pub tvu_quic: SocketAddr, - pub unused: SocketAddr, + pub serve_repair_quic: SocketAddr, pub tpu: SocketAddr, pub tpu_forwards: SocketAddr, pub tpu_vote: SocketAddr, @@ -104,13 +104,13 @@ impl From for AdminRpcContactInfo { gossip: unwrap_socket!(gossip), tvu: unwrap_socket!(tvu, Protocol::UDP), tvu_quic: unwrap_socket!(tvu, Protocol::QUIC), - unused: SOCKET_ADDR_UNSPECIFIED, + serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC), tpu: unwrap_socket!(tpu, Protocol::UDP), tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP), tpu_vote: unwrap_socket!(tpu_vote), rpc: unwrap_socket!(rpc), rpc_pubsub: unwrap_socket!(rpc_pubsub), - serve_repair: unwrap_socket!(serve_repair), + serve_repair: unwrap_socket!(serve_repair, Protocol::UDP), shred_version: node.shred_version(), } } diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 9b394f192..a94256489 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -81,7 +81,7 @@ fn verify_reachable_ports( }; let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; - if verify_address(&node.info.serve_repair().ok()) { + if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) { udp_sockets.push(&node.sockets.serve_repair); } if verify_address(&node.info.tpu(Protocol::UDP).ok()) {