diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 97fed9e92..a0e836724 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -11,7 +11,6 @@ use { solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, std::{ net::UdpSocket, - ops::RangeBounds, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -63,12 +62,12 @@ impl ShredFetchStage { } stats.shred_count += packet_batch.len(); // Limit shreds to 2 epochs away. - let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch); + let max_slot = last_slot + 2 * slots_per_epoch; for packet in packet_batch.iter_mut() { if should_discard_packet( packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &packet_hasher, &mut shreds_received, @@ -197,14 +196,13 @@ impl ShredFetchStage { fn should_discard_packet( packet: &Packet, root: Slot, - // Range of slots to ingest shreds for. - slot_bounds: impl RangeBounds, + max_slot: Slot, // Max slot to ingest shreds for. shred_version: u16, packet_hasher: &PacketHasher, shreds_received: &mut ShredsReceived, stats: &mut ShredFetchStats, ) -> bool { - if should_discard_shred(packet, root, shred_version, slot_bounds, stats) { + if should_discard_shred(packet, root, max_slot, shred_version, stats) { return true; } let hash = packet_hasher.hash_packet(packet); @@ -253,11 +251,11 @@ mod tests { let last_root = 0; let last_slot = 100; let slots_per_epoch = 10; - let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch); + let max_slot = last_slot + 2 * slots_per_epoch; assert!(!should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &hasher, &mut shreds_received, @@ -272,7 +270,7 @@ mod tests { assert!(!should_discard_packet( &packet, last_root, - slot_bounds, + max_slot, shred_version, &hasher, &mut shreds_received, @@ -290,7 +288,7 @@ mod tests { let last_slot = 100; let slots_per_epoch = 10; let shred_version = 59445; - let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch); + let max_slot = last_slot + 2 * slots_per_epoch; let hasher = PacketHasher::default(); @@ -298,7 +296,7 @@ mod tests { assert!(should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &hasher, &mut shreds_received, @@ -321,7 +319,7 @@ mod tests { assert!(should_discard_packet( &packet, 3, - 3..slot_bounds.end, + max_slot, shred_version, &hasher, &mut shreds_received, @@ -332,7 +330,7 @@ mod tests { assert!(should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, 345, // shred_version &hasher, &mut shreds_received, @@ -344,7 +342,7 @@ mod tests { assert!(!should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &hasher, &mut shreds_received, @@ -355,7 +353,7 @@ mod tests { assert!(should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &hasher, &mut shreds_received, @@ -379,7 +377,7 @@ mod tests { assert!(should_discard_packet( &packet, last_root, - slot_bounds.clone(), + max_slot, shred_version, &hasher, &mut shreds_received, @@ -392,7 +390,7 @@ mod tests { assert!(should_discard_packet( &packet, last_root, - slot_bounds, + max_slot, shred_version, &hasher, &mut shreds_received, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 656f324fb..dda68ff09 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -69,7 +69,7 @@ use { signature::{Keypair, Signature, Signer}, }, static_assertions::const_assert_eq, - std::{fmt::Debug, ops::RangeBounds}, + std::fmt::Debug, thiserror::Error, }; @@ -532,9 +532,7 @@ pub mod layout { pub fn get_shred(packet: &Packet) -> Option<&[u8]> { let size = get_shred_size(packet)?; - let shred = packet.data(..size)?; - // Should at least have a signature. - (size >= SIZE_OF_SIGNATURE).then(|| shred) + packet.data(..size) } pub(crate) fn get_signature(shred: &[u8]) -> Option { @@ -678,15 +676,16 @@ impl TryFrom for ShredVariant { } } +// Accepts shreds in the slot range [root + 1, max_slot]. #[must_use] pub fn should_discard_shred( packet: &Packet, root: Slot, + max_slot: Slot, shred_version: u16, - // Range of slots to ingest shreds for. - slot_bounds: impl RangeBounds, stats: &mut ShredFetchStats, ) -> bool { + debug_assert!(root < max_slot); let shred = match layout::get_shred(packet) { None => { stats.index_overrun += 1; @@ -694,9 +693,17 @@ pub fn should_discard_shred( } Some(shred) => shred, }; - if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() { - stats.index_overrun += 1; - return true; + match layout::get_version(shred) { + None => { + stats.index_overrun += 1; + return true; + } + Some(version) => { + if version != shred_version { + stats.shred_version_mismatch += 1; + return true; + } + } } let shred_type = match layout::get_shred_type(shred) { Ok(shred_type) => shred_type, @@ -707,7 +714,7 @@ pub fn should_discard_shred( }; let slot = match layout::get_slot(shred) { Some(slot) => { - if !slot_bounds.contains(&slot) { + if slot > max_slot { stats.slot_out_of_range += 1; return true; } @@ -725,10 +732,6 @@ pub fn should_discard_shred( return true; } }; - if layout::get_version(shred) != Some(shred_version) { - stats.shred_version_mismatch += 1; - return true; - } match shred_type { ShredType::Code => { if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 { @@ -739,7 +742,6 @@ pub fn should_discard_shred( stats.slot_out_of_range += 1; return true; } - false } ShredType::Data => { if index >= MAX_DATA_SHREDS_PER_SLOT as u32 { @@ -764,9 +766,9 @@ pub fn should_discard_shred( stats.slot_out_of_range += 1; return true; } - false } } + false } pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { @@ -940,7 +942,7 @@ mod tests { let mut packet = Packet::default(); let root = 1; let shred_version = 798; - let slot_bounds = ..16; + let max_slot = 16; let shred = Shred::new_from_data( 2, // slot 3, // index @@ -956,8 +958,8 @@ mod tests { assert!(!should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats, ShredFetchStats::default()); @@ -966,8 +968,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats.index_overrun, 1); @@ -976,8 +978,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats.index_overrun, 2); @@ -986,8 +988,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats.index_overrun, 3); @@ -996,8 +998,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats.index_overrun, 4); @@ -1006,8 +1008,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(stats.bad_parent_offset, 1); @@ -1026,8 +1028,8 @@ mod tests { assert!(!should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); @@ -1045,8 +1047,8 @@ mod tests { assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(1, stats.index_out_of_bounds); @@ -1059,19 +1061,38 @@ mod tests { 30, // num_data_shreds 4, // num_coding_shreds 3, // position - 200, // version + shred_version, ); shred.copy_to_packet(&mut packet); + assert!(!should_discard_shred( + &packet, + root, + max_slot, + shred_version, + &mut stats + )); packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX; assert!(should_discard_shred( &packet, root, + max_slot, shred_version, - slot_bounds, &mut stats )); assert_eq!(1, stats.bad_shred_type); + assert_eq!(stats.shred_version_mismatch, 0); + + packet.buffer_mut()[OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 1] = u8::MAX; + assert!(should_discard_shred( + &packet, + root, + max_slot, + shred_version, + &mut stats + )); + assert_eq!(1, stats.bad_shred_type); + assert_eq!(stats.shred_version_mismatch, 1); } // Asserts that ShredType is backward compatible with u8.