Implement shred erasure recovery and reassembly (#5444)

* Implement shred erasure recovery and reassembly

* fixes and unit test

* clippy

* review comments, additional tests, and some fixes

* address review comments

* more tests and cleanup
This commit is contained in:
Pankaj Garg 2019-08-07 17:02:49 -07:00 committed by GitHub
parent e30ca01999
commit 6597c71e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 706 additions and 38 deletions

View File

@ -1,11 +1,13 @@
//! The `shred` module defines data structures and methods to pull MTU sized data frames from the network. //! The `shred` module defines data structures and methods to pull MTU sized data frames from the network.
use crate::erasure::Session; use crate::erasure::Session;
use crate::result;
use crate::result::Error;
use bincode::serialized_size; use bincode::serialized_size;
use core::borrow::BorrowMut; use core::borrow::BorrowMut;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use std::io::Write; use std::io::{Error as IOError, ErrorKind, Write};
use std::sync::Arc; use std::sync::Arc;
use std::{cmp, io}; use std::{cmp, io};
@ -32,7 +34,7 @@ pub struct ShredCommonHeader {
pub struct DataShredHeader { pub struct DataShredHeader {
_reserved: CodingShredHeader, _reserved: CodingShredHeader,
pub common_header: ShredCommonHeader, pub common_header: ShredCommonHeader,
pub data_type: u8, pub last_in_slot: u8,
} }
/// The first data shred also has parent slot value in it /// The first data shred also has parent slot value in it
@ -46,9 +48,9 @@ pub struct FirstDataShredHeader {
#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] #[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
pub struct CodingShredHeader { pub struct CodingShredHeader {
pub common_header: ShredCommonHeader, pub common_header: ShredCommonHeader,
pub num_data_shreds: u8, pub num_data_shreds: u16,
pub num_coding_shreds: u8, pub num_coding_shreds: u16,
pub position: u8, pub position: u16,
pub payload: Vec<u8>, pub payload: Vec<u8>,
} }
@ -93,6 +95,23 @@ impl Default for DataShred {
} }
} }
/// Default shred is sized correctly to meet MTU/Packet size requirements
impl Default for CodingShred {
fn default() -> Self {
let size = PACKET_DATA_SIZE
- serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize;
CodingShred {
header: CodingShredHeader {
common_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
payload: vec![0; size],
},
}
}
}
/// Common trait implemented by all types of shreds /// Common trait implemented by all types of shreds
pub trait ShredCommon { pub trait ShredCommon {
/// Write at a particular offset in the shred /// Write at a particular offset in the shred
@ -167,7 +186,7 @@ impl ShredCommon for CodingShred {
} }
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct Shredder { pub struct Shredder {
slot: u64, slot: u64,
index: u32, index: u32,
@ -215,7 +234,7 @@ impl Write for Shredder {
// Continue generating more data shreds. // Continue generating more data shreds.
// If the caller decides to finalize the FEC block or Slot, the data shred will // If the caller decides to finalize the FEC block or Slot, the data shred will
// morph into appropriate shred accordingly // morph into appropriate shred accordingly
Shred::Data(DataShred::default()) Shred::Data(self.new_data_shred())
} else { } else {
self.active_offset += slice_len; self.active_offset += slice_len;
current_shred current_shred
@ -227,15 +246,17 @@ impl Write for Shredder {
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
if self.active_shred.is_none() { unimplemented!()
return Ok(());
}
let current_shred = self.active_shred.take().unwrap();
self.finalize_data_shred(current_shred);
Ok(())
} }
} }
#[derive(Default, Debug, PartialEq)]
pub struct DeshredResult {
pub payload: Vec<u8>,
pub recovered_data: Vec<Shred>,
pub recovered_code: Vec<Shred>,
}
impl Shredder { impl Shredder {
pub fn new( pub fn new(
slot: u64, slot: u64,
@ -243,14 +264,24 @@ impl Shredder {
fec_rate: f32, fec_rate: f32,
signer: &Arc<Keypair>, signer: &Arc<Keypair>,
index: u32, index: u32,
) -> Self { ) -> result::Result<Self> {
Shredder { if fec_rate > 1.0 || fec_rate < 0.0 {
slot, Err(Error::IO(IOError::new(
index, ErrorKind::Other,
parent, format!(
fec_rate, "FEC rate {:?} must be more than 0.0 and less than 1.0",
signer: signer.clone(), fec_rate
..Shredder::default() ),
)))
} else {
Ok(Shredder {
slot,
index,
parent,
fec_rate,
signer: signer.clone(),
..Shredder::default()
})
} }
} }
@ -291,6 +322,22 @@ impl Shredder {
first_shred first_shred
} }
fn new_coding_shred(
slot: u64,
index: u32,
num_data: usize,
num_code: usize,
position: usize,
) -> CodingShred {
let mut coding_shred = CodingShred::default();
coding_shred.header.common_header.slot = slot;
coding_shred.header.common_header.index = index;
coding_shred.header.num_data_shreds = num_data as u16;
coding_shred.header.num_coding_shreds = num_code as u16;
coding_shred.header.position = position as u16;
coding_shred
}
/// Generates coding shreds for the data shreds in the current FEC set /// Generates coding shreds for the data shreds in the current FEC set
fn generate_coding_shreds(&mut self) { fn generate_coding_shreds(&mut self) {
if self.fec_rate != 0.0 { if self.fec_rate != 0.0 {
@ -311,20 +358,14 @@ impl Shredder {
// Create empty coding shreds, with correctly populated headers // Create empty coding shreds, with correctly populated headers
let mut coding_shreds = Vec::with_capacity(num_coding); let mut coding_shreds = Vec::with_capacity(num_coding);
(0..num_coding).for_each(|i| { (0..num_coding).for_each(|i| {
let header = CodingShredHeader { let shred = bincode::serialize(&Shred::Coding(Self::new_coding_shred(
common_header: ShredCommonHeader { self.slot,
signature: Signature::default(), start_index + i as u32,
slot: self.slot, num_data,
index: start_index + i as u32, num_coding,
}, i,
num_data_shreds: num_data as u8, )))
num_coding_shreds: num_coding as u8, .unwrap();
position: i as u8,
payload: vec![],
};
let mut shred = bincode::serialize(&Shred::Coding(CodingShred { header })).unwrap();
shred.resize_with(PACKET_DATA_SIZE, || 0);
coding_shreds.push(shred); coding_shreds.push(shred);
}); });
@ -381,10 +422,222 @@ impl Shredder {
/// Finalize the current slot (i.e. add last slot shred) and generate coding shreds /// Finalize the current slot (i.e. add last slot shred) and generate coding shreds
pub fn finalize_slot(&mut self) { pub fn finalize_slot(&mut self) {
let final_shred = self.make_final_data_shred(); let mut final_shred = self.make_final_data_shred();
final_shred.header.last_in_slot = 1;
self.finalize_data_shred(Shred::LastInSlot(final_shred)); self.finalize_data_shred(Shred::LastInSlot(final_shred));
self.generate_coding_shreds(); self.generate_coding_shreds();
} }
fn fill_in_missing_shreds(
shred: &Shred,
num_data: usize,
num_coding: usize,
slot: u64,
first_index: usize,
expected_index: usize,
present: &mut [bool],
) -> (Vec<Vec<u8>>, bool, usize) {
let (index, mut first_shred_in_slot) = Self::get_shred_index(shred, num_data);
let mut missing_blocks: Vec<Vec<u8>> = (expected_index..index)
.map(|missing| {
present[missing] = false;
// If index 0 shred is missing, then first shred in slot will also be recovered
first_shred_in_slot |= missing == 0;
Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
})
.collect();
let shred_buf = bincode::serialize(shred).unwrap();
missing_blocks.push(shred_buf);
(missing_blocks, first_shred_in_slot, index)
}
fn new_empty_missing_shred(
num_data: usize,
num_coding: usize,
slot: u64,
first_index: usize,
missing: usize,
) -> Vec<u8> {
let missing_shred = if missing == 0 {
let mut data_shred = FirstDataShred::default();
data_shred.header.data_header.common_header.slot = slot;
data_shred.header.data_header.common_header.index = 0;
Shred::FirstInSlot(data_shred)
} else if missing < first_index + num_data {
let mut data_shred = DataShred::default();
data_shred.header.common_header.slot = slot;
data_shred.header.common_header.index = missing as u32;
if missing == first_index + num_data - 1 {
Shred::LastInFECSet(data_shred)
} else {
Shred::Data(data_shred)
}
} else {
Shred::Coding(Self::new_coding_shred(
slot,
missing.saturating_sub(num_data) as u32,
num_data,
num_coding,
missing - first_index - num_data,
))
};
bincode::serialize(&missing_shred).unwrap()
}
/// Combines all shreds to recreate the original buffer
/// If the shreds include coding shreds, and if not all shreds are present, it tries
/// to reconstruct missing shreds using erasure
/// Note: The shreds are expected to be sorted
/// (lower to higher index, and data shreds before coding shreds)
pub fn deshred(shreds: &[Shred]) -> Result<DeshredResult, reed_solomon_erasure::Error> {
// If coding is enabled, the last shred must be a coding shred.
let (num_data, num_coding, first_index, slot) =
if let Shred::Coding(code) = shreds.last().unwrap() {
(
code.header.num_data_shreds as usize,
code.header.num_coding_shreds as usize,
code.header.common_header.index as usize - code.header.position as usize,
code.header.common_header.slot,
)
} else {
(shreds.len(), 0, 0, 0)
};
let mut recovered_data = vec![];
let mut recovered_code = vec![];
let fec_set_size = num_data + num_coding;
let (data_shred_bufs, first_shred) = if num_coding > 0 && shreds.len() < fec_set_size {
let coding_block_offset = CodingShred::overhead();
// Let's try recovering missing shreds using erasure
let mut present = &mut vec![true; fec_set_size];
let mut first_shred_in_slot = false;
let mut next_expected_index = first_index;
let mut shred_bufs: Vec<Vec<u8>> = shreds
.iter()
.flat_map(|shred| {
let (blocks, first_shred, last_index) = Self::fill_in_missing_shreds(
shred,
num_data,
num_coding,
slot,
first_index,
next_expected_index,
&mut present,
);
first_shred_in_slot |= first_shred;
next_expected_index = last_index + 1;
blocks
})
.collect();
let mut pending_shreds: Vec<Vec<u8>> = (next_expected_index
..first_index + fec_set_size)
.map(|missing| {
present[missing] = false;
Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
})
.collect();
shred_bufs.append(&mut pending_shreds);
let session = Session::new(num_data, num_coding).unwrap();
let mut blocks: Vec<&mut [u8]> = shred_bufs
.iter_mut()
.map(|x| x[coding_block_offset..].as_mut())
.collect();
session.decode_blocks(&mut blocks, &present)?;
present.iter().enumerate().for_each(|(index, was_present)| {
if !was_present {
let shred: Shred = bincode::deserialize(&shred_bufs[index]).unwrap();
if index < first_index + num_data {
// Check if the last recovered data shred is also last in Slot.
// If so, it needs to be morphed into the correct type
let shred = if let Shred::Data(s) = shred {
if s.header.last_in_slot == 1 {
Shred::LastInSlot(s)
} else {
Shred::Data(s)
}
} else if let Shred::LastInFECSet(s) = shred {
if s.header.last_in_slot == 1 {
Shred::LastInSlot(s)
} else {
Shred::LastInFECSet(s)
}
} else {
shred
};
recovered_data.push(shred)
} else {
recovered_code.push(shred)
}
}
});
(shred_bufs, first_shred_in_slot)
} else {
let (first_index, first_shred_in_slot) =
Shredder::get_shred_index(shreds.first().unwrap(), num_data);
let last_index = match shreds.last().unwrap() {
Shred::LastInFECSet(s) | Shred::LastInSlot(s) => {
s.header.common_header.index as usize
}
_ => 0,
};
if num_data.saturating_add(first_index) != last_index.saturating_add(1) {
Err(reed_solomon_erasure::Error::TooFewDataShards)?;
}
let shred_bufs: Vec<Vec<u8>> = shreds
.iter()
.map(|shred| bincode::serialize(shred).unwrap())
.collect();
(shred_bufs, first_shred_in_slot)
};
Ok(DeshredResult {
payload: Self::reassemble_payload(num_data, data_shred_bufs, first_shred),
recovered_data,
recovered_code,
})
}
fn get_shred_index(shred: &Shred, num_data: usize) -> (usize, bool) {
let (first_index, first_shred_in_slot) = match shred {
Shred::FirstInSlot(s) => (s.header.data_header.common_header.index as usize, true),
Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::LastInSlot(s) => (s.header.common_header.index as usize, false),
Shred::Coding(s) => (s.header.common_header.index as usize + num_data, false),
};
(first_index, first_shred_in_slot)
}
fn reassemble_payload(
num_data: usize,
data_shred_bufs: Vec<Vec<u8>>,
first_shred: bool,
) -> Vec<u8> {
data_shred_bufs[..num_data]
.iter()
.enumerate()
.flat_map(|(i, data)| {
let offset = if i == 0 && first_shred {
bincode::serialized_size(&Shred::FirstInSlot(FirstDataShred::empty_shred()))
.unwrap()
} else {
bincode::serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap()
};
data[offset as usize..].iter()
})
.cloned()
.collect()
}
} }
#[cfg(test)] #[cfg(test)]
@ -394,12 +647,17 @@ mod tests {
#[test] #[test]
fn test_data_shredder() { fn test_data_shredder() {
let keypair = Arc::new(Keypair::new()); let keypair = Arc::new(Keypair::new());
let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0); let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty()); assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0); assert_eq!(shredder.active_offset, 0);
assert!(FirstDataShred::overhead() < PACKET_DATA_SIZE);
assert!(DataShred::overhead() < PACKET_DATA_SIZE);
assert!(CodingShred::overhead() < PACKET_DATA_SIZE);
// Test0: Write some data to shred. Not enough to create a signed shred // Test0: Write some data to shred. Not enough to create a signed shred
let data: Vec<u8> = (0..25).collect(); let data: Vec<u8> = (0..25).collect();
assert_eq!(shredder.write(&data).unwrap(), data.len()); assert_eq!(shredder.write(&data).unwrap(), data.len());
@ -444,6 +702,7 @@ mod tests {
.common_header .common_header
.signature .signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.data_header.common_header.index, 0);
} }
// Test5: Write left over data, and assert that a data shred is being created // Test5: Write left over data, and assert that a data shred is being created
@ -471,6 +730,7 @@ mod tests {
.common_header .common_header
.signature .signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.index, 1);
} }
// Test7: Let's write some more data to the shredder. // Test7: Let's write some more data to the shredder.
@ -495,6 +755,7 @@ mod tests {
.common_header .common_header
.signature .signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.index, 2);
} }
// Test8: Write more data to generate an intermediate data shred // Test8: Write more data to generate an intermediate data shred
@ -516,6 +777,7 @@ mod tests {
.common_header .common_header
.signature .signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.index, 3);
} }
// Test9: Write some data to shredder // Test9: Write some data to shredder
@ -540,13 +802,105 @@ mod tests {
.common_header .common_header
.signature .signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.index, 4);
}
}
#[test]
fn test_small_data_shredder() {
let keypair = Arc::new(Keypair::new());
let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 0.0, &keypair, 0)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..25).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let _ = shredder.write(&data).unwrap();
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
shredder.finalize_fec_block();
// We should have 2 shreds now (FirstInSlot, and LastInFECSet)
assert_eq!(shredder.shreds.len(), 2);
let data_offset = CodingShred::overhead()
+ bincode::serialized_size(&Signature::default()).unwrap() as usize;
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::FirstInSlot(_));
if let Shred::FirstInSlot(data) = deserialized_shred {
assert!(data
.header
.data_header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
}
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
if let Shred::LastInFECSet(data) = deserialized_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
}
// Try shredder when no parent is provided
let mut shredder = Shredder::new(0x123456789abcdef0, None, 0.0, &keypair, 2)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..25).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let _ = shredder.write(&data).unwrap();
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
shredder.finalize_fec_block();
// We should have 1 shred now (LastInFECSet)
assert_eq!(shredder.shreds.len(), 1);
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
if let Shred::LastInFECSet(data) = deserialized_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
} }
} }
#[test] #[test]
fn test_data_and_code_shredder() { fn test_data_and_code_shredder() {
let keypair = Arc::new(Keypair::new()); let keypair = Arc::new(Keypair::new());
let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 1.0, &keypair, 0);
// Test that FEC rate cannot be > 1.0
assert_matches!(
Shredder::new(0x123456789abcdef0, Some(5), 1.001, &keypair, 0),
Err(_)
);
let mut shredder = Shredder::new(0x123456789abcdef0, Some(5), 1.0, &keypair, 0)
.expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty()); assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_shred, None);
@ -646,4 +1000,318 @@ mod tests {
.verify(keypair.pubkey().as_ref(), &shred[coding_data_offset..])); .verify(keypair.pubkey().as_ref(), &shred[coding_data_offset..]));
} }
} }
#[test]
fn test_recovery_and_reassembly() {
let keypair = Arc::new(Keypair::new());
let slot = 0x123456789abcdef0;
let mut shredder =
Shredder::new(slot, Some(5), 1.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..5000).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let mut offset = shredder.write(&data).unwrap();
let approx_shred_payload_size = offset;
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
// We should have some shreds now
assert_eq!(
shredder.shreds.len(),
data.len() / approx_shred_payload_size
);
assert_eq!(offset, data.len());
shredder.finalize_fec_block();
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shreds.len(), expected_shred_count);
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
assert_matches!(
Shredder::deshred(&shreds[..4]),
Err(reed_solomon_erasure::Error::TooFewDataShards)
);
// Test1: Try recovery/reassembly with only data shreds. Hint: should work
let result = Shredder::deshred(&shreds[..5]).unwrap();
assert_ne!(DeshredResult::default(), result);
assert!(result.payload.len() >= data.len());
assert!(result.recovered_data.is_empty());
assert!(result.recovered_code.is_empty());
assert_eq!(data[..], result.payload[..data.len()]);
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.enumerate()
.filter_map(|(i, s)| {
if i % 2 == 0 {
Some(bincode::deserialize(s).unwrap())
} else {
None
}
})
.collect();
let data_offset = CodingShred::overhead()
+ bincode::serialized_size(&Signature::default()).unwrap() as usize;
let mut result = Shredder::deshred(&shreds).unwrap();
assert!(result.payload.len() >= data.len());
assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::Data(_));
if let Shred::Data(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 1);
}
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::Data(_));
if let Shred::Data(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 3);
}
assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 0);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 2);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 4);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 4);
}
assert_eq!(data[..], result.payload[..data.len()]);
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.enumerate()
.filter_map(|(i, s)| {
if i % 2 != 0 {
Some(bincode::deserialize(s).unwrap())
} else {
None
}
})
.collect();
let mut result = Shredder::deshred(&shreds).unwrap();
assert!(result.payload.len() >= data.len());
assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::FirstInSlot(_));
if let Shred::FirstInSlot(data) = recovered_shred {
assert!(data
.header
.data_header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.data_header.common_header.slot, slot);
assert_eq!(data.header.data_header.common_header.index, 0);
}
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::Data(_));
if let Shred::Data(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 2);
}
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::LastInFECSet(_));
if let Shred::LastInFECSet(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 4);
}
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 1);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 3);
}
assert_eq!(data[..], result.payload[..data.len()]);
// Test4: Try recovery/reassembly full slot with 3 missing data shreds + 2 coding shreds. Hint: should work
let mut shredder =
Shredder::new(slot, Some(5), 1.0, &keypair, 0).expect("Failed in creating shredder");
let mut offset = shredder.write(&data).unwrap();
let approx_shred_payload_size = offset;
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
offset += shredder.write(&data[offset..]).unwrap();
// We should have some shreds now
assert_eq!(
shredder.shreds.len(),
data.len() / approx_shred_payload_size
);
assert_eq!(offset, data.len());
shredder.finalize_slot();
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shreds.len(), expected_shred_count);
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.enumerate()
.filter_map(|(i, s)| {
if i % 2 != 0 {
Some(bincode::deserialize(s).unwrap())
} else {
None
}
})
.collect();
let mut result = Shredder::deshred(&shreds).unwrap();
assert!(result.payload.len() >= data.len());
assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::FirstInSlot(_));
if let Shred::FirstInSlot(data) = recovered_shred {
assert!(data
.header
.data_header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.data_header.common_header.slot, slot);
assert_eq!(data.header.data_header.common_header.index, 0);
}
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::Data(_));
if let Shred::Data(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 2);
}
let recovered_shred = result.recovered_data.remove(0);
let shred = bincode::serialize(&recovered_shred).unwrap();
assert_matches!(recovered_shred, Shred::LastInSlot(_));
if let Shred::LastInSlot(data) = recovered_shred {
assert!(data
.header
.common_header
.signature
.verify(keypair.pubkey().as_ref(), &shred[data_offset..]));
assert_eq!(data.header.common_header.slot, slot);
assert_eq!(data.header.common_header.index, 4);
}
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 1);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 3);
}
assert_eq!(data[..], result.payload[..data.len()]);
// Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.enumerate()
.filter_map(|(i, s)| {
if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) {
Some(bincode::deserialize(s).unwrap())
} else {
None
}
})
.collect();
assert_eq!(shreds.len(), 4);
assert_matches!(
Shredder::deshred(&shreds),
Err(reed_solomon_erasure::Error::TooFewShardsPresent)
);
}
} }