restructure repair decode error handling (#29977)

This commit is contained in:
Jeff Biseda 2023-01-31 02:44:58 -08:00 committed by GitHub
parent 376cd588c8
commit 6163a6c279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 183 additions and 113 deletions

View File

@ -1,6 +1,7 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.
use {
crate::serve_repair::RepairVerifyError,
solana_gossip::{cluster_info, gossip_error::GossipError},
solana_ledger::blockstore,
thiserror::Error,
@ -30,6 +31,8 @@ pub enum Error {
Serialize(#[from] std::boxed::Box<bincode::ErrorKind>),
#[error(transparent)]
WeightedIndex(#[from] rand::distributions::weighted::WeightedError),
#[error(transparent)]
RepairVerify(#[from] RepairVerifyError),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -42,11 +42,12 @@ use {
},
solana_streamer::{
sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace,
streamer::{PacketBatchReceiver, PacketBatchSender},
},
std::{
cmp::Reverse,
collections::HashSet,
collections::{HashMap, HashSet},
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@ -55,6 +56,7 @@ use {
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
thiserror::Error,
};
type SlotHash = (Slot, Hash);
@ -84,6 +86,22 @@ const SIGNED_REPAIR_TIME_WINDOW: Duration = Duration::from_secs(60 * 10); // 10
#[cfg(test)]
static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30);
/// Reasons a received repair request is rejected while it is being decoded
/// and verified (returned by `decode_request` and `verify_signed_packet`).
#[derive(Error, Debug)]
pub enum RepairVerifyError {
/// The `recipient` in the request header is not this node's id.
#[error("IdMismatch")]
IdMismatch,
/// The packet could not be deserialized into a `RepairProtocol`, its source
/// address is not valid, or the byte ranges covered by the signature could
/// not be read from the packet.
#[error("Malformed")]
Malformed,
/// The sender of the request is this node itself.
#[error("SelfRepair")]
SelfRepair,
/// Signature verification failed, either for a `Pong` or for the signature
/// in a request header.
#[error("SigVerify")]
SigVerify,
/// The header timestamp differs from the local timestamp by more than
/// `SIGNED_REPAIR_TIME_WINDOW`.
#[error("TimeSkew")]
TimeSkew,
/// The request is one of the `Legacy*` protocol variants, which are
/// reported as unsigned.
#[error("Unsigned")]
Unsigned,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum ShredRepairType {
/// Requesting `MAX_ORPHAN_REPAIR_RESPONSES` parent shreds
@ -164,10 +182,7 @@ struct ServeRepairStats {
total_response_packets: usize,
total_response_bytes_staked: usize,
total_response_bytes_unstaked: usize,
handle_requests_staked: usize,
handle_requests_unstaked: usize,
processed: usize,
self_repair: usize,
window_index: usize,
highest_window_index: usize,
orphan: usize,
@ -177,6 +192,9 @@ struct ServeRepairStats {
pings_sent: usize,
decode_time_us: u64,
handle_requests_time_us: u64,
handle_requests_staked: usize,
handle_requests_unstaked: usize,
err_self_repair: usize,
err_time_skew: usize,
err_malformed: usize,
err_sig_verify: usize,
@ -449,6 +467,117 @@ impl ServeRepair {
}
}
fn decode_request(
packet: &Packet,
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
whitelist: &HashSet<Pubkey>,
my_id: &Pubkey,
socket_addr_space: &SocketAddrSpace,
stats: &mut ServeRepairStats,
cluster_type: ClusterType,
) -> Result<RepairRequestWithMeta> {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
return Err(Error::from(RepairVerifyError::Malformed));
}
};
let from_addr = packet.meta().socket_addr();
if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) {
return Err(Error::from(RepairVerifyError::Malformed));
}
match Self::verify_signed_packet(my_id, packet, &request) {
Ok(()) => (),
Err(Error::RepairVerify(RepairVerifyError::Unsigned)) => match cluster_type {
ClusterType::Testnet | ClusterType::Development => {
return Err(Error::from(RepairVerifyError::Unsigned));
}
ClusterType::MainnetBeta | ClusterType::Devnet => {
stats.err_unsigned += 1;
}
},
Err(e) => return Err(e),
}
if request.sender() == my_id {
return Err(Error::from(RepairVerifyError::SelfRepair));
}
let stake = *epoch_staked_nodes
.as_ref()
.and_then(|stakes| stakes.get(request.sender()))
.unwrap_or(&0);
let whitelisted = whitelist.contains(request.sender());
Ok(RepairRequestWithMeta {
request,
from_addr,
stake,
whitelisted,
})
}
fn record_request_decode_error(error: &Error, stats: &mut ServeRepairStats) {
match error {
Error::RepairVerify(RepairVerifyError::IdMismatch) => {
stats.err_id_mismatch += 1;
}
Error::RepairVerify(RepairVerifyError::Malformed) => {
stats.err_malformed += 1;
}
Error::RepairVerify(RepairVerifyError::SelfRepair) => {
stats.err_self_repair += 1;
}
Error::RepairVerify(RepairVerifyError::SigVerify) => {
stats.err_sig_verify += 1;
}
Error::RepairVerify(RepairVerifyError::TimeSkew) => {
stats.err_time_skew += 1;
}
Error::RepairVerify(RepairVerifyError::Unsigned) => {
stats.err_unsigned += 1;
}
_ => {
debug_assert!(false, "unhandled error {:?}", error);
}
}
}
/// Decodes every packet in `reqs_v`, returning the requests that decoded successfully.
///
/// Each packet is passed to `decode_request`. A successful decode is counted in
/// `stats.handle_requests_unstaked` when its stake is 0 and in
/// `stats.handle_requests_staked` otherwise. A failed decode is counted through
/// `record_request_decode_error` and the packet is dropped. Output order follows
/// packet order.
fn decode_requests(
    reqs_v: Vec<PacketBatch>,
    epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
    whitelist: &HashSet<Pubkey>,
    my_id: &Pubkey,
    socket_addr_space: &SocketAddrSpace,
    stats: &mut ServeRepairStats,
    cluster_type: ClusterType,
) -> Vec<RepairRequestWithMeta> {
    let mut decoded = Vec::new();
    for packet in reqs_v.iter().flatten() {
        let outcome = Self::decode_request(
            packet,
            epoch_staked_nodes,
            whitelist,
            my_id,
            socket_addr_space,
            stats,
            cluster_type,
        );
        match outcome {
            Ok(req) => {
                if req.stake == 0 {
                    stats.handle_requests_unstaked += 1;
                } else {
                    stats.handle_requests_staked += 1;
                }
                decoded.push(req);
            }
            Err(err) => Self::record_request_decode_error(&err, stats),
        }
    }
    decoded
}
/// Process messages from the network
fn run_listen(
&self,
@ -493,65 +622,19 @@ impl ServeRepair {
stats.total_requests += total_requests;
let decode_start = Instant::now();
let mut decoded_requests = Vec::default();
let mut whitelisted_request_count: usize = 0;
{
let mut decoded_requests = {
let whitelist = self.repair_whitelist.read().unwrap();
for packet in reqs_v.iter().flatten() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};
let from_addr = packet.meta().socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
match cluster_type {
ClusterType::Testnet | ClusterType::Development => {
if !Self::verify_signed_packet(&my_id, packet, &request, stats) {
continue;
}
}
ClusterType::MainnetBeta | ClusterType::Devnet => {
// collect stats for signature verification
let _ = Self::verify_signed_packet(&my_id, packet, &request, stats);
}
}
if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}
let stake = epoch_staked_nodes
.as_ref()
.and_then(|stakes| stakes.get(request.sender()))
.unwrap_or(&0);
if *stake == 0 {
stats.handle_requests_unstaked += 1;
} else {
stats.handle_requests_staked += 1;
}
let whitelisted = whitelist.contains(request.sender());
if whitelisted {
whitelisted_request_count += 1;
}
decoded_requests.push(RepairRequestWithMeta {
request,
from_addr,
stake: *stake,
whitelisted,
});
}
}
Self::decode_requests(
reqs_v,
&epoch_staked_nodes,
&whitelist,
&my_id,
&socket_addr_space,
stats,
cluster_type,
)
};
let whitelisted_request_count = decoded_requests.iter().filter(|r| r.whitelisted).count();
stats.decode_time_us += decode_start.elapsed().as_micros() as u64;
stats.whitelisted_requests += whitelisted_request_count.min(MAX_REQUESTS_PER_ITERATION);
@ -578,13 +661,13 @@ impl ServeRepair {
}
fn report_reset_stats(&self, stats: &mut ServeRepairStats) {
if stats.self_repair > 0 {
if stats.err_self_repair > 0 {
let my_id = self.cluster_info.id();
warn!(
"{}: Ignored received repair requests from ME: {}",
my_id, stats.self_repair,
my_id, stats.err_self_repair,
);
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.err_self_repair);
}
datapoint_info!(
@ -629,7 +712,7 @@ impl ServeRepair {
stats.total_response_bytes_unstaked,
i64
),
("self_repair", stats.self_repair, i64),
("self_repair", stats.err_self_repair, i64),
("window_index", stats.window_index, i64),
(
"request-highest-window-index",
@ -719,13 +802,11 @@ impl ServeRepair {
.unwrap()
}
#[must_use]
fn verify_signed_packet(
my_id: &Pubkey,
packet: &Packet,
request: &RepairProtocol,
stats: &mut ServeRepairStats,
) -> bool {
) -> Result<()> {
match request {
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
@ -734,13 +815,11 @@ impl ServeRepair {
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::LegacyAncestorHashes(_, _, _) => {
stats.err_unsigned += 1;
return false;
return Err(Error::from(RepairVerifyError::Unsigned));
}
RepairProtocol::Pong(pong) => {
if !pong.verify() {
stats.err_sig_verify += 1;
return false;
return Err(Error::from(RepairVerifyError::SigVerify));
}
}
RepairProtocol::WindowIndex { header, .. }
@ -748,39 +827,42 @@ impl ServeRepair {
| RepairProtocol::Orphan { header, .. }
| RepairProtocol::AncestorHashes { header, .. } => {
if &header.recipient != my_id {
stats.err_id_mismatch += 1;
return false;
return Err(Error::from(RepairVerifyError::IdMismatch));
}
let time_diff_ms = timestamp().abs_diff(header.timestamp);
if u128::from(time_diff_ms) > SIGNED_REPAIR_TIME_WINDOW.as_millis() {
stats.err_time_skew += 1;
return false;
return Err(Error::from(RepairVerifyError::TimeSkew));
}
let leading_buf = match packet.data(..4) {
Some(buf) => buf,
None => {
debug_assert!(false); // should have failed deserialize
stats.err_malformed += 1;
return false;
debug_assert!(
false,
"request should have failed deserialization: {:?}",
request
);
return Err(Error::from(RepairVerifyError::Malformed));
}
};
let trailing_buf = match packet.data(4 + SIGNATURE_BYTES..) {
Some(buf) => buf,
None => {
debug_assert!(false); // should have failed deserialize
stats.err_malformed += 1;
return false;
debug_assert!(
false,
"request should have failed deserialization: {:?}",
request
);
return Err(Error::from(RepairVerifyError::Malformed));
}
};
let from_id = request.sender();
let signed_data = [leading_buf, trailing_buf].concat();
if !header.signature.verify(from_id.as_ref(), &signed_data) {
stats.err_sig_verify += 1;
return false;
return Err(Error::from(RepairVerifyError::SigVerify));
}
}
}
true
Ok(())
}
fn check_ping_cache(
@ -1507,12 +1589,9 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert!(ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
&packet,
&request,
&mut ServeRepairStats::default(),
));
assert!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request).is_ok()
);
// recipient mismatch
let packet = {
@ -1529,14 +1608,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
let mut stats = ServeRepairStats::default();
assert!(!ServeRepair::verify_signed_packet(
&my_keypair.pubkey(),
&packet,
&request,
&mut stats,
assert!(matches!(
ServeRepair::verify_signed_packet(&my_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::IdMismatch))
));
assert_eq!(stats.err_id_mismatch, 1);
// outside time window
let packet = {
@ -1555,14 +1630,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
let mut stats = ServeRepairStats::default();
assert!(!ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
&packet,
&request,
&mut stats,
assert!(matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::TimeSkew))
));
assert_eq!(stats.err_time_skew, 1);
// bad signature
let packet = {
@ -1579,14 +1650,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
let mut stats = ServeRepairStats::default();
assert!(!ServeRepair::verify_signed_packet(
&other_keypair.pubkey(),
&packet,
&request,
&mut stats,
assert!(matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::SigVerify))
));
assert_eq!(stats.err_sig_verify, 1);
}
#[test]