Removed Shred enum (#5963)
* Remove shred enum and it's references * rename ShredInfo to Shred * clippy
This commit is contained in:
parent
d93b552e8c
commit
783e8672e7
|
@ -26,9 +26,8 @@ fn bench_deshredder(bencher: &mut Bencher) {
|
||||||
let mut shredded = Shredder::new(1, 0, 0.0, &kp, 0).unwrap();
|
let mut shredded = Shredder::new(1, 0, 0.0, &kp, 0).unwrap();
|
||||||
let _ = bincode::serialize_into(&mut shredded, &data);
|
let _ = bincode::serialize_into(&mut shredded, &data);
|
||||||
shredded.finalize_data();
|
shredded.finalize_data();
|
||||||
let (_, shreds): (Vec<_>, Vec<_>) = shredded.shred_tuples.into_iter().unzip();
|
|
||||||
bencher.iter(|| {
|
bencher.iter(|| {
|
||||||
let raw = &mut Shredder::deshred(&shreds).unwrap();
|
let raw = &mut Shredder::deshred(&shredded.shreds).unwrap();
|
||||||
assert_ne!(raw.len(), 0);
|
assert_ne!(raw.len(), 0);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
use crate::entry::Entry;
|
use crate::entry::Entry;
|
||||||
use crate::erasure::ErasureConfig;
|
use crate::erasure::ErasureConfig;
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::shred::{ShredInfo, Shredder};
|
use crate::shred::{Shred, Shredder};
|
||||||
|
|
||||||
#[cfg(feature = "kvstore")]
|
#[cfg(feature = "kvstore")]
|
||||||
use solana_kvstore as kvstore;
|
use solana_kvstore as kvstore;
|
||||||
|
@ -320,9 +320,9 @@ impl Blocktree {
|
||||||
db: &Database,
|
db: &Database,
|
||||||
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
|
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
|
||||||
index_working_set: &HashMap<u64, Index>,
|
index_working_set: &HashMap<u64, Index>,
|
||||||
prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>,
|
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
|
||||||
prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>,
|
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
|
||||||
) -> Vec<ShredInfo> {
|
) -> Vec<Shred> {
|
||||||
let data_cf = db.column::<cf::ShredData>();
|
let data_cf = db.column::<cf::ShredData>();
|
||||||
let code_cf = db.column::<cf::ShredCode>();
|
let code_cf = db.column::<cf::ShredCode>();
|
||||||
let mut recovered_data_shreds = vec![];
|
let mut recovered_data_shreds = vec![];
|
||||||
|
@ -357,7 +357,7 @@ impl Blocktree {
|
||||||
.get_bytes((slot, i))
|
.get_bytes((slot, i))
|
||||||
.expect("Database failure, could not fetch data shred");
|
.expect("Database failure, could not fetch data shred");
|
||||||
if let Some(data) = some_data {
|
if let Some(data) = some_data {
|
||||||
ShredInfo::new_from_serialized_shred(data).ok()
|
Shred::new_from_serialized_shred(data).ok()
|
||||||
} else {
|
} else {
|
||||||
warn!("Data shred deleted while reading for recovery");
|
warn!("Data shred deleted while reading for recovery");
|
||||||
None
|
None
|
||||||
|
@ -377,7 +377,7 @@ impl Blocktree {
|
||||||
.get_bytes((slot, i))
|
.get_bytes((slot, i))
|
||||||
.expect("Database failure, could not fetch code shred");
|
.expect("Database failure, could not fetch code shred");
|
||||||
if let Some(code) = some_code {
|
if let Some(code) = some_code {
|
||||||
ShredInfo::new_from_serialized_shred(code).ok()
|
Shred::new_from_serialized_shred(code).ok()
|
||||||
} else {
|
} else {
|
||||||
warn!("Code shred deleted while reading for recovery");
|
warn!("Code shred deleted while reading for recovery");
|
||||||
None
|
None
|
||||||
|
@ -415,7 +415,7 @@ impl Blocktree {
|
||||||
|
|
||||||
pub fn insert_shreds(
|
pub fn insert_shreds(
|
||||||
&self,
|
&self,
|
||||||
shreds: Vec<ShredInfo>,
|
shreds: Vec<Shred>,
|
||||||
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let db = &*self.db;
|
let db = &*self.db;
|
||||||
|
@ -509,11 +509,11 @@ impl Blocktree {
|
||||||
|
|
||||||
fn check_insert_coding_shred(
|
fn check_insert_coding_shred(
|
||||||
&self,
|
&self,
|
||||||
shred: ShredInfo,
|
shred: Shred,
|
||||||
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
|
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
|
||||||
index_working_set: &mut HashMap<u64, Index>,
|
index_working_set: &mut HashMap<u64, Index>,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredInfo>,
|
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
|
||||||
) {
|
) {
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
let shred_index = u64::from(shred.index());
|
let shred_index = u64::from(shred.index());
|
||||||
|
@ -537,11 +537,11 @@ impl Blocktree {
|
||||||
|
|
||||||
fn check_insert_data_shred(
|
fn check_insert_data_shred(
|
||||||
&self,
|
&self,
|
||||||
shred: ShredInfo,
|
shred: Shred,
|
||||||
index_working_set: &mut HashMap<u64, Index>,
|
index_working_set: &mut HashMap<u64, Index>,
|
||||||
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredInfo>,
|
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
|
||||||
) {
|
) {
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
let shred_index = u64::from(shred.index());
|
let shred_index = u64::from(shred.index());
|
||||||
|
@ -584,7 +584,7 @@ impl Blocktree {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_insert_coding_shred(
|
fn should_insert_coding_shred(
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
coding_index: &CodingIndex,
|
coding_index: &CodingIndex,
|
||||||
last_root: &RwLock<u64>,
|
last_root: &RwLock<u64>,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
|
@ -611,7 +611,7 @@ impl Blocktree {
|
||||||
&self,
|
&self,
|
||||||
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
|
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
|
||||||
index_meta: &mut Index,
|
index_meta: &mut Index,
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
|
@ -650,14 +650,14 @@ impl Blocktree {
|
||||||
|
|
||||||
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
||||||
// We don't want only a subset of these changes going through.
|
// We don't want only a subset of these changes going through.
|
||||||
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.shred)?;
|
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
|
||||||
index_meta.coding_mut().set_present(shred_index, true);
|
index_meta.coding_mut().set_present(shred_index, true);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_insert_data_shred(
|
fn should_insert_data_shred(
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
slot_meta: &SlotMeta,
|
slot_meta: &SlotMeta,
|
||||||
data_index: &DataIndex,
|
data_index: &DataIndex,
|
||||||
last_root: &RwLock<u64>,
|
last_root: &RwLock<u64>,
|
||||||
|
@ -720,7 +720,7 @@ impl Blocktree {
|
||||||
&self,
|
&self,
|
||||||
slot_meta: &mut SlotMeta,
|
slot_meta: &mut SlotMeta,
|
||||||
data_index: &mut DataIndex,
|
data_index: &mut DataIndex,
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
|
@ -760,7 +760,7 @@ impl Blocktree {
|
||||||
|
|
||||||
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
||||||
// We don't want only a subset of these changes going through.
|
// We don't want only a subset of these changes going through.
|
||||||
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.shred)?;
|
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
|
||||||
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
|
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
|
||||||
data_index.set_present(index, true);
|
data_index.set_present(index, true);
|
||||||
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
|
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
|
||||||
|
@ -852,9 +852,7 @@ impl Blocktree {
|
||||||
parent_slot = current_slot - 1;
|
parent_slot = current_slot - 1;
|
||||||
remaining_ticks_in_slot = ticks_per_slot;
|
remaining_ticks_in_slot = ticks_per_slot;
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
let shreds: Vec<ShredInfo> =
|
all_shreds.append(&mut shredder.shreds);
|
||||||
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
|
|
||||||
all_shreds.extend(shreds);
|
|
||||||
shredder =
|
shredder =
|
||||||
Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
|
Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
|
||||||
.expect("Failed to create entry shredder");
|
.expect("Failed to create entry shredder");
|
||||||
|
@ -876,8 +874,7 @@ impl Blocktree {
|
||||||
if is_full_slot && remaining_ticks_in_slot != 0 {
|
if is_full_slot && remaining_ticks_in_slot != 0 {
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
}
|
}
|
||||||
let shreds: Vec<ShredInfo> = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
|
all_shreds.append(&mut shredder.shreds);
|
||||||
all_shreds.extend(shreds);
|
|
||||||
|
|
||||||
let num_shreds = all_shreds.len();
|
let num_shreds = all_shreds.len();
|
||||||
self.insert_shreds(all_shreds, None)?;
|
self.insert_shreds(all_shreds, None)?;
|
||||||
|
@ -1005,11 +1002,9 @@ impl Blocktree {
|
||||||
serialized_shreds.len(),
|
serialized_shreds.len(),
|
||||||
slot
|
slot
|
||||||
);
|
);
|
||||||
let mut shreds: Vec<ShredInfo> = serialized_shreds
|
let mut shreds: Vec<Shred> = serialized_shreds
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|serialized_shred| {
|
.filter_map(|serialized_shred| Shred::new_from_serialized_shred(serialized_shred).ok())
|
||||||
ShredInfo::new_from_serialized_shred(serialized_shred).ok()
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut all_entries = vec![];
|
let mut all_entries = vec![];
|
||||||
|
@ -1572,7 +1567,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
|
||||||
bincode::serialize_into(&mut shredder, &entries)
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
.expect("Expect to write all entries to shreds");
|
.expect("Expect to write all entries to shreds");
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
let shreds: Vec<ShredInfo> = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
|
let shreds: Vec<Shred> = shredder.shreds.drain(..).collect();
|
||||||
|
|
||||||
blocktree.insert_shreds(shreds, None)?;
|
blocktree.insert_shreds(shreds, None)?;
|
||||||
blocktree.set_roots(&[0])?;
|
blocktree.set_roots(&[0])?;
|
||||||
|
@ -1653,7 +1648,7 @@ pub fn entries_to_test_shreds(
|
||||||
slot: u64,
|
slot: u64,
|
||||||
parent_slot: u64,
|
parent_slot: u64,
|
||||||
is_full_slot: bool,
|
is_full_slot: bool,
|
||||||
) -> Vec<ShredInfo> {
|
) -> Vec<Shred> {
|
||||||
let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32)
|
let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32)
|
||||||
.expect("Failed to create entry shredder");
|
.expect("Failed to create entry shredder");
|
||||||
|
|
||||||
|
@ -1665,14 +1660,14 @@ pub fn entries_to_test_shreds(
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
}
|
}
|
||||||
|
|
||||||
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect()
|
shredder.shreds.drain(..).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod tests {
|
pub mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::entry::{create_ticks, Entry};
|
use crate::entry::{create_ticks, Entry};
|
||||||
use crate::shred::{CodingShred, Shred};
|
use crate::shred::{DataShredHeader, CODING_SHRED};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
|
@ -1834,7 +1829,7 @@ pub mod tests {
|
||||||
let slot = 0;
|
let slot = 0;
|
||||||
let (shreds, _) = make_slot_entries(slot, 0, 100);
|
let (shreds, _) = make_slot_entries(slot, 0, 100);
|
||||||
let num_shreds = shreds.len() as u64;
|
let num_shreds = shreds.len() as u64;
|
||||||
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.shred.clone()).collect();
|
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect();
|
||||||
|
|
||||||
let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes");
|
let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes");
|
||||||
let ledger = Blocktree::open(&ledger_path).unwrap();
|
let ledger = Blocktree::open(&ledger_path).unwrap();
|
||||||
|
@ -3112,13 +3107,14 @@ pub mod tests {
|
||||||
let index_cf = blocktree.db.column::<cf::Index>();
|
let index_cf = blocktree.db.column::<cf::Index>();
|
||||||
let last_root = RwLock::new(0);
|
let last_root = RwLock::new(0);
|
||||||
|
|
||||||
let mut shred = CodingShred::default();
|
let mut shred = DataShredHeader::default();
|
||||||
let slot = 1;
|
let slot = 1;
|
||||||
shred.header.position = 10;
|
shred.common_header.header.shred_type = CODING_SHRED;
|
||||||
shred.header.coding_header.index = 11;
|
shred.common_header.header.position = 10;
|
||||||
shred.header.coding_header.slot = 1;
|
shred.common_header.header.coding_header.index = 11;
|
||||||
shred.header.num_coding_shreds = shred.header.position + 1;
|
shred.common_header.header.coding_header.slot = 1;
|
||||||
let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
shred.common_header.header.num_coding_shreds = shred.common_header.header.position + 1;
|
||||||
|
let coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
|
|
||||||
// Insert a good coding shred
|
// Insert a good coding shred
|
||||||
assert!(Blocktree::should_insert_coding_shred(
|
assert!(Blocktree::should_insert_coding_shred(
|
||||||
|
@ -3135,7 +3131,7 @@ pub mod tests {
|
||||||
// Trying to insert the same shred again should fail
|
// Trying to insert the same shred again should fail
|
||||||
{
|
{
|
||||||
let index = index_cf
|
let index = index_cf
|
||||||
.get(shred.header.coding_header.slot)
|
.get(shred.common_header.header.coding_header.slot)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(!Blocktree::should_insert_coding_shred(
|
assert!(!Blocktree::should_insert_coding_shred(
|
||||||
|
@ -3145,13 +3141,13 @@ pub mod tests {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
shred.header.coding_header.index += 1;
|
shred.common_header.header.coding_header.index += 1;
|
||||||
|
|
||||||
// Establish a baseline that works
|
// Establish a baseline that works
|
||||||
{
|
{
|
||||||
let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
let index = index_cf
|
let index = index_cf
|
||||||
.get(shred.header.coding_header.slot)
|
.get(shred.common_header.header.coding_header.slot)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(Blocktree::should_insert_coding_shred(
|
assert!(Blocktree::should_insert_coding_shred(
|
||||||
|
@ -3163,7 +3159,7 @@ pub mod tests {
|
||||||
|
|
||||||
// Trying to insert a shred with index < position should fail
|
// Trying to insert a shred with index < position should fail
|
||||||
{
|
{
|
||||||
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
let index = coding_shred.headers.common_header.header.position - 1;
|
let index = coding_shred.headers.common_header.header.position - 1;
|
||||||
coding_shred.set_index(index as u32);
|
coding_shred.set_index(index as u32);
|
||||||
|
|
||||||
|
@ -3177,7 +3173,7 @@ pub mod tests {
|
||||||
|
|
||||||
// Trying to insert shred with num_coding == 0 should fail
|
// Trying to insert shred with num_coding == 0 should fail
|
||||||
{
|
{
|
||||||
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
coding_shred.headers.common_header.header.num_coding_shreds = 0;
|
coding_shred.headers.common_header.header.num_coding_shreds = 0;
|
||||||
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
||||||
assert!(!Blocktree::should_insert_coding_shred(
|
assert!(!Blocktree::should_insert_coding_shred(
|
||||||
|
@ -3189,7 +3185,7 @@ pub mod tests {
|
||||||
|
|
||||||
// Trying to insert shred with pos >= num_coding should fail
|
// Trying to insert shred with pos >= num_coding should fail
|
||||||
{
|
{
|
||||||
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
coding_shred.headers.common_header.header.num_coding_shreds =
|
coding_shred.headers.common_header.header.num_coding_shreds =
|
||||||
coding_shred.headers.common_header.header.position;
|
coding_shred.headers.common_header.header.position;
|
||||||
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
||||||
|
@ -3203,7 +3199,7 @@ pub mod tests {
|
||||||
// Trying to insert with set_index with num_coding that would imply the last blob
|
// Trying to insert with set_index with num_coding that would imply the last blob
|
||||||
// has index > u32::MAX should fail
|
// has index > u32::MAX should fail
|
||||||
{
|
{
|
||||||
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
coding_shred.headers.common_header.header.num_coding_shreds = 3;
|
coding_shred.headers.common_header.header.num_coding_shreds = 3;
|
||||||
coding_shred
|
coding_shred
|
||||||
.headers
|
.headers
|
||||||
|
@ -3233,7 +3229,7 @@ pub mod tests {
|
||||||
|
|
||||||
// Trying to insert value into slot <= than last root should fail
|
// Trying to insert value into slot <= than last root should fail
|
||||||
{
|
{
|
||||||
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
|
let mut coding_shred = Shred::new_empty_from_header(shred.clone());
|
||||||
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
|
||||||
coding_shred.set_slot(*last_root.read().unwrap());
|
coding_shred.set_slot(*last_root.read().unwrap());
|
||||||
assert!(!Blocktree::should_insert_coding_shred(
|
assert!(!Blocktree::should_insert_coding_shred(
|
||||||
|
@ -3295,8 +3291,8 @@ pub mod tests {
|
||||||
|
|
||||||
// Test that the iterator for slot 8 contains what was inserted earlier
|
// Test that the iterator for slot 8 contains what was inserted earlier
|
||||||
let shred_iter = blocktree.slot_data_iterator(8).unwrap();
|
let shred_iter = blocktree.slot_data_iterator(8).unwrap();
|
||||||
let result: Vec<ShredInfo> = shred_iter
|
let result: Vec<Shred> = shred_iter
|
||||||
.filter_map(|(_, bytes)| ShredInfo::new_from_serialized_shred(bytes.to_vec()).ok())
|
.filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok())
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(result.len(), slot_8_shreds.len());
|
assert_eq!(result.len(), slot_8_shreds.len());
|
||||||
assert_eq!(result, slot_8_shreds);
|
assert_eq!(result, slot_8_shreds);
|
||||||
|
@ -3438,7 +3434,7 @@ pub mod tests {
|
||||||
slot: u64,
|
slot: u64,
|
||||||
parent_slot: u64,
|
parent_slot: u64,
|
||||||
num_entries: u64,
|
num_entries: u64,
|
||||||
) -> (Vec<ShredInfo>, Vec<Entry>) {
|
) -> (Vec<Shred>, Vec<Entry>) {
|
||||||
let entries = create_ticks(num_entries, Hash::default());
|
let entries = create_ticks(num_entries, Hash::default());
|
||||||
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true);
|
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true);
|
||||||
(shreds, entries)
|
(shreds, entries)
|
||||||
|
@ -3448,7 +3444,7 @@ pub mod tests {
|
||||||
start_slot: u64,
|
start_slot: u64,
|
||||||
num_slots: u64,
|
num_slots: u64,
|
||||||
entries_per_slot: u64,
|
entries_per_slot: u64,
|
||||||
) -> (Vec<ShredInfo>, Vec<Entry>) {
|
) -> (Vec<Shred>, Vec<Entry>) {
|
||||||
let mut shreds = vec![];
|
let mut shreds = vec![];
|
||||||
let mut entries = vec![];
|
let mut entries = vec![];
|
||||||
for slot in start_slot..start_slot + num_slots {
|
for slot in start_slot..start_slot + num_slots {
|
||||||
|
@ -3467,7 +3463,7 @@ pub mod tests {
|
||||||
pub fn make_chaining_slot_entries(
|
pub fn make_chaining_slot_entries(
|
||||||
chain: &[u64],
|
chain: &[u64],
|
||||||
entries_per_slot: u64,
|
entries_per_slot: u64,
|
||||||
) -> Vec<(Vec<ShredInfo>, Vec<Entry>)> {
|
) -> Vec<(Vec<Shred>, Vec<Entry>)> {
|
||||||
let mut slots_shreds_and_entries = vec![];
|
let mut slots_shreds_and_entries = vec![];
|
||||||
for (i, slot) in chain.iter().enumerate() {
|
for (i, slot) in chain.iter().enumerate() {
|
||||||
let parent_slot = {
|
let parent_slot = {
|
||||||
|
|
|
@ -37,7 +37,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let num_entries = receive_results.entries.len();
|
let num_entries = receive_results.entries.len();
|
||||||
let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds(
|
let (shred_bufs, _) = broadcast_utils::entries_to_shreds(
|
||||||
receive_results.entries,
|
receive_results.entries,
|
||||||
bank.slot(),
|
bank.slot(),
|
||||||
receive_results.last_tick,
|
receive_results.last_tick,
|
||||||
|
@ -57,10 +57,10 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
|
||||||
.map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
|
.map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let (_fake_shreds, fake_shred_bufs, _) = broadcast_utils::entries_to_shreds(
|
let (fake_shred_bufs, _) = broadcast_utils::entries_to_shreds(
|
||||||
fake_entries,
|
fake_entries,
|
||||||
bank.slot(),
|
|
||||||
receive_results.last_tick,
|
receive_results.last_tick,
|
||||||
|
bank.slot(),
|
||||||
bank.max_tick_height(),
|
bank.max_tick_height(),
|
||||||
keypair,
|
keypair,
|
||||||
latest_blob_index,
|
latest_blob_index,
|
||||||
|
@ -80,11 +80,11 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
|
||||||
if i <= self.partition {
|
if i <= self.partition {
|
||||||
// Send fake blobs to the first N peers
|
// Send fake blobs to the first N peers
|
||||||
fake_shred_bufs.iter().for_each(|b| {
|
fake_shred_bufs.iter().for_each(|b| {
|
||||||
sock.send_to(&b.shred, &peer.tvu_forwards).unwrap();
|
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
shred_bufs.iter().for_each(|b| {
|
shred_bufs.iter().for_each(|b| {
|
||||||
sock.send_to(&b.shred, &peer.tvu_forwards).unwrap();
|
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use crate::entry::Entry;
|
use crate::entry::Entry;
|
||||||
use crate::poh_recorder::WorkingBankEntry;
|
use crate::poh_recorder::WorkingBankEntry;
|
||||||
use crate::result::Result;
|
use crate::result::Result;
|
||||||
use crate::shred::{Shred, ShredInfo, Shredder, RECOMMENDED_FEC_RATE};
|
use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::signature::Keypair;
|
use solana_sdk::signature::Keypair;
|
||||||
use std::sync::mpsc::Receiver;
|
use std::sync::mpsc::Receiver;
|
||||||
|
@ -62,7 +62,7 @@ pub(super) fn entries_to_shreds(
|
||||||
keypair: &Arc<Keypair>,
|
keypair: &Arc<Keypair>,
|
||||||
latest_shred_index: u64,
|
latest_shred_index: u64,
|
||||||
parent_slot: u64,
|
parent_slot: u64,
|
||||||
) -> (Vec<Shred>, Vec<ShredInfo>, u64) {
|
) -> (Vec<Shred>, u64) {
|
||||||
let mut shredder = Shredder::new(
|
let mut shredder = Shredder::new(
|
||||||
slot,
|
slot,
|
||||||
parent_slot,
|
parent_slot,
|
||||||
|
@ -81,10 +81,9 @@ pub(super) fn entries_to_shreds(
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
}
|
}
|
||||||
|
|
||||||
let (shreds, shred_infos): (Vec<Shred>, Vec<ShredInfo>) =
|
let shred_infos: Vec<Shred> = shredder.shreds.drain(..).collect();
|
||||||
shredder.shred_tuples.into_iter().unzip();
|
|
||||||
|
|
||||||
trace!("Inserting {:?} shreds in blocktree", shreds.len());
|
trace!("Inserting {:?} shreds in blocktree", shred_infos.len());
|
||||||
|
|
||||||
(shreds, shred_infos, u64::from(shredder.index))
|
(shred_infos, u64::from(shredder.index))
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||||
.map(|meta| meta.consumed)
|
.map(|meta| meta.consumed)
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let (_, shred_infos, _) = broadcast_utils::entries_to_shreds(
|
let (shred_infos, _) = broadcast_utils::entries_to_shreds(
|
||||||
receive_results.entries,
|
receive_results.entries,
|
||||||
last_tick,
|
last_tick,
|
||||||
bank.slot(),
|
bank.slot(),
|
||||||
|
@ -54,7 +54,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||||
let bank_epoch = bank.get_stakers_epoch(bank.slot());
|
let bank_epoch = bank.get_stakers_epoch(bank.slot());
|
||||||
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
||||||
|
|
||||||
let shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.shred).collect();
|
let shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
|
||||||
// Broadcast data + erasures
|
// Broadcast data + erasures
|
||||||
cluster_info.read().unwrap().broadcast_shreds(
|
cluster_info.read().unwrap().broadcast_shreds(
|
||||||
sock,
|
sock,
|
||||||
|
|
|
@ -79,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds(
|
let (shred_infos, latest_shred_index) = entries_to_shreds(
|
||||||
receive_results.entries,
|
receive_results.entries,
|
||||||
last_tick,
|
last_tick,
|
||||||
bank.slot(),
|
bank.slot(),
|
||||||
|
@ -90,7 +90,7 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||||
);
|
);
|
||||||
|
|
||||||
let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
|
let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
|
||||||
let num_shreds = all_shreds.len();
|
let num_shreds = shred_infos.len();
|
||||||
blocktree
|
blocktree
|
||||||
.insert_shreds(shred_infos.clone(), None)
|
.insert_shreds(shred_infos.clone(), None)
|
||||||
.expect("Failed to insert shreds in blocktree");
|
.expect("Failed to insert shreds in blocktree");
|
||||||
|
@ -102,7 +102,7 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||||
let bank_epoch = bank.get_stakers_epoch(bank.slot());
|
let bank_epoch = bank.get_stakers_epoch(bank.slot());
|
||||||
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
||||||
|
|
||||||
let all_shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.shred).collect();
|
let all_shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
|
||||||
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
|
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
|
||||||
cluster_info.read().unwrap().broadcast_shreds(
|
cluster_info.read().unwrap().broadcast_shreds(
|
||||||
sock,
|
sock,
|
||||||
|
|
|
@ -153,7 +153,7 @@ mod tests {
|
||||||
hasher.hash(&buf[..size]);
|
hasher.hash(&buf[..size]);
|
||||||
|
|
||||||
// golden needs to be updated if blob stuff changes....
|
// golden needs to be updated if blob stuff changes....
|
||||||
let golden: Hash = "C7RmQ7oDswQfgquukXHGvpYYSCcKTgPnJrYA3ABbX9oG"
|
let golden: Hash = "3LWNjNqC6HncoWUhXbk6cUH8NSM675aZqRPGUC4Zq21H"
|
||||||
.parse()
|
.parse()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
|
@ -1768,7 +1768,7 @@ mod tests {
|
||||||
use crate::crds_value::CrdsValueLabel;
|
use crate::crds_value::CrdsValueLabel;
|
||||||
use crate::repair_service::RepairType;
|
use crate::repair_service::RepairType;
|
||||||
use crate::result::Error;
|
use crate::result::Error;
|
||||||
use crate::shred::{DataShred, Shred, ShredInfo};
|
use crate::shred::{DataShredHeader, Shred};
|
||||||
use crate::test_tx::test_tx;
|
use crate::test_tx::test_tx;
|
||||||
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
|
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
|
@ -1928,12 +1928,11 @@ mod tests {
|
||||||
0,
|
0,
|
||||||
);
|
);
|
||||||
assert!(rv.is_empty());
|
assert!(rv.is_empty());
|
||||||
let mut data_shred = DataShred::default();
|
let mut data_shred = DataShredHeader::default();
|
||||||
data_shred.header.data_header.slot = 2;
|
data_shred.data_header.slot = 2;
|
||||||
data_shred.header.parent_offset = 1;
|
data_shred.parent_offset = 1;
|
||||||
data_shred.header.data_header.index = 1;
|
data_shred.data_header.index = 1;
|
||||||
let shred = Shred::Data(data_shred);
|
let shred_info = Shred::new_empty_from_header(data_shred);
|
||||||
let shred_info = ShredInfo::new_from_shred(&shred);
|
|
||||||
|
|
||||||
blocktree
|
blocktree
|
||||||
.insert_shreds(vec![shred_info], None)
|
.insert_shreds(vec![shred_info], None)
|
||||||
|
@ -1948,10 +1947,10 @@ mod tests {
|
||||||
1,
|
1,
|
||||||
);
|
);
|
||||||
assert!(!rv.is_empty());
|
assert!(!rv.is_empty());
|
||||||
let rv: Vec<ShredInfo> = rv
|
let rv: Vec<Shred> = rv
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|b| {
|
.filter_map(|b| {
|
||||||
ShredInfo::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
|
Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(rv[0].index(), 1);
|
assert_eq!(rv[0].index(), 1);
|
||||||
|
@ -1982,10 +1981,10 @@ mod tests {
|
||||||
|
|
||||||
let rv =
|
let rv =
|
||||||
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1);
|
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1);
|
||||||
let rv: Vec<ShredInfo> = rv
|
let rv: Vec<Shred> = rv
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|b| {
|
.filter_map(|b| {
|
||||||
ShredInfo::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
|
Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok()
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
assert!(!rv.is_empty());
|
assert!(!rv.is_empty());
|
||||||
|
|
|
@ -806,7 +806,7 @@ mod test {
|
||||||
use crate::entry;
|
use crate::entry;
|
||||||
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
|
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
|
||||||
use crate::replay_stage::ReplayStage;
|
use crate::replay_stage::ReplayStage;
|
||||||
use crate::shred::ShredInfo;
|
use crate::shred::Shred;
|
||||||
use solana_runtime::genesis_utils::GenesisBlockInfo;
|
use solana_runtime::genesis_utils::GenesisBlockInfo;
|
||||||
use solana_sdk::hash::{hash, Hash};
|
use solana_sdk::hash::{hash, Hash};
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
@ -956,7 +956,7 @@ mod test {
|
||||||
// marked as dead. Returns the error for caller to verify.
|
// marked as dead. Returns the error for caller to verify.
|
||||||
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
|
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(&Hash, u64) -> Vec<ShredInfo>,
|
F: Fn(&Hash, u64) -> Vec<Shred>,
|
||||||
{
|
{
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
let res = {
|
let res = {
|
||||||
|
|
|
@ -11,7 +11,7 @@ use crate::repair_service;
|
||||||
use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
|
use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::shred::ShredInfo;
|
use crate::shred::Shred;
|
||||||
use crate::storage_stage::NUM_STORAGE_SAMPLES;
|
use crate::storage_stage::NUM_STORAGE_SAMPLES;
|
||||||
use crate::streamer::{receiver, responder, PacketReceiver};
|
use crate::streamer::{receiver, responder, PacketReceiver};
|
||||||
use crate::window_service::WindowService;
|
use crate::window_service::WindowService;
|
||||||
|
@ -871,10 +871,10 @@ impl Replicator {
|
||||||
while let Ok(mut more) = r_reader.try_recv() {
|
while let Ok(mut more) = r_reader.try_recv() {
|
||||||
packets.packets.append(&mut more.packets);
|
packets.packets.append(&mut more.packets);
|
||||||
}
|
}
|
||||||
let shreds: Vec<ShredInfo> = packets
|
let shreds: Vec<Shred> = packets
|
||||||
.packets
|
.packets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|p| ShredInfo::new_from_serialized_shred(p.data.to_vec()).ok())
|
.filter_map(|p| Shred::new_from_serialized_shred(p.data.to_vec()).ok())
|
||||||
.collect();
|
.collect();
|
||||||
blocktree.insert_shreds(shreds, None)?;
|
blocktree.insert_shreds(shreds, None)?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ use crate::erasure::Session;
|
||||||
use crate::result;
|
use crate::result;
|
||||||
use crate::result::Error;
|
use crate::result::Error;
|
||||||
use bincode::serialized_size;
|
use bincode::serialized_size;
|
||||||
use core::borrow::BorrowMut;
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use solana_sdk::packet::PACKET_DATA_SIZE;
|
use solana_sdk::packet::PACKET_DATA_SIZE;
|
||||||
|
@ -22,10 +21,6 @@ lazy_static! {
|
||||||
{ serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
|
{ serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
|
||||||
static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
|
static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
|
||||||
{ serialized_size(&DataShred::empty_shred()).unwrap() as usize };
|
{ serialized_size(&DataShred::empty_shred()).unwrap() as usize };
|
||||||
static ref SIZE_OF_SHRED_CODING_SHRED: usize =
|
|
||||||
{ serialized_size(&Shred::Coding(CodingShred::empty_shred())).unwrap() as usize };
|
|
||||||
static ref SIZE_OF_SHRED_DATA_SHRED: usize =
|
|
||||||
{ serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap() as usize };
|
|
||||||
static ref SIZE_OF_SIGNATURE: usize =
|
static ref SIZE_OF_SIGNATURE: usize =
|
||||||
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
|
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
|
||||||
static ref SIZE_OF_EMPTY_VEC: usize =
|
static ref SIZE_OF_EMPTY_VEC: usize =
|
||||||
|
@ -34,83 +29,42 @@ lazy_static! {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The constants that define if a shred is data or coding
|
/// The constants that define if a shred is data or coding
|
||||||
const DATA_SHRED: u8 = 0b1010_0101;
|
pub const DATA_SHRED: u8 = 0b1010_0101;
|
||||||
const CODING_SHRED: u8 = 0b0101_1010;
|
pub const CODING_SHRED: u8 = 0b0101_1010;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct ShredInfo {
|
pub struct Shred {
|
||||||
pub headers: DataShredHeader,
|
pub headers: DataShredHeader,
|
||||||
pub shred: Vec<u8>,
|
pub payload: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShredInfo {
|
impl Shred {
|
||||||
fn new(header: DataShredHeader, shred_buf: Vec<u8>) -> Self {
|
fn new(header: DataShredHeader, shred_buf: Vec<u8>) -> Self {
|
||||||
ShredInfo {
|
Shred {
|
||||||
headers: header,
|
headers: header,
|
||||||
shred: shred_buf,
|
payload: shred_buf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> result::Result<Self> {
|
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> result::Result<Self> {
|
||||||
let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
|
let shred_type: u8 = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?;
|
||||||
let shred_type: u8 =
|
|
||||||
bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])?;
|
|
||||||
let header = if shred_type == CODING_SHRED {
|
let header = if shred_type == CODING_SHRED {
|
||||||
let end = *SIZE_OF_CODING_SHRED_HEADER;
|
let end = *SIZE_OF_CODING_SHRED_HEADER;
|
||||||
let mut header = DataShredHeader::default();
|
let mut header = DataShredHeader::default();
|
||||||
header.common_header.header =
|
header.common_header.header = bincode::deserialize(&shred_buf[..end])?;
|
||||||
bincode::deserialize(&shred_buf[header_offset..header_offset + end])?;
|
|
||||||
header
|
header
|
||||||
} else {
|
} else {
|
||||||
let end = *SIZE_OF_DATA_SHRED_HEADER;
|
let end = *SIZE_OF_DATA_SHRED_HEADER;
|
||||||
bincode::deserialize(&shred_buf[header_offset..header_offset + end])?
|
bincode::deserialize(&shred_buf[..end])?
|
||||||
};
|
};
|
||||||
Ok(Self::new(header, shred_buf))
|
Ok(Self::new(header, shred_buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_from_shred_and_buf(shred: &Shred, shred_buf: Vec<u8>) -> Self {
|
pub fn new_empty_from_header(headers: DataShredHeader) -> Self {
|
||||||
let header = match shred {
|
|
||||||
Shred::Data(s) => s.header.clone(),
|
|
||||||
Shred::Coding(s) => {
|
|
||||||
let mut hdr = DataShredHeader::default();
|
|
||||||
hdr.common_header.header = s.header.clone();
|
|
||||||
hdr
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Self::new(header, shred_buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_from_shred(shred: &Shred) -> Self {
|
|
||||||
let header = match shred {
|
|
||||||
Shred::Data(s) => s.header.clone(),
|
|
||||||
Shred::Coding(s) => {
|
|
||||||
let mut hdr = DataShredHeader::default();
|
|
||||||
hdr.common_header.header = s.header.clone();
|
|
||||||
hdr
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let shred_buf = bincode::serialize(&shred).unwrap();
|
|
||||||
|
|
||||||
Self::new(header, shred_buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_empty_from_header(header: DataShredHeader) -> Self {
|
|
||||||
let start = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
|
|
||||||
let end = start + *SIZE_OF_DATA_SHRED_HEADER;
|
|
||||||
let mut payload = vec![0; PACKET_DATA_SIZE];
|
let mut payload = vec![0; PACKET_DATA_SIZE];
|
||||||
let mut wr = io::Cursor::new(&mut payload[start..end]);
|
let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_DATA_SHRED_HEADER]);
|
||||||
bincode::serialize_into(&mut wr, &header).expect("Failed to serialize shred");
|
bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred");
|
||||||
if header.common_header.header.shred_type == CODING_SHRED {
|
Shred { headers, payload }
|
||||||
let shred_type = 1;
|
|
||||||
let mut wr = io::Cursor::new(&mut payload[..start]);
|
|
||||||
bincode::serialize_into(&mut wr, &shred_type).expect("Failed to set coding shred type");
|
|
||||||
}
|
|
||||||
ShredInfo {
|
|
||||||
headers: header,
|
|
||||||
shred: payload,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn header(&self) -> &ShredCommonHeader {
|
fn header(&self) -> &ShredCommonHeader {
|
||||||
|
@ -214,21 +168,13 @@ impl ShredInfo {
|
||||||
let signed_payload_offset = if self.is_data() {
|
let signed_payload_offset = if self.is_data() {
|
||||||
CodingShred::overhead()
|
CodingShred::overhead()
|
||||||
} else {
|
} else {
|
||||||
CodingShred::overhead() + *SIZE_OF_SHRED_TYPE
|
*SIZE_OF_SHRED_TYPE
|
||||||
- *SIZE_OF_CODING_SHRED_HEADER
|
|
||||||
- *SIZE_OF_EMPTY_VEC
|
|
||||||
} + *SIZE_OF_SIGNATURE;
|
} + *SIZE_OF_SIGNATURE;
|
||||||
self.signature()
|
self.signature()
|
||||||
.verify(pubkey.as_ref(), &self.shred[signed_payload_offset..])
|
.verify(pubkey.as_ref(), &self.payload[signed_payload_offset..])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
|
|
||||||
pub enum Shred {
|
|
||||||
Data(DataShred),
|
|
||||||
Coding(CodingShred),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This limit comes from reed solomon library, but unfortunately they don't have
|
/// This limit comes from reed solomon library, but unfortunately they don't have
|
||||||
/// a public constant defined for it.
|
/// a public constant defined for it.
|
||||||
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
|
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
|
||||||
|
@ -310,7 +256,7 @@ impl Default for CodingShredHeader {
|
||||||
/// Default shred is sized correctly to meet MTU/Packet size requirements
|
/// Default shred is sized correctly to meet MTU/Packet size requirements
|
||||||
impl Default for DataShred {
|
impl Default for DataShred {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_DATA_SHRED;
|
let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_DATA_SHRED;
|
||||||
DataShred {
|
DataShred {
|
||||||
header: DataShredHeader::default(),
|
header: DataShredHeader::default(),
|
||||||
payload: vec![0; size],
|
payload: vec![0; size],
|
||||||
|
@ -321,7 +267,7 @@ impl Default for DataShred {
|
||||||
/// Default shred is sized correctly to meet MTU/Packet size requirements
|
/// Default shred is sized correctly to meet MTU/Packet size requirements
|
||||||
impl Default for CodingShred {
|
impl Default for CodingShred {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED;
|
let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_CODING_SHRED;
|
||||||
CodingShred {
|
CodingShred {
|
||||||
header: CodingShredHeader::default(),
|
header: CodingShredHeader::default(),
|
||||||
payload: vec![0; size],
|
payload: vec![0; size],
|
||||||
|
@ -351,7 +297,7 @@ impl ShredCommon for DataShred {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn overhead() -> usize {
|
fn overhead() -> usize {
|
||||||
*SIZE_OF_SHRED_DATA_SHRED - *SIZE_OF_EMPTY_VEC
|
*SIZE_OF_EMPTY_DATA_SHRED - *SIZE_OF_EMPTY_VEC
|
||||||
}
|
}
|
||||||
|
|
||||||
fn empty_shred() -> Self {
|
fn empty_shred() -> Self {
|
||||||
|
@ -374,7 +320,7 @@ impl ShredCommon for CodingShred {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn overhead() -> usize {
|
fn overhead() -> usize {
|
||||||
*SIZE_OF_SHRED_CODING_SHRED
|
*SIZE_OF_EMPTY_CODING_SHRED
|
||||||
}
|
}
|
||||||
|
|
||||||
fn empty_shred() -> Self {
|
fn empty_shred() -> Self {
|
||||||
|
@ -393,19 +339,16 @@ pub struct Shredder {
|
||||||
parent_offset: u16,
|
parent_offset: u16,
|
||||||
fec_rate: f32,
|
fec_rate: f32,
|
||||||
signer: Arc<Keypair>,
|
signer: Arc<Keypair>,
|
||||||
pub shred_tuples: Vec<(Shred, ShredInfo)>,
|
pub shreds: Vec<Shred>,
|
||||||
fec_set_shred_start: usize,
|
fec_set_shred_start: usize,
|
||||||
active_shred: Shred,
|
active_shred: DataShred,
|
||||||
active_offset: usize,
|
active_offset: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Write for Shredder {
|
impl Write for Shredder {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
let written = self.active_offset;
|
let written = self.active_offset;
|
||||||
let (slice_len, capacity) = match self.active_shred.borrow_mut() {
|
let (slice_len, capacity) = self.active_shred.write_at(written, buf);
|
||||||
Shred::Data(s) => s.write_at(written, buf),
|
|
||||||
Shred::Coding(s) => s.write_at(written, buf),
|
|
||||||
};
|
|
||||||
|
|
||||||
if buf.len() > slice_len || capacity == 0 {
|
if buf.len() > slice_len || capacity == 0 {
|
||||||
self.finalize_data_shred();
|
self.finalize_data_shred();
|
||||||
|
@ -427,13 +370,6 @@ impl Write for Shredder {
|
||||||
|
|
||||||
#[derive(Default, Debug, PartialEq)]
|
#[derive(Default, Debug, PartialEq)]
|
||||||
pub struct RecoveryResult {
|
pub struct RecoveryResult {
|
||||||
pub recovered_data: Vec<ShredInfo>,
|
|
||||||
pub recovered_code: Vec<ShredInfo>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Debug, PartialEq)]
|
|
||||||
pub struct DeshredResult {
|
|
||||||
pub payload: Vec<u8>,
|
|
||||||
pub recovered_data: Vec<Shred>,
|
pub recovered_data: Vec<Shred>,
|
||||||
pub recovered_code: Vec<Shred>,
|
pub recovered_code: Vec<Shred>,
|
||||||
}
|
}
|
||||||
|
@ -467,7 +403,7 @@ impl Shredder {
|
||||||
data_shred.header.data_header.slot = slot;
|
data_shred.header.data_header.slot = slot;
|
||||||
data_shred.header.data_header.index = index;
|
data_shred.header.data_header.index = index;
|
||||||
data_shred.header.parent_offset = (slot - parent) as u16;
|
data_shred.header.parent_offset = (slot - parent) as u16;
|
||||||
let active_shred = Shred::Data(data_shred);
|
let active_shred = data_shred;
|
||||||
Ok(Shredder {
|
Ok(Shredder {
|
||||||
slot,
|
slot,
|
||||||
index,
|
index,
|
||||||
|
@ -475,7 +411,7 @@ impl Shredder {
|
||||||
parent_offset: (slot - parent) as u16,
|
parent_offset: (slot - parent) as u16,
|
||||||
fec_rate,
|
fec_rate,
|
||||||
signer: signer.clone(),
|
signer: signer.clone(),
|
||||||
shred_tuples: vec![],
|
shreds: vec![],
|
||||||
fec_set_shred_start: 0,
|
fec_set_shred_start: 0,
|
||||||
active_shred,
|
active_shred,
|
||||||
active_offset: 0,
|
active_offset: 0,
|
||||||
|
@ -483,12 +419,12 @@ impl Shredder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sign_shred(signer: &Arc<Keypair>, shred_info: &mut ShredInfo, signature_offset: usize) {
|
fn sign_shred(signer: &Arc<Keypair>, shred_info: &mut Shred, signature_offset: usize) {
|
||||||
let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
|
let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
|
||||||
let signature = signer.sign_message(&shred_info.shred[data_offset..]);
|
let signature = signer.sign_message(&shred_info.payload[data_offset..]);
|
||||||
let serialized_signature =
|
let serialized_signature =
|
||||||
bincode::serialize(&signature).expect("Failed to generate serialized signature");
|
bincode::serialize(&signature).expect("Failed to generate serialized signature");
|
||||||
shred_info.shred[signature_offset..signature_offset + serialized_signature.len()]
|
shred_info.payload[signature_offset..signature_offset + serialized_signature.len()]
|
||||||
.copy_from_slice(&serialized_signature);
|
.copy_from_slice(&serialized_signature);
|
||||||
shred_info.header_mut().signature = signature;
|
shred_info.header_mut().signature = signature;
|
||||||
}
|
}
|
||||||
|
@ -496,18 +432,17 @@ impl Shredder {
|
||||||
fn sign_unsigned_shreds_and_generate_codes(&mut self) {
|
fn sign_unsigned_shreds_and_generate_codes(&mut self) {
|
||||||
let signature_offset = CodingShred::overhead();
|
let signature_offset = CodingShred::overhead();
|
||||||
let signer = self.signer.clone();
|
let signer = self.signer.clone();
|
||||||
self.shred_tuples[self.fec_set_shred_start..]
|
self.shreds[self.fec_set_shred_start..]
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.for_each(|(_, d)| Self::sign_shred(&signer, d, signature_offset));
|
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
||||||
let unsigned_coding_shred_start = self.shred_tuples.len();
|
let unsigned_coding_shred_start = self.shreds.len();
|
||||||
|
|
||||||
self.generate_coding_shreds();
|
self.generate_coding_shreds();
|
||||||
let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED + *SIZE_OF_SHRED_TYPE
|
let signature_offset = *SIZE_OF_SHRED_TYPE;
|
||||||
- *SIZE_OF_CODING_SHRED_HEADER
|
self.shreds[unsigned_coding_shred_start..]
|
||||||
- *SIZE_OF_EMPTY_VEC;
|
|
||||||
self.shred_tuples[unsigned_coding_shred_start..]
|
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.for_each(|(_, d)| Self::sign_shred(&signer, d, coding_header_offset));
|
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
||||||
self.fec_set_shred_start = self.shred_tuples.len();
|
self.fec_set_shred_start = self.shreds.len();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Finalize a data shred. Update the shred index for the next shred
|
/// Finalize a data shred. Update the shred index for the next shred
|
||||||
|
@ -518,10 +453,10 @@ impl Shredder {
|
||||||
self.active_offset = 0;
|
self.active_offset = 0;
|
||||||
self.index += 1;
|
self.index += 1;
|
||||||
|
|
||||||
let mut shred = Shred::Data(self.new_data_shred());
|
let mut shred = self.new_data_shred();
|
||||||
std::mem::swap(&mut shred, &mut self.active_shred);
|
std::mem::swap(&mut shred, &mut self.active_shred);
|
||||||
let shred_info = ShredInfo::new_from_shred_and_buf(&shred, data);
|
let shred_info = Shred::new(shred.header, data);
|
||||||
self.shred_tuples.push((shred, shred_info));
|
self.shreds.push(shred_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new data shred
|
/// Creates a new data shred
|
||||||
|
@ -559,23 +494,23 @@ impl Shredder {
|
||||||
Session::new(num_data, num_coding).expect("Failed to create erasure session");
|
Session::new(num_data, num_coding).expect("Failed to create erasure session");
|
||||||
let start_index = self.index - num_data as u32;
|
let start_index = self.index - num_data as u32;
|
||||||
|
|
||||||
// All information after "reserved" field (coding shred header) in a data shred is encoded
|
// All information after coding shred field in a data shred is encoded
|
||||||
let coding_block_offset = CodingShred::overhead();
|
let coding_block_offset = CodingShred::overhead();
|
||||||
let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..]
|
let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..]
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, data)| &data.shred[coding_block_offset..])
|
.map(|data| &data.payload[coding_block_offset..])
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Create empty coding shreds, with correctly populated headers
|
// Create empty coding shreds, with correctly populated headers
|
||||||
let mut coding_shreds = Vec::with_capacity(num_coding);
|
let mut coding_shreds = Vec::with_capacity(num_coding);
|
||||||
(0..num_coding).for_each(|i| {
|
(0..num_coding).for_each(|i| {
|
||||||
let shred = bincode::serialize(&Shred::Coding(Self::new_coding_shred(
|
let shred = bincode::serialize(&Self::new_coding_shred(
|
||||||
self.slot,
|
self.slot,
|
||||||
start_index + i as u32,
|
start_index + i as u32,
|
||||||
num_data,
|
num_data,
|
||||||
num_coding,
|
num_coding,
|
||||||
i,
|
i,
|
||||||
)))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
coding_shreds.push(shred);
|
coding_shreds.push(shred);
|
||||||
});
|
});
|
||||||
|
@ -592,10 +527,16 @@ impl Shredder {
|
||||||
.expect("Failed in erasure encode");
|
.expect("Failed in erasure encode");
|
||||||
|
|
||||||
// append to the shred list
|
// append to the shred list
|
||||||
coding_shreds.into_iter().for_each(|code| {
|
coding_shreds.into_iter().enumerate().for_each(|(i, code)| {
|
||||||
let shred: Shred = bincode::deserialize(&code).unwrap();
|
let mut header = DataShredHeader::default();
|
||||||
let shred_info = ShredInfo::new_from_shred_and_buf(&shred, code);
|
header.common_header.header.shred_type = CODING_SHRED;
|
||||||
self.shred_tuples.push((shred, shred_info));
|
header.common_header.header.coding_header.index = start_index + i as u32;
|
||||||
|
header.common_header.header.coding_header.slot = self.slot;
|
||||||
|
header.common_header.header.num_coding_shreds = num_coding as u16;
|
||||||
|
header.common_header.header.num_data_shreds = num_data as u16;
|
||||||
|
header.common_header.header.position = i as u16;
|
||||||
|
let shred_info = Shred::new(header, code);
|
||||||
|
self.shreds.push(shred_info);
|
||||||
});
|
});
|
||||||
self.fec_set_index = self.index;
|
self.fec_set_index = self.index;
|
||||||
}
|
}
|
||||||
|
@ -605,21 +546,13 @@ impl Shredder {
|
||||||
/// If there's an active data shred, morph it into the final shred
|
/// If there's an active data shred, morph it into the final shred
|
||||||
/// If the current active data shred is first in slot, finalize it and create a new shred
|
/// If the current active data shred is first in slot, finalize it and create a new shred
|
||||||
fn make_final_data_shred(&mut self, last_in_slot: u8) {
|
fn make_final_data_shred(&mut self, last_in_slot: u8) {
|
||||||
if let Shred::Data(s) = &self.active_shred {
|
if self.active_shred.header.data_header.index == 0 {
|
||||||
if s.header.data_header.index == 0 {
|
self.finalize_data_shred();
|
||||||
self.finalize_data_shred();
|
}
|
||||||
}
|
self.active_shred.header.flags |= DATA_COMPLETE_SHRED;
|
||||||
|
if last_in_slot == LAST_SHRED_IN_SLOT {
|
||||||
|
self.active_shred.header.flags |= LAST_SHRED_IN_SLOT;
|
||||||
}
|
}
|
||||||
self.active_shred = match self.active_shred.borrow_mut() {
|
|
||||||
Shred::Data(s) => {
|
|
||||||
s.header.flags |= DATA_COMPLETE_SHRED;
|
|
||||||
if last_in_slot == LAST_SHRED_IN_SLOT {
|
|
||||||
s.header.flags |= LAST_SHRED_IN_SLOT;
|
|
||||||
}
|
|
||||||
Shred::Data(s.clone())
|
|
||||||
}
|
|
||||||
Shred::Coding(_) => unreachable!(),
|
|
||||||
};
|
|
||||||
self.finalize_data_shred();
|
self.finalize_data_shred();
|
||||||
self.sign_unsigned_shreds_and_generate_codes();
|
self.sign_unsigned_shreds_and_generate_codes();
|
||||||
}
|
}
|
||||||
|
@ -635,7 +568,7 @@ impl Shredder {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fill_in_missing_shreds(
|
fn fill_in_missing_shreds(
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
num_data: usize,
|
num_data: usize,
|
||||||
num_coding: usize,
|
num_coding: usize,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
|
@ -667,25 +600,25 @@ impl Shredder {
|
||||||
first_index: usize,
|
first_index: usize,
|
||||||
missing: usize,
|
missing: usize,
|
||||||
) -> Vec<u8> {
|
) -> Vec<u8> {
|
||||||
let missing_shred = if missing < first_index + num_data {
|
if missing < first_index + num_data {
|
||||||
let mut data_shred = DataShred::default();
|
let mut data_shred = DataShred::default();
|
||||||
data_shred.header.data_header.slot = slot;
|
data_shred.header.data_header.slot = slot;
|
||||||
data_shred.header.data_header.index = missing as u32;
|
data_shred.header.data_header.index = missing as u32;
|
||||||
Shred::Data(data_shred)
|
bincode::serialize(&data_shred).unwrap()
|
||||||
} else {
|
} else {
|
||||||
Shred::Coding(Self::new_coding_shred(
|
bincode::serialize(&Self::new_coding_shred(
|
||||||
slot,
|
slot,
|
||||||
missing.saturating_sub(num_data) as u32,
|
missing.saturating_sub(num_data) as u32,
|
||||||
num_data,
|
num_data,
|
||||||
num_coding,
|
num_coding,
|
||||||
missing - first_index - num_data,
|
missing - first_index - num_data,
|
||||||
))
|
))
|
||||||
};
|
.unwrap()
|
||||||
bincode::serialize(&missing_shred).unwrap()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_recovery(
|
pub fn try_recovery(
|
||||||
shreds: Vec<ShredInfo>,
|
shreds: Vec<Shred>,
|
||||||
num_data: usize,
|
num_data: usize,
|
||||||
num_coding: usize,
|
num_coding: usize,
|
||||||
first_index: usize,
|
first_index: usize,
|
||||||
|
@ -712,7 +645,7 @@ impl Shredder {
|
||||||
next_expected_index,
|
next_expected_index,
|
||||||
&mut present,
|
&mut present,
|
||||||
);
|
);
|
||||||
blocks.push(shred.shred);
|
blocks.push(shred.payload);
|
||||||
next_expected_index = last_index + 1;
|
next_expected_index = last_index + 1;
|
||||||
blocks
|
blocks
|
||||||
})
|
})
|
||||||
|
@ -750,7 +683,7 @@ impl Shredder {
|
||||||
let drain_this = position - num_drained;
|
let drain_this = position - num_drained;
|
||||||
let shred_buf = shred_bufs.remove(drain_this);
|
let shred_buf = shred_bufs.remove(drain_this);
|
||||||
num_drained += 1;
|
num_drained += 1;
|
||||||
if let Ok(shred) = ShredInfo::new_from_serialized_shred(shred_buf) {
|
if let Ok(shred) = Shred::new_from_serialized_shred(shred_buf) {
|
||||||
let shred_index = shred.index() as usize;
|
let shred_index = shred.index() as usize;
|
||||||
// Valid shred must be in the same slot as the original shreds
|
// Valid shred must be in the same slot as the original shreds
|
||||||
if shred.slot() == slot {
|
if shred.slot() == slot {
|
||||||
|
@ -780,7 +713,7 @@ impl Shredder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Combines all shreds to recreate the original buffer
|
/// Combines all shreds to recreate the original buffer
|
||||||
pub fn deshred(shreds: &[ShredInfo]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
|
pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
|
||||||
let num_data = shreds.len();
|
let num_data = shreds.len();
|
||||||
let data_shred_bufs = {
|
let data_shred_bufs = {
|
||||||
let first_index = shreds.first().unwrap().index() as usize;
|
let first_index = shreds.first().unwrap().index() as usize;
|
||||||
|
@ -795,13 +728,13 @@ impl Shredder {
|
||||||
Err(reed_solomon_erasure::Error::TooFewDataShards)?;
|
Err(reed_solomon_erasure::Error::TooFewDataShards)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
shreds.iter().map(|shred| &shred.shred).collect()
|
shreds.iter().map(|shred| &shred.payload).collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self::reassemble_payload(num_data, data_shred_bufs))
|
Ok(Self::reassemble_payload(num_data, data_shred_bufs))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_shred_index(shred: &ShredInfo, num_data: usize) -> usize {
|
fn get_shred_index(shred: &Shred, num_data: usize) -> usize {
|
||||||
if shred.is_data() {
|
if shred.is_data() {
|
||||||
shred.index() as usize
|
shred.index() as usize
|
||||||
} else {
|
} else {
|
||||||
|
@ -813,7 +746,7 @@ impl Shredder {
|
||||||
data_shred_bufs[..num_data]
|
data_shred_bufs[..num_data]
|
||||||
.iter()
|
.iter()
|
||||||
.flat_map(|data| {
|
.flat_map(|data| {
|
||||||
let offset = *SIZE_OF_SHRED_DATA_SHRED;
|
let offset = *SIZE_OF_EMPTY_DATA_SHRED;
|
||||||
data[offset as usize..].iter()
|
data[offset as usize..].iter()
|
||||||
})
|
})
|
||||||
.cloned()
|
.cloned()
|
||||||
|
@ -826,14 +759,14 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn verify_test_data_shred(
|
fn verify_test_data_shred(
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
index: u32,
|
index: u32,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
parent: u64,
|
parent: u64,
|
||||||
pk: &Pubkey,
|
pk: &Pubkey,
|
||||||
verify: bool,
|
verify: bool,
|
||||||
) {
|
) {
|
||||||
assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
|
assert_eq!(shred.payload.len(), PACKET_DATA_SIZE);
|
||||||
assert!(shred.is_data());
|
assert!(shred.is_data());
|
||||||
assert_eq!(shred.index(), index);
|
assert_eq!(shred.index(), index);
|
||||||
assert_eq!(shred.slot(), slot);
|
assert_eq!(shred.slot(), slot);
|
||||||
|
@ -841,8 +774,8 @@ mod tests {
|
||||||
assert_eq!(verify, shred.verify(pk));
|
assert_eq!(verify, shred.verify(pk));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_test_code_shred(shred: &ShredInfo, index: u32, slot: u64, pk: &Pubkey, verify: bool) {
|
fn verify_test_code_shred(shred: &Shred, index: u32, slot: u64, pk: &Pubkey, verify: bool) {
|
||||||
assert_eq!(shred.shred.len(), PACKET_DATA_SIZE);
|
assert_eq!(shred.payload.len(), PACKET_DATA_SIZE);
|
||||||
assert!(!shred.is_data());
|
assert!(!shred.is_data());
|
||||||
assert_eq!(shred.index(), index);
|
assert_eq!(shred.index(), index);
|
||||||
assert_eq!(shred.slot(), slot);
|
assert_eq!(shred.slot(), slot);
|
||||||
|
@ -865,7 +798,7 @@ mod tests {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
|
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
assert!(DataShred::overhead() < PACKET_DATA_SIZE);
|
assert!(DataShred::overhead() < PACKET_DATA_SIZE);
|
||||||
|
@ -874,12 +807,12 @@ mod tests {
|
||||||
// Test0: Write some data to shred. Not enough to create a signed shred
|
// Test0: Write some data to shred. Not enough to create a signed shred
|
||||||
let data: Vec<u8> = (0..25).collect();
|
let data: Vec<u8> = (0..25).collect();
|
||||||
assert_eq!(shredder.write(&data).unwrap(), data.len());
|
assert_eq!(shredder.write(&data).unwrap(), data.len());
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 25);
|
assert_eq!(shredder.active_offset, 25);
|
||||||
|
|
||||||
// Test1: Write some more data to shred. Not enough to create a signed shred
|
// Test1: Write some more data to shred. Not enough to create a signed shred
|
||||||
assert_eq!(shredder.write(&data).unwrap(), data.len());
|
assert_eq!(shredder.write(&data).unwrap(), data.len());
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 50);
|
assert_eq!(shredder.active_offset, 50);
|
||||||
|
|
||||||
// Test2: Write enough data to create a shred (> PACKET_DATA_SIZE)
|
// Test2: Write enough data to create a shred (> PACKET_DATA_SIZE)
|
||||||
|
@ -888,12 +821,12 @@ mod tests {
|
||||||
let offset = shredder.write(&data).unwrap();
|
let offset = shredder.write(&data).unwrap();
|
||||||
assert_ne!(offset, data.len());
|
assert_ne!(offset, data.len());
|
||||||
// Assert that we have atleast one signed shred
|
// Assert that we have atleast one signed shred
|
||||||
assert!(!shredder.shred_tuples.is_empty());
|
assert!(!shredder.shreds.is_empty());
|
||||||
// Assert that the new active shred was not populated
|
// Assert that the new active shred was not populated
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
// Test3: Assert that the first shred in slot was created (since we gave a parent to shredder)
|
// Test3: Assert that the first shred in slot was created (since we gave a parent to shredder)
|
||||||
let (_, shred) = &shredder.shred_tuples[0];
|
let shred = &shredder.shreds[0];
|
||||||
// Test4: assert that it matches the original shred
|
// Test4: assert that it matches the original shred
|
||||||
// The shreds are not signed yet, as the data is not finalized
|
// The shreds are not signed yet, as the data is not finalized
|
||||||
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), false);
|
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), false);
|
||||||
|
@ -910,10 +843,10 @@ mod tests {
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
|
|
||||||
// We should have a new signed shred
|
// We should have a new signed shred
|
||||||
assert!(!shredder.shred_tuples.is_empty());
|
assert!(!shredder.shreds.is_empty());
|
||||||
|
|
||||||
// Must be Last in FEC Set
|
// Must be Last in FEC Set
|
||||||
let (_, shred) = &shredder.shred_tuples[1];
|
let shred = &shredder.shreds[1];
|
||||||
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
// Test that same seed is NOT generated for two different shreds
|
// Test that same seed is NOT generated for two different shreds
|
||||||
|
@ -927,9 +860,9 @@ mod tests {
|
||||||
assert_ne!(offset, data.len());
|
assert_ne!(offset, data.len());
|
||||||
|
|
||||||
// We should have a new signed shred
|
// We should have a new signed shred
|
||||||
assert!(!shredder.shred_tuples.is_empty());
|
assert!(!shredder.shreds.is_empty());
|
||||||
|
|
||||||
let (_, shred) = &shredder.shred_tuples[2];
|
let shred = &shredder.shreds[2];
|
||||||
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), false);
|
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), false);
|
||||||
|
|
||||||
// Test8: Write more data to generate an intermediate data shred
|
// Test8: Write more data to generate an intermediate data shred
|
||||||
|
@ -937,10 +870,10 @@ mod tests {
|
||||||
assert_ne!(offset, data.len());
|
assert_ne!(offset, data.len());
|
||||||
|
|
||||||
// We should have a new signed shred
|
// We should have a new signed shred
|
||||||
assert!(!shredder.shred_tuples.is_empty());
|
assert!(!shredder.shreds.is_empty());
|
||||||
|
|
||||||
// Must be a Data shred
|
// Must be a Data shred
|
||||||
let (_, shred) = &shredder.shred_tuples[3];
|
let shred = &shredder.shreds[3];
|
||||||
verify_test_data_shred(&shred, 3, slot, slot - 5, &keypair.pubkey(), false);
|
verify_test_data_shred(&shred, 3, slot, slot - 5, &keypair.pubkey(), false);
|
||||||
|
|
||||||
// Test9: Write some data to shredder
|
// Test9: Write some data to shredder
|
||||||
|
@ -951,10 +884,10 @@ mod tests {
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
|
|
||||||
// We should have a new signed shred
|
// We should have a new signed shred
|
||||||
assert!(!shredder.shred_tuples.is_empty());
|
assert!(!shredder.shreds.is_empty());
|
||||||
|
|
||||||
// Must be LastInSlot
|
// Must be LastInSlot
|
||||||
let (_, shred) = &shredder.shred_tuples[4];
|
let shred = &shredder.shreds[4];
|
||||||
verify_test_data_shred(&shred, 4, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 4, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -966,7 +899,7 @@ mod tests {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
|
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
let data: Vec<_> = (0..25).collect();
|
let data: Vec<_> = (0..25).collect();
|
||||||
|
@ -974,23 +907,23 @@ mod tests {
|
||||||
let _ = shredder.write(&data).unwrap();
|
let _ = shredder.write(&data).unwrap();
|
||||||
|
|
||||||
// We should have 0 shreds now
|
// We should have 0 shreds now
|
||||||
assert_eq!(shredder.shred_tuples.len(), 0);
|
assert_eq!(shredder.shreds.len(), 0);
|
||||||
|
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
|
|
||||||
// We should have 1 shred now
|
// We should have 1 shred now
|
||||||
assert_eq!(shredder.shred_tuples.len(), 2);
|
assert_eq!(shredder.shreds.len(), 2);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2)
|
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2)
|
||||||
.expect("Failed in creating shredder");
|
.expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
let data: Vec<_> = (0..25).collect();
|
let data: Vec<_> = (0..25).collect();
|
||||||
|
@ -998,13 +931,13 @@ mod tests {
|
||||||
let _ = shredder.write(&data).unwrap();
|
let _ = shredder.write(&data).unwrap();
|
||||||
|
|
||||||
// We should have 0 shreds now
|
// We should have 0 shreds now
|
||||||
assert_eq!(shredder.shred_tuples.len(), 0);
|
assert_eq!(shredder.shreds.len(), 0);
|
||||||
|
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
|
|
||||||
// We should have 1 shred now (LastInFECBlock)
|
// We should have 1 shred now (LastInFECBlock)
|
||||||
assert_eq!(shredder.shred_tuples.len(), 1);
|
assert_eq!(shredder.shreds.len(), 1);
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,7 +952,7 @@ mod tests {
|
||||||
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0)
|
let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0)
|
||||||
.expect("Failed in creating shredder");
|
.expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
// Write enough data to create a shred (> PACKET_DATA_SIZE)
|
// Write enough data to create a shred (> PACKET_DATA_SIZE)
|
||||||
|
@ -1029,28 +962,28 @@ mod tests {
|
||||||
let _ = shredder.write(&data).unwrap();
|
let _ = shredder.write(&data).unwrap();
|
||||||
|
|
||||||
// We should have 2 shreds now
|
// We should have 2 shreds now
|
||||||
assert_eq!(shredder.shred_tuples.len(), 2);
|
assert_eq!(shredder.shreds.len(), 2);
|
||||||
|
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
|
|
||||||
// Finalize must have created 1 final data shred and 3 coding shreds
|
// Finalize must have created 1 final data shred and 3 coding shreds
|
||||||
// assert_eq!(shredder.shreds.len(), 6);
|
// assert_eq!(shredder.shreds.len(), 6);
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true);
|
verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_code_shred(&shred, 0, slot, &keypair.pubkey(), true);
|
verify_test_code_shred(&shred, 0, slot, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_code_shred(&shred, 1, slot, &keypair.pubkey(), true);
|
verify_test_code_shred(&shred, 1, slot, &keypair.pubkey(), true);
|
||||||
|
|
||||||
let (_, shred) = shredder.shred_tuples.remove(0);
|
let shred = shredder.shreds.remove(0);
|
||||||
verify_test_code_shred(&shred, 2, slot, &keypair.pubkey(), true);
|
verify_test_code_shred(&shred, 2, slot, &keypair.pubkey(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1061,7 +994,7 @@ mod tests {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
|
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
let data: Vec<_> = (0..4000).collect();
|
let data: Vec<_> = (0..4000).collect();
|
||||||
|
@ -1075,7 +1008,7 @@ mod tests {
|
||||||
|
|
||||||
// We should have some shreds now
|
// We should have some shreds now
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
shredder.shred_tuples.len(),
|
shredder.shreds.len(),
|
||||||
data.len() / approx_shred_payload_size
|
data.len() / approx_shred_payload_size
|
||||||
);
|
);
|
||||||
assert_eq!(offset, data.len());
|
assert_eq!(offset, data.len());
|
||||||
|
@ -1084,13 +1017,9 @@ mod tests {
|
||||||
|
|
||||||
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
||||||
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
||||||
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
|
assert_eq!(shredder.shreds.len(), expected_shred_count);
|
||||||
|
|
||||||
let (_, shred_infos): (Vec<Shred>, Vec<ShredInfo>) = shredder
|
let shred_infos = shredder.shreds.clone();
|
||||||
.shred_tuples
|
|
||||||
.iter()
|
|
||||||
.map(|(s, b)| (s.clone(), b.clone()))
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
|
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
|
@ -1121,18 +1050,12 @@ mod tests {
|
||||||
assert_eq!(data[..], result[..data.len()]);
|
assert_eq!(data[..], result[..data.len()]);
|
||||||
|
|
||||||
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
|
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
|
||||||
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
|
let mut shred_info: Vec<Shred> = shredder
|
||||||
.shred_tuples
|
.shreds
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, (s, b))| {
|
.filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None })
|
||||||
if i % 2 == 0 {
|
.collect();
|
||||||
Some((s.clone(), b.clone()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
let mut result = Shredder::try_recovery(
|
let mut result = Shredder::try_recovery(
|
||||||
shred_info.clone(),
|
shred_info.clone(),
|
||||||
|
@ -1167,18 +1090,12 @@ mod tests {
|
||||||
assert_eq!(data[..], result[..data.len()]);
|
assert_eq!(data[..], result[..data.len()]);
|
||||||
|
|
||||||
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
|
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
|
||||||
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
|
let mut shred_info: Vec<Shred> = shredder
|
||||||
.shred_tuples
|
.shreds
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, (s, b))| {
|
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
|
||||||
if i % 2 != 0 {
|
.collect();
|
||||||
Some((s.clone(), b.clone()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
let mut result = Shredder::try_recovery(
|
let mut result = Shredder::try_recovery(
|
||||||
shred_info.clone(),
|
shred_info.clone(),
|
||||||
|
@ -1225,7 +1142,7 @@ mod tests {
|
||||||
|
|
||||||
// We should have some shreds now
|
// We should have some shreds now
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
shredder.shred_tuples.len(),
|
shredder.shreds.len(),
|
||||||
data.len() / approx_shred_payload_size
|
data.len() / approx_shred_payload_size
|
||||||
);
|
);
|
||||||
assert_eq!(offset, data.len());
|
assert_eq!(offset, data.len());
|
||||||
|
@ -1234,20 +1151,14 @@ mod tests {
|
||||||
|
|
||||||
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
||||||
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
||||||
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
|
assert_eq!(shredder.shreds.len(), expected_shred_count);
|
||||||
|
|
||||||
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
|
let mut shred_info: Vec<Shred> = shredder
|
||||||
.shred_tuples
|
.shreds
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, (s, b))| {
|
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
|
||||||
if i % 2 != 0 {
|
.collect();
|
||||||
Some((s.clone(), b.clone()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
let mut result = Shredder::try_recovery(
|
let mut result = Shredder::try_recovery(
|
||||||
shred_info.clone(),
|
shred_info.clone(),
|
||||||
|
@ -1282,11 +1193,11 @@ mod tests {
|
||||||
assert_eq!(data[..], result[..data.len()]);
|
assert_eq!(data[..], result[..data.len()]);
|
||||||
|
|
||||||
// Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
|
// Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
|
||||||
let shreds: Vec<ShredInfo> = shredder
|
let shreds: Vec<Shred> = shredder
|
||||||
.shred_tuples
|
.shreds
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, (_, s))| {
|
.filter_map(|(i, s)| {
|
||||||
if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) {
|
if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) {
|
||||||
Some(s.clone())
|
Some(s.clone())
|
||||||
} else {
|
} else {
|
||||||
|
@ -1314,7 +1225,7 @@ mod tests {
|
||||||
|
|
||||||
// We should have some shreds now
|
// We should have some shreds now
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
shredder.shred_tuples.len(),
|
shredder.shreds.len(),
|
||||||
data.len() / approx_shred_payload_size
|
data.len() / approx_shred_payload_size
|
||||||
);
|
);
|
||||||
assert_eq!(offset, data.len());
|
assert_eq!(offset, data.len());
|
||||||
|
@ -1323,20 +1234,14 @@ mod tests {
|
||||||
|
|
||||||
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
|
||||||
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
||||||
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
|
assert_eq!(shredder.shreds.len(), expected_shred_count);
|
||||||
|
|
||||||
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
|
let mut shred_info: Vec<Shred> = shredder
|
||||||
.shred_tuples
|
.shreds
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, (s, b))| {
|
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
|
||||||
if i % 2 != 0 {
|
.collect();
|
||||||
Some((s.clone(), b.clone()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
let mut result = Shredder::try_recovery(
|
let mut result = Shredder::try_recovery(
|
||||||
shred_info.clone(),
|
shred_info.clone(),
|
||||||
|
@ -1427,7 +1332,7 @@ mod tests {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
|
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
|
||||||
|
|
||||||
assert!(shredder.shred_tuples.is_empty());
|
assert!(shredder.shreds.is_empty());
|
||||||
assert_eq!(shredder.active_offset, 0);
|
assert_eq!(shredder.active_offset, 0);
|
||||||
|
|
||||||
let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect();
|
let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect();
|
||||||
|
@ -1439,28 +1344,28 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We should have some shreds now
|
// We should have some shreds now
|
||||||
assert!(shredder.shred_tuples.len() > data.len() / approx_shred_payload_size);
|
assert!(shredder.shreds.len() > data.len() / approx_shred_payload_size);
|
||||||
assert_eq!(offset, data.len());
|
assert_eq!(offset, data.len());
|
||||||
|
|
||||||
shredder.finalize_data();
|
shredder.finalize_data();
|
||||||
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
|
||||||
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
|
assert_eq!(shredder.shreds.len(), expected_shred_count);
|
||||||
|
|
||||||
let mut index = 0;
|
let mut index = 0;
|
||||||
|
|
||||||
while index < shredder.shred_tuples.len() {
|
while index < shredder.shreds.len() {
|
||||||
let num_data_shreds = cmp::min(
|
let num_data_shreds = cmp::min(
|
||||||
MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
|
MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
|
||||||
(shredder.shred_tuples.len() - index) / 2,
|
(shredder.shreds.len() - index) / 2,
|
||||||
);
|
);
|
||||||
let coding_start = index + num_data_shreds;
|
let coding_start = index + num_data_shreds;
|
||||||
shredder.shred_tuples[index..coding_start]
|
shredder.shreds[index..coding_start]
|
||||||
.iter()
|
.iter()
|
||||||
.for_each(|(_, s)| assert!(s.is_data()));
|
.for_each(|s| assert!(s.is_data()));
|
||||||
index = coding_start + num_data_shreds;
|
index = coding_start + num_data_shreds;
|
||||||
shredder.shred_tuples[coding_start..index]
|
shredder.shreds[coding_start..index]
|
||||||
.iter()
|
.iter()
|
||||||
.for_each(|(_, s)| assert!(!s.is_data()));
|
.for_each(|s| assert!(!s.is_data()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||||
use crate::repair_service::{RepairService, RepairStrategy};
|
use crate::repair_service::{RepairService, RepairStrategy};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::shred::ShredInfo;
|
use crate::shred::Shred;
|
||||||
use crate::streamer::{PacketReceiver, PacketSender};
|
use crate::streamer::{PacketReceiver, PacketSender};
|
||||||
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
|
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
|
@ -28,7 +28,7 @@ pub const NUM_THREADS: u32 = 10;
|
||||||
/// drop blobs that are from myself or not from the correct leader for the
|
/// drop blobs that are from myself or not from the correct leader for the
|
||||||
/// blob's slot
|
/// blob's slot
|
||||||
pub fn should_retransmit_and_persist(
|
pub fn should_retransmit_and_persist(
|
||||||
shred: &ShredInfo,
|
shred: &Shred,
|
||||||
bank: Option<Arc<Bank>>,
|
bank: Option<Arc<Bank>>,
|
||||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
|
@ -67,7 +67,7 @@ fn recv_window<F>(
|
||||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(&ShredInfo, u64) -> bool,
|
F: Fn(&Shred, u64) -> bool,
|
||||||
F: Sync,
|
F: Sync,
|
||||||
{
|
{
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
|
@ -86,7 +86,7 @@ where
|
||||||
.par_iter_mut()
|
.par_iter_mut()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter_map(|(i, packet)| {
|
.filter_map(|(i, packet)| {
|
||||||
if let Ok(shred) = ShredInfo::new_from_serialized_shred(packet.data.to_vec()) {
|
if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) {
|
||||||
if shred_filter(&shred, last_root) {
|
if shred_filter(&shred, last_root) {
|
||||||
packet.meta.slot = shred.slot();
|
packet.meta.slot = shred.slot();
|
||||||
packet.meta.seed = shred.seed();
|
packet.meta.seed = shred.seed();
|
||||||
|
@ -177,7 +177,7 @@ impl WindowService {
|
||||||
) -> WindowService
|
) -> WindowService
|
||||||
where
|
where
|
||||||
F: 'static
|
F: 'static
|
||||||
+ Fn(&Pubkey, &ShredInfo, Option<Arc<Bank>>, u64) -> bool
|
+ Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
|
||||||
+ std::marker::Send
|
+ std::marker::Send
|
||||||
+ std::marker::Sync,
|
+ std::marker::Sync,
|
||||||
{
|
{
|
||||||
|
@ -305,13 +305,13 @@ mod test {
|
||||||
slot: u64,
|
slot: u64,
|
||||||
parent: u64,
|
parent: u64,
|
||||||
keypair: &Arc<Keypair>,
|
keypair: &Arc<Keypair>,
|
||||||
) -> Vec<ShredInfo> {
|
) -> Vec<Shred> {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
|
Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
|
||||||
bincode::serialize_into(&mut shredder, &entries)
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
.expect("Expect to write all entries to shreds");
|
.expect("Expect to write all entries to shreds");
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect()
|
shredder.shreds.drain(..).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -435,7 +435,7 @@ mod test {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|mut s| {
|
.map(|mut s| {
|
||||||
let mut p = Packet::default();
|
let mut p = Packet::default();
|
||||||
p.data.copy_from_slice(&mut s.shred);
|
p.data.copy_from_slice(&mut s.payload);
|
||||||
p
|
p
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
Loading…
Reference in New Issue