diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 696d991acf..e3f993bef6 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -6,10 +6,9 @@ use solana_core::test_tx; use solana_ledger::entry::{create_ticks, Entry}; use solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE, - SIZE_OF_SHRED_HEADER, + SIZE_OF_DATA_SHRED_PAYLOAD, }; use solana_sdk::hash::Hash; -use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::Arc; use test::Bencher; @@ -30,7 +29,7 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec= root + let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0); let mut coding_shred = - Shred::new_empty_from_header(Shredder::new_coding_shred_header(5, 5, 6, 6, 0)); - Shredder::sign_shred(&leader_keypair, &mut coding_shred, *SIZE_OF_SHRED_TYPE); + Shred::new_empty_from_header(common, DataShredHeader::default(), coding); + Shredder::sign_shred(&leader_keypair, &mut coding_shred); assert_eq!( should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0), true diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 28af4ca107..23b0d411e4 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -1735,7 +1735,7 @@ pub fn make_chaining_slot_entries( pub mod tests { use super::*; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; - use crate::shred::max_ticks_per_n_shreds; + use crate::shred::{max_ticks_per_n_shreds, DataShredHeader}; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -3239,8 +3239,12 @@ pub mod tests { let last_root = RwLock::new(0); let slot = 1; - let mut shred = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10); - let coding_shred = Shred::new_empty_from_header(shred.clone()); + let (mut shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10); + let coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); // Insert a good coding shred assert!(Blocktree::should_insert_coding_shred( @@ -3256,10 +3260,7 @@ pub mod tests { // Trying to insert the same shred again should fail { - let index = index_cf - .get(shred.coding_header.common_header.slot) - .unwrap() - .unwrap(); + let index = index_cf.get(shred.slot).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, index.coding(), @@ -3267,15 +3268,16 @@ pub mod tests { )); } - shred.coding_header.common_header.index += 1; + shred.index += 1; // Establish a baseline that works { - let coding_shred = Shred::new_empty_from_header(shred.clone()); - let index = index_cf - .get(shred.coding_header.common_header.slot) - .unwrap() - .unwrap(); + let coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); + let index = index_cf.get(shred.slot).unwrap().unwrap(); assert!(Blocktree::should_insert_coding_shred( &coding_shred, index.coding(), @@ -3285,8 +3287,12 @@ pub mod tests { // Trying to insert a shred with index < position should fail { - let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - let index = coding_shred.headers.coding_header.position - 1; + let mut coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); + let index = coding_shred.coding_header.position - 1; coding_shred.set_index(index as u32); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); @@ -3299,8 +3305,12 @@ pub mod tests { // Trying to insert shred with num_coding == 0 should fail { - let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.coding_header.num_coding_shreds = 0; + let mut coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); + coding_shred.coding_header.num_coding_shreds = 0; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3311,9 +3321,12 @@ pub mod tests { // Trying to insert shred with pos >= num_coding should fail { - let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.coding_header.num_coding_shreds = - coding_shred.headers.coding_header.position; + let mut coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); + coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3325,10 +3338,14 @@ pub mod tests { // Trying to insert with set_index with num_coding that would imply the last blob // has index > u32::MAX should fail { - let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.coding_header.num_coding_shreds = 3; - coding_shred.headers.coding_header.common_header.index = std::u32::MAX - 1; - coding_shred.headers.coding_header.position = 0; + let mut coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); + coding_shred.coding_header.num_coding_shreds = 3; + coding_shred.common_header.index = std::u32::MAX - 1; + coding_shred.coding_header.position = 0; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3337,7 +3354,7 @@ pub mod tests { )); // Decreasing the number of num_coding_shreds will put it within the allowed limit - coding_shred.headers.coding_header.num_coding_shreds = 2; + coding_shred.coding_header.num_coding_shreds = 2; assert!(Blocktree::should_insert_coding_shred( &coding_shred, index.coding(), @@ -3350,7 +3367,11 @@ pub mod tests { // Trying to insert value into slot <= than last root should fail { - let mut coding_shred = Shred::new_empty_from_header(shred.clone()); + let mut coding_shred = Shred::new_empty_from_header( + shred.clone(), + DataShredHeader::default(), + coding.clone(), + ); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blocktree::should_insert_coding_shred( diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index e502f48461..586dadf719 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -14,21 +14,26 @@ use solana_sdk::hash::Hash; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; -use std::io; use std::sync::Arc; use std::time::Instant; lazy_static! { + pub static ref SIZE_OF_COMMON_SHRED_HEADER: usize = + { serialized_size(&ShredCommonHeader::default()).unwrap() as usize }; pub static ref SIZE_OF_CODING_SHRED_HEADER: usize = { serialized_size(&CodingShredHeader::default()).unwrap() as usize }; pub static ref SIZE_OF_DATA_SHRED_HEADER: usize = { serialized_size(&DataShredHeader::default()).unwrap() as usize }; - pub static ref SIZE_OF_SHRED_HEADER: usize = - { serialized_size(&ShredHeader::default()).unwrap() as usize }; + pub static ref SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize = + { *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_CODING_SHRED_HEADER }; + pub static ref SIZE_OF_DATA_SHRED_PAYLOAD: usize = { + PACKET_DATA_SIZE + - *SIZE_OF_COMMON_SHRED_HEADER + - *SIZE_OF_DATA_SHRED_HEADER + - *SIZE_OF_DATA_SHRED_IGNORED_TAIL + }; static ref SIZE_OF_SIGNATURE: usize = { bincode::serialized_size(&Signature::default()).unwrap() as usize }; - pub static ref SIZE_OF_SHRED_TYPE: usize = - { bincode::serialized_size(&ShredType(DATA_SHRED)).unwrap() as usize }; } thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() @@ -68,11 +73,17 @@ impl std::convert::From> for ShredError { #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct ShredType(pub u8); +impl Default for ShredType { + fn default() -> Self { + ShredType(DATA_SHRED) + } +} /// A common header that is present in data and code shred headers #[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] pub struct ShredCommonHeader { pub signature: Signature, + pub shred_type: ShredType, pub slot: u64, pub index: u32, } @@ -80,7 +91,6 @@ pub struct ShredCommonHeader { /// The data shred header has parent offset and flags #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] pub struct DataShredHeader { - pub common_header: ShredCommonHeader, pub parent_offset: u16, pub flags: u8, } @@ -88,42 +98,41 @@ pub struct DataShredHeader { /// The coding shred header has FEC information #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] pub struct CodingShredHeader { - pub common_header: ShredCommonHeader, pub num_data_shreds: u16, pub num_coding_shreds: u16, pub position: u16, } -/// A common header that is present at start of every shred -#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] -pub struct ShredHeader { - pub shred_type: ShredType, - pub coding_header: CodingShredHeader, - pub data_header: DataShredHeader, -} - -impl Default for ShredHeader { - fn default() -> Self { - ShredHeader { - shred_type: ShredType(DATA_SHRED), - coding_header: CodingShredHeader::default(), - data_header: DataShredHeader::default(), - } - } -} - #[derive(Clone, Debug, PartialEq)] pub struct Shred { - pub headers: ShredHeader, + pub common_header: ShredCommonHeader, + pub data_header: DataShredHeader, + pub coding_header: CodingShredHeader, pub payload: Vec, } impl Shred { - fn new(header: ShredHeader, shred_buf: Vec) -> Self { - Shred { - headers: header, - payload: shred_buf, - } + fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result + where + T: Deserialize<'de>, + { + let ret = bincode::deserialize(&buf[*index..*index + size])?; + *index += size; + Ok(ret) + } + + fn serialize_obj_into<'de, T>( + index: &mut usize, + size: usize, + buf: &'de mut [u8], + obj: &T, + ) -> bincode::Result<()> + where + T: Serialize, + { + bincode::serialize_into(&mut buf[*index..*index + size], obj)?; + *index += size; + Ok(()) } pub fn new_from_data( @@ -134,134 +143,176 @@ impl Shred { is_last_data: bool, is_last_in_slot: bool, ) -> Self { - let mut shred_buf = vec![0; PACKET_DATA_SIZE]; - let mut header = ShredHeader::default(); - header.data_header.common_header.slot = slot; - header.data_header.common_header.index = index; - header.data_header.parent_offset = parent_offset; - header.data_header.flags = 0; + let mut payload = vec![0; PACKET_DATA_SIZE]; + let mut common_header = ShredCommonHeader::default(); + common_header.slot = slot; + common_header.index = index; + + let mut data_header = DataShredHeader::default(); + data_header.parent_offset = parent_offset; if is_last_data { - header.data_header.flags |= DATA_COMPLETE_SHRED + data_header.flags |= DATA_COMPLETE_SHRED } if is_last_in_slot { - header.data_header.flags |= LAST_SHRED_IN_SLOT + data_header.flags |= LAST_SHRED_IN_SLOT } if let Some(data) = data { - bincode::serialize_into(&mut shred_buf[..*SIZE_OF_SHRED_HEADER], &header) - .expect("Failed to write header into shred buffer"); - shred_buf[*SIZE_OF_SHRED_HEADER..*SIZE_OF_SHRED_HEADER + data.len()] - .clone_from_slice(data); + let mut start = 0; + Self::serialize_obj_into( + &mut start, + *SIZE_OF_COMMON_SHRED_HEADER, + &mut payload, + &common_header, + ) + .expect("Failed to write header into shred buffer"); + Self::serialize_obj_into( + &mut start, + *SIZE_OF_DATA_SHRED_HEADER, + &mut payload, + &data_header, + ) + .expect("Failed to write data header into shred buffer"); + payload[start..start + data.len()].clone_from_slice(data); } - Self::new(header, shred_buf) + Self { + common_header, + data_header, + coding_header: CodingShredHeader::default(), + payload, + } } - pub fn new_from_serialized_shred(shred_buf: Vec) -> Result { - let shred_type: ShredType = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?; - let mut header = if shred_type == ShredType(CODING_SHRED) { - let start = *SIZE_OF_SHRED_TYPE; - let end = start + *SIZE_OF_CODING_SHRED_HEADER; - let mut header = ShredHeader::default(); - header.coding_header = bincode::deserialize(&shred_buf[start..end])?; - header - } else if shred_type == ShredType(DATA_SHRED) { - let start = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE; - let end = start + *SIZE_OF_DATA_SHRED_HEADER; - let mut header = ShredHeader::default(); - header.data_header = bincode::deserialize(&shred_buf[start..end])?; - header + pub fn new_from_serialized_shred(payload: Vec) -> Result { + let mut start = 0; + let common_header: ShredCommonHeader = + Self::deserialize_obj(&mut start, *SIZE_OF_COMMON_SHRED_HEADER, &payload)?; + + let shred = if common_header.shred_type == ShredType(CODING_SHRED) { + let coding_header: CodingShredHeader = + Self::deserialize_obj(&mut start, *SIZE_OF_CODING_SHRED_HEADER, &payload)?; + Self { + common_header, + data_header: DataShredHeader::default(), + coding_header, + payload, + } + } else if common_header.shred_type == ShredType(DATA_SHRED) { + let data_header: DataShredHeader = + Self::deserialize_obj(&mut start, *SIZE_OF_DATA_SHRED_HEADER, &payload)?; + Self { + common_header, + data_header, + coding_header: CodingShredHeader::default(), + payload, + } } else { return Err(ShredError::InvalidShredType); }; - header.shred_type = shred_type; - Ok(Self::new(header, shred_buf)) + Ok(shred) } - pub fn new_empty_from_header(headers: ShredHeader) -> Self { + pub fn new_empty_from_header( + common_header: ShredCommonHeader, + data_header: DataShredHeader, + coding_header: CodingShredHeader, + ) -> Self { let mut payload = vec![0; PACKET_DATA_SIZE]; - let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_SHRED_HEADER]); - bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred"); - Shred { headers, payload } + let mut start = 0; + Self::serialize_obj_into( + &mut start, + *SIZE_OF_COMMON_SHRED_HEADER, + &mut payload, + &common_header, + ) + .expect("Failed to write header into shred buffer"); + if common_header.shred_type == ShredType(DATA_SHRED) { + Self::serialize_obj_into( + &mut start, + *SIZE_OF_DATA_SHRED_HEADER, + &mut payload, + &data_header, + ) + .expect("Failed to write data header into shred buffer"); + } else if common_header.shred_type == ShredType(CODING_SHRED) { + Self::serialize_obj_into( + &mut start, + *SIZE_OF_CODING_SHRED_HEADER, + &mut payload, + &coding_header, + ) + .expect("Failed to write data header into shred buffer"); + } + Shred { + common_header, + data_header, + coding_header, + payload, + } } pub fn new_empty_data_shred() -> Self { - let mut payload = vec![0; PACKET_DATA_SIZE]; - payload[0] = DATA_SHRED; - let headers = ShredHeader::default(); - Shred { headers, payload } - } - - pub fn header(&self) -> &ShredCommonHeader { - if self.is_data() { - &self.headers.data_header.common_header - } else { - &self.headers.coding_header.common_header - } - } - - pub fn header_mut(&mut self) -> &mut ShredCommonHeader { - if self.is_data() { - &mut self.headers.data_header.common_header - } else { - &mut self.headers.coding_header.common_header - } + Self::new_empty_from_header( + ShredCommonHeader::default(), + DataShredHeader::default(), + CodingShredHeader::default(), + ) } pub fn slot(&self) -> u64 { - self.header().slot + self.common_header.slot } pub fn parent(&self) -> u64 { if self.is_data() { - self.headers.data_header.common_header.slot - - u64::from(self.headers.data_header.parent_offset) + self.common_header.slot - u64::from(self.data_header.parent_offset) } else { std::u64::MAX } } pub fn index(&self) -> u32 { - self.header().index + self.common_header.index } /// This is not a safe function. It only changes the meta information. /// Use this only for test code which doesn't care about actual shred pub fn set_index(&mut self, index: u32) { - self.header_mut().index = index + self.common_header.index = index } /// This is not a safe function. It only changes the meta information. /// Use this only for test code which doesn't care about actual shred pub fn set_slot(&mut self, slot: u64) { - self.header_mut().slot = slot + self.common_header.slot = slot } pub fn signature(&self) -> Signature { - self.header().signature + self.common_header.signature } pub fn seed(&self) -> [u8; 32] { let mut seed = [0; 32]; let seed_len = seed.len(); - let sig = self.header().signature.as_ref(); + let sig = self.common_header.signature.as_ref(); seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]); seed } pub fn is_data(&self) -> bool { - self.headers.shred_type == ShredType(DATA_SHRED) + self.common_header.shred_type == ShredType(DATA_SHRED) } pub fn is_code(&self) -> bool { - self.headers.shred_type == ShredType(CODING_SHRED) + self.common_header.shred_type == ShredType(CODING_SHRED) } pub fn last_in_slot(&self) -> bool { if self.is_data() { - self.headers.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT + self.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT } else { false } @@ -271,13 +322,13 @@ impl Shred { /// Use this only for test code which doesn't care about actual shred pub fn set_last_in_slot(&mut self) { if self.is_data() { - self.headers.data_header.flags |= LAST_SHRED_IN_SLOT + self.data_header.flags |= LAST_SHRED_IN_SLOT } } pub fn data_complete(&self) -> bool { if self.is_data() { - self.headers.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED + self.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED } else { false } @@ -285,11 +336,10 @@ impl Shred { pub fn coding_params(&self) -> Option<(u16, u16, u16)> { if self.is_code() { - let header = &self.headers.coding_header; Some(( - header.num_data_shreds, - header.num_coding_shreds, - header.position, + self.coding_header.num_data_shreds, + self.coding_header.num_coding_shreds, + self.coding_header.position, )) } else { None @@ -297,15 +347,8 @@ impl Shred { } pub fn verify(&self, pubkey: &Pubkey) -> bool { - let signed_payload_offset = if self.is_data() { - *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE - } else if self.is_code() { - *SIZE_OF_SHRED_TYPE - } else { - return false; - } + *SIZE_OF_SIGNATURE; self.signature() - .verify(pubkey.as_ref(), &self.payload[signed_payload_offset..]) + .verify(pubkey.as_ref(), &self.payload[*SIZE_OF_SIGNATURE..]) } } @@ -346,7 +389,7 @@ impl Shredder { bincode::serialize(entries).expect("Expect to serialize all entries"); let serialize_time = now.elapsed().as_millis(); - let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER; + let no_header_size = *SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size; let last_shred_index = next_shred_index + num_shreds as u32 - 1; @@ -376,11 +419,7 @@ impl Shredder { is_last_in_slot, ); - Shredder::sign_shred( - &self.keypair, - &mut shred, - *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE, - ); + Shredder::sign_shred(&self.keypair, &mut shred); shred }) .collect() @@ -403,7 +442,7 @@ impl Shredder { PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { coding_shreds.par_iter_mut().for_each(|mut coding_shred| { - Shredder::sign_shred(&self.keypair, &mut coding_shred, *SIZE_OF_SHRED_TYPE); + Shredder::sign_shred(&self.keypair, &mut coding_shred); }) }) }); @@ -422,14 +461,11 @@ impl Shredder { (data_shreds, coding_shreds, last_shred_index + 1) } - pub fn sign_shred(signer: &Arc, shred_info: &mut Shred, signature_offset: usize) { - let data_offset = signature_offset + *SIZE_OF_SIGNATURE; - let signature = signer.sign_message(&shred_info.payload[data_offset..]); - let serialized_signature = - bincode::serialize(&signature).expect("Failed to generate serialized signature"); - shred_info.payload[signature_offset..signature_offset + serialized_signature.len()] - .copy_from_slice(&serialized_signature); - shred_info.header_mut().signature = signature; + pub fn sign_shred(signer: &Arc, shred: &mut Shred) { + let signature = signer.sign_message(&shred.payload[*SIZE_OF_SIGNATURE..]); + bincode::serialize_into(&mut shred.payload[..*SIZE_OF_SIGNATURE], &signature) + .expect("Failed to generate serialized signature"); + shred.common_header.signature = signature; } pub fn new_coding_shred_header( @@ -438,15 +474,19 @@ impl Shredder { num_data: usize, num_code: usize, position: usize, - ) -> ShredHeader { - let mut header = ShredHeader::default(); + ) -> (ShredCommonHeader, CodingShredHeader) { + let mut header = ShredCommonHeader::default(); header.shred_type = ShredType(CODING_SHRED); - header.coding_header.common_header.index = index; - header.coding_header.common_header.slot = slot; - header.coding_header.num_coding_shreds = num_code as u16; - header.coding_header.num_data_shreds = num_data as u16; - header.coding_header.position = position as u16; - header + header.index = index; + header.slot = slot; + ( + header, + CodingShredHeader { + num_data_shreds: num_data as u16, + num_coding_shreds: num_code as u16, + position: position as u16, + }, + ) } /// Generates coding shreds for the data shreds in the current FEC set @@ -462,30 +502,32 @@ impl Shredder { let num_coding = Self::calculate_num_coding_shreds(num_data as f32, fec_rate); let session = Session::new(num_data, num_coding).expect("Failed to create erasure session"); - let start_index = data_shred_batch[0].header().index; + let start_index = data_shred_batch[0].common_header.index; // All information after coding shred field in a data shred is encoded - let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE; + let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL; let data_ptrs: Vec<_> = data_shred_batch .iter() - .map(|data| &data.payload[coding_block_offset..]) + .map(|data| &data.payload[..valid_data_len]) .collect(); // Create empty coding shreds, with correctly populated headers let mut coding_shreds = Vec::with_capacity(num_coding); (0..num_coding).for_each(|i| { - let header = Self::new_coding_shred_header( + let (header, coding_header) = Self::new_coding_shred_header( slot, start_index + i as u32, num_data, num_coding, i, ); - let shred = Shred::new_empty_from_header(header); + let shred = + Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header); coding_shreds.push(shred.payload); }); // Grab pointers for the coding blocks + let coding_block_offset = *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_CODING_SHRED_HEADER; let mut coding_ptrs: Vec<_> = coding_shreds .iter_mut() .map(|buffer| &mut buffer[coding_block_offset..]) @@ -500,15 +542,20 @@ impl Shredder { coding_shreds .into_iter() .enumerate() - .map(|(i, code)| { - let header = Self::new_coding_shred_header( + .map(|(i, payload)| { + let (common_header, coding_header) = Self::new_coding_shred_header( slot, start_index + i as u32, num_data, num_coding, i, ); - Shred::new(header, code) + Shred { + common_header, + data_header: DataShredHeader::default(), + coding_header, + payload, + } }) .collect() } else { @@ -561,8 +608,6 @@ impl Shredder { let fec_set_size = num_data + num_coding; if num_coding > 0 && shreds.len() < fec_set_size { - let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE; - // Let's try recovering missing shreds using erasure let mut present = &mut vec![true; fec_set_size]; let mut next_expected_index = first_index; @@ -603,9 +648,18 @@ impl Shredder { let session = Session::new(num_data, num_coding).unwrap(); + let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_COMMON_SHRED_HEADER; let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs .iter_mut() - .map(|x| x[coding_block_offset..].as_mut()) + .enumerate() + .map(|(position, x)| { + if position < num_data { + x[..valid_data_len].as_mut() + } else { + x[coding_block_offset..].as_mut() + } + }) .zip(present.clone()) .collect(); session.decode_blocks(&mut blocks)?; @@ -667,11 +721,12 @@ impl Shredder { } fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec>) -> Vec { + let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL; data_shred_bufs[..num_data] .iter() .flat_map(|data| { - let offset = *SIZE_OF_SHRED_HEADER; - data[offset as usize..].iter() + let offset = *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_DATA_SHRED_HEADER; + data[offset..valid_data_len].iter() }) .cloned() .collect() @@ -684,7 +739,7 @@ pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 { } pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 { - let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64; + let shred_data_size = *SIZE_OF_DATA_SHRED_PAYLOAD as u64; let vec_size = bincode::serialized_size(&vec![entry]).unwrap(); let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; @@ -774,7 +829,7 @@ pub mod tests { .collect(); let size = serialized_size(&entries).unwrap(); - let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64; + let no_header_size = *SIZE_OF_DATA_SHRED_PAYLOAD as u64; let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size; let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate); @@ -787,8 +842,8 @@ pub mod tests { let mut data_shred_indexes = HashSet::new(); let mut coding_shred_indexes = HashSet::new(); for shred in data_shreds.iter() { - assert_eq!(shred.headers.shred_type, ShredType(DATA_SHRED)); - let index = shred.headers.data_header.common_header.index; + assert_eq!(shred.common_header.shred_type, ShredType(DATA_SHRED)); + let index = shred.common_header.index; let is_last = index as u64 == num_expected_data_shreds - 1; verify_test_data_shred( shred, @@ -805,8 +860,8 @@ pub mod tests { } for shred in coding_shreds.iter() { - let index = shred.headers.data_header.common_header.index; - assert_eq!(shred.headers.shred_type, ShredType(CODING_SHRED)); + let index = shred.common_header.index; + assert_eq!(shred.common_header.shred_type, ShredType(CODING_SHRED)); verify_test_code_shred(shred, index, slot, &keypair.pubkey(), true); assert!(!coding_shred_indexes.contains(&index)); coding_shred_indexes.insert(index);