A new data-structure in shreds for partial deserialization (#5915)

* A new datastructure in shreds for partial deserialization

* fix chacha golden hash

* fix clippy and address review comments
This commit is contained in:
Pankaj Garg 2019-09-16 20:28:54 -07:00 committed by GitHub
parent c44e7ce184
commit 7459eb15c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 402 additions and 263 deletions

View File

@ -4,7 +4,7 @@
use crate::entry::Entry; use crate::entry::Entry;
use crate::erasure::ErasureConfig; use crate::erasure::ErasureConfig;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::shred::{Shred, ShredMetaBuf, Shredder}; use crate::shred::{Shred, ShredInfo, Shredder};
#[cfg(feature = "kvstore")] #[cfg(feature = "kvstore")]
use solana_kvstore as kvstore; use solana_kvstore as kvstore;
@ -320,8 +320,8 @@ impl Blocktree {
db: &Database, db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>, erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &HashMap<u64, Index>, index_working_set: &HashMap<u64, Index>,
prev_inserted_datas: &mut HashMap<(u64, u64), ShredMetaBuf>, prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>,
prev_inserted_codes: &mut HashMap<(u64, u64), ShredMetaBuf>, prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>,
) -> Vec<Shred> { ) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>(); let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>(); let code_cf = db.column::<cf::ShredCode>();
@ -357,12 +357,7 @@ impl Blocktree {
.get_bytes((slot, i)) .get_bytes((slot, i))
.expect("Database failure, could not fetch data shred"); .expect("Database failure, could not fetch data shred");
if let Some(data) = some_data { if let Some(data) = some_data {
Some(ShredMetaBuf { Some(ShredInfo::new_from_serialized_shred(data))
slot,
index: i as u32,
data_shred: true,
shred_buf: data,
})
} else { } else {
warn!("Data shred deleted while reading for recovery"); warn!("Data shred deleted while reading for recovery");
None None
@ -382,12 +377,7 @@ impl Blocktree {
.get_bytes((slot, i)) .get_bytes((slot, i))
.expect("Database failure, could not fetch code shred"); .expect("Database failure, could not fetch code shred");
if let Some(code) = some_code { if let Some(code) = some_code {
Some(ShredMetaBuf { Some(ShredInfo::new_from_serialized_shred(code))
slot,
index: i as u32,
data_shred: false,
shred_buf: code,
})
} else { } else {
warn!("Code shred deleted while reading for recovery"); warn!("Code shred deleted while reading for recovery");
None None
@ -439,15 +429,7 @@ impl Blocktree {
let mut index_working_set = HashMap::new(); let mut index_working_set = HashMap::new();
shreds.into_iter().for_each(|shred| { shreds.into_iter().for_each(|shred| {
if let Shred::Coding(_) = &shred { if shred.is_data() {
self.check_insert_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_inserted_coding_shreds,
);
} else {
self.check_insert_data_shred( self.check_insert_data_shred(
shred, shred,
&mut index_working_set, &mut index_working_set,
@ -455,6 +437,14 @@ impl Blocktree {
&mut write_batch, &mut write_batch,
&mut just_inserted_data_shreds, &mut just_inserted_data_shreds,
); );
} else {
self.check_insert_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_inserted_coding_shreds,
);
} }
}); });
@ -523,7 +513,7 @@ impl Blocktree {
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>, index_working_set: &mut HashMap<u64, Index>,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>, just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredInfo>,
) { ) {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
@ -538,15 +528,10 @@ impl Blocktree {
if let Ok(shred_buf) = if let Ok(shred_buf) =
self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
{ {
let shred_meta = ShredMetaBuf { let shred_info = ShredInfo::new_from_shred(&shred, shred_buf);
slot,
index: shred_index as u32,
data_shred: false,
shred_buf,
};
just_inserted_coding_shreds just_inserted_coding_shreds
.entry((slot, shred_index)) .entry((slot, shred_index))
.or_insert_with(|| shred_meta); .or_insert_with(|| shred_info);
new_index_meta.map(|n| index_working_set.insert(slot, n)); new_index_meta.map(|n| index_working_set.insert(slot, n));
} }
} }
@ -558,7 +543,7 @@ impl Blocktree {
index_working_set: &mut HashMap<u64, Index>, index_working_set: &mut HashMap<u64, Index>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>, slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>, just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredInfo>,
) { ) {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
@ -584,13 +569,8 @@ impl Blocktree {
&shred, &shred,
write_batch, write_batch,
) { ) {
let shred_meta = ShredMetaBuf { let shred_info = ShredInfo::new_from_shred(&shred, shred_buf);
slot, just_inserted_data_shreds.insert((slot, shred_index), shred_info);
index: shred_index as u32,
data_shred: true,
shred_buf,
};
just_inserted_data_shreds.insert((slot, shred_index), shred_meta);
new_index_meta.map(|n| index_working_set.insert(slot, n)); new_index_meta.map(|n| index_working_set.insert(slot, n));
true true
} else { } else {
@ -614,24 +594,17 @@ impl Blocktree {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = shred.index(); let shred_index = shred.index();
let (pos, num_coding) = { let (_, num_coding, pos) = shred
if let Shred::Coding(coding_shred) = &shred { .coding_params()
( .expect("should_insert_coding_shred called with non-coding shred");
u32::from(coding_shred.header.position),
coding_shred.header.num_coding_shreds,
)
} else {
panic!("should_insert_coding_shred called with non-coding shred")
}
};
if shred_index < pos { if shred_index < u32::from(pos) {
return false; return false;
} }
let set_index = shred_index - pos; let set_index = shred_index - u32::from(pos);
!(num_coding == 0 !(num_coding == 0
|| pos >= u32::from(num_coding) || pos >= num_coding
|| std::u32::MAX - set_index < u32::from(num_coding) - 1 || std::u32::MAX - set_index < u32::from(num_coding) - 1
|| coding_index.is_present(u64::from(shred_index)) || coding_index.is_present(u64::from(shred_index))
|| slot <= *last_root.read().unwrap()) || slot <= *last_root.read().unwrap())
@ -646,29 +619,21 @@ impl Blocktree {
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
let (num_data, num_coding, pos) = { let (num_data, num_coding, pos) = shred
if let Shred::Coding(coding_shred) = &shred { .coding_params()
( .expect("insert_coding_shred called with non-coding shred");
coding_shred.header.num_data_shreds as usize,
coding_shred.header.num_coding_shreds as usize,
u64::from(coding_shred.header.position),
)
} else {
panic!("insert_coding_shred called with non-coding shred")
}
};
// Assert guaranteed by integrity checks on the shred that happen before // Assert guaranteed by integrity checks on the shred that happen before
// `insert_coding_shred` is called // `insert_coding_shred` is called
if shred_index < pos { if shred_index < u64::from(pos) {
error!("Due to earlier validation, shred index must be >= pos"); error!("Due to earlier validation, shred index must be >= pos");
return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData( return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(
Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())), Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())),
))); )));
} }
let set_index = shred_index - pos; let set_index = shred_index - u64::from(pos);
let erasure_config = ErasureConfig::new(num_data, num_coding); let erasure_config = ErasureConfig::new(num_data as usize, num_coding as usize);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta_cf self.erasure_meta_cf
@ -3076,7 +3041,7 @@ pub mod tests {
#[test] #[test]
pub fn test_should_insert_data_shred() { pub fn test_should_insert_data_shred() {
let (shreds, _) = make_slot_entries(0, 0, 100); let (mut shreds, _) = make_slot_entries(0, 0, 100);
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
{ {
let blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
@ -3122,10 +3087,9 @@ pub mod tests {
let index = index_cf.get(0).unwrap().unwrap(); let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9); assert_eq!(slot_meta.received, 9);
let shred7 = { let shred7 = {
if let Shred::Data(ref s) = shreds[7] { if shreds[7].is_data() {
let mut shred = Shred::Data(s.clone()); shreds[7].set_last_in_slot();
shred.set_last_in_slot(); shreds[7].clone()
shred
} else { } else {
panic!("Shred in unexpected format") panic!("Shred in unexpected format")
} }
@ -3144,8 +3108,8 @@ pub mod tests {
let index = index_cf.get(0).unwrap().unwrap(); let index = index_cf.get(0).unwrap().unwrap();
// Trying to insert a shred with index > the "is_last" shred should fail // Trying to insert a shred with index > the "is_last" shred should fail
if let Shred::Data(ref mut s) = shred8 { if shred8.is_data() {
s.header.common_header.slot = slot_meta.last_index + 1; shred8.set_slot(slot_meta.last_index + 1);
} else { } else {
panic!("Shred in unexpected format") panic!("Shred in unexpected format")
} }
@ -3170,8 +3134,8 @@ pub mod tests {
let mut shred = CodingShred::default(); let mut shred = CodingShred::default();
let slot = 1; let slot = 1;
shred.header.position = 10; shred.header.position = 10;
shred.header.common_header.index = 11; shred.header.coding_header.index = 11;
shred.header.common_header.slot = 1; shred.header.coding_header.slot = 1;
shred.header.num_coding_shreds = shred.header.position + 1; shred.header.num_coding_shreds = shred.header.position + 1;
let coding_shred = Shred::Coding(shred.clone()); let coding_shred = Shred::Coding(shred.clone());
@ -3190,7 +3154,7 @@ pub mod tests {
// Trying to insert the same shred again should fail // Trying to insert the same shred again should fail
{ {
let index = index_cf let index = index_cf
.get(shred.header.common_header.slot) .get(shred.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
@ -3200,12 +3164,12 @@ pub mod tests {
)); ));
} }
shred.header.common_header.index += 1; shred.header.coding_header.index += 1;
// Establish a baseline that works // Establish a baseline that works
{ {
let index = index_cf let index = index_cf
.get(shred.header.common_header.slot) .get(shred.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(Blocktree::should_insert_coding_shred( assert!(Blocktree::should_insert_coding_shred(
@ -3218,9 +3182,9 @@ pub mod tests {
// Trying to insert a shred with index < position should fail // Trying to insert a shred with index < position should fail
{ {
let mut shred_ = shred.clone(); let mut shred_ = shred.clone();
shred_.header.common_header.index = (shred_.header.position - 1).into(); shred_.header.coding_header.index = (shred_.header.position - 1).into();
let index = index_cf let index = index_cf
.get(shred_.header.common_header.slot) .get(shred_.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
@ -3235,7 +3199,7 @@ pub mod tests {
let mut shred_ = shred.clone(); let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 0; shred_.header.num_coding_shreds = 0;
let index = index_cf let index = index_cf
.get(shred_.header.common_header.slot) .get(shred_.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
@ -3250,7 +3214,7 @@ pub mod tests {
let mut shred_ = shred.clone(); let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = shred_.header.position; shred_.header.num_coding_shreds = shred_.header.position;
let index = index_cf let index = index_cf
.get(shred_.header.common_header.slot) .get(shred_.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
@ -3265,10 +3229,10 @@ pub mod tests {
{ {
let mut shred_ = shred.clone(); let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 3; shred_.header.num_coding_shreds = 3;
shred_.header.common_header.index = std::u32::MAX - 1; shred_.header.coding_header.index = std::u32::MAX - 1;
shred_.header.position = 0; shred_.header.position = 0;
let index = index_cf let index = index_cf
.get(shred_.header.common_header.slot) .get(shred_.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
@ -3294,10 +3258,10 @@ pub mod tests {
{ {
let mut shred_ = shred.clone(); let mut shred_ = shred.clone();
let index = index_cf let index = index_cf
.get(shred_.header.common_header.slot) .get(shred_.header.coding_header.slot)
.unwrap() .unwrap()
.unwrap(); .unwrap();
shred_.header.common_header.slot = *last_root.read().unwrap(); shred_.header.coding_header.slot = *last_root.read().unwrap();
assert!(!Blocktree::should_insert_coding_shred( assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_), &Shred::Coding(shred_),
index.coding(), index.coding(),

View File

@ -79,11 +79,11 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
if i <= self.partition { if i <= self.partition {
// Send fake blobs to the first N peers // Send fake blobs to the first N peers
fake_shred_bufs.iter().for_each(|b| { fake_shred_bufs.iter().for_each(|b| {
sock.send_to(b, &peer.tvu_forwards).unwrap(); sock.send_to(&b.shred, &peer.tvu_forwards).unwrap();
}); });
} else { } else {
shred_bufs.iter().for_each(|b| { shred_bufs.iter().for_each(|b| {
sock.send_to(b, &peer.tvu_forwards).unwrap(); sock.send_to(&b.shred, &peer.tvu_forwards).unwrap();
}); });
} }
}); });

View File

@ -1,7 +1,7 @@
use crate::entry::Entry; use crate::entry::Entry;
use crate::poh_recorder::WorkingBankEntries; use crate::poh_recorder::WorkingBankEntries;
use crate::result::Result; use crate::result::Result;
use crate::shred::{Shred, Shredder}; use crate::shred::{Shred, ShredInfo, Shredder};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::signature::Keypair; use solana_sdk::signature::Keypair;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
@ -78,7 +78,7 @@ pub(super) fn entries_to_shreds(
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
mut latest_shred_index: u64, mut latest_shred_index: u64,
parent_slot: u64, parent_slot: u64,
) -> (Vec<Shred>, Vec<Vec<u8>>, u64) { ) -> (Vec<Shred>, Vec<ShredInfo>, u64) {
let mut all_shred_bufs = vec![]; let mut all_shred_bufs = vec![];
let mut all_shreds = vec![]; let mut all_shreds = vec![];
let num_ventries = ventries.len(); let num_ventries = ventries.len();
@ -101,7 +101,7 @@ pub(super) fn entries_to_shreds(
shredder.finalize_data(); shredder.finalize_data();
} }
let (mut shreds, mut shred_bufs): (Vec<Shred>, Vec<Vec<u8>>) = let (mut shreds, mut shred_bufs): (Vec<Shred>, Vec<ShredInfo>) =
shredder.shred_tuples.into_iter().unzip(); shredder.shred_tuples.into_iter().unzip();
trace!("Inserting {:?} shreds in blocktree", shreds.len()); trace!("Inserting {:?} shreds in blocktree", shreds.len());

View File

@ -41,7 +41,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
.map(|meta| meta.consumed) .map(|meta| meta.consumed)
.unwrap_or(0); .unwrap_or(0);
let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds( let (shreds, shred_infos, _) = broadcast_utils::entries_to_shreds(
receive_results.ventries, receive_results.ventries,
bank.slot(), bank.slot(),
last_tick, last_tick,
@ -59,6 +59,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let bank_epoch = bank.get_stakers_epoch(bank.slot()); let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.shred).collect();
// Broadcast data + erasures // Broadcast data + erasures
cluster_info.read().unwrap().broadcast_shreds( cluster_info.read().unwrap().broadcast_shreds(
sock, sock,

View File

@ -79,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun {
0 0
}; };
let (all_shreds, all_shred_bufs, latest_shred_index) = entries_to_shreds( let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds(
receive_results.ventries, receive_results.ventries,
bank.slot(), bank.slot(),
last_tick, last_tick,
@ -102,6 +102,7 @@ impl BroadcastRun for StandardBroadcastRun {
let bank_epoch = bank.get_stakers_epoch(bank.slot()); let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let all_shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.shred).collect();
trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
cluster_info.read().unwrap().broadcast_shreds( cluster_info.read().unwrap().broadcast_shreds(
sock, sock,

View File

@ -153,7 +153,7 @@ mod tests {
hasher.hash(&buf[..size]); hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes.... // golden needs to be updated if blob stuff changes....
let golden: Hash = "9aHjDwufwfZgbmSdug5g58KSGqTsFx9faXuwoikDe4e4" let golden: Hash = "C7RmQ7oDswQfgquukXHGvpYYSCcKTgPnJrYA3ABbX9oG"
.parse() .parse()
.unwrap(); .unwrap();

View File

@ -14,6 +14,10 @@ use std::sync::Arc;
use std::{cmp, io}; use std::{cmp, io};
lazy_static! { lazy_static! {
static ref SIZE_OF_CODING_SHRED_HEADER: usize =
{ serialized_size(&CodingShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_DATA_SHRED_HEADER: usize =
{ serialized_size(&DataShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_EMPTY_CODING_SHRED: usize = static ref SIZE_OF_EMPTY_CODING_SHRED: usize =
{ serialized_size(&CodingShred::empty_shred()).unwrap() as usize }; { serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
static ref SIZE_OF_EMPTY_DATA_SHRED: usize = static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
@ -26,14 +30,134 @@ lazy_static! {
{ bincode::serialized_size(&Signature::default()).unwrap() as usize }; { bincode::serialized_size(&Signature::default()).unwrap() as usize };
static ref SIZE_OF_EMPTY_VEC: usize = static ref SIZE_OF_EMPTY_VEC: usize =
{ bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize }; { bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize };
static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
} }
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] /// The constants that define if a shred is data or coding
pub struct ShredMetaBuf { const DATA_SHRED: u8 = 0b1010_0101;
pub slot: u64, const CODING_SHRED: u8 = 0b0101_1010;
pub index: u32,
pub data_shred: bool, #[derive(Clone, Debug)]
pub shred_buf: Vec<u8>, pub struct ShredInfo {
pub headers: DataShredHeader,
pub shred: Vec<u8>,
}
impl ShredInfo {
fn new(header: DataShredHeader, shred_buf: Vec<u8>) -> Self {
ShredInfo {
headers: header,
shred: shred_buf,
}
}
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> Self {
let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
let shred_type: u8 =
bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])
.unwrap();
let header = if shred_type == CODING_SHRED {
let end = *SIZE_OF_CODING_SHRED_HEADER;
let mut header = DataShredHeader::default();
header.common_header.header =
bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap();
header
} else {
let end = *SIZE_OF_DATA_SHRED_HEADER;
bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap()
};
Self::new(header, shred_buf)
}
pub fn new_from_shred(shred: &Shred, shred_buf: Vec<u8>) -> Self {
let header = match shred {
Shred::Data(s) => s.header.clone(),
Shred::Coding(s) => {
let mut hdr = DataShredHeader::default();
hdr.common_header.header = s.header.clone();
hdr
}
};
Self::new(header, shred_buf)
}
fn header(&self) -> &ShredCommonHeader {
if self.is_data() {
&self.headers.data_header
} else {
&self.headers.common_header.header.coding_header
}
}
pub fn header_mut(&mut self) -> &mut ShredCommonHeader {
if self.is_data() {
&mut self.headers.data_header
} else {
&mut self.headers.common_header.header.coding_header
}
}
pub fn slot(&self) -> u64 {
self.header().slot
}
pub fn parent(&self) -> u64 {
if self.is_data() {
self.headers.data_header.slot - u64::from(self.headers.parent_offset)
} else {
std::u64::MAX
}
}
pub fn index(&self) -> u32 {
self.header().index
}
pub fn signature(&self) -> Signature {
self.header().signature
}
pub fn seed(&self) -> [u8; 32] {
let mut seed = [0; 32];
let seed_len = seed.len();
let sig = self.header().signature.as_ref();
seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]);
seed
}
pub fn is_data(&self) -> bool {
self.headers.common_header.header.shred_type == DATA_SHRED
}
pub fn last_in_slot(&self) -> bool {
if self.is_data() {
self.headers.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT
} else {
false
}
}
pub fn data_complete(&self) -> bool {
if self.is_data() {
self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
} else {
false
}
}
pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
if !self.is_data() {
let header = &self.headers.common_header.header;
Some((
header.num_data_shreds,
header.num_coding_shreds,
header.position,
))
} else {
None
}
}
} }
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
@ -46,20 +170,20 @@ pub enum Shred {
/// a public constant defined for it. /// a public constant defined for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 4; const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 4;
const LAST_SHRED_IN_SLOT: u8 = 1; const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
const DATA_COMPLETE_SHRED: u8 = 2; const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
impl Shred { impl Shred {
pub fn slot(&self) -> u64 { pub fn slot(&self) -> u64 {
match self { match self {
Shred::Data(s) => s.header.common_header.slot, Shred::Data(s) => s.header.data_header.slot,
Shred::Coding(s) => s.header.common_header.slot, Shred::Coding(s) => s.header.coding_header.slot,
} }
} }
pub fn parent(&self) -> u64 { pub fn parent(&self) -> u64 {
match self { match self {
Shred::Data(s) => s.header.common_header.slot - u64::from(s.header.parent_offset), Shred::Data(s) => s.header.data_header.slot - u64::from(s.header.parent_offset),
Shred::Coding(_) => std::u64::MAX, Shred::Coding(_) => std::u64::MAX,
} }
} }
@ -68,38 +192,38 @@ impl Shred {
let parent = self.parent(); let parent = self.parent();
match self { match self {
Shred::Data(s) => { Shred::Data(s) => {
s.header.common_header.slot = slot; s.header.data_header.slot = slot;
s.header.parent_offset = (slot - parent) as u16; s.header.parent_offset = (slot - parent) as u16;
} }
Shred::Coding(s) => s.header.common_header.slot = slot, Shred::Coding(s) => s.header.coding_header.slot = slot,
}; };
} }
pub fn index(&self) -> u32 { pub fn index(&self) -> u32 {
match self { match self {
Shred::Data(s) => s.header.common_header.index, Shred::Data(s) => s.header.data_header.index,
Shred::Coding(s) => s.header.common_header.index, Shred::Coding(s) => s.header.coding_header.index,
} }
} }
pub fn set_index(&mut self, index: u32) { pub fn set_index(&mut self, index: u32) {
match self { match self {
Shred::Data(s) => s.header.common_header.index = index, Shred::Data(s) => s.header.data_header.index = index,
Shred::Coding(s) => s.header.common_header.index = index, Shred::Coding(s) => s.header.coding_header.index = index,
}; };
} }
pub fn signature(&self) -> Signature { pub fn signature(&self) -> Signature {
match self { match self {
Shred::Data(s) => s.header.common_header.signature, Shred::Data(s) => s.header.data_header.signature,
Shred::Coding(s) => s.header.common_header.signature, Shred::Coding(s) => s.header.coding_header.signature,
} }
} }
pub fn set_signature(&mut self, sig: Signature) { pub fn set_signature(&mut self, sig: Signature) {
match self { match self {
Shred::Data(s) => s.header.common_header.signature = sig, Shred::Data(s) => s.header.data_header.signature = sig,
Shred::Coding(s) => s.header.common_header.signature = sig, Shred::Coding(s) => s.header.coding_header.signature = sig,
}; };
} }
@ -107,8 +231,8 @@ impl Shred {
let mut seed = [0; 32]; let mut seed = [0; 32];
let seed_len = seed.len(); let seed_len = seed.len();
let sig = match self { let sig = match self {
Shred::Data(s) => &s.header.common_header.signature, Shred::Data(s) => &s.header.data_header.signature,
Shred::Coding(s) => &s.header.common_header.signature, Shred::Coding(s) => &s.header.coding_header.signature,
} }
.as_ref(); .as_ref();
@ -124,7 +248,11 @@ impl Shred {
pub fn fast_verify(&self, shred_buf: &[u8], pubkey: &Pubkey) -> bool { pub fn fast_verify(&self, shred_buf: &[u8], pubkey: &Pubkey) -> bool {
let signed_payload_offset = match self { let signed_payload_offset = match self {
Shred::Data(_) => CodingShred::overhead(), Shred::Data(_) => CodingShred::overhead(),
Shred::Coding(_) => CodingShred::overhead() - *SIZE_OF_EMPTY_CODING_SHRED, Shred::Coding(_) => {
CodingShred::overhead() + *SIZE_OF_SHRED_TYPE
- *SIZE_OF_CODING_SHRED_HEADER
- *SIZE_OF_EMPTY_VEC
}
} + *SIZE_OF_SIGNATURE; } + *SIZE_OF_SIGNATURE;
self.signature() self.signature()
.verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..]) .verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..])
@ -158,6 +286,18 @@ impl Shred {
Shred::Coding(_) => false, Shred::Coding(_) => false,
} }
} }
pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
if let Shred::Coding(s) = self {
Some((
s.header.num_data_shreds,
s.header.num_coding_shreds,
s.header.position,
))
} else {
None
}
}
} }
/// A common header that is present at start of every shred /// A common header that is present at start of every shred
@ -169,22 +309,22 @@ pub struct ShredCommonHeader {
} }
/// A common header that is present at start of every data shred /// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader { pub struct DataShredHeader {
_reserved: CodingShredHeader, common_header: CodingShred,
pub common_header: ShredCommonHeader, pub data_header: ShredCommonHeader,
pub parent_offset: u16, pub parent_offset: u16,
pub flags: u8, pub flags: u8,
} }
/// The coding shred header has FEC information /// The coding shred header has FEC information
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader { pub struct CodingShredHeader {
pub common_header: ShredCommonHeader, pub shred_type: u8,
pub coding_header: ShredCommonHeader,
pub num_data_shreds: u16, pub num_data_shreds: u16,
pub num_coding_shreds: u16, pub num_coding_shreds: u16,
pub position: u16, pub position: u16,
pub payload: Vec<u8>,
} }
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
@ -196,6 +336,39 @@ pub struct DataShred {
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShred { pub struct CodingShred {
pub header: CodingShredHeader, pub header: CodingShredHeader,
pub payload: Vec<u8>,
}
impl Default for DataShredHeader {
fn default() -> Self {
DataShredHeader {
common_header: CodingShred {
header: CodingShredHeader {
shred_type: DATA_SHRED,
coding_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
},
payload: vec![],
},
data_header: ShredCommonHeader::default(),
parent_offset: 0,
flags: 0,
}
}
}
impl Default for CodingShredHeader {
fn default() -> Self {
CodingShredHeader {
shred_type: CODING_SHRED,
coding_header: ShredCommonHeader::default(),
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
}
}
} }
/// Default shred is sized correctly to meet MTU/Packet size requirements /// Default shred is sized correctly to meet MTU/Packet size requirements
@ -214,13 +387,8 @@ impl Default for CodingShred {
fn default() -> Self { fn default() -> Self {
let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED; let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED;
CodingShred { CodingShred {
header: CodingShredHeader { header: CodingShredHeader::default(),
common_header: ShredCommonHeader::default(), payload: vec![0; size],
num_data_shreds: 0,
num_coding_shreds: 0,
position: 0,
payload: vec![0; size],
},
} }
} }
} }
@ -260,11 +428,11 @@ impl ShredCommon for DataShred {
impl ShredCommon for CodingShred { impl ShredCommon for CodingShred {
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
let mut capacity = self.header.payload.len().saturating_sub(offset); let mut capacity = self.payload.len().saturating_sub(offset);
let slice_len = cmp::min(capacity, buf.len()); let slice_len = cmp::min(capacity, buf.len());
capacity -= slice_len; capacity -= slice_len;
if slice_len > 0 { if slice_len > 0 {
self.header.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
} }
(slice_len, capacity) (slice_len, capacity)
} }
@ -276,6 +444,7 @@ impl ShredCommon for CodingShred {
fn empty_shred() -> Self { fn empty_shred() -> Self {
CodingShred { CodingShred {
header: CodingShredHeader::default(), header: CodingShredHeader::default(),
payload: vec![],
} }
} }
} }
@ -288,7 +457,7 @@ pub struct Shredder {
parent_offset: u16, parent_offset: u16,
fec_rate: f32, fec_rate: f32,
signer: Arc<Keypair>, signer: Arc<Keypair>,
pub shred_tuples: Vec<(Shred, Vec<u8>)>, pub shred_tuples: Vec<(Shred, ShredInfo)>,
fec_set_shred_start: usize, fec_set_shred_start: usize,
active_shred: Shred, active_shred: Shred,
active_offset: usize, active_offset: usize,
@ -359,8 +528,8 @@ impl Shredder {
))) )))
} else { } else {
let mut data_shred = DataShred::default(); let mut data_shred = DataShred::default();
data_shred.header.common_header.slot = slot; data_shred.header.data_header.slot = slot;
data_shred.header.common_header.index = index; data_shred.header.data_header.index = index;
data_shred.header.parent_offset = (slot - parent) as u16; data_shred.header.parent_offset = (slot - parent) as u16;
let active_shred = Shred::Data(data_shred); let active_shred = Shred::Data(data_shred);
Ok(Shredder { Ok(Shredder {
@ -381,16 +550,17 @@ impl Shredder {
fn sign_shred( fn sign_shred(
signer: &Arc<Keypair>, signer: &Arc<Keypair>,
shred: &mut Shred, shred: &mut Shred,
shred_buf: &mut [u8], shred_info: &mut ShredInfo,
signature_offset: usize, signature_offset: usize,
) { ) {
let data_offset = signature_offset + *SIZE_OF_SIGNATURE; let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
let signature = signer.sign_message(&shred_buf[data_offset..]); let signature = signer.sign_message(&shred_info.shred[data_offset..]);
let serialized_signature = let serialized_signature =
bincode::serialize(&signature).expect("Failed to generate serialized signature"); bincode::serialize(&signature).expect("Failed to generate serialized signature");
shred.set_signature(signature); shred.set_signature(signature);
shred_buf[signature_offset..signature_offset + serialized_signature.len()] shred_info.shred[signature_offset..signature_offset + serialized_signature.len()]
.copy_from_slice(&serialized_signature); .copy_from_slice(&serialized_signature);
shred_info.header_mut().signature = signature;
} }
fn sign_unsigned_shreds_and_generate_codes(&mut self) { fn sign_unsigned_shreds_and_generate_codes(&mut self) {
@ -401,7 +571,9 @@ impl Shredder {
.for_each(|(s, d)| Self::sign_shred(&signer, s, d, signature_offset)); .for_each(|(s, d)| Self::sign_shred(&signer, s, d, signature_offset));
let unsigned_coding_shred_start = self.shred_tuples.len(); let unsigned_coding_shred_start = self.shred_tuples.len();
self.generate_coding_shreds(); self.generate_coding_shreds();
let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED + *SIZE_OF_SHRED_TYPE
- *SIZE_OF_CODING_SHRED_HEADER
- *SIZE_OF_EMPTY_VEC;
self.shred_tuples[unsigned_coding_shred_start..] self.shred_tuples[unsigned_coding_shred_start..]
.iter_mut() .iter_mut()
.for_each(|(s, d)| Self::sign_shred(&signer, s, d, coding_header_offset)); .for_each(|(s, d)| Self::sign_shred(&signer, s, d, coding_header_offset));
@ -419,14 +591,15 @@ impl Shredder {
let mut shred = Shred::Data(self.new_data_shred()); let mut shred = Shred::Data(self.new_data_shred());
std::mem::swap(&mut shred, &mut self.active_shred); std::mem::swap(&mut shred, &mut self.active_shred);
self.shred_tuples.push((shred, data)); let shred_info = ShredInfo::new_from_shred(&shred, data);
self.shred_tuples.push((shred, shred_info));
} }
/// Creates a new data shred /// Creates a new data shred
fn new_data_shred(&self) -> DataShred { fn new_data_shred(&self) -> DataShred {
let mut data_shred = DataShred::default(); let mut data_shred = DataShred::default();
data_shred.header.common_header.slot = self.slot; data_shred.header.data_header.slot = self.slot;
data_shred.header.common_header.index = self.index; data_shred.header.data_header.index = self.index;
data_shred.header.parent_offset = self.parent_offset; data_shred.header.parent_offset = self.parent_offset;
data_shred data_shred
} }
@ -439,8 +612,8 @@ impl Shredder {
position: usize, position: usize,
) -> CodingShred { ) -> CodingShred {
let mut coding_shred = CodingShred::default(); let mut coding_shred = CodingShred::default();
coding_shred.header.common_header.slot = slot; coding_shred.header.coding_header.slot = slot;
coding_shred.header.common_header.index = index; coding_shred.header.coding_header.index = index;
coding_shred.header.num_data_shreds = num_data as u16; coding_shred.header.num_data_shreds = num_data as u16;
coding_shred.header.num_coding_shreds = num_code as u16; coding_shred.header.num_coding_shreds = num_code as u16;
coding_shred.header.position = position as u16; coding_shred.header.position = position as u16;
@ -460,7 +633,7 @@ impl Shredder {
let coding_block_offset = CodingShred::overhead(); let coding_block_offset = CodingShred::overhead();
let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..] let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..]
.iter() .iter()
.map(|(_, data)| &data[coding_block_offset..]) .map(|(_, data)| &data.shred[coding_block_offset..])
.collect(); .collect();
// Create empty coding shreds, with correctly populated headers // Create empty coding shreds, with correctly populated headers
@ -491,7 +664,8 @@ impl Shredder {
// append to the shred list // append to the shred list
coding_shreds.into_iter().for_each(|code| { coding_shreds.into_iter().for_each(|code| {
let shred: Shred = bincode::deserialize(&code).unwrap(); let shred: Shred = bincode::deserialize(&code).unwrap();
self.shred_tuples.push((shred, code)); let shred_info = ShredInfo::new_from_shred(&shred, code);
self.shred_tuples.push((shred, shred_info));
}); });
self.fec_set_index = self.index; self.fec_set_index = self.index;
} }
@ -529,7 +703,7 @@ impl Shredder {
} }
fn fill_in_missing_shreds( fn fill_in_missing_shreds(
shred: &ShredMetaBuf, shred: &ShredInfo,
num_data: usize, num_data: usize,
num_coding: usize, num_coding: usize,
slot: u64, slot: u64,
@ -563,8 +737,8 @@ impl Shredder {
) -> Vec<u8> { ) -> Vec<u8> {
let missing_shred = if missing < first_index + num_data { let missing_shred = if missing < first_index + num_data {
let mut data_shred = DataShred::default(); let mut data_shred = DataShred::default();
data_shred.header.common_header.slot = slot; data_shred.header.data_header.slot = slot;
data_shred.header.common_header.index = missing as u32; data_shred.header.data_header.index = missing as u32;
Shred::Data(data_shred) Shred::Data(data_shred)
} else { } else {
Shred::Coding(Self::new_coding_shred( Shred::Coding(Self::new_coding_shred(
@ -579,7 +753,7 @@ impl Shredder {
} }
pub fn try_recovery( pub fn try_recovery(
shreds: Vec<ShredMetaBuf>, shreds: Vec<ShredInfo>,
num_data: usize, num_data: usize,
num_coding: usize, num_coding: usize,
first_index: usize, first_index: usize,
@ -606,7 +780,7 @@ impl Shredder {
next_expected_index, next_expected_index,
&mut present, &mut present,
); );
blocks.push(shred.shred_buf); blocks.push(shred.shred);
next_expected_index = last_index + 1; next_expected_index = last_index + 1;
blocks blocks
}) })
@ -693,11 +867,11 @@ impl Shredder {
Ok(Self::reassemble_payload(num_data, data_shred_bufs)) Ok(Self::reassemble_payload(num_data, data_shred_bufs))
} }
fn get_shred_index(shred: &ShredMetaBuf, num_data: usize) -> usize { fn get_shred_index(shred: &ShredInfo, num_data: usize) -> usize {
if shred.data_shred { if shred.is_data() {
shred.index as usize shred.index() as usize
} else { } else {
shred.index as usize + num_data shred.index() as usize + num_data
} }
} }
@ -762,13 +936,13 @@ mod tests {
// Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder)
let (_, shred) = &shredder.shred_tuples[0]; let (_, shred) = &shredder.shred_tuples[0];
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
info!("Len: {}", shred.len()); info!("Len: {}", shred.shred.len());
info!("{:?}", shred); info!("{:?}", shred);
// Test4: Try deserialize the PDU and assert that it matches the original shred // Test4: Try deserialize the PDU and assert that it matches the original shred
let deserialized_shred: Shred = let deserialized_shred: Shred =
bincode::deserialize(&shred).expect("Failed in deserializing the PDU"); bincode::deserialize(&shred.shred).expect("Failed in deserializing the PDU");
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.index(), 0);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -791,9 +965,9 @@ mod tests {
// Must be Last in FEC Set // Must be Last in FEC Set
let (_, shred) = &shredder.shred_tuples[1]; let (_, shred) = &shredder.shred_tuples[1];
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -813,9 +987,9 @@ mod tests {
assert!(!shredder.shred_tuples.is_empty()); assert!(!shredder.shred_tuples.is_empty());
let (_, shred) = &shredder.shred_tuples[2]; let (_, shred) = &shredder.shred_tuples[2];
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -830,9 +1004,9 @@ mod tests {
// Must be a Data shred // Must be a Data shred
let (_, shred) = &shredder.shred_tuples[3]; let (_, shred) = &shredder.shred_tuples[3];
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 3); assert_eq!(deserialized_shred.index(), 3);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -850,9 +1024,9 @@ mod tests {
// Must be LastInSlot // Must be LastInSlot
let (_, shred) = &shredder.shred_tuples[4]; let (_, shred) = &shredder.shred_tuples[4];
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 4); assert_eq!(deserialized_shred.index(), 4);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -883,8 +1057,8 @@ mod tests {
assert_eq!(shredder.shred_tuples.len(), 2); assert_eq!(shredder.shred_tuples.len(), 2);
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.index(), 0);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -892,8 +1066,8 @@ mod tests {
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -918,8 +1092,8 @@ mod tests {
// We should have 1 shred now (LastInFECBlock) // We should have 1 shred now (LastInFECBlock)
assert_eq!(shredder.shred_tuples.len(), 1); assert_eq!(shredder.shred_tuples.len(), 1);
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -955,8 +1129,8 @@ mod tests {
// Finalize must have created 1 final data shred and 3 coding shreds // Finalize must have created 1 final data shred and 3 coding shreds
// assert_eq!(shredder.shreds.len(), 6); // assert_eq!(shredder.shreds.len(), 6);
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.index(), 0);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -964,8 +1138,8 @@ mod tests {
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -973,8 +1147,8 @@ mod tests {
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Data(_)); assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -982,24 +1156,24 @@ mod tests {
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_)); assert_matches!(deserialized_shred, Shred::Coding(_));
assert_eq!(deserialized_shred.index(), 0); assert_eq!(deserialized_shred.index(), 0);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_)); assert_matches!(deserialized_shred, Shred::Coding(_));
assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
assert!(deserialized_shred.verify(&keypair.pubkey())); assert!(deserialized_shred.verify(&keypair.pubkey()));
let (_, shred) = shredder.shred_tuples.remove(0); let (_, shred) = shredder.shred_tuples.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE); assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); let deserialized_shred: Shred = bincode::deserialize(&shred.shred).unwrap();
assert_matches!(deserialized_shred, Shred::Coding(_)); assert_matches!(deserialized_shred, Shred::Coding(_));
assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.slot(), slot);
@ -1038,26 +1212,16 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count); assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder let (shreds, shred_infos): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples .shred_tuples
.iter() .iter()
.map(|(s, b)| { .map(|(s, b)| (s.clone(), b.clone()))
(
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
)
})
.unzip(); .unzip();
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
assert_matches!( assert_matches!(
Shredder::try_recovery( Shredder::try_recovery(
shred_meta_bufs[..3].to_vec(), shred_infos[..3].to_vec(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
0, 0,
@ -1068,7 +1232,7 @@ mod tests {
// Test1: Try recovery/reassembly with only data shreds. Hint: should work // Test1: Try recovery/reassembly with only data shreds. Hint: should work
let result = Shredder::try_recovery( let result = Shredder::try_recovery(
shred_meta_bufs[..4].to_vec(), shred_infos[..4].to_vec(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
0, 0,
@ -1083,21 +1247,13 @@ mod tests {
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples .shred_tuples
.iter() .iter()
.enumerate() .enumerate()
.filter_map(|(i, (s, b))| { .filter_map(|(i, (s, b))| {
if i % 2 == 0 { if i % 2 == 0 {
Some(( Some((s.clone(), b.clone()))
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else { } else {
None None
} }
@ -1105,7 +1261,7 @@ mod tests {
.unzip(); .unzip();
let mut result = Shredder::try_recovery( let mut result = Shredder::try_recovery(
shred_meta_bufs, shred_info,
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
0, 0,
@ -1137,16 +1293,16 @@ mod tests {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 1); assert_eq!(code.header.position, 1);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 1); assert_eq!(code.header.coding_header.index, 1);
} }
let recovered_shred = result.recovered_code.remove(0); let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred { if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 3); assert_eq!(code.header.position, 3);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 3); assert_eq!(code.header.coding_header.index, 3);
} }
let result = Shredder::deshred(&shreds[..4]).unwrap(); let result = Shredder::deshred(&shreds[..4]).unwrap();
@ -1154,21 +1310,13 @@ mod tests {
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples .shred_tuples
.iter() .iter()
.enumerate() .enumerate()
.filter_map(|(i, (s, b))| { .filter_map(|(i, (s, b))| {
if i % 2 != 0 { if i % 2 != 0 {
Some(( Some((s.clone(), b.clone()))
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else { } else {
None None
} }
@ -1176,7 +1324,7 @@ mod tests {
.unzip(); .unzip();
let mut result = Shredder::try_recovery( let mut result = Shredder::try_recovery(
shred_meta_bufs, shred_info,
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
0, 0,
@ -1208,16 +1356,16 @@ mod tests {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0); assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 0); assert_eq!(code.header.coding_header.index, 0);
} }
let recovered_shred = result.recovered_code.remove(0); let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred { if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2); assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 2); assert_eq!(code.header.coding_header.index, 2);
} }
let result = Shredder::deshred(&shreds[..4]).unwrap(); let result = Shredder::deshred(&shreds[..4]).unwrap();
@ -1248,21 +1396,13 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count); assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples .shred_tuples
.iter() .iter()
.enumerate() .enumerate()
.filter_map(|(i, (s, b))| { .filter_map(|(i, (s, b))| {
if i % 2 != 0 { if i % 2 != 0 {
Some(( Some((s.clone(), b.clone()))
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else { } else {
None None
} }
@ -1270,7 +1410,7 @@ mod tests {
.unzip(); .unzip();
let mut result = Shredder::try_recovery( let mut result = Shredder::try_recovery(
shred_meta_bufs, shred_info,
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
0, 0,
@ -1302,16 +1442,16 @@ mod tests {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0); assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 0); assert_eq!(code.header.coding_header.index, 0);
} }
let recovered_shred = result.recovered_code.remove(0); let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred { if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2); assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 2); assert_eq!(code.header.coding_header.index, 2);
} }
let result = Shredder::deshred(&shreds[..4]).unwrap(); let result = Shredder::deshred(&shreds[..4]).unwrap();
@ -1362,21 +1502,13 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count); assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples .shred_tuples
.iter() .iter()
.enumerate() .enumerate()
.filter_map(|(i, (s, b))| { .filter_map(|(i, (s, b))| {
if i % 2 != 0 { if i % 2 != 0 {
Some(( Some((s.clone(), b.clone()))
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else { } else {
None None
} }
@ -1384,7 +1516,7 @@ mod tests {
.unzip(); .unzip();
let mut result = Shredder::try_recovery( let mut result = Shredder::try_recovery(
shred_meta_bufs.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
25, 25,
@ -1416,16 +1548,16 @@ mod tests {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0); assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 25); assert_eq!(code.header.coding_header.index, 25);
} }
let recovered_shred = result.recovered_code.remove(0); let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred { if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4); assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4); assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2); assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.common_header.index, 27); assert_eq!(code.header.coding_header.index, 27);
} }
let result = Shredder::deshred(&shreds[..4]).unwrap(); let result = Shredder::deshred(&shreds[..4]).unwrap();
@ -1434,7 +1566,7 @@ mod tests {
// Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
let result = Shredder::try_recovery( let result = Shredder::try_recovery(
shred_meta_bufs.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
25, 25,
@ -1446,7 +1578,7 @@ mod tests {
// Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!( assert_matches!(
Shredder::try_recovery( Shredder::try_recovery(
shred_meta_bufs.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
15, 15,
@ -1458,7 +1590,7 @@ mod tests {
// Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!( assert_matches!(
Shredder::try_recovery( Shredder::try_recovery(
shred_meta_bufs.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
35, 35,
@ -1511,4 +1643,45 @@ mod tests {
.for_each(|(s, _)| assert!(!s.is_data())); .for_each(|(s, _)| assert!(!s.is_data()));
} }
} }
#[test]
fn test_shred_info_construction() {
let keypair = Arc::new(Keypair::new());
let slot = 0x123456789abcdef0;
let mut shredder =
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..1200 * 3).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let mut offset = shredder.write(&data).unwrap();
let approx_shred_payload_size = offset;
while offset < data.len() {
offset += shredder.write(&data[offset..]).unwrap();
}
// We should have some shreds now
assert!(shredder.shred_tuples.len() >= data.len() / approx_shred_payload_size);
assert_eq!(offset, data.len());
shredder.finalize_data();
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
shredder
.shred_tuples
.iter()
.for_each(|(shred, shred_info)| {
assert_eq!(shred.slot(), shred_info.slot());
assert_eq!(shred.index(), shred_info.index());
assert_eq!(shred.parent(), shred_info.parent());
assert_eq!(shred.signature(), shred_info.signature());
assert_eq!(shred.is_data(), shred_info.is_data());
assert_eq!(shred.last_in_slot(), shred_info.last_in_slot());
assert_eq!(shred.data_complete(), shred_info.data_complete());
assert_eq!(shred.coding_params(), shred_info.coding_params());
})
}
} }