infers erasure batches from FEC-set indices of data shreds (#26873)

data_shreds_to_coding_shreds splits data shreds into hardcoded batches
of MAX_DATA_SHREDS_PER_FEC_BLOCK shreds:
https://github.com/solana-labs/solana/blob/e74ad90cd/ledger/src/shredder.rs#L175-L183

This hardcoded logic is unnecessary, since the shreds belonging to the
same erasure batch can be identified as a run of consecutive data shreds
sharing the same FEC-set index.
This commit is contained in:
behzad nouri 2022-08-02 16:05:27 +00:00 committed by GitHub
parent ec36f0c5df
commit 1181510531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 25 deletions

View File

@ -2,6 +2,7 @@ use {
crate::shred::{ crate::shred::{
Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK,
}, },
itertools::Itertools,
lazy_static::lazy_static, lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool}, rayon::{prelude::*, ThreadPool},
reed_solomon_erasure::{ reed_solomon_erasure::{
@ -12,7 +13,7 @@ use {
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_rayon_threadlimit::get_thread_count, solana_rayon_threadlimit::get_thread_count,
solana_sdk::{clock::Slot, signature::Keypair}, solana_sdk::{clock::Slot, signature::Keypair},
std::fmt::Debug, std::{borrow::Borrow, fmt::Debug},
}; };
lazy_static! { lazy_static! {
@ -172,24 +173,32 @@ impl Shredder {
return Ok(Vec::default()); return Ok(Vec::default());
} }
let mut gen_coding_time = Measure::start("gen_coding_shreds"); let mut gen_coding_time = Measure::start("gen_coding_shreds");
// Step size when advancing next_code_index from one batch to the next. let chunks: Vec<Vec<&Shred>> = data_shreds
let step = get_erasure_batch_size( .iter()
MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, .group_by(|shred| shred.fec_set_index())
false, // is_last_in_slot .into_iter()
) - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; .map(|(_, shreds)| shreds.collect())
.collect();
let next_code_index: Vec<_> = std::iter::once(next_code_index)
.chain(
chunks
.iter()
.scan(next_code_index, |next_code_index, chunk| {
let num_data_shreds = chunk.len();
let erasure_batch_size =
get_erasure_batch_size(num_data_shreds, is_last_in_slot);
*next_code_index += (erasure_batch_size - num_data_shreds) as u32;
Some(*next_code_index)
}),
)
.collect();
// 1) Generate coding shreds // 1) Generate coding shreds
let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| { let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| {
data_shreds chunks
.par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) .into_par_iter()
.enumerate() .zip(next_code_index)
.flat_map(|(k, shred_data_batch)| { .flat_map(|(shreds, next_code_index)| {
let offset = u32::try_from(step.checked_mul(k).unwrap()); Shredder::generate_coding_shreds(&shreds, is_last_in_slot, next_code_index)
let next_code_index = next_code_index.checked_add(offset.unwrap());
Shredder::generate_coding_shreds(
shred_data_batch,
is_last_in_slot,
next_code_index.unwrap(),
)
}) })
.collect() .collect()
}); });
@ -210,13 +219,13 @@ impl Shredder {
} }
/// Generates coding shreds for the data shreds in the current FEC set /// Generates coding shreds for the data shreds in the current FEC set
pub fn generate_coding_shreds( pub fn generate_coding_shreds<T: Borrow<Shred>>(
data: &[Shred], data: &[T],
is_last_in_slot: bool, is_last_in_slot: bool,
next_code_index: u32, next_code_index: u32,
) -> Vec<Shred> { ) -> Vec<Shred> {
let (slot, index, version, fec_set_index) = { let (slot, index, version, fec_set_index) = {
let shred = data.first().unwrap(); let shred = data.first().unwrap().borrow();
( (
shred.slot(), shred.slot(),
shred.index(), shred.index(),
@ -225,15 +234,22 @@ impl Shredder {
) )
}; };
assert_eq!(fec_set_index, index); assert_eq!(fec_set_index, index);
assert!(data.iter().all(|shred| shred.slot() == slot assert!(data
&& shred.version() == version .iter()
&& shred.fec_set_index() == fec_set_index)); .map(Borrow::borrow)
.all(|shred| shred.slot() == slot
&& shred.version() == version
&& shred.fec_set_index() == fec_set_index));
let num_data = data.len(); let num_data = data.len();
let num_coding = get_erasure_batch_size(num_data, is_last_in_slot) let num_coding = get_erasure_batch_size(num_data, is_last_in_slot)
.checked_sub(num_data) .checked_sub(num_data)
.unwrap(); .unwrap();
let data = data.iter().map(Shred::erasure_shard_as_slice); let data: Vec<_> = data
let data: Vec<_> = data.collect::<Result<_, _>>().unwrap(); .iter()
.map(Borrow::borrow)
.map(Shred::erasure_shard_as_slice)
.collect::<Result<_, _>>()
.unwrap();
let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
ReedSolomon::new(num_data, num_coding) ReedSolomon::new(num_data, num_coding)
.unwrap() .unwrap()