diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index e18b678608..000e966128 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -9,7 +9,7 @@ use { solana_entry::entry::{create_ticks, Entry}, solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags, - Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD, + Shredder, LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK, }, solana_perf::test_tx, solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair}, @@ -38,12 +38,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec Vec { - let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( &make_test_entry(txs_per_entry), 2 * num_shreds as u64, - Some(shred_size), + Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -73,10 +72,10 @@ fn make_concatenated_shreds(num_shreds: usize) -> Vec { #[bench] fn bench_shredder_ticks(bencher: &mut Bencher) { let kp = Keypair::new(); - let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let shred_size = LEGACY_SHRED_DATA_CAPACITY; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; // ~1Mb - let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; + let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -87,7 +86,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { #[bench] fn bench_shredder_large_entries(bencher: &mut Bencher) { let kp = Keypair::new(); - let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let shred_size = LEGACY_SHRED_DATA_CAPACITY; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( @@ -106,7 +105,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { #[bench] fn bench_deshredder(bencher: &mut Bencher) { let kp = Keypair::new(); - let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let shred_size = LEGACY_SHRED_DATA_CAPACITY; // ~10Mb let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; @@ -121,7 +120,7 @@ fn bench_deshredder(bencher: &mut Bencher) { #[bench] fn bench_deserialize_hdr(bencher: &mut Bencher) { - let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD]; + let data = vec![0; LEGACY_SHRED_DATA_CAPACITY]; let shred = Shred::new_from_data(2, 1, 1, &data, ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 1); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index aa65c2b954..d67c81e5ec 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3180,7 +3180,7 @@ pub(crate) mod tests { create_new_tmp_ledger, genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, get_tmp_ledger_path, - shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD}, + shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY}, }, solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -3779,7 +3779,7 @@ pub(crate) mod tests { fn test_dead_fork_entry_deserialize_failure() { // Insert entry that causes deserialization failure let res = check_dead_fork(|_, bank| { - let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let gibberish = [0xa5u8; LEGACY_SHRED_DATA_CAPACITY]; let parent_offset = bank.slot() - bank.parent_slot(); let shred = Shred::new_from_data( bank.slot(), diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 5600d50922..f18edfe776 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -43,7 +43,7 @@ impl ShredSigVerifier { batches .iter() .flat_map(PacketBatch::iter) - .map(shred::layout::get_shred) + .filter_map(shred::layout::get_shred) .filter_map(shred::layout::get_slot) .collect() } diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 324cac542a..4007291d2d 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -3,7 +3,7 @@ extern crate test; use { solana_ledger::{ - shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD}, + shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY}, sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair}, }, solana_perf::{ @@ -29,7 +29,7 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { slot, 0xc0de, 0xdead, - &[5; SIZE_OF_DATA_SHRED_PAYLOAD], + &[5; LEGACY_SHRED_DATA_CAPACITY], ShredFlags::LAST_SHRED_IN_SLOT, 1, 2, @@ -60,7 +60,7 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { slot, 0xc0de, 0xdead, - &[5; SIZE_OF_DATA_SHRED_PAYLOAD], + &[5; LEGACY_SHRED_DATA_CAPACITY], ShredFlags::LAST_SHRED_IN_SLOT, 1, 2, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 08edf0f95e..4d2813d3bf 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -14,7 +14,10 @@ use { }, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, - shred::{self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder}, + shred::{ + self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredData, ShredId, ShredType, + Shredder, + }, slot_stats::{ShredSource, SlotsStats}, }, bincode::deserialize, @@ -1556,7 +1559,7 @@ impl Blockstore { pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { let shred = self.data_shred_cf.get_bytes((slot, index))?; - let shred = shred.map(Shred::resize_stored_shred).transpose(); + let shred = shred.map(ShredData::resize_stored_shred).transpose(); shred.map_err(|err| { let err = format!("Invalid stored shred: {}", err); let err = Box::new(bincode::ErrorKind::Custom(err)); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index f104356f1f..f7b80f6157 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -49,8 +49,13 @@ //! So, given a) - c), we must restrict data shred's payload length such that the entire coding //! payload can fit into one coding shred / packet. +pub(crate) use shred_data::ShredData; +pub use { + self::stats::{ProcessShredsStats, ShredFetchStats}, + crate::shredder::Shredder, +}; use { - self::traits::{Shred as _, ShredCode as _, ShredData as _}, + self::{shred_code::ShredCode, traits::Shred as _}, crate::blockstore::MAX_DATA_SHREDS_PER_SLOT, bitflags::bitflags, num_enum::{IntoPrimitive, TryFromPrimitive}, @@ -62,7 +67,6 @@ use { clock::Slot, feature_set, hash::{hashv, Hash}, - packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }, @@ -70,52 +74,38 @@ use { std::fmt::Debug, thiserror::Error, }; -pub use { - self::{ - legacy::{ShredCode, ShredData}, - stats::{ProcessShredsStats, ShredFetchStats}, - }, - crate::shredder::Shredder, -}; +mod common; mod legacy; +mod shred_code; +mod shred_data; mod stats; mod traits; pub type Nonce = u32; +pub const SIZE_OF_NONCE: usize = 4; /// The following constants are computed by hand, and hardcoded. /// `test_shred_constants` ensures that the values are correct. /// Constants are used over lazy_static for performance reasons. const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; -const SIZE_OF_DATA_SHRED_HEADER: usize = 5; -const SIZE_OF_CODING_SHRED_HEADER: usize = 6; +const SIZE_OF_DATA_SHRED_HEADERS: usize = 88; +const SIZE_OF_CODING_SHRED_HEADERS: usize = 89; const SIZE_OF_SIGNATURE: usize = 64; const SIZE_OF_SHRED_VARIANT: usize = 1; const SIZE_OF_SHRED_SLOT: usize = 8; const SIZE_OF_SHRED_INDEX: usize = 4; -pub const SIZE_OF_NONCE: usize = 4; -const_assert_eq!(SIZE_OF_CODING_SHRED_HEADERS, 89); -const SIZE_OF_CODING_SHRED_HEADERS: usize = - SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; -// Maximum size of data that a data-shred may contain (excluding headers). -const_assert_eq!(SIZE_OF_DATA_SHRED_PAYLOAD, 1051); -pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE - - SIZE_OF_COMMON_SHRED_HEADER - - SIZE_OF_DATA_SHRED_HEADER - - SIZE_OF_CODING_SHRED_HEADERS - - SIZE_OF_NONCE; -const_assert_eq!(SHRED_DATA_OFFSET, 88); -const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE; const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; -const_assert_eq!(SHRED_PAYLOAD_SIZE, 1228); -const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; +// For legacy tests and benchmarks. +const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051); +pub const LEGACY_SHRED_DATA_CAPACITY: usize = legacy::ShredData::CAPACITY; + // LAST_SHRED_IN_SLOT also implies DATA_COMPLETE_SHRED. // So it cannot be LAST_SHRED_IN_SLOT if not also DATA_COMPLETE_SHRED. bitflags! { @@ -278,14 +268,14 @@ macro_rules! dispatch { impl Shred { dispatch!(fn common_header(&self) -> &ShredCommonHeader); dispatch!(fn set_signature(&mut self, signature: Signature)); - dispatch!(fn signed_payload(&self) -> &[u8]); + dispatch!(fn signed_message(&self) -> &[u8]); // Returns the portion of the shred's payload which is erasure coded. dispatch!(pub(crate) fn erasure_shard(self) -> Result, Error>); // Like Shred::erasure_shard but returning a slice. dispatch!(pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); // Returns the shard index within the erasure coding set. - dispatch!(pub(crate) fn erasure_shard_index(&self) -> Option); + dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); dispatch!(pub fn into_payload(self) -> Vec); dispatch!(pub fn payload(&self) -> &Vec); @@ -329,11 +319,11 @@ impl Shred { Ok(match layout::get_shred_variant(&shred)? { ShredVariant::LegacyCode => { let shred = legacy::ShredCode::from_payload(shred)?; - Self::from(shred) + Self::from(ShredCode::from(shred)) } ShredVariant::LegacyData => { let shred = legacy::ShredData::from_payload(shred)?; - Self::from(shred) + Self::from(ShredData::from(shred)) } }) } @@ -396,14 +386,6 @@ impl Shred { } } - // Possibly zero pads bytes stored in blockstore. - pub(crate) fn resize_stored_shred(shred: Vec) -> Result, Error> { - match layout::get_shred_type(&shred)? { - ShredType::Code => ShredCode::resize_stored_shred(shred), - ShredType::Data => ShredData::resize_stored_shred(shred), - } - } - pub fn fec_set_index(&self) -> u32 { self.common_header().fec_set_index } @@ -429,7 +411,7 @@ impl Shred { } pub fn sign(&mut self, keypair: &Keypair) { - let signature = keypair.sign_message(self.signed_payload()); + let signature = keypair.sign_message(self.signed_message()); self.set_signature(signature); } @@ -494,7 +476,7 @@ impl Shred { } pub fn verify(&self, pubkey: &Pubkey) -> bool { - let message = self.signed_payload(); + let message = self.signed_message(); self.signature().verify(pubkey.as_ref(), message) } @@ -526,16 +508,20 @@ impl Shred { pub mod layout { use {super::*, std::ops::Range}; - fn get_shred_size(packet: &Packet) -> usize { + fn get_shred_size(packet: &Packet) -> Option { + let size = packet.data().len(); if packet.meta.repair() { - packet.meta.size.saturating_sub(SIZE_OF_NONCE) + size.checked_sub(SIZE_OF_NONCE) } else { - packet.meta.size + Some(size) } } - pub fn get_shred(packet: &Packet) -> &[u8] { - &packet.data()[..get_shred_size(packet)] + pub fn get_shred(packet: &Packet) -> Option<&[u8]> { + let size = get_shred_size(packet)?; + let shred = packet.data().get(..size)?; + // Should at least have a signature. + (size >= SIZE_OF_SIGNATURE).then(|| shred) } pub(crate) fn get_signature(shred: &[u8]) -> Option { @@ -567,14 +553,12 @@ pub mod layout { deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_INDEX..)?).ok() } - // Returns chunk of the payload which is signed. - pub(crate) fn get_signed_message(shred: &[u8]) -> Option<&[u8]> { - shred.get(SIZE_OF_SIGNATURE..) - } - - // Returns slice range of the packet payload which is signed. - pub(crate) fn get_signed_message_range(packet: &Packet) -> Range { - SIZE_OF_SIGNATURE..get_shred_size(packet) + // Returns slice range of the shred payload which is signed. + pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option> { + let range = match get_shred_variant(shred).ok()? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => legacy::SIGNED_MESSAGE_RANGE, + }; + (shred.len() <= range.end).then(|| range) } pub(crate) fn get_reference_tick(shred: &[u8]) -> Result { @@ -640,7 +624,13 @@ pub fn get_shred_slot_index_type( packet: &Packet, stats: &mut ShredFetchStats, ) -> Option<(Slot, u32, ShredType)> { - let shred = layout::get_shred(packet); + let shred = match layout::get_shred(packet) { + None => { + stats.index_overrun += 1; + return None; + } + Some(shred) => shred, + }; if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() { stats.index_overrun += 1; return None; @@ -683,7 +673,8 @@ pub fn max_entries_per_n_shred( num_shreds: u64, shred_data_size: Option, ) -> u64 { - let shred_data_size = shred_data_size.unwrap_or(SIZE_OF_DATA_SHRED_PAYLOAD) as u64; + let data_buffer_size = ShredData::capacity().unwrap(); + let shred_data_size = shred_data_size.unwrap_or(data_buffer_size) as u64; let vec_size = bincode::serialized_size(&vec![entry]).unwrap(); let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; @@ -702,7 +693,6 @@ pub fn verify_test_data_shred( is_last_data: bool, ) { shred.sanitize().unwrap(); - assert_eq!(shred.payload().len(), SHRED_PAYLOAD_SIZE); assert!(shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -775,11 +765,11 @@ mod tests { serialized_size(&common_header).unwrap() as usize ); assert_eq!( - SIZE_OF_CODING_SHRED_HEADER, + SIZE_OF_CODING_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER, serialized_size(&coding_shred_header).unwrap() as usize ); assert_eq!( - SIZE_OF_DATA_SHRED_HEADER, + SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER, serialized_size(&data_shred_header).unwrap() as usize ); let data_shred_header_with_size = DataShredHeader { @@ -787,7 +777,7 @@ mod tests { ..data_shred_header }; assert_eq!( - SIZE_OF_DATA_SHRED_HEADER, + SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER, serialized_size(&data_shred_header_with_size).unwrap() as usize ); assert_eq!( @@ -1011,7 +1001,7 @@ mod tests { let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap(); ChaChaRng::from_seed(seed) }; - let mut data = [0u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let mut data = [0u8; legacy::ShredData::CAPACITY]; rng.fill(&mut data[..]); let keypair = Keypair::generate(&mut rng); let mut shred = Shred::new_from_data( @@ -1029,7 +1019,7 @@ mod tests { assert_matches!(shred.sanitize(), Ok(())); let mut payload = bs58_decode(PAYLOAD); payload.extend({ - let skip = payload.len() - SHRED_DATA_OFFSET; + let skip = payload.len() - SIZE_OF_DATA_SHRED_HEADERS; data.iter().skip(skip).copied() }); let mut packet = Packet::default(); @@ -1100,7 +1090,7 @@ mod tests { let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap(); ChaChaRng::from_seed(seed) }; - let mut parity_shard = vec![0u8; /*ENCODED_PAYLOAD_SIZE:*/ 1139]; + let mut parity_shard = vec![0u8; legacy::SIZE_OF_ERASURE_ENCODED_SLICE]; rng.fill(&mut parity_shard[..]); let keypair = Keypair::generate(&mut rng); let mut shred = Shred::new_from_parity_shard( diff --git a/ledger/src/shred/common.rs b/ledger/src/shred/common.rs new file mode 100644 index 0000000000..910f7ecc63 --- /dev/null +++ b/ledger/src/shred/common.rs @@ -0,0 +1,63 @@ +macro_rules! dispatch { + ($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => { + #[inline] + $vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? { + match self { + Self::Legacy(shred) => shred.$name($($arg, )?), + } + } + }; + ($vis:vis fn $name:ident(self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => { + #[inline] + $vis fn $name(self $(, $arg:$ty)?) $(-> $out)? { + match self { + Self::Legacy(shred) => shred.$name($($arg, )?), + } + } + }; + ($vis:vis fn $name:ident(&mut self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => { + #[inline] + $vis fn $name(&mut self $(, $arg:$ty)?) $(-> $out)? { + match self { + Self::Legacy(shred) => shred.$name($($arg, )?), + } + } + } +} + +macro_rules! impl_shred_common { + () => { + #[inline] + fn common_header(&self) -> &ShredCommonHeader { + &self.common_header + } + + #[inline] + fn payload(&self) -> &Vec { + &self.payload + } + + fn into_payload(self) -> Vec { + self.payload + } + + fn set_signature(&mut self, signature: Signature) { + bincode::serialize_into(&mut self.payload[..], &signature).unwrap(); + self.common_header.signature = signature; + } + + // Only for tests. + fn set_index(&mut self, index: u32) { + self.common_header.index = index; + bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap(); + } + + // Only for tests. + fn set_slot(&mut self, slot: Slot) { + self.common_header.slot = slot; + bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap(); + } + }; +} + +pub(super) use {dispatch, impl_shred_common}; diff --git a/ledger/src/shred/legacy.rs b/ledger/src/shred/legacy.rs index cb7ed1509c..4ec27ca953 100644 --- a/ledger/src/shred/legacy.rs +++ b/ledger/src/shred/legacy.rs @@ -1,79 +1,57 @@ use { crate::shred::{ - traits::{Shred, ShredCode as _, ShredData as _}, + common::impl_shred_common, + shred_code, shred_data, + traits::{Shred, ShredCode as ShredCodeTrait, ShredData as ShredDataTrait}, CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant, - MAX_DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT, SHRED_DATA_OFFSET, - SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER, - SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, SIZE_OF_SIGNATURE, + SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADERS, + SIZE_OF_SIGNATURE, }, solana_perf::packet::deserialize_from_with_limit, solana_sdk::{clock::Slot, signature::Signature}, static_assertions::const_assert_eq, - std::{io::Cursor, ops::RangeInclusive}, + std::{io::Cursor, ops::Range}, }; -// DataShredHeader.size is sum of common-shred-header, data-shred-header and -// data.len(). Broadcast stage may create zero length data shreds when the -// previous slot was interrupted: -// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79 -const DATA_SHRED_SIZE_RANGE: RangeInclusive = - SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD; +// All payload including any zero paddings are signed. +// Code and data shreds have the same payload size. +pub(super) const SIGNED_MESSAGE_RANGE: Range = SIZE_OF_SIGNATURE..ShredData::SIZE_OF_PAYLOAD; +const_assert_eq!(ShredData::SIZE_OF_PAYLOAD, ShredCode::SIZE_OF_PAYLOAD); +const_assert_eq!(ShredData::SIZE_OF_PAYLOAD, 1228); +const_assert_eq!(ShredData::CAPACITY, 1051); + // SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds // is never used and is not part of erasure coding. -const_assert_eq!(ENCODED_PAYLOAD_SIZE, 1139); -const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; +const_assert_eq!(SIZE_OF_ERASURE_ENCODED_SLICE, 1139); +pub(super) const SIZE_OF_ERASURE_ENCODED_SLICE: usize = + ShredCode::SIZE_OF_PAYLOAD - SIZE_OF_CODING_SHRED_HEADERS; -#[derive(Clone, Debug, PartialEq, Eq)] +// Layout: {common, data} headers | data | zero padding +// Everything up to SIZE_OF_CODING_SHRED_HEADERS bytes at the end (which is +// part of zero padding) is erasure coded. +// All payload past signature, including the entirety of zero paddings, is +// signed. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct ShredData { common_header: ShredCommonHeader, data_header: DataShredHeader, payload: Vec, } -#[derive(Clone, Debug, PartialEq, Eq)] +// Layout: {common, coding} headers | erasure coded shard +// All payload past signature is singed. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct ShredCode { common_header: ShredCommonHeader, coding_header: CodingShredHeader, payload: Vec, } -macro_rules! impl_shred_common { - () => { - #[inline] - fn common_header(&self) -> &ShredCommonHeader { - &self.common_header - } - - #[inline] - fn payload(&self) -> &Vec { - &self.payload - } - - fn into_payload(self) -> Vec { - self.payload - } - - fn set_signature(&mut self, signature: Signature) { - bincode::serialize_into(&mut self.payload[..], &signature).unwrap(); - self.common_header.signature = signature; - } - - // Only for tests. - fn set_index(&mut self, index: u32) { - self.common_header.index = index; - bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap(); - } - - // Only for tests. - fn set_slot(&mut self, slot: Slot) { - self.common_header.slot = slot; - bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap(); - } - }; -} - impl Shred for ShredData { impl_shred_common!(); + // Legacy data shreds are always zero padded and + // the same size as coding shreds. + const SIZE_OF_PAYLOAD: usize = shred_code::ShredCode::SIZE_OF_PAYLOAD; fn from_payload(mut payload: Vec) -> Result { let mut cursor = Cursor::new(&payload[..]); @@ -82,8 +60,14 @@ impl Shred for ShredData { return Err(Error::InvalidShredVariant); } let data_header = deserialize_from_with_limit(&mut cursor)?; - // see: https://github.com/solana-labs/solana/pull/16602 - payload.resize(SHRED_PAYLOAD_SIZE, 0u8); + // Shreds stored to blockstore may have trailing zeros trimmed. + // Repair packets have nonce at the end of packet payload; see: + // https://github.com/solana-labs/solana/pull/10109 + // https://github.com/solana-labs/solana/pull/16602 + if payload.len() < SIZE_OF_DATA_SHRED_HEADERS { + return Err(Error::InvalidPayloadSize(payload.len())); + } + payload.resize(Self::SIZE_OF_PAYLOAD, 0u8); let shred = Self { common_header, data_header, @@ -92,73 +76,46 @@ impl Shred for ShredData { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Option { - let fec_set_index = self.common_header.fec_set_index; - let index = self.common_header.index.checked_sub(fec_set_index)?; - usize::try_from(index).ok() + fn erasure_shard_index(&self) -> Result { + shred_data::erasure_shard_index(self).ok_or_else(|| { + let headers = Box::new((self.common_header, self.data_header)); + Error::InvalidErasureShardIndex(headers) + }) } fn erasure_shard(self) -> Result, Error> { - if self.payload.len() != SHRED_PAYLOAD_SIZE { + if self.payload.len() != Self::SIZE_OF_PAYLOAD { return Err(Error::InvalidPayloadSize(self.payload.len())); } let mut shard = self.payload; - shard.resize(ENCODED_PAYLOAD_SIZE, 0u8); + shard.truncate(SIZE_OF_ERASURE_ENCODED_SLICE); Ok(shard) } fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> { - if self.payload.len() != SHRED_PAYLOAD_SIZE { + if self.payload.len() != Self::SIZE_OF_PAYLOAD { return Err(Error::InvalidPayloadSize(self.payload.len())); } - Ok(&self.payload[..ENCODED_PAYLOAD_SIZE]) - } - - fn resize_stored_shred(mut shred: Vec) -> Result, Error> { - // TODO: assert that this is the right type! - if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) { - return Err(Error::InvalidPayloadSize(shred.len())); - } - shred.resize(SHRED_PAYLOAD_SIZE, 0u8); - Ok(shred) + Ok(&self.payload[..SIZE_OF_ERASURE_ENCODED_SLICE]) } fn sanitize(&self) -> Result<(), Error> { - if self.payload().len() != SHRED_PAYLOAD_SIZE { - return Err(Error::InvalidPayloadSize(self.payload.len())); + match self.common_header.shred_variant { + ShredVariant::LegacyData => (), + _ => return Err(Error::InvalidShredVariant), } - if self.erasure_shard_index().is_none() { - let headers = Box::new((self.common_header, self.data_header)); - return Err(Error::InvalidErasureShardIndex(headers)); - } - if self.common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT { - return Err(Error::InvalidDataShredIndex(self.common_header.index)); - } - let _parent = self.parent()?; - let size = usize::from(self.data_header.size); - if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) { - return Err(Error::InvalidDataSize { - size: self.data_header.size, - payload: self.payload.len(), - }); - } - let flags = self.data_header.flags; - if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT) - && !flags.contains(ShredFlags::DATA_COMPLETE_SHRED) - { - return Err(Error::InvalidShredFlags(self.data_header.flags.bits())); - } - Ok(()) + shred_data::sanitize(self) } - fn signed_payload(&self) -> &[u8] { - debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE); + fn signed_message(&self) -> &[u8] { + debug_assert_eq!(self.payload.len(), Self::SIZE_OF_PAYLOAD); &self.payload[SIZE_OF_SIGNATURE..] } } impl Shred for ShredCode { impl_shred_common!(); + const SIZE_OF_PAYLOAD: usize = shred_code::ShredCode::SIZE_OF_PAYLOAD; fn from_payload(mut payload: Vec) -> Result { let mut cursor = Cursor::new(&payload[..]); @@ -167,8 +124,9 @@ impl Shred for ShredCode { return Err(Error::InvalidShredVariant); } let coding_header = deserialize_from_with_limit(&mut cursor)?; - // see: https://github.com/solana-labs/solana/pull/10109 - payload.truncate(SHRED_PAYLOAD_SIZE); + // Repair packets have nonce at the end of packet payload: + // https://github.com/solana-labs/solana/pull/10109 + payload.truncate(Self::SIZE_OF_PAYLOAD); let shred = Self { common_header, coding_header, @@ -177,25 +135,15 @@ impl Shred for ShredCode { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Option { - // Assert that the last shred index in the erasure set does not - // overshoot u32. - self.common_header.fec_set_index.checked_add(u32::from( - self.coding_header.num_data_shreds.checked_sub(1)?, - ))?; - self.first_coding_index()?.checked_add(u32::from( - self.coding_header.num_coding_shreds.checked_sub(1)?, - ))?; - let num_data_shreds = usize::from(self.coding_header.num_data_shreds); - let num_coding_shreds = usize::from(self.coding_header.num_coding_shreds); - let position = usize::from(self.coding_header.position); - let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?; - let index = position.checked_add(num_data_shreds)?; - (index < fec_set_size).then(|| index) + fn erasure_shard_index(&self) -> Result { + shred_code::erasure_shard_index(self).ok_or_else(|| { + let headers = Box::new((self.common_header, self.coding_header)); + Error::InvalidErasureShardIndex(headers) + }) } fn erasure_shard(self) -> Result, Error> { - if self.payload.len() != SHRED_PAYLOAD_SIZE { + if self.payload.len() != Self::SIZE_OF_PAYLOAD { return Err(Error::InvalidPayloadSize(self.payload.len())); } let mut shard = self.payload; @@ -207,43 +155,27 @@ impl Shred for ShredCode { } fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> { - if self.payload.len() != SHRED_PAYLOAD_SIZE { + if self.payload.len() != Self::SIZE_OF_PAYLOAD { return Err(Error::InvalidPayloadSize(self.payload.len())); } Ok(&self.payload[SIZE_OF_CODING_SHRED_HEADERS..]) } - fn resize_stored_shred(shred: Vec) -> Result, Error> { - if shred.len() != SHRED_PAYLOAD_SIZE { - return Err(Error::InvalidPayloadSize(shred.len())); - } - Ok(shred) - } - fn sanitize(&self) -> Result<(), Error> { - if self.payload().len() != SHRED_PAYLOAD_SIZE { - return Err(Error::InvalidPayloadSize(self.payload.len())); + match self.common_header.shred_variant { + ShredVariant::LegacyCode => (), + _ => return Err(Error::InvalidShredVariant), } - if self.erasure_shard_index().is_none() { - let headers = Box::new((self.common_header, self.coding_header)); - return Err(Error::InvalidErasureShardIndex(headers)); - } - let num_coding_shreds = u32::from(self.coding_header.num_coding_shreds); - if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK { - return Err(Error::InvalidNumCodingShreds( - self.coding_header.num_coding_shreds, - )); - } - Ok(()) + shred_code::sanitize(self) } - fn signed_payload(&self) -> &[u8] { - debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE); + fn signed_message(&self) -> &[u8] { + debug_assert_eq!(self.payload.len(), Self::SIZE_OF_PAYLOAD); &self.payload[SIZE_OF_SIGNATURE..] } } -impl super::traits::ShredData for ShredData { +impl ShredDataTrait for ShredData { #[inline] fn data_header(&self) -> &DataShredHeader { &self.data_header @@ -251,19 +183,16 @@ impl super::traits::ShredData for ShredData { fn data(&self) -> Result<&[u8], Error> { let size = usize::from(self.data_header.size); - if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) { + if size > self.payload.len() + || size < SIZE_OF_DATA_SHRED_HEADERS + || size > SIZE_OF_DATA_SHRED_HEADERS + Self::CAPACITY + { return Err(Error::InvalidDataSize { size: self.data_header.size, payload: self.payload.len(), }); } - Ok(&self.payload[SHRED_DATA_OFFSET..size]) - } - - fn bytes_to_store(&self) -> &[u8] { - // Payload will be padded out to SHRED_PAYLOAD_SIZE. - // But only need to store the bytes within data_header.size. - &self.payload[..self.data_header.size as usize] + Ok(&self.payload[SIZE_OF_DATA_SHRED_HEADERS..size]) } // Only for tests. @@ -274,7 +203,7 @@ impl super::traits::ShredData for ShredData { } } -impl super::traits::ShredCode for ShredCode { +impl ShredCodeTrait for ShredCode { #[inline] fn coding_header(&self) -> &CodingShredHeader { &self.coding_header @@ -282,6 +211,10 @@ impl super::traits::ShredCode for ShredCode { } impl ShredData { + // Maximum size of ledger data that can be embedded in a data-shred. + pub(super) const CAPACITY: usize = + Self::SIZE_OF_PAYLOAD - SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_CODING_SHRED_HEADERS; + pub(super) fn new_from_data( slot: Slot, index: u32, @@ -292,7 +225,7 @@ impl ShredData { version: u16, fec_set_index: u32, ) -> Self { - let mut payload = vec![0; SHRED_PAYLOAD_SIZE]; + let mut payload = vec![0; Self::SIZE_OF_PAYLOAD]; let common_header = ShredCommonHeader { signature: Signature::default(), shred_variant: ShredVariant::LegacyData, @@ -301,7 +234,7 @@ impl ShredData { version, fec_set_index, }; - let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16; + let size = (data.len() + SIZE_OF_DATA_SHRED_HEADERS) as u16; let flags = flags | unsafe { ShredFlags::from_bits_unchecked( @@ -320,7 +253,7 @@ impl ShredData { bincode::serialize_into(&mut cursor, &data_header).unwrap(); // TODO: Need to check if data is too large! let offset = cursor.position() as usize; - debug_assert_eq!(offset, SHRED_DATA_OFFSET); + debug_assert_eq!(offset, SIZE_OF_DATA_SHRED_HEADERS); payload[offset..offset + data.len()].copy_from_slice(data); Self { common_header, @@ -328,6 +261,21 @@ impl ShredData { payload, } } + + pub(super) fn bytes_to_store(&self) -> &[u8] { + // Payload will be padded out to Self::SIZE_OF_PAYLOAD. + // But only need to store the bytes within data_header.size. + &self.payload[..self.data_header.size as usize] + } + + pub(super) fn resize_stored_shred(mut shred: Vec) -> Result, Error> { + // Old shreds might have been extra zero padded. + if !(SIZE_OF_DATA_SHRED_HEADERS..=ShredCode::SIZE_OF_PAYLOAD).contains(&shred.len()) { + return Err(Error::InvalidPayloadSize(shred.len())); + } + shred.resize(Self::SIZE_OF_PAYLOAD, 0u8); + Ok(shred) + } } impl ShredCode { @@ -354,7 +302,7 @@ impl ShredCode { num_coding_shreds, position, }; - let mut payload = vec![0; SHRED_PAYLOAD_SIZE]; + let mut payload = vec![0; Self::SIZE_OF_PAYLOAD]; let mut cursor = Cursor::new(&mut payload[..]); bincode::serialize_into(&mut cursor, &common_header).unwrap(); bincode::serialize_into(&mut cursor, &coding_header).unwrap(); @@ -370,27 +318,15 @@ impl ShredCode { payload, } } - - // Returns true if the erasure coding of the two shreds mismatch. - pub(super) fn erasure_mismatch(&self, other: &ShredCode) -> bool { - let CodingShredHeader { - num_data_shreds, - num_coding_shreds, - position: _, - } = self.coding_header; - num_coding_shreds != other.coding_header.num_coding_shreds - || num_data_shreds != other.coding_header.num_data_shreds - || self.first_coding_index() != other.first_coding_index() - } } #[cfg(test)] mod test { - use {super::*, matches::assert_matches}; + use {super::*, crate::shred::MAX_DATA_SHREDS_PER_SLOT, matches::assert_matches}; #[test] fn test_sanitize_data_shred() { - let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let data = [0xa5u8; ShredData::CAPACITY]; let mut shred = ShredData::new_from_data( 420, // slot 19, // index diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs new file mode 100644 index 0000000000..77a079cff4 --- /dev/null +++ b/ledger/src/shred/shred_code.rs @@ -0,0 +1,128 @@ +use { + crate::shred::{ + common::dispatch, + legacy, + traits::{Shred, ShredCode as ShredCodeTrait}, + CodingShredHeader, Error, ShredCommonHeader, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_NONCE, + }, + solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature}, + static_assertions::const_assert_eq, +}; + +const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228); + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ShredCode { + Legacy(legacy::ShredCode), +} + +impl ShredCode { + pub(super) const SIZE_OF_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; + + dispatch!(fn coding_header(&self) -> &CodingShredHeader); + + dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader); + dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); + dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn first_coding_index(&self) -> Option); + dispatch!(pub(super) fn into_payload(self) -> Vec); + dispatch!(pub(super) fn payload(&self) -> &Vec); + dispatch!(pub(super) fn sanitize(&self) -> Result<(), Error>); + dispatch!(pub(super) fn set_signature(&mut self, signature: Signature)); + dispatch!(pub(super) fn signed_message(&self) -> &[u8]); + + // Only for tests. + dispatch!(pub(super) fn set_index(&mut self, index: u32)); + dispatch!(pub(super) fn set_slot(&mut self, slot: Slot)); + + pub(super) fn new_from_parity_shard( + slot: Slot, + index: u32, + parity_shard: &[u8], + fec_set_index: u32, + num_data_shreds: u16, + num_coding_shreds: u16, + position: u16, + version: u16, + ) -> Self { + Self::from(legacy::ShredCode::new_from_parity_shard( + slot, + index, + parity_shard, + fec_set_index, + num_data_shreds, + num_coding_shreds, + position, + version, + )) + } + + pub(super) fn num_data_shreds(&self) -> u16 { + self.coding_header().num_data_shreds + } + + pub(super) fn num_coding_shreds(&self) -> u16 { + self.coding_header().num_coding_shreds + } + + // Returns true if the erasure coding of the two shreds mismatch. + pub(super) fn erasure_mismatch(&self, other: &ShredCode) -> bool { + match (self, other) { + (Self::Legacy(shred), Self::Legacy(other)) => erasure_mismatch(shred, other), + } + } +} + +impl From for ShredCode { + fn from(shred: legacy::ShredCode) -> Self { + Self::Legacy(shred) + } +} + +#[inline] +pub(super) fn erasure_shard_index(shred: &T) -> Option { + // Assert that the last shred index in the erasure set does not + // overshoot u32. + let common_header = shred.common_header(); + let coding_header = shred.coding_header(); + common_header + .fec_set_index + .checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))?; + shred + .first_coding_index()? + .checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))?; + let num_data_shreds = usize::from(coding_header.num_data_shreds); + let num_coding_shreds = usize::from(coding_header.num_coding_shreds); + let position = usize::from(coding_header.position); + let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?; + let index = position.checked_add(num_data_shreds)?; + (index < fec_set_size).then(|| index) +} + +pub(super) fn sanitize(shred: &T) -> Result<(), Error> { + if shred.payload().len() != T::SIZE_OF_PAYLOAD { + return Err(Error::InvalidPayloadSize(shred.payload().len())); + } + let coding_header = shred.coding_header(); + let _shard_index = shred.erasure_shard_index()?; + let _erasure_shard = shred.erasure_shard_as_slice()?; + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); + if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK { + return Err(Error::InvalidNumCodingShreds( + coding_header.num_coding_shreds, + )); + } + Ok(()) +} + +pub(super) fn erasure_mismatch(shred: &T, other: &T) -> bool { + let CodingShredHeader { + num_data_shreds, + num_coding_shreds, + position: _, + } = shred.coding_header(); + *num_coding_shreds != other.coding_header().num_coding_shreds + || *num_data_shreds != other.coding_header().num_data_shreds + || shred.first_coding_index() != other.first_coding_index() +} diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs new file mode 100644 index 0000000000..ca91bfa85c --- /dev/null +++ b/ledger/src/shred/shred_data.rs @@ -0,0 +1,130 @@ +use { + crate::shred::{ + self, + common::dispatch, + legacy, + traits::{Shred as _, ShredData as ShredDataTrait}, + DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant, + MAX_DATA_SHREDS_PER_SLOT, + }, + solana_sdk::{clock::Slot, signature::Signature}, +}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ShredData { + Legacy(legacy::ShredData), +} + +impl ShredData { + dispatch!(fn data_header(&self) -> &DataShredHeader); + + dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader); + dispatch!(pub(super) fn data(&self) -> Result<&[u8], Error>); + dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); + dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn into_payload(self) -> Vec); + dispatch!(pub(super) fn parent(&self) -> Result); + dispatch!(pub(super) fn payload(&self) -> &Vec); + dispatch!(pub(super) fn sanitize(&self) -> Result<(), Error>); + dispatch!(pub(super) fn set_last_in_slot(&mut self)); + dispatch!(pub(super) fn set_signature(&mut self, signature: Signature)); + dispatch!(pub(super) fn signed_message(&self) -> &[u8]); + + // Only for tests. + dispatch!(pub(super) fn set_index(&mut self, index: u32)); + dispatch!(pub(super) fn set_slot(&mut self, slot: Slot)); + + pub(super) fn new_from_data( + slot: Slot, + index: u32, + parent_offset: u16, + data: &[u8], + flags: ShredFlags, + reference_tick: u8, + version: u16, + fec_set_index: u32, + ) -> Self { + Self::from(legacy::ShredData::new_from_data( + slot, + index, + parent_offset, + data, + flags, + reference_tick, + version, + fec_set_index, + )) + } + + pub(super) fn last_in_slot(&self) -> bool { + let flags = self.data_header().flags; + flags.contains(ShredFlags::LAST_SHRED_IN_SLOT) + } + + pub(super) fn data_complete(&self) -> bool { + let flags = self.data_header().flags; + flags.contains(ShredFlags::DATA_COMPLETE_SHRED) + } + + pub(super) fn reference_tick(&self) -> u8 { + let flags = self.data_header().flags; + (flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits() + } + + // Possibly trimmed payload; + // Should only be used when storing shreds to blockstore. + pub(super) fn bytes_to_store(&self) -> &[u8] { + match self { + Self::Legacy(shred) => shred.bytes_to_store(), + } + } + + // Possibly zero pads bytes stored in blockstore. + pub(crate) fn resize_stored_shred(shred: Vec) -> Result, Error> { + match shred::layout::get_shred_variant(&shred)? { + ShredVariant::LegacyCode => Err(Error::InvalidShredType), + ShredVariant::LegacyData => legacy::ShredData::resize_stored_shred(shred), + } + } + + // Maximum size of ledger data that can be embedded in a data-shred. + pub(crate) fn capacity() -> Result { + Ok(legacy::ShredData::CAPACITY) + } +} + +impl From for ShredData { + fn from(shred: legacy::ShredData) -> Self { + Self::Legacy(shred) + } +} + +#[inline] +pub(super) fn erasure_shard_index(shred: &T) -> Option { + let fec_set_index = shred.common_header().fec_set_index; + let index = shred.common_header().index.checked_sub(fec_set_index)?; + usize::try_from(index).ok() +} + +pub(super) fn sanitize(shred: &T) -> Result<(), Error> { + if shred.payload().len() != T::SIZE_OF_PAYLOAD { + return Err(Error::InvalidPayloadSize(shred.payload().len())); + } + let common_header = shred.common_header(); + let data_header = shred.data_header(); + let _shard_index = shred.erasure_shard_index()?; + let _erasure_shard = shred.erasure_shard_as_slice()?; + if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT { + return Err(Error::InvalidDataShredIndex(common_header.index)); + } + let _data = shred.data()?; + let _parent = shred.parent()?; + let flags = data_header.flags; + if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT) + && !flags.contains(ShredFlags::DATA_COMPLETE_SHRED) + { + return Err(Error::InvalidShredFlags(data_header.flags.bits())); + } + Ok(()) +} diff --git a/ledger/src/shred/traits.rs b/ledger/src/shred/traits.rs index db1f9aefb0..70e049113d 100644 --- a/ledger/src/shred/traits.rs +++ b/ledger/src/shred/traits.rs @@ -1,9 +1,13 @@ use { - crate::shred::{CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags}, + crate::shred::{CodingShredHeader, DataShredHeader, Error, ShredCommonHeader}, solana_sdk::{clock::Slot, signature::Signature}, }; pub(super) trait Shred: Sized { + // Total size of payload including headers, merkle + // branches (if any), zero paddings, etc. + const SIZE_OF_PAYLOAD: usize; + fn from_payload(shred: Vec) -> Result; fn common_header(&self) -> &ShredCommonHeader; fn sanitize(&self) -> Result<(), Error>; @@ -14,17 +18,14 @@ pub(super) trait Shred: Sized { fn into_payload(self) -> Vec; // Returns the shard index within the erasure coding set. - fn erasure_shard_index(&self) -> Option; + fn erasure_shard_index(&self) -> Result; // Returns the portion of the shred's payload which is erasure coded. fn erasure_shard(self) -> Result, Error>; // Like Shred::erasure_shard but returning a slice. fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>; // Portion of the payload which is signed. - fn signed_payload(&self) -> &[u8]; - - // Possibly zero pads bytes stored in blockstore. - fn resize_stored_shred(shred: Vec) -> Result, Error>; + fn signed_message(&self) -> &[u8]; // Only for tests. fn set_index(&mut self, index: u32); @@ -52,25 +53,6 @@ pub(super) trait ShredData: Shred { fn data(&self) -> Result<&[u8], Error>; - // Possibly trimmed payload; - // Should only be used when storing shreds to blockstore. - fn bytes_to_store(&self) -> &[u8]; - - fn last_in_slot(&self) -> bool { - let flags = self.data_header().flags; - flags.contains(ShredFlags::LAST_SHRED_IN_SLOT) - } - - fn data_complete(&self) -> bool { - let flags = self.data_header().flags; - flags.contains(ShredFlags::DATA_COMPLETE_SHRED) - } - - fn reference_tick(&self) -> u8 { - let flags = self.data_header().flags; - (flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits() - } - // Only for tests. fn set_last_in_slot(&mut self); } @@ -82,12 +64,4 @@ pub(super) trait ShredCode: Shred { let position = u32::from(self.coding_header().position); self.common_header().index.checked_sub(position) } - - fn num_data_shreds(&self) -> u16 { - self.coding_header().num_data_shreds - } - - fn num_coding_shreds(&self) -> u16 { - self.coding_header().num_coding_shreds - } } diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index d47e9a6db3..0fbb6262ea 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -1,7 +1,6 @@ use { crate::shred::{ - Error, ProcessShredsStats, Shred, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, - SIZE_OF_DATA_SHRED_PAYLOAD, + Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, }, lazy_static::lazy_static, rayon::{prelude::*, ThreadPool}, @@ -110,9 +109,9 @@ impl Shredder { serialize_time.stop(); let mut gen_data_time = Measure::start("shred_gen_data_time"); - let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD; + let data_buffer_size = ShredData::capacity().unwrap(); // Integer division to ensure we have enough shreds to fit all the data - let num_shreds = (serialized_shreds.len() + payload_capacity - 1) / payload_capacity; + let num_shreds = (serialized_shreds.len() + data_buffer_size - 1) / data_buffer_size; let last_shred_index = next_shred_index + num_shreds as u32 - 1; // 1) Generate data shreds let make_data_shred = |shred_index: u32, data| { @@ -141,7 +140,7 @@ impl Shredder { }; let data_shreds: Vec = PAR_THREAD_POOL.install(|| { serialized_shreds - .par_chunks(payload_capacity) + .par_chunks(data_buffer_size) .enumerate() .map(|(i, shred_data)| { let shred_index = next_shred_index + i as u32; @@ -299,7 +298,7 @@ impl Shredder { let mut shards = vec![None; fec_set_size]; for shred in shreds { let index = match shred.erasure_shard_index() { - Some(index) if index < fec_set_size => index, + Ok(index) if index < fec_set_size => index, _ => return Err(Error::from(InvalidIndex)), }; shards[index] = Some(shred.erasure_shard()?); @@ -317,8 +316,8 @@ impl Shredder { shred.slot() == slot && shred.is_data() && match shred.erasure_shard_index() { - Some(index) => index < num_data_shreds, - None => false, + Ok(index) => index < num_data_shreds, + Err(_) => false, } }) .collect(); @@ -342,7 +341,8 @@ impl Shredder { // For backward compatibility. This is needed when the data shred // payload is None, so that deserializing to Vec results in // an empty vector. - Ok(vec![0u8; SIZE_OF_DATA_SHRED_PAYLOAD]) + let data_buffer_size = ShredData::capacity().unwrap(); + Ok(vec![0u8; data_buffer_size]) } else { Ok(data) } @@ -403,13 +403,13 @@ mod tests { }) .collect(); - let size = serialized_size(&entries).unwrap(); + let size = serialized_size(&entries).unwrap() as usize; // Integer division to ensure we have enough shreds to fit all the data - let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD as u64; - let num_expected_data_shreds = (size + payload_capacity - 1) / payload_capacity; + let data_buffer_size = ShredData::capacity().unwrap(); + let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; let num_expected_coding_shreds = (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) - .saturating_sub(num_expected_data_shreds as usize) - .max(num_expected_data_shreds as usize); + .saturating_sub(num_expected_data_shreds) + .max(num_expected_data_shreds); let start_index = 0; let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, @@ -419,14 +419,14 @@ mod tests { start_index, // next_code_index ); let next_index = data_shreds.last().unwrap().index() + 1; - assert_eq!(next_index as u64, num_expected_data_shreds); + assert_eq!(next_index as usize, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); let mut coding_shred_indexes = HashSet::new(); for shred in data_shreds.iter() { assert_eq!(shred.shred_type(), ShredType::Data); let index = shred.index(); - let is_last = index as u64 == num_expected_data_shreds - 1; + let is_last = index as usize == num_expected_data_shreds - 1; verify_test_data_shred( shred, index, @@ -457,7 +457,7 @@ mod tests { assert!(coding_shred_indexes.contains(&i)); } - assert_eq!(data_shred_indexes.len() as u64, num_expected_data_shreds); + assert_eq!(data_shred_indexes.len(), num_expected_data_shreds); assert_eq!(coding_shred_indexes.len(), num_expected_coding_shreds); // Test reassembly @@ -575,8 +575,8 @@ mod tests { let keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap(); // Create enough entries to make > 1 shred - let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD; - let num_entries = max_ticks_per_n_shreds(1, Some(payload_capacity)) + 1; + let data_buffer_size = ShredData::capacity().unwrap(); + let num_entries = max_ticks_per_n_shreds(1, Some(data_buffer_size)) + 1; let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -624,9 +624,9 @@ mod tests { let entry = Entry::new(&Hash::default(), 1, vec![tx0]); let num_data_shreds: usize = 5; - let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD; + let data_buffer_size = ShredData::capacity().unwrap(); let num_entries = - max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(payload_capacity)); + max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(data_buffer_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index f85a696c7e..163e733442 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -38,7 +38,10 @@ pub fn verify_shred_cpu( if packet.meta.discard() { return false; } - let shred = shred::layout::get_shred(packet); + let shred = match shred::layout::get_shred(packet) { + None => return false, + Some(shred) => shred, + }; let slot = match shred::layout::get_slot(shred) { None => return false, Some(slot) => slot, @@ -53,10 +56,11 @@ pub fn verify_shred_cpu( Some(signature) => signature, }; trace!("signature {}", signature); - let message = match shred::layout::get_signed_message(shred) { - None => return false, - Some(message) => message, - }; + let message = + match shred::layout::get_signed_message_range(shred).and_then(|slice| shred.get(slice)) { + None => return false, + Some(message) => message, + }; signature.verify(pubkey, message) } @@ -101,7 +105,7 @@ where return Slot::MAX; } let shred = shred::layout::get_shred(packet); - match shred::layout::get_slot(shred) { + match shred.and_then(shred::layout::get_slot) { Some(slot) if slot_keys.contains_key(&slot) => slot, _ => Slot::MAX, } @@ -177,8 +181,11 @@ fn shred_gpu_offsets( let sig = shred::layout::get_signature_range(); let sig = add_offset(sig, pubkeys_end); debug_assert_eq!(sig.end - sig.start, std::mem::size_of::()); - let msg = shred::layout::get_signed_message_range(packet); - let msg = add_offset(msg, pubkeys_end); + let shred = shred::layout::get_shred(packet); + // Signature may verify for an empty message but the packet will be + // discarded during deserialization. + let msg = shred.and_then(shred::layout::get_signed_message_range); + let msg = add_offset(msg.unwrap_or_default(), pubkeys_end); signature_offsets.push(sig.start as u32); msg_start_offsets.push(msg.start as u32); let msg_size = msg.end.saturating_sub(msg.start); @@ -267,7 +274,9 @@ pub fn verify_shreds_gpu( fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { let sig = shred::layout::get_signature_range(); - let msg = shred::layout::get_signed_message_range(packet); + let msg = shred::layout::get_shred(packet) + .and_then(shred::layout::get_signed_message_range) + .unwrap(); assert!( packet.meta.size >= sig.end, "packet is not large enough for a signature" @@ -414,7 +423,7 @@ pub fn sign_shreds_gpu( mod tests { use { super::*, - crate::shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD}, + crate::shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY}, solana_sdk::signature::{Keypair, Signer}, }; @@ -589,7 +598,7 @@ mod tests { slot, 0xc0de, i as u16, - &[5; SIZE_OF_DATA_SHRED_PAYLOAD], + &[5; LEGACY_SHRED_DATA_CAPACITY], ShredFlags::LAST_SHRED_IN_SLOT, 1, 2, diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index cdc25189e7..94e36bc6a8 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -3,7 +3,7 @@ use { solana_entry::entry::Entry, solana_ledger::shred::{ max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, - MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD, + LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK, }, solana_sdk::{ clock::Slot, @@ -34,7 +34,7 @@ fn test_multi_fec_block_coding() { let num_entries = max_entries_per_n_shred( &entry, num_data_shreds as u64, - Some(SIZE_OF_DATA_SHRED_PAYLOAD), + Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries: Vec<_> = (0..num_entries) @@ -200,7 +200,7 @@ fn setup_different_sized_fec_blocks( let num_entries = max_entries_per_n_shred( &entry, num_shreds_per_iter as u64, - Some(SIZE_OF_DATA_SHRED_PAYLOAD), + Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries: Vec<_> = (0..num_entries) .map(|_| { diff --git a/validator/src/main.rs b/validator/src/main.rs index ec7fde24ee..8e6baf4bb5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -413,7 +413,7 @@ fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option { for entrypoint in entrypoints { match solana_net_utils::get_cluster_shred_version(entrypoint) { Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err), - Ok(0) => eprintln!("zero sherd-version from entrypoint: {}", entrypoint), + Ok(0) => eprintln!("zero shred-version from entrypoint: {}", entrypoint), Ok(shred_version) => { info!( "obtained shred-version {} from {}",