diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index cde4a51b0..4d6bd1207 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -2,6 +2,7 @@ use { crate::shred::{ Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, }, + itertools::Itertools, lazy_static::lazy_static, rayon::{prelude::*, ThreadPool}, reed_solomon_erasure::{ @@ -12,7 +13,7 @@ use { solana_measure::measure::Measure, solana_rayon_threadlimit::get_thread_count, solana_sdk::{clock::Slot, signature::Keypair}, - std::fmt::Debug, + std::{borrow::Borrow, fmt::Debug}, }; lazy_static! { @@ -172,24 +173,32 @@ impl Shredder { return Ok(Vec::default()); } let mut gen_coding_time = Measure::start("gen_coding_shreds"); - // Step size when advancing next_code_index from one batch to the next. - let step = get_erasure_batch_size( - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, - false, // is_last_in_slot - ) - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; + let chunks: Vec> = data_shreds + .iter() + .group_by(|shred| shred.fec_set_index()) + .into_iter() + .map(|(_, shreds)| shreds.collect()) + .collect(); + let next_code_index: Vec<_> = std::iter::once(next_code_index) + .chain( + chunks + .iter() + .scan(next_code_index, |next_code_index, chunk| { + let num_data_shreds = chunk.len(); + let erasure_batch_size = + get_erasure_batch_size(num_data_shreds, is_last_in_slot); + *next_code_index += (erasure_batch_size - num_data_shreds) as u32; + Some(*next_code_index) + }), + ) + .collect(); // 1) Generate coding shreds let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| { - data_shreds - .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) - .enumerate() - .flat_map(|(k, shred_data_batch)| { - let offset = u32::try_from(step.checked_mul(k).unwrap()); - let next_code_index = next_code_index.checked_add(offset.unwrap()); - Shredder::generate_coding_shreds( - shred_data_batch, - is_last_in_slot, - next_code_index.unwrap(), - ) + chunks + .into_par_iter() + .zip(next_code_index) + .flat_map(|(shreds, next_code_index)| { + Shredder::generate_coding_shreds(&shreds, is_last_in_slot, next_code_index) }) .collect() }); @@ -210,13 +219,13 @@ impl Shredder { } /// Generates coding shreds for the data shreds in the current FEC set - pub fn generate_coding_shreds( - data: &[Shred], + pub fn generate_coding_shreds>( + data: &[T], is_last_in_slot: bool, next_code_index: u32, ) -> Vec { let (slot, index, version, fec_set_index) = { - let shred = data.first().unwrap(); + let shred = data.first().unwrap().borrow(); ( shred.slot(), shred.index(), @@ -225,15 +234,22 @@ impl Shredder { ) }; assert_eq!(fec_set_index, index); - assert!(data.iter().all(|shred| shred.slot() == slot - && shred.version() == version - && shred.fec_set_index() == fec_set_index)); + assert!(data + .iter() + .map(Borrow::borrow) + .all(|shred| shred.slot() == slot + && shred.version() == version + && shred.fec_set_index() == fec_set_index)); let num_data = data.len(); let num_coding = get_erasure_batch_size(num_data, is_last_in_slot) .checked_sub(num_data) .unwrap(); - let data = data.iter().map(Shred::erasure_shard_as_slice); - let data: Vec<_> = data.collect::>().unwrap(); + let data: Vec<_> = data + .iter() + .map(Borrow::borrow) + .map(Shred::erasure_shard_as_slice) + .collect::>() + .unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; ReedSolomon::new(num_data, num_coding) .unwrap()