Rework shred headers to fix position of signature (#6451)

* Rework shred headers to fix position of signature

* fix clippy
This commit is contained in:
Pankaj Garg 2019-10-18 22:55:59 -07:00 committed by GitHub
parent e59af8269e
commit badeb4d31a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 293 additions and 201 deletions

View File

@ -6,10 +6,9 @@ use solana_core::test_tx;
use solana_ledger::entry::{create_ticks, Entry};
use solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE,
SIZE_OF_SHRED_HEADER,
SIZE_OF_DATA_SHRED_PAYLOAD,
};
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
use test::Bencher;
@ -30,7 +29,7 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec<Ent
#[bench]
fn bench_shredder_ticks(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let shred_size = *SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
@ -44,7 +43,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
#[bench]
fn bench_shredder_large_entries(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let shred_size = *SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
let txs_per_entry = 128;
let num_entries = max_entries_per_n_shred(&make_test_entry(txs_per_entry), num_shreds as u64);
@ -59,7 +58,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
#[bench]
fn bench_deshredder(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let shred_size = *SIZE_OF_DATA_SHRED_PAYLOAD;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
@ -74,7 +73,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
#[bench]
fn bench_deserialize_hdr(bencher: &mut Bencher) {
let data = vec![0; PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER];
let data = vec![0; *SIZE_OF_DATA_SHRED_PAYLOAD];
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true);

View File

@ -153,7 +153,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
let golden: Hash = "CGL4L6Q2QwiZQDCMwzshqj3S9riroUQuDjx8bS7ra2PU"
let golden: Hash = "F3Grk43JpRUPeCuB8CbYovjxq2Bh77bh4uLB2UXKBFN8"
.parse()
.unwrap();

View File

@ -1788,7 +1788,9 @@ mod tests {
use solana_ledger::blocktree::get_tmp_ledger_path;
use solana_ledger::blocktree::make_many_slot_entries;
use solana_ledger::blocktree::Blocktree;
use solana_ledger::shred::{max_ticks_per_n_shreds, Shred, ShredHeader};
use solana_ledger::shred::{
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
};
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
@ -1946,11 +1948,16 @@ mod tests {
0,
);
assert!(rv.is_empty());
let mut data_shred = ShredHeader::default();
data_shred.data_header.common_header.slot = 2;
data_shred.data_header.parent_offset = 1;
data_shred.data_header.common_header.index = 1;
let shred_info = Shred::new_empty_from_header(data_shred);
let mut common_header = ShredCommonHeader::default();
common_header.slot = 2;
common_header.index = 1;
let mut data_header = DataShredHeader::default();
data_header.parent_offset = 1;
let shred_info = Shred::new_empty_from_header(
common_header,
data_header,
CodingShredHeader::default(),
);
blocktree
.insert_shreds(vec![shred_info], None)

View File

@ -874,7 +874,10 @@ mod test {
use solana_ledger::blocktree::make_slot_entries;
use solana_ledger::blocktree::{entries_to_test_shreds, get_tmp_ledger_path, BlocktreeError};
use solana_ledger::entry;
use solana_ledger::shred::{Shred, ShredHeader, DATA_COMPLETE_SHRED, SIZE_OF_SHRED_HEADER};
use solana_ledger::shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
};
use solana_runtime::genesis_utils::GenesisBlockInfo;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::packet::PACKET_DATA_SIZE;
@ -999,15 +1002,20 @@ mod test {
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, _| {
let payload_len = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let payload_len = *SIZE_OF_DATA_SHRED_PAYLOAD;
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut header = ShredHeader::default();
header.data_header.flags = DATA_COMPLETE_SHRED;
let mut shred = Shred::new_empty_from_header(header);
let _ = bincode::serialize_into(
&mut shred.payload[*SIZE_OF_SHRED_HEADER..],
&gibberish[..payload_len],
let mut data_header = DataShredHeader::default();
data_header.flags = DATA_COMPLETE_SHRED;
let mut shred = Shred::new_empty_from_header(
ShredCommonHeader::default(),
data_header,
CodingShredHeader::default(),
);
bincode::serialize_into(
&mut shred.payload[*SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_DATA_SHRED_HEADER..],
&gibberish[..payload_len],
)
.unwrap();
vec![shred]
});

View File

@ -275,10 +275,11 @@ mod test {
service::Service,
};
use rand::{seq::SliceRandom, thread_rng};
use solana_ledger::shred::DataShredHeader;
use solana_ledger::{
blocktree::{get_tmp_ledger_path, make_many_slot_entries, Blocktree},
entry::{create_ticks, Entry},
shred::{Shredder, SIZE_OF_SHRED_TYPE},
shred::Shredder,
};
use solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
@ -345,9 +346,10 @@ mod test {
);
// If it's a coding shred, test that slot >= root
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0);
let mut coding_shred =
Shred::new_empty_from_header(Shredder::new_coding_shred_header(5, 5, 6, 6, 0));
Shredder::sign_shred(&leader_keypair, &mut coding_shred, *SIZE_OF_SHRED_TYPE);
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
Shredder::sign_shred(&leader_keypair, &mut coding_shred);
assert_eq!(
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0),
true

View File

@ -1735,7 +1735,7 @@ pub fn make_chaining_slot_entries(
pub mod tests {
use super::*;
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::shred::max_ticks_per_n_shreds;
use crate::shred::{max_ticks_per_n_shreds, DataShredHeader};
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -3239,8 +3239,12 @@ pub mod tests {
let last_root = RwLock::new(0);
let slot = 1;
let mut shred = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10);
let coding_shred = Shred::new_empty_from_header(shred.clone());
let (mut shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10);
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
// Insert a good coding shred
assert!(Blocktree::should_insert_coding_shred(
@ -3256,10 +3260,7 @@ pub mod tests {
// Trying to insert the same shred again should fail
{
let index = index_cf
.get(shred.coding_header.common_header.slot)
.unwrap()
.unwrap();
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
@ -3267,15 +3268,16 @@ pub mod tests {
));
}
shred.coding_header.common_header.index += 1;
shred.index += 1;
// Establish a baseline that works
{
let coding_shred = Shred::new_empty_from_header(shred.clone());
let index = index_cf
.get(shred.coding_header.common_header.slot)
.unwrap()
.unwrap();
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
@ -3285,8 +3287,12 @@ pub mod tests {
// Trying to insert a shred with index < position should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
let index = coding_shred.headers.coding_header.position - 1;
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = coding_shred.coding_header.position - 1;
coding_shred.set_index(index as u32);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
@ -3299,8 +3305,12 @@ pub mod tests {
// Trying to insert shred with num_coding == 0 should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.coding_header.num_coding_shreds = 0;
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3311,9 +3321,12 @@ pub mod tests {
// Trying to insert shred with pos >= num_coding should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.coding_header.num_coding_shreds =
coding_shred.headers.coding_header.position;
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3325,10 +3338,14 @@ pub mod tests {
// Trying to insert with set_index with num_coding that would imply the last blob
// has index > u32::MAX should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
coding_shred.headers.coding_header.num_coding_shreds = 3;
coding_shred.headers.coding_header.common_header.index = std::u32::MAX - 1;
coding_shred.headers.coding_header.position = 0;
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.common_header.index = std::u32::MAX - 1;
coding_shred.coding_header.position = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
@ -3337,7 +3354,7 @@ pub mod tests {
));
// Decreasing the number of num_coding_shreds will put it within the allowed limit
coding_shred.headers.coding_header.num_coding_shreds = 2;
coding_shred.coding_header.num_coding_shreds = 2;
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
@ -3350,7 +3367,11 @@ pub mod tests {
// Trying to insert value into slot <= than last root should fail
{
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blocktree::should_insert_coding_shred(

View File

@ -14,21 +14,26 @@ use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use std::io;
use std::sync::Arc;
use std::time::Instant;
lazy_static! {
pub static ref SIZE_OF_COMMON_SHRED_HEADER: usize =
{ serialized_size(&ShredCommonHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_CODING_SHRED_HEADER: usize =
{ serialized_size(&CodingShredHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_DATA_SHRED_HEADER: usize =
{ serialized_size(&DataShredHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_SHRED_HEADER: usize =
{ serialized_size(&ShredHeader::default()).unwrap() as usize };
pub static ref SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
{ *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_CODING_SHRED_HEADER };
pub static ref SIZE_OF_DATA_SHRED_PAYLOAD: usize = {
PACKET_DATA_SIZE
- *SIZE_OF_COMMON_SHRED_HEADER
- *SIZE_OF_DATA_SHRED_HEADER
- *SIZE_OF_DATA_SHRED_IGNORED_TAIL
};
static ref SIZE_OF_SIGNATURE: usize =
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
pub static ref SIZE_OF_SHRED_TYPE: usize =
{ bincode::serialized_size(&ShredType(DATA_SHRED)).unwrap() as usize };
}
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@ -68,11 +73,17 @@ impl std::convert::From<std::boxed::Box<bincode::ErrorKind>> for ShredError {
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredType(pub u8);
impl Default for ShredType {
fn default() -> Self {
ShredType(DATA_SHRED)
}
}
/// A common header that is present in data and code shred headers
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
pub shred_type: ShredType,
pub slot: u64,
pub index: u32,
}
@ -80,7 +91,6 @@ pub struct ShredCommonHeader {
/// The data shred header has parent offset and flags
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub common_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
}
@ -88,42 +98,41 @@ pub struct DataShredHeader {
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader {
pub common_header: ShredCommonHeader,
pub num_data_shreds: u16,
pub num_coding_shreds: u16,
pub position: u16,
}
/// A common header that is present at start of every shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredHeader {
pub shred_type: ShredType,
pub coding_header: CodingShredHeader,
pub data_header: DataShredHeader,
}
impl Default for ShredHeader {
fn default() -> Self {
ShredHeader {
shred_type: ShredType(DATA_SHRED),
coding_header: CodingShredHeader::default(),
data_header: DataShredHeader::default(),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct Shred {
pub headers: ShredHeader,
pub common_header: ShredCommonHeader,
pub data_header: DataShredHeader,
pub coding_header: CodingShredHeader,
pub payload: Vec<u8>,
}
impl Shred {
fn new(header: ShredHeader, shred_buf: Vec<u8>) -> Self {
Shred {
headers: header,
payload: shred_buf,
}
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
where
T: Deserialize<'de>,
{
let ret = bincode::deserialize(&buf[*index..*index + size])?;
*index += size;
Ok(ret)
}
fn serialize_obj_into<'de, T>(
index: &mut usize,
size: usize,
buf: &'de mut [u8],
obj: &T,
) -> bincode::Result<()>
where
T: Serialize,
{
bincode::serialize_into(&mut buf[*index..*index + size], obj)?;
*index += size;
Ok(())
}
pub fn new_from_data(
@ -134,134 +143,176 @@ impl Shred {
is_last_data: bool,
is_last_in_slot: bool,
) -> Self {
let mut shred_buf = vec![0; PACKET_DATA_SIZE];
let mut header = ShredHeader::default();
header.data_header.common_header.slot = slot;
header.data_header.common_header.index = index;
header.data_header.parent_offset = parent_offset;
header.data_header.flags = 0;
let mut payload = vec![0; PACKET_DATA_SIZE];
let mut common_header = ShredCommonHeader::default();
common_header.slot = slot;
common_header.index = index;
let mut data_header = DataShredHeader::default();
data_header.parent_offset = parent_offset;
if is_last_data {
header.data_header.flags |= DATA_COMPLETE_SHRED
data_header.flags |= DATA_COMPLETE_SHRED
}
if is_last_in_slot {
header.data_header.flags |= LAST_SHRED_IN_SLOT
data_header.flags |= LAST_SHRED_IN_SLOT
}
if let Some(data) = data {
bincode::serialize_into(&mut shred_buf[..*SIZE_OF_SHRED_HEADER], &header)
.expect("Failed to write header into shred buffer");
shred_buf[*SIZE_OF_SHRED_HEADER..*SIZE_OF_SHRED_HEADER + data.len()]
.clone_from_slice(data);
let mut start = 0;
Self::serialize_obj_into(
&mut start,
*SIZE_OF_COMMON_SHRED_HEADER,
&mut payload,
&common_header,
)
.expect("Failed to write header into shred buffer");
Self::serialize_obj_into(
&mut start,
*SIZE_OF_DATA_SHRED_HEADER,
&mut payload,
&data_header,
)
.expect("Failed to write data header into shred buffer");
payload[start..start + data.len()].clone_from_slice(data);
}
Self::new(header, shred_buf)
Self {
common_header,
data_header,
coding_header: CodingShredHeader::default(),
payload,
}
}
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> Result<Self> {
let shred_type: ShredType = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
let mut header = if shred_type == ShredType(CODING_SHRED) {
let start = *SIZE_OF_SHRED_TYPE;
let end = start + *SIZE_OF_CODING_SHRED_HEADER;
let mut header = ShredHeader::default();
header.coding_header = bincode::deserialize(&shred_buf[start..end])?;
header
} else if shred_type == ShredType(DATA_SHRED) {
let start = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
let end = start + *SIZE_OF_DATA_SHRED_HEADER;
let mut header = ShredHeader::default();
header.data_header = bincode::deserialize(&shred_buf[start..end])?;
header
pub fn new_from_serialized_shred(payload: Vec<u8>) -> Result<Self> {
let mut start = 0;
let common_header: ShredCommonHeader =
Self::deserialize_obj(&mut start, *SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
let coding_header: CodingShredHeader =
Self::deserialize_obj(&mut start, *SIZE_OF_CODING_SHRED_HEADER, &payload)?;
Self {
common_header,
data_header: DataShredHeader::default(),
coding_header,
payload,
}
} else if common_header.shred_type == ShredType(DATA_SHRED) {
let data_header: DataShredHeader =
Self::deserialize_obj(&mut start, *SIZE_OF_DATA_SHRED_HEADER, &payload)?;
Self {
common_header,
data_header,
coding_header: CodingShredHeader::default(),
payload,
}
} else {
return Err(ShredError::InvalidShredType);
};
header.shred_type = shred_type;
Ok(Self::new(header, shred_buf))
Ok(shred)
}
pub fn new_empty_from_header(headers: ShredHeader) -> Self {
pub fn new_empty_from_header(
common_header: ShredCommonHeader,
data_header: DataShredHeader,
coding_header: CodingShredHeader,
) -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_SHRED_HEADER]);
bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred");
Shred { headers, payload }
let mut start = 0;
Self::serialize_obj_into(
&mut start,
*SIZE_OF_COMMON_SHRED_HEADER,
&mut payload,
&common_header,
)
.expect("Failed to write header into shred buffer");
if common_header.shred_type == ShredType(DATA_SHRED) {
Self::serialize_obj_into(
&mut start,
*SIZE_OF_DATA_SHRED_HEADER,
&mut payload,
&data_header,
)
.expect("Failed to write data header into shred buffer");
} else if common_header.shred_type == ShredType(CODING_SHRED) {
Self::serialize_obj_into(
&mut start,
*SIZE_OF_CODING_SHRED_HEADER,
&mut payload,
&coding_header,
)
.expect("Failed to write data header into shred buffer");
}
Shred {
common_header,
data_header,
coding_header,
payload,
}
}
pub fn new_empty_data_shred() -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
payload[0] = DATA_SHRED;
let headers = ShredHeader::default();
Shred { headers, payload }
}
pub fn header(&self) -> &ShredCommonHeader {
if self.is_data() {
&self.headers.data_header.common_header
} else {
&self.headers.coding_header.common_header
}
}
pub fn header_mut(&mut self) -> &mut ShredCommonHeader {
if self.is_data() {
&mut self.headers.data_header.common_header
} else {
&mut self.headers.coding_header.common_header
}
Self::new_empty_from_header(
ShredCommonHeader::default(),
DataShredHeader::default(),
CodingShredHeader::default(),
)
}
pub fn slot(&self) -> u64 {
self.header().slot
self.common_header.slot
}
pub fn parent(&self) -> u64 {
if self.is_data() {
self.headers.data_header.common_header.slot
- u64::from(self.headers.data_header.parent_offset)
self.common_header.slot - u64::from(self.data_header.parent_offset)
} else {
std::u64::MAX
}
}
pub fn index(&self) -> u32 {
self.header().index
self.common_header.index
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about actual shred
pub fn set_index(&mut self, index: u32) {
self.header_mut().index = index
self.common_header.index = index
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about actual shred
pub fn set_slot(&mut self, slot: u64) {
self.header_mut().slot = slot
self.common_header.slot = slot
}
pub fn signature(&self) -> Signature {
self.header().signature
self.common_header.signature
}
pub fn seed(&self) -> [u8; 32] {
let mut seed = [0; 32];
let seed_len = seed.len();
let sig = self.header().signature.as_ref();
let sig = self.common_header.signature.as_ref();
seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]);
seed
}
pub fn is_data(&self) -> bool {
self.headers.shred_type == ShredType(DATA_SHRED)
self.common_header.shred_type == ShredType(DATA_SHRED)
}
pub fn is_code(&self) -> bool {
self.headers.shred_type == ShredType(CODING_SHRED)
self.common_header.shred_type == ShredType(CODING_SHRED)
}
pub fn last_in_slot(&self) -> bool {
if self.is_data() {
self.headers.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
self.data_header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
} else {
false
}
@ -271,13 +322,13 @@ impl Shred {
/// Use this only for test code which doesn't care about actual shred
pub fn set_last_in_slot(&mut self) {
if self.is_data() {
self.headers.data_header.flags |= LAST_SHRED_IN_SLOT
self.data_header.flags |= LAST_SHRED_IN_SLOT
}
}
pub fn data_complete(&self) -> bool {
if self.is_data() {
self.headers.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
self.data_header.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
} else {
false
}
@ -285,11 +336,10 @@ impl Shred {
pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
if self.is_code() {
let header = &self.headers.coding_header;
Some((
header.num_data_shreds,
header.num_coding_shreds,
header.position,
self.coding_header.num_data_shreds,
self.coding_header.num_coding_shreds,
self.coding_header.position,
))
} else {
None
@ -297,15 +347,8 @@ impl Shred {
}
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let signed_payload_offset = if self.is_data() {
*SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE
} else if self.is_code() {
*SIZE_OF_SHRED_TYPE
} else {
return false;
} + *SIZE_OF_SIGNATURE;
self.signature()
.verify(pubkey.as_ref(), &self.payload[signed_payload_offset..])
.verify(pubkey.as_ref(), &self.payload[*SIZE_OF_SIGNATURE..])
}
}
@ -346,7 +389,7 @@ impl Shredder {
bincode::serialize(entries).expect("Expect to serialize all entries");
let serialize_time = now.elapsed().as_millis();
let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER;
let no_header_size = *SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
let last_shred_index = next_shred_index + num_shreds as u32 - 1;
@ -376,11 +419,7 @@ impl Shredder {
is_last_in_slot,
);
Shredder::sign_shred(
&self.keypair,
&mut shred,
*SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE,
);
Shredder::sign_shred(&self.keypair, &mut shred);
shred
})
.collect()
@ -403,7 +442,7 @@ impl Shredder {
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
coding_shreds.par_iter_mut().for_each(|mut coding_shred| {
Shredder::sign_shred(&self.keypair, &mut coding_shred, *SIZE_OF_SHRED_TYPE);
Shredder::sign_shred(&self.keypair, &mut coding_shred);
})
})
});
@ -422,14 +461,11 @@ impl Shredder {
(data_shreds, coding_shreds, last_shred_index + 1)
}
pub fn sign_shred(signer: &Arc<Keypair>, shred_info: &mut Shred, signature_offset: usize) {
let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
let signature = signer.sign_message(&shred_info.payload[data_offset..]);
let serialized_signature =
bincode::serialize(&signature).expect("Failed to generate serialized signature");
shred_info.payload[signature_offset..signature_offset + serialized_signature.len()]
.copy_from_slice(&serialized_signature);
shred_info.header_mut().signature = signature;
pub fn sign_shred(signer: &Arc<Keypair>, shred: &mut Shred) {
let signature = signer.sign_message(&shred.payload[*SIZE_OF_SIGNATURE..]);
bincode::serialize_into(&mut shred.payload[..*SIZE_OF_SIGNATURE], &signature)
.expect("Failed to generate serialized signature");
shred.common_header.signature = signature;
}
pub fn new_coding_shred_header(
@ -438,15 +474,19 @@ impl Shredder {
num_data: usize,
num_code: usize,
position: usize,
) -> ShredHeader {
let mut header = ShredHeader::default();
) -> (ShredCommonHeader, CodingShredHeader) {
let mut header = ShredCommonHeader::default();
header.shred_type = ShredType(CODING_SHRED);
header.coding_header.common_header.index = index;
header.coding_header.common_header.slot = slot;
header.coding_header.num_coding_shreds = num_code as u16;
header.coding_header.num_data_shreds = num_data as u16;
header.coding_header.position = position as u16;
header
header.index = index;
header.slot = slot;
(
header,
CodingShredHeader {
num_data_shreds: num_data as u16,
num_coding_shreds: num_code as u16,
position: position as u16,
},
)
}
/// Generates coding shreds for the data shreds in the current FEC set
@ -462,30 +502,32 @@ impl Shredder {
let num_coding = Self::calculate_num_coding_shreds(num_data as f32, fec_rate);
let session =
Session::new(num_data, num_coding).expect("Failed to create erasure session");
let start_index = data_shred_batch[0].header().index;
let start_index = data_shred_batch[0].common_header.index;
// All information after coding shred field in a data shred is encoded
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL;
let data_ptrs: Vec<_> = data_shred_batch
.iter()
.map(|data| &data.payload[coding_block_offset..])
.map(|data| &data.payload[..valid_data_len])
.collect();
// Create empty coding shreds, with correctly populated headers
let mut coding_shreds = Vec::with_capacity(num_coding);
(0..num_coding).for_each(|i| {
let header = Self::new_coding_shred_header(
let (header, coding_header) = Self::new_coding_shred_header(
slot,
start_index + i as u32,
num_data,
num_coding,
i,
);
let shred = Shred::new_empty_from_header(header);
let shred =
Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
coding_shreds.push(shred.payload);
});
// Grab pointers for the coding blocks
let coding_block_offset = *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_CODING_SHRED_HEADER;
let mut coding_ptrs: Vec<_> = coding_shreds
.iter_mut()
.map(|buffer| &mut buffer[coding_block_offset..])
@ -500,15 +542,20 @@ impl Shredder {
coding_shreds
.into_iter()
.enumerate()
.map(|(i, code)| {
let header = Self::new_coding_shred_header(
.map(|(i, payload)| {
let (common_header, coding_header) = Self::new_coding_shred_header(
slot,
start_index + i as u32,
num_data,
num_coding,
i,
);
Shred::new(header, code)
Shred {
common_header,
data_header: DataShredHeader::default(),
coding_header,
payload,
}
})
.collect()
} else {
@ -561,8 +608,6 @@ impl Shredder {
let fec_set_size = num_data + num_coding;
if num_coding > 0 && shreds.len() < fec_set_size {
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_SHRED_TYPE;
// Let's try recovering missing shreds using erasure
let mut present = &mut vec![true; fec_set_size];
let mut next_expected_index = first_index;
@ -603,9 +648,18 @@ impl Shredder {
let session = Session::new(num_data, num_coding).unwrap();
let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL;
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER + *SIZE_OF_COMMON_SHRED_HEADER;
let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs
.iter_mut()
.map(|x| x[coding_block_offset..].as_mut())
.enumerate()
.map(|(position, x)| {
if position < num_data {
x[..valid_data_len].as_mut()
} else {
x[coding_block_offset..].as_mut()
}
})
.zip(present.clone())
.collect();
session.decode_blocks(&mut blocks)?;
@ -667,11 +721,12 @@ impl Shredder {
}
fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec<u8>>) -> Vec<u8> {
let valid_data_len = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_IGNORED_TAIL;
data_shred_bufs[..num_data]
.iter()
.flat_map(|data| {
let offset = *SIZE_OF_SHRED_HEADER;
data[offset as usize..].iter()
let offset = *SIZE_OF_COMMON_SHRED_HEADER + *SIZE_OF_DATA_SHRED_HEADER;
data[offset..valid_data_len].iter()
})
.cloned()
.collect()
@ -684,7 +739,7 @@ pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
}
pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
let shred_data_size = *SIZE_OF_DATA_SHRED_PAYLOAD as u64;
let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
let entry_size = bincode::serialized_size(entry).unwrap();
let count_size = vec_size - entry_size;
@ -774,7 +829,7 @@ pub mod tests {
.collect();
let size = serialized_size(&entries).unwrap();
let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_SHRED_HEADER) as u64;
let no_header_size = *SIZE_OF_DATA_SHRED_PAYLOAD as u64;
let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
let num_expected_coding_shreds =
Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
@ -787,8 +842,8 @@ pub mod tests {
let mut data_shred_indexes = HashSet::new();
let mut coding_shred_indexes = HashSet::new();
for shred in data_shreds.iter() {
assert_eq!(shred.headers.shred_type, ShredType(DATA_SHRED));
let index = shred.headers.data_header.common_header.index;
assert_eq!(shred.common_header.shred_type, ShredType(DATA_SHRED));
let index = shred.common_header.index;
let is_last = index as u64 == num_expected_data_shreds - 1;
verify_test_data_shred(
shred,
@ -805,8 +860,8 @@ pub mod tests {
}
for shred in coding_shreds.iter() {
let index = shred.headers.data_header.common_header.index;
assert_eq!(shred.headers.shred_type, ShredType(CODING_SHRED));
let index = shred.common_header.index;
assert_eq!(shred.common_header.shred_type, ShredType(CODING_SHRED));
verify_test_code_shred(shred, index, slot, &keypair.pubkey(), true);
assert!(!coding_shred_indexes.contains(&index));
coding_shred_indexes.insert(index);