diff --git a/core/src/shred.rs b/core/src/shred.rs
index d4e01362ba..d48a4130ed 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -1,11 +1,13 @@
 //! The `shred` module defines data structures and methods to pull MTU sized data frames from the network.
 use crate::erasure::Session;
+use crate::result;
+use crate::result::Error;
 use bincode::serialized_size;
 use core::borrow::BorrowMut;
 use serde::{Deserialize, Serialize};
 use solana_sdk::packet::PACKET_DATA_SIZE;
 use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
-use std::io::Write;
+use std::io::{Error as IOError, ErrorKind, Write};
 use std::sync::Arc;
 use std::{cmp, io};
 
@@ -32,7 +34,7 @@ pub struct ShredCommonHeader {
 pub struct DataShredHeader {
     _reserved: CodingShredHeader,
     pub common_header: ShredCommonHeader,
-    pub data_type: u8,
+    pub last_in_slot: u8,
 }
 
 /// The first data shred also has parent slot value in it
@@ -46,9 +48,9 @@ pub struct FirstDataShredHeader {
 #[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
 pub struct CodingShredHeader {
     pub common_header: ShredCommonHeader,
-    pub num_data_shreds: u8,
-    pub num_coding_shreds: u8,
-    pub position: u8,
+    pub num_data_shreds: u16,
+    pub num_coding_shreds: u16,
+    pub position: u16,
     pub payload: Vec<u8>,
 }
 
@@ -93,6 +95,23 @@ impl Default for DataShred {
     }
 }
 
+/// Default shred is sized correctly to meet MTU/Packet size requirements
+impl Default for CodingShred {
+    fn default() -> Self {
+        let size = PACKET_DATA_SIZE
+            - serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize;
+        CodingShred {
+            header: CodingShredHeader {
+                common_header: ShredCommonHeader::default(),
+                num_data_shreds: 0,
+                num_coding_shreds: 0,
+                position: 0,
+                payload: vec![0; size],
+            },
+        }
+    }
+}
+
 /// Common trait implemented by all types of shreds
 pub trait ShredCommon {
     /// Write at a particular offset in the shred
@@ -167,7 +186,7 @@ impl ShredCommon for CodingShred {
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct Shredder {
     slot: u64,
     index: u32,
@@ -215,7 +234,7 @@ impl Write for Shredder {
             // Continue generating more data shreds.
             // If the caller decides to finalize the FEC block or Slot, the data shred will
             // morph into appropriate shred accordingly
-            Shred::Data(DataShred::default())
+            Shred::Data(self.new_data_shred())
         } else {
             self.active_offset += slice_len;
             current_shred
@@ -227,15 +246,17 @@
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        if self.active_shred.is_none() {
-            return Ok(());
-        }
-        let current_shred = self.active_shred.take().unwrap();
-        self.finalize_data_shred(current_shred);
-        Ok(())
+        unimplemented!()
     }
 }
 
+#[derive(Default, Debug, PartialEq)]
+pub struct DeshredResult {
+    pub payload: Vec<u8>,
+    pub recovered_data: Vec<Shred>,
+    pub recovered_code: Vec<Shred>,
+}
+
 impl Shredder {
     pub fn new(
         slot: u64,
@@ -243,14 +264,24 @@
         fec_rate: f32,
         signer: &Arc<Keypair>,
        index: u32,
-    ) -> Self {
-        Shredder {
-            slot,
-            index,
-            parent,
-            fec_rate,
-            signer: signer.clone(),
-            ..Shredder::default()
+    ) -> result::Result<Self> {
+        if fec_rate > 1.0 || fec_rate < 0.0 {
+            Err(Error::IO(IOError::new(
+                ErrorKind::Other,
+                format!(
+                    "FEC rate {:?} must be more than 0.0 and less than 1.0",
+                    fec_rate
+                ),
+            )))
+        } else {
+            Ok(Shredder {
+                slot,
+                index,
+                parent,
+                fec_rate,
+                signer: signer.clone(),
+                ..Shredder::default()
+            })
         }
     }
 
@@ -291,6 +322,22 @@
         first_shred
     }
 
+    fn new_coding_shred(
+        slot: u64,
+        index: u32,
+        num_data: usize,
+        num_code: usize,
+        position: usize,
+    ) -> CodingShred {
+        let mut coding_shred = CodingShred::default();
+        coding_shred.header.common_header.slot = slot;
+        coding_shred.header.common_header.index = index;
+        coding_shred.header.num_data_shreds = num_data as u16;
+        coding_shred.header.num_coding_shreds = num_code as u16;
+        coding_shred.header.position = position as u16;
+        coding_shred
+    }
+
     /// Generates coding shreds for the data shreds in the current FEC set
     fn generate_coding_shreds(&mut self) {
         if self.fec_rate != 0.0 {
@@ -311,20 +358,14 @@
             // Create empty coding shreds, with correctly populated headers
             let mut coding_shreds = Vec::with_capacity(num_coding);
             (0..num_coding).for_each(|i| {
-                let header = CodingShredHeader {
-                    common_header: ShredCommonHeader {
-                        signature: Signature::default(),
-                        slot: self.slot,
-                        index: start_index + i as u32,
-                    },
-                    num_data_shreds: num_data as u8,
-                    num_coding_shreds: num_coding as u8,
-                    position: i as u8,
-                    payload: vec![],
-                };
-
-                let mut shred = bincode::serialize(&Shred::Coding(CodingShred { header })).unwrap();
-                shred.resize_with(PACKET_DATA_SIZE, || 0);
+                let shred = bincode::serialize(&Shred::Coding(Self::new_coding_shred(
+                    self.slot,
+                    start_index + i as u32,
+                    num_data,
+                    num_coding,
+                    i,
+                )))
+                .unwrap();
                 coding_shreds.push(shred);
             });
 
@@ -381,10 +422,222 @@
 
     /// Finalize the current slot (i.e. add last slot shred) and generate coding shreds
     pub fn finalize_slot(&mut self) {
-        let final_shred = self.make_final_data_shred();
+        let mut final_shred = self.make_final_data_shred();
+        final_shred.header.last_in_slot = 1;
         self.finalize_data_shred(Shred::LastInSlot(final_shred));
         self.generate_coding_shreds();
     }
+
+    fn fill_in_missing_shreds(
+        shred: &Shred,
+        num_data: usize,
+        num_coding: usize,
+        slot: u64,
+        first_index: usize,
+        expected_index: usize,
+        present: &mut [bool],
+    ) -> (Vec<Vec<u8>>, bool, usize) {
+        let (index, mut first_shred_in_slot) = Self::get_shred_index(shred, num_data);
+
+        let mut missing_blocks: Vec<Vec<u8>> = (expected_index..index)
+            .map(|missing| {
+                present[missing] = false;
+                // If index 0 shred is missing, then first shred in slot will also be recovered
+                first_shred_in_slot |= missing == 0;
+                Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
+            })
+            .collect();
+        let shred_buf = bincode::serialize(shred).unwrap();
+        missing_blocks.push(shred_buf);
+        (missing_blocks, first_shred_in_slot, index)
+    }
+
+    fn new_empty_missing_shred(
+        num_data: usize,
+        num_coding: usize,
+        slot: u64,
+        first_index: usize,
+        missing: usize,
+    ) -> Vec<u8> {
+        let missing_shred = if missing == 0 {
+            let mut data_shred = FirstDataShred::default();
+            data_shred.header.data_header.common_header.slot = slot;
+            data_shred.header.data_header.common_header.index = 0;
+            Shred::FirstInSlot(data_shred)
+        } else if missing < first_index + num_data {
+            let mut data_shred = DataShred::default();
+            data_shred.header.common_header.slot = slot;
+            data_shred.header.common_header.index = missing as u32;
+            if missing == first_index + num_data - 1 {
+                Shred::LastInFECSet(data_shred)
+            } else {
+                Shred::Data(data_shred)
+            }
+        } else {
+            Shred::Coding(Self::new_coding_shred(
+                slot,
+                missing.saturating_sub(num_data) as u32,
+                num_data,
+                num_coding,
+                missing - first_index - num_data,
+            ))
+        };
+        bincode::serialize(&missing_shred).unwrap()
+    }
+
+    /// Combines all shreds to recreate the original buffer
+    /// If the shreds include coding shreds, and if not all shreds are present, it tries
+    /// to reconstruct missing shreds using erasure
+    /// Note: The shreds are expected to be sorted
+    /// (lower to higher index, and data shreds before coding shreds)
+    pub fn deshred(shreds: &[Shred]) -> Result<DeshredResult, reed_solomon_erasure::Error> {
+        // If coding is enabled, the last shred must be a coding shred.
+        let (num_data, num_coding, first_index, slot) =
+            if let Shred::Coding(code) = shreds.last().unwrap() {
+                (
+                    code.header.num_data_shreds as usize,
+                    code.header.num_coding_shreds as usize,
+                    code.header.common_header.index as usize - code.header.position as usize,
+                    code.header.common_header.slot,
+                )
+            } else {
+                (shreds.len(), 0, 0, 0)
+            };
+
+        let mut recovered_data = vec![];
+        let mut recovered_code = vec![];
+        let fec_set_size = num_data + num_coding;
+        let (data_shred_bufs, first_shred) = if num_coding > 0 && shreds.len() < fec_set_size {
+            let coding_block_offset = CodingShred::overhead();
+
+            // Let's try recovering missing shreds using erasure
+            let mut present = &mut vec![true; fec_set_size];
+            let mut first_shred_in_slot = false;
+            let mut next_expected_index = first_index;
+            let mut shred_bufs: Vec<Vec<u8>> = shreds
+                .iter()
+                .flat_map(|shred| {
+                    let (blocks, first_shred, last_index) = Self::fill_in_missing_shreds(
+                        shred,
+                        num_data,
+                        num_coding,
+                        slot,
+                        first_index,
+                        next_expected_index,
+                        &mut present,
+                    );
+                    first_shred_in_slot |= first_shred;
+                    next_expected_index = last_index + 1;
+                    blocks
+                })
+                .collect();
+
+            let mut pending_shreds: Vec<Vec<u8>> = (next_expected_index
+                ..first_index + fec_set_size)
+                .map(|missing| {
+                    present[missing] = false;
+                    Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
+                })
+                .collect();
+            shred_bufs.append(&mut pending_shreds);
+
+            let session = Session::new(num_data, num_coding).unwrap();
+
+            let mut blocks: Vec<&mut [u8]> = shred_bufs
+                .iter_mut()
+                .map(|x| x[coding_block_offset..].as_mut())
+                .collect();
+            session.decode_blocks(&mut blocks, &present)?;
+
+            present.iter().enumerate().for_each(|(index, was_present)| {
+                if !was_present {
+                    let shred: Shred = bincode::deserialize(&shred_bufs[index]).unwrap();
+                    if index < first_index + num_data {
+                        // Check if the last recovered data shred is also last in Slot.
+                        // If so, it needs to be morphed into the correct type
+                        let shred = if let Shred::Data(s) = shred {
+                            if s.header.last_in_slot == 1 {
+                                Shred::LastInSlot(s)
+                            } else {
+                                Shred::Data(s)
+                            }
+                        } else if let Shred::LastInFECSet(s) = shred {
+                            if s.header.last_in_slot == 1 {
+                                Shred::LastInSlot(s)
+                            } else {
+                                Shred::LastInFECSet(s)
+                            }
+                        } else {
+                            shred
+                        };
+                        recovered_data.push(shred)
+                    } else {
+                        recovered_code.push(shred)
+                    }
+                }
+            });
+            (shred_bufs, first_shred_in_slot)
+        } else {
+            let (first_index, first_shred_in_slot) =
+                Shredder::get_shred_index(shreds.first().unwrap(), num_data);
+
+            let last_index = match shreds.last().unwrap() {
+                Shred::LastInFECSet(s) | Shred::LastInSlot(s) => {
+                    s.header.common_header.index as usize
+                }
+                _ => 0,
+            };
+
+            if num_data.saturating_add(first_index) != last_index.saturating_add(1) {
+                Err(reed_solomon_erasure::Error::TooFewDataShards)?;
+            }
+
+            let shred_bufs: Vec<Vec<u8>> = shreds
+                .iter()
+                .map(|shred| bincode::serialize(shred).unwrap())
+                .collect();
+            (shred_bufs, first_shred_in_slot)
+        };
+
+        Ok(DeshredResult {
+            payload: Self::reassemble_payload(num_data, data_shred_bufs, first_shred),
+            recovered_data,
+            recovered_code,
+        })
+    }
+
+    fn get_shred_index(shred: &Shred, num_data: usize) -> (usize, bool) {
+        let (first_index, first_shred_in_slot) = match shred {
+            Shred::FirstInSlot(s) => (s.header.data_header.common_header.index as usize, true),
+            Shred::FirstInFECSet(s)
+            | Shred::Data(s)
+            | Shred::LastInFECSet(s)
+            | Shred::LastInSlot(s) => (s.header.common_header.index as usize, false),
+            Shred::Coding(s) => (s.header.common_header.index as usize + num_data, false),
+        };
+        (first_index, first_shred_in_slot)
+    }
+
+    fn reassemble_payload(
+        num_data: usize,
+        data_shred_bufs: Vec<Vec<u8>>,
+        first_shred: bool,
+    ) -> Vec<u8> {
+        data_shred_bufs[..num_data]
+            .iter()
+            .enumerate()
+            .flat_map(|(i, data)| {
+                let offset = if i == 0 && first_shred {
+                    bincode::serialized_size(&Shred::FirstInSlot(FirstDataShred::empty_shred()))
+                        .unwrap()
+                } else {
+                    bincode::serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap()
+                };
+                data[offset as usize..].iter()
+            })
+            .cloned()
+            .collect()
+    }
 }
 
 #[cfg(test)]
@@ -394,12 +647,17 @@ mod tests {
     #[test]
     fn test_data_shredder() {
         let keypair = Arc::new(Keypair::new());
-        let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0);
+        let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0)
+            .expect("Failed in creating shredder");
 
         assert!(shredder.shreds.is_empty());
         assert_eq!(shredder.active_shred, None);
         assert_eq!(shredder.active_offset, 0);
 
+        assert!(FirstDataShred::overhead() < PACKET_DATA_SIZE);
+        assert!(DataShred::overhead() < PACKET_DATA_SIZE);
+        assert!(CodingShred::overhead() < PACKET_DATA_SIZE);
+
         // Test0: Write some data to shred. Not enough to create a signed shred
         let data: Vec<u8> = (0..25).collect();
         assert_eq!(shredder.write(&data).unwrap(), data.len());
@@ -444,6 +702,7 @@
                 .common_header
                 .signature
                 .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.data_header.common_header.index, 0);
         }
 
         // Test5: Write left over data, and assert that a data shred is being created
@@ -471,6 +730,7 @@
                 .common_header
                 .signature
                 .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.index, 1);
         }
 
         // Test7: Let's write some more data to the shredder.
@@ -495,6 +755,7 @@
                 .common_header
                 .signature
                 .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.index, 2);
         }
 
         // Test8: Write more data to generate an intermediate data shred
@@ -516,6 +777,7 @@
                 .common_header
                 .signature
                 .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.index, 3);
         }
 
         // Test9: Write some data to shredder
@@ -540,13 +802,105 @@
                 .common_header
                 .signature
                 .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.index, 4);
+        }
+    }
+
+    #[test]
+    fn test_small_data_shredder() {
+        let keypair = Arc::new(Keypair::new());
+
+        let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0)
+            .expect("Failed in creating shredder");
+
+        assert!(shredder.shreds.is_empty());
+        assert_eq!(shredder.active_shred, None);
+        assert_eq!(shredder.active_offset, 0);
+
+        let data: Vec<_> = (0..25).collect();
+        let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
+        let _ = shredder.write(&data).unwrap();
+
+        // We should have 0 shreds now
+        assert_eq!(shredder.shreds.len(), 0);
+
+        shredder.finalize_fec_block();
+
+        // We should have 2 shreds now (FirstInSlot, and LastInFECSet)
+        assert_eq!(shredder.shreds.len(), 2);
+
+        let data_offset = CodingShred::overhead()
+            + bincode::serialized_size(&Signature::default()).unwrap() as usize;
+
+        let shred = shredder.shreds.remove(0);
+        assert_eq!(shred.len(), PACKET_DATA_SIZE);
+        let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
+        assert_matches!(deserialized_shred, Shred::FirstInSlot(_));
+        if let Shred::FirstInSlot(data) = deserialized_shred {
+            assert!(data
+                .header
+                .data_header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+        }
+
+        let shred = shredder.shreds.remove(0);
+        assert_eq!(shred.len(), PACKET_DATA_SIZE);
+        let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
+        assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
+        if let Shred::LastInFECSet(data) = deserialized_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+        }
+
+        // Try shredder when no parent is provided
+        let mut shredder = Shredder::new(0x123456789abcdef0, None, 0.0, &keypair, 2)
+            .expect("Failed in creating shredder");
+
+        assert!(shredder.shreds.is_empty());
+        assert_eq!(shredder.active_shred, None);
+        assert_eq!(shredder.active_offset, 0);
+
+        let data: Vec<_> = (0..25).collect();
+        let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
+        let _ = shredder.write(&data).unwrap();
+
+        // We should have 0 shreds now
+        assert_eq!(shredder.shreds.len(), 0);
+
+        shredder.finalize_fec_block();
+
+        // We should have 1 shred now (LastInFECSet)
+        assert_eq!(shredder.shreds.len(), 1);
+        let shred = shredder.shreds.remove(0);
+        assert_eq!(shred.len(), PACKET_DATA_SIZE);
+        let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
+        assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
+        if let Shred::LastInFECSet(data) = deserialized_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
         }
     }
 
     #[test]
     fn test_data_and_code_shredder() {
         let keypair = Arc::new(Keypair::new());
-        let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 1.0, &keypair, 0);
+
+        // Test that FEC rate cannot be > 1.0
+        assert_matches!(
+            Shredder::new(0x123456789abcdef0, Some(5), 1.001, &keypair, 0),
+            Err(_)
+        );
+
+        let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 1.0, &keypair, 0)
+            .expect("Failed in creating shredder");
 
         assert!(shredder.shreds.is_empty());
         assert_eq!(shredder.active_shred, None);
@@ -646,4 +1000,318 @@
                 .verify(keypair.pubkey().as_ref(), &shred[coding_data_offset..]));
         }
     }
+
+    #[test]
+    fn test_recovery_and_reassembly() {
+        let keypair = Arc::new(Keypair::new());
+        let slot = 0x123456789abcdef0;
+        let mut shredder =
+            Shredder::new(slot, Some(5), 1.0, &keypair, 0).expect("Failed in creating shredder");
+
+        assert!(shredder.shreds.is_empty());
+        assert_eq!(shredder.active_shred, None);
+        assert_eq!(shredder.active_offset, 0);
+
+        let data: Vec<_> = (0..5000).collect();
+        let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
+        let mut offset = shredder.write(&data).unwrap();
+        let approx_shred_payload_size = offset;
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+
+        // We should have some shreds now
+        assert_eq!(
+            shredder.shreds.len(),
+            data.len() / approx_shred_payload_size
+        );
+        assert_eq!(offset, data.len());
+
+        shredder.finalize_fec_block();
+
+        // We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
+        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
+        assert_eq!(shredder.shreds.len(), expected_shred_count);
+
+        let shreds: Vec<Shred> = shredder
+            .shreds
+            .iter()
+            .map(|s| bincode::deserialize(s).unwrap())
+            .collect();
+
+        // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
+        assert_matches!(
+            Shredder::deshred(&shreds[..4]),
+            Err(reed_solomon_erasure::Error::TooFewDataShards)
+        );
+
+        // Test1: Try recovery/reassembly with only data shreds. Hint: should work
+        let result = Shredder::deshred(&shreds[..5]).unwrap();
+        assert_ne!(DeshredResult::default(), result);
+        assert!(result.payload.len() >= data.len());
+        assert!(result.recovered_data.is_empty());
+        assert!(result.recovered_code.is_empty());
+        assert_eq!(data[..], result.payload[..data.len()]);
+
+        // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
+        let shreds: Vec<Shred> = shredder
+            .shreds
+            .iter()
+            .enumerate()
+            .filter_map(|(i, s)| {
+                if i % 2 == 0 {
+                    Some(bincode::deserialize(s).unwrap())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let data_offset = CodingShred::overhead()
+            + bincode::serialized_size(&Signature::default()).unwrap() as usize;
+
+        let mut result = Shredder::deshred(&shreds).unwrap();
+        assert!(result.payload.len() >= data.len());
+        assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::Data(_));
+        if let Shred::Data(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 1);
+        }
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::Data(_));
+        if let Shred::Data(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 3);
+        }
+        assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 0);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 0);
+        }
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 2);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 2);
+        }
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 4);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 4);
+        }
+        assert_eq!(data[..], result.payload[..data.len()]);
+
+        // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
+        let shreds: Vec<Shred> = shredder
+            .shreds
+            .iter()
+            .enumerate()
+            .filter_map(|(i, s)| {
+                if i % 2 != 0 {
+                    Some(bincode::deserialize(s).unwrap())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let mut result = Shredder::deshred(&shreds).unwrap();
+        assert!(result.payload.len() >= data.len());
+        assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::FirstInSlot(_));
+        if let Shred::FirstInSlot(data) = recovered_shred {
+            assert!(data
+                .header
+                .data_header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.data_header.common_header.slot, slot);
+            assert_eq!(data.header.data_header.common_header.index, 0);
+        }
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::Data(_));
+        if let Shred::Data(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 2);
+        }
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::LastInFECSet(_));
+        if let Shred::LastInFECSet(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 4);
+        }
+        assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 1);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 1);
+        }
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 3);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 3);
+        }
+        assert_eq!(data[..], result.payload[..data.len()]);
+
+        // Test4: Try recovery/reassembly full slot with 3 missing data shreds + 2 coding shreds. Hint: should work
+        let mut shredder =
+            Shredder::new(slot, Some(5), 1.0, &keypair, 0).expect("Failed in creating shredder");
+
+        let mut offset = shredder.write(&data).unwrap();
+        let approx_shred_payload_size = offset;
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+        offset += shredder.write(&data[offset..]).unwrap();
+
+        // We should have some shreds now
+        assert_eq!(
+            shredder.shreds.len(),
+            data.len() / approx_shred_payload_size
+        );
+        assert_eq!(offset, data.len());
+
+        shredder.finalize_slot();
+
+        // We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
+        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
+        assert_eq!(shredder.shreds.len(), expected_shred_count);
+
+        let shreds: Vec<Shred> = shredder
+            .shreds
+            .iter()
+            .enumerate()
+            .filter_map(|(i, s)| {
+                if i % 2 != 0 {
+                    Some(bincode::deserialize(s).unwrap())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let mut result = Shredder::deshred(&shreds).unwrap();
+        assert!(result.payload.len() >= data.len());
+        assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::FirstInSlot(_));
+        if let Shred::FirstInSlot(data) = recovered_shred {
+            assert!(data
+                .header
+                .data_header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.data_header.common_header.slot, slot);
+            assert_eq!(data.header.data_header.common_header.index, 0);
+        }
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::Data(_));
+        if let Shred::Data(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 2);
+        }
+        let recovered_shred = result.recovered_data.remove(0);
+        let shred = bincode::serialize(&recovered_shred).unwrap();
+        assert_matches!(recovered_shred, Shred::LastInSlot(_));
+        if let Shred::LastInSlot(data) = recovered_shred {
+            assert!(data
+                .header
+                .common_header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
+            assert_eq!(data.header.common_header.slot, slot);
+            assert_eq!(data.header.common_header.index, 4);
+        }
+        assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 1);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 1);
+        }
+        let recovered_shred = result.recovered_code.remove(0);
+        if let Shred::Coding(code) = recovered_shred {
+            assert_eq!(code.header.num_data_shreds, 5);
+            assert_eq!(code.header.num_coding_shreds, 5);
+            assert_eq!(code.header.position, 3);
+            assert_eq!(code.header.common_header.slot, slot);
+            assert_eq!(code.header.common_header.index, 3);
+        }
+        assert_eq!(data[..], result.payload[..data.len()]);
+
+        // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
+        let shreds: Vec<Shred> = shredder
+            .shreds
+            .iter()
+            .enumerate()
+            .filter_map(|(i, s)| {
+                if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) {
+                    Some(bincode::deserialize(s).unwrap())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        assert_eq!(shreds.len(), 4);
+        assert_matches!(
+            Shredder::deshred(&shreds),
+            Err(reed_solomon_erasure::Error::TooFewShardsPresent)
+        );
+    }
 }
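
Reviewer note (not part of the patch): the snippet below is a minimal sketch of how the pieces added in this diff fit together end to end, condensing the Test4 flow from test_recovery_and_reassembly above. It assumes it would live inside this module's `#[cfg(test)] mod tests` (so the private `shreds`/`active_shred` fields and the `assert_matches!` macro are in scope); the test name `demo_slot_recovery` is invented for illustration, and the looped `write()` is an assumed convenience over the repeated writes used in the tests.

```rust
#[test]
fn demo_slot_recovery() {
    // A signer for the shreds and roughly five data shreds worth of payload, as in the tests above.
    let keypair = Arc::new(Keypair::new());
    let slot = 0x123456789abcdef0;
    let data: Vec<u8> = (0..5000).map(|x| x as u8).collect();

    // fec_rate 1.0 asks for one coding shred per data shred; Shredder::new now returns a
    // Result and rejects rates outside 0.0..=1.0.
    let mut shredder =
        Shredder::new(slot, Some(5), 1.0, &keypair, 0).expect("valid fec_rate");

    // write() may consume only part of the buffer per call, so feed it until done.
    let mut offset = shredder.write(&data).unwrap();
    while offset < data.len() {
        offset += shredder.write(&data[offset..]).unwrap();
    }

    // finalize_slot() appends the last-in-slot data shred (last_in_slot = 1) and the
    // coding shreds for the FEC set.
    shredder.finalize_slot();

    // Drop every even-indexed shred to simulate loss, mirroring Test4 above.
    let shreds: Vec<Shred> = shredder
        .shreds
        .iter()
        .enumerate()
        .filter_map(|(i, s)| {
            if i % 2 != 0 {
                Some(bincode::deserialize(s).unwrap())
            } else {
                None
            }
        })
        .collect();

    // deshred() rebuilds the missing data and coding shreds via erasure decoding and
    // reassembles the original payload.
    let result = Shredder::deshred(&shreds).expect("recoverable from coding shreds");
    assert_eq!(data[..], result.payload[..data.len()]);
    assert_eq!(result.recovered_data.len(), 3);
    assert_eq!(result.recovered_code.len(), 2);
}
```

This adds no behaviour beyond what Test4 already asserts; it is only a compressed walkthrough of the shred-write, finalize, and recovery path introduced by this change.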