minor edits in shred and shredder (#24841)
Removed Default implementation for ShredType. ShredType should always be explicitly specified, and not rely on default values. Simplified single-arg Shred Error variants to use shorter syntax. Renamed erasure blocks to shards, to be consistent with reed_solomon crate and not to confuse with FEC blocks.
This commit is contained in:
parent
c0981a9f8c
commit
08e1727926
|
@ -119,20 +119,20 @@ pub enum Error {
|
||||||
BincodeError(#[from] bincode::Error),
|
BincodeError(#[from] bincode::Error),
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
ErasureError(#[from] reed_solomon_erasure::Error),
|
ErasureError(#[from] reed_solomon_erasure::Error),
|
||||||
#[error("Invalid data shred index: {index}")]
|
#[error("Invalid data shred index: {0}")]
|
||||||
InvalidDataShredIndex { index: u32 },
|
InvalidDataShredIndex(/*shred index:*/ u32),
|
||||||
#[error("Invalid data size: {size}, payload: {payload}")]
|
#[error("Invalid data size: {size}, payload: {payload}")]
|
||||||
InvalidDataSize { size: u16, payload: usize },
|
InvalidDataSize { size: u16, payload: usize },
|
||||||
#[error("Invalid erasure block index: {0:?}")]
|
#[error("Invalid erasure shard index: {0:?}")]
|
||||||
InvalidErasureBlockIndex(Box<dyn Debug>),
|
InvalidErasureShardIndex(/*headers:*/ Box<dyn Debug>),
|
||||||
#[error("Invalid num coding shreds: {0}")]
|
#[error("Invalid num coding shreds: {0}")]
|
||||||
InvalidNumCodingShreds(u16),
|
InvalidNumCodingShreds(u16),
|
||||||
#[error("Invalid parent_offset: {parent_offset}, slot: {slot}")]
|
#[error("Invalid parent_offset: {parent_offset}, slot: {slot}")]
|
||||||
InvalidParentOffset { slot: Slot, parent_offset: u16 },
|
InvalidParentOffset { slot: Slot, parent_offset: u16 },
|
||||||
#[error("Invalid parent slot: {parent_slot}, slot: {slot}")]
|
#[error("Invalid parent slot: {parent_slot}, slot: {slot}")]
|
||||||
InvalidParentSlot { slot: Slot, parent_slot: Slot },
|
InvalidParentSlot { slot: Slot, parent_slot: Slot },
|
||||||
#[error("Invalid payload size: {size}")]
|
#[error("Invalid payload size: {0}")]
|
||||||
InvalidPayloadSize { size: usize },
|
InvalidPayloadSize(/*payload size:*/ usize),
|
||||||
#[error("Invalid shred type")]
|
#[error("Invalid shred type")]
|
||||||
InvalidShredType,
|
InvalidShredType,
|
||||||
}
|
}
|
||||||
|
@ -158,14 +158,8 @@ pub enum ShredType {
|
||||||
Code = 0b0101_1010,
|
Code = 0b0101_1010,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ShredType {
|
|
||||||
fn default() -> Self {
|
|
||||||
ShredType::Data
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A common header that is present in data and code shred headers
|
/// A common header that is present in data and code shred headers
|
||||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
|
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
|
||||||
struct ShredCommonHeader {
|
struct ShredCommonHeader {
|
||||||
signature: Signature,
|
signature: Signature,
|
||||||
shred_type: ShredType,
|
shred_type: ShredType,
|
||||||
|
@ -277,11 +271,12 @@ impl Shred {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
|
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
|
||||||
let common_header = ShredCommonHeader {
|
let common_header = ShredCommonHeader {
|
||||||
|
signature: Signature::default(),
|
||||||
|
shred_type: ShredType::Data,
|
||||||
slot,
|
slot,
|
||||||
index,
|
index,
|
||||||
version,
|
version,
|
||||||
fec_set_index,
|
fec_set_index,
|
||||||
..ShredCommonHeader::default()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16;
|
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16;
|
||||||
|
@ -366,12 +361,12 @@ impl Shred {
|
||||||
version: u16,
|
version: u16,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let common_header = ShredCommonHeader {
|
let common_header = ShredCommonHeader {
|
||||||
|
signature: Signature::default(),
|
||||||
shred_type: ShredType::Code,
|
shred_type: ShredType::Code,
|
||||||
index,
|
index,
|
||||||
slot,
|
slot,
|
||||||
version,
|
version,
|
||||||
fec_set_index,
|
fec_set_index,
|
||||||
..ShredCommonHeader::default()
|
|
||||||
};
|
};
|
||||||
let coding_header = CodingShredHeader {
|
let coding_header = CodingShredHeader {
|
||||||
num_data_shreds,
|
num_data_shreds,
|
||||||
|
@ -477,7 +472,7 @@ impl Shred {
|
||||||
// Possibly zero pads bytes stored in blockstore.
|
// Possibly zero pads bytes stored in blockstore.
|
||||||
pub(crate) fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
|
pub(crate) fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
|
||||||
let shred_type = match shred.get(OFFSET_OF_SHRED_TYPE) {
|
let shred_type = match shred.get(OFFSET_OF_SHRED_TYPE) {
|
||||||
None => return Err(Error::InvalidPayloadSize { size: shred.len() }),
|
None => return Err(Error::InvalidPayloadSize(shred.len())),
|
||||||
Some(shred_type) => match ShredType::try_from(*shred_type) {
|
Some(shred_type) => match ShredType::try_from(*shred_type) {
|
||||||
Err(_) => return Err(Error::InvalidShredType),
|
Err(_) => return Err(Error::InvalidShredType),
|
||||||
Ok(shred_type) => shred_type,
|
Ok(shred_type) => shred_type,
|
||||||
|
@ -487,7 +482,7 @@ impl Shred {
|
||||||
ShredType::Code => Ok(shred),
|
ShredType::Code => Ok(shred),
|
||||||
ShredType::Data => {
|
ShredType::Data => {
|
||||||
if shred.len() > SHRED_PAYLOAD_SIZE {
|
if shred.len() > SHRED_PAYLOAD_SIZE {
|
||||||
return Err(Error::InvalidPayloadSize { size: shred.len() });
|
return Err(Error::InvalidPayloadSize(shred.len()));
|
||||||
}
|
}
|
||||||
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
|
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
|
||||||
Ok(shred)
|
Ok(shred)
|
||||||
|
@ -516,23 +511,19 @@ impl Shred {
|
||||||
// Returns true if the shred passes sanity checks.
|
// Returns true if the shred passes sanity checks.
|
||||||
pub fn sanitize(&self) -> Result<(), Error> {
|
pub fn sanitize(&self) -> Result<(), Error> {
|
||||||
if self.payload().len() != SHRED_PAYLOAD_SIZE {
|
if self.payload().len() != SHRED_PAYLOAD_SIZE {
|
||||||
return Err(Error::InvalidPayloadSize {
|
return Err(Error::InvalidPayloadSize(self.payload.len()));
|
||||||
size: self.payload.len(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
if self.erasure_block_index().is_none() {
|
if self.erasure_shard_index().is_none() {
|
||||||
let headers: Box<dyn Debug> = match self.shred_type() {
|
let headers: Box<dyn Debug> = match self.shred_type() {
|
||||||
ShredType::Data => Box::new((self.common_header, self.data_header)),
|
ShredType::Data => Box::new((self.common_header, self.data_header)),
|
||||||
ShredType::Code => Box::new((self.common_header, self.coding_header)),
|
ShredType::Code => Box::new((self.common_header, self.coding_header)),
|
||||||
};
|
};
|
||||||
return Err(Error::InvalidErasureBlockIndex(headers));
|
return Err(Error::InvalidErasureShardIndex(headers));
|
||||||
}
|
}
|
||||||
match self.shred_type() {
|
match self.shred_type() {
|
||||||
ShredType::Data => {
|
ShredType::Data => {
|
||||||
if self.index() as usize >= MAX_DATA_SHREDS_PER_SLOT {
|
if self.index() as usize >= MAX_DATA_SHREDS_PER_SLOT {
|
||||||
return Err(Error::InvalidDataShredIndex {
|
return Err(Error::InvalidDataShredIndex(self.index()));
|
||||||
index: self.index(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
let _parent = self.parent()?;
|
let _parent = self.parent()?;
|
||||||
let size = usize::from(self.data_header.size);
|
let size = usize::from(self.data_header.size);
|
||||||
|
@ -564,8 +555,8 @@ impl Shred {
|
||||||
ErasureSetId(self.slot(), self.fec_set_index())
|
ErasureSetId(self.slot(), self.fec_set_index())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the block index within the erasure coding set.
|
// Returns the shard index within the erasure coding set.
|
||||||
pub(crate) fn erasure_block_index(&self) -> Option<usize> {
|
pub(crate) fn erasure_shard_index(&self) -> Option<usize> {
|
||||||
match self.shred_type() {
|
match self.shred_type() {
|
||||||
ShredType::Data => {
|
ShredType::Data => {
|
||||||
let index = self.index().checked_sub(self.fec_set_index())?;
|
let index = self.index().checked_sub(self.fec_set_index())?;
|
||||||
|
@ -591,34 +582,30 @@ impl Shred {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the portion of the shred's payload which is erasure coded.
|
// Returns the portion of the shred's payload which is erasure coded.
|
||||||
pub(crate) fn erasure_block(self) -> Result<Vec<u8>, Error> {
|
pub(crate) fn erasure_shard(self) -> Result<Vec<u8>, Error> {
|
||||||
if self.payload.len() != SHRED_PAYLOAD_SIZE {
|
if self.payload.len() != SHRED_PAYLOAD_SIZE {
|
||||||
return Err(Error::InvalidPayloadSize {
|
return Err(Error::InvalidPayloadSize(self.payload.len()));
|
||||||
size: self.payload.len(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
let shred_type = self.shred_type();
|
let shred_type = self.shred_type();
|
||||||
let mut block = self.payload;
|
let mut shard = self.payload;
|
||||||
match shred_type {
|
match shred_type {
|
||||||
ShredType::Data => {
|
ShredType::Data => {
|
||||||
block.resize(ENCODED_PAYLOAD_SIZE, 0u8);
|
shard.resize(ENCODED_PAYLOAD_SIZE, 0u8);
|
||||||
}
|
}
|
||||||
ShredType::Code => {
|
ShredType::Code => {
|
||||||
// SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the
|
// SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the
|
||||||
// coding shreds contains the header and is not part of erasure
|
// coding shreds contains the header and is not part of erasure
|
||||||
// coding.
|
// coding.
|
||||||
block.drain(..SIZE_OF_CODING_SHRED_HEADERS);
|
shard.drain(..SIZE_OF_CODING_SHRED_HEADERS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(block)
|
Ok(shard)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Like Shred::erasure_block but returning a slice
|
// Like Shred::erasure_shard but returning a slice.
|
||||||
pub(crate) fn erasure_block_as_slice(&self) -> Result<&[u8], Error> {
|
pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
|
||||||
if self.payload.len() != SHRED_PAYLOAD_SIZE {
|
if self.payload.len() != SHRED_PAYLOAD_SIZE {
|
||||||
return Err(Error::InvalidPayloadSize {
|
return Err(Error::InvalidPayloadSize(self.payload.len()));
|
||||||
size: self.payload.len(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
Ok(match self.shred_type() {
|
Ok(match self.shred_type() {
|
||||||
ShredType::Data => &self.payload[..ENCODED_PAYLOAD_SIZE],
|
ShredType::Data => &self.payload[..ENCODED_PAYLOAD_SIZE],
|
||||||
|
@ -888,9 +875,17 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_shred_constants() {
|
fn test_shred_constants() {
|
||||||
|
let common_header = ShredCommonHeader {
|
||||||
|
signature: Signature::default(),
|
||||||
|
shred_type: ShredType::Code,
|
||||||
|
slot: Slot::MAX,
|
||||||
|
index: u32::MAX,
|
||||||
|
version: u16::MAX,
|
||||||
|
fec_set_index: u32::MAX,
|
||||||
|
};
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
SIZE_OF_COMMON_SHRED_HEADER,
|
SIZE_OF_COMMON_SHRED_HEADER,
|
||||||
serialized_size(&ShredCommonHeader::default()).unwrap() as usize
|
serialized_size(&common_header).unwrap() as usize
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
SIZE_OF_CODING_SHRED_HEADER,
|
SIZE_OF_CODING_SHRED_HEADER,
|
||||||
|
@ -914,7 +909,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
SIZE_OF_SHRED_TYPE,
|
SIZE_OF_SHRED_TYPE,
|
||||||
bincode::serialized_size(&ShredType::default()).unwrap() as usize
|
bincode::serialized_size(&ShredType::Code).unwrap() as usize
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
SIZE_OF_SHRED_SLOT,
|
SIZE_OF_SHRED_SLOT,
|
||||||
|
@ -922,7 +917,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
SIZE_OF_SHRED_INDEX,
|
SIZE_OF_SHRED_INDEX,
|
||||||
bincode::serialized_size(&ShredCommonHeader::default().index).unwrap() as usize
|
bincode::serialized_size(&common_header.index).unwrap() as usize
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1089,10 +1084,7 @@ mod tests {
|
||||||
{
|
{
|
||||||
let mut shred = shred.clone();
|
let mut shred = shred.clone();
|
||||||
shred.payload.push(10u8);
|
shred.payload.push(10u8);
|
||||||
assert_matches!(
|
assert_matches!(shred.sanitize(), Err(Error::InvalidPayloadSize(1229)));
|
||||||
shred.sanitize(),
|
|
||||||
Err(Error::InvalidPayloadSize { size: 1229 })
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let mut shred = shred.clone();
|
let mut shred = shred.clone();
|
||||||
|
@ -1119,10 +1111,7 @@ mod tests {
|
||||||
{
|
{
|
||||||
let mut shred = shred.clone();
|
let mut shred = shred.clone();
|
||||||
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
|
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
|
||||||
assert_matches!(
|
assert_matches!(shred.sanitize(), Err(Error::InvalidDataShredIndex(32768)));
|
||||||
shred.sanitize(),
|
|
||||||
Err(Error::InvalidDataShredIndex { index: 32768 })
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
shred.data_header.size = shred.payload().len() as u16 + 1;
|
shred.data_header.size = shred.payload().len() as u16 + 1;
|
||||||
|
@ -1156,7 +1145,7 @@ mod tests {
|
||||||
shred.set_index(index as u32);
|
shred.set_index(index as u32);
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
shred.sanitize(),
|
shred.sanitize(),
|
||||||
Err(Error::InvalidErasureBlockIndex { .. })
|
Err(Error::InvalidErasureShardIndex { .. })
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
@ -1164,7 +1153,7 @@ mod tests {
|
||||||
shred.coding_header.num_coding_shreds = 0;
|
shred.coding_header.num_coding_shreds = 0;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
shred.sanitize(),
|
shred.sanitize(),
|
||||||
Err(Error::InvalidErasureBlockIndex { .. })
|
Err(Error::InvalidErasureShardIndex { .. })
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// pos >= num_coding is invalid.
|
// pos >= num_coding is invalid.
|
||||||
|
@ -1174,7 +1163,7 @@ mod tests {
|
||||||
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
|
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
shred.sanitize(),
|
shred.sanitize(),
|
||||||
Err(Error::InvalidErasureBlockIndex { .. })
|
Err(Error::InvalidErasureShardIndex { .. })
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// set_index with num_coding that would imply the last
|
// set_index with num_coding that would imply the last
|
||||||
|
@ -1188,13 +1177,13 @@ mod tests {
|
||||||
shred.common_header.index = std::u32::MAX - 1;
|
shred.common_header.index = std::u32::MAX - 1;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
shred.sanitize(),
|
shred.sanitize(),
|
||||||
Err(Error::InvalidErasureBlockIndex { .. })
|
Err(Error::InvalidErasureShardIndex { .. })
|
||||||
);
|
);
|
||||||
|
|
||||||
shred.coding_header.num_coding_shreds = 2000;
|
shred.coding_header.num_coding_shreds = 2000;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
shred.sanitize(),
|
shred.sanitize(),
|
||||||
Err(Error::InvalidErasureBlockIndex { .. })
|
Err(Error::InvalidErasureShardIndex { .. })
|
||||||
);
|
);
|
||||||
|
|
||||||
// Decreasing the number of num_coding_shreds will put it within
|
// Decreasing the number of num_coding_shreds will put it within
|
||||||
|
|
|
@ -232,7 +232,7 @@ impl Shredder {
|
||||||
} else {
|
} else {
|
||||||
num_data
|
num_data
|
||||||
};
|
};
|
||||||
let data = data.iter().map(Shred::erasure_block_as_slice);
|
let data = data.iter().map(Shred::erasure_shard_as_slice);
|
||||||
let data: Vec<_> = data.collect::<Result<_, _>>().unwrap();
|
let data: Vec<_> = data.collect::<Result<_, _>>().unwrap();
|
||||||
let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
|
let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
|
||||||
Session::new(num_data, num_coding)
|
Session::new(num_data, num_coding)
|
||||||
|
@ -289,27 +289,27 @@ impl Shredder {
|
||||||
}
|
}
|
||||||
// Mask to exclude data shreds already received from the return value.
|
// Mask to exclude data shreds already received from the return value.
|
||||||
let mut mask = vec![false; num_data_shreds];
|
let mut mask = vec![false; num_data_shreds];
|
||||||
let mut blocks = vec![None; fec_set_size];
|
let mut shards = vec![None; fec_set_size];
|
||||||
for shred in shreds {
|
for shred in shreds {
|
||||||
let index = match shred.erasure_block_index() {
|
let index = match shred.erasure_shard_index() {
|
||||||
Some(index) if index < fec_set_size => index,
|
Some(index) if index < fec_set_size => index,
|
||||||
_ => return Err(Error::from(InvalidIndex)),
|
_ => return Err(Error::from(InvalidIndex)),
|
||||||
};
|
};
|
||||||
blocks[index] = Some(shred.erasure_block()?);
|
shards[index] = Some(shred.erasure_shard()?);
|
||||||
if index < num_data_shreds {
|
if index < num_data_shreds {
|
||||||
mask[index] = true;
|
mask[index] = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Session::new(num_data_shreds, num_coding_shreds)?.decode_blocks(&mut blocks)?;
|
Session::new(num_data_shreds, num_coding_shreds)?.decode_blocks(&mut shards)?;
|
||||||
let recovered_data = mask
|
let recovered_data = mask
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(blocks)
|
.zip(shards)
|
||||||
.filter(|(mask, _)| !mask)
|
.filter(|(mask, _)| !mask)
|
||||||
.filter_map(|(_, block)| Shred::new_from_serialized_shred(block?).ok())
|
.filter_map(|(_, shard)| Shred::new_from_serialized_shred(shard?).ok())
|
||||||
.filter(|shred| {
|
.filter(|shred| {
|
||||||
shred.slot() == slot
|
shred.slot() == slot
|
||||||
&& shred.is_data()
|
&& shred.is_data()
|
||||||
&& match shred.erasure_block_index() {
|
&& match shred.erasure_shard_index() {
|
||||||
Some(index) => index < num_data_shreds,
|
Some(index) => index < num_data_shreds,
|
||||||
None => false,
|
None => false,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue