diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 6d7993c48..7e13c6419 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -4,7 +4,7 @@ use crate::entry::Entry; use crate::erasure::ErasureConfig; use crate::result::{Error, Result}; -use crate::shred::{Shred, ShredMetaBuf, Shredder}; +use crate::shred::{Shred, ShredInfo, Shredder}; #[cfg(feature = "kvstore")] use solana_kvstore as kvstore; @@ -320,8 +320,8 @@ impl Blocktree { db: &Database, erasure_metas: &HashMap<(u64, u64), ErasureMeta>, index_working_set: &HashMap, - prev_inserted_datas: &mut HashMap<(u64, u64), ShredMetaBuf>, - prev_inserted_codes: &mut HashMap<(u64, u64), ShredMetaBuf>, + prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>, + prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>, ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); @@ -357,12 +357,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch data shred"); if let Some(data) = some_data { - Some(ShredMetaBuf { - slot, - index: i as u32, - data_shred: true, - shred_buf: data, - }) + Some(ShredInfo::new_from_serialized_shred(data)) } else { warn!("Data shred deleted while reading for recovery"); None @@ -382,12 +377,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch code shred"); if let Some(code) = some_code { - Some(ShredMetaBuf { - slot, - index: i as u32, - data_shred: false, - shred_buf: code, - }) + Some(ShredInfo::new_from_serialized_shred(code)) } else { warn!("Code shred deleted while reading for recovery"); None @@ -439,15 +429,7 @@ impl Blocktree { let mut index_working_set = HashMap::new(); shreds.into_iter().for_each(|shred| { - if let Shred::Coding(_) = &shred { - self.check_insert_coding_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut write_batch, - &mut just_inserted_coding_shreds, - ); - } else { + if shred.is_data() { self.check_insert_data_shred( shred, &mut index_working_set, @@ -455,6 +437,14 @@ impl Blocktree { &mut write_batch, &mut just_inserted_data_shreds, ); + } else { + self.check_insert_coding_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut write_batch, + &mut just_inserted_coding_shreds, + ); } }); @@ -523,7 +513,7 @@ impl Blocktree { erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>, + just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredInfo>, ) { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -538,15 +528,10 @@ impl Blocktree { if let Ok(shred_buf) = self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) { - let shred_meta = ShredMetaBuf { - slot, - index: shred_index as u32, - data_shred: false, - shred_buf, - }; + let shred_info = ShredInfo::new_from_shred(&shred, shred_buf); just_inserted_coding_shreds .entry((slot, shred_index)) - .or_insert_with(|| shred_meta); + .or_insert_with(|| shred_info); new_index_meta.map(|n| index_working_set.insert(slot, n)); } } @@ -558,7 +543,7 @@ impl Blocktree { index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>, + just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredInfo>, ) { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -584,13 +569,8 @@ impl Blocktree { &shred, write_batch, ) { - let shred_meta = ShredMetaBuf { - slot, - index: shred_index as u32, - data_shred: true, - shred_buf, - }; - just_inserted_data_shreds.insert((slot, shred_index), shred_meta); + let shred_info = ShredInfo::new_from_shred(&shred, shred_buf); + just_inserted_data_shreds.insert((slot, shred_index), shred_info); new_index_meta.map(|n| index_working_set.insert(slot, n)); true } else { @@ -614,24 +594,17 @@ impl Blocktree { let slot = shred.slot(); let shred_index = shred.index(); - let (pos, num_coding) = { - if let Shred::Coding(coding_shred) = &shred { - ( - u32::from(coding_shred.header.position), - coding_shred.header.num_coding_shreds, - ) - } else { - panic!("should_insert_coding_shred called with non-coding shred") - } - }; + let (_, num_coding, pos) = shred + .coding_params() + .expect("should_insert_coding_shred called with non-coding shred"); - if shred_index < pos { + if shred_index < u32::from(pos) { return false; } - let set_index = shred_index - pos; + let set_index = shred_index - u32::from(pos); !(num_coding == 0 - || pos >= u32::from(num_coding) + || pos >= num_coding || std::u32::MAX - set_index < u32::from(num_coding) - 1 || coding_index.is_present(u64::from(shred_index)) || slot <= *last_root.read().unwrap()) @@ -646,29 +619,21 @@ impl Blocktree { ) -> Result> { let slot = shred.slot(); let shred_index = u64::from(shred.index()); - let (num_data, num_coding, pos) = { - if let Shred::Coding(coding_shred) = &shred { - ( - coding_shred.header.num_data_shreds as usize, - coding_shred.header.num_coding_shreds as usize, - u64::from(coding_shred.header.position), - ) - } else { - panic!("insert_coding_shred called with non-coding shred") - } - }; + let (num_data, num_coding, pos) = shred + .coding_params() + .expect("insert_coding_shred called with non-coding shred"); // Assert guaranteed by integrity checks on the shred that happen before // `insert_coding_shred` is called - if shred_index < pos { + if shred_index < u64::from(pos) { error!("Due to earlier validation, shred index must be >= pos"); return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData( Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())), ))); } - let set_index = shred_index - pos; - let erasure_config = ErasureConfig::new(num_data, num_coding); + let set_index = shred_index - u64::from(pos); + let erasure_config = ErasureConfig::new(num_data as usize, num_coding as usize); let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { self.erasure_meta_cf @@ -3076,7 +3041,7 @@ pub mod tests { #[test] pub fn test_should_insert_data_shred() { - let (shreds, _) = make_slot_entries(0, 0, 100); + let (mut shreds, _) = make_slot_entries(0, 0, 100); let blocktree_path = get_tmp_ledger_path!(); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); @@ -3122,10 +3087,9 @@ pub mod tests { let index = index_cf.get(0).unwrap().unwrap(); assert_eq!(slot_meta.received, 9); let shred7 = { - if let Shred::Data(ref s) = shreds[7] { - let mut shred = Shred::Data(s.clone()); - shred.set_last_in_slot(); - shred + if shreds[7].is_data() { + shreds[7].set_last_in_slot(); + shreds[7].clone() } else { panic!("Shred in unexpected format") } @@ -3144,8 +3108,8 @@ pub mod tests { let index = index_cf.get(0).unwrap().unwrap(); // Trying to insert a shred with index > the "is_last" shred should fail - if let Shred::Data(ref mut s) = shred8 { - s.header.common_header.slot = slot_meta.last_index + 1; + if shred8.is_data() { + shred8.set_slot(slot_meta.last_index + 1); } else { panic!("Shred in unexpected format") } @@ -3170,8 +3134,8 @@ pub mod tests { let mut shred = CodingShred::default(); let slot = 1; shred.header.position = 10; - shred.header.common_header.index = 11; - shred.header.common_header.slot = 1; + shred.header.coding_header.index = 11; + shred.header.coding_header.slot = 1; shred.header.num_coding_shreds = shred.header.position + 1; let coding_shred = Shred::Coding(shred.clone()); @@ -3190,7 +3154,7 @@ pub mod tests { // Trying to insert the same shred again should fail { let index = index_cf - .get(shred.header.common_header.slot) + .get(shred.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3200,12 +3164,12 @@ pub mod tests { )); } - shred.header.common_header.index += 1; + shred.header.coding_header.index += 1; // Establish a baseline that works { let index = index_cf - .get(shred.header.common_header.slot) + .get(shred.header.coding_header.slot) .unwrap() .unwrap(); assert!(Blocktree::should_insert_coding_shred( @@ -3218,9 +3182,9 @@ pub mod tests { // Trying to insert a shred with index < position should fail { let mut shred_ = shred.clone(); - shred_.header.common_header.index = (shred_.header.position - 1).into(); + shred_.header.coding_header.index = (shred_.header.position - 1).into(); let index = index_cf - .get(shred_.header.common_header.slot) + .get(shred_.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3235,7 +3199,7 @@ pub mod tests { let mut shred_ = shred.clone(); shred_.header.num_coding_shreds = 0; let index = index_cf - .get(shred_.header.common_header.slot) + .get(shred_.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3250,7 +3214,7 @@ pub mod tests { let mut shred_ = shred.clone(); shred_.header.num_coding_shreds = shred_.header.position; let index = index_cf - .get(shred_.header.common_header.slot) + .get(shred_.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3265,10 +3229,10 @@ pub mod tests { { let mut shred_ = shred.clone(); shred_.header.num_coding_shreds = 3; - shred_.header.common_header.index = std::u32::MAX - 1; + shred_.header.coding_header.index = std::u32::MAX - 1; shred_.header.position = 0; let index = index_cf - .get(shred_.header.common_header.slot) + .get(shred_.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3294,10 +3258,10 @@ pub mod tests { { let mut shred_ = shred.clone(); let index = index_cf - .get(shred_.header.common_header.slot) + .get(shred_.header.coding_header.slot) .unwrap() .unwrap(); - shred_.header.common_header.slot = *last_root.read().unwrap(); + shred_.header.coding_header.slot = *last_root.read().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &Shred::Coding(shred_), index.coding(), diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 83c0cf71a..d9c98b1e5 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -79,11 +79,11 @@ impl BroadcastRun for BroadcastFakeBlobsRun { if i <= self.partition { // Send fake blobs to the first N peers fake_shred_bufs.iter().for_each(|b| { - sock.send_to(b, &peer.tvu_forwards).unwrap(); + sock.send_to(&b.shred, &peer.tvu_forwards).unwrap(); }); } else { shred_bufs.iter().for_each(|b| { - sock.send_to(b, &peer.tvu_forwards).unwrap(); + sock.send_to(&b.shred, &peer.tvu_forwards).unwrap(); }); } }); diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 627077d25..4442b674e 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,7 +1,7 @@ use crate::entry::Entry; use crate::poh_recorder::WorkingBankEntries; use crate::result::Result; -use crate::shred::{Shred, Shredder}; +use crate::shred::{Shred, ShredInfo, Shredder}; use solana_runtime::bank::Bank; use solana_sdk::signature::Keypair; use std::sync::mpsc::Receiver; @@ -78,7 +78,7 @@ pub(super) fn entries_to_shreds( keypair: &Arc, mut latest_shred_index: u64, parent_slot: u64, -) -> (Vec, Vec>, u64) { +) -> (Vec, Vec, u64) { let mut all_shred_bufs = vec![]; let mut all_shreds = vec![]; let num_ventries = ventries.len(); @@ -101,7 +101,7 @@ pub(super) fn entries_to_shreds( shredder.finalize_data(); } - let (mut shreds, mut shred_bufs): (Vec, Vec>) = + let (mut shreds, mut shred_bufs): (Vec, Vec) = shredder.shred_tuples.into_iter().unzip(); trace!("Inserting {:?} shreds in blocktree", shreds.len()); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 5c6afe947..c1521747d 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -41,7 +41,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { .map(|meta| meta.consumed) .unwrap_or(0); - let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds( + let (shreds, shred_infos, _) = broadcast_utils::entries_to_shreds( receive_results.ventries, bank.slot(), last_tick, @@ -59,6 +59,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); + let shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.shred).collect(); // Broadcast data + erasures cluster_info.read().unwrap().broadcast_shreds( sock, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 2b7387feb..a8dc9a401 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -79,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun { 0 }; - let (all_shreds, all_shred_bufs, latest_shred_index) = entries_to_shreds( + let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds( receive_results.ventries, bank.slot(), last_tick, @@ -102,6 +102,7 @@ impl BroadcastRun for StandardBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); + let all_shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.shred).collect(); trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); cluster_info.read().unwrap().broadcast_shreds( sock, diff --git a/core/src/chacha.rs b/core/src/chacha.rs index cc296d9ea..c0dd47f2d 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -153,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "9aHjDwufwfZgbmSdug5g58KSGqTsFx9faXuwoikDe4e4" + let golden: Hash = "C7RmQ7oDswQfgquukXHGvpYYSCcKTgPnJrYA3ABbX9oG" .parse() .unwrap(); diff --git a/core/src/shred.rs b/core/src/shred.rs index b82bc66d7..e1fb0536e 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -14,6 +14,10 @@ use std::sync::Arc; use std::{cmp, io}; lazy_static! { + static ref SIZE_OF_CODING_SHRED_HEADER: usize = + { serialized_size(&CodingShredHeader::default()).unwrap() as usize }; + static ref SIZE_OF_DATA_SHRED_HEADER: usize = + { serialized_size(&DataShredHeader::default()).unwrap() as usize }; static ref SIZE_OF_EMPTY_CODING_SHRED: usize = { serialized_size(&CodingShred::empty_shred()).unwrap() as usize }; static ref SIZE_OF_EMPTY_DATA_SHRED: usize = @@ -26,14 +30,134 @@ lazy_static! { { bincode::serialized_size(&Signature::default()).unwrap() as usize }; static ref SIZE_OF_EMPTY_VEC: usize = { bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize }; + static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize }; } -#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] -pub struct ShredMetaBuf { - pub slot: u64, - pub index: u32, - pub data_shred: bool, - pub shred_buf: Vec, +/// The constants that define if a shred is data or coding +const DATA_SHRED: u8 = 0b1010_0101; +const CODING_SHRED: u8 = 0b0101_1010; + +#[derive(Clone, Debug)] +pub struct ShredInfo { + pub headers: DataShredHeader, + pub shred: Vec, +} + +impl ShredInfo { + fn new(header: DataShredHeader, shred_buf: Vec) -> Self { + ShredInfo { + headers: header, + shred: shred_buf, + } + } + + pub fn new_from_serialized_shred(shred_buf: Vec) -> Self { + let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; + let shred_type: u8 = + bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE]) + .unwrap(); + let header = if shred_type == CODING_SHRED { + let end = *SIZE_OF_CODING_SHRED_HEADER; + let mut header = DataShredHeader::default(); + header.common_header.header = + bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap(); + header + } else { + let end = *SIZE_OF_DATA_SHRED_HEADER; + bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap() + }; + Self::new(header, shred_buf) + } + + pub fn new_from_shred(shred: &Shred, shred_buf: Vec) -> Self { + let header = match shred { + Shred::Data(s) => s.header.clone(), + Shred::Coding(s) => { + let mut hdr = DataShredHeader::default(); + hdr.common_header.header = s.header.clone(); + hdr + } + }; + + Self::new(header, shred_buf) + } + + fn header(&self) -> &ShredCommonHeader { + if self.is_data() { + &self.headers.data_header + } else { + &self.headers.common_header.header.coding_header + } + } + + pub fn header_mut(&mut self) -> &mut ShredCommonHeader { + if self.is_data() { + &mut self.headers.data_header + } else { + &mut self.headers.common_header.header.coding_header + } + } + + pub fn slot(&self) -> u64 { + self.header().slot + } + + pub fn parent(&self) -> u64 { + if self.is_data() { + self.headers.data_header.slot - u64::from(self.headers.parent_offset) + } else { + std::u64::MAX + } + } + + pub fn index(&self) -> u32 { + self.header().index + } + + pub fn signature(&self) -> Signature { + self.header().signature + } + + pub fn seed(&self) -> [u8; 32] { + let mut seed = [0; 32]; + let seed_len = seed.len(); + let sig = self.header().signature.as_ref(); + seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]); + seed + } + + pub fn is_data(&self) -> bool { + self.headers.common_header.header.shred_type == DATA_SHRED + } + + pub fn last_in_slot(&self) -> bool { + if self.is_data() { + self.headers.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT + } else { + false + } + } + + pub fn data_complete(&self) -> bool { + if self.is_data() { + self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED + } else { + false + } + } + + pub fn coding_params(&self) -> Option<(u16, u16, u16)> { + if !self.is_data() { + let header = &self.headers.common_header.header; + Some(( + header.num_data_shreds, + header.num_coding_shreds, + header.position, + )) + } else { + None + } + } } #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] @@ -46,20 +170,20 @@ pub enum Shred { /// a public constant defined for it. const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 4; -const LAST_SHRED_IN_SLOT: u8 = 1; -const DATA_COMPLETE_SHRED: u8 = 2; +const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001; +const DATA_COMPLETE_SHRED: u8 = 0b0000_0010; impl Shred { pub fn slot(&self) -> u64 { match self { - Shred::Data(s) => s.header.common_header.slot, - Shred::Coding(s) => s.header.common_header.slot, + Shred::Data(s) => s.header.data_header.slot, + Shred::Coding(s) => s.header.coding_header.slot, } } pub fn parent(&self) -> u64 { match self { - Shred::Data(s) => s.header.common_header.slot - u64::from(s.header.parent_offset), + Shred::Data(s) => s.header.data_header.slot - u64::from(s.header.parent_offset), Shred::Coding(_) => std::u64::MAX, } } @@ -68,38 +192,38 @@ impl Shred { let parent = self.parent(); match self { Shred::Data(s) => { - s.header.common_header.slot = slot; + s.header.data_header.slot = slot; s.header.parent_offset = (slot - parent) as u16; } - Shred::Coding(s) => s.header.common_header.slot = slot, + Shred::Coding(s) => s.header.coding_header.slot = slot, }; } pub fn index(&self) -> u32 { match self { - Shred::Data(s) => s.header.common_header.index, - Shred::Coding(s) => s.header.common_header.index, + Shred::Data(s) => s.header.data_header.index, + Shred::Coding(s) => s.header.coding_header.index, } } pub fn set_index(&mut self, index: u32) { match self { - Shred::Data(s) => s.header.common_header.index = index, - Shred::Coding(s) => s.header.common_header.index = index, + Shred::Data(s) => s.header.data_header.index = index, + Shred::Coding(s) => s.header.coding_header.index = index, }; } pub fn signature(&self) -> Signature { match self { - Shred::Data(s) => s.header.common_header.signature, - Shred::Coding(s) => s.header.common_header.signature, + Shred::Data(s) => s.header.data_header.signature, + Shred::Coding(s) => s.header.coding_header.signature, } } pub fn set_signature(&mut self, sig: Signature) { match self { - Shred::Data(s) => s.header.common_header.signature = sig, - Shred::Coding(s) => s.header.common_header.signature = sig, + Shred::Data(s) => s.header.data_header.signature = sig, + Shred::Coding(s) => s.header.coding_header.signature = sig, }; } @@ -107,8 +231,8 @@ impl Shred { let mut seed = [0; 32]; let seed_len = seed.len(); let sig = match self { - Shred::Data(s) => &s.header.common_header.signature, - Shred::Coding(s) => &s.header.common_header.signature, + Shred::Data(s) => &s.header.data_header.signature, + Shred::Coding(s) => &s.header.coding_header.signature, } .as_ref(); @@ -124,7 +248,11 @@ impl Shred { pub fn fast_verify(&self, shred_buf: &[u8], pubkey: &Pubkey) -> bool { let signed_payload_offset = match self { Shred::Data(_) => CodingShred::overhead(), - Shred::Coding(_) => CodingShred::overhead() - *SIZE_OF_EMPTY_CODING_SHRED, + Shred::Coding(_) => { + CodingShred::overhead() + *SIZE_OF_SHRED_TYPE + - *SIZE_OF_CODING_SHRED_HEADER + - *SIZE_OF_EMPTY_VEC + } } + *SIZE_OF_SIGNATURE; self.signature() .verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..]) @@ -158,6 +286,18 @@ impl Shred { Shred::Coding(_) => false, } } + + pub fn coding_params(&self) -> Option<(u16, u16, u16)> { + if let Shred::Coding(s) = self { + Some(( + s.header.num_data_shreds, + s.header.num_coding_shreds, + s.header.position, + )) + } else { + None + } + } } /// A common header that is present at start of every shred @@ -169,22 +309,22 @@ pub struct ShredCommonHeader { } /// A common header that is present at start of every data shred -#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct DataShredHeader { - _reserved: CodingShredHeader, - pub common_header: ShredCommonHeader, + common_header: CodingShred, + pub data_header: ShredCommonHeader, pub parent_offset: u16, pub flags: u8, } /// The coding shred header has FEC information -#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct CodingShredHeader { - pub common_header: ShredCommonHeader, + pub shred_type: u8, + pub coding_header: ShredCommonHeader, pub num_data_shreds: u16, pub num_coding_shreds: u16, pub position: u16, - pub payload: Vec, } #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] @@ -196,6 +336,39 @@ pub struct DataShred { #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct CodingShred { pub header: CodingShredHeader, + pub payload: Vec, +} + +impl Default for DataShredHeader { + fn default() -> Self { + DataShredHeader { + common_header: CodingShred { + header: CodingShredHeader { + shred_type: DATA_SHRED, + coding_header: ShredCommonHeader::default(), + num_data_shreds: 0, + num_coding_shreds: 0, + position: 0, + }, + payload: vec![], + }, + data_header: ShredCommonHeader::default(), + parent_offset: 0, + flags: 0, + } + } +} + +impl Default for CodingShredHeader { + fn default() -> Self { + CodingShredHeader { + shred_type: CODING_SHRED, + coding_header: ShredCommonHeader::default(), + num_data_shreds: 0, + num_coding_shreds: 0, + position: 0, + } + } } /// Default shred is sized correctly to meet MTU/Packet size requirements @@ -214,13 +387,8 @@ impl Default for CodingShred { fn default() -> Self { let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED; CodingShred { - header: CodingShredHeader { - common_header: ShredCommonHeader::default(), - num_data_shreds: 0, - num_coding_shreds: 0, - position: 0, - payload: vec![0; size], - }, + header: CodingShredHeader::default(), + payload: vec![0; size], } } } @@ -260,11 +428,11 @@ impl ShredCommon for DataShred { impl ShredCommon for CodingShred { fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { - let mut capacity = self.header.payload.len().saturating_sub(offset); + let mut capacity = self.payload.len().saturating_sub(offset); let slice_len = cmp::min(capacity, buf.len()); capacity -= slice_len; if slice_len > 0 { - self.header.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); + self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); } (slice_len, capacity) } @@ -276,6 +444,7 @@ impl ShredCommon for CodingShred { fn empty_shred() -> Self { CodingShred { header: CodingShredHeader::default(), + payload: vec![], } } } @@ -288,7 +457,7 @@ pub struct Shredder { parent_offset: u16, fec_rate: f32, signer: Arc, - pub shred_tuples: Vec<(Shred, Vec)>, + pub shred_tuples: Vec<(Shred, ShredInfo)>, fec_set_shred_start: usize, active_shred: Shred, active_offset: usize, @@ -359,8 +528,8 @@ impl Shredder { ))) } else { let mut data_shred = DataShred::default(); - data_shred.header.common_header.slot = slot; - data_shred.header.common_header.index = index; + data_shred.header.data_header.slot = slot; + data_shred.header.data_header.index = index; data_shred.header.parent_offset = (slot - parent) as u16; let active_shred = Shred::Data(data_shred); Ok(Shredder { @@ -381,16 +550,17 @@ impl Shredder { fn sign_shred( signer: &Arc, shred: &mut Shred, - shred_buf: &mut [u8], + shred_info: &mut ShredInfo, signature_offset: usize, ) { let data_offset = signature_offset + *SIZE_OF_SIGNATURE; - let signature = signer.sign_message(&shred_buf[data_offset..]); + let signature = signer.sign_message(&shred_info.shred[data_offset..]); let serialized_signature = bincode::serialize(&signature).expect("Failed to generate serialized signature"); shred.set_signature(signature); - shred_buf[signature_offset..signature_offset + serialized_signature.len()] + shred_info.shred[signature_offset..signature_offset + serialized_signature.len()] .copy_from_slice(&serialized_signature); + shred_info.header_mut().signature = signature; } fn sign_unsigned_shreds_and_generate_codes(&mut self) { @@ -401,7 +571,9 @@ impl Shredder { .for_each(|(s, d)| Self::sign_shred(&signer, s, d, signature_offset)); let unsigned_coding_shred_start = self.shred_tuples.len(); self.generate_coding_shreds(); - let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; + let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED + *SIZE_OF_SHRED_TYPE + - *SIZE_OF_CODING_SHRED_HEADER + - *SIZE_OF_EMPTY_VEC; self.shred_tuples[unsigned_coding_shred_start..] .iter_mut() .for_each(|(s, d)| Self::sign_shred(&signer, s, d, coding_header_offset)); @@ -419,14 +591,15 @@ impl Shredder { let mut shred = Shred::Data(self.new_data_shred()); std::mem::swap(&mut shred, &mut self.active_shred); - self.shred_tuples.push((shred, data)); + let shred_info = ShredInfo::new_from_shred(&shred, data); + self.shred_tuples.push((shred, shred_info)); } /// Creates a new data shred fn new_data_shred(&self) -> DataShred { let mut data_shred = DataShred::default(); - data_shred.header.common_header.slot = self.slot; - data_shred.header.common_header.index = self.index; + data_shred.header.data_header.slot = self.slot; + data_shred.header.data_header.index = self.index; data_shred.header.parent_offset = self.parent_offset; data_shred } @@ -439,8 +612,8 @@ impl Shredder { position: usize, ) -> CodingShred { let mut coding_shred = CodingShred::default(); - coding_shred.header.common_header.slot = slot; - coding_shred.header.common_header.index = index; + coding_shred.header.coding_header.slot = slot; + coding_shred.header.coding_header.index = index; coding_shred.header.num_data_shreds = num_data as u16; coding_shred.header.num_coding_shreds = num_code as u16; coding_shred.header.position = position as u16; @@ -460,7 +633,7 @@ impl Shredder { let coding_block_offset = CodingShred::overhead(); let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..] .iter() - .map(|(_, data)| &data[coding_block_offset..]) + .map(|(_, data)| &data.shred[coding_block_offset..]) .collect(); // Create empty coding shreds, with correctly populated headers @@ -491,7 +664,8 @@ impl Shredder { // append to the shred list coding_shreds.into_iter().for_each(|code| { let shred: Shred = bincode::deserialize(&code).unwrap(); - self.shred_tuples.push((shred, code)); + let shred_info = ShredInfo::new_from_shred(&shred, code); + self.shred_tuples.push((shred, shred_info)); }); self.fec_set_index = self.index; } @@ -529,7 +703,7 @@ impl Shredder { } fn fill_in_missing_shreds( - shred: &ShredMetaBuf, + shred: &ShredInfo, num_data: usize, num_coding: usize, slot: u64, @@ -563,8 +737,8 @@ impl Shredder { ) -> Vec { let missing_shred = if missing < first_index + num_data { let mut data_shred = DataShred::default(); - data_shred.header.common_header.slot = slot; - data_shred.header.common_header.index = missing as u32; + data_shred.header.data_header.slot = slot; + data_shred.header.data_header.index = missing as u32; Shred::Data(data_shred) } else { Shred::Coding(Self::new_coding_shred( @@ -579,7 +753,7 @@ impl Shredder { } pub fn try_recovery( - shreds: Vec, + shreds: Vec, num_data: usize, num_coding: usize, first_index: usize, @@ -606,7 +780,7 @@ impl Shredder { next_expected_index, &mut present, ); - blocks.push(shred.shred_buf); + blocks.push(shred.shred); next_expected_index = last_index + 1; blocks }) @@ -693,11 +867,11 @@ impl Shredder { Ok(Self::reassemble_payload(num_data, data_shred_bufs)) } - fn get_shred_index(shred: &ShredMetaBuf, num_data: usize) -> usize { - if shred.data_shred { - shred.index as usize + fn get_shred_index(shred: &ShredInfo, num_data: usize) -> usize { + if shred.is_data() { + shred.index() as usize } else { - shred.index as usize + num_data + shred.index() as usize + num_data } } @@ -762,13 +936,13 @@ mod tests { // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) let (_, shred) = &shredder.shred_tuples[0]; - assert_eq!(shred.len(), PACKET_DATA_SIZE); - info!("Len: {}", shred.len()); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + info!("Len: {}", shred.shred.len()); info!("{:?}", shred); // Test4: Try deserialize the PDU and assert that it matches the original shred let deserialized_shred: Shred = - bincode::deserialize(&shred).expect("Failed in deserializing the PDU"); + bincode::deserialize(&shred.shred).expect("Failed in deserializing the PDU"); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.slot(), slot); @@ -791,9 +965,9 @@ mod tests { // Must be Last in FEC Set let (_, shred) = &shredder.shred_tuples[1]; - assert_eq!(shred.len(), PACKET_DATA_SIZE); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); @@ -813,9 +987,9 @@ mod tests { assert!(!shredder.shred_tuples.is_empty()); let (_, shred) = &shredder.shred_tuples[2]; - assert_eq!(shred.len(), PACKET_DATA_SIZE); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); @@ -830,9 +1004,9 @@ mod tests { // Must be a Data shred let (_, shred) = &shredder.shred_tuples[3]; - assert_eq!(shred.len(), PACKET_DATA_SIZE); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 3); assert_eq!(deserialized_shred.slot(), slot); @@ -850,9 +1024,9 @@ mod tests { // Must be LastInSlot let (_, shred) = &shredder.shred_tuples[4]; - assert_eq!(shred.len(), PACKET_DATA_SIZE); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 4); assert_eq!(deserialized_shred.slot(), slot); @@ -883,8 +1057,8 @@ mod tests { assert_eq!(shredder.shred_tuples.len(), 2); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.slot(), slot); @@ -892,8 +1066,8 @@ mod tests { assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); @@ -918,8 +1092,8 @@ mod tests { // We should have 1 shred now (LastInFECBlock) assert_eq!(shredder.shred_tuples.len(), 1); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); @@ -955,8 +1129,8 @@ mod tests { // Finalize must have created 1 final data shred and 3 coding shreds // assert_eq!(shredder.shreds.len(), 6); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.slot(), slot); @@ -964,8 +1138,8 @@ mod tests { assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); @@ -973,8 +1147,8 @@ mod tests { assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); @@ -982,24 +1156,24 @@ mod tests { assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.slot(), slot); assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); assert!(deserialized_shred.verify(&keypair.pubkey())); let (_, shred) = shredder.shred_tuples.remove(0); - assert_eq!(shred.len(), PACKET_DATA_SIZE); - let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); + assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap(); assert_matches!(deserialized_shred, Shred::Coding(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); @@ -1038,26 +1212,16 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (shreds, shred_meta_bufs): (Vec, Vec) = shredder + let (shreds, shred_infos): (Vec, Vec) = shredder .shred_tuples .iter() - .map(|(s, b)| { - ( - s.clone(), - ShredMetaBuf { - slot: s.slot(), - index: s.index(), - data_shred: s.is_data(), - shred_buf: b.clone(), - }, - ) - }) + .map(|(s, b)| (s.clone(), b.clone())) .unzip(); // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail assert_matches!( Shredder::try_recovery( - shred_meta_bufs[..3].to_vec(), + shred_infos[..3].to_vec(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1068,7 +1232,7 @@ mod tests { // Test1: Try recovery/reassembly with only data shreds. Hint: should work let result = Shredder::try_recovery( - shred_meta_bufs[..4].to_vec(), + shred_infos[..4].to_vec(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1083,21 +1247,13 @@ mod tests { assert_eq!(data[..], result[..data.len()]); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work - let (mut shreds, shred_meta_bufs): (Vec, Vec) = shredder + let (mut shreds, shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() .filter_map(|(i, (s, b))| { if i % 2 == 0 { - Some(( - s.clone(), - ShredMetaBuf { - slot: s.slot(), - index: s.index(), - data_shred: s.is_data(), - shred_buf: b.clone(), - }, - )) + Some((s.clone(), b.clone())) } else { None } @@ -1105,7 +1261,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_meta_bufs, + shred_info, expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1137,16 +1293,16 @@ mod tests { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 1); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 1); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 1); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 3); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 3); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 3); } let result = Shredder::deshred(&shreds[..4]).unwrap(); @@ -1154,21 +1310,13 @@ mod tests { assert_eq!(data[..], result[..data.len()]); // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work - let (mut shreds, shred_meta_bufs): (Vec, Vec) = shredder + let (mut shreds, shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() .filter_map(|(i, (s, b))| { if i % 2 != 0 { - Some(( - s.clone(), - ShredMetaBuf { - slot: s.slot(), - index: s.index(), - data_shred: s.is_data(), - shred_buf: b.clone(), - }, - )) + Some((s.clone(), b.clone())) } else { None } @@ -1176,7 +1324,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_meta_bufs, + shred_info, expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1208,16 +1356,16 @@ mod tests { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 0); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 0); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 0); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 2); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 2); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 2); } let result = Shredder::deshred(&shreds[..4]).unwrap(); @@ -1248,21 +1396,13 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (mut shreds, shred_meta_bufs): (Vec, Vec) = shredder + let (mut shreds, shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() .filter_map(|(i, (s, b))| { if i % 2 != 0 { - Some(( - s.clone(), - ShredMetaBuf { - slot: s.slot(), - index: s.index(), - data_shred: s.is_data(), - shred_buf: b.clone(), - }, - )) + Some((s.clone(), b.clone())) } else { None } @@ -1270,7 +1410,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_meta_bufs, + shred_info, expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1302,16 +1442,16 @@ mod tests { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 0); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 0); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 0); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 2); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 2); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 2); } let result = Shredder::deshred(&shreds[..4]).unwrap(); @@ -1362,21 +1502,13 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (mut shreds, shred_meta_bufs): (Vec, Vec) = shredder + let (mut shreds, shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() .filter_map(|(i, (s, b))| { if i % 2 != 0 { - Some(( - s.clone(), - ShredMetaBuf { - slot: s.slot(), - index: s.index(), - data_shred: s.is_data(), - shred_buf: b.clone(), - }, - )) + Some((s.clone(), b.clone())) } else { None } @@ -1384,7 +1516,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_meta_bufs.clone(), + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 25, @@ -1416,16 +1548,16 @@ mod tests { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 0); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 25); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 25); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.position, 2); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 27); + assert_eq!(code.header.coding_header.slot, slot); + assert_eq!(code.header.coding_header.index, 27); } let result = Shredder::deshred(&shreds[..4]).unwrap(); @@ -1434,7 +1566,7 @@ mod tests { // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds let result = Shredder::try_recovery( - shred_meta_bufs.clone(), + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 25, @@ -1446,7 +1578,7 @@ mod tests { // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds assert_matches!( Shredder::try_recovery( - shred_meta_bufs.clone(), + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 15, @@ -1458,7 +1590,7 @@ mod tests { // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds assert_matches!( Shredder::try_recovery( - shred_meta_bufs.clone(), + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 35, @@ -1511,4 +1643,45 @@ mod tests { .for_each(|(s, _)| assert!(!s.is_data())); } } + + #[test] + fn test_shred_info_construction() { + let keypair = Arc::new(Keypair::new()); + let slot = 0x123456789abcdef0; + let mut shredder = + Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); + + assert!(shredder.shred_tuples.is_empty()); + assert_eq!(shredder.active_offset, 0); + + let data: Vec<_> = (0..1200 * 3).collect(); + let data: Vec = data.iter().map(|x| *x as u8).collect(); + let mut offset = shredder.write(&data).unwrap(); + let approx_shred_payload_size = offset; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + + // We should have some shreds now + assert!(shredder.shred_tuples.len() >= data.len() / approx_shred_payload_size); + assert_eq!(offset, data.len()); + + shredder.finalize_data(); + let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; + assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + + shredder + .shred_tuples + .iter() + .for_each(|(shred, shred_info)| { + assert_eq!(shred.slot(), shred_info.slot()); + assert_eq!(shred.index(), shred_info.index()); + assert_eq!(shred.parent(), shred_info.parent()); + assert_eq!(shred.signature(), shred_info.signature()); + assert_eq!(shred.is_data(), shred_info.is_data()); + assert_eq!(shred.last_in_slot(), shred_info.last_in_slot()); + assert_eq!(shred.data_complete(), shred_info.data_complete()); + assert_eq!(shred.coding_params(), shred_info.coding_params()); + }) + } }