From 008860bb3679d3709041af78f5337faf11ced6bd Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 28 Apr 2022 23:42:37 +0000 Subject: [PATCH] removes SHRED_PAYLOAD_SIZE from shred public interface (#24806) --- ledger/src/blockstore.rs | 23 +++++---------- ledger/src/shred.rs | 63 ++++++++++++++++++++++++++++++---------- ledger/src/shredder.rs | 47 +++++------------------------- 3 files changed, 63 insertions(+), 70 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 1f2dd00830..44d2f3f30c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -11,10 +11,7 @@ use { blockstore_meta::*, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, - shred::{ - self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder, - SHRED_PAYLOAD_SIZE, - }, + shred::{self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder}, slot_stats::{ShredSource, SlotsStats}, }, bincode::deserialize, @@ -1637,14 +1634,12 @@ impl Blockstore { } pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { - self.data_shred_cf.get_bytes((slot, index)).map(|data| { - data.map(|mut d| { - // Only data_header.size bytes stored in the blockstore so - // pad the payload out to SHRED_PAYLOAD_SIZE so that the - // erasure recovery works properly. - d.resize(cmp::max(d.len(), SHRED_PAYLOAD_SIZE), 0); - d - }) + let shred = self.data_shred_cf.get_bytes((slot, index))?; + let shred = shred.map(Shred::resize_stored_shred).transpose(); + shred.map_err(|err| { + let err = format!("Invalid stored shred: {}", err); + let err = Box::new(bincode::ErrorKind::Custom(err)); + BlockstoreError::InvalidShredData(err) }) } @@ -3107,15 +3102,13 @@ impl Blockstore { // Returns the existing shred if `new_shred` is not equal to the existing shred at the // given slot and index as this implies the leader generated two different shreds with // the same slot and index - pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec) -> Option> { + pub fn is_shred_duplicate(&self, shred: ShredId, payload: Vec) -> Option> { let (slot, index, shred_type) = shred.unwrap(); let existing_shred = match shred_type { ShredType::Data => self.get_data_shred(slot, index as u64), ShredType::Code => self.get_coding_shred(slot, index as u64), } .expect("fetch from DuplicateSlots column family failed")?; - let size = payload.len().max(SHRED_PAYLOAD_SIZE); - payload.resize(size, 0u8); let new_shred = Shred::new_from_serialized_shred(payload).unwrap(); (existing_shred != *new_shred.payload()).then(|| existing_shred) } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index ef102a5e76..774d6f0b31 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -102,7 +102,7 @@ const DATA_SHRED_SIZE_RANGE: RangeInclusive = const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; -pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; +const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; // SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds // is never used and is not part of erasure coding. const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; @@ -133,8 +133,8 @@ pub enum Error { InvalidParentSlot { slot: Slot, parent_slot: Slot }, #[error("Invalid payload size: {size}")] InvalidPayloadSize { size: usize }, - #[error("Invalid shred type: {0:?}")] - InvalidShredType(ShredType), + #[error("Invalid shred type")] + InvalidShredType, } #[repr(u8)] @@ -432,7 +432,7 @@ impl Shred { parent_offset, }) } - ShredType::Code => Err(Error::InvalidShredType(ShredType::Code)), + ShredType::Code => Err(Error::InvalidShredType), } } @@ -442,7 +442,7 @@ impl Shred { pub(crate) fn data(&self) -> Result<&[u8], Error> { match self.shred_type() { - ShredType::Code => Err(Error::InvalidShredType(ShredType::Code)), + ShredType::Code => Err(Error::InvalidShredType), ShredType::Data => { let size = usize::from(self.data_header.size); if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) { @@ -474,6 +474,27 @@ impl Shred { } } + // Possibly zero pads bytes stored in blockstore. + pub(crate) fn resize_stored_shred(mut shred: Vec) -> Result, Error> { + let shred_type = match shred.get(OFFSET_OF_SHRED_TYPE) { + None => return Err(Error::InvalidPayloadSize { size: shred.len() }), + Some(shred_type) => match ShredType::try_from(*shred_type) { + Err(_) => return Err(Error::InvalidShredType), + Ok(shred_type) => shred_type, + }, + }; + match shred_type { + ShredType::Code => Ok(shred), + ShredType::Data => { + if shred.len() > SHRED_PAYLOAD_SIZE { + return Err(Error::InvalidPayloadSize { size: shred.len() }); + } + shred.resize(SHRED_PAYLOAD_SIZE, 0u8); + Ok(shred) + } + } + } + pub fn into_payload(self) -> Vec { self.payload } @@ -494,7 +515,7 @@ impl Shred { // Returns true if the shred passes sanity checks. pub fn sanitize(&self) -> Result<(), Error> { - if self.payload().len() > SHRED_PAYLOAD_SIZE { + if self.payload().len() != SHRED_PAYLOAD_SIZE { return Err(Error::InvalidPayloadSize { size: self.payload.len(), }); @@ -570,7 +591,12 @@ impl Shred { } // Returns the portion of the shred's payload which is erasure coded. - pub(crate) fn erasure_block(self) -> Vec { + pub(crate) fn erasure_block(self) -> Result, Error> { + if self.payload.len() != SHRED_PAYLOAD_SIZE { + return Err(Error::InvalidPayloadSize { + size: self.payload.len(), + }); + } let shred_type = self.shred_type(); let mut block = self.payload; match shred_type { @@ -581,19 +607,23 @@ impl Shred { // SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the // coding shreds contains the header and is not part of erasure // coding. - let offset = SIZE_OF_CODING_SHRED_HEADERS.min(block.len()); - block.drain(..offset); + block.drain(..SIZE_OF_CODING_SHRED_HEADERS); } } - block + Ok(block) } // Like Shred::erasure_block but returning a slice - pub(crate) fn erasure_block_as_slice(&self) -> &[u8] { - match self.shred_type() { + pub(crate) fn erasure_block_as_slice(&self) -> Result<&[u8], Error> { + if self.payload.len() != SHRED_PAYLOAD_SIZE { + return Err(Error::InvalidPayloadSize { + size: self.payload.len(), + }); + } + Ok(match self.shred_type() { ShredType::Data => &self.payload[..ENCODED_PAYLOAD_SIZE], ShredType::Code => &self.payload[SIZE_OF_CODING_SHRED_HEADERS..], - } + }) } pub fn set_index(&mut self, index: u32) { @@ -737,20 +767,20 @@ impl Shred { || num_data_shreds != other.coding_header.num_data_shreds || self.first_coding_index() != other.first_coding_index()) } - _ => Err(Error::InvalidShredType(ShredType::Data)), + _ => Err(Error::InvalidShredType), } } pub(crate) fn num_data_shreds(self: &Shred) -> Result { match self.shred_type() { - ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)), + ShredType::Data => Err(Error::InvalidShredType), ShredType::Code => Ok(self.coding_header.num_data_shreds), } } pub(crate) fn num_coding_shreds(self: &Shred) -> Result { match self.shred_type() { - ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)), + ShredType::Data => Err(Error::InvalidShredType), ShredType::Code => Ok(self.coding_header.num_coding_shreds), } } @@ -833,6 +863,7 @@ pub fn verify_test_data_shred( is_last_in_slot: bool, is_last_data: bool, ) { + shred.sanitize().unwrap(); assert_eq!(shred.payload.len(), SHRED_PAYLOAD_SIZE); assert!(shred.is_data()); assert_eq!(shred.index(), index); diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index 1ecbe27cfc..cb259945f9 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -1,13 +1,11 @@ use { crate::{ erasure::Session, - shred::{ - Error, Shred, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, - SIZE_OF_DATA_SHRED_PAYLOAD, - }, + shred::{Error, Shred, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD}, shred_stats::ProcessShredsStats, }, rayon::{prelude::*, ThreadPool}, + reed_solomon_erasure::Error::{InvalidIndex, TooFewDataShards, TooFewShardsPresent}, solana_entry::entry::Entry, solana_measure::measure::Measure, solana_rayon_threadlimit::get_thread_count, @@ -234,7 +232,8 @@ impl Shredder { } else { num_data }; - let data: Vec<_> = data.iter().map(Shred::erasure_block_as_slice).collect(); + let data = data.iter().map(Shred::erasure_block_as_slice); + let data: Vec<_> = data.collect::>().unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; Session::new(num_data, num_coding) .unwrap() @@ -262,10 +261,8 @@ impl Shredder { } pub fn try_recovery(shreds: Vec) -> Result, Error> { - use reed_solomon_erasure::Error::InvalidIndex; - Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?; let (slot, fec_set_index) = match shreds.first() { - None => return Ok(Vec::default()), + None => return Err(Error::from(TooFewShardsPresent)), Some(shred) => (shred.slot(), shred.fec_set_index()), }; let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code()) @@ -298,7 +295,7 @@ impl Shredder { Some(index) if index < fec_set_size => index, _ => return Err(Error::from(InvalidIndex)), }; - blocks[index] = Some(shred.erasure_block()); + blocks[index] = Some(shred.erasure_block()?); if index < num_data_shreds { mask[index] = true; } @@ -323,8 +320,6 @@ impl Shredder { /// Combines all shreds to recreate the original buffer pub fn deshred(shreds: &[Shred]) -> Result, Error> { - use reed_solomon_erasure::Error::TooFewDataShards; - Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?; let index = shreds.first().ok_or(TooFewDataShards)?.index(); let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i); let data_complete = { @@ -345,30 +340,6 @@ impl Shredder { Ok(data) } } - - fn verify_consistent_shred_payload_sizes( - caller: &str, - shreds: &[Shred], - ) -> Result<(), reed_solomon_erasure::Error> { - if shreds.is_empty() { - return Err(reed_solomon_erasure::Error::TooFewShardsPresent); - } - let slot = shreds[0].slot(); - for shred in shreds { - if shred.payload().len() != SHRED_PAYLOAD_SIZE { - error!( - "{} Shreds for slot: {} are inconsistent sizes. Expected: {} actual: {}", - caller, - slot, - SHRED_PAYLOAD_SIZE, - shred.payload().len() - ); - return Err(reed_solomon_erasure::Error::IncorrectShardSize); - } - } - - Ok(()) - } } #[cfg(test)] @@ -393,7 +364,7 @@ mod tests { }; fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) { - assert_eq!(shred.payload().len(), SHRED_PAYLOAD_SIZE); + assert_matches!(shred.sanitize(), Ok(())); assert!(!shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -783,9 +754,7 @@ mod tests { assert_eq!(shreds.len(), 3); assert_matches!( Shredder::deshred(&shreds), - Err(Error::ErasureError( - reed_solomon_erasure::Error::TooFewDataShards - )) + Err(Error::ErasureError(TooFewDataShards)) ); // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds