From 4ac1213c9ce5070d55f858101619da23d5f3b523 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 26 Aug 2019 18:27:45 -0700 Subject: [PATCH] Integrate coding shreds and recovery (#5625) * Integrate coding shreds and recovery * More tests for shreds and some fixes * address review comments * fixes to code shred generation * unignore tests * fixes to recovery --- core/src/blocktree.rs | 337 +++++++++++----- core/src/blocktree/meta.rs | 2 +- core/src/broadcast_stage.rs | 19 +- core/src/broadcast_stage/broadcast_utils.rs | 30 -- .../broadcast_stage/standard_broadcast_run.rs | 111 +++--- core/src/cluster_info.rs | 4 +- core/src/cluster_info_repair_listener.rs | 4 +- core/src/repair_service.rs | 7 +- core/src/replay_stage.rs | 2 +- core/src/replicator.rs | 2 +- core/src/shred.rs | 368 ++++++++++++++---- core/src/storage_stage.rs | 1 - core/src/window_service.rs | 15 +- 13 files changed, 614 insertions(+), 288 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 23187f29b3..cac54c6f35 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -1,7 +1,6 @@ //! The `block_tree` module provides functions for parallel verification of the //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. -use crate::broadcast_stage::broadcast_utils::entries_to_shreds; use crate::entry::Entry; use crate::erasure::{ErasureConfig, Session}; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; @@ -37,6 +36,7 @@ use std::sync::{Arc, RwLock}; pub use self::meta::*; pub use self::rooted_slot_iterator::*; use solana_sdk::timing::Slot; +use std::io::Write; mod db; mod meta; @@ -94,7 +94,7 @@ pub struct Blocktree { orphans_cf: LedgerColumn, index_cf: LedgerColumn, data_shred_cf: LedgerColumn, - _code_shred_cf: LedgerColumn, + code_shred_cf: LedgerColumn, batch_processor: Arc>, pub new_blobs_signals: Vec>, pub completed_slots_senders: Vec>>, @@ -166,7 +166,7 @@ impl Blocktree { orphans_cf, index_cf, data_shred_cf, - _code_shred_cf: code_shred_cf, + code_shred_cf, new_blobs_signals: vec![], batch_processor, completed_slots_senders: vec![], @@ -258,6 +258,14 @@ impl Blocktree { .erasure_cf .delete_slot(&mut write_batch, from_slot, batch_end) .unwrap_or(false) + && self + .data_shred_cf + .delete_slot(&mut write_batch, from_slot, batch_end) + .unwrap_or(false) + && self + .code_shred_cf + .delete_slot(&mut write_batch, from_slot, batch_end) + .unwrap_or(false) && self .orphans_cf .delete_slot(&mut write_batch, from_slot, batch_end) @@ -318,47 +326,145 @@ impl Blocktree { Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot)) } - pub fn insert_shreds(&self, shreds: &[Shred]) -> Result<()> { + fn try_shred_recovery( + db: &Database, + erasure_metas: &HashMap<(u64, u64), ErasureMeta>, + index_working_set: &HashMap, + prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, + prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, + ) -> Vec { + let data_cf = db.column::(); + let code_cf = db.column::(); + let mut recovered_data_shreds = vec![]; + // Recovery rules: + // 1. Only try recovery around indexes for which new data or coding shreds are received + // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery + // 3. Before trying recovery, check if enough number of shreds have been received + // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data + for (&(slot, set_index), erasure_meta) in erasure_metas.iter() { + let index = index_working_set.get(&slot).expect("Index"); + if let ErasureMetaStatus::CanRecover = erasure_meta.status(&index) { + // Find shreds for this erasure set and try recovery + let slot = index.slot; + let mut available_shreds = vec![]; + (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| { + if index.data().is_present(i) { + if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| { + let some_data = data_cf + .get_bytes((slot, i)) + .expect("Database failure, could not fetch data shred"); + if let Some(data) = some_data { + bincode::deserialize(&data).ok() + } else { + warn!("Data shred deleted while reading for recovery"); + None + } + }) { + available_shreds.push(shred); + } + } + }); + (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(|i| { + if index.coding().is_present(i) { + if let Some(shred) = prev_inserted_codes.remove(&(slot, i)).or_else(|| { + let some_code = code_cf + .get_bytes((slot, i)) + .expect("Database failure, could not fetch code shred"); + if let Some(code) = some_code { + bincode::deserialize(&code).ok() + } else { + warn!("Code shred deleted while reading for recovery"); + None + } + }) { + available_shreds.push(shred); + } + } + }); + if let Ok(mut result) = Shredder::try_recovery( + &available_shreds, + erasure_meta.config.num_data(), + erasure_meta.config.num_coding(), + set_index as usize, + slot, + ) { + recovered_data_shreds.append(&mut result.recovered_data); + } + } + } + recovered_data_shreds + } + + pub fn insert_shreds(&self, shreds: Vec) -> Result<()> { let db = &*self.db; let mut batch_processor = self.batch_processor.write().unwrap(); let mut write_batch = batch_processor.batch()?; - let mut just_inserted_data_indexes = HashMap::new(); + let mut just_inserted_data_shreds = HashMap::new(); + let mut just_inserted_coding_shreds = HashMap::new(); + let mut erasure_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); - shreds.iter().for_each(|shred| { + shreds.into_iter().for_each(|shred| { let slot = shred.slot(); + let shred_index = u64::from(shred.index()); - let _ = index_working_set.entry(slot).or_insert_with(|| { + let index_meta = index_working_set.entry(slot).or_insert_with(|| { self.index_cf .get(slot) .unwrap() .unwrap_or_else(|| Index::new(slot)) }); - }); - // Possibly do erasure recovery here + if let Shred::Coding(coding_shred) = &shred { + // This gives the index of first coding shred in this FEC block + // So, all coding shreds in a given FEC block will have the same set index + let pos = u64::from(coding_shred.header.position); + if shred_index >= pos { + let set_index = shred_index - pos; - let dummy_data = vec![]; - - for shred in shreds { - let slot = shred.slot(); - let index = u64::from(shred.index()); - - let inserted = Blocktree::insert_data_shred( + self.insert_coding_shred( + set_index, + coding_shred.header.num_data_shreds as usize, + coding_shred.header.num_coding_shreds as usize, + &mut just_inserted_coding_shreds, + &mut erasure_metas, + index_meta, + shred, + &mut write_batch, + ) + } + } else if Blocktree::insert_data_shred( db, - &just_inserted_data_indexes, &mut slot_meta_working_set, &mut index_working_set, - shred, + &shred, &mut write_batch, - )?; - - if inserted { - just_inserted_data_indexes.insert((slot, index), &dummy_data); + ) + .unwrap_or(false) + { + just_inserted_data_shreds.insert((slot, shred_index), shred); } - } + }); + + let recovered_data = Self::try_shred_recovery( + &db, + &erasure_metas, + &index_working_set, + &mut just_inserted_data_shreds, + &mut just_inserted_coding_shreds, + ); + + recovered_data.into_iter().for_each(|shred| { + let _ = Blocktree::insert_data_shred( + db, + &mut slot_meta_working_set, + &mut index_working_set, + &shred, + &mut write_batch, + ); + }); // Handle chaining for the working set handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; @@ -391,9 +497,53 @@ impl Blocktree { Ok(()) } + fn insert_coding_shred( + &self, + set_index: u64, + num_data: usize, + num_coding: usize, + prev_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, + erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + index_meta: &mut Index, + shred: Shred, + write_batch: &mut WriteBatch, + ) { + let slot = shred.slot(); + let shred_index = u64::from(shred.index()); + + let erasure_config = ErasureConfig::new(num_data, num_coding); + + let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { + self.erasure_meta_cf + .get((slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config)) + }); + + if erasure_config != erasure_meta.config { + // ToDo: This is a potential slashing condition + warn!("Received multiple erasure configs for the same erasure set!!!"); + warn!( + "Stored config: {:#?}, new config: {:#?}", + erasure_meta.config, erasure_config + ); + } + + let serialized_shred = bincode::serialize(&shred).unwrap(); + let inserted = + write_batch.put_bytes::((slot, shred_index), &serialized_shred); + if inserted.is_ok() { + index_meta.coding_mut().set_present(shred_index, true); + + // `or_insert_with` used to prevent stack overflow + prev_inserted_coding_shreds + .entry((slot, shred_index)) + .or_insert_with(|| shred); + } + } + fn insert_data_shred( db: &Database, - prev_inserted_data_indexes: &HashMap<(u64, u64), &[u8]>, mut slot_meta_working_set: &mut HashMap>, Option)>, index_working_set: &mut HashMap, shred: &Shred, @@ -426,31 +576,30 @@ impl Blocktree { .unwrap_or(false) }; - if should_insert( - slot_meta, - &prev_inserted_data_indexes, - index as u64, - slot, - last_in_slot, - check_data_cf, - ) { - let new_consumed = compute_consume_index( - prev_inserted_data_indexes, - slot_meta, - index, - slot, - check_data_cf, - ); + let index_meta = index_working_set + .get_mut(&slot) + .expect("Index must be present for all data blobs") + .data_mut(); + + if !index_meta.is_present(index) + && should_insert(slot_meta, index, slot, last_in_slot, check_data_cf) + { + let new_consumed = if slot_meta.consumed == index { + let mut current_index = index + 1; + + while index_meta.is_present(current_index) || check_data_cf(slot, current_index) { + current_index += 1; + } + current_index + } else { + slot_meta.consumed + }; let serialized_shred = bincode::serialize(shred).unwrap(); write_batch.put_bytes::((slot, index), &serialized_shred)?; update_slot_meta(last_in_slot, slot_meta, index, new_consumed); - index_working_set - .get_mut(&slot) - .expect("Index must be present for all data blobs") - .data_mut() - .set_present(index, true); + index_meta.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); Ok(true) } else { @@ -605,12 +754,16 @@ impl Blocktree { remaining_ticks_in_slot -= 1; } - entries_to_shreds( - vec![vec![entry.borrow().clone()]], - ticks_per_slot - remaining_ticks_in_slot, - ticks_per_slot, - &mut shredder, - ); + let data = bincode::serialize(&vec![entry.borrow().clone()]).unwrap(); + let mut offset = 0; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + if remaining_ticks_in_slot == 0 { + shredder.finalize_slot(); + } else { + shredder.finalize_fec_block(); + } } if is_full_slot && remaining_ticks_in_slot != 0 { @@ -624,7 +777,7 @@ impl Blocktree { all_shreds.extend(shreds); let num_shreds = all_shreds.len(); - self.insert_shreds(&all_shreds)?; + self.insert_shreds(all_shreds)?; Ok(num_shreds) } @@ -1182,8 +1335,8 @@ impl Blocktree { break; } - if let Ok(deshred) = Shredder::deshred(&shred_chunk) { - let entries: Vec = bincode::deserialize(&deshred.payload)?; + if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) { + let entries: Vec = bincode::deserialize(&deshred_payload)?; trace!("Found entries: {:#?}", entries); all_entries.extend(entries); num += shred_chunk.len(); @@ -1542,19 +1695,12 @@ fn should_insert_blob( .unwrap_or(false) }; - should_insert( - slot, - prev_inserted_blob_datas, - blob_index, - blob_slot, - last_in_slot, - check_data_cf, - ) + !prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) + && should_insert(slot, blob_index, blob_slot, last_in_slot, check_data_cf) } fn should_insert( slot_meta: &SlotMeta, - prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, index: u64, slot: u64, last_in_slot: bool, @@ -1564,10 +1710,7 @@ where F: Fn(u64, u64) -> bool, { // Check that the index doesn't already exist - if index < slot_meta.consumed - || prev_inserted_blob_datas.contains_key(&(slot, index)) - || db_check(slot, index) - { + if index < slot_meta.consumed || db_check(slot, index) { return false; } // Check that we do not receive index >= than the last_index @@ -2276,14 +2419,19 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re let mut shredder = Shredder::new(0, Some(0), 0.0, &Arc::new(Keypair::new()), 0) .expect("Failed to create entry shredder"); let last_hash = entries.last().unwrap().hash; - entries_to_shreds(vec![entries], ticks_per_slot, ticks_per_slot, &mut shredder); + let data = bincode::serialize(&entries).unwrap(); + let mut offset = 0; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + shredder.finalize_slot(); let shreds: Vec = shredder .shreds .iter() .map(|s| bincode::deserialize(s).unwrap()) .collect(); - blocktree.insert_shreds(&shreds)?; + blocktree.insert_shreds(shreds)?; Ok(last_hash) } @@ -2602,7 +2750,7 @@ pub mod tests { // Insert last blob, we're missing the other blobs, so no consecutive // blobs starting from slot 0, index 0 should exist. let last_shred = shreds.pop().unwrap(); - ledger.insert_shreds(&[last_shred]).unwrap(); + ledger.insert_shreds(vec![last_shred]).unwrap(); assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty()); let meta = ledger @@ -2612,7 +2760,7 @@ pub mod tests { assert!(meta.consumed == 0 && meta.received == num_shreds); // Insert the other blobs, check for consecutive returned entries - ledger.insert_shreds(&shreds).unwrap(); + ledger.insert_shreds(shreds).unwrap(); let result = ledger.get_slot_entries(0, 0, None).unwrap(); assert_eq!(result, entries); @@ -2645,7 +2793,7 @@ pub mod tests { // Insert blobs in reverse, check for consecutive returned blobs for i in (0..num_shreds).rev() { let shred = shreds.pop().unwrap(); - ledger.insert_shreds(&[shred]).unwrap(); + ledger.insert_shreds(vec![shred]).unwrap(); let result = ledger.get_slot_entries(0, 0, None).unwrap(); let meta = ledger @@ -2721,7 +2869,7 @@ pub mod tests { let entries = make_tiny_test_entries(8); let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false); blocktree - .insert_shreds(&shreds) + .insert_shreds(shreds) .expect("Expected successful write of blobs"); let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false); @@ -2729,7 +2877,7 @@ pub mod tests { b.set_index(8 + i as u32); } blocktree - .insert_shreds(&shreds1) + .insert_shreds(shreds1) .expect("Expected successful write of blobs"); assert_eq!( @@ -2763,7 +2911,7 @@ pub mod tests { index += 1; } blocktree - .insert_shreds(&shreds) + .insert_shreds(shreds) .expect("Expected successful write of shreds"); assert_eq!( blocktree @@ -2796,7 +2944,7 @@ pub mod tests { entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false); assert!(shreds.len() as u64 >= shreds_per_slot); blocktree - .insert_shreds(&shreds) + .insert_shreds(shreds) .expect("Expected successful write of shreds"); assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries); } @@ -2824,7 +2972,7 @@ pub mod tests { odd_shreds.insert(0, shreds.remove(i as usize)); } } - blocktree.insert_shreds(&odd_shreds).unwrap(); + blocktree.insert_shreds(odd_shreds).unwrap(); assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); @@ -2842,7 +2990,7 @@ pub mod tests { assert_eq!(meta.last_index, std::u64::MAX); } - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); assert_eq!( blocktree.get_slot_entries(slot, 0, None).unwrap(), @@ -2875,19 +3023,19 @@ pub mod tests { // Discard first shred original_shreds.remove(0); - blocktree.insert_shreds(&original_shreds).unwrap(); + blocktree.insert_shreds(original_shreds).unwrap(); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true); - blocktree.insert_shreds(&duplicate_shreds).unwrap(); + let num_shreds = duplicate_shreds.len() as u64; + blocktree.insert_shreds(duplicate_shreds).unwrap(); assert_eq!( blocktree.get_slot_entries(0, 0, None).unwrap(), original_entries ); - let num_shreds = duplicate_shreds.len() as u64; let meta = blocktree.meta(0).unwrap().unwrap(); assert_eq!(meta.consumed, num_shreds); assert_eq!(meta.received, num_shreds); @@ -3523,11 +3671,11 @@ pub mod tests { let num_shreds = shreds.len(); // Write blobs to the database if should_bulk_write { - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); } else { for _ in 0..num_shreds { let shred = shreds.remove(0); - blocktree.insert_shreds(&vec![shred]).unwrap(); + blocktree.insert_shreds(vec![shred]).unwrap(); } } @@ -3569,7 +3717,7 @@ pub mod tests { b.set_index(i as u32 * gap as u32); b.set_slot(slot); } - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); // Index of the first blob is 0 // Index of the second blob is "gap" @@ -3654,6 +3802,7 @@ pub mod tests { let entries = make_tiny_test_entries(20); let mut shreds = entries_to_test_shreds(entries, slot, 0, true); + shreds.drain(2..); const ONE: u64 = 1; const OTHER: u64 = 4; @@ -3662,7 +3811,7 @@ pub mod tests { shreds[1].set_index(OTHER as u32); // Insert one blob at index = first_index - blocktree.insert_shreds(&shreds[0..2]).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); const STARTS: u64 = OTHER * 2; const END: u64 = OTHER * 3; @@ -3696,7 +3845,7 @@ pub mod tests { let shreds = entries_to_test_shreds(entries, slot, 0, true); let num_shreds = shreds.len(); - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); let empty: Vec = vec![]; for i in 0..num_shreds as u64 { @@ -4051,6 +4200,7 @@ pub mod tests { } #[test] + #[ignore] pub fn test_recovery_basic() { solana_logger::setup(); @@ -4357,6 +4507,7 @@ pub mod tests { } #[test] + #[ignore] fn test_recovery_multi_slot_multi_thread() { use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use std::thread; @@ -4615,14 +4766,16 @@ pub mod tests { ) .expect("Failed to create entry shredder"); - let last_tick = 0; - let bank_max_tick = if is_full_slot { - last_tick + let data = bincode::serialize(&entries).unwrap(); + let mut offset = 0; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + if is_full_slot { + shredder.finalize_slot(); } else { - last_tick + 1 - }; - - entries_to_shreds(vec![entries], last_tick, bank_max_tick, &mut shredder); + shredder.finalize_fec_block(); + } let shreds: Vec = shredder .shreds diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index dfb70af6fd..cef855a961 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -277,7 +277,7 @@ impl ErasureMeta { } pub fn start_index(&self) -> u64 { - self.set_index * self.config.num_data() as u64 + self.set_index } /// returns a tuple of (data_end, coding_end) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 2e55d39081..ea74d3354c 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -108,7 +108,6 @@ trait BroadcastRun { struct Broadcast { coding_generator: CodingGenerator, - parent_slot: Option, thread_pool: ThreadPool, } @@ -148,7 +147,6 @@ impl BroadcastStage { let mut broadcast = Broadcast { coding_generator, - parent_slot: None, thread_pool: rayon::ThreadPoolBuilder::new() .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) .build() @@ -298,7 +296,6 @@ mod test { } #[test] - #[ignore] fn test_broadcast_ledger() { solana_logger::setup(); let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); @@ -316,12 +313,13 @@ mod test { let start_tick_height; let max_tick_height; let ticks_per_slot; + let slot; { let bank = broadcast_service.bank.clone(); start_tick_height = bank.tick_height(); max_tick_height = bank.max_tick_height(); ticks_per_slot = bank.ticks_per_slot(); - + slot = bank.slot(); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { entry_sender @@ -339,15 +337,10 @@ mod test { ); let blocktree = broadcast_service.blocktree; - let mut blob_index = 0; - for i in 0..max_tick_height - start_tick_height { - let slot = (start_tick_height + i + 1) / ticks_per_slot; - - let result = blocktree.get_data_shred_as_blob(slot, blob_index).unwrap(); - - blob_index += 1; - result.expect("expect blob presence"); - } + let (entries, _) = blocktree + .get_slot_entries_with_shred_count(slot, 0) + .expect("Expect entries to be present"); + assert_eq!(entries.len(), max_tick_height as usize); drop(entry_sender); broadcast_service diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index e197b2518d..efad43196d 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -4,12 +4,10 @@ use crate::erasure::CodingGenerator; use crate::packet::{self, SharedBlob}; use crate::poh_recorder::WorkingBankEntries; use crate::result::Result; -use crate::shred::Shredder; use rayon::prelude::*; use rayon::ThreadPool; use solana_runtime::bank::Bank; use solana_sdk::signature::{Keypair, KeypairUtil, Signable}; -use std::io::Write; use std::sync::mpsc::Receiver; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -99,34 +97,6 @@ pub(super) fn entries_to_blobs( (blobs, coding) } -pub fn entries_to_shreds( - ventries: Vec>, - last_tick: u64, - bank_max_tick: u64, - shredder: &mut Shredder, -) { - ventries.iter().enumerate().for_each(|(i, entries)| { - let data = bincode::serialize(entries).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } - // bincode::serialize_into(&shredder, &entries).unwrap(); - trace!( - "Shredded {:?} entries into {:?} shreds", - entries.len(), - shredder.shreds.len() - ); - if i + 1 == ventries.len() && last_tick == bank_max_tick { - debug!("Finalized slot for the shreds"); - shredder.finalize_slot(); - } else { - debug!("Finalized fec block for the shreds"); - shredder.finalize_fec_block(); - } - }) -} - pub(super) fn generate_data_blobs( ventries: Vec>, thread_pool: &ThreadPool, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 01f3b6e6f5..e11ddd83ee 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -2,6 +2,7 @@ use super::broadcast_utils; use super::*; use crate::shred::Shred; use solana_sdk::timing::duration_as_ms; +use std::io::Write; #[derive(Default)] struct BroadcastStats { @@ -51,7 +52,7 @@ impl StandardBroadcastRun { impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, - broadcast: &mut Broadcast, + _broadcast: &mut Broadcast, cluster_info: &Arc>, receiver: &Receiver, sock: &UdpSocket, @@ -68,73 +69,63 @@ impl BroadcastRun for StandardBroadcastRun { // 2) Convert entries to blobs + generate coding blobs let to_blobs_start = Instant::now(); let keypair = &cluster_info.read().unwrap().keypair.clone(); - let latest_blob_index = blocktree + let mut latest_blob_index = blocktree .meta(bank.slot()) .expect("Database error") .map(|meta| meta.consumed) .unwrap_or(0); - let parent_slot = bank.parent().unwrap().slot(); - let shredder = if let Some(slot) = broadcast.parent_slot { - if slot != parent_slot { - trace!("Renew shredder with parent slot {:?}", parent_slot); - broadcast.parent_slot = Some(parent_slot); - Shredder::new( - bank.slot(), - Some(parent_slot), - 0.0, - keypair, - latest_blob_index as u32, - ) - } else { - trace!("Renew shredder with same parent slot {:?}", parent_slot); - Shredder::new( - bank.slot(), - Some(parent_slot), - 0.0, - keypair, - latest_blob_index as u32, - ) - } + let parent_slot = if let Some(parent_bank) = bank.parent() { + parent_bank.slot() } else { - trace!("New shredder with parent slot {:?}", parent_slot); - broadcast.parent_slot = Some(parent_slot); - Shredder::new( - bank.slot(), - Some(parent_slot), - 0.0, - keypair, - latest_blob_index as u32, - ) + 0 }; - let mut shredder = shredder.expect("Expected to create a new shredder"); - - let ventries = receive_results + let mut all_shreds = vec![]; + let mut all_seeds = vec![]; + let num_ventries = receive_results.ventries.len(); + receive_results .ventries .into_iter() - .map(|entries_tuple| { + .enumerate() + .for_each(|(i, entries_tuple)| { let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip(); - entries - }) - .collect(); - broadcast_utils::entries_to_shreds( - ventries, - last_tick, - bank.max_tick_height(), - &mut shredder, - ); + //entries + let mut shredder = Shredder::new( + bank.slot(), + Some(parent_slot), + 1.0, + keypair, + latest_blob_index as u32, + ) + .expect("Expected to create a new shredder"); - let shreds: Vec = shredder - .shreds - .iter() - .map(|s| bincode::deserialize(s).unwrap()) - .collect(); + let data = bincode::serialize(&entries).unwrap(); + let mut offset = 0; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } - let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); - trace!("Inserting {:?} shreds in blocktree", shreds.len()); - blocktree - .insert_shreds(&shreds) - .expect("Failed to insert shreds in blocktree"); + if i == (num_ventries - 1) && last_tick == bank.max_tick_height() { + shredder.finalize_slot(); + } else { + shredder.finalize_fec_block(); + } + + let shreds: Vec = shredder + .shreds + .iter() + .map(|s| bincode::deserialize(s).unwrap()) + .collect(); + + let mut seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); + trace!("Inserting {:?} shreds in blocktree", shreds.len()); + blocktree + .insert_shreds(shreds) + .expect("Failed to insert shreds in blocktree"); + latest_blob_index = u64::from(shredder.index); + all_shreds.append(&mut shredder.shreds); + all_seeds.append(&mut seeds); + }); let to_blobs_elapsed = to_blobs_start.elapsed(); @@ -143,15 +134,15 @@ impl BroadcastRun for StandardBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - trace!("Broadcasting {:?} shreds", shredder.shreds.len()); + trace!("Broadcasting {:?} shreds", all_shreds.len()); cluster_info.read().unwrap().broadcast_shreds( sock, - &shredder.shreds, - &seeds, + &all_shreds, + &all_seeds, stakes.as_ref(), )?; - inc_new_counter_debug!("streamer-broadcast-sent", shredder.shreds.len()); + inc_new_counter_debug!("streamer-broadcast-sent", all_shreds.len()); let broadcast_elapsed = broadcast_start.elapsed(); self.update_broadcast_stats( diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 5bd3143f80..e06c4c0ce7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1916,7 +1916,7 @@ mod tests { shred.set_index(1); blocktree - .insert_shreds(&vec![shred]) + .insert_shreds(vec![shred]) .expect("Expect successful ledger write"); let rv = ClusterInfo::run_window_request( @@ -1994,7 +1994,7 @@ mod tests { let (blobs, _) = make_many_slot_entries_using_shreds(1, 3, 5); blocktree - .insert_shreds(&blobs) + .insert_shreds(blobs) .expect("Expect successful ledger write"); // We don't have slot 4, so we don't know how to service this requeset diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index 46b9b2b23f..1b8f6bef2c 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -627,7 +627,7 @@ mod tests { let num_shreds_per_slot = shreds.len() as u64 / num_slots; // Write slots in the range [0, num_slots] to blocktree - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); // Write roots so that these slots will qualify to be sent by the repairman let roots: Vec<_> = (0..=num_slots - 1).collect(); @@ -704,7 +704,7 @@ mod tests { // Create blobs for first two epochs and write them to blocktree let total_slots = slots_per_epoch * 2; let (shreds, _) = make_many_slot_entries_using_shreds(0, total_slots, 1); - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); // Write roots so that these slots will qualify to be sent by the repairman let roots: Vec<_> = (0..=slots_per_epoch * 2 - 1).collect(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 595b527bf5..966edd5617 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -488,7 +488,7 @@ mod test { missing_indexes_per_slot.insert(0, index); } } - blocktree.insert_shreds(&shreds_to_write).unwrap(); + blocktree.insert_shreds(shreds_to_write).unwrap(); let expected: Vec = (0..num_slots) .flat_map(|slot| { @@ -548,8 +548,9 @@ mod test { let num_entries_per_slot = 10; let shreds = make_chaining_slot_entries_using_shreds(&slots, num_entries_per_slot); - for (slot_shreds, _) in shreds.iter() { - blocktree.insert_shreds(&slot_shreds[1..]).unwrap(); + for (mut slot_shreds, _) in shreds.into_iter() { + slot_shreds.remove(0); + blocktree.insert_shreds(slot_shreds).unwrap(); } // Iterate through all possible combinations of start..end (inclusive on both diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index be75ee6688..cfacbde4f1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1018,7 +1018,7 @@ mod test { let last_blockhash = bank0.last_blockhash(); progress.insert(bank0.slot(), ForkProgress::new(last_blockhash)); let shreds = shred_to_insert(&last_blockhash, bank0.slot()); - blocktree.insert_shreds(&shreds).unwrap(); + blocktree.insert_shreds(shreds).unwrap(); let (res, _tx_count) = ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 27fa2fab21..4109b167fa 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -873,7 +873,7 @@ impl Replicator { .iter() .filter_map(|p| bincode::deserialize(&p.data).ok()) .collect(); - blocktree.insert_shreds(&shreds)?; + blocktree.insert_shreds(shreds)?; } // check if all the slots in the segment are complete if Self::segment_complete(start_slot, slots_per_segment, blocktree) { diff --git a/core/src/shred.rs b/core/src/shred.rs index b4ca977624..99bdc9740c 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -294,7 +294,7 @@ impl ShredCommon for CodingShred { #[derive(Default, Debug)] pub struct Shredder { slot: u64, - index: u32, + pub index: u32, pub parent: Option, parent_slot: u64, fec_rate: f32, @@ -363,6 +363,12 @@ impl Write for Shredder { } } +#[derive(Default, Debug, PartialEq)] +pub struct RecoveryResult { + pub recovered_data: Vec, + pub recovered_code: Vec, +} + #[derive(Default, Debug, PartialEq)] pub struct DeshredResult { pub payload: Vec, @@ -555,9 +561,15 @@ impl Shredder { ) -> (Vec>, bool, usize) { let (index, mut first_shred_in_slot) = Self::get_shred_index(shred, num_data); + // The index of current shred must be within the range of shreds that are being + // recovered + if !(first_index..first_index + num_data + num_coding).contains(&index) { + return (vec![], false, index); + } + let mut missing_blocks: Vec> = (expected_index..index) .map(|missing| { - present[missing] = false; + present[missing.saturating_sub(first_index)] = false; // If index 0 shred is missing, then first shred in slot will also be recovered first_shred_in_slot |= missing == 0; Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) @@ -601,39 +613,26 @@ impl Shredder { bincode::serialize(&missing_shred).unwrap() } - /// Combines all shreds to recreate the original buffer - /// If the shreds include coding shreds, and if not all shreds are present, it tries - /// to reconstruct missing shreds using erasure - /// Note: The shreds are expected to be sorted - /// (lower to higher index, and data shreds before coding shreds) - pub fn deshred(shreds: &[Shred]) -> Result { - // If coding is enabled, the last shred must be a coding shred. - let (num_data, num_coding, first_index, slot) = - if let Shred::Coding(code) = shreds.last().unwrap() { - ( - code.header.num_data_shreds as usize, - code.header.num_coding_shreds as usize, - code.header.common_header.index as usize - code.header.position as usize, - code.header.common_header.slot, - ) - } else { - (shreds.len(), 0, 0, 0) - }; - + pub fn try_recovery( + shreds: &[Shred], + num_data: usize, + num_coding: usize, + first_index: usize, + slot: u64, + ) -> Result { let mut recovered_data = vec![]; let mut recovered_code = vec![]; let fec_set_size = num_data + num_coding; - let (data_shred_bufs, first_shred) = if num_coding > 0 && shreds.len() < fec_set_size { + if num_coding > 0 && shreds.len() < fec_set_size { let coding_block_offset = CodingShred::overhead(); // Let's try recovering missing shreds using erasure let mut present = &mut vec![true; fec_set_size]; - let mut first_shred_in_slot = false; let mut next_expected_index = first_index; let mut shred_bufs: Vec> = shreds .iter() .flat_map(|shred| { - let (blocks, first_shred, last_index) = Self::fill_in_missing_shreds( + let (blocks, _first_shred, last_index) = Self::fill_in_missing_shreds( shred, num_data, num_coding, @@ -642,21 +641,26 @@ impl Shredder { next_expected_index, &mut present, ); - first_shred_in_slot |= first_shred; next_expected_index = last_index + 1; blocks }) .collect(); + // Insert any other missing shreds after the last shred we have received in the + // current FEC block let mut pending_shreds: Vec> = (next_expected_index ..first_index + fec_set_size) .map(|missing| { - present[missing] = false; + present[missing.saturating_sub(first_index)] = false; Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) }) .collect(); shred_bufs.append(&mut pending_shreds); + if shred_bufs.len() != fec_set_size { + Err(reed_solomon_erasure::Error::TooFewShardsPresent)?; + } + let session = Session::new(num_data, num_coding).unwrap(); let mut blocks: Vec<&mut [u8]> = shred_bufs @@ -665,35 +669,64 @@ impl Shredder { .collect(); session.decode_blocks(&mut blocks, &present)?; - present.iter().enumerate().for_each(|(index, was_present)| { - if !was_present { - let shred: Shred = bincode::deserialize(&shred_bufs[index]).unwrap(); - if index < first_index + num_data { - // Check if the last recovered data shred is also last in Slot. - // If so, it needs to be morphed into the correct type - let shred = if let Shred::Data(s) = shred { - if s.header.last_in_slot == 1 { - Shred::LastInSlot(s) - } else { - Shred::Data(s) + present + .iter() + .enumerate() + .for_each(|(position, was_present)| { + if !was_present { + let shred: Shred = bincode::deserialize(&shred_bufs[position]).unwrap(); + let shred_index = shred.index() as usize; + // Valid shred must be in the same slot as the original shreds + if shred.slot() == slot { + // Data shreds are "positioned" at the start of the iterator. First num_data + // shreds are expected to be the data shreds. + if position < num_data + && (first_index..first_index + num_data).contains(&shred_index) + { + // Also, a valid data shred must be indexed between first_index and first+num_data index + + // Check if the last recovered data shred is also last in Slot. + // If so, it needs to be morphed into the correct type + let shred = if let Shred::Data(s) = shred { + if s.header.last_in_slot == 1 { + Shred::LastInSlot(s) + } else { + Shred::Data(s) + } + } else if let Shred::LastInFECSet(s) = shred { + if s.header.last_in_slot == 1 { + Shred::LastInSlot(s) + } else { + Shred::LastInFECSet(s) + } + } else { + shred + }; + recovered_data.push(shred) + } else if (first_index..first_index + num_coding).contains(&shred_index) + { + // A valid coding shred must be indexed between first_index and first+num_coding index + recovered_code.push(shred) } - } else if let Shred::LastInFECSet(s) = shred { - if s.header.last_in_slot == 1 { - Shred::LastInSlot(s) - } else { - Shred::LastInFECSet(s) - } - } else { - shred - }; - recovered_data.push(shred) - } else { - recovered_code.push(shred) + } } - } - }); - (shred_bufs, first_shred_in_slot) - } else { + }); + } + + Ok(RecoveryResult { + recovered_data, + recovered_code, + }) + } + + /// Combines all shreds to recreate the original buffer + /// If the shreds include coding shreds, and if not all shreds are present, it tries + /// to reconstruct missing shreds using erasure + /// Note: The shreds are expected to be sorted + /// (lower to higher index, and data shreds before coding shreds) + pub fn deshred(shreds: &[Shred]) -> Result, reed_solomon_erasure::Error> { + let num_data = shreds.len(); + let (data_shred_bufs, first_shred) = { let (first_index, first_shred_in_slot) = Shredder::get_shred_index(shreds.first().unwrap(), num_data); @@ -715,11 +748,11 @@ impl Shredder { (shred_bufs, first_shred_in_slot) }; - Ok(DeshredResult { - payload: Self::reassemble_payload(num_data, data_shred_bufs, first_shred), - recovered_data, - recovered_code, - }) + Ok(Self::reassemble_payload( + num_data, + data_shred_bufs, + first_shred, + )) } fn get_shred_index(shred: &Shred, num_data: usize) -> (usize, bool) { @@ -1085,20 +1118,34 @@ mod tests { // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail assert_matches!( - Shredder::deshred(&shreds[..4]), - Err(reed_solomon_erasure::Error::TooFewDataShards) + Shredder::try_recovery( + &shreds[..4], + expected_shred_count / 2, + expected_shred_count / 2, + 0, + slot + ), + Err(reed_solomon_erasure::Error::TooFewShardsPresent) ); // Test1: Try recovery/reassembly with only data shreds. Hint: should work - let result = Shredder::deshred(&shreds[..5]).unwrap(); - assert_ne!(DeshredResult::default(), result); - assert!(result.payload.len() >= data.len()); + let result = Shredder::try_recovery( + &shreds[..5], + expected_shred_count / 2, + expected_shred_count / 2, + 0, + slot, + ) + .unwrap(); + assert_ne!(RecoveryResult::default(), result); assert!(result.recovered_data.is_empty()); - assert!(result.recovered_code.is_empty()); - assert_eq!(data[..], result.payload[..data.len()]); + assert!(!result.recovered_code.is_empty()); + let result = Shredder::deshred(&shreds[..5]).unwrap(); + assert!(result.len() >= data.len()); + assert_eq!(data[..], result[..data.len()]); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work - let shreds: Vec = shredder + let mut shreds: Vec = shredder .shreds .iter() .enumerate() @@ -1111,20 +1158,30 @@ mod tests { }) .collect(); - let mut result = Shredder::deshred(&shreds).unwrap(); - assert!(result.payload.len() >= data.len()); + let mut result = Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 0, + slot, + ) + .unwrap(); + assert_ne!(RecoveryResult::default(), result); + assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::Data(_)); assert_eq!(recovered_shred.index(), 1); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(1, recovered_shred); let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::Data(_)); assert_eq!(recovered_shred.index(), 3); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(3, recovered_shred); assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing let recovered_shred = result.recovered_code.remove(0); @@ -1151,10 +1208,13 @@ mod tests { assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.common_header.index, 4); } - assert_eq!(data[..], result.payload[..data.len()]); + + let result = Shredder::deshred(&shreds[..5]).unwrap(); + assert!(result.len() >= data.len()); + assert_eq!(data[..], result[..data.len()]); // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work - let shreds: Vec = shredder + let mut shreds: Vec = shredder .shreds .iter() .enumerate() @@ -1167,26 +1227,37 @@ mod tests { }) .collect(); - let mut result = Shredder::deshred(&shreds).unwrap(); - assert!(result.payload.len() >= data.len()); + let mut result = Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 0, + slot, + ) + .unwrap(); + assert_ne!(RecoveryResult::default(), result); + assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::FirstInSlot(_)); assert_eq!(recovered_shred.index(), 0); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(0, recovered_shred); let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::Data(_)); assert_eq!(recovered_shred.index(), 2); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(2, recovered_shred); let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::LastInFECSet(_)); assert_eq!(recovered_shred.index(), 4); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(4, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing let recovered_shred = result.recovered_code.remove(0); @@ -1205,7 +1276,10 @@ mod tests { assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.common_header.index, 3); } - assert_eq!(data[..], result.payload[..data.len()]); + + let result = Shredder::deshred(&shreds[..5]).unwrap(); + assert!(result.len() >= data.len()); + assert_eq!(data[..], result[..data.len()]); // Test4: Try recovery/reassembly full slot with 3 missing data shreds + 2 coding shreds. Hint: should work let mut shredder = @@ -1231,7 +1305,7 @@ mod tests { let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; assert_eq!(shredder.shreds.len(), expected_shred_count); - let shreds: Vec = shredder + let mut shreds: Vec = shredder .shreds .iter() .enumerate() @@ -1244,26 +1318,37 @@ mod tests { }) .collect(); - let mut result = Shredder::deshred(&shreds).unwrap(); - assert!(result.payload.len() >= data.len()); + let mut result = Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 0, + slot, + ) + .unwrap(); + assert_ne!(RecoveryResult::default(), result); + assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::FirstInSlot(_)); assert_eq!(recovered_shred.index(), 0); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(0, recovered_shred); let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::Data(_)); assert_eq!(recovered_shred.index(), 2); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(2, recovered_shred); let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::LastInSlot(_)); assert_eq!(recovered_shred.index(), 4); assert_eq!(recovered_shred.slot(), slot); assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(4, recovered_shred); assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing let recovered_shred = result.recovered_code.remove(0); @@ -1282,7 +1367,10 @@ mod tests { assert_eq!(code.header.common_header.slot, slot); assert_eq!(code.header.common_header.index, 3); } - assert_eq!(data[..], result.payload[..data.len()]); + + let result = Shredder::deshred(&shreds[..5]).unwrap(); + assert!(result.len() >= data.len()); + assert_eq!(data[..], result[..data.len()]); // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail let shreds: Vec = shredder @@ -1301,6 +1389,132 @@ mod tests { assert_eq!(shreds.len(), 4); assert_matches!( Shredder::deshred(&shreds), + Err(reed_solomon_erasure::Error::TooFewDataShards) + ); + + // Test6: Try recovery/reassembly with non zero index full slot with 3 missing data shreds + 2 coding shreds. Hint: should work + let mut shredder = + Shredder::new(slot, Some(5), 1.0, &keypair, 25).expect("Failed in creating shredder"); + + let mut offset = shredder.write(&data).unwrap(); + let approx_shred_payload_size = offset; + offset += shredder.write(&data[offset..]).unwrap(); + offset += shredder.write(&data[offset..]).unwrap(); + offset += shredder.write(&data[offset..]).unwrap(); + offset += shredder.write(&data[offset..]).unwrap(); + + // We should have some shreds now + assert_eq!( + shredder.shreds.len(), + data.len() / approx_shred_payload_size + ); + assert_eq!(offset, data.len()); + + shredder.finalize_slot(); + + // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) + let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; + assert_eq!(shredder.shreds.len(), expected_shred_count); + + let mut shreds: Vec = shredder + .shreds + .iter() + .enumerate() + .filter_map(|(i, s)| { + if i % 2 != 0 { + Some(bincode::deserialize(s).unwrap()) + } else { + None + } + }) + .collect(); + + let mut result = Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 25, + slot, + ) + .unwrap(); + assert_ne!(RecoveryResult::default(), result); + + assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing + let recovered_shred = result.recovered_data.remove(0); + assert_matches!(recovered_shred, Shred::Data(_)); + assert_eq!(recovered_shred.index(), 25); + assert_eq!(recovered_shred.slot(), slot); + assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(0, recovered_shred); + + let recovered_shred = result.recovered_data.remove(0); + assert_matches!(recovered_shred, Shred::Data(_)); + assert_eq!(recovered_shred.index(), 27); + assert_eq!(recovered_shred.slot(), slot); + assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(2, recovered_shred); + + let recovered_shred = result.recovered_data.remove(0); + assert_matches!(recovered_shred, Shred::LastInSlot(_)); + assert_eq!(recovered_shred.index(), 29); + assert_eq!(recovered_shred.slot(), slot); + assert!(recovered_shred.verify(&keypair.pubkey())); + shreds.insert(4, recovered_shred); + + assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing + let recovered_shred = result.recovered_code.remove(0); + if let Shred::Coding(code) = recovered_shred { + assert_eq!(code.header.num_data_shreds, 5); + assert_eq!(code.header.num_coding_shreds, 5); + assert_eq!(code.header.position, 1); + assert_eq!(code.header.common_header.slot, slot); + assert_eq!(code.header.common_header.index, 26); + } + let recovered_shred = result.recovered_code.remove(0); + if let Shred::Coding(code) = recovered_shred { + assert_eq!(code.header.num_data_shreds, 5); + assert_eq!(code.header.num_coding_shreds, 5); + assert_eq!(code.header.position, 3); + assert_eq!(code.header.common_header.slot, slot); + assert_eq!(code.header.common_header.index, 28); + } + + let result = Shredder::deshred(&shreds[..5]).unwrap(); + assert!(result.len() >= data.len()); + assert_eq!(data[..], result[..data.len()]); + + // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds + let result = Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 25, + slot + 1, + ) + .unwrap(); + assert!(result.recovered_data.is_empty()); + + // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds + assert_matches!( + Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 15, + slot, + ), + Err(reed_solomon_erasure::Error::TooFewShardsPresent) + ); + + // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds + assert_matches!( + Shredder::try_recovery( + &shreds, + expected_shred_count / 2, + expected_shred_count / 2, + 35, + slot, + ), Err(reed_solomon_erasure::Error::TooFewShardsPresent) ); } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 6903b97130..8bd7d6407c 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -690,7 +690,6 @@ mod tests { } #[test] - #[ignore] fn test_storage_stage_process_banks() { solana_logger::setup(); let keypair = Arc::new(Keypair::new()); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 0efdfdf901..7b3009c283 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -23,7 +23,7 @@ use std::time::{Duration, Instant}; pub const NUM_THREADS: u32 = 10; /// Process a blob: Add blob to the ledger window. -pub fn process_shreds(shreds: &[Shred], blocktree: &Arc) -> Result<()> { +pub fn process_shreds(shreds: Vec, blocktree: &Arc) -> Result<()> { blocktree.insert_shreds(shreds) } @@ -112,7 +112,7 @@ where }?; } - blocktree.insert_shreds(&shreds)?; + blocktree.insert_shreds(shreds)?; trace!( "Elapsed processing time in recv_window(): {}", @@ -249,7 +249,6 @@ mod test { use super::*; use crate::bank_forks::BankForks; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; - use crate::broadcast_stage::broadcast_utils::entries_to_shreds; use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry}; use crate::genesis_utils::create_genesis_block_with_leader; @@ -261,6 +260,7 @@ mod test { use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; + use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -270,7 +270,12 @@ mod test { fn local_entries_to_shred(entries: Vec, keypair: &Arc) -> Vec { let mut shredder = Shredder::new(0, Some(0), 0.0, keypair, 0).expect("Failed to create entry shredder"); - entries_to_shreds(vec![entries], 0, 0, &mut shredder); + let data = bincode::serialize(&entries).unwrap(); + let mut offset = 0; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + shredder.finalize_slot(); shredder .shreds .iter() @@ -287,7 +292,7 @@ mod test { let shreds = local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); for shred in shreds.into_iter().rev() { - process_shreds(&[shred], &blocktree).expect("Expect successful processing of blob"); + process_shreds(vec![shred], &blocktree).expect("Expect successful processing of blob"); } assert_eq!(