adds support for different variants of ShredCode and ShredData

The commit implements two new types:
    pub enum ShredCode {
        Legacy(legacy::ShredCode),
    }
    pub enum ShredData {
        Legacy(legacy::ShredData),
    }

Following commits will extend these types by adding merkle variants:
    pub enum ShredCode {
        Legacy(legacy::ShredCode),
        Merkle(merkle::ShredCode),
    }
    pub enum ShredData {
        Legacy(legacy::ShredData),
        Merkle(merkle::ShredData),
    }
This commit is contained in:
behzad nouri 2022-05-30 08:51:19 -04:00
parent a913068512
commit 81231a89b9
15 changed files with 543 additions and 311 deletions

View File

@ -9,7 +9,7 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
Shredder, LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@ -38,12 +38,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec<Ent
}
fn make_shreds(num_shreds: usize) -> Vec<Shred> {
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let txs_per_entry = 128;
let num_entries = max_entries_per_n_shred(
&make_test_entry(txs_per_entry),
2 * num_shreds as u64,
Some(shred_size),
Some(LEGACY_SHRED_DATA_CAPACITY),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
@ -73,10 +72,10 @@ fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
#[bench]
fn bench_shredder_ticks(bencher: &mut Bencher) {
let kp = Keypair::new();
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let shred_size = LEGACY_SHRED_DATA_CAPACITY;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64;
let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
@ -87,7 +86,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
#[bench]
fn bench_shredder_large_entries(bencher: &mut Bencher) {
let kp = Keypair::new();
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let shred_size = LEGACY_SHRED_DATA_CAPACITY;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
let txs_per_entry = 128;
let num_entries = max_entries_per_n_shred(
@ -106,7 +105,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
#[bench]
fn bench_deshredder(bencher: &mut Bencher) {
let kp = Keypair::new();
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let shred_size = LEGACY_SHRED_DATA_CAPACITY;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
@ -121,7 +120,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
#[bench]
fn bench_deserialize_hdr(bencher: &mut Bencher) {
let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
let data = vec![0; LEGACY_SHRED_DATA_CAPACITY];
let shred = Shred::new_from_data(2, 1, 1, &data, ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 1);

View File

@ -3180,7 +3180,7 @@ pub(crate) mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD},
shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
@ -3779,7 +3779,7 @@ pub(crate) mod tests {
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, bank| {
let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let gibberish = [0xa5u8; LEGACY_SHRED_DATA_CAPACITY];
let parent_offset = bank.slot() - bank.parent_slot();
let shred = Shred::new_from_data(
bank.slot(),

View File

@ -43,7 +43,7 @@ impl ShredSigVerifier {
batches
.iter()
.flat_map(PacketBatch::iter)
.map(shred::layout::get_shred)
.filter_map(shred::layout::get_shred)
.filter_map(shred::layout::get_slot)
.collect()
}

View File

@ -3,7 +3,7 @@
extern crate test;
use {
solana_ledger::{
shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD},
shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair},
},
solana_perf::{
@ -29,7 +29,7 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
slot,
0xc0de,
0xdead,
&[5; SIZE_OF_DATA_SHRED_PAYLOAD],
&[5; LEGACY_SHRED_DATA_CAPACITY],
ShredFlags::LAST_SHRED_IN_SLOT,
1,
2,
@ -60,7 +60,7 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) {
slot,
0xc0de,
0xdead,
&[5; SIZE_OF_DATA_SHRED_PAYLOAD],
&[5; LEGACY_SHRED_DATA_CAPACITY],
ShredFlags::LAST_SHRED_IN_SLOT,
1,
2,

View File

@ -14,7 +14,10 @@ use {
},
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder},
shred::{
self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredData, ShredId, ShredType,
Shredder,
},
slot_stats::{ShredSource, SlotsStats},
},
bincode::deserialize,
@ -1556,7 +1559,7 @@ impl Blockstore {
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
let shred = self.data_shred_cf.get_bytes((slot, index))?;
let shred = shred.map(Shred::resize_stored_shred).transpose();
let shred = shred.map(ShredData::resize_stored_shred).transpose();
shred.map_err(|err| {
let err = format!("Invalid stored shred: {}", err);
let err = Box::new(bincode::ErrorKind::Custom(err));

View File

@ -49,8 +49,13 @@
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
//! payload can fit into one coding shred / packet.
pub(crate) use shred_data::ShredData;
pub use {
self::stats::{ProcessShredsStats, ShredFetchStats},
crate::shredder::Shredder,
};
use {
self::traits::{Shred as _, ShredCode as _, ShredData as _},
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::MAX_DATA_SHREDS_PER_SLOT,
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
@ -62,7 +67,6 @@ use {
clock::Slot,
feature_set,
hash::{hashv, Hash},
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
@ -70,52 +74,38 @@ use {
std::fmt::Debug,
thiserror::Error,
};
pub use {
self::{
legacy::{ShredCode, ShredData},
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::Shredder,
};
mod common;
mod legacy;
mod shred_code;
mod shred_data;
mod stats;
mod traits;
pub type Nonce = u32;
pub const SIZE_OF_NONCE: usize = 4;
/// The following constants are computed by hand, and hardcoded.
/// `test_shred_constants` ensures that the values are correct.
/// Constants are used over lazy_static for performance reasons.
const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADER: usize = 5;
const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
const SIZE_OF_DATA_SHRED_HEADERS: usize = 88;
const SIZE_OF_CODING_SHRED_HEADERS: usize = 89;
const SIZE_OF_SIGNATURE: usize = 64;
const SIZE_OF_SHRED_VARIANT: usize = 1;
const SIZE_OF_SHRED_SLOT: usize = 8;
const SIZE_OF_SHRED_INDEX: usize = 4;
pub const SIZE_OF_NONCE: usize = 4;
const_assert_eq!(SIZE_OF_CODING_SHRED_HEADERS, 89);
const SIZE_OF_CODING_SHRED_HEADERS: usize =
SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
// Maximum size of data that a data-shred may contain (excluding headers).
const_assert_eq!(SIZE_OF_DATA_SHRED_PAYLOAD, 1051);
pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
- SIZE_OF_COMMON_SHRED_HEADER
- SIZE_OF_DATA_SHRED_HEADER
- SIZE_OF_CODING_SHRED_HEADERS
- SIZE_OF_NONCE;
const_assert_eq!(SHRED_DATA_OFFSET, 88);
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
const_assert_eq!(SHRED_PAYLOAD_SIZE, 1228);
const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
// For legacy tests and benchmarks.
const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051);
pub const LEGACY_SHRED_DATA_CAPACITY: usize = legacy::ShredData::CAPACITY;
// LAST_SHRED_IN_SLOT also implies DATA_COMPLETE_SHRED.
// So it cannot be LAST_SHRED_IN_SLOT if not also DATA_COMPLETE_SHRED.
bitflags! {
@ -278,14 +268,14 @@ macro_rules! dispatch {
impl Shred {
dispatch!(fn common_header(&self) -> &ShredCommonHeader);
dispatch!(fn set_signature(&mut self, signature: Signature));
dispatch!(fn signed_payload(&self) -> &[u8]);
dispatch!(fn signed_message(&self) -> &[u8]);
// Returns the portion of the shred's payload which is erasure coded.
dispatch!(pub(crate) fn erasure_shard(self) -> Result<Vec<u8>, Error>);
// Like Shred::erasure_shard but returning a slice.
dispatch!(pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
// Returns the shard index within the erasure coding set.
dispatch!(pub(crate) fn erasure_shard_index(&self) -> Option<usize>);
dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result<usize, Error>);
dispatch!(pub fn into_payload(self) -> Vec<u8>);
dispatch!(pub fn payload(&self) -> &Vec<u8>);
@ -329,11 +319,11 @@ impl Shred {
Ok(match layout::get_shred_variant(&shred)? {
ShredVariant::LegacyCode => {
let shred = legacy::ShredCode::from_payload(shred)?;
Self::from(shred)
Self::from(ShredCode::from(shred))
}
ShredVariant::LegacyData => {
let shred = legacy::ShredData::from_payload(shred)?;
Self::from(shred)
Self::from(ShredData::from(shred))
}
})
}
@ -396,14 +386,6 @@ impl Shred {
}
}
// Possibly zero pads bytes stored in blockstore.
pub(crate) fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error> {
match layout::get_shred_type(&shred)? {
ShredType::Code => ShredCode::resize_stored_shred(shred),
ShredType::Data => ShredData::resize_stored_shred(shred),
}
}
pub fn fec_set_index(&self) -> u32 {
self.common_header().fec_set_index
}
@ -429,7 +411,7 @@ impl Shred {
}
pub fn sign(&mut self, keypair: &Keypair) {
let signature = keypair.sign_message(self.signed_payload());
let signature = keypair.sign_message(self.signed_message());
self.set_signature(signature);
}
@ -494,7 +476,7 @@ impl Shred {
}
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let message = self.signed_payload();
let message = self.signed_message();
self.signature().verify(pubkey.as_ref(), message)
}
@ -526,16 +508,20 @@ impl Shred {
pub mod layout {
use {super::*, std::ops::Range};
fn get_shred_size(packet: &Packet) -> usize {
fn get_shred_size(packet: &Packet) -> Option<usize> {
let size = packet.data().len();
if packet.meta.repair() {
packet.meta.size.saturating_sub(SIZE_OF_NONCE)
size.checked_sub(SIZE_OF_NONCE)
} else {
packet.meta.size
Some(size)
}
}
pub fn get_shred(packet: &Packet) -> &[u8] {
&packet.data()[..get_shred_size(packet)]
pub fn get_shred(packet: &Packet) -> Option<&[u8]> {
let size = get_shred_size(packet)?;
let shred = packet.data().get(..size)?;
// Should at least have a signature.
(size >= SIZE_OF_SIGNATURE).then(|| shred)
}
pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> {
@ -567,14 +553,12 @@ pub mod layout {
deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_INDEX..)?).ok()
}
// Returns chunk of the payload which is signed.
pub(crate) fn get_signed_message(shred: &[u8]) -> Option<&[u8]> {
shred.get(SIZE_OF_SIGNATURE..)
}
// Returns slice range of the packet payload which is signed.
pub(crate) fn get_signed_message_range(packet: &Packet) -> Range<usize> {
SIZE_OF_SIGNATURE..get_shred_size(packet)
// Returns slice range of the shred payload which is signed.
pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option<Range<usize>> {
let range = match get_shred_variant(shred).ok()? {
ShredVariant::LegacyCode | ShredVariant::LegacyData => legacy::SIGNED_MESSAGE_RANGE,
};
(shred.len() <= range.end).then(|| range)
}
pub(crate) fn get_reference_tick(shred: &[u8]) -> Result<u8, Error> {
@ -640,7 +624,13 @@ pub fn get_shred_slot_index_type(
packet: &Packet,
stats: &mut ShredFetchStats,
) -> Option<(Slot, u32, ShredType)> {
let shred = layout::get_shred(packet);
let shred = match layout::get_shred(packet) {
None => {
stats.index_overrun += 1;
return None;
}
Some(shred) => shred,
};
if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() {
stats.index_overrun += 1;
return None;
@ -683,7 +673,8 @@ pub fn max_entries_per_n_shred(
num_shreds: u64,
shred_data_size: Option<usize>,
) -> u64 {
let shred_data_size = shred_data_size.unwrap_or(SIZE_OF_DATA_SHRED_PAYLOAD) as u64;
let data_buffer_size = ShredData::capacity().unwrap();
let shred_data_size = shred_data_size.unwrap_or(data_buffer_size) as u64;
let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
let entry_size = bincode::serialized_size(entry).unwrap();
let count_size = vec_size - entry_size;
@ -702,7 +693,6 @@ pub fn verify_test_data_shred(
is_last_data: bool,
) {
shred.sanitize().unwrap();
assert_eq!(shred.payload().len(), SHRED_PAYLOAD_SIZE);
assert!(shred.is_data());
assert_eq!(shred.index(), index);
assert_eq!(shred.slot(), slot);
@ -775,11 +765,11 @@ mod tests {
serialized_size(&common_header).unwrap() as usize
);
assert_eq!(
SIZE_OF_CODING_SHRED_HEADER,
SIZE_OF_CODING_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER,
serialized_size(&coding_shred_header).unwrap() as usize
);
assert_eq!(
SIZE_OF_DATA_SHRED_HEADER,
SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER,
serialized_size(&data_shred_header).unwrap() as usize
);
let data_shred_header_with_size = DataShredHeader {
@ -787,7 +777,7 @@ mod tests {
..data_shred_header
};
assert_eq!(
SIZE_OF_DATA_SHRED_HEADER,
SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_COMMON_SHRED_HEADER,
serialized_size(&data_shred_header_with_size).unwrap() as usize
);
assert_eq!(
@ -1011,7 +1001,7 @@ mod tests {
let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap();
ChaChaRng::from_seed(seed)
};
let mut data = [0u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let mut data = [0u8; legacy::ShredData::CAPACITY];
rng.fill(&mut data[..]);
let keypair = Keypair::generate(&mut rng);
let mut shred = Shred::new_from_data(
@ -1029,7 +1019,7 @@ mod tests {
assert_matches!(shred.sanitize(), Ok(()));
let mut payload = bs58_decode(PAYLOAD);
payload.extend({
let skip = payload.len() - SHRED_DATA_OFFSET;
let skip = payload.len() - SIZE_OF_DATA_SHRED_HEADERS;
data.iter().skip(skip).copied()
});
let mut packet = Packet::default();
@ -1100,7 +1090,7 @@ mod tests {
let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap();
ChaChaRng::from_seed(seed)
};
let mut parity_shard = vec![0u8; /*ENCODED_PAYLOAD_SIZE:*/ 1139];
let mut parity_shard = vec![0u8; legacy::SIZE_OF_ERASURE_ENCODED_SLICE];
rng.fill(&mut parity_shard[..]);
let keypair = Keypair::generate(&mut rng);
let mut shred = Shred::new_from_parity_shard(

View File

@ -0,0 +1,63 @@
// Generates a method on the enum-of-variants (e.g. ShredCode/ShredData) that
// forwards the call to the wrapped variant's own implementation. Three arms
// cover the three receiver kinds (&self, self, &mut self); each arm accepts
// an optional single extra argument and an optional return type. When a new
// variant (e.g. Merkle) is added, only the match arms here need extending.
macro_rules! dispatch {
// Arm for methods taking &self.
($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::Legacy(shred) => shred.$name($($arg, )?),
}
}
};
// Arm for methods consuming self by value.
($vis:vis fn $name:ident(self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::Legacy(shred) => shred.$name($($arg, )?),
}
}
};
// Arm for methods taking &mut self.
($vis:vis fn $name:ident(&mut self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&mut self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::Legacy(shred) => shred.$name($($arg, )?),
}
}
}
}
// Expands to the trait-method implementations shared by the concrete shred
// types (which all have `common_header` and `payload` fields), so the shared
// boilerplate is not duplicated across each impl block.
macro_rules! impl_shred_common {
() => {
#[inline]
fn common_header(&self) -> &ShredCommonHeader {
&self.common_header
}
#[inline]
fn payload(&self) -> &Vec<u8> {
&self.payload
}
fn into_payload(self) -> Vec<u8> {
self.payload
}
// Updates both the cached header field and the serialized bytes at the
// front of the payload so the two stay consistent.
fn set_signature(&mut self, signature: Signature) {
bincode::serialize_into(&mut self.payload[..], &signature).unwrap();
self.common_header.signature = signature;
}
// Only for tests.
// Re-serializes the whole common header into the payload buffer; the
// existing signature becomes stale for the mutated header.
fn set_index(&mut self, index: u32) {
self.common_header.index = index;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
// Only for tests.
fn set_slot(&mut self, slot: Slot) {
self.common_header.slot = slot;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
};
}
pub(super) use {dispatch, impl_shred_common};

View File

@ -1,79 +1,57 @@
use {
crate::shred::{
traits::{Shred, ShredCode as _, ShredData as _},
common::impl_shred_common,
shred_code, shred_data,
traits::{Shred, ShredCode as ShredCodeTrait, ShredData as ShredDataTrait},
CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant,
MAX_DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT, SHRED_DATA_OFFSET,
SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER,
SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, SIZE_OF_SIGNATURE,
SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADERS,
SIZE_OF_SIGNATURE,
},
solana_perf::packet::deserialize_from_with_limit,
solana_sdk::{clock::Slot, signature::Signature},
static_assertions::const_assert_eq,
std::{io::Cursor, ops::RangeInclusive},
std::{io::Cursor, ops::Range},
};
// DataShredHeader.size is sum of common-shred-header, data-shred-header and
// data.len(). Broadcast stage may create zero length data shreds when the
// previous slot was interrupted:
// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79
const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD;
// All of the payload, including any zero padding, is signed.
// Code and data shreds have the same payload size.
pub(super) const SIGNED_MESSAGE_RANGE: Range<usize> = SIZE_OF_SIGNATURE..ShredData::SIZE_OF_PAYLOAD;
const_assert_eq!(ShredData::SIZE_OF_PAYLOAD, ShredCode::SIZE_OF_PAYLOAD);
const_assert_eq!(ShredData::SIZE_OF_PAYLOAD, 1228);
const_assert_eq!(ShredData::CAPACITY, 1051);
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
// is never used and is not part of erasure coding.
const_assert_eq!(ENCODED_PAYLOAD_SIZE, 1139);
const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
const_assert_eq!(SIZE_OF_ERASURE_ENCODED_SLICE, 1139);
pub(super) const SIZE_OF_ERASURE_ENCODED_SLICE: usize =
ShredCode::SIZE_OF_PAYLOAD - SIZE_OF_CODING_SHRED_HEADERS;
#[derive(Clone, Debug, PartialEq, Eq)]
// Layout: {common, data} headers | data | zero padding
// Everything up to SIZE_OF_CODING_SHRED_HEADERS bytes at the end (which is
// part of zero padding) is erasure coded.
// All payload past signature, including the entirety of zero paddings, is
// signed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ShredData {
common_header: ShredCommonHeader,
data_header: DataShredHeader,
payload: Vec<u8>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
// Layout: {common, coding} headers | erasure coded shard
// All payload past the signature is signed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ShredCode {
common_header: ShredCommonHeader,
coding_header: CodingShredHeader,
payload: Vec<u8>,
}
macro_rules! impl_shred_common {
() => {
#[inline]
fn common_header(&self) -> &ShredCommonHeader {
&self.common_header
}
#[inline]
fn payload(&self) -> &Vec<u8> {
&self.payload
}
fn into_payload(self) -> Vec<u8> {
self.payload
}
fn set_signature(&mut self, signature: Signature) {
bincode::serialize_into(&mut self.payload[..], &signature).unwrap();
self.common_header.signature = signature;
}
// Only for tests.
fn set_index(&mut self, index: u32) {
self.common_header.index = index;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
// Only for tests.
fn set_slot(&mut self, slot: Slot) {
self.common_header.slot = slot;
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}
};
}
impl Shred for ShredData {
impl_shred_common!();
// Legacy data shreds are always zero padded and
// the same size as coding shreds.
const SIZE_OF_PAYLOAD: usize = shred_code::ShredCode::SIZE_OF_PAYLOAD;
fn from_payload(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut cursor = Cursor::new(&payload[..]);
@ -82,8 +60,14 @@ impl Shred for ShredData {
return Err(Error::InvalidShredVariant);
}
let data_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/16602
payload.resize(SHRED_PAYLOAD_SIZE, 0u8);
// Shreds stored to blockstore may have trailing zeros trimmed.
// Repair packets have nonce at the end of packet payload; see:
// https://github.com/solana-labs/solana/pull/10109
// https://github.com/solana-labs/solana/pull/16602
if payload.len() < SIZE_OF_DATA_SHRED_HEADERS {
return Err(Error::InvalidPayloadSize(payload.len()));
}
payload.resize(Self::SIZE_OF_PAYLOAD, 0u8);
let shred = Self {
common_header,
data_header,
@ -92,73 +76,46 @@ impl Shred for ShredData {
shred.sanitize().map(|_| shred)
}
fn erasure_shard_index(&self) -> Option<usize> {
let fec_set_index = self.common_header.fec_set_index;
let index = self.common_header.index.checked_sub(fec_set_index)?;
usize::try_from(index).ok()
fn erasure_shard_index(&self) -> Result<usize, Error> {
shred_data::erasure_shard_index(self).ok_or_else(|| {
let headers = Box::new((self.common_header, self.data_header));
Error::InvalidErasureShardIndex(headers)
})
}
fn erasure_shard(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
if self.payload.len() != Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
let mut shard = self.payload;
shard.resize(ENCODED_PAYLOAD_SIZE, 0u8);
shard.truncate(SIZE_OF_ERASURE_ENCODED_SLICE);
Ok(shard)
}
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
if self.payload.len() != Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
Ok(&self.payload[..ENCODED_PAYLOAD_SIZE])
}
fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
// TODO: assert that this is the right type!
if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) {
return Err(Error::InvalidPayloadSize(shred.len()));
}
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
Ok(shred)
Ok(&self.payload[..SIZE_OF_ERASURE_ENCODED_SLICE])
}
fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
match self.common_header.shred_variant {
ShredVariant::LegacyData => (),
_ => return Err(Error::InvalidShredVariant),
}
if self.erasure_shard_index().is_none() {
let headers = Box::new((self.common_header, self.data_header));
return Err(Error::InvalidErasureShardIndex(headers));
}
if self.common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex(self.common_header.index));
}
let _parent = self.parent()?;
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
let flags = self.data_header.flags;
if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT)
&& !flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
{
return Err(Error::InvalidShredFlags(self.data_header.flags.bits()));
}
Ok(())
shred_data::sanitize(self)
}
fn signed_payload(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE);
fn signed_message(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), Self::SIZE_OF_PAYLOAD);
&self.payload[SIZE_OF_SIGNATURE..]
}
}
impl Shred for ShredCode {
impl_shred_common!();
const SIZE_OF_PAYLOAD: usize = shred_code::ShredCode::SIZE_OF_PAYLOAD;
fn from_payload(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut cursor = Cursor::new(&payload[..]);
@ -167,8 +124,9 @@ impl Shred for ShredCode {
return Err(Error::InvalidShredVariant);
}
let coding_header = deserialize_from_with_limit(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/10109
payload.truncate(SHRED_PAYLOAD_SIZE);
// Repair packets have nonce at the end of packet payload:
// https://github.com/solana-labs/solana/pull/10109
payload.truncate(Self::SIZE_OF_PAYLOAD);
let shred = Self {
common_header,
coding_header,
@ -177,25 +135,15 @@ impl Shred for ShredCode {
shred.sanitize().map(|_| shred)
}
fn erasure_shard_index(&self) -> Option<usize> {
// Assert that the last shred index in the erasure set does not
// overshoot u32.
self.common_header.fec_set_index.checked_add(u32::from(
self.coding_header.num_data_shreds.checked_sub(1)?,
))?;
self.first_coding_index()?.checked_add(u32::from(
self.coding_header.num_coding_shreds.checked_sub(1)?,
))?;
let num_data_shreds = usize::from(self.coding_header.num_data_shreds);
let num_coding_shreds = usize::from(self.coding_header.num_coding_shreds);
let position = usize::from(self.coding_header.position);
let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?;
let index = position.checked_add(num_data_shreds)?;
(index < fec_set_size).then(|| index)
fn erasure_shard_index(&self) -> Result<usize, Error> {
shred_code::erasure_shard_index(self).ok_or_else(|| {
let headers = Box::new((self.common_header, self.coding_header));
Error::InvalidErasureShardIndex(headers)
})
}
fn erasure_shard(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
if self.payload.len() != Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
let mut shard = self.payload;
@ -207,43 +155,27 @@ impl Shred for ShredCode {
}
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
if self.payload.len() != Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(self.payload.len()));
}
Ok(&self.payload[SIZE_OF_CODING_SHRED_HEADERS..])
}
fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error> {
if shred.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(shred.len()));
}
Ok(shred)
}
fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(self.payload.len()));
match self.common_header.shred_variant {
ShredVariant::LegacyCode => (),
_ => return Err(Error::InvalidShredVariant),
}
if self.erasure_shard_index().is_none() {
let headers = Box::new((self.common_header, self.coding_header));
return Err(Error::InvalidErasureShardIndex(headers));
}
let num_coding_shreds = u32::from(self.coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
self.coding_header.num_coding_shreds,
));
}
Ok(())
shred_code::sanitize(self)
}
fn signed_payload(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), SHRED_PAYLOAD_SIZE);
fn signed_message(&self) -> &[u8] {
debug_assert_eq!(self.payload.len(), Self::SIZE_OF_PAYLOAD);
&self.payload[SIZE_OF_SIGNATURE..]
}
}
impl super::traits::ShredData for ShredData {
impl ShredDataTrait for ShredData {
#[inline]
fn data_header(&self) -> &DataShredHeader {
&self.data_header
@ -251,19 +183,16 @@ impl super::traits::ShredData for ShredData {
fn data(&self) -> Result<&[u8], Error> {
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
if size > self.payload.len()
|| size < SIZE_OF_DATA_SHRED_HEADERS
|| size > SIZE_OF_DATA_SHRED_HEADERS + Self::CAPACITY
{
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
Ok(&self.payload[SHRED_DATA_OFFSET..size])
}
fn bytes_to_store(&self) -> &[u8] {
// Payload will be padded out to SHRED_PAYLOAD_SIZE.
// But only need to store the bytes within data_header.size.
&self.payload[..self.data_header.size as usize]
Ok(&self.payload[SIZE_OF_DATA_SHRED_HEADERS..size])
}
// Only for tests.
@ -274,7 +203,7 @@ impl super::traits::ShredData for ShredData {
}
}
impl super::traits::ShredCode for ShredCode {
impl ShredCodeTrait for ShredCode {
#[inline]
fn coding_header(&self) -> &CodingShredHeader {
&self.coding_header
@ -282,6 +211,10 @@ impl super::traits::ShredCode for ShredCode {
}
impl ShredData {
// Maximum size of ledger data that can be embedded in a data-shred.
pub(super) const CAPACITY: usize =
Self::SIZE_OF_PAYLOAD - SIZE_OF_DATA_SHRED_HEADERS - SIZE_OF_CODING_SHRED_HEADERS;
pub(super) fn new_from_data(
slot: Slot,
index: u32,
@ -292,7 +225,7 @@ impl ShredData {
version: u16,
fec_set_index: u32,
) -> Self {
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let mut payload = vec![0; Self::SIZE_OF_PAYLOAD];
let common_header = ShredCommonHeader {
signature: Signature::default(),
shred_variant: ShredVariant::LegacyData,
@ -301,7 +234,7 @@ impl ShredData {
version,
fec_set_index,
};
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER) as u16;
let size = (data.len() + SIZE_OF_DATA_SHRED_HEADERS) as u16;
let flags = flags
| unsafe {
ShredFlags::from_bits_unchecked(
@ -320,7 +253,7 @@ impl ShredData {
bincode::serialize_into(&mut cursor, &data_header).unwrap();
// TODO: Need to check if data is too large!
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SHRED_DATA_OFFSET);
debug_assert_eq!(offset, SIZE_OF_DATA_SHRED_HEADERS);
payload[offset..offset + data.len()].copy_from_slice(data);
Self {
common_header,
@ -328,6 +261,21 @@ impl ShredData {
payload,
}
}
pub(super) fn bytes_to_store(&self) -> &[u8] {
// Payload will be padded out to Self::SIZE_OF_PAYLOAD.
// But only need to store the bytes within data_header.size.
&self.payload[..self.data_header.size as usize]
}
pub(super) fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
// Old shreds might have been extra zero padded.
if !(SIZE_OF_DATA_SHRED_HEADERS..=ShredCode::SIZE_OF_PAYLOAD).contains(&shred.len()) {
return Err(Error::InvalidPayloadSize(shred.len()));
}
shred.resize(Self::SIZE_OF_PAYLOAD, 0u8);
Ok(shred)
}
}
impl ShredCode {
@ -354,7 +302,7 @@ impl ShredCode {
num_coding_shreds,
position,
};
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let mut payload = vec![0; Self::SIZE_OF_PAYLOAD];
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &coding_header).unwrap();
@ -370,27 +318,15 @@ impl ShredCode {
payload,
}
}
// Returns true if the erasure coding of the two shreds mismatch.
pub(super) fn erasure_mismatch(&self, other: &ShredCode) -> bool {
let CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position: _,
} = self.coding_header;
num_coding_shreds != other.coding_header.num_coding_shreds
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index()
}
}
#[cfg(test)]
mod test {
use {super::*, matches::assert_matches};
use {super::*, crate::shred::MAX_DATA_SHREDS_PER_SLOT, matches::assert_matches};
#[test]
fn test_sanitize_data_shred() {
let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let data = [0xa5u8; ShredData::CAPACITY];
let mut shred = ShredData::new_from_data(
420, // slot
19, // index

View File

@ -0,0 +1,128 @@
use {
crate::shred::{
common::dispatch,
legacy,
traits::{Shred, ShredCode as ShredCodeTrait},
CodingShredHeader, Error, ShredCommonHeader, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_NONCE,
},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature},
static_assertions::const_assert_eq,
};
const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228);
// Dispatch enum over coding-shred variants. Currently only the Legacy
// variant exists; per the commit message, a Merkle variant is planned in
// follow-up commits.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ShredCode {
Legacy(legacy::ShredCode),
}
impl ShredCode {
    // Payload size is the packet size minus the trailing retransmit nonce.
    pub(super) const SIZE_OF_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
    // The dispatch! macro forwards each call to the enclosed variant's
    // implementation of the Shred/ShredCode traits.
    dispatch!(fn coding_header(&self) -> &CodingShredHeader);
    dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader);
    dispatch!(pub(super) fn erasure_shard(self) -> Result<Vec<u8>, Error>);
    dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
    dispatch!(pub(super) fn erasure_shard_index(&self) -> Result<usize, Error>);
    dispatch!(pub(super) fn first_coding_index(&self) -> Option<u32>);
    dispatch!(pub(super) fn into_payload(self) -> Vec<u8>);
    dispatch!(pub(super) fn payload(&self) -> &Vec<u8>);
    dispatch!(pub(super) fn sanitize(&self) -> Result<(), Error>);
    dispatch!(pub(super) fn set_signature(&mut self, signature: Signature));
    dispatch!(pub(super) fn signed_message(&self) -> &[u8]);
    // Only for tests.
    dispatch!(pub(super) fn set_index(&mut self, index: u32));
    dispatch!(pub(super) fn set_slot(&mut self, slot: Slot));
    // Constructs a legacy-format coding shred from an erasure-coded parity
    // shard and its erasure-set metadata.
    pub(super) fn new_from_parity_shard(
        slot: Slot,
        index: u32,
        parity_shard: &[u8],
        fec_set_index: u32,
        num_data_shreds: u16,
        num_coding_shreds: u16,
        position: u16,
        version: u16,
    ) -> Self {
        Self::from(legacy::ShredCode::new_from_parity_shard(
            slot,
            index,
            parity_shard,
            fec_set_index,
            num_data_shreds,
            num_coding_shreds,
            position,
            version,
        ))
    }
    // Number of data shreds in this shred's erasure set.
    pub(super) fn num_data_shreds(&self) -> u16 {
        self.coding_header().num_data_shreds
    }
    // Number of coding shreds in this shred's erasure set.
    pub(super) fn num_coding_shreds(&self) -> u16 {
        self.coding_header().num_coding_shreds
    }
    // Returns true if the erasure coding of the two shreds mismatch.
    // Exhaustive match so that adding a variant forces revisiting
    // cross-variant comparisons.
    pub(super) fn erasure_mismatch(&self, other: &ShredCode) -> bool {
        match (self, other) {
            (Self::Legacy(shred), Self::Legacy(other)) => erasure_mismatch(shred, other),
        }
    }
}
impl From<legacy::ShredCode> for ShredCode {
fn from(shred: legacy::ShredCode) -> Self {
Self::Legacy(shred)
}
}
#[inline]
// Returns the shard index of a coding shred within its erasure set,
// i.e. its position appended after all data shreds; None if any of the
// header fields are inconsistent or would overflow.
pub(super) fn erasure_shard_index<T: ShredCodeTrait>(shred: &T) -> Option<usize> {
    let common_header = shred.common_header();
    let coding_header = shred.coding_header();
    // Assert that the last shred index in the erasure set does not
    // overshoot u32.
    let last_data_offset = coding_header.num_data_shreds.checked_sub(1)?;
    common_header
        .fec_set_index
        .checked_add(u32::from(last_data_offset))?;
    let last_coding_offset = coding_header.num_coding_shreds.checked_sub(1)?;
    shred
        .first_coding_index()?
        .checked_add(u32::from(last_coding_offset))?;
    let num_data_shreds = usize::from(coding_header.num_data_shreds);
    let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
    let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?;
    // Coding shards follow the data shards within the erasure set.
    let index = usize::from(coding_header.position).checked_add(num_data_shreds)?;
    if index < fec_set_size {
        Some(index)
    } else {
        None
    }
}
pub(super) fn sanitize<T: ShredCodeTrait>(shred: &T) -> Result<(), Error> {
if shred.payload().len() != T::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(shred.payload().len()));
}
let coding_header = shred.coding_header();
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
let num_coding_shreds = u32::from(coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
coding_header.num_coding_shreds,
));
}
Ok(())
}
// Returns true if the erasure coding of the two shreds mismatch:
// differing shred counts or first coding index.
pub(super) fn erasure_mismatch<T: ShredCodeTrait>(shred: &T, other: &T) -> bool {
    let (header, other_header) = (shred.coding_header(), other.coding_header());
    header.num_coding_shreds != other_header.num_coding_shreds
        || header.num_data_shreds != other_header.num_data_shreds
        || shred.first_coding_index() != other.first_coding_index()
}

View File

@ -0,0 +1,130 @@
use {
crate::shred::{
self,
common::dispatch,
legacy,
traits::{Shred as _, ShredData as ShredDataTrait},
DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant,
MAX_DATA_SHREDS_PER_SLOT,
},
solana_sdk::{clock::Slot, signature::Signature},
};
/// Data shred, dispatching over wire-format variants.
/// Currently only the legacy format exists; additional variants are
/// expected to be added alongside it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ShredData {
    Legacy(legacy::ShredData),
}
impl ShredData {
    // The dispatch! macro forwards each call to the enclosed variant's
    // implementation of the Shred/ShredData traits.
    dispatch!(fn data_header(&self) -> &DataShredHeader);
    dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader);
    dispatch!(pub(super) fn data(&self) -> Result<&[u8], Error>);
    dispatch!(pub(super) fn erasure_shard(self) -> Result<Vec<u8>, Error>);
    dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
    dispatch!(pub(super) fn erasure_shard_index(&self) -> Result<usize, Error>);
    dispatch!(pub(super) fn into_payload(self) -> Vec<u8>);
    dispatch!(pub(super) fn parent(&self) -> Result<Slot, Error>);
    dispatch!(pub(super) fn payload(&self) -> &Vec<u8>);
    dispatch!(pub(super) fn sanitize(&self) -> Result<(), Error>);
    dispatch!(pub(super) fn set_last_in_slot(&mut self));
    dispatch!(pub(super) fn set_signature(&mut self, signature: Signature));
    dispatch!(pub(super) fn signed_message(&self) -> &[u8]);
    // Only for tests.
    dispatch!(pub(super) fn set_index(&mut self, index: u32));
    dispatch!(pub(super) fn set_slot(&mut self, slot: Slot));
    // Constructs a legacy-format data shred holding `data`.
    pub(super) fn new_from_data(
        slot: Slot,
        index: u32,
        parent_offset: u16,
        data: &[u8],
        flags: ShredFlags,
        reference_tick: u8,
        version: u16,
        fec_set_index: u32,
    ) -> Self {
        Self::from(legacy::ShredData::new_from_data(
            slot,
            index,
            parent_offset,
            data,
            flags,
            reference_tick,
            version,
            fec_set_index,
        ))
    }
    // True if the LAST_SHRED_IN_SLOT flag is set in the data header.
    pub(super) fn last_in_slot(&self) -> bool {
        let flags = self.data_header().flags;
        flags.contains(ShredFlags::LAST_SHRED_IN_SLOT)
    }
    // True if the DATA_COMPLETE_SHRED flag is set in the data header.
    pub(super) fn data_complete(&self) -> bool {
        let flags = self.data_header().flags;
        flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
    }
    // Reference tick stored in the masked low bits of the flags byte.
    pub(super) fn reference_tick(&self) -> u8 {
        let flags = self.data_header().flags;
        (flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits()
    }
    // Possibly trimmed payload;
    // Should only be used when storing shreds to blockstore.
    pub(super) fn bytes_to_store(&self) -> &[u8] {
        match self {
            Self::Legacy(shred) => shred.bytes_to_store(),
        }
    }
    // Possibly zero pads bytes stored in blockstore.
    // Dispatches on the serialized shred-variant byte because only the
    // raw bytes (not a deserialized shred) are available here.
    pub(crate) fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error> {
        match shred::layout::get_shred_variant(&shred)? {
            ShredVariant::LegacyCode => Err(Error::InvalidShredType),
            ShredVariant::LegacyData => legacy::ShredData::resize_stored_shred(shred),
        }
    }
    // Maximum size of ledger data that can be embedded in a data-shred.
    pub(crate) fn capacity() -> Result<usize, Error> {
        Ok(legacy::ShredData::CAPACITY)
    }
}
impl From<legacy::ShredData> for ShredData {
fn from(shred: legacy::ShredData) -> Self {
Self::Legacy(shred)
}
}
#[inline]
// Returns the shard index of a data shred within its erasure set: its
// offset from the erasure set's first data-shred index. None if the
// shred index underflows fec_set_index (corrupt headers).
pub(super) fn erasure_shard_index<T: ShredDataTrait>(shred: &T) -> Option<usize> {
    let header = shred.common_header();
    header
        .index
        .checked_sub(header.fec_set_index)
        .and_then(|offset| usize::try_from(offset).ok())
}
pub(super) fn sanitize<T: ShredDataTrait>(shred: &T) -> Result<(), Error> {
if shred.payload().len() != T::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(shred.payload().len()));
}
let common_header = shred.common_header();
let data_header = shred.data_header();
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex(common_header.index));
}
let _data = shred.data()?;
let _parent = shred.parent()?;
let flags = data_header.flags;
if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT)
&& !flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
{
return Err(Error::InvalidShredFlags(data_header.flags.bits()));
}
Ok(())
}

View File

@ -1,9 +1,13 @@
use {
crate::shred::{CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags},
crate::shred::{CodingShredHeader, DataShredHeader, Error, ShredCommonHeader},
solana_sdk::{clock::Slot, signature::Signature},
};
pub(super) trait Shred: Sized {
// Total size of payload including headers, merkle
// branches (if any), zero paddings, etc.
const SIZE_OF_PAYLOAD: usize;
fn from_payload(shred: Vec<u8>) -> Result<Self, Error>;
fn common_header(&self) -> &ShredCommonHeader;
fn sanitize(&self) -> Result<(), Error>;
@ -14,17 +18,14 @@ pub(super) trait Shred: Sized {
fn into_payload(self) -> Vec<u8>;
// Returns the shard index within the erasure coding set.
fn erasure_shard_index(&self) -> Option<usize>;
fn erasure_shard_index(&self) -> Result<usize, Error>;
// Returns the portion of the shred's payload which is erasure coded.
fn erasure_shard(self) -> Result<Vec<u8>, Error>;
// Like Shred::erasure_shard but returning a slice.
fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>;
// Portion of the payload which is signed.
fn signed_payload(&self) -> &[u8];
// Possibly zero pads bytes stored in blockstore.
fn resize_stored_shred(shred: Vec<u8>) -> Result<Vec<u8>, Error>;
fn signed_message(&self) -> &[u8];
// Only for tests.
fn set_index(&mut self, index: u32);
@ -52,25 +53,6 @@ pub(super) trait ShredData: Shred {
fn data(&self) -> Result<&[u8], Error>;
// Possibly trimmed payload;
// Should only be used when storing shreds to blockstore.
fn bytes_to_store(&self) -> &[u8];
fn last_in_slot(&self) -> bool {
let flags = self.data_header().flags;
flags.contains(ShredFlags::LAST_SHRED_IN_SLOT)
}
fn data_complete(&self) -> bool {
let flags = self.data_header().flags;
flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
}
fn reference_tick(&self) -> u8 {
let flags = self.data_header().flags;
(flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits()
}
// Only for tests.
fn set_last_in_slot(&mut self);
}
@ -82,12 +64,4 @@ pub(super) trait ShredCode: Shred {
let position = u32::from(self.coding_header().position);
self.common_header().index.checked_sub(position)
}
fn num_data_shreds(&self) -> u16 {
self.coding_header().num_data_shreds
}
fn num_coding_shreds(&self) -> u16 {
self.coding_header().num_coding_shreds
}
}

View File

@ -1,7 +1,6 @@
use {
crate::shred::{
Error, ProcessShredsStats, Shred, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK,
SIZE_OF_DATA_SHRED_PAYLOAD,
Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK,
},
lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool},
@ -110,9 +109,9 @@ impl Shredder {
serialize_time.stop();
let mut gen_data_time = Measure::start("shred_gen_data_time");
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
let data_buffer_size = ShredData::capacity().unwrap();
// Integer division to ensure we have enough shreds to fit all the data
let num_shreds = (serialized_shreds.len() + payload_capacity - 1) / payload_capacity;
let num_shreds = (serialized_shreds.len() + data_buffer_size - 1) / data_buffer_size;
let last_shred_index = next_shred_index + num_shreds as u32 - 1;
// 1) Generate data shreds
let make_data_shred = |shred_index: u32, data| {
@ -141,7 +140,7 @@ impl Shredder {
};
let data_shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| {
serialized_shreds
.par_chunks(payload_capacity)
.par_chunks(data_buffer_size)
.enumerate()
.map(|(i, shred_data)| {
let shred_index = next_shred_index + i as u32;
@ -299,7 +298,7 @@ impl Shredder {
let mut shards = vec![None; fec_set_size];
for shred in shreds {
let index = match shred.erasure_shard_index() {
Some(index) if index < fec_set_size => index,
Ok(index) if index < fec_set_size => index,
_ => return Err(Error::from(InvalidIndex)),
};
shards[index] = Some(shred.erasure_shard()?);
@ -317,8 +316,8 @@ impl Shredder {
shred.slot() == slot
&& shred.is_data()
&& match shred.erasure_shard_index() {
Some(index) => index < num_data_shreds,
None => false,
Ok(index) => index < num_data_shreds,
Err(_) => false,
}
})
.collect();
@ -342,7 +341,8 @@ impl Shredder {
// For backward compatibility. This is needed when the data shred
// payload is None, so that deserializing to Vec<Entry> results in
// an empty vector.
Ok(vec![0u8; SIZE_OF_DATA_SHRED_PAYLOAD])
let data_buffer_size = ShredData::capacity().unwrap();
Ok(vec![0u8; data_buffer_size])
} else {
Ok(data)
}
@ -403,13 +403,13 @@ mod tests {
})
.collect();
let size = serialized_size(&entries).unwrap();
let size = serialized_size(&entries).unwrap() as usize;
// Integer division to ensure we have enough shreds to fit all the data
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
let num_expected_data_shreds = (size + payload_capacity - 1) / payload_capacity;
let data_buffer_size = ShredData::capacity().unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_coding_shreds = (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
.saturating_sub(num_expected_data_shreds as usize)
.max(num_expected_data_shreds as usize);
.saturating_sub(num_expected_data_shreds)
.max(num_expected_data_shreds);
let start_index = 0;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
@ -419,14 +419,14 @@ mod tests {
start_index, // next_code_index
);
let next_index = data_shreds.last().unwrap().index() + 1;
assert_eq!(next_index as u64, num_expected_data_shreds);
assert_eq!(next_index as usize, num_expected_data_shreds);
let mut data_shred_indexes = HashSet::new();
let mut coding_shred_indexes = HashSet::new();
for shred in data_shreds.iter() {
assert_eq!(shred.shred_type(), ShredType::Data);
let index = shred.index();
let is_last = index as u64 == num_expected_data_shreds - 1;
let is_last = index as usize == num_expected_data_shreds - 1;
verify_test_data_shred(
shred,
index,
@ -457,7 +457,7 @@ mod tests {
assert!(coding_shred_indexes.contains(&i));
}
assert_eq!(data_shred_indexes.len() as u64, num_expected_data_shreds);
assert_eq!(data_shred_indexes.len(), num_expected_data_shreds);
assert_eq!(coding_shred_indexes.len(), num_expected_coding_shreds);
// Test reassembly
@ -575,8 +575,8 @@ mod tests {
let keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap();
// Create enough entries to make > 1 shred
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
let num_entries = max_ticks_per_n_shreds(1, Some(payload_capacity)) + 1;
let data_buffer_size = ShredData::capacity().unwrap();
let num_entries = max_ticks_per_n_shreds(1, Some(data_buffer_size)) + 1;
let entries: Vec<_> = (0..num_entries)
.map(|_| {
let keypair0 = Keypair::new();
@ -624,9 +624,9 @@ mod tests {
let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
let num_data_shreds: usize = 5;
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
let data_buffer_size = ShredData::capacity().unwrap();
let num_entries =
max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(payload_capacity));
max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(data_buffer_size));
let entries: Vec<_> = (0..num_entries)
.map(|_| {
let keypair0 = Keypair::new();

View File

@ -38,7 +38,10 @@ pub fn verify_shred_cpu(
if packet.meta.discard() {
return false;
}
let shred = shred::layout::get_shred(packet);
let shred = match shred::layout::get_shred(packet) {
None => return false,
Some(shred) => shred,
};
let slot = match shred::layout::get_slot(shred) {
None => return false,
Some(slot) => slot,
@ -53,10 +56,11 @@ pub fn verify_shred_cpu(
Some(signature) => signature,
};
trace!("signature {}", signature);
let message = match shred::layout::get_signed_message(shred) {
None => return false,
Some(message) => message,
};
let message =
match shred::layout::get_signed_message_range(shred).and_then(|slice| shred.get(slice)) {
None => return false,
Some(message) => message,
};
signature.verify(pubkey, message)
}
@ -101,7 +105,7 @@ where
return Slot::MAX;
}
let shred = shred::layout::get_shred(packet);
match shred::layout::get_slot(shred) {
match shred.and_then(shred::layout::get_slot) {
Some(slot) if slot_keys.contains_key(&slot) => slot,
_ => Slot::MAX,
}
@ -177,8 +181,11 @@ fn shred_gpu_offsets(
let sig = shred::layout::get_signature_range();
let sig = add_offset(sig, pubkeys_end);
debug_assert_eq!(sig.end - sig.start, std::mem::size_of::<Signature>());
let msg = shred::layout::get_signed_message_range(packet);
let msg = add_offset(msg, pubkeys_end);
let shred = shred::layout::get_shred(packet);
// Signature may verify for an empty message but the packet will be
// discarded during deserialization.
let msg = shred.and_then(shred::layout::get_signed_message_range);
let msg = add_offset(msg.unwrap_or_default(), pubkeys_end);
signature_offsets.push(sig.start as u32);
msg_start_offsets.push(msg.start as u32);
let msg_size = msg.end.saturating_sub(msg.start);
@ -267,7 +274,9 @@ pub fn verify_shreds_gpu(
fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) {
let sig = shred::layout::get_signature_range();
let msg = shred::layout::get_signed_message_range(packet);
let msg = shred::layout::get_shred(packet)
.and_then(shred::layout::get_signed_message_range)
.unwrap();
assert!(
packet.meta.size >= sig.end,
"packet is not large enough for a signature"
@ -414,7 +423,7 @@ pub fn sign_shreds_gpu(
mod tests {
use {
super::*,
crate::shred::{Shred, ShredFlags, SIZE_OF_DATA_SHRED_PAYLOAD},
crate::shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
solana_sdk::signature::{Keypair, Signer},
};
@ -589,7 +598,7 @@ mod tests {
slot,
0xc0de,
i as u16,
&[5; SIZE_OF_DATA_SHRED_PAYLOAD],
&[5; LEGACY_SHRED_DATA_CAPACITY],
ShredFlags::LAST_SHRED_IN_SLOT,
1,
2,

View File

@ -3,7 +3,7 @@ use {
solana_entry::entry::Entry,
solana_ledger::shred::{
max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
},
solana_sdk::{
clock::Slot,
@ -34,7 +34,7 @@ fn test_multi_fec_block_coding() {
let num_entries = max_entries_per_n_shred(
&entry,
num_data_shreds as u64,
Some(SIZE_OF_DATA_SHRED_PAYLOAD),
Some(LEGACY_SHRED_DATA_CAPACITY),
);
let entries: Vec<_> = (0..num_entries)
@ -200,7 +200,7 @@ fn setup_different_sized_fec_blocks(
let num_entries = max_entries_per_n_shred(
&entry,
num_shreds_per_iter as u64,
Some(SIZE_OF_DATA_SHRED_PAYLOAD),
Some(LEGACY_SHRED_DATA_CAPACITY),
);
let entries: Vec<_> = (0..num_entries)
.map(|_| {

View File

@ -413,7 +413,7 @@ fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option<u16> {
for entrypoint in entrypoints {
match solana_net_utils::get_cluster_shred_version(entrypoint) {
Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err),
Ok(0) => eprintln!("zero sherd-version from entrypoint: {}", entrypoint),
Ok(0) => eprintln!("zero shred-version from entrypoint: {}", entrypoint),
Ok(shred_version) => {
info!(
"obtained shred-version {} from {}",