From 783e8672e7bef3d09bb412b4bd090e23bc30764d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 18 Sep 2019 16:24:30 -0700 Subject: [PATCH] Removed Shred enum (#5963) * Remove shred enum and it's references * rename ShredInfo to Shred * clippy --- core/benches/shredder.rs | 3 +- core/src/blocktree.rs | 98 +++-- .../broadcast_fake_blobs_run.rs | 10 +- core/src/broadcast_stage/broadcast_utils.rs | 11 +- .../fail_entry_verification_broadcast_run.rs | 4 +- .../broadcast_stage/standard_broadcast_run.rs | 6 +- core/src/chacha.rs | 2 +- core/src/cluster_info.rs | 21 +- core/src/replay_stage.rs | 4 +- core/src/replicator.rs | 6 +- core/src/shred.rs | 381 +++++++----------- core/src/window_service.rs | 16 +- 12 files changed, 230 insertions(+), 332 deletions(-) diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 10b1e82047..3dddc12338 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -26,9 +26,8 @@ fn bench_deshredder(bencher: &mut Bencher) { let mut shredded = Shredder::new(1, 0, 0.0, &kp, 0).unwrap(); let _ = bincode::serialize_into(&mut shredded, &data); shredded.finalize_data(); - let (_, shreds): (Vec<_>, Vec<_>) = shredded.shred_tuples.into_iter().unzip(); bencher.iter(|| { - let raw = &mut Shredder::deshred(&shreds).unwrap(); + let raw = &mut Shredder::deshred(&shredded.shreds).unwrap(); assert_ne!(raw.len(), 0); }) } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 010ec90f09..0388d3cde5 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -4,7 +4,7 @@ use crate::entry::Entry; use crate::erasure::ErasureConfig; use crate::result::{Error, Result}; -use crate::shred::{ShredInfo, Shredder}; +use crate::shred::{Shred, Shredder}; #[cfg(feature = "kvstore")] use solana_kvstore as kvstore; @@ -320,9 +320,9 @@ impl Blocktree { db: &Database, erasure_metas: &HashMap<(u64, u64), ErasureMeta>, index_working_set: &HashMap, - prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>, - prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>, - ) -> Vec { + prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, + prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, + ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); let mut recovered_data_shreds = vec![]; @@ -357,7 +357,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch data shred"); if let Some(data) = some_data { - ShredInfo::new_from_serialized_shred(data).ok() + Shred::new_from_serialized_shred(data).ok() } else { warn!("Data shred deleted while reading for recovery"); None @@ -377,7 +377,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch code shred"); if let Some(code) = some_code { - ShredInfo::new_from_serialized_shred(code).ok() + Shred::new_from_serialized_shred(code).ok() } else { warn!("Code shred deleted while reading for recovery"); None @@ -415,7 +415,7 @@ impl Blocktree { pub fn insert_shreds( &self, - shreds: Vec, + shreds: Vec, leader_schedule: Option<&Arc>, ) -> Result<()> { let db = &*self.db; @@ -509,11 +509,11 @@ impl Blocktree { fn check_insert_coding_shred( &self, - shred: ShredInfo, + shred: Shred, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredInfo>, + just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, ) { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -537,11 +537,11 @@ impl Blocktree { fn 
check_insert_data_shred( &self, - shred: ShredInfo, + shred: Shred, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredInfo>, + just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, ) { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -584,7 +584,7 @@ impl Blocktree { } fn should_insert_coding_shred( - shred: &ShredInfo, + shred: &Shred, coding_index: &CodingIndex, last_root: &RwLock, ) -> bool { @@ -611,7 +611,7 @@ impl Blocktree { &self, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_meta: &mut Index, - shred: &ShredInfo, + shred: &Shred, write_batch: &mut WriteBatch, ) -> Result<()> { let slot = shred.slot(); @@ -650,14 +650,14 @@ impl Blocktree { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), &shred.shred)?; + write_batch.put_bytes::((slot, shred_index), &shred.payload)?; index_meta.coding_mut().set_present(shred_index, true); Ok(()) } fn should_insert_data_shred( - shred: &ShredInfo, + shred: &Shred, slot_meta: &SlotMeta, data_index: &DataIndex, last_root: &RwLock, @@ -720,7 +720,7 @@ impl Blocktree { &self, slot_meta: &mut SlotMeta, data_index: &mut DataIndex, - shred: &ShredInfo, + shred: &Shred, write_batch: &mut WriteBatch, ) -> Result<()> { let slot = shred.slot(); @@ -760,7 +760,7 @@ impl Blocktree { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), &shred.shred)?; + write_batch.put_bytes::((slot, index), &shred.payload)?; update_slot_meta(last_in_slot, slot_meta, index, new_consumed); data_index.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); @@ -852,9 +852,7 @@ impl Blocktree { parent_slot = current_slot - 1; remaining_ticks_in_slot = ticks_per_slot; shredder.finalize_slot(); - let shreds: Vec = - shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); - all_shreds.extend(shreds); + all_shreds.append(&mut shredder.shreds); shredder = Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0) .expect("Failed to create entry shredder"); @@ -876,8 +874,7 @@ impl Blocktree { if is_full_slot && remaining_ticks_in_slot != 0 { shredder.finalize_slot(); } - let shreds: Vec = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); - all_shreds.extend(shreds); + all_shreds.append(&mut shredder.shreds); let num_shreds = all_shreds.len(); self.insert_shreds(all_shreds, None)?; @@ -1005,11 +1002,9 @@ impl Blocktree { serialized_shreds.len(), slot ); - let mut shreds: Vec = serialized_shreds + let mut shreds: Vec = serialized_shreds .into_iter() - .filter_map(|serialized_shred| { - ShredInfo::new_from_serialized_shred(serialized_shred).ok() - }) + .filter_map(|serialized_shred| Shred::new_from_serialized_shred(serialized_shred).ok()) .collect(); let mut all_entries = vec![]; @@ -1572,7 +1567,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - let shreds: Vec = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); + let shreds: Vec = shredder.shreds.drain(..).collect(); blocktree.insert_shreds(shreds, None)?; 
blocktree.set_roots(&[0])?; @@ -1653,7 +1648,7 @@ pub fn entries_to_test_shreds( slot: u64, parent_slot: u64, is_full_slot: bool, -) -> Vec { +) -> Vec { let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32) .expect("Failed to create entry shredder"); @@ -1665,14 +1660,14 @@ pub fn entries_to_test_shreds( shredder.finalize_data(); } - shredder.shred_tuples.into_iter().map(|(_, s)| s).collect() + shredder.shreds.drain(..).collect() } #[cfg(test)] pub mod tests { use super::*; use crate::entry::{create_ticks, Entry}; - use crate::shred::{CodingShred, Shred}; + use crate::shred::{DataShredHeader, CODING_SHRED}; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -1834,7 +1829,7 @@ pub mod tests { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100); let num_shreds = shreds.len() as u64; - let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.shred.clone()).collect(); + let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect(); let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes"); let ledger = Blocktree::open(&ledger_path).unwrap(); @@ -3112,13 +3107,14 @@ pub mod tests { let index_cf = blocktree.db.column::(); let last_root = RwLock::new(0); - let mut shred = CodingShred::default(); + let mut shred = DataShredHeader::default(); let slot = 1; - shred.header.position = 10; - shred.header.coding_header.index = 11; - shred.header.coding_header.slot = 1; - shred.header.num_coding_shreds = shred.header.position + 1; - let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + shred.common_header.header.shred_type = CODING_SHRED; + shred.common_header.header.position = 10; + shred.common_header.header.coding_header.index = 11; + shred.common_header.header.coding_header.slot = 1; + shred.common_header.header.num_coding_shreds = shred.common_header.header.position + 1; + let coding_shred = Shred::new_empty_from_header(shred.clone()); // Insert a good coding shred assert!(Blocktree::should_insert_coding_shred( @@ -3135,7 +3131,7 @@ pub mod tests { // Trying to insert the same shred again should fail { let index = index_cf - .get(shred.header.coding_header.slot) + .get(shred.common_header.header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3145,13 +3141,13 @@ pub mod tests { )); } - shred.header.coding_header.index += 1; + shred.common_header.header.coding_header.index += 1; // Establish a baseline that works { - let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let coding_shred = Shred::new_empty_from_header(shred.clone()); let index = index_cf - .get(shred.header.coding_header.slot) + .get(shred.common_header.header.coding_header.slot) .unwrap() .unwrap(); assert!(Blocktree::should_insert_coding_shred( @@ -3163,7 +3159,7 @@ pub mod tests { // Trying to insert a shred with index < position should fail { - let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let mut coding_shred = Shred::new_empty_from_header(shred.clone()); let index = coding_shred.headers.common_header.header.position - 1; coding_shred.set_index(index as u32); @@ -3177,7 +3173,7 @@ pub mod tests { // Trying to insert shred with num_coding == 0 should fail { - let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let mut coding_shred = Shred::new_empty_from_header(shred.clone()); coding_shred.headers.common_header.header.num_coding_shreds = 0; let index = 
index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3189,7 +3185,7 @@ pub mod tests { // Trying to insert shred with pos >= num_coding should fail { - let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let mut coding_shred = Shred::new_empty_from_header(shred.clone()); coding_shred.headers.common_header.header.num_coding_shreds = coding_shred.headers.common_header.header.position; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); @@ -3203,7 +3199,7 @@ pub mod tests { // Trying to insert with set_index with num_coding that would imply the last blob // has index > u32::MAX should fail { - let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let mut coding_shred = Shred::new_empty_from_header(shred.clone()); coding_shred.headers.common_header.header.num_coding_shreds = 3; coding_shred .headers @@ -3233,7 +3229,7 @@ pub mod tests { // Trying to insert value into slot <= than last root should fail { - let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let mut coding_shred = Shred::new_empty_from_header(shred.clone()); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blocktree::should_insert_coding_shred( @@ -3295,8 +3291,8 @@ pub mod tests { // Test that the iterator for slot 8 contains what was inserted earlier let shred_iter = blocktree.slot_data_iterator(8).unwrap(); - let result: Vec = shred_iter - .filter_map(|(_, bytes)| ShredInfo::new_from_serialized_shred(bytes.to_vec()).ok()) + let result: Vec = shred_iter + .filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) .collect(); assert_eq!(result.len(), slot_8_shreds.len()); assert_eq!(result, slot_8_shreds); @@ -3438,7 +3434,7 @@ pub mod tests { slot: u64, parent_slot: u64, num_entries: u64, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let entries = create_ticks(num_entries, Hash::default()); let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true); (shreds, entries) @@ -3448,7 +3444,7 @@ pub mod tests { start_slot: u64, num_slots: u64, entries_per_slot: u64, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let mut shreds = vec![]; let mut entries = vec![]; for slot in start_slot..start_slot + num_slots { @@ -3467,7 +3463,7 @@ pub mod tests { pub fn make_chaining_slot_entries( chain: &[u64], entries_per_slot: u64, - ) -> Vec<(Vec, Vec)> { + ) -> Vec<(Vec, Vec)> { let mut slots_shreds_and_entries = vec![]; for (i, slot) in chain.iter().enumerate() { let parent_slot = { diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 401da166bb..5eec39902d 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -37,7 +37,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { .unwrap_or(0); let num_entries = receive_results.entries.len(); - let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds( + let (shred_bufs, _) = broadcast_utils::entries_to_shreds( receive_results.entries, bank.slot(), receive_results.last_tick, @@ -57,10 +57,10 @@ impl BroadcastRun for BroadcastFakeBlobsRun { .map(|_| Entry::new(&self.last_blockhash, 0, vec![])) .collect(); - let (_fake_shreds, fake_shred_bufs, _) = broadcast_utils::entries_to_shreds( + let (fake_shred_bufs, _) = broadcast_utils::entries_to_shreds( fake_entries, - bank.slot(), receive_results.last_tick, + 
bank.slot(), bank.max_tick_height(), keypair, latest_blob_index, @@ -80,11 +80,11 @@ impl BroadcastRun for BroadcastFakeBlobsRun { if i <= self.partition { // Send fake blobs to the first N peers fake_shred_bufs.iter().for_each(|b| { - sock.send_to(&b.shred, &peer.tvu_forwards).unwrap(); + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); }); } else { shred_bufs.iter().for_each(|b| { - sock.send_to(&b.shred, &peer.tvu_forwards).unwrap(); + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); }); } }); diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index f354a70aaf..567496b72f 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,7 +1,7 @@ use crate::entry::Entry; use crate::poh_recorder::WorkingBankEntry; use crate::result::Result; -use crate::shred::{Shred, ShredInfo, Shredder, RECOMMENDED_FEC_RATE}; +use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE}; use solana_runtime::bank::Bank; use solana_sdk::signature::Keypair; use std::sync::mpsc::Receiver; @@ -62,7 +62,7 @@ pub(super) fn entries_to_shreds( keypair: &Arc, latest_shred_index: u64, parent_slot: u64, -) -> (Vec, Vec, u64) { +) -> (Vec, u64) { let mut shredder = Shredder::new( slot, parent_slot, @@ -81,10 +81,9 @@ pub(super) fn entries_to_shreds( shredder.finalize_data(); } - let (shreds, shred_infos): (Vec, Vec) = - shredder.shred_tuples.into_iter().unzip(); + let shred_infos: Vec = shredder.shreds.drain(..).collect(); - trace!("Inserting {:?} shreds in blocktree", shreds.len()); + trace!("Inserting {:?} shreds in blocktree", shred_infos.len()); - (shreds, shred_infos, u64::from(shredder.index)) + (shred_infos, u64::from(shredder.index)) } diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index d0adf12188..9cd8463c5c 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -36,7 +36,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { .map(|meta| meta.consumed) .unwrap_or(0); - let (_, shred_infos, _) = broadcast_utils::entries_to_shreds( + let (shred_infos, _) = broadcast_utils::entries_to_shreds( receive_results.entries, last_tick, bank.slot(), @@ -54,7 +54,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.shred).collect(); + let shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.payload).collect(); // Broadcast data + erasures cluster_info.read().unwrap().broadcast_shreds( sock, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ae67235f54..0ffaa27348 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -79,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun { 0 }; - let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds( + let (shred_infos, latest_shred_index) = entries_to_shreds( receive_results.entries, last_tick, bank.slot(), @@ -90,7 +90,7 @@ impl BroadcastRun for StandardBroadcastRun { ); let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect(); - let num_shreds = all_shreds.len(); + let num_shreds = shred_infos.len(); 
blocktree .insert_shreds(shred_infos.clone(), None) .expect("Failed to insert shreds in blocktree"); @@ -102,7 +102,7 @@ impl BroadcastRun for StandardBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let all_shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.shred).collect(); + let all_shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.payload).collect(); trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); cluster_info.read().unwrap().broadcast_shreds( sock, diff --git a/core/src/chacha.rs b/core/src/chacha.rs index c0dd47f2d6..e6e553cded 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -153,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "C7RmQ7oDswQfgquukXHGvpYYSCcKTgPnJrYA3ABbX9oG" + let golden: Hash = "3LWNjNqC6HncoWUhXbk6cUH8NSM675aZqRPGUC4Zq21H" .parse() .unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c56895c266..97957c6cd8 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1768,7 +1768,7 @@ mod tests { use crate::crds_value::CrdsValueLabel; use crate::repair_service::RepairType; use crate::result::Error; - use crate::shred::{DataShred, Shred, ShredInfo}; + use crate::shred::{DataShredHeader, Shred}; use crate::test_tx::test_tx; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use solana_sdk::hash::Hash; @@ -1928,12 +1928,11 @@ mod tests { 0, ); assert!(rv.is_empty()); - let mut data_shred = DataShred::default(); - data_shred.header.data_header.slot = 2; - data_shred.header.parent_offset = 1; - data_shred.header.data_header.index = 1; - let shred = Shred::Data(data_shred); - let shred_info = ShredInfo::new_from_shred(&shred); + let mut data_shred = DataShredHeader::default(); + data_shred.data_header.slot = 2; + data_shred.parent_offset = 1; + data_shred.data_header.index = 1; + let shred_info = Shred::new_empty_from_header(data_shred); blocktree .insert_shreds(vec![shred_info], None) @@ -1948,10 +1947,10 @@ mod tests { 1, ); assert!(!rv.is_empty()); - let rv: Vec = rv + let rv: Vec = rv .into_iter() .filter_map(|b| { - ShredInfo::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() + Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() }) .collect(); assert_eq!(rv[0].index(), 1); @@ -1982,10 +1981,10 @@ mod tests { let rv = ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1); - let rv: Vec = rv + let rv: Vec = rv .into_iter() .filter_map(|b| { - ShredInfo::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() + Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() }) .collect(); assert!(!rv.is_empty()); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6818fe9115..a681a4dfee 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -806,7 +806,7 @@ mod test { use crate::entry; use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader}; use crate::replay_stage::ReplayStage; - use crate::shred::ShredInfo; + use crate::shred::Shred; use solana_runtime::genesis_utils::GenesisBlockInfo; use solana_sdk::hash::{hash, Hash}; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -956,7 +956,7 @@ mod test { // marked as dead. Returns the error for caller to verify. 
fn check_dead_fork(shred_to_insert: F) -> Result<()> where - F: Fn(&Hash, u64) -> Vec, + F: Fn(&Hash, u64) -> Vec, { let ledger_path = get_tmp_ledger_path!(); let res = { diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 9bfd776124..12df376886 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -11,7 +11,7 @@ use crate::repair_service; use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::shred::ShredInfo; +use crate::shred::Shred; use crate::storage_stage::NUM_STORAGE_SAMPLES; use crate::streamer::{receiver, responder, PacketReceiver}; use crate::window_service::WindowService; @@ -871,10 +871,10 @@ impl Replicator { while let Ok(mut more) = r_reader.try_recv() { packets.packets.append(&mut more.packets); } - let shreds: Vec = packets + let shreds: Vec = packets .packets .into_iter() - .filter_map(|p| ShredInfo::new_from_serialized_shred(p.data.to_vec()).ok()) + .filter_map(|p| Shred::new_from_serialized_shred(p.data.to_vec()).ok()) .collect(); blocktree.insert_shreds(shreds, None)?; } diff --git a/core/src/shred.rs b/core/src/shred.rs index 645c33628b..4c034d7cdf 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -3,7 +3,6 @@ use crate::erasure::Session; use crate::result; use crate::result::Error; use bincode::serialized_size; -use core::borrow::BorrowMut; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use solana_sdk::packet::PACKET_DATA_SIZE; @@ -22,10 +21,6 @@ lazy_static! { { serialized_size(&CodingShred::empty_shred()).unwrap() as usize }; static ref SIZE_OF_EMPTY_DATA_SHRED: usize = { serialized_size(&DataShred::empty_shred()).unwrap() as usize }; - static ref SIZE_OF_SHRED_CODING_SHRED: usize = - { serialized_size(&Shred::Coding(CodingShred::empty_shred())).unwrap() as usize }; - static ref SIZE_OF_SHRED_DATA_SHRED: usize = - { serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap() as usize }; static ref SIZE_OF_SIGNATURE: usize = { bincode::serialized_size(&Signature::default()).unwrap() as usize }; static ref SIZE_OF_EMPTY_VEC: usize = @@ -34,83 +29,42 @@ lazy_static! { } /// The constants that define if a shred is data or coding -const DATA_SHRED: u8 = 0b1010_0101; -const CODING_SHRED: u8 = 0b0101_1010; +pub const DATA_SHRED: u8 = 0b1010_0101; +pub const CODING_SHRED: u8 = 0b0101_1010; #[derive(Clone, Debug, PartialEq)] -pub struct ShredInfo { +pub struct Shred { pub headers: DataShredHeader, - pub shred: Vec, + pub payload: Vec, } -impl ShredInfo { +impl Shred { fn new(header: DataShredHeader, shred_buf: Vec) -> Self { - ShredInfo { + Shred { headers: header, - shred: shred_buf, + payload: shred_buf, } } pub fn new_from_serialized_shred(shred_buf: Vec) -> result::Result { - let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; - let shred_type: u8 = - bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])?; + let shred_type: u8 = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?; let header = if shred_type == CODING_SHRED { let end = *SIZE_OF_CODING_SHRED_HEADER; let mut header = DataShredHeader::default(); - header.common_header.header = - bincode::deserialize(&shred_buf[header_offset..header_offset + end])?; + header.common_header.header = bincode::deserialize(&shred_buf[..end])?; header } else { let end = *SIZE_OF_DATA_SHRED_HEADER; - bincode::deserialize(&shred_buf[header_offset..header_offset + end])? + bincode::deserialize(&shred_buf[..end])? 
}; Ok(Self::new(header, shred_buf)) } - pub fn new_from_shred_and_buf(shred: &Shred, shred_buf: Vec) -> Self { - let header = match shred { - Shred::Data(s) => s.header.clone(), - Shred::Coding(s) => { - let mut hdr = DataShredHeader::default(); - hdr.common_header.header = s.header.clone(); - hdr - } - }; - - Self::new(header, shred_buf) - } - - pub fn new_from_shred(shred: &Shred) -> Self { - let header = match shred { - Shred::Data(s) => s.header.clone(), - Shred::Coding(s) => { - let mut hdr = DataShredHeader::default(); - hdr.common_header.header = s.header.clone(); - hdr - } - }; - - let shred_buf = bincode::serialize(&shred).unwrap(); - - Self::new(header, shred_buf) - } - - pub fn new_empty_from_header(header: DataShredHeader) -> Self { - let start = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; - let end = start + *SIZE_OF_DATA_SHRED_HEADER; + pub fn new_empty_from_header(headers: DataShredHeader) -> Self { let mut payload = vec![0; PACKET_DATA_SIZE]; - let mut wr = io::Cursor::new(&mut payload[start..end]); - bincode::serialize_into(&mut wr, &header).expect("Failed to serialize shred"); - if header.common_header.header.shred_type == CODING_SHRED { - let shred_type = 1; - let mut wr = io::Cursor::new(&mut payload[..start]); - bincode::serialize_into(&mut wr, &shred_type).expect("Failed to set coding shred type"); - } - ShredInfo { - headers: header, - shred: payload, - } + let mut wr = io::Cursor::new(&mut payload[..*SIZE_OF_DATA_SHRED_HEADER]); + bincode::serialize_into(&mut wr, &headers).expect("Failed to serialize shred"); + Shred { headers, payload } } fn header(&self) -> &ShredCommonHeader { @@ -214,21 +168,13 @@ impl ShredInfo { let signed_payload_offset = if self.is_data() { CodingShred::overhead() } else { - CodingShred::overhead() + *SIZE_OF_SHRED_TYPE - - *SIZE_OF_CODING_SHRED_HEADER - - *SIZE_OF_EMPTY_VEC + *SIZE_OF_SHRED_TYPE } + *SIZE_OF_SIGNATURE; self.signature() - .verify(pubkey.as_ref(), &self.shred[signed_payload_offset..]) + .verify(pubkey.as_ref(), &self.payload[signed_payload_offset..]) } } -#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] -pub enum Shred { - Data(DataShred), - Coding(CodingShred), -} - /// This limit comes from reed solomon library, but unfortunately they don't have /// a public constant defined for it. 
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16; @@ -310,7 +256,7 @@ impl Default for CodingShredHeader { /// Default shred is sized correctly to meet MTU/Packet size requirements impl Default for DataShred { fn default() -> Self { - let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_DATA_SHRED; + let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_DATA_SHRED; DataShred { header: DataShredHeader::default(), payload: vec![0; size], @@ -321,7 +267,7 @@ impl Default for DataShred { /// Default shred is sized correctly to meet MTU/Packet size requirements impl Default for CodingShred { fn default() -> Self { - let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED; + let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_CODING_SHRED; CodingShred { header: CodingShredHeader::default(), payload: vec![0; size], @@ -351,7 +297,7 @@ impl ShredCommon for DataShred { } fn overhead() -> usize { - *SIZE_OF_SHRED_DATA_SHRED - *SIZE_OF_EMPTY_VEC + *SIZE_OF_EMPTY_DATA_SHRED - *SIZE_OF_EMPTY_VEC } fn empty_shred() -> Self { @@ -374,7 +320,7 @@ impl ShredCommon for CodingShred { } fn overhead() -> usize { - *SIZE_OF_SHRED_CODING_SHRED + *SIZE_OF_EMPTY_CODING_SHRED } fn empty_shred() -> Self { @@ -393,19 +339,16 @@ pub struct Shredder { parent_offset: u16, fec_rate: f32, signer: Arc, - pub shred_tuples: Vec<(Shred, ShredInfo)>, + pub shreds: Vec, fec_set_shred_start: usize, - active_shred: Shred, + active_shred: DataShred, active_offset: usize, } impl Write for Shredder { fn write(&mut self, buf: &[u8]) -> io::Result { let written = self.active_offset; - let (slice_len, capacity) = match self.active_shred.borrow_mut() { - Shred::Data(s) => s.write_at(written, buf), - Shred::Coding(s) => s.write_at(written, buf), - }; + let (slice_len, capacity) = self.active_shred.write_at(written, buf); if buf.len() > slice_len || capacity == 0 { self.finalize_data_shred(); @@ -427,13 +370,6 @@ impl Write for Shredder { #[derive(Default, Debug, PartialEq)] pub struct RecoveryResult { - pub recovered_data: Vec, - pub recovered_code: Vec, -} - -#[derive(Default, Debug, PartialEq)] -pub struct DeshredResult { - pub payload: Vec, pub recovered_data: Vec, pub recovered_code: Vec, } @@ -467,7 +403,7 @@ impl Shredder { data_shred.header.data_header.slot = slot; data_shred.header.data_header.index = index; data_shred.header.parent_offset = (slot - parent) as u16; - let active_shred = Shred::Data(data_shred); + let active_shred = data_shred; Ok(Shredder { slot, index, @@ -475,7 +411,7 @@ impl Shredder { parent_offset: (slot - parent) as u16, fec_rate, signer: signer.clone(), - shred_tuples: vec![], + shreds: vec![], fec_set_shred_start: 0, active_shred, active_offset: 0, @@ -483,12 +419,12 @@ impl Shredder { } } - fn sign_shred(signer: &Arc, shred_info: &mut ShredInfo, signature_offset: usize) { + fn sign_shred(signer: &Arc, shred_info: &mut Shred, signature_offset: usize) { let data_offset = signature_offset + *SIZE_OF_SIGNATURE; - let signature = signer.sign_message(&shred_info.shred[data_offset..]); + let signature = signer.sign_message(&shred_info.payload[data_offset..]); let serialized_signature = bincode::serialize(&signature).expect("Failed to generate serialized signature"); - shred_info.shred[signature_offset..signature_offset + serialized_signature.len()] + shred_info.payload[signature_offset..signature_offset + serialized_signature.len()] .copy_from_slice(&serialized_signature); shred_info.header_mut().signature = signature; } @@ -496,18 +432,17 @@ impl Shredder { fn sign_unsigned_shreds_and_generate_codes(&mut self) { let signature_offset = 
CodingShred::overhead(); let signer = self.signer.clone(); - self.shred_tuples[self.fec_set_shred_start..] + self.shreds[self.fec_set_shred_start..] .iter_mut() - .for_each(|(_, d)| Self::sign_shred(&signer, d, signature_offset)); - let unsigned_coding_shred_start = self.shred_tuples.len(); + .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + let unsigned_coding_shred_start = self.shreds.len(); + self.generate_coding_shreds(); - let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED + *SIZE_OF_SHRED_TYPE - - *SIZE_OF_CODING_SHRED_HEADER - - *SIZE_OF_EMPTY_VEC; - self.shred_tuples[unsigned_coding_shred_start..] + let signature_offset = *SIZE_OF_SHRED_TYPE; + self.shreds[unsigned_coding_shred_start..] .iter_mut() - .for_each(|(_, d)| Self::sign_shred(&signer, d, coding_header_offset)); - self.fec_set_shred_start = self.shred_tuples.len(); + .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + self.fec_set_shred_start = self.shreds.len(); } /// Finalize a data shred. Update the shred index for the next shred @@ -518,10 +453,10 @@ impl Shredder { self.active_offset = 0; self.index += 1; - let mut shred = Shred::Data(self.new_data_shred()); + let mut shred = self.new_data_shred(); std::mem::swap(&mut shred, &mut self.active_shred); - let shred_info = ShredInfo::new_from_shred_and_buf(&shred, data); - self.shred_tuples.push((shred, shred_info)); + let shred_info = Shred::new(shred.header, data); + self.shreds.push(shred_info); } /// Creates a new data shred @@ -559,23 +494,23 @@ impl Shredder { Session::new(num_data, num_coding).expect("Failed to create erasure session"); let start_index = self.index - num_data as u32; - // All information after "reserved" field (coding shred header) in a data shred is encoded + // All information after coding shred field in a data shred is encoded let coding_block_offset = CodingShred::overhead(); - let data_ptrs: Vec<_> = self.shred_tuples[self.fec_set_shred_start..] + let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..] 
.iter() - .map(|(_, data)| &data.shred[coding_block_offset..]) + .map(|data| &data.payload[coding_block_offset..]) .collect(); // Create empty coding shreds, with correctly populated headers let mut coding_shreds = Vec::with_capacity(num_coding); (0..num_coding).for_each(|i| { - let shred = bincode::serialize(&Shred::Coding(Self::new_coding_shred( + let shred = bincode::serialize(&Self::new_coding_shred( self.slot, start_index + i as u32, num_data, num_coding, i, - ))) + )) .unwrap(); coding_shreds.push(shred); }); @@ -592,10 +527,16 @@ impl Shredder { .expect("Failed in erasure encode"); // append to the shred list - coding_shreds.into_iter().for_each(|code| { - let shred: Shred = bincode::deserialize(&code).unwrap(); - let shred_info = ShredInfo::new_from_shred_and_buf(&shred, code); - self.shred_tuples.push((shred, shred_info)); + coding_shreds.into_iter().enumerate().for_each(|(i, code)| { + let mut header = DataShredHeader::default(); + header.common_header.header.shred_type = CODING_SHRED; + header.common_header.header.coding_header.index = start_index + i as u32; + header.common_header.header.coding_header.slot = self.slot; + header.common_header.header.num_coding_shreds = num_coding as u16; + header.common_header.header.num_data_shreds = num_data as u16; + header.common_header.header.position = i as u16; + let shred_info = Shred::new(header, code); + self.shreds.push(shred_info); }); self.fec_set_index = self.index; } @@ -605,21 +546,13 @@ impl Shredder { /// If there's an active data shred, morph it into the final shred /// If the current active data shred is first in slot, finalize it and create a new shred fn make_final_data_shred(&mut self, last_in_slot: u8) { - if let Shred::Data(s) = &self.active_shred { - if s.header.data_header.index == 0 { - self.finalize_data_shred(); - } + if self.active_shred.header.data_header.index == 0 { + self.finalize_data_shred(); + } + self.active_shred.header.flags |= DATA_COMPLETE_SHRED; + if last_in_slot == LAST_SHRED_IN_SLOT { + self.active_shred.header.flags |= LAST_SHRED_IN_SLOT; } - self.active_shred = match self.active_shred.borrow_mut() { - Shred::Data(s) => { - s.header.flags |= DATA_COMPLETE_SHRED; - if last_in_slot == LAST_SHRED_IN_SLOT { - s.header.flags |= LAST_SHRED_IN_SLOT; - } - Shred::Data(s.clone()) - } - Shred::Coding(_) => unreachable!(), - }; self.finalize_data_shred(); self.sign_unsigned_shreds_and_generate_codes(); } @@ -635,7 +568,7 @@ impl Shredder { } fn fill_in_missing_shreds( - shred: &ShredInfo, + shred: &Shred, num_data: usize, num_coding: usize, slot: u64, @@ -667,25 +600,25 @@ impl Shredder { first_index: usize, missing: usize, ) -> Vec { - let missing_shred = if missing < first_index + num_data { + if missing < first_index + num_data { let mut data_shred = DataShred::default(); data_shred.header.data_header.slot = slot; data_shred.header.data_header.index = missing as u32; - Shred::Data(data_shred) + bincode::serialize(&data_shred).unwrap() } else { - Shred::Coding(Self::new_coding_shred( + bincode::serialize(&Self::new_coding_shred( slot, missing.saturating_sub(num_data) as u32, num_data, num_coding, missing - first_index - num_data, )) - }; - bincode::serialize(&missing_shred).unwrap() + .unwrap() + } } pub fn try_recovery( - shreds: Vec, + shreds: Vec, num_data: usize, num_coding: usize, first_index: usize, @@ -712,7 +645,7 @@ impl Shredder { next_expected_index, &mut present, ); - blocks.push(shred.shred); + blocks.push(shred.payload); next_expected_index = last_index + 1; blocks }) @@ -750,7 +683,7 @@ impl 
Shredder { let drain_this = position - num_drained; let shred_buf = shred_bufs.remove(drain_this); num_drained += 1; - if let Ok(shred) = ShredInfo::new_from_serialized_shred(shred_buf) { + if let Ok(shred) = Shred::new_from_serialized_shred(shred_buf) { let shred_index = shred.index() as usize; // Valid shred must be in the same slot as the original shreds if shred.slot() == slot { @@ -780,7 +713,7 @@ impl Shredder { } /// Combines all shreds to recreate the original buffer - pub fn deshred(shreds: &[ShredInfo]) -> Result, reed_solomon_erasure::Error> { + pub fn deshred(shreds: &[Shred]) -> Result, reed_solomon_erasure::Error> { let num_data = shreds.len(); let data_shred_bufs = { let first_index = shreds.first().unwrap().index() as usize; @@ -795,13 +728,13 @@ impl Shredder { Err(reed_solomon_erasure::Error::TooFewDataShards)?; } - shreds.iter().map(|shred| &shred.shred).collect() + shreds.iter().map(|shred| &shred.payload).collect() }; Ok(Self::reassemble_payload(num_data, data_shred_bufs)) } - fn get_shred_index(shred: &ShredInfo, num_data: usize) -> usize { + fn get_shred_index(shred: &Shred, num_data: usize) -> usize { if shred.is_data() { shred.index() as usize } else { @@ -813,7 +746,7 @@ impl Shredder { data_shred_bufs[..num_data] .iter() .flat_map(|data| { - let offset = *SIZE_OF_SHRED_DATA_SHRED; + let offset = *SIZE_OF_EMPTY_DATA_SHRED; data[offset as usize..].iter() }) .cloned() @@ -826,14 +759,14 @@ mod tests { use super::*; fn verify_test_data_shred( - shred: &ShredInfo, + shred: &Shred, index: u32, slot: u64, parent: u64, pk: &Pubkey, verify: bool, ) { - assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); assert!(shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -841,8 +774,8 @@ mod tests { assert_eq!(verify, shred.verify(pk)); } - fn verify_test_code_shred(shred: &ShredInfo, index: u32, slot: u64, pk: &Pubkey, verify: bool) { - assert_eq!(shred.shred.len(), PACKET_DATA_SIZE); + fn verify_test_code_shred(shred: &Shred, index: u32, slot: u64, pk: &Pubkey, verify: bool) { + assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); assert!(!shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -865,7 +798,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); assert!(DataShred::overhead() < PACKET_DATA_SIZE); @@ -874,12 +807,12 @@ mod tests { // Test0: Write some data to shred. Not enough to create a signed shred let data: Vec = (0..25).collect(); assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 25); // Test1: Write some more data to shred. 
Not enough to create a signed shred assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 50); // Test2: Write enough data to create a shred (> PACKET_DATA_SIZE) @@ -888,12 +821,12 @@ mod tests { let offset = shredder.write(&data).unwrap(); assert_ne!(offset, data.len()); // Assert that we have atleast one signed shred - assert!(!shredder.shred_tuples.is_empty()); + assert!(!shredder.shreds.is_empty()); // Assert that the new active shred was not populated assert_eq!(shredder.active_offset, 0); // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) - let (_, shred) = &shredder.shred_tuples[0]; + let shred = &shredder.shreds[0]; // Test4: assert that it matches the original shred // The shreds are not signed yet, as the data is not finalized verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), false); @@ -910,10 +843,10 @@ mod tests { shredder.finalize_data(); // We should have a new signed shred - assert!(!shredder.shred_tuples.is_empty()); + assert!(!shredder.shreds.is_empty()); // Must be Last in FEC Set - let (_, shred) = &shredder.shred_tuples[1]; + let shred = &shredder.shreds[1]; verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true); // Test that same seed is NOT generated for two different shreds @@ -927,9 +860,9 @@ mod tests { assert_ne!(offset, data.len()); // We should have a new signed shred - assert!(!shredder.shred_tuples.is_empty()); + assert!(!shredder.shreds.is_empty()); - let (_, shred) = &shredder.shred_tuples[2]; + let shred = &shredder.shreds[2]; verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), false); // Test8: Write more data to generate an intermediate data shred @@ -937,10 +870,10 @@ mod tests { assert_ne!(offset, data.len()); // We should have a new signed shred - assert!(!shredder.shred_tuples.is_empty()); + assert!(!shredder.shreds.is_empty()); // Must be a Data shred - let (_, shred) = &shredder.shred_tuples[3]; + let shred = &shredder.shreds[3]; verify_test_data_shred(&shred, 3, slot, slot - 5, &keypair.pubkey(), false); // Test9: Write some data to shredder @@ -951,10 +884,10 @@ mod tests { shredder.finalize_slot(); // We should have a new signed shred - assert!(!shredder.shred_tuples.is_empty()); + assert!(!shredder.shreds.is_empty()); // Must be LastInSlot - let (_, shred) = &shredder.shred_tuples[4]; + let shred = &shredder.shreds[4]; verify_test_data_shred(&shred, 4, slot, slot - 5, &keypair.pubkey(), true); } @@ -966,7 +899,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..25).collect(); @@ -974,23 +907,23 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 0 shreds now - assert_eq!(shredder.shred_tuples.len(), 0); + assert_eq!(shredder.shreds.len(), 0); shredder.finalize_data(); // We should have 1 shred now - assert_eq!(shredder.shred_tuples.len(), 2); + assert_eq!(shredder.shreds.len(), 2); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 1, slot, slot - 5, 
&keypair.pubkey(), true); let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2) .expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..25).collect(); @@ -998,13 +931,13 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 0 shreds now - assert_eq!(shredder.shred_tuples.len(), 0); + assert_eq!(shredder.shreds.len(), 0); shredder.finalize_data(); // We should have 1 shred now (LastInFECBlock) - assert_eq!(shredder.shred_tuples.len(), 1); - let (_, shred) = shredder.shred_tuples.remove(0); + assert_eq!(shredder.shreds.len(), 1); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true); } @@ -1019,7 +952,7 @@ mod tests { let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0) .expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); // Write enough data to create a shred (> PACKET_DATA_SIZE) @@ -1029,28 +962,28 @@ mod tests { let _ = shredder.write(&data).unwrap(); // We should have 2 shreds now - assert_eq!(shredder.shred_tuples.len(), 2); + assert_eq!(shredder.shreds.len(), 2); shredder.finalize_data(); // Finalize must have created 1 final data shred and 3 coding shreds // assert_eq!(shredder.shreds.len(), 6); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_code_shred(&shred, 0, slot, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_code_shred(&shred, 1, slot, &keypair.pubkey(), true); - let (_, shred) = shredder.shred_tuples.remove(0); + let shred = shredder.shreds.remove(0); verify_test_code_shred(&shred, 2, slot, &keypair.pubkey(), true); } @@ -1061,7 +994,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..4000).collect(); @@ -1075,7 +1008,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shred_tuples.len(), + shredder.shreds.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1084,13 +1017,9 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + assert_eq!(shredder.shreds.len(), expected_shred_count); - let (_, shred_infos): (Vec, Vec) = shredder - .shred_tuples - .iter() - .map(|(s, b)| (s.clone(), b.clone())) - .unzip(); + let shred_infos = shredder.shreds.clone(); // Test0: Try recovery/reassembly with only data shreds, 
but not all data shreds. Hint: should fail assert_matches!( @@ -1121,18 +1050,12 @@ mod tests { assert_eq!(data[..], result[..data.len()]); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work - let (_, mut shred_info): (Vec, Vec) = shredder - .shred_tuples + let mut shred_info: Vec = shredder + .shreds .iter() .enumerate() - .filter_map(|(i, (s, b))| { - if i % 2 == 0 { - Some((s.clone(), b.clone())) - } else { - None - } - }) - .unzip(); + .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None }) + .collect(); let mut result = Shredder::try_recovery( shred_info.clone(), @@ -1167,18 +1090,12 @@ mod tests { assert_eq!(data[..], result[..data.len()]); // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work - let (_, mut shred_info): (Vec, Vec) = shredder - .shred_tuples + let mut shred_info: Vec = shredder + .shreds .iter() .enumerate() - .filter_map(|(i, (s, b))| { - if i % 2 != 0 { - Some((s.clone(), b.clone())) - } else { - None - } - }) - .unzip(); + .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) + .collect(); let mut result = Shredder::try_recovery( shred_info.clone(), @@ -1225,7 +1142,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shred_tuples.len(), + shredder.shreds.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1234,20 +1151,14 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + assert_eq!(shredder.shreds.len(), expected_shred_count); - let (_, mut shred_info): (Vec, Vec) = shredder - .shred_tuples + let mut shred_info: Vec = shredder + .shreds .iter() .enumerate() - .filter_map(|(i, (s, b))| { - if i % 2 != 0 { - Some((s.clone(), b.clone())) - } else { - None - } - }) - .unzip(); + .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) + .collect(); let mut result = Shredder::try_recovery( shred_info.clone(), @@ -1282,11 +1193,11 @@ mod tests { assert_eq!(data[..], result[..data.len()]); // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. 
Hint: should fail - let shreds: Vec = shredder - .shred_tuples + let shreds: Vec = shredder + .shreds .iter() .enumerate() - .filter_map(|(i, (_, s))| { + .filter_map(|(i, s)| { if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) { Some(s.clone()) } else { @@ -1314,7 +1225,7 @@ mod tests { // We should have some shreds now assert_eq!( - shredder.shred_tuples.len(), + shredder.shreds.len(), data.len() / approx_shred_payload_size ); assert_eq!(offset, data.len()); @@ -1323,20 +1234,14 @@ mod tests { // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + assert_eq!(shredder.shreds.len(), expected_shred_count); - let (_, mut shred_info): (Vec, Vec) = shredder - .shred_tuples + let mut shred_info: Vec = shredder + .shreds .iter() .enumerate() - .filter_map(|(i, (s, b))| { - if i % 2 != 0 { - Some((s.clone(), b.clone())) - } else { - None - } - }) - .unzip(); + .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) + .collect(); let mut result = Shredder::try_recovery( shred_info.clone(), @@ -1427,7 +1332,7 @@ mod tests { let mut shredder = Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); - assert!(shredder.shred_tuples.is_empty()); + assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect(); @@ -1439,28 +1344,28 @@ mod tests { } // We should have some shreds now - assert!(shredder.shred_tuples.len() > data.len() / approx_shred_payload_size); + assert!(shredder.shreds.len() > data.len() / approx_shred_payload_size); assert_eq!(offset, data.len()); shredder.finalize_data(); let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; - assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + assert_eq!(shredder.shreds.len(), expected_shred_count); let mut index = 0; - while index < shredder.shred_tuples.len() { + while index < shredder.shreds.len() { let num_data_shreds = cmp::min( MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, - (shredder.shred_tuples.len() - index) / 2, + (shredder.shreds.len() - index) / 2, ); let coding_start = index + num_data_shreds; - shredder.shred_tuples[index..coding_start] + shredder.shreds[index..coding_start] .iter() - .for_each(|(_, s)| assert!(s.is_data())); + .for_each(|s| assert!(s.is_data())); index = coding_start + num_data_shreds; - shredder.shred_tuples[coding_start..index] + shredder.shreds[coding_start..index] .iter() - .for_each(|(_, s)| assert!(!s.is_data())); + .for_each(|s| assert!(!s.is_data())); } } } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f2db851b7f..c36e9bca4f 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,7 +7,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::shred::ShredInfo; +use crate::shred::Shred; use crate::streamer::{PacketReceiver, PacketSender}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; use rayon::ThreadPool; @@ -28,7 +28,7 @@ pub const NUM_THREADS: u32 = 10; /// drop blobs that are from myself or not from the correct leader for the /// blob's slot pub fn should_retransmit_and_persist( - shred: &ShredInfo, + shred: &Shred, bank: Option>, 
leader_schedule_cache: &Arc, my_pubkey: &Pubkey, @@ -67,7 +67,7 @@ fn recv_window( leader_schedule_cache: &Arc, ) -> Result<()> where - F: Fn(&ShredInfo, u64) -> bool, + F: Fn(&Shred, u64) -> bool, F: Sync, { let timer = Duration::from_millis(200); @@ -86,7 +86,7 @@ where .par_iter_mut() .enumerate() .filter_map(|(i, packet)| { - if let Ok(shred) = ShredInfo::new_from_serialized_shred(packet.data.to_vec()) { + if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { if shred_filter(&shred, last_root) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); @@ -177,7 +177,7 @@ impl WindowService { ) -> WindowService where F: 'static - + Fn(&Pubkey, &ShredInfo, Option>, u64) -> bool + + Fn(&Pubkey, &Shred, Option>, u64) -> bool + std::marker::Send + std::marker::Sync, { @@ -305,13 +305,13 @@ mod test { slot: u64, parent: u64, keypair: &Arc, - ) -> Vec { + ) -> Vec { let mut shredder = Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder"); bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - shredder.shred_tuples.into_iter().map(|(_, s)| s).collect() + shredder.shreds.drain(..).collect() } #[test] @@ -435,7 +435,7 @@ mod test { .into_iter() .map(|mut s| { let mut p = Packet::default(); - p.data.copy_from_slice(&mut s.shred); + p.data.copy_from_slice(&mut s.payload); p }) .collect();
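
Note: the sketch below is not part of the patch; it is a minimal, self-contained Rust illustration of the pattern this change adopts. The names DATA_SHRED, CODING_SHRED, Shred, payload, new_from_serialized_shred, and is_data are taken from the diff above; the struct body (a bare shred_type byte instead of the real DataShredHeader) and the type-byte parsing are simplified stand-ins, not the actual solana-core implementation.

    /// Illustrative only: the enum wrapper `Shred::Data(..)/Shred::Coding(..)` is gone.
    /// A single struct keeps the raw wire bytes, and the shred kind is read from a
    /// type tag at the start of the serialized buffer instead of an enum discriminant.
    const DATA_SHRED: u8 = 0b1010_0101; // same constants the patch makes `pub`
    const CODING_SHRED: u8 = 0b0101_1010;

    #[derive(Debug, Clone)]
    struct Shred {
        shred_type: u8,   // stand-in for the parsed DataShredHeader in the real code
        payload: Vec<u8>, // full serialized shred, formerly the `ShredInfo::shred` field
    }

    impl Shred {
        /// Mirrors `Shred::new_from_serialized_shred`: the type tag now sits at
        /// offset 0, so no header-offset arithmetic against an enum tag is needed.
        fn new_from_serialized_shred(payload: Vec<u8>) -> Result<Self, &'static str> {
            match payload.first().copied() {
                Some(t) if t == DATA_SHRED || t == CODING_SHRED => {
                    Ok(Shred { shred_type: t, payload })
                }
                _ => Err("unknown shred type"),
            }
        }

        fn is_data(&self) -> bool {
            self.shred_type == DATA_SHRED
        }
    }

    fn main() {
        // A fake wire buffer: type byte followed by opaque header/data bytes.
        let mut buf = vec![DATA_SHRED];
        buf.extend_from_slice(&[0u8; 16]);

        let shred = Shred::new_from_serialized_shred(buf).expect("valid type byte");
        assert!(shred.is_data());
        // Callers that used to destructure `Shred::Data(..)` now just use the struct:
        println!("{} byte payload, is_data = {}", shred.payload.len(), shred.is_data());
    }

In the real code the header is deserialized with bincode from the front of `payload` (see `new_from_serialized_shred` in core/src/shred.rs above); the sketch replaces that with a plain type-byte check so it stays dependency-free and runnable on its own.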