Cleanup shred header structures (#6378)

automerge
This commit is contained in:
Pankaj Garg 2019-10-15 20:48:45 -07:00 committed by Grimes
parent c1b401a04a
commit 33052c1dd2
4 changed files with 103 additions and 128 deletions

View File

@ -6,7 +6,7 @@ use solana_core::entry::create_ticks;
use solana_core::entry::Entry;
use solana_core::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE,
SIZE_OF_DATA_SHRED_HEADER,
SIZE_OF_SHRED_HEADER,
};
use solana_core::test_tx;
use solana_sdk::hash::Hash;
@ -31,7 +31,7 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec<Ent
#[bench]
fn bench_shredder_ticks(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
@ -45,7 +45,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
#[bench]
fn bench_shredder_large_entries(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
let txs_per_entry = 128;
let num_entries = max_entries_per_n_shred(&make_test_entry(txs_per_entry), num_shreds as u64);
@ -60,7 +60,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
#[bench]
fn bench_deshredder(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
@ -75,7 +75,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
#[bench]
fn bench_deserialize_hdr(bencher: &mut Bencher) {
let data = vec![0; PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER];
let data = vec![0; PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER];
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true);

View File

@ -3175,7 +3175,7 @@ pub mod tests {
// Trying to insert the same shred again should fail
{
let index = index_cf
.get(shred.common_header.coding_header.slot)
.get(shred.coding_header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
@ -3185,13 +3185,13 @@ pub mod tests {
));
}
shred.common_header.coding_header.index += 1;
shred.coding_header.common_header.index += 1;
// Establish a baseline that works
{
let coding_shred = Shred::new_empty_from_header(shred.clone());
let index = index_cf
.get(shred.common_header.coding_header.slot)
.get(shred.coding_header.common_header.slot)
.unwrap()
.unwrap();
assert!(Blocktree::should_insert_coding_shred(
@ -3204,7 +3204,7 @@ pub mod tests {
// Trying to insert a shred with index < position should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
let index = coding_shred.headers.common_header.position - 1;
let index = coding_shred.headers.coding_header.position - 1;
coding_shred.set_index(index as u32);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
@ -3218,7 +3218,7 @@ pub mod tests {
// Trying to insert shred with num_coding == 0 should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.common_header.num_coding_shreds = 0;
coding_shred.headers.coding_header.num_coding_shreds = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3230,8 +3230,8 @@ pub mod tests {
// Trying to insert shred with pos >= num_coding should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.common_header.num_coding_shreds =
coding_shred.headers.common_header.position;
coding_shred.headers.coding_header.num_coding_shreds =
coding_shred.headers.coding_header.position;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3244,9 +3244,9 @@ pub mod tests {
// has index > u32::MAX should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.common_header.num_coding_shreds = 3;
coding_shred.headers.common_header.coding_header.index = std::u32::MAX - 1;
coding_shred.headers.common_header.position = 0;
coding_shred.headers.coding_header.num_coding_shreds = 3;
coding_shred.headers.coding_header.common_header.index = std::u32::MAX - 1;
coding_shred.headers.coding_header.position = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3255,7 +3255,7 @@ pub mod tests {
));
// Decreasing the number of num_coding_shreds will put it within the allowed limit
coding_shred.headers.common_header.num_coding_shreds = 2;
coding_shred.headers.coding_header.num_coding_shreds = 2;
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),

View File

@ -1787,7 +1787,7 @@ mod tests {
use crate::repair_service::RepairType;
use crate::result::Error;
use crate::shred::max_ticks_per_n_shreds;
use crate::shred::{DataShredHeader, Shred};
use crate::shred::{Shred, ShredHeader};
use crate::test_tx::test_tx;
use rayon::prelude::*;
use solana_sdk::hash::Hash;
@ -1947,10 +1947,10 @@ mod tests {
0,
);
assert!(rv.is_empty());
let mut data_shred = DataShredHeader::default();
data_shred.data_header.slot = 2;
data_shred.parent_offset = 1;
data_shred.data_header.index = 1;
let mut data_shred = ShredHeader::default();
data_shred.data_header.common_header.slot = 2;
data_shred.data_header.parent_offset = 1;
data_shred.data_header.common_header.index = 1;
let shred_info = Shred::new_empty_from_header(data_shred);
blocktree

View File

@ -27,14 +27,12 @@ lazy_static! {
{ serialized_size(&CodingShredHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_DATA_SHRED_HEADER: usize =
{ serialized_size(&DataShredHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_COMMON_SHRED_HEADER: usize =
{ serialized_size(&ShredCommonHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_SHRED_HEADER: usize =
{ serialized_size(&ShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_SIGNATURE: usize =
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
pub static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
pub static ref SIZE_OF_PARENT_OFFSET: usize =
{ bincode::serialized_size(&0u16).unwrap() as usize };
pub static ref SIZE_OF_FLAGS: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
pub static ref SIZE_OF_SHRED_TYPE: usize =
{ bincode::serialized_size(&ShredType(DATA_SHRED)).unwrap() as usize };
}
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@ -56,7 +54,10 @@ pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
/// A common header that is present at start of every shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredType(u8);
/// A common header that is present in data and code shred headers
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
@ -64,59 +65,49 @@ pub struct ShredCommonHeader {
pub index: u32,
}
/// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
/// The data shred header has parent offset and flags
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub common_header: CodingShredHeader,
pub data_header: ShredCommonHeader,
pub common_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
}
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader {
pub shred_type: u8,
pub coding_header: ShredCommonHeader,
pub common_header: ShredCommonHeader,
pub num_data_shreds: u16,
pub num_coding_shreds: u16,
pub position: u16,
}
impl Default for DataShredHeader {
fn default() -> Self {
DataShredHeader {
common_header: CodingShredHeader {
shred_type: DATA_SHRED,
..CodingShredHeader::default()
},
data_header: ShredCommonHeader::default(),
parent_offset: 0,
flags: 0,
}
}
/// A common header that is present at start of every shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredHeader {
pub shred_type: ShredType,
pub coding_header: CodingShredHeader,
pub data_header: DataShredHeader,
}
impl Default for CodingShredHeader {
impl Default for ShredHeader {
fn default() -> Self {
CodingShredHeader {
shred_type: CODING_SHRED,
coding_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
ShredHeader {
shred_type: ShredType(DATA_SHRED),
coding_header: CodingShredHeader::default(),
data_header: DataShredHeader::default(),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct Shred {
pub headers: DataShredHeader,
pub headers: ShredHeader,
pub payload: Vec<u8>,
}
impl Shred {
fn new(header: DataShredHeader, shred_buf: Vec<u8>) -> Self {
fn new(header: ShredHeader, shred_buf: Vec<u8>) -> Self {
Shred {
headers: header,
payload: shred_buf,
@ -132,74 +123,57 @@ impl Shred {
is_last_in_slot: bool,
) -> Self {
let mut shred_buf = vec![0; PACKET_DATA_SIZE];
let mut header = DataShredHeader::default();
header.data_header.slot = slot;
header.data_header.index = index;
header.parent_offset = parent_offset;
header.flags = 0;
let mut header = ShredHeader::default();
header.data_header.common_header.slot = slot;
header.data_header.common_header.index = index;
header.data_header.parent_offset = parent_offset;
header.data_header.flags = 0;
if is_last_data {
header.flags |= DATA_COMPLETE_SHRED
header.data_header.flags |= DATA_COMPLETE_SHRED
}
if is_last_in_slot {
header.flags |= LAST_SHRED_IN_SLOT
header.data_header.flags |= LAST_SHRED_IN_SLOT
}
if let Some(data) = data {
bincode::serialize_into(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER], &header)
bincode::serialize_into(&mut shred_buf[..*SIZE_OF_SHRED_HEADER], &header)
.expect("Failed to write header into shred buffer");
shred_buf[*SIZE_OF_DATA_SHRED_HEADER..*SIZE_OF_DATA_SHRED_HEADER + data.len()]
shred_buf[*SIZE_OF_SHRED_HEADER..*SIZE_OF_SHRED_HEADER + data.len()]
.clone_from_slice(data);
}
Self::new(header, shred_buf)
}
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> result::Result<T>
where
T: Deserialize<'de>,
{
let ret = bincode::deserialize(&buf[*index..*index + size])?;
*index += size;
Ok(ret)
}
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> result::Result<Self> {
let shred_type: u8 = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
let header = if shred_type == CODING_SHRED {
let end = *SIZE_OF_CODING_SHRED_HEADER;
let mut header = DataShredHeader::default();
header.common_header = bincode::deserialize(&shred_buf[..end])?;
let shred_type: ShredType = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
let mut header = if shred_type == ShredType(CODING_SHRED) {
let start = *SIZE_OF_SHRED_TYPE;
let end = start + *SIZE_OF_CODING_SHRED_HEADER;
let mut header = ShredHeader::default();
header.coding_header = bincode::deserialize(&shred_buf[start..end])?;
header
} else if shred_type == ShredType(DATA_SHRED) {
let start = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
let end = start + *SIZE_OF_DATA_SHRED_HEADER;
let mut header = ShredHeader::default();
header.data_header = bincode::deserialize(&shred_buf[start..end])?;
header
} else if shred_type == DATA_SHRED {
let mut start = *SIZE_OF_CODING_SHRED_HEADER;
let common_hdr: ShredCommonHeader =
Self::deserialize_obj(&mut start, *SIZE_OF_COMMON_SHRED_HEADER, &shred_buf)?;
let parent_offset: u16 =
Self::deserialize_obj(&mut start, *SIZE_OF_PARENT_OFFSET, &shred_buf)?;
let flags: u8 = Self::deserialize_obj(&mut start, *SIZE_OF_FLAGS, &shred_buf)?;
let mut hdr = DataShredHeader {
common_header: CodingShredHeader::default(),
data_header: common_hdr,
parent_offset,
flags,
};
hdr.common_header.shred_type = shred_type;
hdr
} else {
return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(
Box::new(bincode::ErrorKind::Custom("Invalid shred type".to_string())),
)));
};
header.shred_type = shred_type;
Ok(Self::new(header, shred_buf))
}
pub fn new_empty_from_header(headers: DataShredHeader) -> Self {
pub fn new_empty_from_header(headers: ShredHeader) -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_DATA_SHRED_HEADER]);
let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_SHRED_HEADER]);
bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred");
Shred { headers, payload }
}
@ -207,23 +181,23 @@ impl Shred {
pub fn new_empty_data_shred() -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
payload[0] = DATA_SHRED;
let headers = DataShredHeader::default();
let headers = ShredHeader::default();
Shred { headers, payload }
}
pub fn header(&self) -> &ShredCommonHeader {
if self.is_data() {
&self.headers.data_header
&self.headers.data_header.common_header
} else {
&self.headers.common_header.coding_header
&self.headers.coding_header.common_header
}
}
pub fn header_mut(&mut self) -> &mut ShredCommonHeader {
if self.is_data() {
&mut self.headers.data_header
&mut self.headers.data_header.common_header
} else {
&mut self.headers.common_header.coding_header
&mut self.headers.coding_header.common_header
}
}
@ -233,7 +207,8 @@ impl Shred {
pub fn parent(&self) -> u64 {
if self.is_data() {
self.headers.data_header.slot - u64::from(self.headers.parent_offset)
self.headers.data_header.common_header.slot
- u64::from(self.headers.data_header.parent_offset)
} else {
std::u64::MAX
}
@ -268,12 +243,12 @@ impl Shred {
}
pub fn is_data(&self) -> bool {
self.headers.common_header.shred_type == DATA_SHRED
self.headers.shred_type == ShredType(DATA_SHRED)
}
pub fn last_in_slot(&self) -> bool {
if self.is_data() {
self.headers.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
self.headers.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
} else {
false
}
@ -283,13 +258,13 @@ impl Shred {
/// Use this only for test code which doesn't care about actual shred
pub fn set_last_in_slot(&mut self) {
if self.is_data() {
self.headers.flags |= LAST_SHRED_IN_SLOT
self.headers.data_header.flags |= LAST_SHRED_IN_SLOT
}
}
pub fn data_complete(&self) -> bool {
if self.is_data() {
self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
self.headers.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
} else {
false
}
@ -297,7 +272,7 @@ impl Shred {
pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
if !self.is_data() {
let header = &self.headers.common_header;
let header = &self.headers.coding_header;
Some((
header.num_data_shreds,
header.num_coding_shreds,
@ -310,7 +285,7 @@ impl Shred {
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let signed_payload_offset = if self.is_data() {
*SIZE_OF_CODING_SHRED_HEADER
*SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE
} else {
*SIZE_OF_SHRED_TYPE
} + *SIZE_OF_SIGNATURE;
@ -373,7 +348,7 @@ impl Shredder {
bincode::serialize(entries).expect("Expect to serialize all entries");
let serialize_time = now.elapsed().as_millis();
let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
let last_shred_index = next_shred_index + num_shreds as u32 - 1;
@ -406,7 +381,7 @@ impl Shredder {
Shredder::sign_shred(
&self.keypair,
&mut shred,
*SIZE_OF_CODING_SHRED_HEADER,
*SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE,
);
shred
})
@ -465,14 +440,14 @@ impl Shredder {
num_data: usize,
num_code: usize,
position: usize,
) -> DataShredHeader {
let mut header = DataShredHeader::default();
header.common_header.shred_type = CODING_SHRED;
header.common_header.coding_header.index = index;
header.common_header.coding_header.slot = slot;
header.common_header.num_coding_shreds = num_code as u16;
header.common_header.num_data_shreds = num_data as u16;
header.common_header.position = position as u16;
) -> ShredHeader {
let mut header = ShredHeader::default();
header.shred_type = ShredType(CODING_SHRED);
header.coding_header.common_header.index = index;
header.coding_header.common_header.slot = slot;
header.coding_header.num_coding_shreds = num_code as u16;
header.coding_header.num_data_shreds = num_data as u16;
header.coding_header.position = position as u16;
header
}
@ -492,7 +467,7 @@ impl Shredder {
let start_index = data_shred_batch[0].header().index;
// All information after coding shred field in a data shred is encoded
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
let data_ptrs: Vec<_> = data_shred_batch
.iter()
.map(|data| &data.payload[coding_block_offset..])
@ -588,7 +563,7 @@ impl Shredder {
let fec_set_size = num_data + num_coding;
if num_coding > 0 && shreds.len() < fec_set_size {
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
// Let's try recovering missing shreds using erasure
let mut present = &mut vec![true; fec_set_size];
@ -697,7 +672,7 @@ impl Shredder {
data_shred_bufs[..num_data]
.iter()
.flat_map(|data| {
let offset = *SIZE_OF_DATA_SHRED_HEADER;
let offset = *SIZE_OF_SHRED_HEADER;
data[offset as usize..].iter()
})
.cloned()
@ -711,7 +686,7 @@ pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
}
pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64;
let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
let entry_size = bincode::serialized_size(entry).unwrap();
let count_size = vec_size - entry_size;
@ -794,7 +769,7 @@ pub mod tests {
.collect();
let size = serialized_size(&entries).unwrap();
let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64;
let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
let num_expected_coding_shreds =
Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
@ -807,8 +782,8 @@ pub mod tests {
let mut data_shred_indexes = HashSet::new();
let mut coding_shred_indexes = HashSet::new();
for shred in data_shreds.iter() {
assert_eq!(shred.headers.common_header.shred_type, DATA_SHRED);
let index = shred.headers.data_header.index;
assert_eq!(shred.headers.shred_type, ShredType(DATA_SHRED));
let index = shred.headers.data_header.common_header.index;
let is_last = index as u64 == num_expected_data_shreds - 1;
verify_test_data_shred(
shred,
@ -825,8 +800,8 @@ pub mod tests {
}
for shred in coding_shreds.iter() {
let index = shred.headers.data_header.index;
assert_eq!(shred.headers.common_header.shred_type, CODING_SHRED);
let index = shred.headers.data_header.common_header.index;
assert_eq!(shred.headers.shred_type, ShredType(CODING_SHRED));
verify_test_code_shred(shred, index, slot, &keypair.pubkey(), true);
assert!(!coding_shred_indexes.contains(&index));
coding_shred_indexes.insert(index);