From e01269a9de52b9c4d122af5bacb3c053e791e76d Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 11 Sep 2023 22:22:04 +0000 Subject: [PATCH] sends repair requests over QUIC protocol (#33016) The commit implements client-side of serve-repair and ancestor-hash-service over QUIC protocol. --- core/src/repair/ancestor_hashes_service.rs | 107 +++++++++++++++++++-- core/src/repair/quic_endpoint.rs | 1 - core/src/repair/repair_service.rs | 15 ++- core/src/repair/result.rs | 6 +- core/src/repair/serve_repair.rs | 57 +++++++++-- core/src/shred_fetch_stage.rs | 100 +++++++++++++++++-- core/src/tvu.rs | 11 ++- core/src/validator.rs | 45 ++++++++- core/src/window_service.rs | 6 ++ 9 files changed, 313 insertions(+), 35 deletions(-) diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index 29f0886258..3214c89e14 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -7,12 +7,15 @@ use { }, outstanding_requests::OutstandingRequests, packet_threshold::DynamicPacketToProcessThreshold, + quic_endpoint::LocalRequest, repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup}, + request_response::RequestResponse, serve_repair::{ self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair, }, }, replay_stage::DUPLICATE_THRESHOLD, + shred_fetch_stage::receive_repair_quic_packets, }, bincode::serialize, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -36,7 +39,7 @@ use { std::{ collections::HashSet, io::{Cursor, Read}, - net::UdpSocket, + net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock, @@ -44,6 +47,7 @@ use { thread::{self, sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, + tokio::sync::mpsc::Sender as AsyncSender, }; #[derive(Debug, PartialEq, Eq)] @@ -149,6 +153,7 @@ impl AncestorHashesService { exit: Arc, blockstore: Arc, ancestor_hashes_request_socket: Arc, + quic_endpoint_sender: AsyncSender, repair_info: RepairInfo, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { @@ -157,16 +162,31 @@ impl AncestorHashesService { let t_receiver = streamer::receiver( ancestor_hashes_request_socket.clone(), exit.clone(), - response_sender, + response_sender.clone(), Recycler::default(), Arc::new(StreamerReceiveStats::new( "ancestor_hashes_response_receiver", )), Duration::from_millis(1), // coalesce - false, - None, + false, // use_pinned_memory + None, // in_vote_only_mode ); + let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded(); + let t_receiver_quic = { + let exit = exit.clone(); + Builder::new() + .name(String::from("solAncHashQuic")) + .spawn(|| { + receive_repair_quic_packets( + quic_endpoint_response_receiver, + response_sender, + Recycler::default(), + exit, + ) + }) + .unwrap() + }; let ancestor_hashes_request_statuses: Arc> = Arc::new(DashMap::new()); let (retryable_slots_sender, retryable_slots_receiver) = unbounded(); @@ -188,14 +208,22 @@ impl AncestorHashesService { let t_ancestor_requests = Self::run_manage_ancestor_requests( ancestor_hashes_request_statuses, ancestor_hashes_request_socket, + quic_endpoint_sender, + quic_endpoint_response_sender, repair_info, outstanding_requests, exit, ancestor_hashes_replay_update_receiver, retryable_slots_receiver, ); - let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests]; - Self { thread_hdls } + Self { + thread_hdls: vec![ + t_receiver, + t_receiver_quic, + 
t_ancestor_hashes_responses, + t_ancestor_requests, + ], + } } pub(crate) fn join(self) -> thread::Result<()> { @@ -551,6 +579,8 @@ impl AncestorHashesService { fn run_manage_ancestor_requests( ancestor_hashes_request_statuses: Arc>, ancestor_hashes_request_socket: Arc, + quic_endpoint_sender: AsyncSender, + quic_endpoint_response_sender: Sender<(SocketAddr, Vec)>, repair_info: RepairInfo, outstanding_requests: Arc>, exit: Arc, @@ -587,10 +617,11 @@ impl AncestorHashesService { if exit.load(Ordering::Relaxed) { return; } - Self::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -612,6 +643,8 @@ impl AncestorHashesService { fn manage_ancestor_requests( ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, + quic_endpoint_sender: &AsyncSender, + quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, repair_info: &RepairInfo, outstanding_requests: &RwLock, ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver, @@ -710,6 +743,8 @@ impl AncestorHashesService { if Self::initiate_ancestor_hashes_requests_for_duplicate_slot( ancestor_hashes_request_statuses, ancestor_hashes_request_socket, + quic_endpoint_sender, + quic_endpoint_response_sender, &repair_info.cluster_slots, serve_repair, &repair_info.repair_validators, @@ -787,6 +822,8 @@ impl AncestorHashesService { fn initiate_ancestor_hashes_requests_for_duplicate_slot( ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, + quic_endpoint_sender: &AsyncSender, + quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, cluster_slots: &ClusterSlots, serve_repair: &ServeRepair, repair_validators: &Option>, @@ -811,10 +848,11 @@ impl AncestorHashesService { repair_stats .ancestor_requests .update(pubkey, duplicate_slot, 0); + let ancestor_hashes_repair_type = AncestorHashesRepairType(duplicate_slot); let nonce = outstanding_requests .write() .unwrap() - .add_request(AncestorHashesRepairType(duplicate_slot), timestamp()); + .add_request(ancestor_hashes_repair_type, timestamp()); let Ok(request_bytes) = serve_repair.ancestor_repair_request_bytes( identity_keypair, pubkey, @@ -827,7 +865,21 @@ impl AncestorHashesService { Protocol::UDP => { let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr); } - Protocol::QUIC => todo!(), + Protocol::QUIC => { + let num_expected_responses = + usize::try_from(ancestor_hashes_repair_type.num_expected_responses()) + .unwrap(); + let request = LocalRequest { + remote_address: *socket_addr, + bytes: request_bytes, + num_expected_responses, + response_sender: quic_endpoint_response_sender.clone(), + }; + if quic_endpoint_sender.blocking_send(request).is_err() { + // The receiver end of the channel is disconnected. + break; + } + } } } @@ -1441,10 +1493,14 @@ mod test { repair_validators, .. 
} = repair_info; - + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); + let (quic_endpoint_sender, _quic_endpoint_sender) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1494,6 +1550,8 @@ mod test { AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1555,6 +1613,8 @@ mod test { AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1640,10 +1700,15 @@ mod test { } = repair_info; cluster_info.insert_info(responder_node.info); bank_forks.read().unwrap().root_bank().epoch_schedule(); + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); + let (quic_endpoint_sender, _quic_endpoint_sender) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); // 1) No signals from ReplayStage, no requests should be made AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1686,6 +1751,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1725,6 +1792,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1756,6 +1825,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1793,6 +1864,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1833,6 +1906,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1989,10 +2064,15 @@ mod test { &leader_schedule_cache, ); + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); + let (quic_endpoint_sender, _quic_endpoint_sender) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); // Simulate making a request AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, 
&ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -2088,6 +2168,9 @@ mod test { &repair_info.ancestor_duplicate_slots_sender, &retryable_slots_sender, ); + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); + let (quic_endpoint_sender, _quic_endpoint_sender) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); // Simulate ancestor request thread getting the retry signal assert!(dead_slot_pool.is_empty()); @@ -2096,6 +2179,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -2134,6 +2219,8 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 03dfa42bd2..f7b445011c 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use { bincode::Options, crossbeam_channel::Sender, diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 592500929d..c7cfab03f8 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -13,6 +13,7 @@ use { ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, duplicate_repair_status::AncestorDuplicateSlotToRepair, outstanding_requests::OutstandingRequests, + quic_endpoint::LocalRequest, repair_weight::RepairWeight, serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY}, }, @@ -46,6 +47,7 @@ use { thread::{self, sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, + tokio::sync::mpsc::Sender as AsyncSender, }; // Time to defer repair requests to allow for turbine propagation @@ -239,6 +241,8 @@ impl RepairService { exit: Arc, repair_socket: Arc, ancestor_hashes_socket: Arc, + quic_endpoint_sender: AsyncSender, + quic_endpoint_response_sender: CrossbeamSender<(SocketAddr, Vec)>, repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, @@ -250,6 +254,7 @@ impl RepairService { let blockstore = blockstore.clone(); let exit = exit.clone(); let repair_info = repair_info.clone(); + let quic_endpoint_sender = quic_endpoint_sender.clone(); Builder::new() .name("solRepairSvc".to_string()) .spawn(move || { @@ -257,6 +262,8 @@ impl RepairService { &blockstore, &exit, &repair_socket, + &quic_endpoint_sender, + &quic_endpoint_response_sender, repair_info, verified_vote_receiver, &outstanding_requests, @@ -271,6 +278,7 @@ impl RepairService { exit, blockstore, ancestor_hashes_socket, + quic_endpoint_sender, repair_info, ancestor_hashes_replay_update_receiver, ); @@ -281,10 +289,13 @@ impl RepairService { } } + #[allow(clippy::too_many_arguments)] fn run( blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, + quic_endpoint_sender: &AsyncSender, + quic_endpoint_response_sender: &CrossbeamSender<(SocketAddr, Vec)>, repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, @@ -433,9 +444,11 @@ impl RepairService { 
&repair_info.repair_validators, &mut outstanding_requests, identity_keypair, + quic_endpoint_sender, + quic_endpoint_response_sender, repair_protocol, ) - .ok()?; + .ok()??; Some((req, to)) }) .collect() diff --git a/core/src/repair/result.rs b/core/src/repair/result.rs index b222817704..86329fda31 100644 --- a/core/src/repair/result.rs +++ b/core/src/repair/result.rs @@ -26,11 +26,13 @@ pub enum Error { #[error(transparent)] InvalidContactInfo(#[from] contact_info::Error), #[error(transparent)] + RepairVerify(#[from] RepairVerifyError), + #[error("Send Error")] + SendError, + #[error(transparent)] Serialize(#[from] std::boxed::Box), #[error(transparent)] WeightedIndex(#[from] rand::distributions::weighted::WeightedError), - #[error(transparent)] - RepairVerify(#[from] RepairVerifyError), } pub type Result = std::result::Result; diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 0610e2ea7a..8ab42c2882 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -3,7 +3,7 @@ use { cluster_slots_service::cluster_slots::ClusterSlots, repair::{ duplicate_repair_status::get_ancestor_hash_repair_sample_size, - quic_endpoint::RemoteRequest, + quic_endpoint::{LocalRequest, RemoteRequest}, repair_response, repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS}, request_response::RequestResponse, @@ -11,7 +11,7 @@ use { }, }, bincode::{serialize, Options}, - crossbeam_channel::{Receiver, RecvTimeoutError}, + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, lru::LruCache, rand::{ distributions::{Distribution, WeightedError, WeightedIndex}, @@ -59,7 +59,7 @@ use { thread::{Builder, JoinHandle}, time::{Duration, Instant}, }, - tokio::sync::oneshot::Sender as OneShotSender, + tokio::sync::{mpsc::Sender as AsyncSender, oneshot::Sender as OneShotSender}, }; /// the number of slots to respond with when responding to `Orphan` requests @@ -132,6 +132,7 @@ impl RequestResponse for ShredRepairType { } } +#[derive(Copy, Clone)] pub struct AncestorHashesRepairType(pub Slot); impl AncestorHashesRepairType { pub fn slot(&self) -> Slot { @@ -339,7 +340,6 @@ pub(crate) struct RepairPeers { struct Node { pubkey: Pubkey, serve_repair: SocketAddr, - #[allow(dead_code)] serve_repair_quic: SocketAddr, } @@ -1027,6 +1027,7 @@ impl ServeRepair { Self::repair_proto_to_bytes(&request, keypair) } + #[allow(clippy::too_many_arguments)] pub(crate) fn repair_request( &self, cluster_slots: &ClusterSlots, @@ -1036,8 +1037,10 @@ impl ServeRepair { repair_validators: &Option>, outstanding_requests: &mut OutstandingShredRepairs, identity_keypair: &Keypair, + quic_endpoint_sender: &AsyncSender, + quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, repair_protocol: Protocol, - ) -> Result<(SocketAddr, Vec)> { + ) -> Result)>> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location let slot = repair_request.slot(); @@ -1067,8 +1070,21 @@ impl ServeRepair { repair_request ); match repair_protocol { - Protocol::UDP => Ok((peer.serve_repair, out)), - Protocol::QUIC => todo!(), + Protocol::UDP => Ok(Some((peer.serve_repair, out))), + Protocol::QUIC => { + let num_expected_responses = + usize::try_from(repair_request.num_expected_responses()).unwrap(); + let request = LocalRequest { + remote_address: peer.serve_repair_quic, + bytes: out, + num_expected_responses, + response_sender: quic_endpoint_response_sender.clone(), + }; + quic_endpoint_sender + .blocking_send(request) + 
.map_err(|_| Error::SendError) + .map(|()| None) + } } } @@ -1970,6 +1986,10 @@ mod tests { ); let identity_keypair = cluster_info.keypair().clone(); let mut outstanding_requests = OutstandingShredRepairs::default(); + let (quic_endpoint_sender, _quic_endpoint_receiver) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = + crossbeam_channel::unbounded(); let rv = serve_repair.repair_request( &cluster_slots, ShredRepairType::Shred(0, 0), @@ -1978,6 +1998,8 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ); assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers))); @@ -2009,8 +2031,11 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ) + .unwrap() .unwrap(); assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr); assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap()); @@ -2046,8 +2071,11 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ) + .unwrap() .unwrap(); if rv.0 == serve_repair_addr { one = true; @@ -2294,7 +2322,10 @@ mod tests { let cluster_slots = ClusterSlots::default(); let cluster_info = Arc::new(new_test_cluster_info()); let me = cluster_info.my_contact_info(); - + let (quic_endpoint_sender, _quic_endpoint_receiver) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); + let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = + crossbeam_channel::unbounded(); // Insert two peers on the network let contact_info2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); @@ -2325,6 +2356,8 @@ mod tests { &known_validators, &mut OutstandingShredRepairs::default(), &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ), Err(Error::ClusterInfo(ClusterInfoError::NoPeers)) @@ -2345,9 +2378,11 @@ mod tests { &known_validators, &mut OutstandingShredRepairs::default(), &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ), - Ok(_) + Ok(Some(_)) ); // Using no known validators should default to all @@ -2369,9 +2404,11 @@ mod tests { &None, &mut OutstandingShredRepairs::default(), &identity_keypair, + &quic_endpoint_sender, + &quic_endpoint_response_sender, Protocol::UDP, // repair_protocol ), - Ok(_) + Ok(Some(_)) ); } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index fa49afb522..62733953cc 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -162,8 +162,9 @@ impl ShredFetchStage { #[allow(clippy::too_many_arguments)] pub(crate) fn new( sockets: Vec>, - quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, repair_socket: Arc, + repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec)>, sender: Sender, shred_version: u16, bank_forks: Arc>, @@ -202,13 +203,55 @@ impl ShredFetchStage { tvu_threads.extend(repair_receiver); tvu_threads.push(tvu_filter); tvu_threads.push(repair_handler); - + // Repair shreds fetched over QUIC protocol. 
+ { + let (packet_sender, packet_receiver) = unbounded(); + let bank_forks = bank_forks.clone(); + let recycler = recycler.clone(); + let exit = exit.clone(); + let sender = sender.clone(); + let turbine_disabled = turbine_disabled.clone(); + tvu_threads.extend([ + Builder::new() + .name("solTvuRecvRpr".to_string()) + .spawn(|| { + receive_repair_quic_packets( + repair_quic_endpoint_receiver, + packet_sender, + recycler, + exit, + ) + }) + .unwrap(), + Builder::new() + .name("solTvuFetchRpr".to_string()) + .spawn(move || { + Self::modify_packets( + packet_receiver, + sender, + &bank_forks, + shred_version, + "shred_fetch_repair_quic", + PacketFlags::REPAIR, + None, // repair_context; no ping packets! + turbine_disabled, + ) + }) + .unwrap(), + ]); + } + // Turbine shreds fetched over QUIC protocol. let (packet_sender, packet_receiver) = unbounded(); tvu_threads.extend([ Builder::new() .name("solTvuRecvQuic".to_string()) .spawn(|| { - receive_quic_datagrams(quic_endpoint_receiver, packet_sender, recycler, exit) + receive_quic_datagrams( + turbine_quic_endpoint_receiver, + packet_sender, + recycler, + exit, + ) }) .unwrap(), Builder::new() @@ -241,14 +284,14 @@ impl ShredFetchStage { } fn receive_quic_datagrams( - quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, sender: Sender, recycler: PacketBatchRecycler, exit: Arc, ) { const RECV_TIMEOUT: Duration = Duration::from_secs(1); while !exit.load(Ordering::Relaxed) { - let entry = match quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) { + let entry = match turbine_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) { Ok(entry) => entry, Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => return, @@ -260,7 +303,7 @@ fn receive_quic_datagrams( }; let deadline = Instant::now() + PACKET_COALESCE_DURATION; let entries = std::iter::once(entry).chain( - std::iter::repeat_with(|| quic_endpoint_receiver.recv_deadline(deadline).ok()) + std::iter::repeat_with(|| turbine_quic_endpoint_receiver.recv_deadline(deadline).ok()) .while_some(), ); let size = entries @@ -285,6 +328,51 @@ fn receive_quic_datagrams( } } +pub(crate) fn receive_repair_quic_packets( + repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec)>, + sender: Sender, + recycler: PacketBatchRecycler, + exit: Arc, +) { + const RECV_TIMEOUT: Duration = Duration::from_secs(1); + while !exit.load(Ordering::Relaxed) { + let entry = match repair_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) { + Ok(entry) => entry, + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return, + }; + let mut packet_batch = + PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams"); + unsafe { + packet_batch.set_len(PACKETS_PER_BATCH); + }; + let deadline = Instant::now() + PACKET_COALESCE_DURATION; + let entries = std::iter::once(entry).chain( + std::iter::repeat_with(|| repair_quic_endpoint_receiver.recv_deadline(deadline).ok()) + .while_some(), + ); + let size = entries + .filter(|(_, bytes)| bytes.len() <= PACKET_DATA_SIZE) + .zip(packet_batch.iter_mut()) + .map(|((addr, bytes), packet)| { + *packet.meta_mut() = Meta { + size: bytes.len(), + addr: addr.ip(), + port: addr.port(), + flags: PacketFlags::REPAIR, + }; + packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); + }) + .count(); + if size > 0 { + packet_batch.truncate(size); + if sender.send(packet_batch).is_err() { + return; // The receiver end of the channel is 
disconnected. + } + } + } +} + #[cfg(test)] mod tests { use { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 1fbf211124..d3d57c1314 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -15,7 +15,7 @@ use { cost_update_service::CostUpdateService, drop_bank_service::DropBankService, ledger_cleanup_service::LedgerCleanupService, - repair::repair_service::RepairInfo, + repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo}, replay_stage::{ReplayStage, ReplayStageConfig}, rewards_recorder_service::RewardsRecorderSender, shred_fetch_stage::ShredFetchStage, @@ -138,6 +138,7 @@ impl Tvu { banking_tracer: Arc, turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + repair_quic_endpoint_sender: AsyncSender, ) -> Result { let TvuSockets { repair: repair_socket, @@ -151,10 +152,13 @@ impl Tvu { let repair_socket = Arc::new(repair_socket); let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket); let fetch_sockets: Vec> = fetch_sockets.into_iter().map(Arc::new).collect(); + let (repair_quic_endpoint_response_sender, repair_quic_endpoint_response_receiver) = + unbounded(); let fetch_stage = ShredFetchStage::new( fetch_sockets, turbine_quic_endpoint_receiver, repair_socket.clone(), + repair_quic_endpoint_response_receiver, fetch_sender, tvu_config.shred_version, bank_forks.clone(), @@ -209,6 +213,8 @@ impl Tvu { retransmit_sender, repair_socket, ancestor_hashes_socket, + repair_quic_endpoint_sender, + repair_quic_endpoint_response_sender, exit.clone(), repair_info, leader_schedule_cache.clone(), @@ -401,6 +407,8 @@ pub mod tests { let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) = tokio::sync::mpsc::channel(/*capacity:*/ 128); let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded(); + let (repair_quic_endpoint_sender, _repair_quic_endpoint_receiver) = + tokio::sync::mpsc::channel(/*buffer:*/ 128); //start cluster_info1 let cluster_info1 = ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified); @@ -484,6 +492,7 @@ pub mod tests { BankingTracer::new_disabled(), turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver, + repair_quic_endpoint_sender, ) .expect("assume success"); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index ec77b58612..80f06464bc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -16,7 +16,7 @@ use { }, ledger_metric_report_service::LedgerMetricReportService, poh_timing_report_service::PohTimingReportService, - repair::{serve_repair::ServeRepair, serve_repair_service::ServeRepairService}, + repair::{self, serve_repair::ServeRepair, serve_repair_service::ServeRepairService}, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, sigverify, @@ -132,6 +132,7 @@ use { }, strum::VariantNames, strum_macros::{Display, EnumString, EnumVariantNames, IntoStaticStr}, + tokio::runtime::Runtime as TokioRuntime, }; const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; @@ -463,8 +464,11 @@ pub struct Validator { accounts_background_service: AccountsBackgroundService, accounts_hash_verifier: AccountsHashVerifier, turbine_quic_endpoint: Endpoint, - turbine_quic_endpoint_runtime: Option, + turbine_quic_endpoint_runtime: Option, turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle, + repair_quic_endpoint: Endpoint, + repair_quic_endpoint_runtime: Option, + 
repair_quic_endpoint_join_handle: repair::quic_endpoint::AsyncTryJoinHandle, } impl Validator { @@ -1048,7 +1052,7 @@ impl Validator { serve_repair, // Incoming UDP repair requests are adapted into RemoteRequest // and also sent through the same channel. - repair_quic_endpoint_sender, + repair_quic_endpoint_sender.clone(), repair_quic_endpoint_receiver, blockstore.clone(), node.sockets.serve_repair, @@ -1149,7 +1153,7 @@ impl Validator { ) = solana_turbine::quic_endpoint::new_quic_endpoint( turbine_quic_endpoint_runtime .as_ref() - .map(tokio::runtime::Runtime::handle) + .map(TokioRuntime::handle) .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), &identity_keypair, node.sockets.tvu_quic, @@ -1161,6 +1165,30 @@ impl Validator { ) .unwrap(); + // Repair quic endpoint. + let repair_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("solRepairQuic") + .build() + .unwrap() + }); + let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) = + repair::quic_endpoint::new_quic_endpoint( + repair_quic_endpoint_runtime + .as_ref() + .map(TokioRuntime::handle) + .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), + &identity_keypair, + node.sockets.serve_repair_quic, + node.info + .serve_repair(Protocol::QUIC) + .expect("Operator must spin up node with valid QUIC serve-repair address") + .ip(), + repair_quic_endpoint_sender, + ) + .unwrap(); + let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -1213,6 +1241,7 @@ impl Validator { banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), turbine_quic_endpoint_receiver, + repair_quic_endpoint_sender, )?; let tpu = Tpu::new( @@ -1301,6 +1330,9 @@ impl Validator { turbine_quic_endpoint, turbine_quic_endpoint_runtime, turbine_quic_endpoint_join_handle, + repair_quic_endpoint, + repair_quic_endpoint_runtime, + repair_quic_endpoint_join_handle, }) } @@ -1410,9 +1442,14 @@ impl Validator { } self.gossip_service.join().expect("gossip_service"); + repair::quic_endpoint::close_quic_endpoint(&self.repair_quic_endpoint); self.serve_repair_service .join() .expect("serve_repair_service"); + self.repair_quic_endpoint_runtime + .map(|runtime| runtime.block_on(self.repair_quic_endpoint_join_handle)) + .transpose() + .unwrap(); self.stats_reporter_service .join() .expect("stats_reporter_service"); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7efe981275..a68a20e207 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,6 +7,7 @@ use { completed_data_sets_service::CompletedDataSetsSender, repair::{ ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, + quic_endpoint::LocalRequest, repair_response, repair_service::{ DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo, @@ -39,6 +40,7 @@ use { thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, + tokio::sync::mpsc::Sender as AsyncSender, }; type ShredPayload = Vec; @@ -325,6 +327,8 @@ impl WindowService { retransmit_sender: Sender>, repair_socket: Arc, ancestor_hashes_socket: Arc, + repair_quic_endpoint_sender: AsyncSender, + repair_quic_endpoint_response_sender: Sender<(SocketAddr, Vec)>, exit: Arc, repair_info: RepairInfo, leader_schedule_cache: Arc, @@ -344,6 +348,8 @@ impl WindowService { exit.clone(), repair_socket, ancestor_hashes_socket, + repair_quic_endpoint_sender, + repair_quic_endpoint_response_sender, 
             repair_info,
             verified_vote_receiver,
             outstanding_requests.clone(),