rewrites Shredder::try_recovery (#21082)

For every missing data-shred, Shredder::try_recovery calls into
new_empty_data_shred which does a redundant serialization into payload
buffer which is then immediately overwritten by the erasure recovery:
https://github.com/solana-labs/solana/blob/696501500/ledger/src/shred.rs#L372-L417

Additionally, the implementation is unnecessary complex hindering
upcoming changes to erasure coding generation
https://github.com/solana-labs/solana/blob/696501500/ledger/src/shred.rs#L814-L938

The commit simplifies the Shredder::try_recovery implementation.
This commit is contained in:
behzad nouri 2021-11-04 16:15:51 +00:00 committed by GitHub
parent 76e533be46
commit 11a53de0e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 126 deletions

View File

@ -41,9 +41,10 @@
//! //!
//! //!
use reed_solomon_erasure::galois_8::Field; use {
use reed_solomon_erasure::ReedSolomon; reed_solomon_erasure::{galois_8::Field, ReconstructShard, ReedSolomon},
use serde::{Deserialize, Serialize}; serde::{Deserialize, Serialize},
};
//TODO(sakridge) pick these values //TODO(sakridge) pick these values
/// Number of data shreds /// Number of data shreds
@ -113,14 +114,11 @@ impl Session {
} }
/// Recover data + coding blocks into data blocks /// Recover data + coding blocks into data blocks
/// # Arguments pub fn decode_blocks<T>(&self, blocks: &mut [T]) -> Result<()>
/// * `data` - array of data blocks to recover into where
/// * `coding` - array of coding blocks T: ReconstructShard<Field>,
/// * `erasures` - list of indices in data where blocks should be recovered {
pub fn decode_blocks(&self, blocks: &mut [(&mut [u8], bool)]) -> Result<()> { self.0.reconstruct_data(blocks)
self.0.reconstruct_data(blocks)?;
Ok(())
} }
} }

View File

@ -811,36 +811,6 @@ impl Shredder {
.collect() .collect()
} }
fn fill_in_missing_shreds(
num_data: usize,
num_coding: usize,
first_index_in_fec_set: usize,
expected_index: usize,
index_found: usize,
present: &mut [bool],
) -> Vec<Vec<u8>> {
let end_index = index_found.saturating_sub(1);
// The index of current shred must be within the range of shreds that are being
// recovered
if !(first_index_in_fec_set..first_index_in_fec_set + num_data + num_coding)
.contains(&end_index)
{
return vec![];
}
let missing_blocks: Vec<Vec<u8>> = (expected_index..index_found)
.map(|missing| {
present[missing.saturating_sub(first_index_in_fec_set)] = false;
if missing < first_index_in_fec_set + num_data {
Shred::new_empty_data_shred().payload
} else {
vec![0; SHRED_PAYLOAD_SIZE]
}
})
.collect();
missing_blocks
}
pub fn try_recovery( pub fn try_recovery(
shreds: Vec<Shred>, shreds: Vec<Shred>,
num_data: usize, num_data: usize,
@ -848,92 +818,54 @@ impl Shredder {
first_index: usize, first_index: usize,
slot: Slot, slot: Slot,
) -> std::result::Result<Vec<Shred>, reed_solomon_erasure::Error> { ) -> std::result::Result<Vec<Shred>, reed_solomon_erasure::Error> {
use reed_solomon_erasure::Error::InvalidIndex;
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?; Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
let mut recovered_data = vec![];
let fec_set_size = num_data + num_coding; let fec_set_size = num_data + num_coding;
if num_coding == 0 || shreds.len() >= fec_set_size {
if num_coding > 0 && shreds.len() < fec_set_size { return Ok(Vec::default());
// Let's try recovering missing shreds using erasure
let present = &mut vec![true; fec_set_size];
let mut next_expected_index = first_index;
let mut shred_bufs: Vec<Vec<u8>> = shreds
.into_iter()
.flat_map(|shred| {
let offset = if shred.is_data() { 0 } else { num_data };
let index = offset + shred.index() as usize;
let mut blocks = Self::fill_in_missing_shreds(
num_data,
num_coding,
first_index,
next_expected_index,
index,
present,
);
blocks.push(shred.payload);
next_expected_index = index + 1;
blocks
})
.collect();
// Insert any other missing shreds after the last shred we have received in the
// current FEC block
let mut pending_shreds = Self::fill_in_missing_shreds(
num_data,
num_coding,
first_index,
next_expected_index,
first_index + fec_set_size,
present,
);
shred_bufs.append(&mut pending_shreds);
if shred_bufs.len() != fec_set_size {
return Err(reed_solomon_erasure::Error::TooFewShardsPresent);
}
let session = Session::new(num_data, num_coding)?;
// All information (excluding the restricted section) from a data shred is encoded
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
let coding_block_offset = SIZE_OF_CODING_SHRED_HEADERS;
let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs
.iter_mut()
.enumerate()
.map(|(position, x)| {
if position < num_data {
x[..valid_data_len].as_mut()
} else {
x[coding_block_offset..].as_mut()
}
})
.zip(present.clone())
.collect();
session.decode_blocks(&mut blocks)?;
let mut num_drained = 0;
present
.iter()
.enumerate()
.for_each(|(position, was_present)| {
if !*was_present && position < num_data {
let drain_this = position - num_drained;
let shred_buf = shred_bufs.remove(drain_this);
num_drained += 1;
if let Ok(shred) = Shred::new_from_serialized_shred(shred_buf) {
let shred_index = shred.index() as usize;
// Valid shred must be in the same slot as the original shreds
if shred.slot() == slot {
// A valid data shred must be indexed between first_index and first+num_data index
if (first_index..first_index + num_data).contains(&shred_index) {
recovered_data.push(shred)
}
}
}
}
});
} }
// Mask to exclude data shreds already received from the return value.
let mut mask = vec![false; num_data];
let mut blocks = vec![None; fec_set_size];
for shred in shreds {
if (shred.index() as usize) < first_index {
return Err(InvalidIndex);
}
let shred_is_data = shred.is_data();
let offset = if shred_is_data { 0 } else { num_data };
let index = offset + shred.index() as usize - first_index;
let mut block = shred.payload;
if shred_is_data {
if index >= num_data {
return Err(InvalidIndex);
}
mask[index] = true;
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
// is never used and is not part of erasure coding.
block.resize(SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS, 0u8);
} else {
if index >= fec_set_size {
return Err(InvalidIndex);
}
// SIZE_OF_CODING_SHRED_HEADERS bytes at the begining of the
// coding shreds contains the header and is not part of erasure
// coding.
block.drain(..SIZE_OF_CODING_SHRED_HEADERS);
};
blocks[index] = Some(block);
}
Session::new(num_data, num_coding)?.decode_blocks(&mut blocks)?;
let data_shred_indices = first_index..first_index + num_data;
let recovered_data = mask
.into_iter()
.zip(blocks)
.filter(|(mask, _)| !mask)
.filter_map(|(_, block)| Shred::new_from_serialized_shred(block?).ok())
.filter(|shred| {
let index = shred.index() as usize;
shred.slot() == slot && data_shred_indices.contains(&index)
})
.collect();
Ok(recovered_data) Ok(recovered_data)
} }
@ -1634,13 +1566,13 @@ pub mod tests {
15, 15,
slot, slot,
), ),
Err(reed_solomon_erasure::Error::TooFewShardsPresent) Err(reed_solomon_erasure::Error::InvalidIndex)
); );
// Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!( assert_matches!(
Shredder::try_recovery(shred_info, num_data_shreds, num_coding_shreds, 35, slot), Shredder::try_recovery(shred_info, num_data_shreds, num_coding_shreds, 35, slot),
Err(reed_solomon_erasure::Error::TooFewShardsPresent) Err(reed_solomon_erasure::Error::InvalidIndex)
); );
} }