Replace Shred usage with ShredInfo (#5939)

* Replace Shred usage with ShredInfo

* Fix tests

* fix clippy
This commit is contained in:
Pankaj Garg 2019-09-17 18:22:46 -07:00 committed by GitHub
parent 7e31a67d81
commit ff608992ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 294 additions and 317 deletions

View File

@ -4,7 +4,7 @@
use crate::entry::Entry;
use crate::erasure::ErasureConfig;
use crate::result::{Error, Result};
use crate::shred::{Shred, ShredInfo, Shredder};
use crate::shred::{ShredInfo, Shredder};
#[cfg(feature = "kvstore")]
use solana_kvstore as kvstore;
@ -22,7 +22,7 @@ use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::{Borrow, Cow};
use std::borrow::Borrow;
use std::cell::RefCell;
use std::cmp;
use std::fs;
@ -322,7 +322,7 @@ impl Blocktree {
index_working_set: &HashMap<u64, Index>,
prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>,
prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>,
) -> Vec<Shred> {
) -> Vec<ShredInfo> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
@ -357,7 +357,7 @@ impl Blocktree {
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Some(ShredInfo::new_from_serialized_shred(data))
ShredInfo::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
@ -377,7 +377,7 @@ impl Blocktree {
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Some(ShredInfo::new_from_serialized_shred(code))
ShredInfo::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
@ -415,7 +415,7 @@ impl Blocktree {
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
shreds: Vec<ShredInfo>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
) -> Result<()> {
let db = &*self.db;
@ -509,7 +509,7 @@ impl Blocktree {
fn check_insert_coding_shred(
&self,
shred: Shred,
shred: ShredInfo,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>,
write_batch: &mut WriteBatch,
@ -525,13 +525,11 @@ impl Blocktree {
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
if let Ok(shred_buf) =
self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
if let Ok(()) = self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
{
let shred_info = ShredInfo::new_from_shred(&shred, shred_buf);
just_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred_info);
.or_insert_with(|| shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
}
}
@ -539,7 +537,7 @@ impl Blocktree {
fn check_insert_data_shred(
&self,
shred: Shred,
shred: ShredInfo,
index_working_set: &mut HashMap<u64, Index>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
@ -563,14 +561,13 @@ impl Blocktree {
index_meta.data(),
&self.last_root,
) {
if let Ok(shred_buf) = self.insert_data_shred(
if let Ok(()) = self.insert_data_shred(
&mut slot_meta,
index_meta.data_mut(),
&shred,
write_batch,
) {
let shred_info = ShredInfo::new_from_shred(&shred, shred_buf);
just_inserted_data_shreds.insert((slot, shred_index), shred_info);
just_inserted_data_shreds.insert((slot, shred_index), shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
true
} else {
@ -587,7 +584,7 @@ impl Blocktree {
}
fn should_insert_coding_shred(
shred: &Shred,
shred: &ShredInfo,
coding_index: &CodingIndex,
last_root: &RwLock<u64>,
) -> bool {
@ -614,9 +611,9 @@ impl Blocktree {
&self,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_meta: &mut Index,
shred: &Shred,
shred: &ShredInfo,
write_batch: &mut WriteBatch,
) -> Result<Vec<u8>> {
) -> Result<()> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let (num_data, num_coding, pos) = shred
@ -651,18 +648,16 @@ impl Blocktree {
);
}
let serialized_shred = bincode::serialize(shred).unwrap();
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred)?;
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.shred)?;
index_meta.coding_mut().set_present(shred_index, true);
Ok(serialized_shred)
Ok(())
}
fn should_insert_data_shred(
shred: &Shred,
shred: &ShredInfo,
slot_meta: &SlotMeta,
data_index: &DataIndex,
last_root: &RwLock<u64>,
@ -725,9 +720,9 @@ impl Blocktree {
&self,
slot_meta: &mut SlotMeta,
data_index: &mut DataIndex,
shred: &Shred,
shred: &ShredInfo,
write_batch: &mut WriteBatch,
) -> Result<Vec<u8>> {
) -> Result<()> {
let slot = shred.slot();
let index = u64::from(shred.index());
let parent = shred.parent();
@ -763,15 +758,13 @@ impl Blocktree {
slot_meta.consumed
};
let serialized_shred = bincode::serialize(shred).unwrap();
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?;
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.shred)?;
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(serialized_shred)
Ok(())
}
pub fn get_data_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
@ -859,8 +852,8 @@ impl Blocktree {
parent_slot = current_slot - 1;
remaining_ticks_in_slot = ticks_per_slot;
shredder.finalize_slot();
let shreds: Vec<Shred> =
shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
let shreds: Vec<ShredInfo> =
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
all_shreds.extend(shreds);
shredder =
Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
@ -883,7 +876,7 @@ impl Blocktree {
if is_full_slot && remaining_ticks_in_slot != 0 {
shredder.finalize_slot();
}
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
let shreds: Vec<ShredInfo> = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
all_shreds.extend(shreds);
let num_shreds = all_shreds.len();
@ -996,21 +989,26 @@ impl Blocktree {
pub fn get_slot_entries_with_shred_count(
&self,
slot: u64,
start_index: u64,
mut start_index: u64,
) -> Result<(Vec<Entry>, usize)> {
// Find the next consecutive block of shreds.
let serialized_shreds = get_slot_consecutive_shreds(slot, &self.db, start_index)?;
let mut serialized_shreds: Vec<Vec<u8>> = vec![];
let data_cf = self.db.column::<cf::ShredData>();
while let Some(serialized_shred) = data_cf.get_bytes((slot, start_index))? {
serialized_shreds.push(serialized_shred);
start_index += 1;
}
trace!(
"Found {:?} shreds for slot {:?}",
serialized_shreds.len(),
slot
);
let mut shreds: Vec<Shred> = serialized_shreds
.iter()
.map(|serialzied_shred| {
let shred: Shred =
bincode::deserialize(serialzied_shred).expect("Failed to deserialize shred");
shred
let mut shreds: Vec<ShredInfo> = serialized_shreds
.into_iter()
.filter_map(|serialized_shred| {
ShredInfo::new_from_serialized_shred(serialized_shred).ok()
})
.collect();
@ -1387,22 +1385,6 @@ fn find_slot_meta_in_cached_state<'a>(
}
}
fn get_slot_consecutive_shreds<'a>(
slot: u64,
db: &Database,
mut current_index: u64,
) -> Result<Vec<Cow<'a, [u8]>>> {
let mut serialized_shreds: Vec<Cow<[u8]>> = vec![];
let data_cf = db.column::<cf::ShredData>();
while let Some(serialized_shred) = data_cf.get_bytes((slot, current_index))? {
serialized_shreds.push(Cow::Owned(serialized_shred));
current_index += 1;
}
Ok(serialized_shreds)
}
// Chaining based on latest discussion here: https://github.com/solana-labs/solana/pull/2253
fn handle_chaining(
db: &Database,
@ -1590,7 +1572,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
let shreds: Vec<ShredInfo> = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect();
blocktree.insert_shreds(shreds, None)?;
blocktree.set_roots(&[0])?;
@ -1671,7 +1653,7 @@ pub fn entries_to_test_shreds(
slot: u64,
parent_slot: u64,
is_full_slot: bool,
) -> Vec<Shred> {
) -> Vec<ShredInfo> {
let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32)
.expect("Failed to create entry shredder");
@ -1683,16 +1665,14 @@ pub fn entries_to_test_shreds(
shredder.finalize_data();
}
let shreds: Vec<Shred> = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect();
shreds
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect()
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::entry::{create_ticks, Entry};
use crate::shred::CodingShred;
use crate::shred::{CodingShred, Shred};
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -1854,10 +1834,7 @@ pub mod tests {
let slot = 0;
let (shreds, _) = make_slot_entries(slot, 0, 100);
let num_shreds = shreds.len() as u64;
let shred_bufs: Vec<_> = shreds
.iter()
.map(|shred| bincode::serialize(shred).unwrap())
.collect();
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.shred.clone()).collect();
let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes");
let ledger = Blocktree::open(&ledger_path).unwrap();
@ -2255,7 +2232,7 @@ pub mod tests {
// is missing the tick at shred index == slot index - 1. Thus, no consecutive blocks
// will be formed
let num_slots = shreds_per_slot;
let mut shreds: Vec<Shred> = vec![];
let mut shreds = vec![];
let mut missing_shreds = vec![];
for slot in 1..num_slots + 1 {
let (mut slot_shreds, _) = make_slot_entries(slot, slot - 1, entries_per_slot);
@ -3141,7 +3118,7 @@ pub mod tests {
shred.header.coding_header.index = 11;
shred.header.coding_header.slot = 1;
shred.header.num_coding_shreds = shred.header.position + 1;
let coding_shred = Shred::Coding(shred.clone());
let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
// Insert a good coding shred
assert!(Blocktree::should_insert_coding_shred(
@ -3172,12 +3149,13 @@ pub mod tests {
// Establish a baseline that works
{
let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
let index = index_cf
.get(shred.header.coding_header.slot)
.unwrap()
.unwrap();
assert!(Blocktree::should_insert_coding_shred(
&Shred::Coding(shred.clone()),
&coding_shred,
index.coding(),
&last_root
));
@ -3185,14 +3163,13 @@ pub mod tests {
// Trying to insert a shred with index < position should fail
{
let mut shred_ = shred.clone();
shred_.header.coding_header.index = (shred_.header.position - 1).into();
let index = index_cf
.get(shred_.header.coding_header.slot)
.unwrap()
.unwrap();
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
let index = coding_shred.headers.common_header.header.position - 1;
coding_shred.set_index(index as u32);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
&coding_shred,
index.coding(),
&last_root
));
@ -3200,14 +3177,11 @@ pub mod tests {
// Trying to insert shred with num_coding == 0 should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 0;
let index = index_cf
.get(shred_.header.coding_header.slot)
.unwrap()
.unwrap();
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
coding_shred.headers.common_header.header.num_coding_shreds = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
&coding_shred,
index.coding(),
&last_root
));
@ -3215,14 +3189,12 @@ pub mod tests {
// Trying to insert shred with pos >= num_coding should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = shred_.header.position;
let index = index_cf
.get(shred_.header.coding_header.slot)
.unwrap()
.unwrap();
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
coding_shred.headers.common_header.header.num_coding_shreds =
coding_shred.headers.common_header.header.position;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
&coding_shred,
index.coding(),
&last_root
));
@ -3231,23 +3203,24 @@ pub mod tests {
// Trying to insert with set_index with num_coding that would imply the last blob
// has index > u32::MAX should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 3;
shred_.header.coding_header.index = std::u32::MAX - 1;
shred_.header.position = 0;
let index = index_cf
.get(shred_.header.coding_header.slot)
.unwrap()
.unwrap();
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
coding_shred.headers.common_header.header.num_coding_shreds = 3;
coding_shred
.headers
.common_header
.header
.coding_header
.index = std::u32::MAX - 1;
coding_shred.headers.common_header.header.position = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_.clone()),
&coding_shred,
index.coding(),
&last_root
));
// Decreasing the number of num_coding_shreds will put it within the allowed limit
shred_.header.num_coding_shreds = 2;
let coding_shred = Shred::Coding(shred_);
coding_shred.headers.common_header.header.num_coding_shreds = 2;
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
@ -3260,14 +3233,11 @@ pub mod tests {
// Trying to insert value into slot <= than last root should fail
{
let mut shred_ = shred.clone();
let index = index_cf
.get(shred_.header.coding_header.slot)
.unwrap()
.unwrap();
shred_.header.coding_header.slot = *last_root.read().unwrap();
let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone()));
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
&coding_shred,
index.coding(),
&last_root
));
@ -3313,7 +3283,7 @@ pub mod tests {
let shreds_per_slot = 10;
let slots = vec![2, 4, 8, 12];
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
let slot_8_shreds = bincode::serialize(&all_shreds[2].0).unwrap();
let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds {
blocktree.insert_shreds(slot_shreds, None).unwrap();
}
@ -3325,15 +3295,11 @@ pub mod tests {
// Test that the iterator for slot 8 contains what was inserted earlier
let shred_iter = blocktree.slot_data_iterator(8).unwrap();
let result: Vec<Shred> = shred_iter
.map(|(_, bytes)| {
let shred: Shred = bincode::deserialize(&bytes).unwrap();
shred
})
let result: Vec<ShredInfo> = shred_iter
.filter_map(|(_, bytes)| ShredInfo::new_from_serialized_shred(bytes.to_vec()).ok())
.collect();
let result_serialized = bincode::serialize(&result).unwrap();
assert_eq!(result_serialized.len(), slot_8_shreds.len());
assert_eq!(result_serialized, slot_8_shreds);
assert_eq!(result.len(), slot_8_shreds.len());
assert_eq!(result, slot_8_shreds);
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -3472,7 +3438,7 @@ pub mod tests {
slot: u64,
parent_slot: u64,
num_entries: u64,
) -> (Vec<Shred>, Vec<Entry>) {
) -> (Vec<ShredInfo>, Vec<Entry>) {
let entries = create_ticks(num_entries, Hash::default());
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true);
(shreds, entries)
@ -3482,7 +3448,7 @@ pub mod tests {
start_slot: u64,
num_slots: u64,
entries_per_slot: u64,
) -> (Vec<Shred>, Vec<Entry>) {
) -> (Vec<ShredInfo>, Vec<Entry>) {
let mut shreds = vec![];
let mut entries = vec![];
for slot in start_slot..start_slot + num_slots {
@ -3501,7 +3467,7 @@ pub mod tests {
pub fn make_chaining_slot_entries(
chain: &[u64],
entries_per_slot: u64,
) -> Vec<(Vec<Shred>, Vec<Entry>)> {
) -> Vec<(Vec<ShredInfo>, Vec<Entry>)> {
let mut slots_shreds_and_entries = vec![];
for (i, slot) in chain.iter().enumerate() {
let parent_slot = {

View File

@ -36,7 +36,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
.map(|meta| meta.consumed)
.unwrap_or(0);
let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds(
let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds(
receive_results.ventries,
bank.slot(),
receive_results.last_tick,
@ -72,7 +72,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
self.last_blockhash = Hash::default();
}
blocktree.insert_shreds(shreds, None)?;
blocktree.insert_shreds(shred_bufs.clone(), None)?;
// 3) Start broadcast step
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {

View File

@ -53,7 +53,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
blocktree.insert_shreds(shreds, None)?;
blocktree.insert_shreds(shred_infos.clone(), None)?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());

View File

@ -92,7 +92,7 @@ impl BroadcastRun for StandardBroadcastRun {
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
let num_shreds = all_shreds.len();
blocktree
.insert_shreds(all_shreds, None)
.insert_shreds(shred_infos.clone(), None)
.expect("Failed to insert shreds in blocktree");
let to_blobs_elapsed = to_blobs_start.elapsed();

View File

@ -1768,7 +1768,7 @@ mod tests {
use crate::crds_value::CrdsValueLabel;
use crate::repair_service::RepairType;
use crate::result::Error;
use crate::shred::{DataShred, Shred};
use crate::shred::{DataShred, Shred, ShredInfo};
use crate::test_tx::test_tx;
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
use solana_sdk::hash::Hash;
@ -1931,9 +1931,10 @@ mod tests {
let mut shred = Shred::Data(DataShred::default());
shred.set_slot(2);
shred.set_index(1);
let shred_info = ShredInfo::new_from_shred(&shred);
blocktree
.insert_shreds(vec![shred], None)
.insert_shreds(vec![shred_info], None)
.expect("Expect successful ledger write");
let rv = ClusterInfo::run_window_request(
@ -2008,10 +2009,10 @@ mod tests {
assert!(rv.is_empty());
// Create slots 1, 2, 3 with 5 blobs apiece
let (blobs, _) = make_many_slot_entries(1, 3, 5);
let (shreds, _) = make_many_slot_entries(1, 3, 5);
blocktree
.insert_shreds(blobs, None)
.insert_shreds(shreds, None)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this requeset

View File

@ -872,7 +872,7 @@ mod test {
use crate::entry;
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
use crate::replay_stage::ReplayStage;
use crate::shred::Shred;
use crate::shred::ShredInfo;
use solana_runtime::genesis_utils::GenesisBlockInfo;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -1022,7 +1022,7 @@ mod test {
// marked as dead. Returns the error for caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
where
F: Fn(&Hash, u64) -> Vec<Shred>,
F: Fn(&Hash, u64) -> Vec<ShredInfo>,
{
let ledger_path = get_tmp_ledger_path!();
let res = {

View File

@ -11,7 +11,7 @@ use crate::repair_service;
use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shred;
use crate::shred::ShredInfo;
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use crate::streamer::{receiver, responder, PacketReceiver};
use crate::window_service::WindowService;
@ -477,7 +477,7 @@ impl Replicator {
&exit,
RepairStrategy::RepairRange(repair_slot_range),
&Arc::new(LeaderScheduleCache::default()),
|_, _, _, _, _| true,
|_, _, _, _| true,
);
info!("waiting for ledger download");
Self::wait_for_segment_download(
@ -871,10 +871,10 @@ impl Replicator {
while let Ok(mut more) = r_reader.try_recv() {
packets.packets.append(&mut more.packets);
}
let shreds: Vec<Shred> = packets
let shreds: Vec<ShredInfo> = packets
.packets
.iter()
.filter_map(|p| bincode::deserialize(&p.data).ok())
.into_iter()
.filter_map(|p| ShredInfo::new_from_serialized_shred(p.data.to_vec()).ok())
.collect();
blocktree.insert_shreds(shreds, None)?;
}

View File

@ -151,10 +151,9 @@ impl RetransmitStage {
exit,
repair_strategy,
&leader_schedule_cache.clone(),
move |id, shred, shred_buf, working_bank, last_root| {
move |id, shred, working_bank, last_root| {
should_retransmit_and_persist(
shred,
shred_buf,
working_bank,
&leader_schedule_cache,
id,

View File

@ -37,7 +37,7 @@ lazy_static! {
const DATA_SHRED: u8 = 0b1010_0101;
const CODING_SHRED: u8 = 0b0101_1010;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct ShredInfo {
pub headers: DataShredHeader,
pub shred: Vec<u8>,
@ -51,25 +51,24 @@ impl ShredInfo {
}
}
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> Self {
pub fn new_from_serialized_shred(shred_buf: Vec<u8>) -> result::Result<Self> {
let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
let shred_type: u8 =
bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])
.unwrap();
bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])?;
let header = if shred_type == CODING_SHRED {
let end = *SIZE_OF_CODING_SHRED_HEADER;
let mut header = DataShredHeader::default();
header.common_header.header =
bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap();
bincode::deserialize(&shred_buf[header_offset..header_offset + end])?;
header
} else {
let end = *SIZE_OF_DATA_SHRED_HEADER;
bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap()
bincode::deserialize(&shred_buf[header_offset..header_offset + end])?
};
Self::new(header, shred_buf)
Ok(Self::new(header, shred_buf))
}
pub fn new_from_shred(shred: &Shred, shred_buf: Vec<u8>) -> Self {
pub fn new_from_shred_and_buf(shred: &Shred, shred_buf: Vec<u8>) -> Self {
let header = match shred {
Shred::Data(s) => s.header.clone(),
Shred::Coding(s) => {
@ -82,6 +81,38 @@ impl ShredInfo {
Self::new(header, shred_buf)
}
pub fn new_from_shred(shred: &Shred) -> Self {
let header = match shred {
Shred::Data(s) => s.header.clone(),
Shred::Coding(s) => {
let mut hdr = DataShredHeader::default();
hdr.common_header.header = s.header.clone();
hdr
}
};
let shred_buf = bincode::serialize(&shred).unwrap();
Self::new(header, shred_buf)
}
pub fn new_empty_from_header(header: DataShredHeader) -> Self {
let start = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
let end = start + *SIZE_OF_DATA_SHRED_HEADER;
let mut payload = vec![0; PACKET_DATA_SIZE];
let mut wr = io::Cursor::new(&mut payload[start..end]);
bincode::serialize_into(&mut wr, &header).expect("Failed to serialize shred");
if header.common_header.header.shred_type == CODING_SHRED {
let shred_type = 1;
let mut wr = io::Cursor::new(&mut payload[..start]);
bincode::serialize_into(&mut wr, &shred_type).expect("Failed to set coding shred type");
}
ShredInfo {
headers: header,
shred: payload,
}
}
fn header(&self) -> &ShredCommonHeader {
if self.is_data() {
&self.headers.data_header
@ -114,6 +145,18 @@ impl ShredInfo {
self.header().index
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about actual shred
pub fn set_index(&mut self, index: u32) {
self.header_mut().index = index
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about actual shred
pub fn set_slot(&mut self, slot: u64) {
self.header_mut().slot = slot
}
pub fn signature(&self) -> Signature {
self.header().signature
}
@ -138,6 +181,14 @@ impl ShredInfo {
}
}
/// This is not a safe function. It only changes the meta information.
/// Use this only for test code which doesn't care about actual shred
pub fn set_last_in_slot(&mut self) {
if self.is_data() {
self.headers.flags |= LAST_SHRED_IN_SLOT
}
}
pub fn data_complete(&self) -> bool {
if self.is_data() {
self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED
@ -158,6 +209,18 @@ impl ShredInfo {
None
}
}
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let signed_payload_offset = if self.is_data() {
CodingShred::overhead()
} else {
CodingShred::overhead() + *SIZE_OF_SHRED_TYPE
- *SIZE_OF_CODING_SHRED_HEADER
- *SIZE_OF_EMPTY_VEC
} + *SIZE_OF_SIGNATURE;
self.signature()
.verify(pubkey.as_ref(), &self.shred[signed_payload_offset..])
}
}
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
@ -169,6 +232,7 @@ pub enum Shred {
/// This limit comes from reed solomon library, but unfortunately they don't have
/// a public constant defined for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
/// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds
pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
@ -313,7 +377,7 @@ pub struct ShredCommonHeader {
/// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
common_header: CodingShred,
pub common_header: CodingShred,
pub data_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
@ -493,8 +557,8 @@ impl Write for Shredder {
#[derive(Default, Debug, PartialEq)]
pub struct RecoveryResult {
pub recovered_data: Vec<Shred>,
pub recovered_code: Vec<Shred>,
pub recovered_data: Vec<ShredInfo>,
pub recovered_code: Vec<ShredInfo>,
}
#[derive(Default, Debug, PartialEq)]
@ -593,7 +657,7 @@ impl Shredder {
let mut shred = Shred::Data(self.new_data_shred());
std::mem::swap(&mut shred, &mut self.active_shred);
let shred_info = ShredInfo::new_from_shred(&shred, data);
let shred_info = ShredInfo::new_from_shred_and_buf(&shred, data);
self.shred_tuples.push((shred, shred_info));
}
@ -667,7 +731,7 @@ impl Shredder {
// append to the shred list
coding_shreds.into_iter().for_each(|code| {
let shred: Shred = bincode::deserialize(&code).unwrap();
let shred_info = ShredInfo::new_from_shred(&shred, code);
let shred_info = ShredInfo::new_from_shred_and_buf(&shred, code);
self.shred_tuples.push((shred, shred_info));
});
self.fec_set_index = self.index;
@ -812,12 +876,16 @@ impl Shredder {
.collect();
session.decode_blocks(&mut blocks, &present)?;
let mut num_drained = 0;
present
.iter()
.enumerate()
.for_each(|(position, was_present)| {
if !was_present {
let shred: Shred = bincode::deserialize(&shred_bufs[position]).unwrap();
let drain_this = position - num_drained;
let shred_buf = shred_bufs.remove(drain_this);
num_drained += 1;
if let Ok(shred) = ShredInfo::new_from_serialized_shred(shred_buf) {
let shred_index = shred.index() as usize;
// Valid shred must be in the same slot as the original shreds
if shred.slot() == slot {
@ -828,13 +896,15 @@ impl Shredder {
{
// Also, a valid data shred must be indexed between first_index and first+num_data index
recovered_data.push(shred)
} else if (first_index..first_index + num_coding).contains(&shred_index)
} else if (first_index..first_index + num_coding)
.contains(&shred_index)
{
// A valid coding shred must be indexed between first_index and first+num_coding index
recovered_code.push(shred)
}
}
}
}
});
}
@ -845,7 +915,7 @@ impl Shredder {
}
/// Combines all shreds to recreate the original buffer
pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
pub fn deshred(shreds: &[ShredInfo]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
let num_data = shreds.len();
let data_shred_bufs = {
let first_index = shreds.first().unwrap().index() as usize;
@ -860,11 +930,7 @@ impl Shredder {
Err(reed_solomon_erasure::Error::TooFewDataShards)?;
}
let shred_bufs: Vec<Vec<u8>> = shreds
.iter()
.map(|shred| bincode::serialize(shred).unwrap())
.collect();
shred_bufs
shreds.iter().map(|shred| &shred.shred).collect()
};
Ok(Self::reassemble_payload(num_data, data_shred_bufs))
@ -878,7 +944,7 @@ impl Shredder {
}
}
fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<Vec<u8>>) -> Vec<u8> {
fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec<u8>>) -> Vec<u8> {
data_shred_bufs[..num_data]
.iter()
.flat_map(|data| {
@ -1215,7 +1281,7 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (shreds, shred_infos): (Vec<Shred>, Vec<ShredInfo>) = shredder
let (_, shred_infos): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples
.iter()
.map(|(s, b)| (s.clone(), b.clone()))
@ -1245,12 +1311,12 @@ mod tests {
assert_ne!(RecoveryResult::default(), result);
assert!(result.recovered_data.is_empty());
assert!(!result.recovered_code.is_empty());
let result = Shredder::deshred(&shreds[..4]).unwrap();
let result = Shredder::deshred(&shred_infos[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples
.iter()
.enumerate()
@ -1264,7 +1330,7 @@ mod tests {
.unzip();
let mut result = Shredder::try_recovery(
shred_info,
shred_info.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1275,45 +1341,40 @@ mod tests {
assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 1);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(1, recovered_shred);
shred_info.insert(1, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 3);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(3, recovered_shred);
shred_info.insert(3, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 1);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 3);
}
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 1);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 1)));
let result = Shredder::deshred(&shreds[..4]).unwrap();
let recovered_shred = result.recovered_code.remove(0);
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 3);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 3)));
let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples
.iter()
.enumerate()
@ -1327,7 +1388,7 @@ mod tests {
.unzip();
let mut result = Shredder::try_recovery(
shred_info,
shred_info.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1338,40 +1399,35 @@ mod tests {
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 0);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(0, recovered_shred);
shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 2);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 0);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 2);
}
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 0);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let result = Shredder::deshred(&shreds[..4]).unwrap();
let recovered_shred = result.recovered_code.remove(0);
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 2);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1399,7 +1455,7 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples
.iter()
.enumerate()
@ -1413,7 +1469,7 @@ mod tests {
.unzip();
let mut result = Shredder::try_recovery(
shred_info,
shred_info.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1424,49 +1480,44 @@ mod tests {
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 0);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(0, recovered_shred);
shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 2);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 0);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 2);
}
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 0);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let result = Shredder::deshred(&shreds[..4]).unwrap();
let recovered_shred = result.recovered_code.remove(0);
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 2);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
// Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
let shreds: Vec<Shred> = shredder
let shreds: Vec<ShredInfo> = shredder
.shred_tuples
.iter()
.enumerate()
.filter_map(|(i, (s, _))| {
.filter_map(|(i, (_, s))| {
if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) {
Some(s.clone())
} else {
@ -1505,7 +1556,7 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let (mut shreds, shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
let (_, mut shred_info): (Vec<Shred>, Vec<ShredInfo>) = shredder
.shred_tuples
.iter()
.enumerate()
@ -1530,40 +1581,35 @@ mod tests {
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 25);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(0, recovered_shred);
shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert!(recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 27);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 25);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.coding_header.slot, slot);
assert_eq!(code.header.coding_header.index, 27);
}
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 25);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let result = Shredder::deshred(&shreds[..4]).unwrap();
let recovered_shred = result.recovered_code.remove(0);
assert!(!recovered_shred.is_data());
assert_eq!(recovered_shred.index(), 27);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1593,7 +1639,7 @@ mod tests {
// Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!(
Shredder::try_recovery(
shred_info.clone(),
shred_info,
expected_shred_count / 2,
expected_shred_count / 2,
35,
@ -1685,6 +1731,7 @@ mod tests {
assert_eq!(shred.last_in_slot(), shred_info.last_in_slot());
assert_eq!(shred.data_complete(), shred_info.data_complete());
assert_eq!(shred.coding_params(), shred_info.coding_params());
assert!(shred_info.verify(&keypair.pubkey()));
})
}
}

View File

@ -7,7 +7,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shred;
use crate::shred::ShredInfo;
use crate::streamer::{PacketReceiver, PacketSender};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use rayon::ThreadPool;
@ -28,8 +28,7 @@ pub const NUM_THREADS: u32 = 10;
/// drop blobs that are from myself or not from the correct leader for the
/// blob's slot
pub fn should_retransmit_and_persist(
shred: &Shred,
shred_buf: &[u8],
shred: &ShredInfo,
bank: Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
my_pubkey: &Pubkey,
@ -46,7 +45,7 @@ pub fn should_retransmit_and_persist(
} else if !blocktree::verify_shred_slots(shred.slot(), shred.parent(), root) {
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
false
} else if !shred.fast_verify(&shred_buf, &leader_id) {
} else if !shred.verify(&leader_id) {
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
false
} else {
@ -68,7 +67,7 @@ fn recv_window<F>(
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Result<()>
where
F: Fn(&Shred, &[u8], u64) -> bool,
F: Fn(&ShredInfo, u64) -> bool,
F: Sync,
{
let timer = Duration::from_millis(200);
@ -87,9 +86,8 @@ where
.par_iter_mut()
.enumerate()
.filter_map(|(i, packet)| {
if let Ok(s) = bincode::deserialize(&packet.data) {
let shred: Shred = s;
if shred_filter(&shred, &packet.data, last_root) {
if let Ok(shred) = ShredInfo::new_from_serialized_shred(packet.data.to_vec()) {
if shred_filter(&shred, last_root) {
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some((shred, i))
@ -179,7 +177,7 @@ impl WindowService {
) -> WindowService
where
F: 'static
+ Fn(&Pubkey, &Shred, &[u8], Option<Arc<Bank>>, u64) -> bool
+ Fn(&Pubkey, &ShredInfo, Option<Arc<Bank>>, u64) -> bool
+ std::marker::Send
+ std::marker::Sync,
{
@ -223,11 +221,10 @@ impl WindowService {
&id,
&r,
&retransmit,
|shred, shred_buf, last_root| {
|shred, last_root| {
shred_filter(
&id,
shred,
shred_buf,
bank_forks
.as_ref()
.map(|bank_forks| bank_forks.read().unwrap().working_bank()),
@ -308,13 +305,13 @@ mod test {
slot: u64,
parent: u64,
keypair: &Arc<Keypair>,
) -> Vec<Shred> {
) -> Vec<ShredInfo> {
let mut shredder =
Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
shredder.shred_tuples.into_iter().map(|(s, _)| s).collect()
shredder.shred_tuples.into_iter().map(|(_, s)| s).collect()
}
#[test]
@ -350,21 +347,10 @@ mod test {
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair);
let shred_bufs: Vec<_> = shreds
.iter()
.map(|s| bincode::serialize(s).unwrap())
.collect();
// with a Bank for slot 0, blob continues
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
0,
),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0,),
true
);
@ -379,14 +365,7 @@ mod test {
// with a Bank and no idea who leader is, blob gets thrown out
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
0
),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0),
false
);
@ -395,14 +374,7 @@ mod test {
let shreds =
local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
slot
),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
false
);
@ -411,14 +383,7 @@ mod test {
let shreds =
local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
slot
),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
false
);
@ -454,7 +419,7 @@ mod test {
&exit,
RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }),
&Arc::new(LeaderScheduleCache::default()),
|_, _, _, _, _| true,
|_, _, _, _| true,
);
window
}
@ -468,10 +433,9 @@ mod test {
let (shreds, _) = make_many_slot_entries(0, 5, 10);
let packets: Vec<_> = shreds
.into_iter()
.map(|s| {
.map(|mut s| {
let mut p = Packet::default();
p.data
.copy_from_slice(&mut bincode::serialize(&s).unwrap().as_ref());
p.data.copy_from_slice(&mut s.shred);
p
})
.collect();