From 11a53de0e310ee78788a2808a15da1b05f59b4c5 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 4 Nov 2021 16:15:51 +0000 Subject: [PATCH] rewrites Shredder::try_recovery (#21082) For every missing data-shred, Shredder::try_recovery calls into new_empty_data_shred which does a redundant serialization into payload buffer which is then immediately overwritten by the erasure recovery: https://github.com/solana-labs/solana/blob/696501500/ledger/src/shred.rs#L372-L417 Additionally, the implementation is unnecessarily complex hindering upcoming changes to erasure coding generation https://github.com/solana-labs/solana/blob/696501500/ledger/src/shred.rs#L814-L938 The commit simplifies the Shredder::try_recovery implementation. --- ledger/src/erasure.rs | 20 +++--- ledger/src/shred.rs | 162 ++++++++++++------------------------ 2 files changed, 56 insertions(+), 126 deletions(-) diff --git a/ledger/src/erasure.rs b/ledger/src/erasure.rs index 455fdbcf64..bc247706f6 100644 --- a/ledger/src/erasure.rs +++ b/ledger/src/erasure.rs @@ -41,9 +41,10 @@ //! //! 
-use reed_solomon_erasure::galois_8::Field; -use reed_solomon_erasure::ReedSolomon; -use serde::{Deserialize, Serialize}; +use { + reed_solomon_erasure::{galois_8::Field, ReconstructShard, ReedSolomon}, + serde::{Deserialize, Serialize}, +}; //TODO(sakridge) pick these values /// Number of data shreds @@ -113,14 +114,11 @@ impl Session { } /// Recover data + coding blocks into data blocks - /// # Arguments - /// * `data` - array of data blocks to recover into - /// * `coding` - array of coding blocks - /// * `erasures` - list of indices in data where blocks should be recovered - pub fn decode_blocks(&self, blocks: &mut [(&mut [u8], bool)]) -> Result<()> { - self.0.reconstruct_data(blocks)?; - - Ok(()) + pub fn decode_blocks(&self, blocks: &mut [T]) -> Result<()> + where + T: ReconstructShard, + { + self.0.reconstruct_data(blocks) } } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index eac87c73d0..9d28439a66 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -811,36 +811,6 @@ impl Shredder { .collect() } - fn fill_in_missing_shreds( - num_data: usize, - num_coding: usize, - first_index_in_fec_set: usize, - expected_index: usize, - index_found: usize, - present: &mut [bool], - ) -> Vec> { - let end_index = index_found.saturating_sub(1); - // The index of current shred must be within the range of shreds that are being - // recovered - if !(first_index_in_fec_set..first_index_in_fec_set + num_data + num_coding) - .contains(&end_index) - { - return vec![]; - } - - let missing_blocks: Vec> = (expected_index..index_found) - .map(|missing| { - present[missing.saturating_sub(first_index_in_fec_set)] = false; - if missing < first_index_in_fec_set + num_data { - Shred::new_empty_data_shred().payload - } else { - vec![0; SHRED_PAYLOAD_SIZE] - } - }) - .collect(); - missing_blocks - } - pub fn try_recovery( shreds: Vec, num_data: usize, @@ -848,92 +818,54 @@ impl Shredder { first_index: usize, slot: Slot, ) -> std::result::Result, 
reed_solomon_erasure::Error> { + use reed_solomon_erasure::Error::InvalidIndex; Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?; - let mut recovered_data = vec![]; let fec_set_size = num_data + num_coding; - - if num_coding > 0 && shreds.len() < fec_set_size { - // Let's try recovering missing shreds using erasure - let present = &mut vec![true; fec_set_size]; - let mut next_expected_index = first_index; - let mut shred_bufs: Vec> = shreds - .into_iter() - .flat_map(|shred| { - let offset = if shred.is_data() { 0 } else { num_data }; - let index = offset + shred.index() as usize; - let mut blocks = Self::fill_in_missing_shreds( - num_data, - num_coding, - first_index, - next_expected_index, - index, - present, - ); - blocks.push(shred.payload); - next_expected_index = index + 1; - blocks - }) - .collect(); - - // Insert any other missing shreds after the last shred we have received in the - // current FEC block - let mut pending_shreds = Self::fill_in_missing_shreds( - num_data, - num_coding, - first_index, - next_expected_index, - first_index + fec_set_size, - present, - ); - - shred_bufs.append(&mut pending_shreds); - - if shred_bufs.len() != fec_set_size { - return Err(reed_solomon_erasure::Error::TooFewShardsPresent); - } - - let session = Session::new(num_data, num_coding)?; - - // All information (excluding the restricted section) from a data shred is encoded - let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; - let coding_block_offset = SIZE_OF_CODING_SHRED_HEADERS; - let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs - .iter_mut() - .enumerate() - .map(|(position, x)| { - if position < num_data { - x[..valid_data_len].as_mut() - } else { - x[coding_block_offset..].as_mut() - } - }) - .zip(present.clone()) - .collect(); - session.decode_blocks(&mut blocks)?; - - let mut num_drained = 0; - present - .iter() - .enumerate() - .for_each(|(position, was_present)| { - if !*was_present && position < num_data { - let 
drain_this = position - num_drained; let shred_buf = shred_bufs.remove(drain_this); num_drained += 1; if let Ok(shred) = Shred::new_from_serialized_shred(shred_buf) { let shred_index = shred.index() as usize; // Valid shred must be in the same slot as the original shreds if shred.slot() == slot { // A valid data shred must be indexed between first_index and first+num_data index if (first_index..first_index + num_data).contains(&shred_index) { recovered_data.push(shred) } } } } }); + if num_coding == 0 || shreds.len() >= fec_set_size { + return Ok(Vec::default()); } - + // Mask to exclude data shreds already received from the return value. + let mut mask = vec![false; num_data]; + let mut blocks = vec![None; fec_set_size]; + for shred in shreds { + if (shred.index() as usize) < first_index { + return Err(InvalidIndex); + } + let shred_is_data = shred.is_data(); + let offset = if shred_is_data { 0 } else { num_data }; + let index = offset + shred.index() as usize - first_index; + let mut block = shred.payload; + if shred_is_data { + if index >= num_data { + return Err(InvalidIndex); + } + mask[index] = true; + // SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds + // is never used and is not part of erasure coding. + block.resize(SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS, 0u8); + } else { + if index >= fec_set_size { + return Err(InvalidIndex); + } + // SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the + // coding shreds contains the header and is not part of erasure + // coding. 
+ block.drain(..SIZE_OF_CODING_SHRED_HEADERS); + }; + blocks[index] = Some(block); + } + Session::new(num_data, num_coding)?.decode_blocks(&mut blocks)?; + let data_shred_indices = first_index..first_index + num_data; + let recovered_data = mask + .into_iter() + .zip(blocks) + .filter(|(mask, _)| !mask) + .filter_map(|(_, block)| Shred::new_from_serialized_shred(block?).ok()) + .filter(|shred| { + let index = shred.index() as usize; + shred.slot() == slot && data_shred_indices.contains(&index) + }) + .collect(); Ok(recovered_data) } @@ -1634,13 +1566,13 @@ pub mod tests { 15, slot, ), - Err(reed_solomon_erasure::Error::TooFewShardsPresent) + Err(reed_solomon_erasure::Error::InvalidIndex) ); // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds assert_matches!( Shredder::try_recovery(shred_info, num_data_shreds, num_coding_shreds, 35, slot), - Err(reed_solomon_erasure::Error::TooFewShardsPresent) + Err(reed_solomon_erasure::Error::InvalidIndex) ); }