diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 3dddc12338..48813115c6 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -2,7 +2,12 @@
 extern crate test;

-use solana_core::shred::{Shredder, RECOMMENDED_FEC_RATE};
+use solana_core::entry::create_ticks;
+use solana_core::shred::{
+    max_ticks_per_shred, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
+};
+use solana_sdk::hash::Hash;
+use solana_sdk::packet::PACKET_DATA_SIZE;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::sync::Arc;
 use test::Bencher;
@@ -10,24 +15,29 @@ use test::Bencher;
 #[bench]
 fn bench_shredder(bencher: &mut Bencher) {
     let kp = Arc::new(Keypair::new());
-    // 1Mb
-    let data = vec![0u8; 1000 * 1000];
+    let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
+    let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
+    // ~1Mb
+    let num_ticks = max_ticks_per_shred() * num_shreds as u64;
+    let entries = create_ticks(num_ticks, Hash::default());
     bencher.iter(|| {
-        let mut shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, &kp, 0).unwrap();
-        bincode::serialize_into(&mut shredder, &data).unwrap();
+        let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
+        shredder.entries_to_shreds(&entries, true, 0);
     })
 }

 #[bench]
 fn bench_deshredder(bencher: &mut Bencher) {
     let kp = Arc::new(Keypair::new());
-    // 10MB
-    let data = vec![0u8; 10000 * 1000];
-    let mut shredded = Shredder::new(1, 0, 0.0, &kp, 0).unwrap();
-    let _ = bincode::serialize_into(&mut shredded, &data);
-    shredded.finalize_data();
+    let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
+    // ~10Mb
+    let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
+    let num_ticks = max_ticks_per_shred() * num_shreds as u64;
+    let entries = create_ticks(num_ticks, Hash::default());
+    let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp).unwrap();
+    let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
     bencher.iter(|| {
-        let raw = &mut Shredder::deshred(&shredded.shreds).unwrap();
+        let raw = &mut Shredder::deshred(&data_shreds).unwrap();
         assert_ne!(raw.len(), 0);
     })
 }
diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs
index c5e7a3c2f8..c8ef3c2365 100644
--- a/core/src/blockstream_service.rs
+++ b/core/src/blockstream_service.rs
@@ -169,7 +169,7 @@ mod test {
                 None,
                 true,
                 &Arc::new(Keypair::new()),
-                &entries,
+                entries,
             )
             .unwrap();
diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index b379a90524..db59cf15fc 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -18,7 +18,6 @@ use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
-use std::borrow::Borrow;
 use std::cell::RefCell;
 use std::cmp;
 use std::fs;
@@ -806,21 +805,17 @@ impl Blocktree {
         self.code_shred_cf.get_bytes((slot, index))
     }

-    pub fn write_entries<I>(
+    pub fn write_entries(
         &self,
         start_slot: u64,
         num_ticks_in_start_slot: u64,
-        start_index: u64,
+        start_index: u32,
         ticks_per_slot: u64,
         parent: Option<u64>,
         is_full_slot: bool,
         keypair: &Arc<Keypair>,
-        entries: I,
-    ) -> Result<usize>
-    where
-        I: IntoIterator,
-        I::Item: Borrow<Entry>,
-    {
+        entries: Vec<Entry>,
+    ) -> Result<usize> {
         assert!(num_ticks_in_start_slot < ticks_per_slot);
         let mut remaining_ticks_in_slot = ticks_per_slot - num_ticks_in_start_slot;

@@ -833,40 +828,45 @@ impl Blocktree {
             },
             |v| v,
         );
-        let mut shredder =
-            Shredder::new(current_slot, parent_slot, 0.0, keypair, start_index as u32)
-                .expect("Failed to create entry shredder");
+        let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
+            .expect("Failed to create entry shredder");
         let mut all_shreds = vec![];
+        let mut slot_entries = vec![];
         // Find all the entries for start_slot
-        for entry in entries {
+        for entry in entries.into_iter() {
             if remaining_ticks_in_slot == 0 {
                 current_slot += 1;
                 parent_slot = current_slot - 1;
                 remaining_ticks_in_slot = ticks_per_slot;
-                shredder.finalize_slot();
-                all_shreds.append(&mut shredder.shreds);
-                shredder =
-                    Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
-                        .expect("Failed to create entry shredder");
+                let mut current_entries = vec![];
+                std::mem::swap(&mut slot_entries, &mut current_entries);
+                let start_index = {
+                    if all_shreds.is_empty() {
+                        start_index
+                    } else {
+                        0
+                    }
+                };
+                let (mut data_shreds, mut coding_shreds, _) =
+                    shredder.entries_to_shreds(&current_entries, true, start_index);
+                all_shreds.append(&mut data_shreds);
+                all_shreds.append(&mut coding_shreds);
+                shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
+                    .expect("Failed to create entry shredder");
             }

-            if entry.borrow().is_tick() {
+            if entry.is_tick() {
                 remaining_ticks_in_slot -= 1;
             }
-
-            bincode::serialize_into(&mut shredder, &vec![entry.borrow().clone()])
-                .expect("Expect to write all entries to shreds");
-            if remaining_ticks_in_slot == 0 {
-                shredder.finalize_slot();
-            } else {
-                shredder.finalize_data();
-            }
+            slot_entries.push(entry);
         }

-        if is_full_slot && remaining_ticks_in_slot != 0 {
-            shredder.finalize_slot();
+        if !slot_entries.is_empty() {
+            let (mut data_shreds, mut coding_shreds, _) =
+                shredder.entries_to_shreds(&slot_entries, is_full_slot, 0);
+            all_shreds.append(&mut data_shreds);
+            all_shreds.append(&mut coding_shreds);
         }
-        all_shreds.append(&mut shredder.shreds);

         let num_shreds = all_shreds.len();
         self.insert_shreds(all_shreds, None)?;
@@ -919,6 +919,7 @@ impl Blocktree {
                 break;
             }
             let (current_slot, index) = db_iterator.key().expect("Expect a valid key");
+
             let current_index = {
                 if current_slot > slot {
                     end_index
@@ -926,6 +927,7 @@ impl Blocktree {
                     index
                 }
             };
+
             let upper_index = cmp::min(current_index, end_index);

             for i in prev_index..upper_index {
@@ -982,9 +984,9 @@ impl Blocktree {
     ) -> Result<(Vec<Entry>, usize)> {
         // Find the next consecutive block of shreds.
         let mut serialized_shreds: Vec<Vec<u8>> = vec![];
-        let data_cf = self.db.column::<cf::ShredData>();
+        let data_shred_cf = self.db.column::<cf::ShredData>();

-        while let Some(serialized_shred) = data_cf.get_bytes((slot, start_index))? {
+        while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, start_index))? {
             serialized_shreds.push(serialized_shred);
             start_index += 1;
         }
@@ -994,6 +996,7 @@
             serialized_shreds.len(),
             slot
         );
+
         let mut shreds: Vec<Shred> = serialized_shreds
             .into_iter()
             .filter_map(|serialized_shred| Shred::new_from_serialized_shred(serialized_shred).ok())
@@ -1036,7 +1039,6 @@
         }

         trace!("Found {:?} entries", all_entries.len());
-
         Ok((all_entries, num))
     }

@@ -1551,15 +1553,14 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Result<Hash> {
     // Fill slot 0 with ticks that link back to the genesis_block to bootstrap the ledger.
let blocktree = Blocktree::open(ledger_path)?; - let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.hash()); - let mut shredder = Shredder::new(0, 0, 0.0, &Arc::new(Keypair::new()), 0) - .expect("Failed to create entry shredder"); + let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.hash()); let last_hash = entries.last().unwrap().hash; - bincode::serialize_into(&mut shredder, &entries) - .expect("Expect to write all entries to shreds"); - shredder.finalize_slot(); - let shreds: Vec = shredder.shreds.drain(..).collect(); + + let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new())) + .expect("Failed to create entry shredder"); + let shreds = shredder.entries_to_shreds(&entries, true, 0).0; + assert!(shreds.last().unwrap().last_in_slot()); blocktree.insert_shreds(shreds, None)?; blocktree.set_roots(&[0])?; @@ -1641,24 +1642,18 @@ pub fn entries_to_test_shreds( parent_slot: u64, is_full_slot: bool, ) -> Vec { - let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32) + let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new())) .expect("Failed to create entry shredder"); - bincode::serialize_into(&mut shredder, &entries) - .expect("Expect to write all entries to shreds"); - if is_full_slot { - shredder.finalize_slot(); - } else { - shredder.finalize_data(); - } - - shredder.shreds.drain(..).collect() + shredder.entries_to_shreds(&entries, is_full_slot, 0).0 } #[cfg(test)] pub mod tests { use super::*; use crate::entry::{create_ticks, Entry}; + use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; + use crate::shred::max_ticks_per_shred; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -1667,6 +1662,54 @@ pub mod tests { use std::iter::FromIterator; use std::time::Duration; + #[test] + fn test_create_new_ledger() { + let mint_total = 1_000_000_000_000; + let GenesisBlockInfo { genesis_block, .. 
} = create_genesis_block(mint_total);
+        let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
+        let ledger = Blocktree::open(&ledger_path).unwrap();
+
+        let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
+        let entries = ledger.get_slot_entries(0, 0, None).unwrap();
+
+        assert_eq!(ticks, entries);
+
+        // Destroying database without closing it first is undefined behavior
+        drop(ledger);
+        Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    fn test_insert_get_bytes() {
+        // Create enough entries to ensure there are at least two shreds created
+        let num_entries = max_ticks_per_shred() + 1;
+        assert!(num_entries > 1);
+
+        let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
+
+        let ledger_path = get_tmp_ledger_path("test_insert_get_bytes");
+        let ledger = Blocktree::open(&ledger_path).unwrap();
+
+        // Insert last shred, test we can retrieve it
+        let last_shred = shreds.pop().unwrap();
+        assert!(last_shred.index() > 0);
+        ledger
+            .insert_shreds(vec![last_shred.clone()], None)
+            .unwrap();
+
+        let serialized_shred = ledger
+            .data_shred_cf
+            .get_bytes((0, last_shred.index() as u64))
+            .unwrap()
+            .unwrap();
+        let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
+
+        assert_eq!(last_shred, deserialized_shred);
+        // Destroying database without closing it first is undefined behavior
+        drop(ledger);
+        Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_write_entries() {
         solana_logger::setup();
@@ -1877,7 +1920,8 @@
     #[test]
     fn test_insert_data_shreds_basic() {
-        let num_entries = 5;
+        // Create enough entries to ensure there are at least two shreds created
+        let num_entries = max_ticks_per_shred() + 1;
         assert!(num_entries > 1);

         let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
@@ -1888,6 +1932,7 @@
         // Insert last shred, we're missing the other shreds, so no consecutive
         // shreds starting from slot 0, index 0 should exist.
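+        // With more ticks than fit in a single shred, the slot is guaranteed to
+        // span multiple shreds, so removing the last shred leaves a gap.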
+ assert!(shreds.len() > 1); let last_shred = shreds.pop().unwrap(); ledger.insert_shreds(vec![last_shred], None).unwrap(); assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty()); @@ -2098,21 +2143,28 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path("test_insert_data_shreds_consecutive"); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); + // Create enough entries to ensure there are at least two shreds created + let min_entries = max_ticks_per_shred() + 1; for i in 0..4 { let slot = i; let parent_slot = if i == 0 { 0 } else { i - 1 }; // Write entries - let num_entries = 21 as u64 * (i + 1); - let (mut shreds, original_entries) = - make_slot_entries(slot, parent_slot, num_entries); + let num_entries = min_entries * (i + 1); + let (shreds, original_entries) = make_slot_entries(slot, parent_slot, num_entries); let num_shreds = shreds.len() as u64; + assert!(num_shreds > 1); + let mut even_shreds = vec![]; let mut odd_shreds = vec![]; - for i in (0..num_shreds).rev() { - if i % 2 != 0 { - odd_shreds.insert(0, shreds.remove(i as usize)); + + for (i, shred) in shreds.into_iter().enumerate() { + if i % 2 == 0 { + even_shreds.push(shred); + } else { + odd_shreds.push(shred); } } + blocktree.insert_shreds(odd_shreds, None).unwrap(); assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); @@ -2121,7 +2173,7 @@ pub mod tests { if num_shreds % 2 == 0 { assert_eq!(meta.received, num_shreds); } else { - debug!("got here"); + trace!("got here"); assert_eq!(meta.received, num_shreds - 1); } assert_eq!(meta.consumed, 0); @@ -2131,7 +2183,7 @@ pub mod tests { assert_eq!(meta.last_index, std::u64::MAX); } - blocktree.insert_shreds(shreds, None).unwrap(); + blocktree.insert_shreds(even_shreds, None).unwrap(); assert_eq!( blocktree.get_slot_entries(slot, 0, None).unwrap(), @@ -2504,11 +2556,13 @@ pub mod tests { { let blocktree = Blocktree::open(&blocktree_path).unwrap(); let num_slots = 15; - let entries_per_slot = 5; + // Create enough entries to ensure there are at least two shreds created + let entries_per_slot = max_ticks_per_shred() + 1; assert!(entries_per_slot > 1); let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot); let shreds_per_slot = shreds.len() / num_slots as usize; + assert!(shreds_per_slot > 1); // Write the shreds such that every 3rd slot has a gap in the beginning let mut missing_shreds = vec![]; @@ -2852,13 +2906,15 @@ pub mod tests { // Write entries let gap: u64 = 10; assert!(gap > 3); - let num_entries = 10; + // Create enough entries to ensure there are at least two shreds created + let num_entries = max_ticks_per_shred() + 1; let entries = create_ticks(num_entries, Hash::default()); let mut shreds = entries_to_test_shreds(entries, slot, 0, true); let num_shreds = shreds.len(); - for (i, b) in shreds.iter_mut().enumerate() { - b.set_index(i as u32 * gap as u32); - b.set_slot(slot); + assert!(num_shreds > 1); + for (i, s) in shreds.iter_mut().enumerate() { + s.set_index(i as u32 * gap as u32); + s.set_slot(slot); } blocktree.insert_shreds(shreds, None).unwrap(); @@ -2892,7 +2948,8 @@ pub mod tests { vec![1], ); - // Test with end indexes that are greater than the last item in the ledger + // Test with a range that encompasses a shred with index == gap which was + // already inserted. 
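+        // Data shreds were written at indexes 0, gap, 2 * gap, ...: within this
+        // range only index `gap` is present, so 1..gap and gap + 1 are missing.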
let mut expected: Vec = (1..gap).collect(); expected.push(gap + 1); assert_eq!( @@ -2943,8 +3000,9 @@ pub mod tests { assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty); assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty); - let entries = create_ticks(20, Hash::default()); + let entries = create_ticks(100, Hash::default()); let mut shreds = entries_to_test_shreds(entries, slot, 0, true); + assert!(shreds.len() > 2); shreds.drain(2..); const ONE: u64 = 1; diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 4771ca7c04..69d05d26d5 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -456,7 +456,7 @@ pub mod tests { Some(parent_slot), true, &Arc::new(Keypair::new()), - &entries, + entries, ) .unwrap(); @@ -849,7 +849,7 @@ pub mod tests { // Fill up the rest of slot 1 with ticks entries.extend(create_ticks(genesis_block.ticks_per_slot, last_entry_hash)); - + let last_blockhash = entries.last().unwrap().hash; let blocktree = Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger"); blocktree @@ -861,7 +861,7 @@ pub mod tests { None, true, &Arc::new(Keypair::new()), - &entries, + entries, ) .unwrap(); let (bank_forks, bank_forks_info, _) = @@ -877,7 +877,7 @@ pub mod tests { mint - deducted_from_mint ); assert_eq!(bank.tick_height(), 2 * genesis_block.ticks_per_slot - 1); - assert_eq!(bank.last_blockhash(), entries.last().unwrap().hash); + assert_eq!(bank.last_blockhash(), last_blockhash); } #[test] diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 3cefb53112..0b4d865fe1 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -1,5 +1,6 @@ use super::*; use crate::entry::Entry; +use crate::shred::{Shredder, RECOMMENDED_FEC_RATE}; use solana_sdk::hash::Hash; pub(super) struct BroadcastFakeBlobsRun { @@ -30,22 +31,26 @@ impl BroadcastRun for BroadcastFakeBlobsRun { let last_tick = receive_results.last_tick; let keypair = &cluster_info.read().unwrap().keypair.clone(); - let latest_blob_index = blocktree + let next_shred_index = blocktree .meta(bank.slot()) .expect("Database error") .map(|meta| meta.consumed) - .unwrap_or(0); + .unwrap_or(0) as u32; let num_entries = receive_results.entries.len(); - let (shred_bufs, _) = broadcast_utils::entries_to_shreds( - receive_results.entries, + + let shredder = Shredder::new( bank.slot(), - receive_results.last_tick, - bank.max_tick_height(), - keypair, - latest_blob_index, bank.parent().unwrap().slot(), - None, + RECOMMENDED_FEC_RATE, + keypair.clone(), + ) + .expect("Expected to create a new shredder"); + + let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + &receive_results.entries, + last_tick == bank.max_tick_height(), + next_shred_index, ); // If the last blockhash is default, a new block is being created @@ -58,15 +63,10 @@ impl BroadcastRun for BroadcastFakeBlobsRun { .map(|_| Entry::new(&self.last_blockhash, 0, vec![])) .collect(); - let (fake_shred_bufs, _) = broadcast_utils::entries_to_shreds( - fake_entries, - receive_results.last_tick, - bank.slot(), - bank.max_tick_height(), - keypair, - latest_blob_index, - bank.parent().unwrap().slot(), - None, + let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds( + &fake_entries, + last_tick == bank.max_tick_height(), + next_shred_index, ); // If it's the last tick, reset the last block hash to 
default @@ -75,19 +75,27 @@ impl BroadcastRun for BroadcastFakeBlobsRun { self.last_blockhash = Hash::default(); } - blocktree.insert_shreds(shred_bufs.clone(), None)?; + blocktree.insert_shreds(data_shreds.clone(), None)?; + blocktree.insert_shreds(coding_shreds.clone(), None)?; + // 3) Start broadcast step let peers = cluster_info.read().unwrap().tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { if i <= self.partition { // Send fake blobs to the first N peers - fake_shred_bufs.iter().for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); - }); + fake_data_shreds + .iter() + .chain(fake_coding_shreds.iter()) + .for_each(|b| { + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + }); } else { - shred_bufs.iter().for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); - }); + data_shreds + .iter() + .chain(coding_shreds.iter()) + .for_each(|b| { + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + }); } }); diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 2c4433b0cb..0334f07efd 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,9 +1,7 @@ use crate::entry::Entry; use crate::poh_recorder::WorkingBankEntry; use crate::result::Result; -use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE}; use solana_runtime::bank::Bank; -use solana_sdk::signature::Keypair; use std::sync::mpsc::Receiver; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -71,79 +69,6 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result }) } -pub(super) fn entries_to_shreds( - entries: Vec, - last_tick: u64, - slot: u64, - bank_max_tick: u64, - keypair: &Arc, - latest_shred_index: u64, - parent_slot: u64, - last_unfinished_slot: Option, -) -> (Vec, Option) { - let mut shreds = if let Some(unfinished_slot) = last_unfinished_slot { - if unfinished_slot.slot != slot { - let mut shredder = Shredder::new( - unfinished_slot.slot, - unfinished_slot.parent, - RECOMMENDED_FEC_RATE, - keypair, - unfinished_slot.next_index as u32, - ) - .expect("Expected to create a new shredder"); - shredder.finalize_slot(); - shredder.shreds.drain(..).collect() - } else { - vec![] - } - } else { - vec![] - }; - - let mut shredder = Shredder::new( - slot, - parent_slot, - RECOMMENDED_FEC_RATE, - keypair, - latest_shred_index as u32, - ) - .expect("Expected to create a new shredder"); - - let now = Instant::now(); - bincode::serialize_into(&mut shredder, &entries) - .expect("Expect to write all entries to shreds"); - let elapsed = now.elapsed().as_millis(); - - let unfinished_slot = if last_tick == bank_max_tick { - shredder.finalize_slot(); - None - } else { - shredder.finalize_data(); - Some(UnfinishedSlotInfo { - next_index: u64::from(shredder.index), - slot, - parent: parent_slot, - }) - }; - - let num_shreds = shredder.shreds.len(); - shreds.append(&mut shredder.shreds); - - datapoint_info!( - "shredding-stats", - ("slot", slot as i64, i64), - ("num_shreds", num_shreds as i64, i64), - ("signing_coding", shredder.signing_coding_time as i64, i64), - ( - "copying_serializing", - (elapsed - shredder.signing_coding_time) as i64, - i64 - ), - ); - - (shreds, unfinished_slot) -} - #[cfg(test)] mod tests { use super::*; diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 6cf88319a7..ca5978eb38 100644 --- 
a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,4 +1,5 @@
 use super::*;
+use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
 use solana_sdk::hash::Hash;

 pub(super) struct FailEntryVerificationBroadcastRun {}
@@ -29,38 +30,52 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             last_entry.hash = Hash::default();
         }

-        let keypair = &cluster_info.read().unwrap().keypair.clone();
-        let latest_blob_index = blocktree
+        let keypair = cluster_info.read().unwrap().keypair.clone();
+        let next_shred_index = blocktree
             .meta(bank.slot())
             .expect("Database error")
             .map(|meta| meta.consumed)
-            .unwrap_or(0);
+            .unwrap_or(0) as u32;

-        let (shred_infos, _) = broadcast_utils::entries_to_shreds(
-            receive_results.entries,
-            last_tick,
+        let shredder = Shredder::new(
             bank.slot(),
-            bank.max_tick_height(),
-            keypair,
-            latest_blob_index,
             bank.parent().unwrap().slot(),
-            None,
+            RECOMMENDED_FEC_RATE,
+            keypair.clone(),
+        )
+        .expect("Expected to create a new shredder");
+
+        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
+            &receive_results.entries,
+            last_tick == bank.max_tick_height(),
+            next_shred_index,
         );

-        let seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
-
-        blocktree.insert_shreds(shred_infos.clone(), None)?;
+        let all_shreds = data_shreds
+            .iter()
+            .cloned()
+            .chain(coding_shreds.iter().cloned())
+            .collect::<Vec<_>>();
+        let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
+        blocktree
+            .insert_shreds(all_shreds, None)
+            .expect("Failed to insert shreds in blocktree");

         // 3) Start broadcast step
         let bank_epoch = bank.get_stakers_epoch(bank.slot());
         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);

-        let shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
-        // Broadcast data + erasures
+        let all_shred_bufs: Vec<Vec<u8>> = data_shreds
+            .into_iter()
+            .chain(coding_shreds.into_iter())
+            .map(|s| s.payload)
+            .collect();
+
+        // Broadcast data + coding shreds
         cluster_info.read().unwrap().broadcast_shreds(
             sock,
-            &shred_bufs,
-            &seeds,
+            &all_shred_bufs,
+            &all_seeds,
             stakes.as_ref(),
         )?;
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 03e9c3c441..4724fbe3d1 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -1,6 +1,6 @@
 use super::broadcast_utils;
 use super::*;
-use crate::broadcast_stage::broadcast_utils::{entries_to_shreds, UnfinishedSlotInfo};
+use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
 use solana_sdk::timing::duration_as_ms;

 #[derive(Default)]
@@ -12,7 +12,6 @@ struct BroadcastStats {

 pub(super) struct StandardBroadcastRun {
     stats: BroadcastStats,
-    unfinished_slot: Option<UnfinishedSlotInfo>,
     current_slot: Option<u64>,
     shredding_elapsed: u128,
     insertion_elapsed: u128,
@@ -24,7 +23,6 @@ impl StandardBroadcastRun {
     pub(super) fn new() -> Self {
         Self {
             stats: BroadcastStats::default(),
-            unfinished_slot: None,
             current_slot: None,
             shredding_elapsed: 0,
             insertion_elapsed: 0,
@@ -42,7 +40,7 @@
         run_elapsed: u64,
         num_entries: usize,
         num_shreds: usize,
-        blob_index: u64,
+        shred_index: u32,
     ) {
         inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);

@@ -67,7 +65,7 @@
             ("shredding_time", shredding_elapsed as i64, i64),
             ("insert_shred_time", insert_shreds_elapsed as i64, i64),
             ("broadcast_time", broadcast_elapsed as i64,
i64), - ("transmit-index", blob_index as i64, i64), + ("transmit-index", i64::from(shred_index), i64), ); } } @@ -95,11 +93,11 @@ impl BroadcastRun for StandardBroadcastRun { // 2) Convert entries to blobs + generate coding blobs let keypair = &cluster_info.read().unwrap().keypair.clone(); - let latest_shred_index = blocktree + let next_shred_index = blocktree .meta(bank.slot()) .expect("Database error") .map(|meta| meta.consumed) - .unwrap_or(0); + .unwrap_or(0) as u32; let parent_slot = if let Some(parent_bank) = bank.parent() { parent_bank.slot() @@ -107,25 +105,35 @@ impl BroadcastRun for StandardBroadcastRun { 0 }; + // Create shreds from entries let to_shreds_start = Instant::now(); - let (shred_infos, uninished_slot) = entries_to_shreds( - receive_results.entries, - last_tick, + let shredder = Shredder::new( bank.slot(), - bank.max_tick_height(), - keypair, - latest_shred_index, parent_slot, - self.unfinished_slot, + RECOMMENDED_FEC_RATE, + keypair.clone(), + ) + .expect("Expected to create a new shredder"); + + let (data_shreds, coding_shreds, latest_shred_index) = shredder.entries_to_shreds( + &receive_results.entries, + last_tick == bank.max_tick_height(), + next_shred_index, ); let to_shreds_elapsed = to_shreds_start.elapsed(); - self.unfinished_slot = uninished_slot; - let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect(); - let num_shreds = shred_infos.len(); + let all_shreds = data_shreds + .iter() + .cloned() + .chain(coding_shreds.iter().cloned()) + .collect::>(); + let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); + let num_shreds = all_shreds.len(); + + // Insert shreds into blocktree let insert_shreds_start = Instant::now(); blocktree - .insert_shreds(shred_infos.clone(), None) + .insert_shreds(all_shreds, None) .expect("Failed to insert shreds in blocktree"); let insert_shreds_elapsed = insert_shreds_start.elapsed(); @@ -134,7 +142,11 @@ impl BroadcastRun for StandardBroadcastRun { let bank_epoch = bank.get_stakers_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let all_shred_bufs: Vec> = shred_infos.into_iter().map(|s| s.payload).collect(); + let all_shred_bufs: Vec> = data_shreds + .into_iter() + .chain(coding_shreds.into_iter()) + .map(|s| s.payload) + .collect(); trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); cluster_info.read().unwrap().broadcast_shreds( @@ -145,13 +157,6 @@ impl BroadcastRun for StandardBroadcastRun { )?; let broadcast_elapsed = broadcast_start.elapsed(); - let latest_shred_index = uninished_slot.map(|s| s.next_index).unwrap_or_else(|| { - blocktree - .meta(bank.slot()) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0) - }); self.insertion_elapsed += insert_shreds_elapsed.as_millis(); self.shredding_elapsed += to_shreds_elapsed.as_millis(); @@ -164,7 +169,7 @@ impl BroadcastRun for StandardBroadcastRun { ("shredding_time", self.shredding_elapsed as i64, i64), ("insertion_time", self.insertion_elapsed as i64, i64), ("broadcast_time", self.broadcast_elapsed as i64, i64), - ("num_shreds", latest_shred_index as i64, i64), + ("num_shreds", i64::from(latest_shred_index), i64), ( "slot_broadcast_time", self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64, @@ -186,7 +191,7 @@ impl BroadcastRun for StandardBroadcastRun { ), num_entries, num_shreds, - latest_shred_index, + next_shred_index, ); Ok(()) diff --git a/core/src/chacha.rs b/core/src/chacha.rs index c668d64104..c3147a7485 100644 --- a/core/src/chacha.rs +++ 
b/core/src/chacha.rs @@ -136,7 +136,7 @@ mod tests { None, true, &Arc::new(keypair), - &entries, + entries, ) .unwrap(); @@ -153,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "CLGvEayebjdgnLdttFAweZE9rqVkehXqEStUifG9kiU9" + let golden: Hash = "CGL4L6Q2QwiZQDCMwzshqj3S9riroUQuDjx8bS7ra2PU" .parse() .unwrap(); diff --git a/core/src/chacha_cuda.rs b/core/src/chacha_cuda.rs index aeac34c4de..50c7a348ba 100644 --- a/core/src/chacha_cuda.rs +++ b/core/src/chacha_cuda.rs @@ -146,7 +146,7 @@ mod tests { Some(0), true, &Arc::new(Keypair::new()), - &entries, + entries, ) .unwrap(); @@ -193,10 +193,10 @@ mod tests { return; } - let entries = create_ticks(32, Hash::default()); let ledger_dir = "test_encrypt_file_many_keys_multiple"; let ledger_path = get_tmp_ledger_path(ledger_dir); - let ticks_per_slot = 16; + let ticks_per_slot = 90; + let entries = create_ticks(2 * ticks_per_slot, Hash::default()); let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); blocktree .write_entries( @@ -207,7 +207,7 @@ mod tests { Some(0), true, &Arc::new(Keypair::new()), - &entries, + entries, ) .unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index dacad60084..875546e0d1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1775,9 +1775,9 @@ mod tests { use crate::crds_value::CrdsValueLabel; use crate::repair_service::RepairType; use crate::result::Error; + use crate::shred::max_ticks_per_shred; use crate::shred::{DataShredHeader, Shred}; use crate::test_tx::test_tx; - use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::HashSet; @@ -1980,7 +1980,7 @@ mod tests { let _ = fill_blocktree_slot_with_ticks( &blocktree, - DEFAULT_TICKS_PER_SLOT, + max_ticks_per_shred() + 1, 2, 1, Hash::default(), diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index cecfde4440..3e4b551a97 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -404,6 +404,7 @@ mod test { }; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::cluster_info::Node; + use crate::shred::max_ticks_per_shred; use itertools::Itertools; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; @@ -535,7 +536,7 @@ mod test { let blocktree = Blocktree::open(&blocktree_path).unwrap(); let slots: Vec = vec![1, 3, 5, 7, 8]; - let num_entries_per_slot = 10; + let num_entries_per_slot = max_ticks_per_shred() + 1; let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); for (mut slot_shreds, _) in shreds.into_iter() { diff --git a/core/src/shred.rs b/core/src/shred.rs index 9f1d4e8e0b..8a9c63b604 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -1,26 +1,30 @@ //! The `shred` module defines data structures and methods to pull MTU sized data frames from the network. 
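+//!
+//! Shredding is now a single batched pass over a slice of entries instead of an
+//! incremental `Write` sink. A sketch of the intended call pattern, using only the
+//! API introduced below:
+//!
+//! ```ignore
+//! let shredder = Shredder::new(slot, parent_slot, RECOMMENDED_FEC_RATE, keypair)?;
+//! let (data_shreds, coding_shreds, next_index) =
+//!     shredder.entries_to_shreds(&entries, is_last_in_slot, next_shred_index);
+//! ```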
+use crate::entry::create_ticks; +use crate::entry::Entry; use crate::erasure::Session; use crate::result; use crate::result::Error; use bincode::serialized_size; use core::cell::RefCell; use lazy_static::lazy_static; -use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; +use rayon::slice::ParallelSlice; use rayon::ThreadPool; use serde::{Deserialize, Serialize}; use solana_rayon_threadlimit::get_thread_count; +use solana_sdk::hash::Hash; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use std::io; -use std::io::{Error as IOError, ErrorKind, Write}; +use std::io::{Error as IOError, ErrorKind}; use std::sync::Arc; use std::time::Instant; lazy_static! { - static ref SIZE_OF_CODING_SHRED_HEADER: usize = + pub static ref SIZE_OF_CODING_SHRED_HEADER: usize = { serialized_size(&CodingShredHeader::default()).unwrap() as usize }; - static ref SIZE_OF_DATA_SHRED_HEADER: usize = + pub static ref SIZE_OF_DATA_SHRED_HEADER: usize = { serialized_size(&DataShredHeader::default()).unwrap() as usize }; static ref SIZE_OF_SIGNATURE: usize = { bincode::serialized_size(&Signature::default()).unwrap() as usize }; @@ -38,7 +42,7 @@ pub const CODING_SHRED: u8 = 0b0101_1010; /// This limit comes from reed solomon library, but unfortunately they don't have /// a public constant defined for it. -const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16; +pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16; /// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds pub const RECOMMENDED_FEC_RATE: f32 = 0.25; @@ -113,6 +117,30 @@ impl Shred { } } + pub fn new_from_data( + slot: u64, + index: u32, + parent_offset: u16, + data: Option<&[u8]>, + flags: u8, + ) -> Self { + let mut shred_buf = vec![0; PACKET_DATA_SIZE]; + let mut header = DataShredHeader::default(); + header.data_header.slot = slot; + header.data_header.index = index; + header.parent_offset = parent_offset; + header.flags = flags; + + if let Some(data) = data { + bincode::serialize_into(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER], &header) + .expect("Failed to write header into shred buffer"); + shred_buf[*SIZE_OF_DATA_SHRED_HEADER..*SIZE_OF_DATA_SHRED_HEADER + data.len()] + .clone_from_slice(data); + } + + Self::new(header, shred_buf) + } + pub fn new_from_serialized_shred(shred_buf: Vec) -> result::Result { let shred_type: u8 = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?; let header = if shred_type == CODING_SHRED { @@ -124,6 +152,7 @@ impl Shred { let end = *SIZE_OF_DATA_SHRED_HEADER; bincode::deserialize(&shred_buf[..end])? 
        };
+
        Ok(Self::new(header, shred_buf))
    }

@@ -141,7 +170,7 @@
        Shred { headers, payload }
    }

-    fn header(&self) -> &ShredCommonHeader {
+    pub fn header(&self) -> &ShredCommonHeader {
        if self.is_data() {
            &self.headers.data_header
        } else {
@@ -252,53 +281,18 @@
 #[derive(Debug)]
 pub struct Shredder {
     slot: u64,
-    pub index: u32,
-    fec_set_index: u32,
-    parent_offset: u16,
+    parent_slot: u64,
     fec_rate: f32,
-    signer: Arc<Keypair>,
-    pub shreds: Vec<Shred>,
-    fec_set_shred_start: usize,
-    active_shred: Vec<u8>,
-    active_shred_header: DataShredHeader,
-    active_offset: usize,
+    keypair: Arc<Keypair>,
     pub signing_coding_time: u128,
 }

-impl Write for Shredder {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let offset = self.active_offset + *SIZE_OF_DATA_SHRED_HEADER;
-        let slice_len = std::cmp::min(buf.len(), PACKET_DATA_SIZE - offset);
-        self.active_shred[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
-        let capacity = PACKET_DATA_SIZE - offset - slice_len;
-
-        if buf.len() > slice_len || capacity == 0 {
-            self.finalize_data_shred();
-        } else {
-            self.active_offset += slice_len;
-        }
-
-        if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK {
-            let now = Instant::now();
-            self.sign_unsigned_shreds_and_generate_codes();
-            self.signing_coding_time += now.elapsed().as_millis();
-        }
-
-        Ok(slice_len)
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        unimplemented!()
-    }
-}
-
 impl Shredder {
     pub fn new(
         slot: u64,
-        parent: u64,
+        parent_slot: u64,
         fec_rate: f32,
-        signer: &Arc<Keypair>,
-        index: u32,
+        keypair: Arc<Keypair>,
     ) -> result::Result<Self> {
         if fec_rate > 1.0 || fec_rate < 0.0 {
             Err(Error::IO(IOError::new(
@@ -308,37 +302,116 @@ impl Shredder {
                     fec_rate
                 ),
             )))
-        } else if slot < parent || slot - parent > u64::from(std::u16::MAX) {
+        } else if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) {
             Err(Error::IO(IOError::new(
                 ErrorKind::Other,
                 format!(
                     "Current slot {:?} must be > Parent slot {:?}, but the difference must not be > {:?}",
-                    slot, parent, std::u16::MAX
+                    slot, parent_slot, std::u16::MAX
                 ),
             )))
         } else {
-            let mut header = DataShredHeader::default();
-            header.data_header.slot = slot;
-            header.data_header.index = index;
-            header.parent_offset = (slot - parent) as u16;
-            let active_shred = vec![0; PACKET_DATA_SIZE];
             Ok(Shredder {
                 slot,
-                index,
-                fec_set_index: index,
-                parent_offset: (slot - parent) as u16,
+                parent_slot,
                 fec_rate,
-                signer: signer.clone(),
-                shreds: vec![],
-                fec_set_shred_start: 0,
-                active_shred,
-                active_shred_header: header,
-                active_offset: 0,
+                keypair,
                 signing_coding_time: 0,
             })
         }
     }

+    pub fn entries_to_shreds(
+        &self,
+        entries: &[Entry],
+        is_last_in_slot: bool,
+        next_shred_index: u32,
+    ) -> (Vec<Shred>, Vec<Shred>, u32) {
+        let now = Instant::now();
+        let serialized_shreds =
+            bincode::serialize(entries).expect("Expect to serialize all entries");
+
+        let no_header_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
+        let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
+        let last_shred_index = next_shred_index + num_shreds as u32 - 1;
+
+        // 1) Generate data shreds
+        let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
+            thread_pool.borrow().install(|| {
+                serialized_shreds
+                    .par_chunks(no_header_size)
+                    .enumerate()
+                    .map(|(i, shred_data)| {
+                        let shred_index = next_shred_index + i as u32;
+
+                        let mut header: u8 = 0;
+                        if shred_index == last_shred_index {
+                            header |= DATA_COMPLETE_SHRED;
+                            if is_last_in_slot {
+                                header |= LAST_SHRED_IN_SLOT;
+                            }
+                        }
+
+                        let mut shred = Shred::new_from_data(
+                            self.slot,
+                            shred_index,
+                            (self.slot - self.parent_slot) as u16,
+                            Some(shred_data),
+                            header,
+                        );
+
+                        Shredder::sign_shred(
+                            &self.keypair,
+                            &mut shred,
+                            *SIZE_OF_CODING_SHRED_HEADER,
+                        );
+                        shred
+                    })
+                    .collect()
+            })
+        });
+
+        // 2) Generate coding shreds
+        let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
+            thread_pool.borrow().install(|| {
+                data_shreds
+                    .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
+                    .flat_map(|shred_data_batch| {
+                        Shredder::generate_coding_shreds(self.slot, self.fec_rate, shred_data_batch)
+                    })
+                    .collect()
+            })
+        });
+
+        // 3) Sign coding shreds
+        PAR_THREAD_POOL.with(|thread_pool| {
+            thread_pool.borrow().install(|| {
+                coding_shreds.par_iter_mut().for_each(|mut coding_shred| {
+                    Shredder::sign_shred(&self.keypair, &mut coding_shred, *SIZE_OF_SHRED_TYPE);
+                })
+            })
+        });
+
+        // TODO: pre-allocate this
+        let elapsed = now.elapsed().as_millis();
+
+        datapoint_info!(
+            "shredding-stats",
+            ("slot", self.slot as i64, i64),
+            ("num_data_shreds", data_shreds.len() as i64, i64),
+            ("num_coding_shreds", coding_shreds.len() as i64, i64),
+            // TODO: update signing_coding_time
+            ("signing_coding", self.signing_coding_time as i64, i64),
+            (
+                "copying_serializing",
+                (elapsed - self.signing_coding_time) as i64,
+                i64
+            ),
+        );
+
+        (data_shreds, coding_shreds, last_shred_index + 1)
+    }
+
     pub fn sign_shred(signer: &Arc<Keypair>, shred_info: &mut Shred, signature_offset: usize) {
         let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
         let signature = signer.sign_message(&shred_info.payload[data_offset..]);
@@ -349,58 +422,6 @@
         shred_info.header_mut().signature = signature;
     }

-    fn sign_unsigned_shreds_and_generate_codes(&mut self) {
-        let signature_offset = *SIZE_OF_CODING_SHRED_HEADER;
-        let signer = self.signer.clone();
-        PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                self.shreds[self.fec_set_shred_start..]
-                    .par_iter_mut()
-                    .for_each(|d| Self::sign_shred(&signer, d, signature_offset));
-            })
-        });
-        let unsigned_coding_shred_start = self.shreds.len();
-
-        if self.fec_rate > 0.0 {
-            self.generate_coding_shreds();
-            let signature_offset = *SIZE_OF_SHRED_TYPE;
-            PAR_THREAD_POOL.with(|thread_pool| {
-                thread_pool.borrow().install(|| {
-                    self.shreds[unsigned_coding_shred_start..]
-                        .par_iter_mut()
-                        .for_each(|d| Self::sign_shred(&signer, d, signature_offset));
-                })
-            });
-        } else {
-            self.fec_set_index = self.index;
-        }
-        self.fec_set_shred_start = self.shreds.len();
-    }
-
-    /// Finalize a data shred.
Update the shred index for the next shred - fn finalize_data_shred(&mut self) { - self.active_offset = 0; - self.index += 1; - - // Swap header - let mut header = DataShredHeader::default(); - header.data_header.slot = self.slot; - header.data_header.index = self.index; - header.parent_offset = self.parent_offset; - std::mem::swap(&mut header, &mut self.active_shred_header); - - // Swap shred buffer - let mut shred_buf = vec![0; PACKET_DATA_SIZE]; - std::mem::swap(&mut shred_buf, &mut self.active_shred); - - let mut wr = io::Cursor::new(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER]); - bincode::serialize_into(&mut wr, &header) - .expect("Failed to write header into shred buffer"); - - let shred = Shred::new(header, shred_buf); - self.shreds.push(shred); - } - pub fn new_coding_shred_header( slot: u64, index: u32, @@ -419,18 +440,23 @@ impl Shredder { } /// Generates coding shreds for the data shreds in the current FEC set - fn generate_coding_shreds(&mut self) { - if self.fec_rate != 0.0 { - let num_data = (self.index - self.fec_set_index) as usize; + pub fn generate_coding_shreds( + slot: u64, + fec_rate: f32, + data_shred_batch: &[Shred], + ) -> Vec { + assert!(!data_shred_batch.is_empty()); + if fec_rate != 0.0 { + let num_data = data_shred_batch.len(); // always generate at least 1 coding shred even if the fec_rate doesn't allow it - let num_coding = 1.max((self.fec_rate * num_data as f32) as usize); + let num_coding = Self::calculate_num_coding_shreds(num_data as f32, fec_rate); let session = Session::new(num_data, num_coding).expect("Failed to create erasure session"); - let start_index = self.index - num_data as u32; + let start_index = data_shred_batch[0].header().index; // All information after coding shred field in a data shred is encoded let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; - let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..] 
+ let data_ptrs: Vec<_> = data_shred_batch .iter() .map(|data| &data.payload[coding_block_offset..]) .collect(); @@ -439,7 +465,7 @@ impl Shredder { let mut coding_shreds = Vec::with_capacity(num_coding); (0..num_coding).for_each(|i| { let header = Self::new_coding_shred_header( - self.slot, + slot, start_index + i as u32, num_data, num_coding, @@ -461,43 +487,27 @@ impl Shredder { .expect("Failed in erasure encode"); // append to the shred list - coding_shreds.into_iter().enumerate().for_each(|(i, code)| { - let header = Self::new_coding_shred_header( - self.slot, - start_index + i as u32, - num_data, - num_coding, - i, - ); - self.shreds.push(Shred::new(header, code)); - }); - self.fec_set_index = self.index; + coding_shreds + .into_iter() + .enumerate() + .map(|(i, code)| { + let header = Self::new_coding_shred_header( + slot, + start_index + i as u32, + num_data, + num_coding, + i, + ); + Shred::new(header, code) + }) + .collect() + } else { + vec![] } } - /// Create the final data shred for the current FEC set or slot - /// If there's an active data shred, morph it into the final shred - /// If the current active data shred is first in slot, finalize it and create a new shred - fn make_final_data_shred(&mut self, last_in_slot: u8) { - if self.active_shred_header.data_header.index == 0 { - self.finalize_data_shred(); - } - self.active_shred_header.flags |= DATA_COMPLETE_SHRED; - if last_in_slot == LAST_SHRED_IN_SLOT { - self.active_shred_header.flags |= LAST_SHRED_IN_SLOT; - } - self.finalize_data_shred(); - self.sign_unsigned_shreds_and_generate_codes(); - } - - /// Finalize the current FEC block, and generate coding shreds - pub fn finalize_data(&mut self) { - self.make_final_data_shred(0); - } - - /// Finalize the current slot (i.e. add last slot shred) and generate coding shreds - pub fn finalize_slot(&mut self) { - self.make_final_data_shred(LAST_SHRED_IN_SLOT); + fn calculate_num_coding_shreds(num_data_shreds: f32, fec_rate: f32) -> usize { + 1.max((fec_rate * num_data_shreds) as usize) } fn fill_in_missing_shreds( @@ -539,6 +549,7 @@ impl Shredder { ) -> Result, reed_solomon_erasure::Error> { let mut recovered_data = vec![]; let fec_set_size = num_data + num_coding; + if num_coding > 0 && shreds.len() < fec_set_size { let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; @@ -657,9 +668,26 @@ impl Shredder { } } +pub fn max_ticks_per_shred() -> u64 { + let ticks = create_ticks(1, Hash::default()); + max_entries_per_n_shred(&ticks[0], 1) +} + +pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 { + let shred_data_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64; + let vec_size = bincode::serialized_size(&vec![entry]).unwrap(); + let entry_size = bincode::serialized_size(entry).unwrap(); + let count_size = vec_size - entry_size; + + (shred_data_size * num_shreds - count_size) / entry_size +} + #[cfg(test)] -mod tests { +pub mod tests { use super::*; + use solana_sdk::system_transaction; + use std::collections::HashSet; + use std::convert::TryInto; fn verify_test_data_shred( shred: &Shred, @@ -668,6 +696,8 @@ mod tests { parent: u64, pk: &Pubkey, verify: bool, + is_last_in_slot: bool, + is_last_in_fec_set: bool, ) { assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); assert!(shred.is_data()); @@ -675,6 +705,16 @@ mod tests { assert_eq!(shred.slot(), slot); assert_eq!(shred.parent(), parent); assert_eq!(verify, shred.verify(pk)); + if is_last_in_slot { + assert!(shred.last_in_slot()); + } else { + assert!(!shred.last_in_slot()); + } + if 
is_last_in_fec_set { + assert!(shred.data_complete()); + } else { + assert!(!shred.data_complete()); + } } fn verify_test_code_shred(shred: &Shred, index: u32, slot: u64, pk: &Pubkey, verify: bool) { @@ -691,154 +731,111 @@ mod tests { let slot = 0x123456789abcdef0; // Test that parent cannot be > current slot - assert_matches!(Shredder::new(slot, slot + 1, 1.001, &keypair, 0), Err(_)); + assert_matches!( + Shredder::new(slot, slot + 1, 1.001, keypair.clone()), + Err(_) + ); // Test that slot - parent cannot be > u16 MAX assert_matches!( - Shredder::new(slot, slot - 1 - 0xffff, 1.001, &keypair, 0), + Shredder::new(slot, slot - 1 - 0xffff, 1.001, keypair.clone()), Err(_) ); - let mut shredder = - Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); + let fec_rate = 0.25; + let parent_slot = slot - 5; + let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone()) + .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 0); + let entries: Vec<_> = (0..5) + .map(|_| { + let keypair0 = Keypair::new(); + let keypair1 = Keypair::new(); + let tx0 = + system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); + Entry::new(&Hash::default(), 1, vec![tx0]) + }) + .collect(); - // Test0: Write some data to shred. Not enough to create a signed shred - let data: Vec = (0..25).collect(); - assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 25); + let size = serialized_size(&entries).unwrap(); + let no_header_size = (PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER) as u64; + let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size; + let num_expected_coding_shreds = + Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate); - // Test1: Write some more data to shred. 
Not enough to create a signed shred - assert_eq!(shredder.write(&data).unwrap(), data.len()); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 50); + let start_index = 0; + let (data_shreds, coding_shreds, next_index) = + shredder.entries_to_shreds(&entries, true, start_index); + assert_eq!(next_index as u64, num_expected_data_shreds); - // Test2: Write enough data to create a shred (> PACKET_DATA_SIZE) - let data: Vec<_> = (0..PACKET_DATA_SIZE).collect(); - let data: Vec = data.iter().map(|x| *x as u8).collect(); - let offset = shredder.write(&data).unwrap(); - assert_ne!(offset, data.len()); - // Assert that we have atleast one signed shred - assert!(!shredder.shreds.is_empty()); - // Assert that the new active shred was not populated - assert_eq!(shredder.active_offset, 0); + let mut data_shred_indexes = HashSet::new(); + let mut coding_shred_indexes = HashSet::new(); + for shred in data_shreds.iter() { + assert_eq!(shred.headers.common_header.shred_type, DATA_SHRED); + let index = shred.headers.data_header.index; + let is_last = index as u64 == num_expected_data_shreds - 1; + verify_test_data_shred( + shred, + index, + slot, + parent_slot, + &keypair.pubkey(), + true, + is_last, + is_last, + ); + assert!(!data_shred_indexes.contains(&index)); + data_shred_indexes.insert(index); + } - // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) - let shred = &shredder.shreds[0]; - // Test4: assert that it matches the original shred - // The shreds are not signed yet, as the data is not finalized - verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), false); + for shred in coding_shreds.iter() { + let index = shred.headers.data_header.index; + assert_eq!(shred.headers.common_header.shred_type, CODING_SHRED); + verify_test_code_shred(shred, index, slot, &keypair.pubkey(), true); + assert!(!coding_shred_indexes.contains(&index)); + coding_shred_indexes.insert(index); + } - let seed0 = shred.seed(); - // Test that same seed is generated for a given shred - assert_eq!(seed0, shred.seed()); + for i in start_index..start_index + num_expected_data_shreds as u32 { + assert!(data_shred_indexes.contains(&i)); + } - // Test5: Write left over data, and assert that a data shred is being created - shredder.write(&data[offset..]).unwrap(); + for i in start_index..start_index + num_expected_coding_shreds as u32 { + assert!(coding_shred_indexes.contains(&i)); + } - // Test6: Let's finalize the FEC block. That should result in the current shred to morph into - // a signed LastInFECBlock shred - shredder.finalize_data(); + assert_eq!(data_shred_indexes.len() as u64, num_expected_data_shreds); + assert_eq!(coding_shred_indexes.len(), num_expected_coding_shreds); - // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); - - // Must be Last in FEC Set - let shred = &shredder.shreds[1]; - verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true); - - // Test that same seed is NOT generated for two different shreds - assert_ne!(seed0, shred.seed()); - - // Test7: Let's write some more data to the shredder. 
- // Now we should get a new FEC block - let data: Vec<_> = (0..PACKET_DATA_SIZE).collect(); - let data: Vec = data.iter().map(|x| *x as u8).collect(); - let offset = shredder.write(&data).unwrap(); - assert_ne!(offset, data.len()); - - // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); - - let shred = &shredder.shreds[2]; - verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), false); - - // Test8: Write more data to generate an intermediate data shred - let offset = shredder.write(&data).unwrap(); - assert_ne!(offset, data.len()); - - // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); - - // Must be a Data shred - let shred = &shredder.shreds[3]; - verify_test_data_shred(&shred, 3, slot, slot - 5, &keypair.pubkey(), false); - - // Test9: Write some data to shredder - let data: Vec = (0..25).collect(); - assert_eq!(shredder.write(&data).unwrap(), data.len()); - - // And, finish the slot - shredder.finalize_slot(); - - // We should have a new signed shred - assert!(!shredder.shreds.is_empty()); - - // Must be LastInSlot - let shred = &shredder.shreds[4]; - verify_test_data_shred(&shred, 4, slot, slot - 5, &keypair.pubkey(), true); + // Test reassembly + let deshred_payload = Shredder::deshred(&data_shreds).unwrap(); + let deshred_entries: Vec = bincode::deserialize(&deshred_payload).unwrap(); + assert_eq!(entries, deshred_entries); } #[test] - fn test_small_data_shredder() { + fn test_deserialize_shred_payload() { let keypair = Arc::new(Keypair::new()); + let slot = 1; - let slot = 0x123456789abcdef0; - let mut shredder = - Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder"); - - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 0); - - let data: Vec<_> = (0..25).collect(); - let data: Vec = data.iter().map(|x| *x as u8).collect(); - let _ = shredder.write(&data).unwrap(); - - // We should have 0 shreds now - assert_eq!(shredder.shreds.len(), 0); - - shredder.finalize_data(); - - // We should have 1 shred now - assert_eq!(shredder.shreds.len(), 2); - - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true); - - let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 0.0, &keypair, 2) + let parent_slot = 0; + let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone()) .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 0); + let entries: Vec<_> = (0..5) + .map(|_| { + let keypair0 = Keypair::new(); + let keypair1 = Keypair::new(); + let tx0 = + system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); + Entry::new(&Hash::default(), 1, vec![tx0]) + }) + .collect(); - let data: Vec<_> = (0..25).collect(); - let data: Vec = data.iter().map(|x| *x as u8).collect(); - let _ = shredder.write(&data).unwrap(); + let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; - // We should have 0 shreds now - assert_eq!(shredder.shreds.len(), 0); - - shredder.finalize_data(); - - // We should have 1 shred now (LastInFECBlock) - assert_eq!(shredder.shreds.len(), 1); - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true); + let deserialized_shred = + 
Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); + assert_eq!(deserialized_shred, *data_shreds.last().unwrap()); } #[test] @@ -847,97 +844,92 @@ mod tests { let slot = 0x123456789abcdef0; // Test that FEC rate cannot be > 1.0 - assert_matches!(Shredder::new(slot, slot - 5, 1.001, &keypair, 0), Err(_)); + assert_matches!( + Shredder::new(slot, slot - 5, 1.001, keypair.clone()), + Err(_) + ); - let mut shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, &keypair, 0) + let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone()) .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 0); + // Create enough entries to make > 1 shred + let num_entries = max_ticks_per_shred() + 1; + let entries: Vec<_> = (0..num_entries) + .map(|_| { + let keypair0 = Keypair::new(); + let keypair1 = Keypair::new(); + let tx0 = + system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); + Entry::new(&Hash::default(), 1, vec![tx0]) + }) + .collect(); - // Write enough data to create a shred (> PACKET_DATA_SIZE) - let data: Vec<_> = (0..PACKET_DATA_SIZE).collect(); - let data: Vec = data.iter().map(|x| *x as u8).collect(); - let _ = shredder.write(&data).unwrap(); - let _ = shredder.write(&data).unwrap(); + let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); - // We should have 2 shreds now - assert_eq!(shredder.shreds.len(), 2); + // Must have created an equal number of coding and data shreds + assert_eq!(data_shreds.len(), coding_shreds.len()); - shredder.finalize_data(); + for (i, s) in data_shreds.iter().enumerate() { + verify_test_data_shred( + s, + s.index(), + slot, + slot - 5, + &keypair.pubkey(), + true, + i == data_shreds.len() - 1, + i == data_shreds.len() - 1, + ); + } - // Finalize must have created 1 final data shred and 3 coding shreds - // assert_eq!(shredder.shreds.len(), 6); - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 0, slot, slot - 5, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 1, slot, slot - 5, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_data_shred(&shred, 2, slot, slot - 5, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_code_shred(&shred, 0, slot, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_code_shred(&shred, 1, slot, &keypair.pubkey(), true); - - let shred = shredder.shreds.remove(0); - verify_test_code_shred(&shred, 2, slot, &keypair.pubkey(), true); - } - - #[test] - fn test_large_data_shredder() { - let keypair = Arc::new(Keypair::new()); - let mut shredder = - Shredder::new(1, 0, 0.0, &keypair, 0).expect("Failed in creating shredder"); - - let data = vec![0u8; 1000 * 1000]; - bincode::serialize_into(&mut shredder, &data).unwrap(); - assert!(shredder.shreds.len() > data.len() / PACKET_DATA_SIZE); + for s in coding_shreds { + verify_test_code_shred(&s, s.index(), slot, &keypair.pubkey(), true); + } } #[test] fn test_recovery_and_reassembly() { let keypair = Arc::new(Keypair::new()); let slot = 0x123456789abcdef0; - let mut shredder = - Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); + let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone()) + .expect("Failed in creating shredder"); - assert!(shredder.shreds.is_empty()); - assert_eq!(shredder.active_offset, 0); + 
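+        // Build one sample entry so `max_entries_per_n_shred` can size the test
+        // to produce exactly `num_data_shreds` data shreds.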
+        let keypair0 = Keypair::new();
+        let keypair1 = Keypair::new();
+        let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+        let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
 
-        let data: Vec<_> = (0..4000).collect();
-        let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
-        let mut offset = shredder.write(&data).unwrap();
-        let approx_shred_payload_size = offset;
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
+        let num_data_shreds: usize = 5;
+        let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64);
+        let entries: Vec<_> = (0..num_entries)
+            .map(|_| {
+                let keypair0 = Keypair::new();
+                let keypair1 = Keypair::new();
+                let tx0 =
+                    system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+                Entry::new(&Hash::default(), 1, vec![tx0])
+            })
+            .collect();
 
-        // We should have some shreds now
-        assert_eq!(
-            shredder.shreds.len(),
-            data.len() / approx_shred_payload_size
-        );
-        assert_eq!(offset, data.len());
+        let serialized_entries = bincode::serialize(&entries).unwrap();
+        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
 
-        shredder.finalize_data();
+        // We should have 10 shreds now: 5 data shreds and an equal number of coding shreds
+        assert_eq!(data_shreds.len(), num_data_shreds);
+        assert_eq!(coding_shreds.len(), num_data_shreds);
 
-        // We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
-        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
-        assert_eq!(shredder.shreds.len(), expected_shred_count);
-
-        let shred_infos = shredder.shreds.clone();
+        let all_shreds = data_shreds
+            .iter()
+            .cloned()
+            .chain(coding_shreds.iter().cloned())
+            .collect::<Vec<_>>();
 
         // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
         assert_matches!(
             Shredder::try_recovery(
-                shred_infos[..3].to_vec(),
-                expected_shred_count / 2,
-                expected_shred_count / 2,
+                data_shreds[..data_shreds.len() - 1].to_vec(),
+                num_data_shreds,
+                num_data_shreds,
                 0,
                 slot
             ),
@@ -946,21 +938,17 @@ mod tests {
 
         // Test1: Try recovery/reassembly with only data shreds. Hint: should work
         let recovered_data = Shredder::try_recovery(
-            shred_infos[..4].to_vec(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
+            data_shreds[..].to_vec(),
+            num_data_shreds,
+            num_data_shreds,
             0,
             slot,
         )
         .unwrap();
         assert!(recovered_data.is_empty());
-        let result = Shredder::deshred(&shred_infos[..4]).unwrap();
-        assert!(result.len() >= data.len());
-        assert_eq!(data[..], result[..data.len()]);
 
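+        // try_recovery's argument order, as used throughout this test:
+        //   Shredder::try_recovery(shreds, num_data, num_coding, first_index, slot)
+        // Both counts are num_data_shreds here because the FEC rate is 1.0.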
         // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
-        let mut shred_info: Vec<Shred> = shredder
-            .shreds
+        let mut shred_info: Vec<Shred> = all_shreds
             .iter()
             .enumerate()
             .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None })
@@ -968,8 +956,8 @@ mod tests {
 
         let mut recovered_data = Shredder::try_recovery(
             shred_info.clone(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
+            num_data_shreds,
+            num_data_shreds,
             0,
             slot,
         )
@@ -977,107 +965,80 @@ mod tests {
         assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing
         let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 1, slot, slot - 5, &keypair.pubkey(), true);
+        verify_test_data_shred(
+            &recovered_shred,
+            1,
+            slot,
+            slot - 5,
+            &keypair.pubkey(),
+            true,
+            false,
+            false,
+        );
         shred_info.insert(1, recovered_shred);
 
         let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 3, slot, slot - 5, &keypair.pubkey(), true);
+        verify_test_data_shred(
+            &recovered_shred,
+            3,
+            slot,
+            slot - 5,
+            &keypair.pubkey(),
+            true,
+            false,
+            false,
+        );
         shred_info.insert(3, recovered_shred);
 
-        let result = Shredder::deshred(&shred_info[..4]).unwrap();
-        assert!(result.len() >= data.len());
-        assert_eq!(data[..], result[..data.len()]);
+        let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap();
+        assert!(result.len() >= serialized_entries.len());
+        assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
 
         // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
-        let mut shred_info: Vec<Shred> = shredder
-            .shreds
+        let mut shred_info: Vec<Shred> = all_shreds
             .iter()
             .enumerate()
             .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
             .collect();
 
-        let mut recovered_data = Shredder::try_recovery(
+        let recovered_data = Shredder::try_recovery(
             shred_info.clone(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
+            num_data_shreds,
+            num_data_shreds,
             0,
             slot,
         )
         .unwrap();
 
-        assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true);
-        shred_info.insert(0, recovered_shred);
+        assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing
+        for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
+            let index = i * 2;
+            verify_test_data_shred(
+                &recovered_shred,
+                index.try_into().unwrap(),
+                slot,
+                slot - 5,
+                &keypair.pubkey(),
+                true,
+                recovered_shred.index() as usize == num_data_shreds - 1,
+                recovered_shred.index() as usize == num_data_shreds - 1,
+            );
 
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true);
-        shred_info.insert(2, recovered_shred);
+            shred_info.insert(i * 2, recovered_shred);
+        }
 
-        let result = Shredder::deshred(&shred_info[..4]).unwrap();
-        assert!(result.len() >= data.len());
-        assert_eq!(data[..], result[..data.len()]);
+        let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap();
+        assert!(result.len() >= serialized_entries.len());
+        assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
 
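+        // Recovered shreds come back in index order, so re-inserting each one at
+        // position i * 2 restores the original ordering before deshred().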
-        // Test4: Try recovery/reassembly full slot with 3 missing data shreds + 2 coding shreds. Hint: should work
-        let mut shredder =
-            Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
-
-        let mut offset = shredder.write(&data).unwrap();
-        let approx_shred_payload_size = offset;
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-
-        // We should have some shreds now
-        assert_eq!(
-            shredder.shreds.len(),
-            data.len() / approx_shred_payload_size
-        );
-        assert_eq!(offset, data.len());
-
-        shredder.finalize_slot();
-
-        // We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
-        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
-        assert_eq!(shredder.shreds.len(), expected_shred_count);
-
-        let mut shred_info: Vec<Shred> = shredder
-            .shreds
-            .iter()
-            .enumerate()
-            .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
-            .collect();
-
-        let mut recovered_data = Shredder::try_recovery(
-            shred_info.clone(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
-            0,
-            slot,
-        )
-        .unwrap();
-
-        assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true);
-        shred_info.insert(0, recovered_shred);
-
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true);
-        shred_info.insert(2, recovered_shred);
-
-        let result = Shredder::deshred(&shred_info[..4]).unwrap();
-        assert!(result.len() >= data.len());
-        assert_eq!(data[..], result[..data.len()]);
-
-        // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail
-        let shreds: Vec<Shred> = shredder
-            .shreds
+        // Test4: Try reassembly with 2 missing data shreds, but keeping the last
+        // data shred. Hint: should fail
+        let shreds: Vec<Shred> = all_shreds[..num_data_shreds]
             .iter()
             .enumerate()
             .filter_map(|(i, s)| {
-                if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) {
+                if (i < 4 && i % 2 != 0) || i == num_data_shreds - 1 {
+                    // Keep data shreds 1, 3 and 4
                     Some(s.clone())
                 } else {
                     None
@@ -1085,111 +1046,89 @@ mod tests {
             })
             .collect();
 
-        assert_eq!(shreds.len(), 4);
+        assert_eq!(shreds.len(), 3);
         assert_matches!(
             Shredder::deshred(&shreds),
             Err(reed_solomon_erasure::Error::TooFewDataShards)
         );
 
-        // Test6: Try recovery/reassembly with non zero index full slot with 3 missing data shreds + 2 coding shreds. Hint: should work
-        let mut shredder =
-            Shredder::new(slot, slot - 5, 1.0, &keypair, 25).expect("Failed in creating shredder");
+        // Test5: Try recovery/reassembly of a full slot with a non-zero start index,
+        // 3 missing data shreds, and 2 missing coding shreds. Hint: should work
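+        // The same shredder is reused with a start index of 25, so the new data
+        // shreds are numbered 25..29 rather than 0..4.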
+        let serialized_entries = bincode::serialize(&entries).unwrap();
+        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 25);
 
-        let mut offset = shredder.write(&data).unwrap();
-        let approx_shred_payload_size = offset;
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
-        offset += shredder.write(&data[offset..]).unwrap();
+        // We should have 10 shreds now: 5 data shreds and an equal number of coding shreds
+        assert_eq!(data_shreds.len(), num_data_shreds);
+        assert_eq!(coding_shreds.len(), num_data_shreds);
 
-        // We should have some shreds now
-        assert_eq!(
-            shredder.shreds.len(),
-            data.len() / approx_shred_payload_size
-        );
-        assert_eq!(offset, data.len());
+        let all_shreds = data_shreds
+            .iter()
+            .cloned()
+            .chain(coding_shreds.iter().cloned())
+            .collect::<Vec<_>>();
 
-        shredder.finalize_slot();
-
-        // We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
-        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
-        assert_eq!(shredder.shreds.len(), expected_shred_count);
-
-        let mut shred_info: Vec<Shred> = shredder
-            .shreds
+        let mut shred_info: Vec<Shred> = all_shreds
             .iter()
             .enumerate()
             .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
             .collect();
 
-        let mut recovered_data = Shredder::try_recovery(
+        let recovered_data = Shredder::try_recovery(
             shred_info.clone(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
+            num_data_shreds,
+            num_data_shreds,
             25,
             slot,
         )
         .unwrap();
 
-        assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(
-            &recovered_shred,
-            25,
-            slot,
-            slot - 5,
-            &keypair.pubkey(),
-            true,
-        );
-        shred_info.insert(0, recovered_shred);
+        assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing
+        for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
+            let index = 25 + (i * 2);
+            verify_test_data_shred(
+                &recovered_shred,
+                index.try_into().unwrap(),
+                slot,
+                slot - 5,
+                &keypair.pubkey(),
+                true,
+                index == 25 + num_data_shreds - 1,
+                index == 25 + num_data_shreds - 1,
+            );
 
-        let recovered_shred = recovered_data.remove(0);
-        verify_test_data_shred(
-            &recovered_shred,
-            27,
-            slot,
-            slot - 5,
-            &keypair.pubkey(),
-            true,
-        );
-        shred_info.insert(2, recovered_shred);
+            shred_info.insert(i * 2, recovered_shred);
+        }
 
-        let result = Shredder::deshred(&shred_info[..4]).unwrap();
-        assert!(result.len() >= data.len());
-        assert_eq!(data[..], result[..data.len()]);
+        let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap();
+        assert!(result.len() >= serialized_entries.len());
+        assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
 
-        // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
+        // Test6: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
         let recovered_data = Shredder::try_recovery(
             shred_info.clone(),
-            expected_shred_count / 2,
-            expected_shred_count / 2,
+            num_data_shreds,
+            num_data_shreds,
             25,
             slot + 1,
         )
         .unwrap();
         assert!(recovered_data.is_empty());
 
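+        // Contrast with Test7/Test8 below: a wrong slot yields an empty result,
+        // while a wrong start index errors with TooFewShardsPresent.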
-        // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
+        // Test7: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
         assert_matches!(
             Shredder::try_recovery(
                 shred_info.clone(),
-                expected_shred_count / 2,
-                expected_shred_count / 2,
+                num_data_shreds,
+                num_data_shreds,
                 15,
                 slot,
             ),
             Err(reed_solomon_erasure::Error::TooFewShardsPresent)
         );
 
-        // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
+        // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
         assert_matches!(
-            Shredder::try_recovery(
-                shred_info,
-                expected_shred_count / 2,
-                expected_shred_count / 2,
-                35,
-                slot,
-            ),
+            Shredder::try_recovery(shred_info, num_data_shreds, num_data_shreds, 35, slot,),
             Err(reed_solomon_erasure::Error::TooFewShardsPresent)
         );
     }
@@ -1198,43 +1137,87 @@ mod tests {
     fn test_multi_fec_block_coding() {
         let keypair = Arc::new(Keypair::new());
         let slot = 0x123456789abcdef0;
-        let mut shredder =
-            Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
+        let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone())
+            .expect("Failed in creating shredder");
 
-        assert!(shredder.shreds.is_empty());
-        assert_eq!(shredder.active_offset, 0);
+        let num_fec_sets = 100;
+        let num_data_shreds = (MAX_DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets) as usize;
+        let keypair0 = Keypair::new();
+        let keypair1 = Keypair::new();
+        let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+        let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
+        let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64);
 
-        let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect();
-        let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
-        let mut offset = shredder.write(&data).unwrap();
-        let approx_shred_payload_size = offset;
-        while offset < data.len() {
-            offset += shredder.write(&data[offset..]).unwrap();
+        let entries: Vec<_> = (0..num_entries)
+            .map(|_| {
+                let keypair0 = Keypair::new();
+                let keypair1 = Keypair::new();
+                let tx0 =
+                    system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+                Entry::new(&Hash::default(), 1, vec![tx0])
+            })
+            .collect();
+
+        let serialized_entries = bincode::serialize(&entries).unwrap();
+        let (data_shreds, coding_shreds, next_index) =
+            shredder.entries_to_shreds(&entries, true, 0);
+        assert_eq!(next_index as usize, num_data_shreds);
+        assert_eq!(data_shreds.len(), num_data_shreds);
+        assert_eq!(coding_shreds.len(), num_data_shreds);
+
+        for c in &coding_shreds {
+            assert!(!c.is_data());
         }
 
-        // We should have some shreds now
-        assert!(shredder.shreds.len() > data.len() / approx_shred_payload_size);
-        assert_eq!(offset, data.len());
+        let mut all_shreds = vec![];
+        for i in 0..num_fec_sets {
+            let shred_start_index = (MAX_DATA_SHREDS_PER_FEC_BLOCK * i) as usize;
+            let end_index = shred_start_index + MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1;
+            let fec_set_shreds = data_shreds[shred_start_index..=end_index]
+                .iter()
+                .cloned()
+                .chain(coding_shreds[shred_start_index..=end_index].iter().cloned())
+                .collect::<Vec<_>>();
 
-        shredder.finalize_data();
-        let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
-        assert_eq!(shredder.shreds.len(), expected_shred_count);
+            let mut shred_info: Vec<Shred> = fec_set_shreds
+                .iter()
+                .enumerate()
+                .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
+                .collect();
 
-        let mut index = 0;
-
-        while index < shredder.shreds.len() {
-            let num_data_shreds = std::cmp::min(
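+            // Each FEC set is recovered independently: every even-positioned shred
+            // in the set was dropped above, and try_recovery rebuilds them from the
+            // odd-positioned survivors.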
+            let recovered_data = Shredder::try_recovery(
+                shred_info.clone(),
                 MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
-                (shredder.shreds.len() - index) / 2,
+                MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
+                shred_start_index,
+                slot,
+            )
+            .unwrap();
+
+            for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
+                let index = shred_start_index + (i * 2);
+                verify_test_data_shred(
+                    &recovered_shred,
+                    index.try_into().unwrap(),
+                    slot,
+                    slot - 5,
+                    &keypair.pubkey(),
+                    true,
+                    index == end_index,
+                    index == end_index,
+                );
+
+                shred_info.insert(i * 2, recovered_shred);
+            }
+
+            all_shreds.extend(
+                shred_info
+                    .into_iter()
+                    .take(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize),
             );
-            let coding_start = index + num_data_shreds;
-            shredder.shreds[index..coding_start]
-                .iter()
-                .for_each(|s| assert!(s.is_data()));
-            index = coding_start + num_data_shreds;
-            shredder.shreds[coding_start..index]
-                .iter()
-                .for_each(|s| assert!(!s.is_data()));
         }
+
+        let result = Shredder::deshred(&all_shreds[..]).unwrap();
+        assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
     }
 }
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 7383d9ffa1..700dbd167b 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -311,17 +311,14 @@ mod test {
     };
 
     fn local_entries_to_shred(
-        entries: Vec<Entry>,
+        entries: &[Entry],
         slot: u64,
         parent: u64,
         keypair: &Arc<Keypair>,
     ) -> Vec<Shred> {
-        let mut shredder =
-            Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
-        bincode::serialize_into(&mut shredder, &entries)
-            .expect("Expect to write all entries to shreds");
-        shredder.finalize_slot();
-        shredder.shreds.drain(..).collect()
+        let shredder = Shredder::new(slot, parent, 0.0, keypair.clone())
+            .expect("Failed to create entry shredder");
+        shredder.entries_to_shreds(&entries, true, 0).0
     }
 
     #[test]
@@ -330,8 +327,7 @@ mod test {
         let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
         let num_entries = 10;
         let original_entries = create_ticks(num_entries, Hash::default());
-        let mut shreds =
-            local_entries_to_shred(original_entries.clone(), 0, 0, &Arc::new(Keypair::new()));
+        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
         shreds.reverse();
         blocktree
             .insert_shreds(shreds, None)
@@ -356,7 +352,7 @@ mod test {
         ));
         let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
 
-        let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair);
+        let mut shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);
 
         // with a Bank for slot 0, blob continues
         assert_eq!(
@@ -408,8 +404,7 @@ mod test {
 
         // with a shred where shred.slot() == root, blob gets thrown out
         let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
-        let shreds =
-            local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair);
+        let shreds = local_entries_to_shred(&[Entry::default()], slot, slot - 1, &leader_keypair);
         assert_eq!(
             should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
             false
@@ -418,7 +413,7 @@ mod test {
 
         // with a shred where shred.parent() < root, blob gets thrown out
         let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
         let shreds =
-            local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair);
+            local_entries_to_shred(&[Entry::default()], slot + 1, slot - 1, &leader_keypair);
         assert_eq!(
             should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
             false