separates out data vs code shreds at the type level

Working towards revising the shred struct to embed versioning, so that a
new variant can contain Merkle tree hashes of the erasure batch. To ease
the migration, this commit adds more type safety by distinguishing data
vs code shreds at the type level.

Additionally, having both data and coding headers in each shred is
redundant, since only one is relevant for any given shred. The revised
shred types in this commit each carry only the one type-specific header.
https://github.com/solana-labs/solana/blob/c785f1ffc/ledger/src/shred.rs#L198-L203
behzad nouri 2022-05-05 19:48:00 -04:00
parent 971748b335
commit e2bbc3913d
6 changed files with 808 additions and 511 deletions
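The shape of the change, in brief: the old Shred struct always carried
both type-specific headers, while the revised type is an enum whose
variant determines which header exists. A condensed sketch of the
before/after types from the diff below (fields abbreviated):

// Before: one struct, both headers always present.
pub struct Shred {
    common_header: ShredCommonHeader,
    data_header: DataShredHeader,     // meaningless for coding shreds
    coding_header: CodingShredHeader, // meaningless for data shreds
    payload: Vec<u8>,
}

// After: one variant per shred type, each wrapping a concrete struct
// holding only its own type-specific header.
pub enum Shred {
    ShredCode(ShredCode), // common_header + coding_header + payload
    ShredData(ShredData), // common_header + data_header + payload
}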

View File

@@ -24,7 +24,6 @@ pub mod leader_schedule_utils;
pub mod next_slots_iterator;
pub mod rooted_slot_iterator;
pub mod shred;
pub mod shred_stats;
mod shredder;
pub mod sigverify_shreds;
pub mod slot_stats;

View File

@@ -49,17 +49,14 @@
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
//! payload can fit into one coding shred / packet.
pub use crate::{
shred_stats::{ProcessShredsStats, ShredFetchStats},
shredder::Shredder,
};
use {
self::traits::{Shred as _, ShredCode as _, ShredData as _},
crate::blockstore::MAX_DATA_SHREDS_PER_SLOT,
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::{deserialize_from_with_limit, limited_deserialize, Packet},
solana_perf::packet::{limited_deserialize, Packet},
solana_sdk::{
clock::Slot,
hash::{hashv, Hash},
@@ -68,9 +65,20 @@ use {
signature::{Keypair, Signature, Signer},
},
static_assertions::const_assert_eq,
std::{fmt::Debug, io::Cursor, mem::size_of, ops::RangeInclusive},
std::fmt::Debug,
thiserror::Error,
};
pub use {
self::{
legacy::{ShredCode, ShredData},
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::Shredder,
};
mod legacy;
mod stats;
mod traits;
pub type Nonce = u32;
@@ -97,22 +105,12 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
- SIZE_OF_NONCE;
const_assert_eq!(SHRED_DATA_OFFSET, 88);
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
// DataShredHeader.size is sum of common-shred-header, data-shred-header and
// data.len(). Broadcast stage may create zero length data shreds when the
// previous slot was interrupted:
// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79
const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD;
const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
const_assert_eq!(SHRED_PAYLOAD_SIZE, 1228);
const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
// are never used and are not part of erasure coding.
const_assert_eq!(ENCODED_PAYLOAD_SIZE, 1139);
const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
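// The arithmetic behind the two const asserts above, assuming the
// standard 1232-byte PACKET_DATA_SIZE (the 1280-byte IPv6 minimum MTU
// less 40 bytes of IPv6 and 8 bytes of UDP headers):
//   SHRED_PAYLOAD_SIZE   = 1232 - 4 (nonce)                    = 1228
//   ENCODED_PAYLOAD_SIZE = 1228 - 89 (common + coding headers) = 1139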
pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
@@ -186,7 +184,7 @@ struct ShredCommonHeader {
}
/// The data shred header has parent offset and flags
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
struct DataShredHeader {
parent_offset: u16,
flags: ShredFlags,
@@ -194,7 +192,7 @@ struct DataShredHeader {
}
/// The coding shred header has FEC information
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
@@ -202,11 +200,9 @@ struct CodingShredHeader {
}
#[derive(Clone, Debug, PartialEq)]
pub struct Shred {
common_header: ShredCommonHeader,
data_header: DataShredHeader,
coding_header: CodingShredHeader,
payload: Vec<u8>,
pub enum Shred {
ShredCode(ShredCode),
ShredData(ShredData),
}
/// Tuple which uniquely identifies a shred should it exist.
@@ -238,11 +234,61 @@ impl ErasureSetId {
}
}
macro_rules! dispatch {
($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::ShredCode(shred) => shred.$name($($arg, )?),
Self::ShredData(shred) => shred.$name($($arg, )?),
}
}
};
($vis:vis fn $name:ident(self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::ShredCode(shred) => shred.$name($($arg, )?),
Self::ShredData(shred) => shred.$name($($arg, )?),
}
}
};
($vis:vis fn $name:ident(&mut self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&mut self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::ShredCode(shred) => shred.$name($($arg, )?),
Self::ShredData(shred) => shred.$name($($arg, )?),
}
}
}
}
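// Each dispatch! invocation below expands to a method that matches on
// the enum and forwards to the wrapped ShredCode or ShredData; e.g. the
// payload accessor expands to:
//
//     #[inline]
//     pub fn payload(&self) -> &Vec<u8> {
//         match self {
//             Self::ShredCode(shred) => shred.payload(),
//             Self::ShredData(shred) => shred.payload(),
//         }
//     }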
impl Shred {
dispatch!(fn common_header(&self) -> &ShredCommonHeader);
dispatch!(fn set_signature(&mut self, signature: Signature));
dispatch!(fn signed_payload(&self) -> &[u8]);
// Returns the portion of the shred's payload which is erasure coded.
dispatch!(pub(crate) fn erasure_shard(self) -> Result<Vec<u8>, Error>);
// Like Shred::erasure_shard but returning a slice.
dispatch!(pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
// Returns the shard index within the erasure coding set.
dispatch!(pub(crate) fn erasure_shard_index(&self) -> Option<usize>);
dispatch!(pub fn into_payload(self) -> Vec<u8>);
dispatch!(pub fn payload(&self) -> &Vec<u8>);
dispatch!(pub fn sanitize(&self) -> Result<(), Error>);
// Only for tests.
dispatch!(pub fn set_index(&mut self, index: u32));
dispatch!(pub fn set_slot(&mut self, slot: Slot));
pub fn copy_to_packet(&self, packet: &mut Packet) {
let len = self.payload.len();
packet.data[..len].copy_from_slice(&self.payload[..]);
packet.meta.size = len;
let payload = self.payload();
let size = payload.len();
packet.data[..size].copy_from_slice(&payload[..]);
packet.meta.size = size;
}
// TODO: Should this sanitize output?
@@ -256,69 +302,23 @@ impl Shred {
version: u16,
fec_set_index: u32,
) -> Self {
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let common_header = ShredCommonHeader {
signature: Signature::default(),
shred_type: ShredType::Data,
Self::from(ShredData::new_from_data(
slot,
index,
parent_offset,
data,
flags,
reference_tick,
version,
fec_set_index,
};
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16;
let flags = flags
| unsafe {
ShredFlags::from_bits_unchecked(
ShredFlags::SHRED_TICK_REFERENCE_MASK
.bits()
.min(reference_tick),
)
};
let data_header = DataShredHeader {
parent_offset,
flags,
size,
};
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &data_header).unwrap();
// TODO: Need to check if data is too large!
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SHRED_DATA_OFFSET);
payload[offset..offset + data.len()].copy_from_slice(data);
Self {
common_header,
data_header,
coding_header: CodingShredHeader::default(),
payload,
}
))
}
pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut cursor = Cursor::new(&payload[..]);
let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
let (data_header, coding_header) = match common_header.shred_type {
ShredType::Code => {
let coding_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/10109
payload.truncate(SHRED_PAYLOAD_SIZE);
(DataShredHeader::default(), coding_header)
}
ShredType::Data => {
let data_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/16602
payload.resize(SHRED_PAYLOAD_SIZE, 0u8);
(data_header, CodingShredHeader::default())
}
};
let shred = Self {
common_header,
data_header,
coding_header,
payload,
};
shred.sanitize().map(|_| shred)
pub fn new_from_serialized_shred(shred: Vec<u8>) -> Result<Self, Error> {
Ok(match Self::shred_type_from_payload(&shred)? {
ShredType::Code => Self::from(ShredCode::from_payload(shred)?),
ShredType::Data => Self::from(ShredData::from_payload(shred)?),
})
}
pub fn new_from_parity_shard(
@@ -331,35 +331,16 @@ impl Shred {
position: u16,
version: u16,
) -> Self {
let common_header = ShredCommonHeader {
signature: Signature::default(),
shred_type: ShredType::Code,
index,
Self::from(ShredCode::new_from_parity_shard(
slot,
version,
index,
parity_shard,
fec_set_index,
};
let coding_header = CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position,
};
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &coding_header).unwrap();
// Tests may have an empty parity_shard.
if !parity_shard.is_empty() {
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SIZE_OF_CODING_SHRED_HEADERS);
payload[offset..].copy_from_slice(parity_shard);
}
Shred {
common_header,
data_header: DataShredHeader::default(),
coding_header,
payload,
}
version,
))
}
/// Unique identifier for each shred.
@@ -368,158 +349,57 @@ impl Shred {
}
pub fn slot(&self) -> Slot {
self.common_header.slot
self.common_header().slot
}
pub fn parent(&self) -> Result<Slot, Error> {
match self.shred_type() {
ShredType::Data => {
let slot = self.slot();
let parent_offset = self.data_header.parent_offset;
if parent_offset == 0 && slot != 0 {
return Err(Error::InvalidParentOffset {
slot,
parent_offset,
});
}
slot.checked_sub(Slot::from(parent_offset))
.ok_or(Error::InvalidParentOffset {
slot,
parent_offset,
})
}
ShredType::Code => Err(Error::InvalidShredType),
match self {
Self::ShredCode(_) => Err(Error::InvalidShredType),
Self::ShredData(shred) => shred.parent(),
}
}
pub fn index(&self) -> u32 {
self.common_header.index
self.common_header().index
}
pub(crate) fn data(&self) -> Result<&[u8], Error> {
match self.shred_type() {
ShredType::Code => Err(Error::InvalidShredType),
ShredType::Data => {
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
Ok(&self.payload[SHRED_DATA_OFFSET..size])
}
match self {
Self::ShredCode(_) => Err(Error::InvalidShredType),
Self::ShredData(shred) => shred.data(),
}
}
#[inline]
pub fn payload(&self) -> &Vec<u8> {
&self.payload
}
// Possibly trimmed payload;
// Should only be used when storing shreds to blockstore.
pub(crate) fn bytes_to_store(&self) -> &[u8] {
match self.shred_type() {
ShredType::Code => &self.payload,
ShredType::Data => {
// Payload will be padded out to SHRED_PAYLOAD_SIZE.
// But only need to store the bytes within data_header.size.
&self.payload[..self.data_header.size as usize]
}
match self {
Self::ShredCode(shred) => shred.payload(),
Self::ShredData(shred) => shred.bytes_to_store(),
}
}
// Possibly zero pads bytes stored in blockstore.
pub(crate) fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
let shred_type = match shred.get(OFFSET_OF_SHRED_TYPE) {
None => return Err(Error::InvalidPayloadSize(shred.len())),
Some(shred_type) => match ShredType::try_from(*shred_type) {
Err(_) => return Err(Error::InvalidShredType),
Ok(shred_type) => shred_type,
},
};
match shred_type {
ShredType::Code => {
if shred.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(shred.len()));
}
Ok(shred)
}
ShredType::Data => {
if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) {
return Err(Error::InvalidPayloadSize(shred.len()));
}
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
Ok(shred)
}
pub(crate) fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error> {
match Self::shred_type_from_payload(&shred)? {
ShredType::Code => ShredCode::resize_stored_shred(shred),
ShredType::Data => ShredData::resize_stored_shred(shred),
}
}
pub fn into_payload(self) -> Vec<u8> {
self.payload
}
pub fn fec_set_index(&self) -> u32 {
self.common_header.fec_set_index
self.common_header().fec_set_index
}
pub(crate) fn first_coding_index(&self) -> Option<u32> {
match self.shred_type() {
ShredType::Data => None,
ShredType::Code => {
let position = u32::from(self.coding_header.position);
self.index().checked_sub(position)
}
match self {
Self::ShredCode(shred) => shred.first_coding_index(),
Self::ShredData(_) => None,
}
}
// Returns true if the shred passes sanity checks.
pub fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
if self.erasure_shard_index().is_none() {
let headers: Box<dyn Debug> = match self.shred_type() {
ShredType::Data => Box::new((self.common_header, self.data_header)),
ShredType::Code => Box::new((self.common_header, self.coding_header)),
};
return Err(Error::InvalidErasureShardIndex(headers));
}
match self.shred_type() {
ShredType::Data => {
if self.index() as usize >= MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex(self.index()));
}
let _parent = self.parent()?;
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
let flags = self.data_header.flags;
if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT)
&& !flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
{
return Err(Error::InvalidShredFlags(self.data_header.flags.bits()));
}
}
ShredType::Code => {
let num_coding_shreds = u32::from(self.coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
self.coding_header.num_coding_shreds,
));
}
}
}
Ok(())
}
pub fn version(&self) -> u16 {
self.common_header.version
self.common_header().version
}
// Identifier for the erasure coding set that the shred belongs to.
@@ -527,83 +407,13 @@ impl Shred {
ErasureSetId(self.slot(), self.fec_set_index())
}
// Returns the shard index within the erasure coding set.
pub(crate) fn erasure_shard_index(&self) -> Option<usize> {
match self.shred_type() {
ShredType::Data => {
let index = self.index().checked_sub(self.fec_set_index())?;
usize::try_from(index).ok()
}
ShredType::Code => {
// Assert that the last shred index in the erasure set does not
// overshoot u32.
self.fec_set_index().checked_add(u32::from(
self.coding_header.num_data_shreds.checked_sub(1)?,
))?;
self.first_coding_index()?.checked_add(u32::from(
self.coding_header.num_coding_shreds.checked_sub(1)?,
))?;
let num_data_shreds = usize::from(self.coding_header.num_data_shreds);
let num_coding_shreds = usize::from(self.coding_header.num_coding_shreds);
let position = usize::from(self.coding_header.position);
let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?;
let index = position.checked_add(num_data_shreds)?;
(index < fec_set_size).then(|| index)
}
}
}
// Returns the portion of the shred's payload which is erasure coded.
pub(crate) fn erasure_shard(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
let shred_type = self.shred_type();
let mut shard = self.payload;
match shred_type {
ShredType::Data => {
shard.resize(ENCODED_PAYLOAD_SIZE, 0u8);
}
ShredType::Code => {
// SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the
// coding shreds contains the header and is not part of erasure
// coding.
shard.drain(..SIZE_OF_CODING_SHRED_HEADERS);
}
}
Ok(shard)
}
// Like Shred::erasure_shard but returning a slice.
pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
Ok(match self.shred_type() {
ShredType::Data => &self.payload[..ENCODED_PAYLOAD_SIZE],
ShredType::Code => &self.payload[SIZE_OF_CODING_SHRED_HEADERS..],
})
}
pub fn set_index(&mut self, index: u32) {
self.common_header.index = index;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
pub fn set_slot(&mut self, slot: Slot) {
self.common_header.slot = slot;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
pub fn signature(&self) -> Signature {
self.common_header.signature
self.common_header().signature
}
pub fn sign(&mut self, keypair: &Keypair) {
let signature = keypair.sign_message(&self.payload[SIZE_OF_SIGNATURE..]);
bincode::serialize_into(&mut self.payload[..SIZE_OF_SIGNATURE], &signature)
.expect("Failed to generate serialized signature");
self.common_header.signature = signature;
let signature = keypair.sign_message(self.signed_payload());
self.set_signature(signature);
}
pub fn seed(&self, leader_pubkey: Pubkey) -> [u8; 32] {
@@ -617,7 +427,17 @@ impl Shred {
#[inline]
pub fn shred_type(&self) -> ShredType {
self.common_header.shred_type
self.common_header().shred_type
}
fn shred_type_from_payload(shred: &[u8]) -> Result<ShredType, Error> {
match shred.get(OFFSET_OF_SHRED_TYPE) {
None => Err(Error::InvalidPayloadSize(shred.len())),
Some(shred_type) => match ShredType::try_from(*shred_type) {
Err(_) => Err(Error::InvalidShredType),
Ok(shred_type) => Ok(shred_type),
},
}
}
pub fn is_data(&self) -> bool {
@@ -628,100 +448,87 @@ impl Shred {
}
pub fn last_in_slot(&self) -> bool {
if self.is_data() {
self.data_header
.flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT)
} else {
false
match self {
Self::ShredCode(_) => false,
Self::ShredData(shred) => shred.last_in_slot(),
}
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about the actual shred
pub fn set_last_in_slot(&mut self) {
if self.is_data() {
self.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT
match self {
Self::ShredCode(_) => (),
Self::ShredData(shred) => shred.set_last_in_slot(),
}
let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
bincode::serialize_into(buffer, &self.data_header).unwrap();
}
pub fn data_complete(&self) -> bool {
if self.is_data() {
self.data_header
.flags
.contains(ShredFlags::DATA_COMPLETE_SHRED)
} else {
false
match self {
Self::ShredCode(_) => false,
Self::ShredData(shred) => shred.data_complete(),
}
}
pub(crate) fn reference_tick(&self) -> u8 {
if self.is_data() {
self.data_header.flags & ShredFlags::SHRED_TICK_REFERENCE_MASK
} else {
ShredFlags::SHRED_TICK_REFERENCE_MASK
match self {
Self::ShredCode(_) => ShredFlags::SHRED_TICK_REFERENCE_MASK.bits(),
Self::ShredData(shred) => shred.reference_tick(),
}
.bits()
}
// Get slot from a shred packet with partial deserialize
pub fn get_slot_from_packet(p: &Packet) -> Option<Slot> {
let slot_start = OFFSET_OF_SHRED_SLOT;
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
if slot_end > p.meta.size {
return None;
}
limited_deserialize::<Slot>(&p.data[slot_start..slot_end]).ok()
pub fn get_slot_from_packet(packet: &Packet) -> Option<Slot> {
let buffer = packet.data.get(OFFSET_OF_SHRED_SLOT..)?;
limited_deserialize(buffer).ok()
}
pub(crate) fn reference_tick_from_data(data: &[u8]) -> u8 {
let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER
- size_of::<u8>()
- size_of::<u16>()];
flags & ShredFlags::SHRED_TICK_REFERENCE_MASK.bits()
// TODO: should check the type bit as well!
const SHRED_FLAGS_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + std::mem::size_of::<u16>();
data[SHRED_FLAGS_OFFSET] & ShredFlags::SHRED_TICK_REFERENCE_MASK.bits()
}
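// For reference, the fixed ShredCommonHeader layout that these partial
// reads rely on (offsets implied by the constants and asserts in this
// file, with bincode's fixed-width integer encoding):
//   bytes  0..64   signature          (SIZE_OF_SIGNATURE = 64)
//   byte   64      shred_type         (OFFSET_OF_SHRED_TYPE)
//   bytes 65..73   slot: u64          (OFFSET_OF_SHRED_SLOT)
//   bytes 73..77   index: u32         (OFFSET_OF_SHRED_INDEX)
//   bytes 77..79   version: u16
//   bytes 79..83   fec_set_index: u32
// SIZE_OF_COMMON_SHRED_HEADER = 83; the 5-byte DataShredHeader
// (parent_offset: u16, flags: u8, size: u16) follows, which puts the
// flags byte at offset 85 and SHRED_DATA_OFFSET at 88 as asserted above.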
pub fn verify(&self, pubkey: &Pubkey) -> bool {
self.signature()
.verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
let message = self.signed_payload();
self.signature().verify(pubkey.as_ref(), message)
}
// Returns true if the erasure coding of the two shreds mismatches.
pub(crate) fn erasure_mismatch(self: &Shred, other: &Shred) -> Result<bool, Error> {
match (self.shred_type(), other.shred_type()) {
(ShredType::Code, ShredType::Code) => {
let CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position: _,
} = self.coding_header;
Ok(num_coding_shreds != other.coding_header.num_coding_shreds
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index())
}
pub(crate) fn erasure_mismatch(&self, other: &Self) -> Result<bool, Error> {
match (self, other) {
(Self::ShredCode(shred), Self::ShredCode(other)) => Ok(shred.erasure_mismatch(other)),
_ => Err(Error::InvalidShredType),
}
}
pub(crate) fn num_data_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(Error::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_data_shreds),
pub(crate) fn num_data_shreds(&self) -> Result<u16, Error> {
match self {
Self::ShredCode(shred) => Ok(shred.num_data_shreds()),
Self::ShredData(_) => Err(Error::InvalidShredType),
}
}
pub(crate) fn num_coding_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(Error::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_coding_shreds),
pub(crate) fn num_coding_shreds(&self) -> Result<u16, Error> {
match self {
Self::ShredCode(shred) => Ok(shred.num_coding_shreds()),
Self::ShredData(_) => Err(Error::InvalidShredType),
}
}
}
impl From<ShredCode> for Shred {
fn from(shred: ShredCode) -> Self {
Self::ShredCode(shred)
}
}
impl From<ShredData> for Shred {
fn from(shred: ShredData) -> Self {
Self::ShredData(shred)
}
}
// Get slot, index, and type from a packet with partial deserialize
pub fn get_shred_slot_index_type(
p: &Packet,
@@ -800,7 +607,7 @@ pub fn verify_test_data_shred(
is_last_data: bool,
) {
shred.sanitize().unwrap();
assert_eq!(shred.payload.len(), SHRED_PAYLOAD_SIZE);
assert_eq!(shred.payload().len(), SHRED_PAYLOAD_SIZE);
assert!(shred.is_data());
assert_eq!(shred.index(), index);
assert_eq!(shred.slot(), slot);
@@ -826,7 +633,7 @@ mod tests {
matches::assert_matches,
rand::Rng,
rand_chacha::{rand_core::SeedableRng, ChaChaRng},
solana_sdk::shred_version,
solana_sdk::{shred_version, signature::Signer},
};
fn bs58_decode<T: AsRef<[u8]>>(data: T) -> Vec<u8> {
@@ -843,21 +650,31 @@ mod tests {
version: u16::MAX,
fec_set_index: u32::MAX,
};
let data_shred_header = DataShredHeader {
parent_offset: u16::MAX,
flags: ShredFlags::all(),
size: u16::MAX,
};
let coding_shred_header = CodingShredHeader {
num_data_shreds: u16::MAX,
num_coding_shreds: u16::MAX,
position: u16::MAX,
};
assert_eq!(
SIZE_OF_COMMON_SHRED_HEADER,
serialized_size(&common_header).unwrap() as usize
);
assert_eq!(
SIZE_OF_CODING_SHRED_HEADER,
serialized_size(&CodingShredHeader::default()).unwrap() as usize
serialized_size(&coding_shred_header).unwrap() as usize
);
assert_eq!(
SIZE_OF_DATA_SHRED_HEADER,
serialized_size(&DataShredHeader::default()).unwrap() as usize
serialized_size(&data_shred_header).unwrap() as usize
);
let data_shred_header_with_size = DataShredHeader {
size: 1000,
..DataShredHeader::default()
..data_shred_header
};
assert_eq!(
SIZE_OF_DATA_SHRED_HEADER,
@@ -1035,146 +852,6 @@ mod tests {
);
}
#[test]
fn test_sanitize_data_shred() {
let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let mut shred = Shred::new_from_data(
420, // slot
19, // index
5, // parent_offset
&data,
ShredFlags::DATA_COMPLETE_SHRED,
3, // reference_tick
1, // version
16, // fec_set_index
);
assert_matches!(shred.sanitize(), Ok(()));
// Corrupt shred by making it too large
{
let mut shred = shred.clone();
shred.payload.push(10u8);
assert_matches!(shred.sanitize(), Err(Error::InvalidPayloadSize(1229)));
}
{
let mut shred = shred.clone();
shred.data_header.size += 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1140,
payload: 1228,
})
);
}
{
let mut shred = shred.clone();
shred.data_header.size = 0;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 0,
payload: 1228,
})
);
}
{
let mut shred = shred.clone();
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
assert_matches!(shred.sanitize(), Err(Error::InvalidDataShredIndex(32768)));
}
{
let mut shred = shred.clone();
shred.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT;
assert_matches!(shred.sanitize(), Ok(()));
shred.data_header.flags &= !ShredFlags::DATA_COMPLETE_SHRED;
assert_matches!(shred.sanitize(), Err(Error::InvalidShredFlags(131u8)));
shred.data_header.flags |= ShredFlags::SHRED_TICK_REFERENCE_MASK;
assert_matches!(shred.sanitize(), Err(Error::InvalidShredFlags(191u8)));
}
{
shred.data_header.size = shred.payload().len() as u16 + 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1229,
payload: 1228,
})
);
}
}
#[test]
fn test_sanitize_coding_shred() {
let mut shred = Shred::new_from_parity_shard(
1, // slot
12, // index
&[], // parity_shard
11, // fec_set_index
11, // num_data_shreds
11, // num_coding_shreds
8, // position
0, // version
);
assert_matches!(shred.sanitize(), Ok(()));
// index < position is invalid.
{
let mut shred = shred.clone();
let index = shred.index() - shred.fec_set_index() - 1;
shred.set_index(index as u32);
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
{
let mut shred = shred.clone();
shred.coding_header.num_coding_shreds = 0;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
// pos >= num_coding is invalid.
{
let mut shred = shred.clone();
let num_coding_shreds = shred.index() - shred.fec_set_index();
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
// set_index with num_coding that would imply the last
// shred has index > u32::MAX should fail.
{
let mut shred = shred.clone();
shred.common_header.fec_set_index = std::u32::MAX - 1;
shred.coding_header.num_data_shreds = 2;
shred.coding_header.num_coding_shreds = 4;
shred.coding_header.position = 1;
shred.common_header.index = std::u32::MAX - 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
shred.coding_header.num_coding_shreds = 2000;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
// Decreasing the number of num_coding_shreds will put it within
// the allowed limit.
shred.coding_header.num_coding_shreds = 2;
assert_matches!(shred.sanitize(), Ok(()));
}
{
shred.coding_header.num_coding_shreds = u16::MAX;
assert_matches!(shred.sanitize(), Err(Error::InvalidNumCodingShreds(65535)));
}
}
#[test]
fn test_serde_compat_shred_data() {
const SEED: &str = "6qG9NGWEtoTugS4Zgs46u8zTccEJuRHtrNMiUayLHCxt";
@@ -1275,7 +952,7 @@ mod tests {
let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap();
ChaChaRng::from_seed(seed)
};
let mut parity_shard = vec![0u8; ENCODED_PAYLOAD_SIZE];
let mut parity_shard = vec![0u8; /*ENCODED_PAYLOAD_SIZE:*/ 1139];
rng.fill(&mut parity_shard[..]);
let keypair = Keypair::generate(&mut rng);
let mut shred = Shred::new_from_parity_shard(
@@ -1340,7 +1017,7 @@ mod tests {
assert_eq!(shred.last_in_slot(), is_last_in_slot);
assert_eq!(shred.reference_tick(), reference_tick.min(63u8));
assert_eq!(
Shred::reference_tick_from_data(&shred.payload),
Shred::reference_tick_from_data(shred.payload()),
reference_tick.min(63u8),
);
}

ledger/src/shred/legacy.rs Normal file (530 lines added)
View File

@@ -0,0 +1,530 @@
use {
crate::shred::{
traits::{Shred, ShredCode as _, ShredData as _},
CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredType,
MAX_DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT, SHRED_DATA_OFFSET,
SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER,
SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, SIZE_OF_SIGNATURE,
},
solana_perf::packet::deserialize_from_with_limit,
solana_sdk::{clock::Slot, signature::Signature},
static_assertions::const_assert_eq,
std::{io::Cursor, ops::RangeInclusive},
};
// DataShredHeader.size is sum of common-shred-header, data-shred-header and
// data.len(). Broadcast stage may create zero length data shreds when the
// previous slot was interrupted:
// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79
const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD;
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
// are never used and are not part of erasure coding.
const_assert_eq!(ENCODED_PAYLOAD_SIZE, 1139);
const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
#[derive(Clone, Debug, PartialEq)]
pub struct ShredData {
common_header: ShredCommonHeader,
data_header: DataShredHeader,
payload: Vec<u8>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ShredCode {
common_header: ShredCommonHeader,
coding_header: CodingShredHeader,
payload: Vec<u8>,
}
macro_rules! impl_shred_common {
() => {
#[inline]
fn common_header(&self) -> &ShredCommonHeader {
&self.common_header
}
#[inline]
fn payload(&self) -> &Vec<u8> {
&self.payload
}
fn into_payload(self) -> Vec<u8> {
self.payload
}
fn set_signature(&mut self, signature: Signature) {
bincode::serialize_into(&mut self.payload[..], &signature).unwrap();
self.common_header.signature = signature;
}
// Only for tests.
fn set_index(&mut self, index: u32) {
self.common_header.index = index;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
// Only for tests.
fn set_slot(&mut self, slot: Slot) {
self.common_header.slot = slot;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
};
}
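// Note that the header structs are mirrored into the serialized payload
// bytes, so each setter above (set_signature / set_index / set_slot)
// reserializes the mutated header back into self.payload to keep the
// struct fields and the wire representation consistent.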
impl Shred for ShredData {
impl_shred_common!();
fn from_payload(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut cursor = Cursor::new(&payload[..]);
let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
if common_header.shred_type != ShredType::Data {
return Err(Error::InvalidShredType);
}
let data_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/16602
payload.resize(SHRED_PAYLOAD_SIZE, 0u8);
let shred = Self {
common_header,
data_header,
payload,
};
shred.sanitize().map(|_| shred)
}
fn erasure_shard_index(&self) -> Option<usize> {
let fec_set_index = self.common_header.fec_set_index;
let index = self.common_header.index.checked_sub(fec_set_index)?;
usize::try_from(index).ok()
}
fn erasure_shard(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
let mut shard = self.payload;
shard.resize(ENCODED_PAYLOAD_SIZE, 0u8);
Ok(shard)
}
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
Ok(&self.payload[..ENCODED_PAYLOAD_SIZE])
}
fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
// TODO: assert that this is the right type!
if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) {
return Err(Error::InvalidPayloadSize(shred.len()));
}
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
Ok(shred)
}
fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
if self.erasure_shard_index().is_none() {
let headers = Box::new((self.common_header, self.data_header));
return Err(Error::InvalidErasureShardIndex(headers));
}
if self.common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex(self.common_header.index));
}
let _parent = self.parent()?;
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
let flags = self.data_header.flags;
if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT)
&& !flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
{
return Err(Error::InvalidShredFlags(self.data_header.flags.bits()));
}
Ok(())
}
fn signed_payload(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE);
&self.payload[SIZE_OF_SIGNATURE..]
}
}
impl Shred for ShredCode {
impl_shred_common!();
fn from_payload(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut cursor = Cursor::new(&payload[..]);
let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
if common_header.shred_type != ShredType::Code {
return Err(Error::InvalidShredType);
}
let coding_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/10109
payload.truncate(SHRED_PAYLOAD_SIZE);
let shred = Self {
common_header,
coding_header,
payload,
};
shred.sanitize().map(|_| shred)
}
fn erasure_shard_index(&self) -> Option<usize> {
// Assert that the last shred index in the erasure set does not
// overshoot u32.
self.common_header.fec_set_index.checked_add(u32::from(
self.coding_header.num_data_shreds.checked_sub(1)?,
))?;
self.first_coding_index()?.checked_add(u32::from(
self.coding_header.num_coding_shreds.checked_sub(1)?,
))?;
let num_data_shreds = usize::from(self.coding_header.num_data_shreds);
let num_coding_shreds = usize::from(self.coding_header.num_coding_shreds);
let position = usize::from(self.coding_header.position);
let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?;
let index = position.checked_add(num_data_shreds)?;
(index < fec_set_size).then(|| index)
}
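// Worked example: with fec_set_index = 0, num_data_shreds = 32,
// num_coding_shreds = 32 and position = 5, the shard index is
// 5 + 32 = 37 < 64: data shreds occupy shard indices 0..32 and coding
// shreds occupy 32..64 within the erasure set.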
fn erasure_shard(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
let mut shard = self.payload;
// SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the
// coding shreds contains the header and is not part of erasure
// coding.
shard.drain(..SIZE_OF_CODING_SHRED_HEADERS);
Ok(shard)
}
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
Ok(&self.payload[SIZE_OF_CODING_SHRED_HEADERS..])
}
fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error> {
if shred.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(shred.len()));
}
Ok(shred)
}
fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
if self.erasure_shard_index().is_none() {
let headers = Box::new((self.common_header, self.coding_header));
return Err(Error::InvalidErasureShardIndex(headers));
}
let num_coding_shreds = u32::from(self.coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
self.coding_header.num_coding_shreds,
));
}
Ok(())
}
fn signed_payload(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE);
&self.payload[SIZE_OF_SIGNATURE..]
}
}
impl super::traits::ShredData for ShredData {
#[inline]
fn data_header(&self) -> &DataShredHeader {
&self.data_header
}
fn data(&self) -> Result<&[u8], Error> {
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
Ok(&self.payload[SHRED_DATA_OFFSET..size])
}
fn bytes_to_store(&self) -> &[u8] {
// Payload will be padded out to SHRED_PAYLOAD_SIZE.
// But only need to store the bytes within data_header.size.
&self.payload[..self.data_header.size as usize]
}
// Only for tests.
fn set_last_in_slot(&mut self) {
self.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT;
let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
bincode::serialize_into(buffer, &self.data_header).unwrap();
}
}
impl super::traits::ShredCode for ShredCode {
#[inline]
fn coding_header(&self) -> &CodingShredHeader {
&self.coding_header
}
}
impl ShredData {
pub(super) fn new_from_data(
slot: Slot,
index: u32,
parent_offset: u16,
data: &[u8],
flags: ShredFlags,
reference_tick: u8,
version: u16,
fec_set_index: u32,
) -> Self {
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let common_header = ShredCommonHeader {
signature: Signature::default(),
shred_type: ShredType::Data,
slot,
index,
version,
fec_set_index,
};
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16;
let flags = flags
| unsafe {
ShredFlags::from_bits_unchecked(
ShredFlags::SHRED_TICK_REFERENCE_MASK
.bits()
.min(reference_tick),
)
};
let data_header = DataShredHeader {
parent_offset,
flags,
size,
};
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &data_header).unwrap();
// TODO: Need to check if data is too large!
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SHRED_DATA_OFFSET);
payload[offset..offset + data.len()].copy_from_slice(data);
Self {
common_header,
data_header,
payload,
}
}
}
impl ShredCode {
pub(super) fn new_from_parity_shard(
slot: Slot,
index: u32,
parity_shard: &[u8],
fec_set_index: u32,
num_data_shreds: u16,
num_coding_shreds: u16,
position: u16,
version: u16,
) -> Self {
let common_header = ShredCommonHeader {
signature: Signature::default(),
shred_type: ShredType::Code,
index,
slot,
version,
fec_set_index,
};
let coding_header = CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position,
};
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &coding_header).unwrap();
// Tests may have an empty parity_shard.
if !parity_shard.is_empty() {
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SIZE_OF_CODING_SHRED_HEADERS);
payload[offset..].copy_from_slice(parity_shard);
}
Self {
common_header,
coding_header,
payload,
}
}
// Returns true if the erasure coding of the two shreds mismatches.
pub(super) fn erasure_mismatch(&self, other: &ShredCode) -> bool {
let CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position: _,
} = self.coding_header;
num_coding_shreds != other.coding_header.num_coding_shreds
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index()
}
}
#[cfg(test)]
mod test {
use {super::*, matches::assert_matches};
#[test]
fn test_sanitize_data_shred() {
let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let mut shred = ShredData::new_from_data(
420, // slot
19, // index
5, // parent_offset
&data,
ShredFlags::DATA_COMPLETE_SHRED,
3, // reference_tick
1, // version
16, // fec_set_index
);
assert_matches!(shred.sanitize(), Ok(()));
// Corrupt shred by making it too large
{
let mut shred = shred.clone();
shred.payload.push(10u8);
assert_matches!(shred.sanitize(), Err(Error::InvalidPayloadSize(1229)));
}
{
let mut shred = shred.clone();
shred.data_header.size += 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1140,
payload: 1228,
})
);
}
{
let mut shred = shred.clone();
shred.data_header.size = 0;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 0,
payload: 1228,
})
);
}
{
let mut shred = shred.clone();
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
assert_matches!(shred.sanitize(), Err(Error::InvalidDataShredIndex(32768)));
}
{
let mut shred = shred.clone();
shred.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT;
assert_matches!(shred.sanitize(), Ok(()));
shred.data_header.flags &= !ShredFlags::DATA_COMPLETE_SHRED;
assert_matches!(shred.sanitize(), Err(Error::InvalidShredFlags(131u8)));
shred.data_header.flags |= ShredFlags::SHRED_TICK_REFERENCE_MASK;
assert_matches!(shred.sanitize(), Err(Error::InvalidShredFlags(191u8)));
}
{
shred.data_header.size = shred.payload().len() as u16 + 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1229,
payload: 1228,
})
);
}
}
#[test]
fn test_sanitize_coding_shred() {
let mut shred = ShredCode::new_from_parity_shard(
1, // slot
12, // index
&[], // parity_shard
11, // fec_set_index
11, // num_data_shreds
11, // num_coding_shreds
8, // position
0, // version
);
assert_matches!(shred.sanitize(), Ok(()));
// index < position is invalid.
{
let mut shred = shred.clone();
let index = shred.common_header.index - shred.common_header.fec_set_index - 1;
shred.set_index(index as u32);
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
{
let mut shred = shred.clone();
shred.coding_header.num_coding_shreds = 0;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
// pos >= num_coding is invalid.
{
let mut shred = shred.clone();
let num_coding_shreds = shred.common_header.index - shred.common_header.fec_set_index;
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
}
// set_index with num_coding that would imply the last
// shred has index > u32::MAX should fail.
{
let mut shred = shred.clone();
shred.common_header.fec_set_index = std::u32::MAX - 1;
shred.coding_header.num_data_shreds = 2;
shred.coding_header.num_coding_shreds = 4;
shred.coding_header.position = 1;
shred.common_header.index = std::u32::MAX - 1;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
shred.coding_header.num_coding_shreds = 2000;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
// Decreasing the number of num_coding_shreds will put it within
// the allowed limit.
shred.coding_header.num_coding_shreds = 2;
assert_matches!(shred.sanitize(), Ok(()));
}
{
shred.coding_header.num_coding_shreds = u16::MAX;
assert_matches!(shred.sanitize(), Err(Error::InvalidNumCodingShreds(65535)));
}
}
}

View File

@@ -0,0 +1,93 @@
use {
crate::shred::{CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags},
solana_sdk::{clock::Slot, signature::Signature},
};
pub(super) trait Shred: Sized {
fn from_payload(shred: Vec<u8>) -> Result<Self, Error>;
fn common_header(&self) -> &ShredCommonHeader;
fn sanitize(&self) -> Result<(), Error>;
fn set_signature(&mut self, signature: Signature);
fn payload(&self) -> &Vec<u8>;
fn into_payload(self) -> Vec<u8>;
// Returns the shard index within the erasure coding set.
fn erasure_shard_index(&self) -> Option<usize>;
// Returns the portion of the shred's payload which is erasure coded.
fn erasure_shard(self) -> Result<Vec<u8>, Error>;
// Like Shred::erasure_shard but returning a slice.
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>;
// Portion of the payload which is signed.
fn signed_payload(&self) -> &[u8];
// Possibly zero pads bytes stored in blockstore.
fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error>;
// Only for tests.
fn set_index(&mut self, index: u32);
fn set_slot(&mut self, slot: Slot);
}
pub(super) trait ShredData: Shred {
fn data_header(&self) -> &DataShredHeader;
fn parent(&self) -> Result<Slot, Error> {
let slot = self.common_header().slot;
let parent_offset = self.data_header().parent_offset;
if parent_offset == 0 && slot != 0 {
return Err(Error::InvalidParentOffset {
slot,
parent_offset,
});
}
slot.checked_sub(Slot::from(parent_offset))
.ok_or(Error::InvalidParentOffset {
slot,
parent_offset,
})
}
fn data(&self) -> Result<&[u8], Error>;
// Possibly trimmed payload;
// Should only be used when storing shreds to blockstore.
fn bytes_to_store(&self) -> &[u8];
fn last_in_slot(&self) -> bool {
let flags = self.data_header().flags;
flags.contains(ShredFlags::LAST_SHRED_IN_SLOT)
}
fn data_complete(&self) -> bool {
let flags = self.data_header().flags;
flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
}
fn reference_tick(&self) -> u8 {
let flags = self.data_header().flags;
(flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits()
}
// Only for tests.
fn set_last_in_slot(&mut self);
}
pub(super) trait ShredCode: Shred {
fn coding_header(&self) -> &CodingShredHeader;
fn first_coding_index(&self) -> Option<u32> {
let position = u32::from(self.coding_header().position);
self.common_header().index.checked_sub(position)
}
fn num_data_shreds(&self) -> u16 {
self.coding_header().num_data_shreds
}
fn num_coding_shreds(&self) -> u16 {
self.coding_header().num_coding_shreds
}
}
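This trait split is what makes the migration plan in the commit message
workable: a future shred variant only needs to implement these traits
and be added as a variant of the Shred enum. A purely hypothetical
sketch (MerkleShredData and its fields are illustrative only, not part
of this commit):

// Hypothetical future variant, to illustrate the intended extension
// point; none of these names exist in this commit.
struct MerkleShredData {
    common_header: ShredCommonHeader,
    data_header: DataShredHeader,
    payload: Vec<u8>, // would also carry the Merkle-proof bytes
}
// It would implement shred::traits::Shred and shred::traits::ShredData,
// and the Shred enum would gain a variant wrapping it.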

View File

@@ -1,9 +1,7 @@
use {
crate::{
shred::{
Error, Shred, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
},
shred_stats::ProcessShredsStats,
crate::shred::{
Error, ProcessShredsStats, Shred, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK,
SIZE_OF_DATA_SHRED_PAYLOAD,
},
lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool},