From 6163a6c279ed2521372133dce12d07639b529e50 Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Tue, 31 Jan 2023 02:44:58 -0800 Subject: [PATCH] restructure repair decode error handling (#29977) --- core/src/result.rs | 3 + core/src/serve_repair.rs | 293 ++++++++++++++++++++++++--------------- 2 files changed, 183 insertions(+), 113 deletions(-) diff --git a/core/src/result.rs b/core/src/result.rs index 2aa8f8718f..e17705c5c8 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -1,6 +1,7 @@ //! The `result` module exposes a Result type that propagates one of many different Error types. use { + crate::serve_repair::RepairVerifyError, solana_gossip::{cluster_info, gossip_error::GossipError}, solana_ledger::blockstore, thiserror::Error, @@ -30,6 +31,8 @@ pub enum Error { Serialize(#[from] std::boxed::Box), #[error(transparent)] WeightedIndex(#[from] rand::distributions::weighted::WeightedError), + #[error(transparent)] + RepairVerify(#[from] RepairVerifyError), } pub type Result = std::result::Result; diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index f3a46319cd..41935b618c 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -42,11 +42,12 @@ use { }, solana_streamer::{ sendmmsg::{batch_send, SendPktsError}, + socket::SocketAddrSpace, streamer::{PacketBatchReceiver, PacketBatchSender}, }, std::{ cmp::Reverse, - collections::HashSet, + collections::{HashMap, HashSet}, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -55,6 +56,7 @@ use { thread::{Builder, JoinHandle}, time::{Duration, Instant}, }, + thiserror::Error, }; type SlotHash = (Slot, Hash); @@ -84,6 +86,22 @@ const SIGNED_REPAIR_TIME_WINDOW: Duration = Duration::from_secs(60 * 10); // 10 #[cfg(test)] static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30); +#[derive(Error, Debug)] +pub enum RepairVerifyError { + #[error("IdMismatch")] + IdMismatch, + #[error("Malformed")] + Malformed, + #[error("SelfRepair")] + SelfRepair, + #[error("SigVerify")] + SigVerify, + #[error("TimeSkew")] + TimeSkew, + #[error("Unsigned")] + Unsigned, +} + #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)] pub enum ShredRepairType { /// Requesting `MAX_ORPHAN_REPAIR_RESPONSES ` parent shreds @@ -164,10 +182,7 @@ struct ServeRepairStats { total_response_packets: usize, total_response_bytes_staked: usize, total_response_bytes_unstaked: usize, - handle_requests_staked: usize, - handle_requests_unstaked: usize, processed: usize, - self_repair: usize, window_index: usize, highest_window_index: usize, orphan: usize, @@ -177,6 +192,9 @@ struct ServeRepairStats { pings_sent: usize, decode_time_us: u64, handle_requests_time_us: u64, + handle_requests_staked: usize, + handle_requests_unstaked: usize, + err_self_repair: usize, err_time_skew: usize, err_malformed: usize, err_sig_verify: usize, @@ -449,6 +467,117 @@ impl ServeRepair { } } + fn decode_request( + packet: &Packet, + epoch_staked_nodes: &Option>>, + whitelist: &HashSet, + my_id: &Pubkey, + socket_addr_space: &SocketAddrSpace, + stats: &mut ServeRepairStats, + cluster_type: ClusterType, + ) -> Result { + let request: RepairProtocol = match packet.deserialize_slice(..) { + Ok(request) => request, + Err(_) => { + return Err(Error::from(RepairVerifyError::Malformed)); + } + }; + let from_addr = packet.meta().socket_addr(); + if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) { + return Err(Error::from(RepairVerifyError::Malformed)); + } + match Self::verify_signed_packet(my_id, packet, &request) { + Ok(()) => (), + Err(Error::RepairVerify(RepairVerifyError::Unsigned)) => match cluster_type { + ClusterType::Testnet | ClusterType::Development => { + return Err(Error::from(RepairVerifyError::Unsigned)); + } + ClusterType::MainnetBeta | ClusterType::Devnet => { + stats.err_unsigned += 1; + } + }, + Err(e) => return Err(e), + } + if request.sender() == my_id { + return Err(Error::from(RepairVerifyError::SelfRepair)); + } + let stake = *epoch_staked_nodes + .as_ref() + .and_then(|stakes| stakes.get(request.sender())) + .unwrap_or(&0); + + let whitelisted = whitelist.contains(request.sender()); + + Ok(RepairRequestWithMeta { + request, + from_addr, + stake, + whitelisted, + }) + } + + fn record_request_decode_error(error: &Error, stats: &mut ServeRepairStats) { + match error { + Error::RepairVerify(RepairVerifyError::IdMismatch) => { + stats.err_id_mismatch += 1; + } + Error::RepairVerify(RepairVerifyError::Malformed) => { + stats.err_malformed += 1; + } + Error::RepairVerify(RepairVerifyError::SelfRepair) => { + stats.err_self_repair += 1; + } + Error::RepairVerify(RepairVerifyError::SigVerify) => { + stats.err_sig_verify += 1; + } + Error::RepairVerify(RepairVerifyError::TimeSkew) => { + stats.err_time_skew += 1; + } + Error::RepairVerify(RepairVerifyError::Unsigned) => { + stats.err_unsigned += 1; + } + _ => { + debug_assert!(false, "unhandled error {:?}", error); + } + } + } + + fn decode_requests( + reqs_v: Vec, + epoch_staked_nodes: &Option>>, + whitelist: &HashSet, + my_id: &Pubkey, + socket_addr_space: &SocketAddrSpace, + stats: &mut ServeRepairStats, + cluster_type: ClusterType, + ) -> Vec { + let decode_packet = |packet| { + let result = Self::decode_request( + packet, + epoch_staked_nodes, + whitelist, + my_id, + socket_addr_space, + stats, + cluster_type, + ); + match &result { + Ok(req) => { + if req.stake == 0 { + stats.handle_requests_unstaked += 1; + } else { + stats.handle_requests_staked += 1; + } + } + Err(e) => { + Self::record_request_decode_error(e, stats); + } + } + result.ok() + }; + reqs_v.iter().flatten().filter_map(decode_packet).collect() + } + /// Process messages from the network fn run_listen( &self, @@ -493,65 +622,19 @@ impl ServeRepair { stats.total_requests += total_requests; let decode_start = Instant::now(); - let mut decoded_requests = Vec::default(); - let mut whitelisted_request_count: usize = 0; - { + let mut decoded_requests = { let whitelist = self.repair_whitelist.read().unwrap(); - for packet in reqs_v.iter().flatten() { - let request: RepairProtocol = match packet.deserialize_slice(..) { - Ok(request) => request, - Err(_) => { - stats.err_malformed += 1; - continue; - } - }; - - let from_addr = packet.meta().socket_addr(); - if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) { - stats.err_malformed += 1; - continue; - } - - match cluster_type { - ClusterType::Testnet | ClusterType::Development => { - if !Self::verify_signed_packet(&my_id, packet, &request, stats) { - continue; - } - } - ClusterType::MainnetBeta | ClusterType::Devnet => { - // collect stats for signature verification - let _ = Self::verify_signed_packet(&my_id, packet, &request, stats); - } - } - - if request.sender() == &my_id { - stats.self_repair += 1; - continue; - } - - let stake = epoch_staked_nodes - .as_ref() - .and_then(|stakes| stakes.get(request.sender())) - .unwrap_or(&0); - if *stake == 0 { - stats.handle_requests_unstaked += 1; - } else { - stats.handle_requests_staked += 1; - } - - let whitelisted = whitelist.contains(request.sender()); - if whitelisted { - whitelisted_request_count += 1; - } - - decoded_requests.push(RepairRequestWithMeta { - request, - from_addr, - stake: *stake, - whitelisted, - }); - } - } + Self::decode_requests( + reqs_v, + &epoch_staked_nodes, + &whitelist, + &my_id, + &socket_addr_space, + stats, + cluster_type, + ) + }; + let whitelisted_request_count = decoded_requests.iter().filter(|r| r.whitelisted).count(); stats.decode_time_us += decode_start.elapsed().as_micros() as u64; stats.whitelisted_requests += whitelisted_request_count.min(MAX_REQUESTS_PER_ITERATION); @@ -578,13 +661,13 @@ impl ServeRepair { } fn report_reset_stats(&self, stats: &mut ServeRepairStats) { - if stats.self_repair > 0 { + if stats.err_self_repair > 0 { let my_id = self.cluster_info.id(); warn!( "{}: Ignored received repair requests from ME: {}", - my_id, stats.self_repair, + my_id, stats.err_self_repair, ); - inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair); + inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.err_self_repair); } datapoint_info!( @@ -629,7 +712,7 @@ impl ServeRepair { stats.total_response_bytes_unstaked, i64 ), - ("self_repair", stats.self_repair, i64), + ("self_repair", stats.err_self_repair, i64), ("window_index", stats.window_index, i64), ( "request-highest-window-index", @@ -719,13 +802,11 @@ impl ServeRepair { .unwrap() } - #[must_use] fn verify_signed_packet( my_id: &Pubkey, packet: &Packet, request: &RepairProtocol, - stats: &mut ServeRepairStats, - ) -> bool { + ) -> Result<()> { match request { RepairProtocol::LegacyWindowIndex(_, _, _) | RepairProtocol::LegacyHighestWindowIndex(_, _, _) @@ -734,13 +815,11 @@ impl ServeRepair { | RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _) | RepairProtocol::LegacyOrphanWithNonce(_, _, _) | RepairProtocol::LegacyAncestorHashes(_, _, _) => { - stats.err_unsigned += 1; - return false; + return Err(Error::from(RepairVerifyError::Unsigned)); } RepairProtocol::Pong(pong) => { if !pong.verify() { - stats.err_sig_verify += 1; - return false; + return Err(Error::from(RepairVerifyError::SigVerify)); } } RepairProtocol::WindowIndex { header, .. } @@ -748,39 +827,42 @@ impl ServeRepair { | RepairProtocol::Orphan { header, .. } | RepairProtocol::AncestorHashes { header, .. } => { if &header.recipient != my_id { - stats.err_id_mismatch += 1; - return false; + return Err(Error::from(RepairVerifyError::IdMismatch)); } let time_diff_ms = timestamp().abs_diff(header.timestamp); if u128::from(time_diff_ms) > SIGNED_REPAIR_TIME_WINDOW.as_millis() { - stats.err_time_skew += 1; - return false; + return Err(Error::from(RepairVerifyError::TimeSkew)); } let leading_buf = match packet.data(..4) { Some(buf) => buf, None => { - debug_assert!(false); // should have failed deserialize - stats.err_malformed += 1; - return false; + debug_assert!( + false, + "request should have failed deserialization: {:?}", + request + ); + return Err(Error::from(RepairVerifyError::Malformed)); } }; let trailing_buf = match packet.data(4 + SIGNATURE_BYTES..) { Some(buf) => buf, None => { - debug_assert!(false); // should have failed deserialize - stats.err_malformed += 1; - return false; + debug_assert!( + false, + "request should have failed deserialization: {:?}", + request + ); + return Err(Error::from(RepairVerifyError::Malformed)); } }; let from_id = request.sender(); let signed_data = [leading_buf, trailing_buf].concat(); if !header.signature.verify(from_id.as_ref(), &signed_data) { - stats.err_sig_verify += 1; - return false; + return Err(Error::from(RepairVerifyError::SigVerify)); } } } - true + Ok(()) } fn check_ping_cache( @@ -1507,12 +1589,9 @@ mod tests { packet }; let request: RepairProtocol = packet.deserialize_slice(..).unwrap(); - assert!(ServeRepair::verify_signed_packet( - &other_keypair.pubkey(), - &packet, - &request, - &mut ServeRepairStats::default(), - )); + assert!( + ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request).is_ok() + ); // recipient mismatch let packet = { @@ -1529,14 +1608,10 @@ mod tests { packet }; let request: RepairProtocol = packet.deserialize_slice(..).unwrap(); - let mut stats = ServeRepairStats::default(); - assert!(!ServeRepair::verify_signed_packet( - &my_keypair.pubkey(), - &packet, - &request, - &mut stats, + assert!(matches!( + ServeRepair::verify_signed_packet(&my_keypair.pubkey(), &packet, &request), + Err(Error::RepairVerify(RepairVerifyError::IdMismatch)) )); - assert_eq!(stats.err_id_mismatch, 1); // outside time window let packet = { @@ -1555,14 +1630,10 @@ mod tests { packet }; let request: RepairProtocol = packet.deserialize_slice(..).unwrap(); - let mut stats = ServeRepairStats::default(); - assert!(!ServeRepair::verify_signed_packet( - &other_keypair.pubkey(), - &packet, - &request, - &mut stats, + assert!(matches!( + ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request), + Err(Error::RepairVerify(RepairVerifyError::TimeSkew)) )); - assert_eq!(stats.err_time_skew, 1); // bad signature let packet = { @@ -1579,14 +1650,10 @@ mod tests { packet }; let request: RepairProtocol = packet.deserialize_slice(..).unwrap(); - let mut stats = ServeRepairStats::default(); - assert!(!ServeRepair::verify_signed_packet( - &other_keypair.pubkey(), - &packet, - &request, - &mut stats, + assert!(matches!( + ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request), + Err(Error::RepairVerify(RepairVerifyError::SigVerify)) )); - assert_eq!(stats.err_sig_verify, 1); } #[test]