sends repair requests over QUIC protocol (#33016)

The commit implements the client side of serve-repair and the ancestor-hashes service over the QUIC protocol. A minimal sketch of the request hand-off follows the change summary below.
behzad nouri 2023-09-11 22:22:04 +00:00 committed by GitHub
parent cf35799b2a
commit e01269a9de
9 changed files with 313 additions and 35 deletions
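At a high level, repair clients no longer own a QUIC socket: each request is serialized, wrapped together with the expected response count and a response channel, and handed to the repair QUIC endpoint task through a tokio mpsc sender. The sketch below re-declares the LocalRequest shape visible in the diff to illustrate that hand-off; it is a standalone illustration, not the crate's repair::quic_endpoint module.

use std::net::SocketAddr;

// Standalone re-declaration for illustration; the validator uses
// repair::quic_endpoint::LocalRequest with these same fields.
struct LocalRequest {
    remote_address: SocketAddr,
    bytes: Vec<u8>,
    num_expected_responses: usize,
    response_sender: crossbeam_channel::Sender<(SocketAddr, Vec<u8>)>,
}

// Callers run on plain threads, so blocking_send is used instead of .await.
fn send_repair_request(
    quic_endpoint_sender: &tokio::sync::mpsc::Sender<LocalRequest>,
    remote_address: SocketAddr,
    bytes: Vec<u8>,
    num_expected_responses: usize,
    response_sender: crossbeam_channel::Sender<(SocketAddr, Vec<u8>)>,
) -> Result<(), ()> {
    let request = LocalRequest {
        remote_address,
        bytes,
        num_expected_responses,
        response_sender,
    };
    // An error means the endpoint task has shut down and dropped its receiver.
    quic_endpoint_sender.blocking_send(request).map_err(|_| ())
}

Responses come back on the crossbeam side as (SocketAddr, Vec<u8>) pairs and are folded into the existing packet pipelines, as the per-file changes below show.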

View File

@ -7,12 +7,15 @@ use {
},
outstanding_requests::OutstandingRequests,
packet_threshold::DynamicPacketToProcessThreshold,
quic_endpoint::LocalRequest,
repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup},
request_response::RequestResponse,
serve_repair::{
self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
},
},
replay_stage::DUPLICATE_THRESHOLD,
shred_fetch_stage::receive_repair_quic_packets,
},
bincode::serialize,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
@ -36,7 +39,7 @@ use {
std::{
collections::HashSet,
io::{Cursor, Read},
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
@ -44,6 +47,7 @@ use {
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
#[derive(Debug, PartialEq, Eq)]
@ -149,6 +153,7 @@ impl AncestorHashesService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
quic_endpoint_sender: AsyncSender<LocalRequest>,
repair_info: RepairInfo,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
@ -157,16 +162,31 @@ impl AncestorHashesService {
let t_receiver = streamer::receiver(
ancestor_hashes_request_socket.clone(),
exit.clone(),
response_sender,
response_sender.clone(),
Recycler::default(),
Arc::new(StreamerReceiveStats::new(
"ancestor_hashes_response_receiver",
)),
Duration::from_millis(1), // coalesce
false,
None,
false, // use_pinned_memory
None, // in_vote_only_mode
);
let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
let t_receiver_quic = {
let exit = exit.clone();
Builder::new()
.name(String::from("solAncHashQuic"))
.spawn(|| {
receive_repair_quic_packets(
quic_endpoint_response_receiver,
response_sender,
Recycler::default(),
exit,
)
})
.unwrap()
};
let ancestor_hashes_request_statuses: Arc<DashMap<Slot, AncestorRequestStatus>> =
Arc::new(DashMap::new());
let (retryable_slots_sender, retryable_slots_receiver) = unbounded();
@ -188,14 +208,22 @@ impl AncestorHashesService {
let t_ancestor_requests = Self::run_manage_ancestor_requests(
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
quic_endpoint_sender,
quic_endpoint_response_sender,
repair_info,
outstanding_requests,
exit,
ancestor_hashes_replay_update_receiver,
retryable_slots_receiver,
);
let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests];
Self { thread_hdls }
Self {
thread_hdls: vec![
t_receiver,
t_receiver_quic,
t_ancestor_hashes_responses,
t_ancestor_requests,
],
}
}
pub(crate) fn join(self) -> thread::Result<()> {
@ -551,6 +579,8 @@ impl AncestorHashesService {
fn run_manage_ancestor_requests(
ancestor_hashes_request_statuses: Arc<DashMap<Slot, AncestorRequestStatus>>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
quic_endpoint_sender: AsyncSender<LocalRequest>,
quic_endpoint_response_sender: Sender<(SocketAddr, Vec<u8>)>,
repair_info: RepairInfo,
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
@ -587,10 +617,11 @@ impl AncestorHashesService {
if exit.load(Ordering::Relaxed) {
return;
}
Self::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -612,6 +643,8 @@ impl AncestorHashesService {
fn manage_ancestor_requests(
ancestor_hashes_request_statuses: &DashMap<Slot, AncestorRequestStatus>,
ancestor_hashes_request_socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &Sender<(SocketAddr, Vec<u8>)>,
repair_info: &RepairInfo,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
@ -710,6 +743,8 @@ impl AncestorHashesService {
if Self::initiate_ancestor_hashes_requests_for_duplicate_slot(
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
quic_endpoint_sender,
quic_endpoint_response_sender,
&repair_info.cluster_slots,
serve_repair,
&repair_info.repair_validators,
@ -787,6 +822,8 @@ impl AncestorHashesService {
fn initiate_ancestor_hashes_requests_for_duplicate_slot(
ancestor_hashes_request_statuses: &DashMap<Slot, AncestorRequestStatus>,
ancestor_hashes_request_socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &Sender<(SocketAddr, Vec<u8>)>,
cluster_slots: &ClusterSlots,
serve_repair: &ServeRepair,
repair_validators: &Option<HashSet<Pubkey>>,
@ -811,10 +848,11 @@ impl AncestorHashesService {
repair_stats
.ancestor_requests
.update(pubkey, duplicate_slot, 0);
let ancestor_hashes_repair_type = AncestorHashesRepairType(duplicate_slot);
let nonce = outstanding_requests
.write()
.unwrap()
.add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
.add_request(ancestor_hashes_repair_type, timestamp());
let Ok(request_bytes) = serve_repair.ancestor_repair_request_bytes(
identity_keypair,
pubkey,
@ -827,7 +865,21 @@ impl AncestorHashesService {
Protocol::UDP => {
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
}
Protocol::QUIC => todo!(),
Protocol::QUIC => {
let num_expected_responses =
usize::try_from(ancestor_hashes_repair_type.num_expected_responses())
.unwrap();
let request = LocalRequest {
remote_address: *socket_addr,
bytes: request_bytes,
num_expected_responses,
response_sender: quic_endpoint_response_sender.clone(),
};
if quic_endpoint_sender.blocking_send(request).is_err() {
// The receiver end of the channel is disconnected.
break;
}
}
}
}
@ -1441,10 +1493,14 @@ mod test {
repair_validators,
..
} = repair_info;
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
@ -1494,6 +1550,8 @@ mod test {
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
@ -1555,6 +1613,8 @@ mod test {
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
@ -1640,10 +1700,15 @@ mod test {
} = repair_info;
cluster_info.insert_info(responder_node.info);
bank_forks.read().unwrap().root_bank().epoch_schedule();
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
// 1) No signals from ReplayStage, no requests should be made
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1686,6 +1751,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1725,6 +1792,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1756,6 +1825,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1793,6 +1864,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1833,6 +1906,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -1989,10 +2064,15 @@ mod test {
&leader_schedule_cache,
);
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
// Simulate making a request
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -2088,6 +2168,9 @@ mod test {
&repair_info.ancestor_duplicate_slots_sender,
&retryable_slots_sender,
);
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
// Simulate ancestor request thread getting the retry signal
assert!(dead_slot_pool.is_empty());
@ -2096,6 +2179,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
@ -2134,6 +2219,8 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
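Alongside the UDP receiver, the service now spawns a dedicated "solAncHashQuic" thread that drains QUIC responses and feeds them into the same response pipeline, via receive_repair_quic_packets (added in shred_fetch_stage.rs further down). A simplified sketch of that adapter thread is shown here, with the PacketBatch conversion elided and plain (SocketAddr, Vec<u8>) forwarding in its place, so the types differ from the real function.

use std::net::SocketAddr;
use std::thread::{Builder, JoinHandle};

fn spawn_quic_response_adapter(
    quic_endpoint_response_receiver: crossbeam_channel::Receiver<(SocketAddr, Vec<u8>)>,
    response_sender: crossbeam_channel::Sender<(SocketAddr, Vec<u8>)>,
) -> JoinHandle<()> {
    Builder::new()
        .name(String::from("solAncHashQuic"))
        .spawn(move || {
            // Forward each QUIC response into the existing pipeline; stop when
            // either side of the channel disconnects.
            for response in quic_endpoint_response_receiver.iter() {
                if response_sender.send(response).is_err() {
                    return;
                }
            }
        })
        .unwrap()
}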

View File

@ -1,4 +1,3 @@
#![allow(dead_code)]
use {
bincode::Options,
crossbeam_channel::Sender,

View File

@ -13,6 +13,7 @@ use {
ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService},
duplicate_repair_status::AncestorDuplicateSlotToRepair,
outstanding_requests::OutstandingRequests,
quic_endpoint::LocalRequest,
repair_weight::RepairWeight,
serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
},
@ -46,6 +47,7 @@ use {
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
// Time to defer repair requests to allow for turbine propagation
@ -239,6 +241,8 @@ impl RepairService {
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
quic_endpoint_sender: AsyncSender<LocalRequest>,
quic_endpoint_response_sender: CrossbeamSender<(SocketAddr, Vec<u8>)>,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
@ -250,6 +254,7 @@ impl RepairService {
let blockstore = blockstore.clone();
let exit = exit.clone();
let repair_info = repair_info.clone();
let quic_endpoint_sender = quic_endpoint_sender.clone();
Builder::new()
.name("solRepairSvc".to_string())
.spawn(move || {
@ -257,6 +262,8 @@ impl RepairService {
&blockstore,
&exit,
&repair_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
repair_info,
verified_vote_receiver,
&outstanding_requests,
@ -271,6 +278,7 @@ impl RepairService {
exit,
blockstore,
ancestor_hashes_socket,
quic_endpoint_sender,
repair_info,
ancestor_hashes_replay_update_receiver,
);
@ -281,10 +289,13 @@ impl RepairService {
}
}
#[allow(clippy::too_many_arguments)]
fn run(
blockstore: &Blockstore,
exit: &AtomicBool,
repair_socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &CrossbeamSender<(SocketAddr, Vec<u8>)>,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
@ -433,9 +444,11 @@ impl RepairService {
&repair_info.repair_validators,
&mut outstanding_requests,
identity_keypair,
quic_endpoint_sender,
quic_endpoint_response_sender,
repair_protocol,
)
.ok()?;
.ok()??;
Some((req, to))
})
.collect()

View File

@ -26,11 +26,13 @@ pub enum Error {
#[error(transparent)]
InvalidContactInfo(#[from] contact_info::Error),
#[error(transparent)]
RepairVerify(#[from] RepairVerifyError),
#[error("Send Error")]
SendError,
#[error(transparent)]
Serialize(#[from] std::boxed::Box<bincode::ErrorKind>),
#[error(transparent)]
WeightedIndex(#[from] rand::distributions::weighted::WeightedError),
#[error(transparent)]
RepairVerify(#[from] RepairVerifyError),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -3,7 +3,7 @@ use {
cluster_slots_service::cluster_slots::ClusterSlots,
repair::{
duplicate_repair_status::get_ancestor_hash_repair_sample_size,
quic_endpoint::RemoteRequest,
quic_endpoint::{LocalRequest, RemoteRequest},
repair_response,
repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS},
request_response::RequestResponse,
@ -11,7 +11,7 @@ use {
},
},
bincode::{serialize, Options},
crossbeam_channel::{Receiver, RecvTimeoutError},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
lru::LruCache,
rand::{
distributions::{Distribution, WeightedError, WeightedIndex},
@ -59,7 +59,7 @@ use {
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::oneshot::Sender as OneShotSender,
tokio::sync::{mpsc::Sender as AsyncSender, oneshot::Sender as OneShotSender},
};
/// the number of slots to respond with when responding to `Orphan` requests
@ -132,6 +132,7 @@ impl RequestResponse for ShredRepairType {
}
}
#[derive(Copy, Clone)]
pub struct AncestorHashesRepairType(pub Slot);
impl AncestorHashesRepairType {
pub fn slot(&self) -> Slot {
@ -339,7 +340,6 @@ pub(crate) struct RepairPeers {
struct Node {
pubkey: Pubkey,
serve_repair: SocketAddr,
#[allow(dead_code)]
serve_repair_quic: SocketAddr,
}
@ -1027,6 +1027,7 @@ impl ServeRepair {
Self::repair_proto_to_bytes(&request, keypair)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn repair_request(
&self,
cluster_slots: &ClusterSlots,
@ -1036,8 +1037,10 @@ impl ServeRepair {
repair_validators: &Option<HashSet<Pubkey>>,
outstanding_requests: &mut OutstandingShredRepairs,
identity_keypair: &Keypair,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &Sender<(SocketAddr, Vec<u8>)>,
repair_protocol: Protocol,
) -> Result<(SocketAddr, Vec<u8>)> {
) -> Result<Option<(SocketAddr, Vec<u8>)>> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let slot = repair_request.slot();
@ -1067,8 +1070,21 @@ impl ServeRepair {
repair_request
);
match repair_protocol {
Protocol::UDP => Ok((peer.serve_repair, out)),
Protocol::QUIC => todo!(),
Protocol::UDP => Ok(Some((peer.serve_repair, out))),
Protocol::QUIC => {
let num_expected_responses =
usize::try_from(repair_request.num_expected_responses()).unwrap();
let request = LocalRequest {
remote_address: peer.serve_repair_quic,
bytes: out,
num_expected_responses,
response_sender: quic_endpoint_response_sender.clone(),
};
quic_endpoint_sender
.blocking_send(request)
.map_err(|_| Error::SendError)
.map(|()| None)
}
}
}
@ -1970,6 +1986,10 @@ mod tests {
);
let identity_keypair = cluster_info.keypair().clone();
let mut outstanding_requests = OutstandingShredRepairs::default();
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) =
crossbeam_channel::unbounded();
let rv = serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
@ -1978,6 +1998,8 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
);
assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));
@ -2009,8 +2031,11 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
)
.unwrap()
.unwrap();
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap());
@ -2046,8 +2071,11 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
)
.unwrap()
.unwrap();
if rv.0 == serve_repair_addr {
one = true;
@ -2294,7 +2322,10 @@ mod tests {
let cluster_slots = ClusterSlots::default();
let cluster_info = Arc::new(new_test_cluster_info());
let me = cluster_info.my_contact_info();
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) =
crossbeam_channel::unbounded();
// Insert two peers on the network
let contact_info2 =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
@ -2325,6 +2356,8 @@ mod tests {
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
),
Err(Error::ClusterInfo(ClusterInfoError::NoPeers))
@ -2345,9 +2378,11 @@ mod tests {
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
),
Ok(_)
Ok(Some(_))
);
// Using no known validators should default to all
@ -2369,9 +2404,11 @@ mod tests {
&None,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
Protocol::UDP, // repair_protocol
),
Ok(_)
Ok(Some(_))
);
}
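Note the signature change above: repair_request now returns Result<Option<(SocketAddr, Vec<u8>)>>. Over UDP it still hands the serialized request back so the caller can send_to on its own socket; over QUIC it queues a LocalRequest on the endpoint channel itself and returns Ok(None), which is why repair_service.rs switches to .ok()??. A condensed, hypothetical caller-side sketch (names are not from the crate):

use std::net::{SocketAddr, UdpSocket};

fn dispatch_repair(
    result: Result<Option<(SocketAddr, Vec<u8>)>, Box<dyn std::error::Error>>,
    repair_socket: &UdpSocket,
) {
    match result {
        // UDP path: the caller still performs the send itself.
        Ok(Some((addr, bytes))) => {
            let _ = repair_socket.send_to(&bytes, addr);
        }
        // QUIC path: the request was already queued on the endpoint channel.
        Ok(None) => (),
        // Peer selection or serialization failed; skip this request.
        Err(_) => (),
    }
}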

View File

@ -162,8 +162,9 @@ impl ShredFetchStage {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
sockets: Vec<Arc<UdpSocket>>,
quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_socket: Arc<UdpSocket>,
repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec<u8>)>,
sender: Sender<PacketBatch>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
@ -202,13 +203,55 @@ impl ShredFetchStage {
tvu_threads.extend(repair_receiver);
tvu_threads.push(tvu_filter);
tvu_threads.push(repair_handler);
// Repair shreds fetched over QUIC protocol.
{
let (packet_sender, packet_receiver) = unbounded();
let bank_forks = bank_forks.clone();
let recycler = recycler.clone();
let exit = exit.clone();
let sender = sender.clone();
let turbine_disabled = turbine_disabled.clone();
tvu_threads.extend([
Builder::new()
.name("solTvuRecvRpr".to_string())
.spawn(|| {
receive_repair_quic_packets(
repair_quic_endpoint_receiver,
packet_sender,
recycler,
exit,
)
})
.unwrap(),
Builder::new()
.name("solTvuFetchRpr".to_string())
.spawn(move || {
Self::modify_packets(
packet_receiver,
sender,
&bank_forks,
shred_version,
"shred_fetch_repair_quic",
PacketFlags::REPAIR,
None, // repair_context; no ping packets!
turbine_disabled,
)
})
.unwrap(),
]);
}
// Turbine shreds fetched over QUIC protocol.
let (packet_sender, packet_receiver) = unbounded();
tvu_threads.extend([
Builder::new()
.name("solTvuRecvQuic".to_string())
.spawn(|| {
receive_quic_datagrams(quic_endpoint_receiver, packet_sender, recycler, exit)
receive_quic_datagrams(
turbine_quic_endpoint_receiver,
packet_sender,
recycler,
exit,
)
})
.unwrap(),
Builder::new()
@ -241,14 +284,14 @@ impl ShredFetchStage {
}
fn receive_quic_datagrams(
quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
sender: Sender<PacketBatch>,
recycler: PacketBatchRecycler,
exit: Arc<AtomicBool>,
) {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
while !exit.load(Ordering::Relaxed) {
let entry = match quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) {
let entry = match turbine_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) {
Ok(entry) => entry,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
@ -260,7 +303,7 @@ fn receive_quic_datagrams(
};
let deadline = Instant::now() + PACKET_COALESCE_DURATION;
let entries = std::iter::once(entry).chain(
std::iter::repeat_with(|| quic_endpoint_receiver.recv_deadline(deadline).ok())
std::iter::repeat_with(|| turbine_quic_endpoint_receiver.recv_deadline(deadline).ok())
.while_some(),
);
let size = entries
@ -285,6 +328,51 @@ fn receive_quic_datagrams(
}
}
pub(crate) fn receive_repair_quic_packets(
repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec<u8>)>,
sender: Sender<PacketBatch>,
recycler: PacketBatchRecycler,
exit: Arc<AtomicBool>,
) {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
while !exit.load(Ordering::Relaxed) {
let entry = match repair_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) {
Ok(entry) => entry,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
};
let mut packet_batch =
PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams");
unsafe {
packet_batch.set_len(PACKETS_PER_BATCH);
};
let deadline = Instant::now() + PACKET_COALESCE_DURATION;
let entries = std::iter::once(entry).chain(
std::iter::repeat_with(|| repair_quic_endpoint_receiver.recv_deadline(deadline).ok())
.while_some(),
);
let size = entries
.filter(|(_, bytes)| bytes.len() <= PACKET_DATA_SIZE)
.zip(packet_batch.iter_mut())
.map(|((addr, bytes), packet)| {
*packet.meta_mut() = Meta {
size: bytes.len(),
addr: addr.ip(),
port: addr.port(),
flags: PacketFlags::REPAIR,
};
packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes);
})
.count();
if size > 0 {
packet_batch.truncate(size);
if sender.send(packet_batch).is_err() {
return; // The receiver end of the channel is disconnected.
}
}
}
}
#[cfg(test)]
mod tests {
use {

View File

@ -15,7 +15,7 @@ use {
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
repair::repair_service::RepairInfo,
repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
replay_stage::{ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
@ -138,6 +138,7 @@ impl Tvu {
banking_tracer: Arc<BankingTracer>,
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -151,10 +152,13 @@ impl Tvu {
let repair_socket = Arc::new(repair_socket);
let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
let (repair_quic_endpoint_response_sender, repair_quic_endpoint_response_receiver) =
unbounded();
let fetch_stage = ShredFetchStage::new(
fetch_sockets,
turbine_quic_endpoint_receiver,
repair_socket.clone(),
repair_quic_endpoint_response_receiver,
fetch_sender,
tvu_config.shred_version,
bank_forks.clone(),
@ -209,6 +213,8 @@ impl Tvu {
retransmit_sender,
repair_socket,
ancestor_hashes_socket,
repair_quic_endpoint_sender,
repair_quic_endpoint_response_sender,
exit.clone(),
repair_info,
leader_schedule_cache.clone(),
@ -401,6 +407,8 @@ pub mod tests {
let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
let (repair_quic_endpoint_sender, _repair_quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
//start cluster_info1
let cluster_info1 =
ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
@ -484,6 +492,7 @@ pub mod tests {
BankingTracer::new_disabled(),
turbine_quic_endpoint_sender,
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);

View File

@ -16,7 +16,7 @@ use {
},
ledger_metric_report_service::LedgerMetricReportService,
poh_timing_report_service::PohTimingReportService,
repair::{serve_repair::ServeRepair, serve_repair_service::ServeRepairService},
repair::{self, serve_repair::ServeRepair, serve_repair_service::ServeRepairService},
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
sample_performance_service::SamplePerformanceService,
sigverify,
@ -132,6 +132,7 @@ use {
},
strum::VariantNames,
strum_macros::{Display, EnumString, EnumVariantNames, IntoStaticStr},
tokio::runtime::Runtime as TokioRuntime,
};
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
@ -463,8 +464,11 @@ pub struct Validator {
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
turbine_quic_endpoint: Endpoint,
turbine_quic_endpoint_runtime: Option<tokio::runtime::Runtime>,
turbine_quic_endpoint_runtime: Option<TokioRuntime>,
turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle,
repair_quic_endpoint: Endpoint,
repair_quic_endpoint_runtime: Option<TokioRuntime>,
repair_quic_endpoint_join_handle: repair::quic_endpoint::AsyncTryJoinHandle,
}
impl Validator {
@ -1048,7 +1052,7 @@ impl Validator {
serve_repair,
// Incoming UDP repair requests are adapted into RemoteRequest
// and also sent through the same channel.
repair_quic_endpoint_sender,
repair_quic_endpoint_sender.clone(),
repair_quic_endpoint_receiver,
blockstore.clone(),
node.sockets.serve_repair,
@ -1149,7 +1153,7 @@ impl Validator {
) = solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(tokio::runtime::Runtime::handle)
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.tvu_quic,
@ -1161,6 +1165,30 @@ impl Validator {
)
.unwrap();
// Repair quic endpoint.
let repair_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solRepairQuic")
.build()
.unwrap()
});
let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) =
repair::quic_endpoint::new_quic_endpoint(
repair_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.serve_repair_quic,
node.info
.serve_repair(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC serve-repair address")
.ip(),
repair_quic_endpoint_sender,
)
.unwrap();
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
@ -1213,6 +1241,7 @@ impl Validator {
banking_tracer.clone(),
turbine_quic_endpoint_sender.clone(),
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
)?;
let tpu = Tpu::new(
@ -1301,6 +1330,9 @@ impl Validator {
turbine_quic_endpoint,
turbine_quic_endpoint_runtime,
turbine_quic_endpoint_join_handle,
repair_quic_endpoint,
repair_quic_endpoint_runtime,
repair_quic_endpoint_join_handle,
})
}
@ -1410,9 +1442,14 @@ impl Validator {
}
self.gossip_service.join().expect("gossip_service");
repair::quic_endpoint::close_quic_endpoint(&self.repair_quic_endpoint);
self.serve_repair_service
.join()
.expect("serve_repair_service");
self.repair_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.repair_quic_endpoint_join_handle))
.transpose()
.unwrap();
self.stats_reporter_service
.join()
.expect("stats_reporter_service");

View File

@ -7,6 +7,7 @@ use {
completed_data_sets_service::CompletedDataSetsSender,
repair::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
quic_endpoint::LocalRequest,
repair_response,
repair_service::{
DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo,
@ -39,6 +40,7 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
type ShredPayload = Vec<u8>;
@ -325,6 +327,8 @@ impl WindowService {
retransmit_sender: Sender<Vec<ShredPayload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
repair_quic_endpoint_response_sender: Sender<(SocketAddr, Vec<u8>)>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
@ -344,6 +348,8 @@ impl WindowService {
exit.clone(),
repair_socket,
ancestor_hashes_socket,
repair_quic_endpoint_sender,
repair_quic_endpoint_response_sender,
repair_info,
verified_vote_receiver,
outstanding_requests.clone(),