diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index d632858a8c..53353b9b9d 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -40,16 +40,14 @@ fn make_shreds(num_shreds: usize) -> Vec { ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); let shredder = Shredder::new(1, 0, 0, 0).unwrap(); - let data_shreds = shredder - .entries_to_data_shreds( - &Keypair::new(), - &entries, - true, // is_last_in_slot - 0, // next_shred_index - 0, // fec_set_offset - &mut ProcessShredsStats::default(), - ) - .0; + let data_shreds = shredder.entries_to_data_shreds( + &Keypair::new(), + &entries, + true, // is_last_in_slot + 0, // next_shred_index + 0, // fec_set_offset + &mut ProcessShredsStats::default(), + ); assert!(data_shreds.len() >= num_shreds); data_shreds } diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 5964cdb606..02c42a7769 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -154,7 +154,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, _, _) = shredder.entries_to_shreds( + let (data_shreds, _) = shredder.entries_to_shreds( keypair, &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), @@ -163,10 +163,10 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.next_shred_index += data_shreds.len() as u32; let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| { - let (original_last_data_shred, _, _) = + let (original_last_data_shred, _) = shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index); - let (partition_last_data_shred, _, _) = + let (partition_last_data_shred, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index f430da9b0f..7dd8b3322d 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -52,7 +52,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( keypair, &receive_results.entries, last_tick_height == bank.max_tick_height(), @@ -69,7 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { .map(|_| Entry::new(&self.last_blockhash, 0, vec![])) .collect(); - let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds( + let (fake_data_shreds, fake_coding_shreds) = shredder.entries_to_shreds( keypair, &fake_entries, last_tick_height == bank.max_tick_height(), diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 5c74e1e56d..b8fbcdc95b 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -83,7 +83,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, _, _) = shredder.entries_to_shreds( + let (data_shreds, _) = shredder.entries_to_shreds( keypair, &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), @@ -92,10 +92,10 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index += data_shreds.len() as u32; let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { - let (good_last_data_shred, _, _) = + let (good_last_data_shred, _) = shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index); - let (bad_last_data_shred, _, _) = + let (bad_last_data_shred, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 877a234bd8..454f29c3de 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -119,17 +119,16 @@ impl StandardBroadcastRun { None => (0, 0), }, }; - let (data_shreds, next_shred_index) = - Shredder::new(slot, parent_slot, reference_tick, self.shred_version) - .unwrap() - .entries_to_data_shreds( - keypair, - entries, - is_slot_end, - next_shred_index, - fec_set_offset, - process_stats, - ); + let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version) + .unwrap() + .entries_to_data_shreds( + keypair, + entries, + is_slot_end, + next_shred_index, + fec_set_offset, + process_stats, + ); let mut data_shreds_buffer = match &mut self.unfinished_slot { Some(state) => { assert_eq!(state.slot, slot); @@ -138,6 +137,10 @@ impl StandardBroadcastRun { None => Vec::default(), }; data_shreds_buffer.extend(data_shreds.clone()); + let next_shred_index = match data_shreds.iter().map(Shred::index).max() { + Some(index) => index + 1, + None => next_shred_index, + }; self.unfinished_slot = Some(UnfinishedSlotInfo { next_shred_index, slot, diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 1c9c0ee995..d2da9dbe43 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -336,7 +336,7 @@ pub(crate) mod tests { }) .take(5) .collect(); - let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds( + let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds( keypair, &entries, true, // is_last_in_slot diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 198f131bc8..314abf817f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1696,7 +1696,7 @@ impl Blockstore { 0 } }; - let (mut data_shreds, mut coding_shreds, _) = + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(keypair, ¤t_entries, true, start_index); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); @@ -1716,7 +1716,7 @@ impl Blockstore { } if !slot_entries.is_empty() { - let (mut data_shreds, mut coding_shreds, _) = + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(keypair, &slot_entries, is_full_slot, 0); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); @@ -8104,7 +8104,7 @@ pub mod tests { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); - let (data_shreds, coding_shreds, _) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&leader_keypair, &entries, true, 0); let genesis_config = create_genesis_config(2).genesis_config; @@ -8160,9 +8160,8 @@ pub mod tests { let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); - let (shreds, _, _) = shredder.entries_to_shreds(&leader_keypair, &entries1, true, 0); - let (duplicate_shreds, _, _) = - shredder.entries_to_shreds(&leader_keypair, &entries2, true, 0); + let (shreds, _) = shredder.entries_to_shreds(&leader_keypair, &entries1, true, 0); + let (duplicate_shreds, _) = shredder.entries_to_shreds(&leader_keypair, &entries2, true, 0); let shred = shreds[0].clone(); let duplicate_shred = duplicate_shreds[0].clone(); let non_duplicate_shred = shred.clone(); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index f2a6fe2b75..20f66ba421 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -759,9 +759,12 @@ impl Shredder { entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, - ) -> (Vec, Vec, u32) { + ) -> ( + Vec, // data shreds + Vec, // coding shreds + ) { let mut stats = ProcessShredsStats::default(); - let (data_shreds, last_shred_index) = self.entries_to_data_shreds( + let data_shreds = self.entries_to_data_shreds( keypair, entries, is_last_in_slot, @@ -772,7 +775,7 @@ impl Shredder { let coding_shreds = Self::data_shreds_to_coding_shreds(keypair, &data_shreds, is_last_in_slot, &mut stats) .unwrap(); - (data_shreds, coding_shreds, last_shred_index) + (data_shreds, coding_shreds) } // Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds. @@ -794,7 +797,7 @@ impl Shredder { // Shred index offset at which FEC sets are generated. fec_set_offset: u32, process_stats: &mut ProcessShredsStats, - ) -> (Vec, u32) { + ) -> Vec { let mut serialize_time = Measure::start("shred_serialize"); let serialized_shreds = bincode::serialize(entries).expect("Expect to serialize all entries"); @@ -842,7 +845,7 @@ impl Shredder { process_stats.serialize_elapsed += serialize_time.as_us(); process_stats.gen_data_elapsed += gen_data_time.as_us(); - (data_shreds, last_shred_index + 1) + data_shreds } pub fn data_shreds_to_coding_shreds( @@ -1305,8 +1308,9 @@ pub mod tests { .saturating_sub(num_expected_data_shreds as usize) .max(num_expected_data_shreds as usize); let start_index = 0; - let (data_shreds, coding_shreds, next_index) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, start_index); + let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as u64, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); @@ -1458,8 +1462,7 @@ pub mod tests { }) .collect(); - let (data_shreds, coding_shreds, _) = - shredder.entries_to_shreds(&keypair, &entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0); for (i, s) in data_shreds.iter().enumerate() { verify_test_data_shred( @@ -1507,7 +1510,7 @@ pub mod tests { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, &entries, is_last_in_slot, @@ -1638,8 +1641,7 @@ pub mod tests { // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds // and 2 missing coding shreds. Hint: should work let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = - shredder.entries_to_shreds(&keypair, &entries, true, 25); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 25); // We should have 10 shreds now assert_eq!(data_shreds.len(), num_data_shreds); @@ -1723,7 +1725,7 @@ pub mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); - let (data_shreds, coding_shreds, _) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &[entry], is_last_in_slot, next_shred_index); let num_data_shreds = data_shreds.len(); let mut shreds = coding_shreds; @@ -1777,8 +1779,7 @@ pub mod tests { }) .collect(); - let (data_shreds, coding_shreds, _next_index) = - shredder.entries_to_shreds(&keypair, &entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0); assert!(!data_shreds .iter() .chain(coding_shreds.iter()) @@ -1826,7 +1827,7 @@ pub mod tests { .collect(); let start_index = 0x12; - let (data_shreds, coding_shreds, _next_index) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, start_index); let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; @@ -1863,7 +1864,7 @@ pub mod tests { let mut stats = ProcessShredsStats::default(); let start_index = 0x12; - let (data_shreds, _next_index) = shredder.entries_to_data_shreds( + let data_shreds = shredder.entries_to_data_shreds( &keypair, &entries, true, // is_last_in_slot diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index fe09937734..6fcdc7443c 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -48,8 +48,8 @@ fn test_multi_fec_block_coding() { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, next_index) = - shredder.entries_to_shreds(&keypair, &entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0); + let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as usize, num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!(coding_shreds.len(), num_data_shreds); @@ -218,7 +218,7 @@ fn setup_different_sized_fec_blocks( let total_num_data_shreds: usize = 2 * num_shreds_per_iter; for i in 0..2 { let is_last = i == 1; - let (data_shreds, coding_shreds, new_next_index) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, is_last, next_index); for shred in &data_shreds { if (shred.index() as usize) == total_num_data_shreds - 1 { @@ -232,7 +232,7 @@ fn setup_different_sized_fec_blocks( } } assert_eq!(data_shreds.len(), num_shreds_per_iter as usize); - next_index = new_next_index; + next_index = data_shreds.last().unwrap().index() + 1; sort_data_coding_into_fec_sets( data_shreds, coding_shreds,