returns Error when Shred::sanitize fails (#24653)

Including the error in the output makes it possible to debug why Shred::sanitize
fails.
This commit is contained in:
behzad nouri 2022-04-25 23:19:37 +00:00 committed by GitHub
parent d6869773bb
commit 12ae8d3be5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 158 additions and 104 deletions

View File

@ -193,7 +193,7 @@ pub(crate) fn should_retransmit_and_persist(
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else if !shred.sanitize() {
} else if shred.sanitize().is_err() {
inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {

View File

@ -3,7 +3,7 @@ use {
itertools::Itertools,
solana_ledger::{
blockstore_meta::DuplicateSlotProof,
shred::{Shred, ShredError, ShredType},
shred::{self, Shred, ShredType},
},
solana_sdk::{
clock::Slot,
@ -55,8 +55,8 @@ pub enum Error {
InvalidSignature,
#[error("invalid size limit")]
InvalidSizeLimit,
#[error("invalid shred")]
InvalidShred(#[from] ShredError),
#[error(transparent)]
InvalidShred(#[from] shred::Error),
#[error("number of chunks mismatch")]
NumChunksMismatch,
#[error("missing data chunk")]

View File

@ -12,8 +12,8 @@ use {
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{
max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType,
Shredder, SHRED_PAYLOAD_SIZE,
self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder,
SHRED_PAYLOAD_SIZE,
},
slot_stats::{ShredSource, SlotsStats},
},
@ -1382,7 +1382,7 @@ impl Blockstore {
}
fn should_insert_coding_shred(shred: &Shred, last_root: &RwLock<u64>) -> bool {
shred.is_code() && shred.sanitize() && shred.slot() > *last_root.read().unwrap()
shred.is_code() && shred.sanitize().is_ok() && shred.slot() > *last_root.read().unwrap()
}
fn insert_coding_shred(
@ -1396,7 +1396,7 @@ impl Blockstore {
// Assert guaranteed by integrity checks on the shred that happen before
// `insert_coding_shred` is called
assert!(shred.is_code() && shred.sanitize());
assert!(shred.is_code() && shred.sanitize().is_ok());
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
@ -1445,7 +1445,7 @@ impl Blockstore {
} else {
false
};
if !shred.sanitize() {
if let Err(err) = shred.sanitize() {
let leader_pubkey = leader_schedule
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
@ -1454,8 +1454,8 @@ impl Blockstore {
(
"error",
format!(
"Leader {:?}, slot {}: received invalid shred",
leader_pubkey, slot,
"Leader {:?}, slot {}: received invalid shred: {:?}",
leader_pubkey, slot, err,
),
String
)
@ -1652,7 +1652,7 @@ impl Blockstore {
&self,
slot: Slot,
start_index: u64,
) -> ShredResult<Vec<Shred>> {
) -> std::result::Result<Vec<Shred>, shred::Error> {
self.slot_data_iterator(slot, start_index)
.expect("blockstore couldn't fetch iterator")
.map(|data| Shred::new_from_serialized_shred(data.1.to_vec()))
@ -1708,7 +1708,7 @@ impl Blockstore {
&self,
slot: Slot,
start_index: u64,
) -> ShredResult<Vec<Shred>> {
) -> std::result::Result<Vec<Shred>, shred::Error> {
self.slot_coding_iterator(slot, start_index)
.expect("blockstore couldn't fetch iterator")
.map(|code| Shred::new_from_serialized_shred(code.1.to_vec()))

View File

@ -67,7 +67,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
std::{cell::RefCell, mem::size_of, ops::RangeInclusive},
std::{cell::RefCell, fmt::Debug, mem::size_of, ops::RangeInclusive},
thiserror::Error,
};
@ -116,31 +116,28 @@ pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111;
const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000;
const DATA_COMPLETE_SHRED: u8 = 0b0100_0000;
#[derive(Error, Debug)]
pub enum ShredError {
#[error("invalid shred type")]
InvalidShredType,
#[error("invalid FEC rate; must be 0.0 < {0} < 1.0")]
InvalidFecRate(f32),
#[error("slot too low; current slot {slot} must be above parent slot {parent_slot}, but the difference must be below u16::MAX")]
SlotTooLow { slot: Slot, parent_slot: Slot },
#[error("serialization error")]
Serialize(#[from] Box<bincode::ErrorKind>),
#[error(
"invalid parent offset; parent_offset {parent_offset} must be larger than slot {slot}"
)]
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
BincodeError(#[from] bincode::Error),
#[error("Invalid data shred index: {index}")]
InvalidDataShredIndex { index: u32 },
#[error("Invalid data size: {size}, payload: {payload}")]
InvalidDataSize { size: u16, payload: usize },
#[error("Invalid erasure block index: {0:?}")]
InvalidErasureBlockIndex(Box<dyn Debug>),
#[error("Invalid num coding shreds: {0}")]
InvalidNumCodingShreds(u16),
#[error("Invalid parent_offset: {parent_offset}, slot: {slot}")]
InvalidParentOffset { slot: Slot, parent_offset: u16 },
#[error("invalid payload")]
InvalidPayload,
#[error("Invalid parent slot: {parent_slot}, slot: {slot}")]
InvalidParentSlot { slot: Slot, parent_slot: Slot },
#[error("Invalid payload size: {size}")]
InvalidPayloadSize { size: usize },
#[error("Invalid shred type: {0:?}")]
InvalidShredType(ShredType),
}
pub type Result<T> = std::result::Result<T, ShredError>;
#[repr(u8)]
#[derive(
Clone,
@ -169,7 +166,7 @@ impl Default for ShredType {
}
/// A common header that is present in data and code shred headers
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
struct ShredCommonHeader {
signature: Signature,
shred_type: ShredType,
@ -180,7 +177,7 @@ struct ShredCommonHeader {
}
/// The data shred header has parent offset and flags
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
struct DataShredHeader {
parent_offset: u16,
flags: u8,
@ -188,7 +185,7 @@ struct DataShredHeader {
}
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Deserialize, Serialize)]
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
@ -335,7 +332,7 @@ impl Shred {
}
}
pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self> {
pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut start = 0;
let common_header: ShredCommonHeader =
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
@ -362,11 +359,7 @@ impl Shred {
coding_header,
payload,
};
// TODO: Should return why sanitize failed.
shred
.sanitize()
.then(|| shred)
.ok_or(ShredError::InvalidPayload)
shred.sanitize().map(|_| shred)
}
pub fn new_empty_coding(
@ -449,24 +442,24 @@ impl Shred {
self.common_header.slot
}
pub fn parent(&self) -> Result<Slot> {
pub fn parent(&self) -> Result<Slot, Error> {
match self.shred_type() {
ShredType::Data => {
let slot = self.slot();
let parent_offset = Slot::from(self.data_header.parent_offset);
let parent_offset = self.data_header.parent_offset;
if parent_offset == 0 && slot != 0 {
return Err(ShredError::InvalidParentOffset {
return Err(Error::InvalidParentOffset {
slot,
parent_offset: 0,
parent_offset,
});
}
slot.checked_sub(parent_offset)
.ok_or(ShredError::InvalidParentOffset {
slot.checked_sub(Slot::from(parent_offset))
.ok_or(Error::InvalidParentOffset {
slot,
parent_offset: self.data_header.parent_offset,
parent_offset,
})
}
ShredType::Code => Err(ShredError::InvalidShredType),
ShredType::Code => Err(Error::InvalidShredType(ShredType::Code)),
}
}
@ -511,23 +504,45 @@ impl Shred {
}
// Returns true if the shred passes sanity checks.
// TODO: Should return why sanitize failed!
pub fn sanitize(&self) -> bool {
self.payload.len() <= SHRED_PAYLOAD_SIZE
&& self.erasure_block_index().is_some()
&& match self.shred_type() {
ShredType::Data => {
let size = usize::from(self.data_header.size);
self.index() < MAX_DATA_SHREDS_PER_SLOT as u32
&& self.parent().is_ok()
&& size <= self.payload.len()
&& DATA_SHRED_SIZE_RANGE.contains(&size)
pub fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() > SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize {
size: self.payload.len(),
});
}
if self.erasure_block_index().is_none() {
let headers: Box<dyn Debug> = match self.shred_type() {
ShredType::Data => Box::new((self.common_header, self.data_header)),
ShredType::Code => Box::new((self.common_header, self.coding_header)),
};
return Err(Error::InvalidErasureBlockIndex(headers));
}
match self.shred_type() {
ShredType::Data => {
if self.index() as usize > MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex {
index: self.index(),
});
}
ShredType::Code => {
u32::from(self.coding_header.num_coding_shreds)
<= 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK
let _parent = self.parent()?;
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
}
ShredType::Code => {
let num_coding_shreds = u32::from(self.coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
self.coding_header.num_coding_shreds,
));
}
}
}
Ok(())
}
pub fn version(&self) -> u16 {
@ -716,7 +731,7 @@ impl Shred {
}
// Returns true if the erasure coding of the two shreds mismatch.
pub(crate) fn erasure_mismatch(self: &Shred, other: &Shred) -> Result<bool> {
pub(crate) fn erasure_mismatch(self: &Shred, other: &Shred) -> Result<bool, Error> {
match (self.shred_type(), other.shred_type()) {
(ShredType::Code, ShredType::Code) => {
let CodingShredHeader {
@ -728,20 +743,20 @@ impl Shred {
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index())
}
_ => Err(ShredError::InvalidShredType),
_ => Err(Error::InvalidShredType(ShredType::Data)),
}
}
pub(crate) fn num_data_shreds(self: &Shred) -> Result<u16> {
pub(crate) fn num_data_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(ShredError::InvalidShredType),
ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)),
ShredType::Code => Ok(self.coding_header.num_data_shreds),
}
}
pub(crate) fn num_coding_shreds(self: &Shred) -> Result<u16> {
pub(crate) fn num_coding_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(ShredError::InvalidShredType),
ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)),
ShredType::Code => Ok(self.coding_header.num_coding_shreds),
}
}
@ -756,9 +771,14 @@ pub struct Shredder {
}
impl Shredder {
pub fn new(slot: Slot, parent_slot: Slot, reference_tick: u8, version: u16) -> Result<Self> {
pub fn new(
slot: Slot,
parent_slot: Slot,
reference_tick: u8,
version: u16,
) -> Result<Self, Error> {
if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) {
Err(ShredError::SlotTooLow { slot, parent_slot })
Err(Error::InvalidParentSlot { slot, parent_slot })
} else {
Ok(Self {
slot,
@ -876,7 +896,7 @@ impl Shredder {
is_last_in_slot: bool,
next_code_index: u32,
process_stats: &mut ProcessShredsStats,
) -> Result<Vec<Shred>> {
) -> Result<Vec<Shred>, Error> {
if data_shreds.is_empty() {
return Ok(Vec::default());
}
@ -985,9 +1005,7 @@ impl Shredder {
.collect()
}
pub fn try_recovery(
shreds: Vec<Shred>,
) -> std::result::Result<Vec<Shred>, reed_solomon_erasure::Error> {
pub fn try_recovery(shreds: Vec<Shred>) -> Result<Vec<Shred>, reed_solomon_erasure::Error> {
use reed_solomon_erasure::Error::InvalidIndex;
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
let (slot, fec_set_index) = match shreds.first() {
@ -1050,7 +1068,7 @@ impl Shredder {
}
/// Combines all shreds to recreate the original buffer
pub fn deshred(shreds: &[Shred]) -> std::result::Result<Vec<u8>, reed_solomon_erasure::Error> {
pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
use reed_solomon_erasure::Error::TooFewDataShards;
Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?;
let index = shreds.first().ok_or(TooFewDataShards)?.index();
@ -1085,7 +1103,7 @@ impl Shredder {
fn verify_consistent_shred_payload_sizes(
caller: &str,
shreds: &[Shred],
) -> std::result::Result<(), reed_solomon_erasure::Error> {
) -> Result<(), reed_solomon_erasure::Error> {
if shreds.is_empty() {
return Err(reed_solomon_erasure::Error::TooFewShardsPresent);
}
@ -1270,18 +1288,12 @@ mod tests {
// Test that parent cannot be > current slot
assert_matches!(
Shredder::new(slot, slot + 1, 0, 0),
Err(ShredError::SlotTooLow {
slot: _,
parent_slot: _,
})
Err(Error::InvalidParentSlot { .. })
);
// Test that slot - parent cannot be > u16 MAX
assert_matches!(
Shredder::new(slot, slot - 1 - 0xffff, 0, 0),
Err(ShredError::SlotTooLow {
slot: _,
parent_slot: _,
})
Err(Error::InvalidParentSlot { .. })
);
let parent_slot = slot - 5;
let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap();
@ -1966,12 +1978,18 @@ mod tests {
let shred_res = Shred::new_from_serialized_shred(packet.data.to_vec());
assert_matches!(
shred.parent(),
Err(ShredError::InvalidParentOffset {
Err(Error::InvalidParentOffset {
slot: 10,
parent_offset: 1000
})
);
assert_matches!(
shred_res,
Err(Error::InvalidParentOffset {
slot: 10,
parent_offset: 1000
})
);
assert_matches!(shred_res, Err(ShredError::InvalidPayload));
}
#[test]
@ -2086,26 +2104,47 @@ mod tests {
1, // version
16, // fec_set_index
);
assert!(shred.sanitize());
assert_matches!(shred.sanitize(), Ok(()));
// Corrupt shred by making it too large
{
let mut shred = shred.clone();
shred.payload.push(10u8);
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidPayloadSize { size: 1229 })
);
}
{
let mut shred = shred.clone();
shred.data_header.size += 1;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1140,
payload: 1228,
})
);
}
{
let mut shred = shred.clone();
shred.data_header.size = 0;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 0,
payload: 1228,
})
);
}
{
shred.data_header.size = shred.payload().len() as u16 + 1;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidDataSize {
size: 1229,
payload: 1228,
})
);
}
}
@ -2120,25 +2159,34 @@ mod tests {
8, // position
0, // version
);
assert!(shred.sanitize());
assert_matches!(shred.sanitize(), Ok(()));
// index < position is invalid.
{
let mut shred = shred.clone();
let index = shred.index() - shred.fec_set_index() - 1;
shred.set_index(index as u32);
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureBlockIndex { .. })
);
}
{
let mut shred = shred.clone();
shred.coding_header.num_coding_shreds = 0;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureBlockIndex { .. })
);
}
// pos >= num_coding is invalid.
{
let mut shred = shred.clone();
let num_coding_shreds = shred.index() - shred.fec_set_index();
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureBlockIndex { .. })
);
}
// set_index with num_coding that would imply the last
// shred has index > u32::MAX should fail.
@ -2149,19 +2197,25 @@ mod tests {
shred.coding_header.num_coding_shreds = 4;
shred.coding_header.position = 1;
shred.common_header.index = std::u32::MAX - 1;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureBlockIndex { .. })
);
shred.coding_header.num_coding_shreds = 2000;
assert!(!shred.sanitize());
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureBlockIndex { .. })
);
// Decreasing the number of num_coding_shreds will put it within
// the allowed limit.
shred.coding_header.num_coding_shreds = 2;
assert!(shred.sanitize());
assert_matches!(shred.sanitize(), Ok(()));
}
{
shred.coding_header.num_coding_shreds = u16::MAX;
assert!(!shred.sanitize());
assert_matches!(shred.sanitize(), Err(Error::InvalidNumCodingShreds(65535)));
}
}
}