From ff608992ee030b8d7da8ecb6945a9e2059631375 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 17 Sep 2019 18:22:46 -0700 Subject: [PATCH] Replace Shred usage with ShredInfo (#5939) * Replace Shred usage with ShredInfo * Fix tests * fix clippy --- core/src/blocktree.rs | 206 +++++------- .../broadcast_fake_blobs_run.rs | 4 +- .../fail_entry_verification_broadcast_run.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 2 +- core/src/cluster_info.rs | 9 +- core/src/replay_stage.rs | 4 +- core/src/replicator.rs | 10 +- core/src/retransmit_stage.rs | 3 +- core/src/shred.rs | 301 ++++++++++-------- core/src/window_service.rs | 70 +--- 10 files changed, 294 insertions(+), 317 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 64706fc5e..010ec90f0 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -4,7 +4,7 @@ use crate::entry::Entry; use crate::erasure::ErasureConfig; use crate::result::{Error, Result}; -use crate::shred::{Shred, ShredInfo, Shredder}; +use crate::shred::{ShredInfo, Shredder}; #[cfg(feature = "kvstore")] use solana_kvstore as kvstore; @@ -22,7 +22,7 @@ use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; -use std::borrow::{Borrow, Cow}; +use std::borrow::Borrow; use std::cell::RefCell; use std::cmp; use std::fs; @@ -322,7 +322,7 @@ impl Blocktree { index_working_set: &HashMap, prev_inserted_datas: &mut HashMap<(u64, u64), ShredInfo>, prev_inserted_codes: &mut HashMap<(u64, u64), ShredInfo>, - ) -> Vec { + ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); let mut recovered_data_shreds = vec![]; @@ -357,7 +357,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch data shred"); if let Some(data) = some_data { - Some(ShredInfo::new_from_serialized_shred(data)) + ShredInfo::new_from_serialized_shred(data).ok() } else { warn!("Data shred deleted while reading for recovery"); None @@ -377,7 +377,7 @@ impl Blocktree { .get_bytes((slot, i)) .expect("Database failure, could not fetch code shred"); if let Some(code) = some_code { - Some(ShredInfo::new_from_serialized_shred(code)) + ShredInfo::new_from_serialized_shred(code).ok() } else { warn!("Code shred deleted while reading for recovery"); None @@ -415,7 +415,7 @@ impl Blocktree { pub fn insert_shreds( &self, - shreds: Vec, + shreds: Vec, leader_schedule: Option<&Arc>, ) -> Result<()> { let db = &*self.db; @@ -509,7 +509,7 @@ impl Blocktree { fn check_insert_coding_shred( &self, - shred: Shred, + shred: ShredInfo, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -525,13 +525,11 @@ impl Blocktree { // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) { - if let Ok(shred_buf) = - self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) + if let Ok(()) = self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) { - let shred_info = ShredInfo::new_from_shred(&shred, shred_buf); just_inserted_coding_shreds .entry((slot, shred_index)) - .or_insert_with(|| shred_info); + .or_insert_with(|| shred); new_index_meta.map(|n| index_working_set.insert(slot, n)); } } @@ -539,7 +537,7 @@ impl Blocktree { fn check_insert_data_shred( &self, - shred: Shred, + shred: ShredInfo, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -563,14 +561,13 @@ impl Blocktree { index_meta.data(), &self.last_root, ) { - if let Ok(shred_buf) = self.insert_data_shred( + if let Ok(()) = self.insert_data_shred( &mut slot_meta, index_meta.data_mut(), &shred, write_batch, ) { - let shred_info = ShredInfo::new_from_shred(&shred, shred_buf); - just_inserted_data_shreds.insert((slot, shred_index), shred_info); + just_inserted_data_shreds.insert((slot, shred_index), shred); new_index_meta.map(|n| index_working_set.insert(slot, n)); true } else { @@ -587,7 +584,7 @@ impl Blocktree { } fn should_insert_coding_shred( - shred: &Shred, + shred: &ShredInfo, coding_index: &CodingIndex, last_root: &RwLock, ) -> bool { @@ -614,9 +611,9 @@ impl Blocktree { &self, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_meta: &mut Index, - shred: &Shred, + shred: &ShredInfo, write_batch: &mut WriteBatch, - ) -> Result> { + ) -> Result<()> { let slot = shred.slot(); let shred_index = u64::from(shred.index()); let (num_data, num_coding, pos) = shred @@ -651,18 +648,16 @@ impl Blocktree { ); } - let serialized_shred = bincode::serialize(shred).unwrap(); - // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), &serialized_shred)?; + write_batch.put_bytes::((slot, shred_index), &shred.shred)?; index_meta.coding_mut().set_present(shred_index, true); - Ok(serialized_shred) + Ok(()) } fn should_insert_data_shred( - shred: &Shred, + shred: &ShredInfo, slot_meta: &SlotMeta, data_index: &DataIndex, last_root: &RwLock, @@ -725,9 +720,9 @@ impl Blocktree { &self, slot_meta: &mut SlotMeta, data_index: &mut DataIndex, - shred: &Shred, + shred: &ShredInfo, write_batch: &mut WriteBatch, - ) -> Result> { + ) -> Result<()> { let slot = shred.slot(); let index = u64::from(shred.index()); let parent = shred.parent(); @@ -763,15 +758,13 @@ impl Blocktree { slot_meta.consumed }; - let serialized_shred = bincode::serialize(shred).unwrap(); - // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), &serialized_shred)?; + write_batch.put_bytes::((slot, index), &shred.shred)?; update_slot_meta(last_in_slot, slot_meta, index, new_consumed); data_index.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); - Ok(serialized_shred) + Ok(()) } pub fn get_data_shred(&self, slot: u64, index: u64) -> Result>> { @@ -859,8 +852,8 @@ impl Blocktree { parent_slot = current_slot - 1; remaining_ticks_in_slot = ticks_per_slot; shredder.finalize_slot(); - let shreds: Vec = - shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); + let shreds: Vec = + shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); all_shreds.extend(shreds); shredder = Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0) @@ -883,7 +876,7 @@ impl Blocktree { if is_full_slot && remaining_ticks_in_slot != 0 { shredder.finalize_slot(); } - let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); + let shreds: Vec = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); all_shreds.extend(shreds); let num_shreds = all_shreds.len(); @@ -996,21 +989,26 @@ impl Blocktree { pub fn get_slot_entries_with_shred_count( &self, slot: u64, - start_index: u64, + mut start_index: u64, ) -> Result<(Vec, usize)> { // Find the next consecutive block of shreds. - let serialized_shreds = get_slot_consecutive_shreds(slot, &self.db, start_index)?; + let mut serialized_shreds: Vec> = vec![]; + let data_cf = self.db.column::(); + + while let Some(serialized_shred) = data_cf.get_bytes((slot, start_index))? { + serialized_shreds.push(serialized_shred); + start_index += 1; + } + trace!( "Found {:?} shreds for slot {:?}", serialized_shreds.len(), slot ); - let mut shreds: Vec = serialized_shreds - .iter() - .map(|serialzied_shred| { - let shred: Shred = - bincode::deserialize(serialzied_shred).expect("Failed to deserialize shred"); - shred + let mut shreds: Vec = serialized_shreds + .into_iter() + .filter_map(|serialized_shred| { + ShredInfo::new_from_serialized_shred(serialized_shred).ok() }) .collect(); @@ -1387,22 +1385,6 @@ fn find_slot_meta_in_cached_state<'a>( } } -fn get_slot_consecutive_shreds<'a>( - slot: u64, - db: &Database, - mut current_index: u64, -) -> Result>> { - let mut serialized_shreds: Vec> = vec![]; - let data_cf = db.column::(); - - while let Some(serialized_shred) = data_cf.get_bytes((slot, current_index))? { - serialized_shreds.push(Cow::Owned(serialized_shred)); - current_index += 1; - } - - Ok(serialized_shreds) -} - // Chaining based on latest discussion here: https://github.com/solana-labs/solana/pull/2253 fn handle_chaining( db: &Database, @@ -1590,7 +1572,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); + let shreds: Vec = shredder.shred_tuples.into_iter().map(|(_, s)| s).collect(); blocktree.insert_shreds(shreds, None)?; blocktree.set_roots(&[0])?; @@ -1671,7 +1653,7 @@ pub fn entries_to_test_shreds( slot: u64, parent_slot: u64, is_full_slot: bool, -) -> Vec { +) -> Vec { let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32) .expect("Failed to create entry shredder"); @@ -1683,16 +1665,14 @@ pub fn entries_to_test_shreds( shredder.finalize_data(); } - let shreds: Vec = shredder.shred_tuples.into_iter().map(|(s, _)| s).collect(); - - shreds + shredder.shred_tuples.into_iter().map(|(_, s)| s).collect() } #[cfg(test)] pub mod tests { use super::*; use crate::entry::{create_ticks, Entry}; - use crate::shred::CodingShred; + use crate::shred::{CodingShred, Shred}; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -1854,10 +1834,7 @@ pub mod tests { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100); let num_shreds = shreds.len() as u64; - let shred_bufs: Vec<_> = shreds - .iter() - .map(|shred| bincode::serialize(shred).unwrap()) - .collect(); + let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.shred.clone()).collect(); let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes"); let ledger = Blocktree::open(&ledger_path).unwrap(); @@ -2255,7 +2232,7 @@ pub mod tests { // is missing the tick at shred index == slot index - 1. Thus, no consecutive blocks // will be formed let num_slots = shreds_per_slot; - let mut shreds: Vec = vec![]; + let mut shreds = vec![]; let mut missing_shreds = vec![]; for slot in 1..num_slots + 1 { let (mut slot_shreds, _) = make_slot_entries(slot, slot - 1, entries_per_slot); @@ -3141,7 +3118,7 @@ pub mod tests { shred.header.coding_header.index = 11; shred.header.coding_header.slot = 1; shred.header.num_coding_shreds = shred.header.position + 1; - let coding_shred = Shred::Coding(shred.clone()); + let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); // Insert a good coding shred assert!(Blocktree::should_insert_coding_shred( @@ -3172,12 +3149,13 @@ pub mod tests { // Establish a baseline that works { + let coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); let index = index_cf .get(shred.header.coding_header.slot) .unwrap() .unwrap(); assert!(Blocktree::should_insert_coding_shred( - &Shred::Coding(shred.clone()), + &coding_shred, index.coding(), &last_root )); @@ -3185,14 +3163,13 @@ pub mod tests { // Trying to insert a shred with index < position should fail { - let mut shred_ = shred.clone(); - shred_.header.coding_header.index = (shred_.header.position - 1).into(); - let index = index_cf - .get(shred_.header.coding_header.slot) - .unwrap() - .unwrap(); + let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let index = coding_shred.headers.common_header.header.position - 1; + coding_shred.set_index(index as u32); + + let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( - &Shred::Coding(shred_), + &coding_shred, index.coding(), &last_root )); @@ -3200,14 +3177,11 @@ pub mod tests { // Trying to insert shred with num_coding == 0 should fail { - let mut shred_ = shred.clone(); - shred_.header.num_coding_shreds = 0; - let index = index_cf - .get(shred_.header.coding_header.slot) - .unwrap() - .unwrap(); + let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + coding_shred.headers.common_header.header.num_coding_shreds = 0; + let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( - &Shred::Coding(shred_), + &coding_shred, index.coding(), &last_root )); @@ -3215,14 +3189,12 @@ pub mod tests { // Trying to insert shred with pos >= num_coding should fail { - let mut shred_ = shred.clone(); - shred_.header.num_coding_shreds = shred_.header.position; - let index = index_cf - .get(shred_.header.coding_header.slot) - .unwrap() - .unwrap(); + let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + coding_shred.headers.common_header.header.num_coding_shreds = + coding_shred.headers.common_header.header.position; + let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( - &Shred::Coding(shred_), + &coding_shred, index.coding(), &last_root )); @@ -3231,23 +3203,24 @@ pub mod tests { // Trying to insert with set_index with num_coding that would imply the last blob // has index > u32::MAX should fail { - let mut shred_ = shred.clone(); - shred_.header.num_coding_shreds = 3; - shred_.header.coding_header.index = std::u32::MAX - 1; - shred_.header.position = 0; - let index = index_cf - .get(shred_.header.coding_header.slot) - .unwrap() - .unwrap(); + let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + coding_shred.headers.common_header.header.num_coding_shreds = 3; + coding_shred + .headers + .common_header + .header + .coding_header + .index = std::u32::MAX - 1; + coding_shred.headers.common_header.header.position = 0; + let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( - &Shred::Coding(shred_.clone()), + &coding_shred, index.coding(), &last_root )); // Decreasing the number of num_coding_shreds will put it within the allowed limit - shred_.header.num_coding_shreds = 2; - let coding_shred = Shred::Coding(shred_); + coding_shred.headers.common_header.header.num_coding_shreds = 2; assert!(Blocktree::should_insert_coding_shred( &coding_shred, index.coding(), @@ -3260,14 +3233,11 @@ pub mod tests { // Trying to insert value into slot <= than last root should fail { - let mut shred_ = shred.clone(); - let index = index_cf - .get(shred_.header.coding_header.slot) - .unwrap() - .unwrap(); - shred_.header.coding_header.slot = *last_root.read().unwrap(); + let mut coding_shred = ShredInfo::new_from_shred(&Shred::Coding(shred.clone())); + let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); + coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blocktree::should_insert_coding_shred( - &Shred::Coding(shred_), + &coding_shred, index.coding(), &last_root )); @@ -3313,7 +3283,7 @@ pub mod tests { let shreds_per_slot = 10; let slots = vec![2, 4, 8, 12]; let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot); - let slot_8_shreds = bincode::serialize(&all_shreds[2].0).unwrap(); + let slot_8_shreds = all_shreds[2].0.clone(); for (slot_shreds, _) in all_shreds { blocktree.insert_shreds(slot_shreds, None).unwrap(); } @@ -3325,15 +3295,11 @@ pub mod tests { // Test that the iterator for slot 8 contains what was inserted earlier let shred_iter = blocktree.slot_data_iterator(8).unwrap(); - let result: Vec = shred_iter - .map(|(_, bytes)| { - let shred: Shred = bincode::deserialize(&bytes).unwrap(); - shred - }) + let result: Vec = shred_iter + .filter_map(|(_, bytes)| ShredInfo::new_from_serialized_shred(bytes.to_vec()).ok()) .collect(); - let result_serialized = bincode::serialize(&result).unwrap(); - assert_eq!(result_serialized.len(), slot_8_shreds.len()); - assert_eq!(result_serialized, slot_8_shreds); + assert_eq!(result.len(), slot_8_shreds.len()); + assert_eq!(result, slot_8_shreds); drop(blocktree); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -3472,7 +3438,7 @@ pub mod tests { slot: u64, parent_slot: u64, num_entries: u64, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let entries = create_ticks(num_entries, Hash::default()); let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true); (shreds, entries) @@ -3482,7 +3448,7 @@ pub mod tests { start_slot: u64, num_slots: u64, entries_per_slot: u64, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let mut shreds = vec![]; let mut entries = vec![]; for slot in start_slot..start_slot + num_slots { @@ -3501,7 +3467,7 @@ pub mod tests { pub fn make_chaining_slot_entries( chain: &[u64], entries_per_slot: u64, - ) -> Vec<(Vec, Vec)> { + ) -> Vec<(Vec, Vec)> { let mut slots_shreds_and_entries = vec![]; for (i, slot) in chain.iter().enumerate() { let parent_slot = { diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index d9c98b1e5..6e4fe818e 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -36,7 +36,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { .map(|meta| meta.consumed) .unwrap_or(0); - let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds( + let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds( receive_results.ventries, bank.slot(), receive_results.last_tick, @@ -72,7 +72,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { self.last_blockhash = Hash::default(); } - blocktree.insert_shreds(shreds, None)?; + blocktree.insert_shreds(shred_bufs.clone(), None)?; // 3) Start broadcast step let peers = cluster_info.read().unwrap().tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index c1521747d..765380355 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -53,7 +53,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); - blocktree.insert_shreds(shreds, None)?; + blocktree.insert_shreds(shred_infos.clone(), None)?; // 3) Start broadcast step let bank_epoch = bank.get_stakers_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index a8dc9a401..bedda4c06 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -92,7 +92,7 @@ impl BroadcastRun for StandardBroadcastRun { let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); let num_shreds = all_shreds.len(); blocktree - .insert_shreds(all_shreds, None) + .insert_shreds(shred_infos.clone(), None) .expect("Failed to insert shreds in blocktree"); let to_blobs_elapsed = to_blobs_start.elapsed(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 06463b813..682f3e63e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1768,7 +1768,7 @@ mod tests { use crate::crds_value::CrdsValueLabel; use crate::repair_service::RepairType; use crate::result::Error; - use crate::shred::{DataShred, Shred}; + use crate::shred::{DataShred, Shred, ShredInfo}; use crate::test_tx::test_tx; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use solana_sdk::hash::Hash; @@ -1931,9 +1931,10 @@ mod tests { let mut shred = Shred::Data(DataShred::default()); shred.set_slot(2); shred.set_index(1); + let shred_info = ShredInfo::new_from_shred(&shred); blocktree - .insert_shreds(vec![shred], None) + .insert_shreds(vec![shred_info], None) .expect("Expect successful ledger write"); let rv = ClusterInfo::run_window_request( @@ -2008,10 +2009,10 @@ mod tests { assert!(rv.is_empty()); // Create slots 1, 2, 3 with 5 blobs apiece - let (blobs, _) = make_many_slot_entries(1, 3, 5); + let (shreds, _) = make_many_slot_entries(1, 3, 5); blocktree - .insert_shreds(blobs, None) + .insert_shreds(shreds, None) .expect("Expect successful ledger write"); // We don't have slot 4, so we don't know how to service this requeset diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 72bd45f84..0a4ec7497 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -872,7 +872,7 @@ mod test { use crate::entry; use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader}; use crate::replay_stage::ReplayStage; - use crate::shred::Shred; + use crate::shred::ShredInfo; use solana_runtime::genesis_utils::GenesisBlockInfo; use solana_sdk::hash::{hash, Hash}; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -1022,7 +1022,7 @@ mod test { // marked as dead. Returns the error for caller to verify. fn check_dead_fork(shred_to_insert: F) -> Result<()> where - F: Fn(&Hash, u64) -> Vec, + F: Fn(&Hash, u64) -> Vec, { let ledger_path = get_tmp_ledger_path!(); let res = { diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 6eac3d7c1..9bfd77612 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -11,7 +11,7 @@ use crate::repair_service; use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::shred::Shred; +use crate::shred::ShredInfo; use crate::storage_stage::NUM_STORAGE_SAMPLES; use crate::streamer::{receiver, responder, PacketReceiver}; use crate::window_service::WindowService; @@ -477,7 +477,7 @@ impl Replicator { &exit, RepairStrategy::RepairRange(repair_slot_range), &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _, _| true, + |_, _, _, _| true, ); info!("waiting for ledger download"); Self::wait_for_segment_download( @@ -871,10 +871,10 @@ impl Replicator { while let Ok(mut more) = r_reader.try_recv() { packets.packets.append(&mut more.packets); } - let shreds: Vec = packets + let shreds: Vec = packets .packets - .iter() - .filter_map(|p| bincode::deserialize(&p.data).ok()) + .into_iter() + .filter_map(|p| ShredInfo::new_from_serialized_shred(p.data.to_vec()).ok()) .collect(); blocktree.insert_shreds(shreds, None)?; } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 9c491cdfe..a63b532fb 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -151,10 +151,9 @@ impl RetransmitStage { exit, repair_strategy, &leader_schedule_cache.clone(), - move |id, shred, shred_buf, working_bank, last_root| { + move |id, shred, working_bank, last_root| { should_retransmit_and_persist( shred, - shred_buf, working_bank, &leader_schedule_cache, id, diff --git a/core/src/shred.rs b/core/src/shred.rs index 60bd51358..472a704b2 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -37,7 +37,7 @@ lazy_static! { const DATA_SHRED: u8 = 0b1010_0101; const CODING_SHRED: u8 = 0b0101_1010; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct ShredInfo { pub headers: DataShredHeader, pub shred: Vec, @@ -51,25 +51,24 @@ impl ShredInfo { } } - pub fn new_from_serialized_shred(shred_buf: Vec) -> Self { + pub fn new_from_serialized_shred(shred_buf: Vec) -> result::Result { let header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; let shred_type: u8 = - bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE]) - .unwrap(); + bincode::deserialize(&shred_buf[header_offset..header_offset + *SIZE_OF_SHRED_TYPE])?; let header = if shred_type == CODING_SHRED { let end = *SIZE_OF_CODING_SHRED_HEADER; let mut header = DataShredHeader::default(); header.common_header.header = - bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap(); + bincode::deserialize(&shred_buf[header_offset..header_offset + end])?; header } else { let end = *SIZE_OF_DATA_SHRED_HEADER; - bincode::deserialize(&shred_buf[header_offset..header_offset + end]).unwrap() + bincode::deserialize(&shred_buf[header_offset..header_offset + end])? }; - Self::new(header, shred_buf) + Ok(Self::new(header, shred_buf)) } - pub fn new_from_shred(shred: &Shred, shred_buf: Vec) -> Self { + pub fn new_from_shred_and_buf(shred: &Shred, shred_buf: Vec) -> Self { let header = match shred { Shred::Data(s) => s.header.clone(), Shred::Coding(s) => { @@ -82,6 +81,38 @@ impl ShredInfo { Self::new(header, shred_buf) } + pub fn new_from_shred(shred: &Shred) -> Self { + let header = match shred { + Shred::Data(s) => s.header.clone(), + Shred::Coding(s) => { + let mut hdr = DataShredHeader::default(); + hdr.common_header.header = s.header.clone(); + hdr + } + }; + + let shred_buf = bincode::serialize(&shred).unwrap(); + + Self::new(header, shred_buf) + } + + pub fn new_empty_from_header(header: DataShredHeader) -> Self { + let start = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED; + let end = start + *SIZE_OF_DATA_SHRED_HEADER; + let mut payload = vec![0; PACKET_DATA_SIZE]; + let mut wr = io::Cursor::new(&mut payload[start..end]); + bincode::serialize_into(&mut wr, &header).expect("Failed to serialize shred"); + if header.common_header.header.shred_type == CODING_SHRED { + let shred_type = 1; + let mut wr = io::Cursor::new(&mut payload[..start]); + bincode::serialize_into(&mut wr, &shred_type).expect("Failed to set coding shred type"); + } + ShredInfo { + headers: header, + shred: payload, + } + } + fn header(&self) -> &ShredCommonHeader { if self.is_data() { &self.headers.data_header @@ -114,6 +145,18 @@ impl ShredInfo { self.header().index } + /// This is not a safe function. It only changes the meta information. + /// Use this only for test code which doesn't care about actual shred + pub fn set_index(&mut self, index: u32) { + self.header_mut().index = index + } + + /// This is not a safe function. It only changes the meta information. + /// Use this only for test code which doesn't care about actual shred + pub fn set_slot(&mut self, slot: u64) { + self.header_mut().slot = slot + } + pub fn signature(&self) -> Signature { self.header().signature } @@ -138,6 +181,14 @@ impl ShredInfo { } } + /// This is not a safe function. It only changes the meta information. + /// Use this only for test code which doesn't care about actual shred + pub fn set_last_in_slot(&mut self) { + if self.is_data() { + self.headers.flags |= LAST_SHRED_IN_SLOT + } + } + pub fn data_complete(&self) -> bool { if self.is_data() { self.headers.flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED @@ -158,6 +209,18 @@ impl ShredInfo { None } } + + pub fn verify(&self, pubkey: &Pubkey) -> bool { + let signed_payload_offset = if self.is_data() { + CodingShred::overhead() + } else { + CodingShred::overhead() + *SIZE_OF_SHRED_TYPE + - *SIZE_OF_CODING_SHRED_HEADER + - *SIZE_OF_EMPTY_VEC + } + *SIZE_OF_SIGNATURE; + self.signature() + .verify(pubkey.as_ref(), &self.shred[signed_payload_offset..]) + } } #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] @@ -169,6 +232,7 @@ pub enum Shred { /// This limit comes from reed solomon library, but unfortunately they don't have /// a public constant defined for it. const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16; + /// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds pub const RECOMMENDED_FEC_RATE: f32 = 0.25; @@ -313,7 +377,7 @@ pub struct ShredCommonHeader { /// A common header that is present at start of every data shred #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct DataShredHeader { - common_header: CodingShred, + pub common_header: CodingShred, pub data_header: ShredCommonHeader, pub parent_offset: u16, pub flags: u8, @@ -493,8 +557,8 @@ impl Write for Shredder { #[derive(Default, Debug, PartialEq)] pub struct RecoveryResult { - pub recovered_data: Vec, - pub recovered_code: Vec, + pub recovered_data: Vec, + pub recovered_code: Vec, } #[derive(Default, Debug, PartialEq)] @@ -593,7 +657,7 @@ impl Shredder { let mut shred = Shred::Data(self.new_data_shred()); std::mem::swap(&mut shred, &mut self.active_shred); - let shred_info = ShredInfo::new_from_shred(&shred, data); + let shred_info = ShredInfo::new_from_shred_and_buf(&shred, data); self.shred_tuples.push((shred, shred_info)); } @@ -667,7 +731,7 @@ impl Shredder { // append to the shred list coding_shreds.into_iter().for_each(|code| { let shred: Shred = bincode::deserialize(&code).unwrap(); - let shred_info = ShredInfo::new_from_shred(&shred, code); + let shred_info = ShredInfo::new_from_shred_and_buf(&shred, code); self.shred_tuples.push((shred, shred_info)); }); self.fec_set_index = self.index; @@ -812,26 +876,32 @@ impl Shredder { .collect(); session.decode_blocks(&mut blocks, &present)?; + let mut num_drained = 0; present .iter() .enumerate() .for_each(|(position, was_present)| { if !was_present { - let shred: Shred = bincode::deserialize(&shred_bufs[position]).unwrap(); - let shred_index = shred.index() as usize; - // Valid shred must be in the same slot as the original shreds - if shred.slot() == slot { - // Data shreds are "positioned" at the start of the iterator. First num_data - // shreds are expected to be the data shreds. - if position < num_data - && (first_index..first_index + num_data).contains(&shred_index) - { - // Also, a valid data shred must be indexed between first_index and first+num_data index - recovered_data.push(shred) - } else if (first_index..first_index + num_coding).contains(&shred_index) - { - // A valid coding shred must be indexed between first_index and first+num_coding index - recovered_code.push(shred) + let drain_this = position - num_drained; + let shred_buf = shred_bufs.remove(drain_this); + num_drained += 1; + if let Ok(shred) = ShredInfo::new_from_serialized_shred(shred_buf) { + let shred_index = shred.index() as usize; + // Valid shred must be in the same slot as the original shreds + if shred.slot() == slot { + // Data shreds are "positioned" at the start of the iterator. First num_data + // shreds are expected to be the data shreds. + if position < num_data + && (first_index..first_index + num_data).contains(&shred_index) + { + // Also, a valid data shred must be indexed between first_index and first+num_data index + recovered_data.push(shred) + } else if (first_index..first_index + num_coding) + .contains(&shred_index) + { + // A valid coding shred must be indexed between first_index and first+num_coding index + recovered_code.push(shred) + } } } } @@ -845,7 +915,7 @@ impl Shredder { } /// Combines all shreds to recreate the original buffer - pub fn deshred(shreds: &[Shred]) -> Result, reed_solomon_erasure::Error> { + pub fn deshred(shreds: &[ShredInfo]) -> Result, reed_solomon_erasure::Error> { let num_data = shreds.len(); let data_shred_bufs = { let first_index = shreds.first().unwrap().index() as usize; @@ -860,11 +930,7 @@ impl Shredder { Err(reed_solomon_erasure::Error::TooFewDataShards)?; } - let shred_bufs: Vec> = shreds - .iter() - .map(|shred| bincode::serialize(shred).unwrap()) - .collect(); - shred_bufs + shreds.iter().map(|shred| &shred.shred).collect() }; Ok(Self::reassemble_payload(num_data, data_shred_bufs)) @@ -878,7 +944,7 @@ impl Shredder { } } - fn reassemble_payload(num_data: usize, data_shred_bufs: Vec>) -> Vec { + fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec>) -> Vec { data_shred_bufs[..num_data] .iter() .flat_map(|data| { @@ -1215,7 +1281,7 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (shreds, shred_infos): (Vec, Vec) = shredder + let (_, shred_infos): (Vec, Vec) = shredder .shred_tuples .iter() .map(|(s, b)| (s.clone(), b.clone())) @@ -1245,12 +1311,12 @@ mod tests { assert_ne!(RecoveryResult::default(), result); assert!(result.recovered_data.is_empty()); assert!(!result.recovered_code.is_empty()); - let result = Shredder::deshred(&shreds[..4]).unwrap(); + let result = Shredder::deshred(&shred_infos[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work - let (mut shreds, shred_info): (Vec, Vec) = shredder + let (_, mut shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() @@ -1264,7 +1330,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_info, + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1275,45 +1341,40 @@ mod tests { assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 1); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(1, recovered_shred); + shred_info.insert(1, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 3); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(3, recovered_shred); + shred_info.insert(3, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 1); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 1); - } - let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 3); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 3); - } + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 1); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 1))); - let result = Shredder::deshred(&shreds[..4]).unwrap(); + let recovered_shred = result.recovered_code.remove(0); + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 3); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 3))); + + let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work - let (mut shreds, shred_info): (Vec, Vec) = shredder + let (_, mut shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() @@ -1327,7 +1388,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_info, + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1338,40 +1399,35 @@ mod tests { assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 0); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(0, recovered_shred); + shred_info.insert(0, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 2); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(2, recovered_shred); + shred_info.insert(2, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 0); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 0); - } - let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 2); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 2); - } + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 0); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - let result = Shredder::deshred(&shreds[..4]).unwrap(); + let recovered_shred = result.recovered_code.remove(0); + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 2); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); + + let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1399,7 +1455,7 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (mut shreds, shred_info): (Vec, Vec) = shredder + let (_, mut shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() @@ -1413,7 +1469,7 @@ mod tests { .unzip(); let mut result = Shredder::try_recovery( - shred_info, + shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1424,49 +1480,44 @@ mod tests { assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 0); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(0, recovered_shred); + shred_info.insert(0, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 2); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(2, recovered_shred); + shred_info.insert(2, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 0); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 0); - } - let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 2); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 2); - } + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 0); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - let result = Shredder::deshred(&shreds[..4]).unwrap(); + let recovered_shred = result.recovered_code.remove(0); + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 2); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); + + let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail - let shreds: Vec = shredder + let shreds: Vec = shredder .shred_tuples .iter() .enumerate() - .filter_map(|(i, (s, _))| { + .filter_map(|(i, (_, s))| { if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) { Some(s.clone()) } else { @@ -1505,7 +1556,7 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shred_tuples.len(), expected_shred_count); - let (mut shreds, shred_info): (Vec, Vec) = shredder + let (_, mut shred_info): (Vec, Vec) = shredder .shred_tuples .iter() .enumerate() @@ -1530,40 +1581,35 @@ mod tests { assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 25); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(0, recovered_shred); + shred_info.insert(0, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert!(recovered_shred.is_data()); assert_eq!(recovered_shred.index(), 27); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(2, recovered_shred); + shred_info.insert(2, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 0); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 25); - } - let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 4); - assert_eq!(code.header.num_coding_shreds, 4); - assert_eq!(code.header.position, 2); - assert_eq!(code.header.coding_header.slot, slot); - assert_eq!(code.header.coding_header.index, 27); - } + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 25); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - let result = Shredder::deshred(&shreds[..4]).unwrap(); + let recovered_shred = result.recovered_code.remove(0); + assert!(!recovered_shred.is_data()); + assert_eq!(recovered_shred.index(), 27); + assert_eq!(recovered_shred.slot(), slot); + assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); + + let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1593,7 +1639,7 @@ mod tests { // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds assert_matches!( Shredder::try_recovery( - shred_info.clone(), + shred_info, expected_shred_count / 2, expected_shred_count / 2, 35, @@ -1685,6 +1731,7 @@ mod tests { assert_eq!(shred.last_in_slot(), shred_info.last_in_slot()); assert_eq!(shred.data_complete(), shred_info.data_complete()); assert_eq!(shred.coding_params(), shred_info.coding_params()); + assert!(shred_info.verify(&keypair.pubkey())); }) } } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 1770476a4..f2db851b7 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,7 +7,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::shred::Shred; +use crate::shred::ShredInfo; use crate::streamer::{PacketReceiver, PacketSender}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; use rayon::ThreadPool; @@ -28,8 +28,7 @@ pub const NUM_THREADS: u32 = 10; /// drop blobs that are from myself or not from the correct leader for the /// blob's slot pub fn should_retransmit_and_persist( - shred: &Shred, - shred_buf: &[u8], + shred: &ShredInfo, bank: Option>, leader_schedule_cache: &Arc, my_pubkey: &Pubkey, @@ -46,7 +45,7 @@ pub fn should_retransmit_and_persist( } else if !blocktree::verify_shred_slots(shred.slot(), shred.parent(), root) { inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1); false - } else if !shred.fast_verify(&shred_buf, &leader_id) { + } else if !shred.verify(&leader_id) { inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); false } else { @@ -68,7 +67,7 @@ fn recv_window( leader_schedule_cache: &Arc, ) -> Result<()> where - F: Fn(&Shred, &[u8], u64) -> bool, + F: Fn(&ShredInfo, u64) -> bool, F: Sync, { let timer = Duration::from_millis(200); @@ -87,9 +86,8 @@ where .par_iter_mut() .enumerate() .filter_map(|(i, packet)| { - if let Ok(s) = bincode::deserialize(&packet.data) { - let shred: Shred = s; - if shred_filter(&shred, &packet.data, last_root) { + if let Ok(shred) = ShredInfo::new_from_serialized_shred(packet.data.to_vec()) { + if shred_filter(&shred, last_root) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); Some((shred, i)) @@ -179,7 +177,7 @@ impl WindowService { ) -> WindowService where F: 'static - + Fn(&Pubkey, &Shred, &[u8], Option>, u64) -> bool + + Fn(&Pubkey, &ShredInfo, Option>, u64) -> bool + std::marker::Send + std::marker::Sync, { @@ -223,11 +221,10 @@ impl WindowService { &id, &r, &retransmit, - |shred, shred_buf, last_root| { + |shred, last_root| { shred_filter( &id, shred, - shred_buf, bank_forks .as_ref() .map(|bank_forks| bank_forks.read().unwrap().working_bank()), @@ -308,13 +305,13 @@ mod test { slot: u64, parent: u64, keypair: &Arc, - ) -> Vec { + ) -> Vec { let mut shredder = Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder"); bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); - shredder.shred_tuples.into_iter().map(|(s, _)| s).collect() + shredder.shred_tuples.into_iter().map(|(_, s)| s).collect() } #[test] @@ -350,21 +347,10 @@ mod test { let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair); - let shred_bufs: Vec<_> = shreds - .iter() - .map(|s| bincode::serialize(s).unwrap()) - .collect(); // with a Bank for slot 0, blob continues assert_eq!( - should_retransmit_and_persist( - &shreds[0], - &shred_bufs[0], - Some(bank.clone()), - &cache, - &me_id, - 0, - ), + should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0,), true ); @@ -379,14 +365,7 @@ mod test { // with a Bank and no idea who leader is, blob gets thrown out shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3); assert_eq!( - should_retransmit_and_persist( - &shreds[0], - &shred_bufs[0], - Some(bank.clone()), - &cache, - &me_id, - 0 - ), + should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0), false ); @@ -395,14 +374,7 @@ mod test { let shreds = local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair); assert_eq!( - should_retransmit_and_persist( - &shreds[0], - &shred_bufs[0], - Some(bank.clone()), - &cache, - &me_id, - slot - ), + should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot), false ); @@ -411,14 +383,7 @@ mod test { let shreds = local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair); assert_eq!( - should_retransmit_and_persist( - &shreds[0], - &shred_bufs[0], - Some(bank.clone()), - &cache, - &me_id, - slot - ), + should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot), false ); @@ -454,7 +419,7 @@ mod test { &exit, RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }), &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _, _| true, + |_, _, _, _| true, ); window } @@ -468,10 +433,9 @@ mod test { let (shreds, _) = make_many_slot_entries(0, 5, 10); let packets: Vec<_> = shreds .into_iter() - .map(|s| { + .map(|mut s| { let mut p = Packet::default(); - p.data - .copy_from_slice(&mut bincode::serialize(&s).unwrap().as_ref()); + p.data.copy_from_slice(&mut s.shred); p }) .collect();