simplifies packet/shred sanity checks (#26356)

This commit is contained in:
behzad nouri 2022-07-05 21:41:19 +00:00 committed by GitHub
parent 53b9420562
commit d3a14f5b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 44 deletions

View File

@ -11,7 +11,6 @@ use {
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
ops::RangeBounds,
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -63,12 +62,12 @@ impl ShredFetchStage {
}
stats.shred_count += packet_batch.len();
// Limit shreds to 2 epochs away.
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
let max_slot = last_slot + 2 * slots_per_epoch;
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&packet_hasher,
&mut shreds_received,
@ -197,14 +196,13 @@ impl ShredFetchStage {
fn should_discard_packet(
packet: &Packet,
root: Slot,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
packet_hasher: &PacketHasher,
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
) -> bool {
if should_discard_shred(packet, root, shred_version, slot_bounds, stats) {
if should_discard_shred(packet, root, max_slot, shred_version, stats) {
return true;
}
let hash = packet_hasher.hash_packet(packet);
@ -253,11 +251,11 @@ mod tests {
let last_root = 0;
let last_slot = 100;
let slots_per_epoch = 10;
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -272,7 +270,7 @@ mod tests {
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -290,7 +288,7 @@ mod tests {
let last_slot = 100;
let slots_per_epoch = 10;
let shred_version = 59445;
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
let max_slot = last_slot + 2 * slots_per_epoch;
let hasher = PacketHasher::default();
@ -298,7 +296,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -321,7 +319,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
3,
3..slot_bounds.end,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -332,7 +330,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
345, // shred_version
&hasher,
&mut shreds_received,
@ -344,7 +342,7 @@ mod tests {
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -355,7 +353,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -379,7 +377,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
max_slot,
shred_version,
&hasher,
&mut shreds_received,
@ -392,7 +390,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds,
max_slot,
shred_version,
&hasher,
&mut shreds_received,

View File

@ -69,7 +69,7 @@ use {
signature::{Keypair, Signature, Signer},
},
static_assertions::const_assert_eq,
std::{fmt::Debug, ops::RangeBounds},
std::fmt::Debug,
thiserror::Error,
};
@ -532,9 +532,7 @@ pub mod layout {
pub fn get_shred(packet: &Packet) -> Option<&[u8]> {
let size = get_shred_size(packet)?;
let shred = packet.data(..size)?;
// Should at least have a signature.
(size >= SIZE_OF_SIGNATURE).then(|| shred)
packet.data(..size)
}
pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> {
@ -678,15 +676,16 @@ impl TryFrom<u8> for ShredVariant {
}
}
// Accepts shreds in the slot range [root + 1, max_slot].
#[must_use]
pub fn should_discard_shred(
packet: &Packet,
root: Slot,
max_slot: Slot,
shred_version: u16,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
stats: &mut ShredFetchStats,
) -> bool {
debug_assert!(root < max_slot);
let shred = match layout::get_shred(packet) {
None => {
stats.index_overrun += 1;
@ -694,9 +693,17 @@ pub fn should_discard_shred(
}
Some(shred) => shred,
};
if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() {
stats.index_overrun += 1;
return true;
match layout::get_version(shred) {
None => {
stats.index_overrun += 1;
return true;
}
Some(version) => {
if version != shred_version {
stats.shred_version_mismatch += 1;
return true;
}
}
}
let shred_type = match layout::get_shred_type(shred) {
Ok(shred_type) => shred_type,
@ -707,7 +714,7 @@ pub fn should_discard_shred(
};
let slot = match layout::get_slot(shred) {
Some(slot) => {
if !slot_bounds.contains(&slot) {
if slot > max_slot {
stats.slot_out_of_range += 1;
return true;
}
@ -725,10 +732,6 @@ pub fn should_discard_shred(
return true;
}
};
if layout::get_version(shred) != Some(shred_version) {
stats.shred_version_mismatch += 1;
return true;
}
match shred_type {
ShredType::Code => {
if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 {
@ -739,7 +742,6 @@ pub fn should_discard_shred(
stats.slot_out_of_range += 1;
return true;
}
false
}
ShredType::Data => {
if index >= MAX_DATA_SHREDS_PER_SLOT as u32 {
@ -764,9 +766,9 @@ pub fn should_discard_shred(
stats.slot_out_of_range += 1;
return true;
}
false
}
}
false
}
pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
@ -940,7 +942,7 @@ mod tests {
let mut packet = Packet::default();
let root = 1;
let shred_version = 798;
let slot_bounds = ..16;
let max_slot = 16;
let shred = Shred::new_from_data(
2, // slot
3, // index
@ -956,8 +958,8 @@ mod tests {
assert!(!should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats, ShredFetchStats::default());
@ -966,8 +968,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 1);
@ -976,8 +978,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 2);
@ -986,8 +988,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 3);
@ -996,8 +998,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 4);
@ -1006,8 +1008,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.bad_parent_offset, 1);
@ -1026,8 +1028,8 @@ mod tests {
assert!(!should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
@ -1045,8 +1047,8 @@ mod tests {
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(1, stats.index_out_of_bounds);
@ -1059,19 +1061,38 @@ mod tests {
30, // num_data_shreds
4, // num_coding_shreds
3, // position
200, // version
shred_version,
);
shred.copy_to_packet(&mut packet);
assert!(!should_discard_shred(
&packet,
root,
max_slot,
shred_version,
&mut stats
));
packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX;
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
assert_eq!(stats.shred_version_mismatch, 0);
packet.buffer_mut()[OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 1] = u8::MAX;
assert!(should_discard_shred(
&packet,
root,
max_slot,
shred_version,
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
assert_eq!(stats.shred_version_mismatch, 1);
}
// Asserts that ShredType is backward compatible with u8.