From d4a174fb7cc2daa80fce11b060d0232e714316a1 Mon Sep 17 00:00:00 2001
From: sakridge
Date: Tue, 15 Dec 2020 16:50:40 -0800
Subject: [PATCH] Partial shred deserialize cleanup and shred type differentiation (#14094)

* Partial shred deserialize cleanup and shred type differentiation in retransmit

* consolidate packet hashing logic
---
 core/src/lib.rs               |   1 +
 core/src/packet_hasher.rs     |  34 ++++++++
 core/src/retransmit_stage.rs  | 136 ++++++++++++++++++++---
 core/src/shred_fetch_stage.rs | 121 +++++++-------------------
 ledger/src/shred.rs           | 159 +++++++++++++++++++++++++++++++---
 5 files changed, 310 insertions(+), 141 deletions(-)
 create mode 100644 core/src/packet_hasher.rs

diff --git a/core/src/lib.rs b/core/src/lib.rs
index ee47cc3d2c..37e0fd991a 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -41,6 +41,7 @@ pub mod ledger_cleanup_service;
 pub mod non_circulating_supply;
 pub mod optimistic_confirmation_verifier;
 pub mod optimistically_confirmed_bank_tracker;
+pub mod packet_hasher;
 pub mod ping_pong;
 pub mod poh_recorder;
 pub mod poh_service;
diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs
new file mode 100644
index 0000000000..77612428c5
--- /dev/null
+++ b/core/src/packet_hasher.rs
@@ -0,0 +1,34 @@
+// Get a unique hash value for a packet
+// Used in retransmit and shred fetch to prevent dos with same packet data.
+
+use ahash::AHasher;
+use rand::{thread_rng, Rng};
+use solana_perf::packet::Packet;
+use std::hash::Hasher;
+
+#[derive(Clone)]
+pub struct PacketHasher {
+    seed1: u128,
+    seed2: u128,
+}
+
+impl Default for PacketHasher {
+    fn default() -> Self {
+        Self {
+            seed1: thread_rng().gen::<u128>(),
+            seed2: thread_rng().gen::<u128>(),
+        }
+    }
+}
+
+impl PacketHasher {
+    pub fn hash_packet(&self, packet: &Packet) -> u64 {
+        let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
+        hasher.write(&packet.data[0..packet.meta.size]);
+        hasher.finish()
+    }
+
+    pub fn reset(&mut self) {
+        *self = Self::default();
+    }
+}
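The new packet_hasher module is the piece both stages now share: it hashes the packet payload under two randomly chosen seeds, so a peer flooding the node with identical bytes (or with precomputed collisions) cannot predict the dedup key, and reset() rotates the seeds periodically. As a rough sketch of how the callers below use it, the snippet that follows pairs the hasher with an LRU cache keyed by the resulting u64; the helper name is_duplicate and the cache handling are invented for illustration and are not part of the patch.

use lru::LruCache;
use solana_perf::packet::Packet;

use crate::packet_hasher::PacketHasher;

// Sketch: drop a packet whose exact bytes were seen recently, otherwise
// remember its hash. This mirrors what shred_fetch_stage::process_packet
// does with its ShredsReceived cache further down in this patch.
fn is_duplicate(packet: &Packet, hasher: &PacketHasher, seen: &mut LruCache<u64, ()>) -> bool {
    let hash = hasher.hash_packet(packet);
    if seen.get(&hash).is_some() {
        true
    } else {
        seen.put(hash, ());
        false
    }
}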
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index d73069361b..f0c6c60c28 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -1,8 +1,6 @@
 //! The `retransmit_stage` retransmits shreds between validators
 #![allow(clippy::rc_buffer)]
 
-use crate::shred_fetch_stage::ShredFetchStage;
-use crate::shred_fetch_stage::ShredFetchStats;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_info_vote_listener::VerifiedVoteReceiver,
@@ -15,10 +13,9 @@ use crate::{
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
-use ahash::AHasher;
 use crossbeam_channel::Receiver;
 use lru::LruCache;
-use rand::{thread_rng, Rng};
+use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver},
     leader_schedule_cache::LeaderScheduleCache,
@@ -26,14 +23,13 @@ use solana_ledger::{
 };
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
-use solana_perf::packet::Packets;
+use solana_perf::packet::{Packet, Packets};
 use solana_runtime::bank_forks::BankForks;
 use solana_sdk::clock::{Epoch, Slot};
 use solana_sdk::epoch_schedule::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
-use std::hash::Hasher;
 use std::{
     cmp,
     collections::hash_set::HashSet,
@@ -206,7 +202,42 @@ struct EpochStakesCache {
     stakes_and_index: Vec<(u64, usize)>,
 }
 
-pub type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);
+use crate::packet_hasher::PacketHasher;
+// Map of shred (slot, index, is_data) => list of hash values seen for that key.
+pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
+
+pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
+
+// Return true if shred is already received and should skip retransmit
+fn check_if_already_received(
+    packet: &Packet,
+    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
+) -> bool {
+    match get_shred_slot_index_type(packet, &mut ShredFetchStats::default()) {
+        Some(slot_index) => {
+            let mut received = shreds_received.lock().unwrap();
+            let hasher = received.1.clone();
+            if let Some(sent) = received.0.get_mut(&slot_index) {
+                if sent.len() < MAX_DUPLICATE_COUNT {
+                    let hash = hasher.hash_packet(packet);
+                    if sent.contains(&hash) {
+                        return true;
+                    }
+
+                    sent.push(hash);
+                } else {
+                    return true;
+                }
+            } else {
+                let hash = hasher.hash_packet(&packet);
+                received.0.put(slot_index, vec![hash]);
+            }
+
+            false
+        }
+        None => true,
+    }
+}
 
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
@@ -219,7 +250,7 @@ fn retransmit(
     stats: &Arc<RetransmitStats>,
     epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
     last_peer_update: &Arc<AtomicU64>,
-    shreds_received: &Arc<Mutex<ShredFilterAndSeeds>>,
+    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -271,8 +302,7 @@ fn retransmit(
         {
             let mut sr = shreds_received.lock().unwrap();
             sr.0.clear();
-            sr.1 = thread_rng().gen::<u128>();
-            sr.2 = thread_rng().gen::<u128>();
+            sr.1.reset();
         }
     }
     let mut peers_len = 0;
@@ -299,33 +329,10 @@ fn retransmit(
             continue;
         }
 
-        match ShredFetchStage::get_slot_index(packet, &mut ShredFetchStats::default()) {
-            Some(slot_index) => {
-                let mut received = shreds_received.lock().unwrap();
-                let seed1 = received.1;
-                let seed2 = received.2;
-                if let Some(sent) = received.0.get_mut(&slot_index) {
-                    if sent.len() < MAX_DUPLICATE_COUNT {
-                        let mut hasher = AHasher::new_with_keys(seed1, seed2);
-                        hasher.write(&packet.data[0..packet.meta.size]);
-                        let hash = hasher.finish();
-                        if sent.contains(&hash) {
-                            continue;
-                        }
-
-                        sent.push(hash);
-                    } else {
-                        continue;
-                    }
-                } else {
-                    let mut hasher = AHasher::new_with_keys(seed1, seed2);
-                    hasher.write(&packet.data[0..packet.meta.size]);
-                    let hash = hasher.finish();
-                    received.0.put(slot_index, vec![hash]);
-                }
-            }
-            None => continue,
+        if check_if_already_received(packet, shreds_received) {
+            continue;
         }
+
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
             &my_id,
@@ -414,7 +421,10 @@ pub fn retransmitter(
     r: Arc<Mutex<PacketReceiver>>,
 ) -> Vec<JoinHandle<()>> {
     let stats = Arc::new(RetransmitStats::default());
-    let shreds_received = Arc::new(Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), 0, 0)));
+    let shreds_received = Arc::new(Mutex::new((
+        LruCache::new(DEFAULT_LRU_SIZE),
+        PacketHasher::default(),
+    )));
     (0..sockets.len())
         .map(|s| {
             let sockets = sockets.clone();
@@ -568,6 +578,7 @@ mod tests {
     use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
     use solana_ledger::create_new_tmp_ledger;
     use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
+    use solana_ledger::shred::Shred;
     use solana_net_utils::find_available_port_in_range;
     use solana_perf::packet::{Packet, Packets};
     use std::net::{IpAddr, Ipv4Addr};
@@ -616,8 +627,7 @@ mod tests {
         );
         let _thread_hdls = vec![t_retransmit];
 
-        let mut shred =
-            solana_ledger::shred::Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
+        let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
         let mut packet = Packet::default();
         shred.copy_to_packet(&mut packet);
@@ -642,4 +652,52 @@ mod tests {
         assert_eq!(packets.packets.len(), 1);
         assert_eq!(packets.packets[0].meta.repair, false);
     }
+
+    #[test]
+    fn test_already_received() {
+        let mut packet = Packet::default();
+        let slot = 1;
+        let index = 5;
+        let version = 0x40;
+        let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
+        // unique shred for (1, 5) should pass
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // duplicate shred for (1, 5) blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        // first duplicate shred for (1, 5) passed
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // then blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        // 2nd duplicate shred for (1, 5) blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // Coding at (1, 5) passes
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // then blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // 2nd unique coding at (1, 5) passes
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // same again is blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // Another unique coding at (1, 5) always blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+        assert!(check_if_already_received(&packet, &shreds_received));
+    }
 }
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index b91e6233b1..1c5f725bb1 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -1,17 +1,10 @@
 //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
 
-use ahash::AHasher;
+use crate::packet_hasher::PacketHasher;
 use lru::LruCache;
-use rand::{thread_rng, Rng};
-use std::hash::Hasher;
-
-use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
-use solana_ledger::shred::{
-    CODING_SHRED, DATA_SHRED, OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, OFFSET_OF_SHRED_TYPE,
-    SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT,
-};
+use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
 use solana_perf::cuda_runtime::PinnedVec;
-use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler};
+use solana_perf::packet::{Packet, PacketsRecycler};
 use solana_perf::recycler::Recycler;
 use solana_runtime::bank_forks::BankForks;
 use solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT};
@@ -27,48 +20,11 @@ use std::time::Instant;
 
 const DEFAULT_LRU_SIZE: usize = 10_000;
 pub type ShredsReceived = LruCache<u64, ()>;
 
-#[derive(Default)]
-pub struct ShredFetchStats {
-    index_overrun: usize,
-    shred_count: usize,
-    index_bad_deserialize: usize,
-    index_out_of_bounds: usize,
-    slot_bad_deserialize: usize,
-    duplicate_shred: usize,
-    slot_out_of_range: usize,
-}
-
 pub struct ShredFetchStage {
     thread_hdls: Vec<JoinHandle<()>>,
 }
 
 impl ShredFetchStage {
-    pub fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
-        let index_start = OFFSET_OF_SHRED_INDEX;
-        let index_end = index_start + SIZE_OF_SHRED_INDEX;
-        let slot_start = OFFSET_OF_SHRED_SLOT;
-        let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
-
-        if index_end <= p.meta.size {
-            if let Ok(index) = limited_deserialize::<u32>(&p.data[index_start..index_end]) {
-                if index < MAX_DATA_SHREDS_PER_SLOT as u32 && slot_end <= p.meta.size {
-                    if let Ok(slot) = limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
-                        return Some((slot, index));
-                    } else {
-                        stats.slot_bad_deserialize += 1;
-                    }
-                } else {
-                    stats.index_out_of_bounds += 1;
-                }
-            } else {
-                stats.index_bad_deserialize += 1;
-            }
-        } else {
-            stats.index_overrun += 1;
-        }
-        None
-    }
-
     fn process_packet<F>(
         p: &mut Packet,
         shreds_received: &mut ShredsReceived,
@@ -77,32 +33,24 @@ impl ShredFetchStage {
         last_slot: Slot,
         slots_per_epoch: u64,
         modify: &F,
-        seeds: (u128, u128),
+        packet_hasher: &PacketHasher,
     ) where
        F: Fn(&mut Packet),
     {
         p.meta.discard = true;
-        if let Some((slot, _index)) = Self::get_slot_index(p, stats) {
+        if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) {
             // Seems reasonable to limit shreds to 2 epochs away
-            if slot > last_root
-                && slot < (last_slot + 2 * slots_per_epoch)
-                && p.meta.size > OFFSET_OF_SHRED_TYPE
-            {
-                let shred_type = p.data[OFFSET_OF_SHRED_TYPE];
-                if shred_type == DATA_SHRED || shred_type == CODING_SHRED {
-                    // Shred filter
+            if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
+                // Shred filter
 
-                    let mut hasher = AHasher::new_with_keys(seeds.0, seeds.1);
-                    hasher.write(&p.data[0..p.meta.size]);
-                    let hash = hasher.finish();
+                let hash = packet_hasher.hash_packet(p);
 
-                    if shreds_received.get(&hash).is_none() {
-                        shreds_received.put(hash, ());
-                        p.meta.discard = false;
-                        modify(p);
-                    } else {
-                        stats.duplicate_shred += 1;
-                    }
+                if shreds_received.get(&hash).is_none() {
+                    shreds_received.put(hash, ());
+                    p.meta.discard = false;
+                    modify(p);
+                } else {
+                    stats.duplicate_shred += 1;
                 }
             } else {
                 stats.slot_out_of_range += 1;
@@ -130,12 +78,12 @@ impl ShredFetchStage {
         let mut last_stats = Instant::now();
         let mut stats = ShredFetchStats::default();
 
-        let mut seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
+        let mut packet_hasher = PacketHasher::default();
 
         while let Some(mut p) = recvr.iter().next() {
             if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
                 last_updated = Instant::now();
-                seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
+                packet_hasher.reset();
                 shreds_received.clear();
                 if let Some(bank_forks) = bank_forks.as_ref() {
                     let bank_forks_r = bank_forks.read().unwrap();
@@ -156,7 +104,7 @@ impl ShredFetchStage {
                     last_slot,
                     slots_per_epoch,
                     &modify,
-                    seeds,
+                    &packet_hasher,
                 );
             });
             if last_stats.elapsed().as_millis() > 1000 {
@@ -274,6 +222,7 @@ impl ShredFetchStage {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
     use solana_ledger::shred::Shred;
 
     #[test]
@@ -287,7 +236,7 @@ mod tests {
         let shred = Shred::new_from_data(slot, 3, 0, None, true, true, 0, 0, 0);
         shred.copy_to_packet(&mut packet);
 
-        let seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
+        let hasher = PacketHasher::default();
 
         let last_root = 0;
         let last_slot = 100;
@@ -300,7 +249,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(!packet.meta.discard);
@@ -315,7 +264,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(!packet.meta.discard);
     }
@@ -329,7 +278,9 @@ mod tests {
         let last_root = 0;
         let last_slot = 100;
         let slots_per_epoch = 10;
-        let seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
+
+        let hasher = PacketHasher::default();
+
         // packet size is 0, so cannot get index
         ShredFetchStage::process_packet(
             &mut packet,
@@ -339,7 +290,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert_eq!(stats.index_overrun, 1);
         assert!(packet.meta.discard);
@@ -355,7 +306,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(packet.meta.discard);
 
@@ -368,7 +319,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(!packet.meta.discard);
 
@@ -381,7 +332,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(packet.meta.discard);
 
@@ -397,7 +348,7 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(packet.meta.discard);
 
@@ -412,20 +363,8 @@ mod tests {
             last_slot,
             slots_per_epoch,
             &|_p| {},
-            seeds,
+            &hasher,
         );
         assert!(packet.meta.discard);
     }
-
-    #[test]
-    fn test_shred_offsets() {
-        let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
-        let mut packet = Packet::default();
-        shred.copy_to_packet(&mut packet);
-        let mut stats = ShredFetchStats::default();
-        assert_eq!(
-            Some((1, 3)),
-            ShredFetchStage::get_slot_index(&packet, &mut stats)
-        );
-    }
 }
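Note that process_packet keeps the existing acceptance window, slot > last_root && slot < last_slot + 2 * slots_per_epoch; only the shred-type check moved into get_shred_slot_index_type. With the constants used by the tests above (last_root = 0, last_slot = 100, slots_per_epoch = 10), the window is slots 1 through 119. The standalone check below simply restates that condition; the function name slot_in_fetch_window is made up for illustration and does not exist in the patch.

use solana_sdk::clock::Slot;

// Restatement of the window check in ShredFetchStage::process_packet:
// accept only slots newer than the last root and less than two epochs
// past the last known slot.
fn slot_in_fetch_window(slot: Slot, last_root: Slot, last_slot: Slot, slots_per_epoch: u64) -> bool {
    slot > last_root && slot < last_slot + 2 * slots_per_epoch
}

#[test]
fn window_matches_the_test_constants() {
    assert!(slot_in_fetch_window(1, 0, 100, 10));
    assert!(slot_in_fetch_window(119, 0, 100, 10));
    assert!(!slot_in_fetch_window(0, 0, 100, 10)); // not newer than the root
    assert!(!slot_in_fetch_window(120, 0, 100, 10)); // two epochs out
}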
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index fa6d601021..4b0d83a129 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -1,5 +1,6 @@
 //! The `shred` module defines data structures and methods to pull MTU sized data frames from the network.
 use crate::{
+    blockstore::MAX_DATA_SHREDS_PER_SLOT,
     entry::{create_ticks, Entry},
     erasure::Session,
 };
@@ -12,7 +13,7 @@ use rayon::{
 };
 use serde::{Deserialize, Serialize};
 use solana_measure::measure::Measure;
-use solana_perf::packet::Packet;
+use solana_perf::packet::{limited_deserialize, Packet};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::{
     clock::Slot,
@@ -309,6 +310,27 @@ impl Shred {
         Ok(shred)
     }
 
+    pub fn new_empty_coding(
+        slot: Slot,
+        index: u32,
+        fec_set_index: u32,
+        num_data: usize,
+        num_code: usize,
+        position: usize,
+        version: u16,
+    ) -> Self {
+        let (header, coding_header) = Shredder::new_coding_shred_header(
+            slot,
+            index,
+            fec_set_index,
+            num_data,
+            num_code,
+            position,
+            version,
+        );
+        Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header)
+    }
+
     pub fn new_empty_from_header(
         common_header: ShredCommonHeader,
         data_header: DataShredHeader,
@@ -699,7 +721,7 @@ impl Shredder {
         // Create empty coding shreds, with correctly populated headers
         let mut coding_shreds = Vec::with_capacity(num_coding);
         (0..num_coding).for_each(|i| {
-            let (header, coding_header) = Self::new_coding_shred_header(
+            let shred = Shred::new_empty_coding(
                 slot,
                 start_index + i as u32,
                 start_index,
@@ -708,8 +730,6 @@ impl Shredder {
                 i,
                 version,
             );
-            let shred =
-                Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
             coding_shreds.push(shred.payload);
         });
 
@@ -730,7 +750,7 @@ impl Shredder {
                 .into_iter()
                 .enumerate()
                 .map(|(i, payload)| {
-                    let (common_header, coding_header) = Self::new_coding_shred_header(
+                    let mut shred = Shred::new_empty_coding(
                         slot,
                         start_index + i as u32,
                         start_index,
@@ -739,12 +759,8 @@ impl Shredder {
                         i,
                         version,
                     );
-                    Shred {
-                        common_header,
-                        data_header: DataShredHeader::default(),
-                        coding_header,
-                        payload,
-                    }
+                    shred.payload = payload;
+                    shred
                 })
                 .collect()
         } else {
@@ -963,6 +979,71 @@ impl Shredder {
     }
 }
 
+#[derive(Default, Debug, Eq, PartialEq)]
+pub struct ShredFetchStats {
+    pub index_overrun: usize,
+    pub shred_count: usize,
+    pub index_bad_deserialize: usize,
+    pub index_out_of_bounds: usize,
+    pub slot_bad_deserialize: usize,
+    pub duplicate_shred: usize,
+    pub slot_out_of_range: usize,
+    pub bad_shred_type: usize,
+}
+
+// Get slot, index, and type from a packet with partial deserialize
+pub fn get_shred_slot_index_type(
+    p: &Packet,
+    stats: &mut ShredFetchStats,
+) -> Option<(Slot, u32, bool)> {
+    let index_start = OFFSET_OF_SHRED_INDEX;
+    let index_end = index_start + SIZE_OF_SHRED_INDEX;
+    let slot_start = OFFSET_OF_SHRED_SLOT;
+    let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
+
+    debug_assert!(index_end > slot_end);
+    debug_assert!(index_end > OFFSET_OF_SHRED_TYPE);
+
+    if index_end > p.meta.size {
+        stats.index_overrun += 1;
+        return None;
+    }
+
+    let index;
+    match limited_deserialize::<u32>(&p.data[index_start..index_end]) {
+        Ok(x) => index = x,
+        Err(_e) => {
+            stats.index_bad_deserialize += 1;
+            return None;
+        }
+    }
+
+    if index >= MAX_DATA_SHREDS_PER_SLOT as u32 {
+        stats.index_out_of_bounds += 1;
+        return None;
+    }
+
+    let slot;
+    match limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
+        Ok(x) => {
+            slot = x;
+        }
+        Err(_e) => {
+            stats.slot_bad_deserialize += 1;
+            return None;
+        }
+    }
+
+    let shred_type = p.data[OFFSET_OF_SHRED_TYPE];
+    if shred_type == DATA_SHRED || shred_type == CODING_SHRED {
+        return Some((slot, index, shred_type == DATA_SHRED));
+    } else {
+        stats.bad_shred_type += 1;
+    }
+
+    None
+}
+
 pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
     let ticks = create_ticks(1, 0, Hash::default());
     max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
 }
@@ -1707,4 +1788,60 @@ pub mod tests {
             })
         );
     }
+
+    #[test]
+    fn test_shred_offsets() {
+        solana_logger::setup();
+        let mut packet = Packet::default();
+        let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
+        shred.copy_to_packet(&mut packet);
+        let mut stats = ShredFetchStats::default();
+        let ret = get_shred_slot_index_type(&packet, &mut stats);
+        assert_eq!(Some((1, 3, true)), ret);
+        assert_eq!(stats, ShredFetchStats::default());
+
+        packet.meta.size = OFFSET_OF_SHRED_TYPE;
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(stats.index_overrun, 1);
+
+        packet.meta.size = OFFSET_OF_SHRED_INDEX;
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(stats.index_overrun, 2);
+
+        packet.meta.size = OFFSET_OF_SHRED_INDEX + 1;
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(stats.index_overrun, 3);
+
+        packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1;
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(stats.index_overrun, 4);
+
+        packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX;
+        assert_eq!(
+            Some((1, 3, true)),
+            get_shred_slot_index_type(&packet, &mut stats)
+        );
+        assert_eq!(stats.index_overrun, 4);
+
+        let shred = Shred::new_empty_coding(8, 2, 10, 30, 4, 7, 200);
+        shred.copy_to_packet(&mut packet);
+        assert_eq!(
+            Some((8, 2, false)),
+            get_shred_slot_index_type(&packet, &mut stats)
+        );
+
+        let shred = Shred::new_from_data(1, std::u32::MAX - 10, 0, None, true, true, 0, 0, 0);
+        shred.copy_to_packet(&mut packet);
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(1, stats.index_out_of_bounds);
+
+        let (mut header, coding_header) =
+            Shredder::new_coding_shred_header(8, 2, 10, 30, 4, 7, 200);
+        header.shred_type = ShredType(u8::MAX);
+        let shred = Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
+        shred.copy_to_packet(&mut packet);
+
+        assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
+        assert_eq!(1, stats.bad_shred_type);
+    }
 }
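Taken together, get_shred_slot_index_type is what lets retransmit tell data and coding shreds apart without fully deserializing the packet: the returned bool is true for data shreds and becomes the third component of the (slot, index, is_data) key in ShredFilter. The sketch below is written against the constructors exercised in the tests above and shows the two shred kinds at the same slot and index yielding different keys; it is illustrative only and duplicates what test_shred_offsets and test_already_received already cover.

use solana_ledger::shred::{get_shred_slot_index_type, Shred, ShredFetchStats};
use solana_perf::packet::Packet;

fn main() {
    let mut stats = ShredFetchStats::default();
    let mut packet = Packet::default();

    // A data shred and a coding shred at the same (slot, index).
    let data = Shred::new_from_data(1, 5, 0, None, true, true, 0, 0x40, 0);
    data.copy_to_packet(&mut packet);
    let data_key = get_shred_slot_index_type(&packet, &mut stats);

    let coding = Shred::new_empty_coding(1, 5, 0, 1, 1, 0, 0x40);
    coding.copy_to_packet(&mut packet);
    let coding_key = get_shred_slot_index_type(&packet, &mut stats);

    // Same slot and index, but the is_data flag differs, so the retransmit
    // ShredFilter tracks them under separate keys.
    assert_eq!(data_key, Some((1, 5, true)));
    assert_eq!(coding_key, Some((1, 5, false)));
}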