serves remote repair requests from QUIC endpoint (#33069)

This commit implements the server side of repair using the QUIC protocol.

UDP repair requests are adapted as RemoteRequest and sent down the same
channel as remote requests arriving over QUIC, and the rest of the
server code is updated to operate on the RemoteRequest type.
This commit is contained in:
behzad nouri 2023-09-11 16:57:10 +00:00 committed by GitHub
parent 297ffad797
commit 7fc6fea8d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 184 additions and 83 deletions

View File

@ -124,16 +124,7 @@ impl AncestorRepairRequestsStats {
.ancestor_requests
.slot_pubkeys
.iter()
.map(|(slot, slot_repairs)| {
(
slot,
slot_repairs
.pubkey_repairs()
.iter()
.map(|(_key, count)| count)
.sum::<u64>(),
)
})
.map(|(slot, slot_repairs)| (slot, slot_repairs.pubkey_repairs().values().sum::<u64>()))
.collect();
let repair_total = self.ancestor_requests.count;
@ -161,8 +152,7 @@ impl AncestorHashesService {
repair_info: RepairInfo,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
let outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>> =
Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default()));
let outstanding_requests = Arc::<RwLock<OutstandingAncestorHashesRepairs>>::default();
let (response_sender, response_receiver) = unbounded();
let t_receiver = streamer::receiver(
ancestor_hashes_request_socket.clone(),
@ -864,6 +854,7 @@ mod test {
cluster_slot_state_verifier::{DuplicateSlotsToRepair, PurgeRepairSlotCounter},
duplicate_repair_status::DuplicateAncestorDecision,
serve_repair::MAX_ANCESTOR_RESPONSES,
serve_repair_service::adapt_repair_requests_packets,
},
replay_stage::{
tests::{replay_blockstore_components, ReplayBlockstoreComponents},
@ -1189,6 +1180,7 @@ mod test {
struct ResponderThreads {
t_request_receiver: JoinHandle<()>,
t_listen: JoinHandle<()>,
t_packet_adapter: JoinHandle<()>,
exit: Arc<AtomicBool>,
responder_info: ContactInfo,
response_receiver: PacketBatchReceiver,
@ -1200,6 +1192,7 @@ mod test {
self.exit.store(true, Ordering::Relaxed);
self.t_request_receiver.join().unwrap();
self.t_listen.join().unwrap();
self.t_packet_adapter.join().unwrap();
}
fn new(slot_to_query: Slot) -> Self {
@ -1255,9 +1248,13 @@ mod test {
false,
None,
);
let (remote_request_sender, remote_request_receiver) = unbounded();
let t_packet_adapter = Builder::new()
.spawn(|| adapt_repair_requests_packets(requests_receiver, remote_request_sender))
.unwrap();
let t_listen = responder_serve_repair.listen(
blockstore,
requests_receiver,
remote_request_receiver,
response_sender,
exit.clone(),
);
@ -1265,6 +1262,7 @@ mod test {
Self {
t_request_receiver,
t_listen,
t_packet_adapter,
exit,
responder_info: responder_node.info,
response_receiver,

View File

@ -41,6 +41,7 @@ const ALPN_REPAIR_PROTOCOL_ID: &[u8] = b"solana-repair";
const CONNECT_SERVER_NAME: &str = "solana-repair";
const CLIENT_CHANNEL_CAPACITY: usize = 1 << 14;
const CONNECTION_CACHE_CAPACITY: usize = 4096;
const MAX_CONCURRENT_BIDI_STREAMS: VarInt = VarInt::from_u32(512);
const CONNECTION_CLOSE_ERROR_CODE_SHUTDOWN: VarInt = VarInt::from_u32(1);
@ -485,6 +486,13 @@ async fn cache_connection(
// only by SocketAddr when establishing outgoing connections.
let entries: [Arc<RwLock<Option<Connection>>>; 2] = {
let mut cache = cache.write().await;
if cache.len() >= CONNECTION_CACHE_CAPACITY {
connection.close(
CONNECTION_CLOSE_ERROR_CODE_DROPPED,
CONNECTION_CLOSE_REASON_DROPPED,
);
return;
}
[Some(remote_pubkey), None].map(|remote_pubkey| {
let key = (remote_address, remote_pubkey);
cache.entry(key).or_default().clone()

View File

@ -3,14 +3,15 @@ use {
cluster_slots_service::cluster_slots::ClusterSlots,
repair::{
duplicate_repair_status::get_ancestor_hash_repair_sample_size,
quic_endpoint::RemoteRequest,
repair_response,
repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS},
request_response::RequestResponse,
result::{Error, RepairVerifyError, Result},
},
},
bincode::serialize,
crossbeam_channel::RecvTimeoutError,
bincode::{serialize, Options},
crossbeam_channel::{Receiver, RecvTimeoutError},
lru::LruCache,
rand::{
distributions::{Distribution, WeightedError, WeightedIndex},
@ -45,7 +46,7 @@ use {
solana_streamer::{
sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace,
streamer::{PacketBatchReceiver, PacketBatchSender},
streamer::PacketBatchSender,
},
std::{
cmp::Reverse,
@ -58,6 +59,7 @@ use {
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::oneshot::Sender as OneShotSender,
};
/// the number of slots to respond with when responding to `Orphan` requests
@ -248,19 +250,13 @@ const REPAIR_REQUEST_PONG_SERIALIZED_BYTES: usize = PUBKEY_BYTES + HASH_BYTES +
const REPAIR_REQUEST_MIN_BYTES: usize = REPAIR_REQUEST_PONG_SERIALIZED_BYTES;
fn discard_malformed_repair_requests(
batch: &mut PacketBatch,
requests: &mut Vec<RemoteRequest>,
stats: &mut ServeRepairStats,
) -> usize {
let mut well_formed_requests = 0;
for packet in batch.iter_mut() {
if packet.meta().size < REPAIR_REQUEST_MIN_BYTES {
stats.err_malformed += 1;
packet.meta_mut().set_discard(true);
} else {
well_formed_requests += 1;
}
}
well_formed_requests
let num_requests = requests.len();
requests.retain(|request| request.bytes.len() >= REPAIR_REQUEST_MIN_BYTES);
stats.err_malformed += num_requests - requests.len();
requests.len()
}
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize)]
@ -386,6 +382,7 @@ struct RepairRequestWithMeta {
from_addr: SocketAddr,
stake: u64,
whitelisted: bool,
response_sender: Option<OneShotSender<Vec<Vec<u8>>>>,
}
impl ServeRepair {
@ -514,20 +511,28 @@ impl ServeRepair {
}
fn decode_request(
packet: &Packet,
remote_request: RemoteRequest,
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
whitelist: &HashSet<Pubkey>,
my_id: &Pubkey,
socket_addr_space: &SocketAddrSpace,
) -> Result<RepairRequestWithMeta> {
let Ok(request) = packet.deserialize_slice(..) else {
let Ok(request) = deserialize_request::<RepairProtocol>(&remote_request) else {
return Err(Error::from(RepairVerifyError::Malformed));
};
let from_addr = packet.meta().socket_addr();
let from_addr = remote_request.remote_address;
if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) {
return Err(Error::from(RepairVerifyError::Malformed));
}
Self::verify_signed_packet(my_id, packet, &request)?;
Self::verify_signed_packet(my_id, &remote_request.bytes, &request)?;
if let Some(remote_pubkey) = remote_request.remote_pubkey {
if &remote_pubkey != request.sender() {
error!(
"remote pubkey {remote_pubkey} != request sender {}",
request.sender()
);
}
}
if request.sender() == my_id {
error!("self repair: from_addr={from_addr} my_id={my_id} request={request:?}");
return Err(Error::from(RepairVerifyError::SelfRepair));
@ -544,6 +549,7 @@ impl ServeRepair {
from_addr,
stake,
whitelisted,
response_sender: remote_request.response_sender,
})
}
@ -574,16 +580,16 @@ impl ServeRepair {
}
fn decode_requests(
reqs_v: Vec<PacketBatch>,
requests: Vec<RemoteRequest>,
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
whitelist: &HashSet<Pubkey>,
my_id: &Pubkey,
socket_addr_space: &SocketAddrSpace,
stats: &mut ServeRepairStats,
) -> Vec<RepairRequestWithMeta> {
let decode_packet = |packet| {
let decode_request = |request| {
let result = Self::decode_request(
packet,
request,
epoch_staked_nodes,
whitelist,
my_id,
@ -603,12 +609,7 @@ impl ServeRepair {
}
result.ok()
};
reqs_v
.iter()
.flatten()
.filter(|packet| !packet.meta().discard())
.filter_map(decode_packet)
.collect()
requests.into_iter().filter_map(decode_request).collect()
}
/// Process messages from the network
@ -617,16 +618,15 @@ impl ServeRepair {
ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler,
blockstore: &Blockstore,
requests_receiver: &PacketBatchReceiver,
requests_receiver: &Receiver<RemoteRequest>,
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
) -> std::result::Result<(), RecvTimeoutError> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
const TIMEOUT: Duration = Duration::from_secs(1);
let mut requests = vec![requests_receiver.recv_timeout(TIMEOUT)?];
const MAX_REQUESTS_PER_ITERATION: usize = 1024;
let mut total_requests = reqs_v[0].len();
let mut total_requests = requests.len();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let root_bank = self.bank_forks.read().unwrap().root_bank();
@ -641,8 +641,12 @@ impl ServeRepair {
};
let mut dropped_requests = 0;
let mut well_formed_requests = discard_malformed_repair_requests(&mut reqs_v[0], stats);
for mut more in requests_receiver.try_iter() {
let mut well_formed_requests = discard_malformed_repair_requests(&mut requests, stats);
loop {
let mut more: Vec<_> = requests_receiver.try_iter().collect();
if more.is_empty() {
break;
}
total_requests += more.len();
if well_formed_requests > max_buffered_packets {
// Already exceeded max. Don't waste time discarding
@ -652,7 +656,7 @@ impl ServeRepair {
let retained = discard_malformed_repair_requests(&mut more, stats);
well_formed_requests += retained;
if retained > 0 && well_formed_requests <= max_buffered_packets {
reqs_v.push(more);
requests.extend(more);
} else {
dropped_requests += more.len();
}
@ -665,7 +669,7 @@ impl ServeRepair {
let mut decoded_requests = {
let whitelist = self.repair_whitelist.read().unwrap();
Self::decode_requests(
reqs_v,
requests,
&epoch_staked_nodes,
&whitelist,
&my_id,
@ -789,7 +793,7 @@ impl ServeRepair {
pub fn listen(
self,
blockstore: Arc<Blockstore>,
requests_receiver: PacketBatchReceiver,
requests_receiver: Receiver<RemoteRequest>,
response_sender: PacketBatchSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
@ -840,11 +844,7 @@ impl ServeRepair {
.unwrap()
}
fn verify_signed_packet(
my_id: &Pubkey,
packet: &Packet,
request: &RepairProtocol,
) -> Result<()> {
fn verify_signed_packet(my_id: &Pubkey, bytes: &[u8], request: &RepairProtocol) -> Result<()> {
match request {
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
@ -871,14 +871,14 @@ impl ServeRepair {
if u128::from(time_diff_ms) > SIGNED_REPAIR_TIME_WINDOW.as_millis() {
return Err(Error::from(RepairVerifyError::TimeSkew));
}
let Some(leading_buf) = packet.data(..4) else {
let Some(leading_buf) = bytes.get(..4) else {
debug_assert!(
false,
"request should have failed deserialization: {request:?}",
);
return Err(Error::from(RepairVerifyError::Malformed));
};
let Some(trailing_buf) = packet.data(4 + SIGNATURE_BYTES..) else {
let Some(trailing_buf) = bytes.get(4 + SIGNATURE_BYTES..) else {
debug_assert!(
false,
"request should have failed deserialization: {request:?}",
@ -946,7 +946,7 @@ impl ServeRepair {
recycler: &PacketBatchRecycler,
blockstore: &Blockstore,
requests: Vec<RepairRequestWithMeta>,
response_sender: &PacketBatchSender,
packet_batch_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
) {
@ -957,14 +957,16 @@ impl ServeRepair {
request,
from_addr,
stake,
..
whitelisted: _,
response_sender,
} in requests.into_iter()
{
if !data_budget.check(request.max_response_bytes()) {
stats.dropped_requests_outbound_bandwidth += 1;
continue;
}
if !matches!(&request, RepairProtocol::Pong(_)) {
// Bypass ping/pong check for requests coming from QUIC endpoint.
if !matches!(&request, RepairProtocol::Pong(_)) && response_sender.is_none() {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
if let Some(ping_pkt) = ping_pkt {
@ -983,7 +985,9 @@ impl ServeRepair {
};
let num_response_packets = rsp.len();
let num_response_bytes = rsp.iter().map(|p| p.meta().size).sum();
if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
if data_budget.take(num_response_bytes)
&& send_response(rsp, packet_batch_sender, response_sender)
{
stats.total_response_packets += num_response_packets;
match stake > 0 {
true => stats.total_response_bytes_staked += num_response_bytes,
@ -998,7 +1002,7 @@ impl ServeRepair {
if !pending_pings.is_empty() {
stats.pings_sent += pending_pings.len();
let batch = PacketBatch::new(pending_pings);
let _ignore = response_sender.send(batch);
let _ = packet_batch_sender.send(batch);
}
}
@ -1363,6 +1367,36 @@ pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
Protocol::UDP
}
/// Decodes the raw payload of a remote request into a value of type `T`.
///
/// The bincode decoder is configured with a byte limit equal to the payload
/// length, fixed-width integer encoding, and rejection of trailing bytes, so
/// the whole payload must be consumed by exactly one `T`.
///
/// Returns the bincode error unchanged when decoding fails.
fn deserialize_request<T>(request: &RemoteRequest) -> std::result::Result<T, bincode::Error>
where
    T: serde::de::DeserializeOwned,
{
    let payload: &[u8] = &request.bytes;
    let decoder = bincode::options()
        .with_limit(payload.len() as u64)
        .with_fixint_encoding()
        .reject_trailing_bytes();
    decoder.deserialize(payload)
}
/// Delivers a batch of response packets to the requester.
///
/// * When `response_sender` is `None`, the batch is forwarded as-is through
///   `packet_batch_sender`.
/// * When `response_sender` is `Some`, the payload of every packet is copied
///   into its own `Vec<u8>` (packets for which `data(..)` yields `None` are
///   skipped) and the resulting list is sent through the one-shot channel.
///
/// Returns `true` if the chosen channel accepted the response.
fn send_response(
    packets: PacketBatch,
    packet_batch_sender: &PacketBatchSender,
    response_sender: Option<OneShotSender<Vec<Vec<u8>>>>,
) -> bool {
    let Some(oneshot) = response_sender else {
        return packet_batch_sender.send(packets).is_ok();
    };
    let payloads: Vec<Vec<u8>> = packets
        .iter()
        .filter_map(|packet| packet.data(..))
        .map(|bytes| bytes.to_vec())
        .collect();
    oneshot.send(payloads).is_ok()
}
#[cfg(test)]
mod tests {
use {
@ -1432,6 +1466,15 @@ mod tests {
}
}
/// Test helper: wraps a packet's address and payload in a `RemoteRequest`
/// with no remote pubkey and no response sender.
///
/// Panics if `packet.data(..)` returns `None`.
fn make_remote_request(packet: &Packet) -> RemoteRequest {
    let remote_address = packet.meta().socket_addr();
    let bytes = packet.data(..).map(Vec::from).unwrap();
    RemoteRequest {
        remote_pubkey: None,
        remote_address,
        bytes,
        response_sender: None,
    }
}
#[test]
fn test_check_well_formed_repair_request() {
let mut rng = rand::thread_rng();
@ -1440,12 +1483,12 @@ mod tests {
let pong = Pong::new(&ping, &keypair).unwrap();
let request = RepairProtocol::Pong(pong);
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = PacketBatch::new(vec![pkt.clone()]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 1);
pkt.meta_mut().size = 5;
let mut batch = PacketBatch::new(vec![pkt]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 0);
@ -1457,12 +1500,12 @@ mod tests {
shred_index: 456,
};
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = PacketBatch::new(vec![pkt.clone()]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 1);
pkt.meta_mut().size = 8;
let mut batch = PacketBatch::new(vec![pkt]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 0);
@ -1473,12 +1516,12 @@ mod tests {
slot: 123,
};
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = PacketBatch::new(vec![pkt.clone()]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 1);
pkt.meta_mut().size = 1;
let mut batch = PacketBatch::new(vec![pkt]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 0);
@ -1486,12 +1529,12 @@ mod tests {
let request = RepairProtocol::LegacyOrphan(LegacyContactInfo::default(), 123);
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = PacketBatch::new(vec![pkt.clone()]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 1);
pkt.meta_mut().size = 3;
let mut batch = PacketBatch::new(vec![pkt]);
let mut batch = vec![make_remote_request(&pkt)];
let mut stats = ServeRepairStats::default();
let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
assert_eq!(num_well_formed, 0);
@ -1701,8 +1744,13 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request).is_ok()
assert_matches!(
ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
packet.data(..).unwrap(),
&request
),
Ok(())
);
// recipient mismatch
@ -1721,7 +1769,11 @@ mod tests {
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert_matches!(
ServeRepair::verify_signed_packet(&my_keypair.pubkey(), &packet, &request),
ServeRepair::verify_signed_packet(
&my_keypair.pubkey(),
packet.data(..).unwrap(),
&request
),
Err(Error::RepairVerify(RepairVerifyError::IdMismatch))
);
@ -1743,7 +1795,11 @@ mod tests {
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert_matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
packet.data(..).unwrap(),
&request
),
Err(Error::RepairVerify(RepairVerifyError::TimeSkew))
);
@ -1763,7 +1819,11 @@ mod tests {
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert_matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
packet.data(..).unwrap(),
&request
),
Err(Error::RepairVerify(RepairVerifyError::SigVerify))
);
}

View File

@ -1,8 +1,8 @@
use {
crate::repair::serve_repair::ServeRepair,
crossbeam_channel::{unbounded, Sender},
crate::repair::{quic_endpoint::RemoteRequest, serve_repair::ServeRepair},
crossbeam_channel::{unbounded, Receiver, Sender},
solana_ledger::blockstore::Blockstore,
solana_perf::recycler::Recycler,
solana_perf::{packet::PacketBatch, recycler::Recycler},
solana_streamer::{
socket::SocketAddrSpace,
streamer::{self, StreamerReceiveStats},
@ -10,7 +10,7 @@ use {
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc},
thread::{self, JoinHandle},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
@ -22,6 +22,8 @@ pub struct ServeRepairService {
impl ServeRepairService {
pub fn new(
serve_repair: ServeRepair,
remote_request_sender: Sender<RemoteRequest>,
remote_request_receiver: Receiver<RemoteRequest>,
blockstore: Arc<Blockstore>,
serve_repair_socket: UdpSocket,
socket_addr_space: SocketAddrSpace,
@ -42,9 +44,13 @@ impl ServeRepairService {
Recycler::default(),
Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
Duration::from_millis(1), // coalesce
false,
None,
false, // use_pinned_memory
None, // in_vote_only_mode
);
let t_packet_adapter = Builder::new()
.name(String::from("solServRAdapt"))
.spawn(|| adapt_repair_requests_packets(request_receiver, remote_request_sender))
.unwrap();
let (response_sender, response_receiver) = unbounded();
let t_responder = streamer::responder(
"Repair",
@ -53,9 +59,10 @@ impl ServeRepairService {
socket_addr_space,
Some(stats_reporter_sender),
);
let t_listen = serve_repair.listen(blockstore, request_receiver, response_sender, exit);
let t_listen =
serve_repair.listen(blockstore, remote_request_receiver, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen];
let thread_hdls = vec![t_receiver, t_packet_adapter, t_responder, t_listen];
Self { thread_hdls }
}
@ -63,3 +70,26 @@ impl ServeRepairService {
self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
}
}
/// Adapts incoming UDP repair requests into `RemoteRequest` structs.
///
/// Every packet of every batch received from `packets_receiver` is converted
/// into a `RemoteRequest` carrying the packet's socket address and a copy of
/// its payload, with neither a remote pubkey nor a response sender attached.
/// Packets whose `data(..)` is `None` are skipped.
///
/// The function returns once iteration over `packets_receiver` ends, or as
/// soon as a send on `remote_request_sender` fails.
pub(crate) fn adapt_repair_requests_packets(
    packets_receiver: Receiver<PacketBatch>,
    remote_request_sender: Sender<RemoteRequest>,
) {
    for batch in packets_receiver {
        // Lazily convert packets so each request is sent before the next
        // packet is examined.
        let requests = batch.iter().filter_map(|packet| {
            let bytes = packet.data(..)?.to_vec();
            Some(RemoteRequest {
                remote_pubkey: None,
                remote_address: packet.meta().socket_addr(),
                bytes,
                response_sender: None,
            })
        });
        for request in requests {
            if remote_request_sender.send(request).is_err() {
                // The receiver end of the channel is disconnected.
                return;
            }
        }
    }
}

View File

@ -1043,8 +1043,13 @@ impl Validator {
bank_forks.clone(),
config.repair_whitelist.clone(),
);
let (repair_quic_endpoint_sender, repair_quic_endpoint_receiver) = unbounded();
let serve_repair_service = ServeRepairService::new(
serve_repair,
// Incoming UDP repair requests are adapted into RemoteRequest
// and also sent through the same channel.
repair_quic_endpoint_sender,
repair_quic_endpoint_receiver,
blockstore.clone(),
node.sockets.serve_repair,
socket_addr_space,