Don't recover coding shreds (#6034)

* Don't recover coding shreds

* cleanup
This commit is contained in:
Pankaj Garg 2019-09-23 16:24:21 -07:00 committed by GitHub
parent f055d2f0cc
commit 3a9c03cc89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 136 deletions

View File

@ -397,7 +397,7 @@ impl Blocktree {
slot, slot,
) { ) {
submit_metrics(true, "complete".into()); submit_metrics(true, "complete".into());
recovered_data_shreds.append(&mut result.recovered_data); recovered_data_shreds.append(&mut result);
} else { } else {
submit_metrics(true, "incomplete".into()); submit_metrics(true, "incomplete".into());
} }

View File

@ -115,7 +115,7 @@ impl Session {
/// * `coding` - array of coding blocks /// * `coding` - array of coding blocks
/// * `erasures` - list of indices in data where blocks should be recovered /// * `erasures` - list of indices in data where blocks should be recovered
pub fn decode_blocks(&self, blocks: &mut [(&mut [u8], bool)]) -> Result<()> { pub fn decode_blocks(&self, blocks: &mut [(&mut [u8], bool)]) -> Result<()> {
self.0.reconstruct(blocks)?; self.0.reconstruct_data(blocks)?;
Ok(()) Ok(())
} }

View File

@ -133,6 +133,13 @@ impl Shred {
Shred { headers, payload } Shred { headers, payload }
} }
pub fn new_empty_data_shred() -> Self {
let mut payload = vec![0; PACKET_DATA_SIZE];
payload[0] = DATA_SHRED;
let headers = DataShredHeader::default();
Shred { headers, payload }
}
fn header(&self) -> &ShredCommonHeader { fn header(&self) -> &ShredCommonHeader {
if self.is_data() { if self.is_data() {
&self.headers.data_header &self.headers.data_header
@ -281,12 +288,6 @@ impl Write for Shredder {
} }
} }
#[derive(Default, Debug, PartialEq)]
pub struct RecoveryResult {
pub recovered_data: Vec<Shred>,
pub recovered_code: Vec<Shred>,
}
impl Shredder { impl Shredder {
pub fn new( pub fn new(
slot: u64, slot: u64,
@ -492,54 +493,33 @@ impl Shredder {
} }
fn fill_in_missing_shreds( fn fill_in_missing_shreds(
shred: &Shred,
num_data: usize, num_data: usize,
num_coding: usize, num_coding: usize,
slot: u64, first_index_in_fec_set: usize,
first_index: usize,
expected_index: usize, expected_index: usize,
index_found: usize,
present: &mut [bool], present: &mut [bool],
) -> (Vec<Vec<u8>>, usize) { ) -> Vec<Vec<u8>> {
let index = Self::get_shred_index(shred, num_data); let end_index = index_found.saturating_sub(1);
// The index of current shred must be within the range of shreds that are being // The index of current shred must be within the range of shreds that are being
// recovered // recovered
if !(first_index..first_index + num_data + num_coding).contains(&index) { if !(first_index_in_fec_set..first_index_in_fec_set + num_data + num_coding)
return (vec![], index); .contains(&end_index)
{
return vec![];
} }
let missing_blocks: Vec<Vec<u8>> = (expected_index..index) let missing_blocks: Vec<Vec<u8>> = (expected_index..index_found)
.map(|missing| { .map(|missing| {
present[missing.saturating_sub(first_index)] = false; present[missing.saturating_sub(first_index_in_fec_set)] = false;
Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) if missing < first_index_in_fec_set + num_data {
Shred::new_empty_data_shred().payload
} else {
vec![0; PACKET_DATA_SIZE]
}
}) })
.collect(); .collect();
(missing_blocks, index) missing_blocks
}
fn new_empty_missing_shred(
num_data: usize,
num_coding: usize,
slot: u64,
first_index: usize,
missing: usize,
) -> Vec<u8> {
let header = if missing < first_index + num_data {
let mut header = DataShredHeader::default();
header.data_header.slot = slot;
header.data_header.index = missing as u32;
header
} else {
Self::new_coding_shred_header(
slot,
missing.saturating_sub(num_data) as u32,
num_data,
num_coding,
missing - first_index - num_data,
)
};
let shred = Shred::new_empty_from_header(header);
shred.payload
} }
pub fn try_recovery( pub fn try_recovery(
@ -548,9 +528,8 @@ impl Shredder {
num_coding: usize, num_coding: usize,
first_index: usize, first_index: usize,
slot: u64, slot: u64,
) -> Result<RecoveryResult, reed_solomon_erasure::Error> { ) -> Result<Vec<Shred>, reed_solomon_erasure::Error> {
let mut recovered_data = vec![]; let mut recovered_data = vec![];
let mut recovered_code = vec![];
let fec_set_size = num_data + num_coding; let fec_set_size = num_data + num_coding;
if num_coding > 0 && shreds.len() < fec_set_size { if num_coding > 0 && shreds.len() < fec_set_size {
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
@ -561,30 +540,32 @@ impl Shredder {
let mut shred_bufs: Vec<Vec<u8>> = shreds let mut shred_bufs: Vec<Vec<u8>> = shreds
.into_iter() .into_iter()
.flat_map(|shred| { .flat_map(|shred| {
let (mut blocks, last_index) = Self::fill_in_missing_shreds( let index = Self::get_shred_index(&shred, num_data);
&shred, let mut blocks = Self::fill_in_missing_shreds(
num_data, num_data,
num_coding, num_coding,
slot,
first_index, first_index,
next_expected_index, next_expected_index,
index,
&mut present, &mut present,
); );
blocks.push(shred.payload); blocks.push(shred.payload);
next_expected_index = last_index + 1; next_expected_index = index + 1;
blocks blocks
}) })
.collect(); .collect();
// Insert any other missing shreds after the last shred we have received in the // Insert any other missing shreds after the last shred we have received in the
// current FEC block // current FEC block
let mut pending_shreds: Vec<Vec<u8>> = (next_expected_index let mut pending_shreds = Self::fill_in_missing_shreds(
..first_index + fec_set_size) num_data,
.map(|missing| { num_coding,
present[missing.saturating_sub(first_index)] = false; first_index,
Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) next_expected_index,
}) first_index + fec_set_size,
.collect(); &mut present,
);
shred_bufs.append(&mut pending_shreds); shred_bufs.append(&mut pending_shreds);
if shred_bufs.len() != fec_set_size { if shred_bufs.len() != fec_set_size {
@ -605,7 +586,7 @@ impl Shredder {
.iter() .iter()
.enumerate() .enumerate()
.for_each(|(position, was_present)| { .for_each(|(position, was_present)| {
if !was_present { if !*was_present && position < num_data {
let drain_this = position - num_drained; let drain_this = position - num_drained;
let shred_buf = shred_bufs.remove(drain_this); let shred_buf = shred_bufs.remove(drain_this);
num_drained += 1; num_drained += 1;
@ -613,18 +594,9 @@ impl Shredder {
let shred_index = shred.index() as usize; let shred_index = shred.index() as usize;
// Valid shred must be in the same slot as the original shreds // Valid shred must be in the same slot as the original shreds
if shred.slot() == slot { if shred.slot() == slot {
// Data shreds are "positioned" at the start of the iterator. First num_data // A valid data shred must be indexed between first_index and first+num_data index
// shreds are expected to be the data shreds. if (first_index..first_index + num_data).contains(&shred_index) {
if position < num_data
&& (first_index..first_index + num_data).contains(&shred_index)
{
// Also, a valid data shred must be indexed between first_index and first+num_data index
recovered_data.push(shred) recovered_data.push(shred)
} else if (first_index..first_index + num_coding)
.contains(&shred_index)
{
// A valid coding shred must be indexed between first_index and first+num_coding index
recovered_code.push(shred)
} }
} }
} }
@ -632,10 +604,7 @@ impl Shredder {
}); });
} }
Ok(RecoveryResult { Ok(recovered_data)
recovered_data,
recovered_code,
})
} }
/// Combines all shreds to recreate the original buffer /// Combines all shreds to recreate the original buffer
@ -957,7 +926,7 @@ mod tests {
); );
// Test1: Try recovery/reassembly with only data shreds. Hint: should work // Test1: Try recovery/reassembly with only data shreds. Hint: should work
let result = Shredder::try_recovery( let recovered_data = Shredder::try_recovery(
shred_infos[..4].to_vec(), shred_infos[..4].to_vec(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -965,9 +934,7 @@ mod tests {
slot, slot,
) )
.unwrap(); .unwrap();
assert_ne!(RecoveryResult::default(), result); assert!(recovered_data.is_empty());
assert!(result.recovered_data.is_empty());
assert!(!result.recovered_code.is_empty());
let result = Shredder::deshred(&shred_infos[..4]).unwrap(); let result = Shredder::deshred(&shred_infos[..4]).unwrap();
assert!(result.len() >= data.len()); assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
@ -980,7 +947,7 @@ mod tests {
.filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None }) .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None })
.collect(); .collect();
let mut result = Shredder::try_recovery( let mut recovered_data = Shredder::try_recovery(
shred_info.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -988,26 +955,16 @@ mod tests {
slot, slot,
) )
.unwrap(); .unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 1, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 1, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(1, recovered_shred); shred_info.insert(1, recovered_shred);
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 3, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 3, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(3, recovered_shred); shred_info.insert(3, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 1, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 1)));
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 3, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 3)));
let result = Shredder::deshred(&shred_info[..4]).unwrap(); let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len()); assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
@ -1020,7 +977,7 @@ mod tests {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect(); .collect();
let mut result = Shredder::try_recovery( let mut recovered_data = Shredder::try_recovery(
shred_info.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -1028,26 +985,16 @@ mod tests {
slot, slot,
) )
.unwrap(); .unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(0, recovered_shred); shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(2, recovered_shred); shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 0, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 2, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap(); let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len()); assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
@ -1083,7 +1030,7 @@ mod tests {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect(); .collect();
let mut result = Shredder::try_recovery( let mut recovered_data = Shredder::try_recovery(
shred_info.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -1091,26 +1038,16 @@ mod tests {
slot, slot,
) )
.unwrap(); .unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(0, recovered_shred); shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true); verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true);
shred_info.insert(2, recovered_shred); shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 0, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 2, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap(); let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len()); assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
@ -1166,7 +1103,7 @@ mod tests {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect(); .collect();
let mut result = Shredder::try_recovery( let mut recovered_data = Shredder::try_recovery(
shred_info.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -1174,10 +1111,9 @@ mod tests {
slot, slot,
) )
.unwrap(); .unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred( verify_test_data_shred(
&recovered_shred, &recovered_shred,
25, 25,
@ -1188,7 +1124,7 @@ mod tests {
); );
shred_info.insert(0, recovered_shred); shred_info.insert(0, recovered_shred);
let recovered_shred = result.recovered_data.remove(0); let recovered_shred = recovered_data.remove(0);
verify_test_data_shred( verify_test_data_shred(
&recovered_shred, &recovered_shred,
27, 27,
@ -1199,21 +1135,12 @@ mod tests {
); );
shred_info.insert(2, recovered_shred); shred_info.insert(2, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 25, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0)));
let recovered_shred = result.recovered_code.remove(0);
verify_test_code_shred(&recovered_shred, 27, slot, &keypair.pubkey(), false);
assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2)));
let result = Shredder::deshred(&shred_info[..4]).unwrap(); let result = Shredder::deshred(&shred_info[..4]).unwrap();
assert!(result.len() >= data.len()); assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]); assert_eq!(data[..], result[..data.len()]);
// Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
let result = Shredder::try_recovery( let recovered_data = Shredder::try_recovery(
shred_info.clone(), shred_info.clone(),
expected_shred_count / 2, expected_shred_count / 2,
expected_shred_count / 2, expected_shred_count / 2,
@ -1221,7 +1148,7 @@ mod tests {
slot + 1, slot + 1,
) )
.unwrap(); .unwrap();
assert!(result.recovered_data.is_empty()); assert!(recovered_data.is_empty());
// Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!( assert_matches!(