From 3a9c03cc897463e214778bfb477d419fcbc96fed Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 23 Sep 2019 16:24:21 -0700 Subject: [PATCH] Don't recover coding shreds (#6034) * Don't recover coding shreds * cleanup --- core/src/blocktree.rs | 2 +- core/src/erasure.rs | 2 +- core/src/shred.rs | 195 +++++++++++++----------------------------- 3 files changed, 63 insertions(+), 136 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index d39e024f9..b8d5dc691 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -397,7 +397,7 @@ impl Blocktree { slot, ) { submit_metrics(true, "complete".into()); - recovered_data_shreds.append(&mut result.recovered_data); + recovered_data_shreds.append(&mut result); } else { submit_metrics(true, "incomplete".into()); } diff --git a/core/src/erasure.rs b/core/src/erasure.rs index 53f5f7089..2e4c5ed0c 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -115,7 +115,7 @@ impl Session { /// * `coding` - array of coding blocks /// * `erasures` - list of indices in data where blocks should be recovered pub fn decode_blocks(&self, blocks: &mut [(&mut [u8], bool)]) -> Result<()> { - self.0.reconstruct(blocks)?; + self.0.reconstruct_data(blocks)?; Ok(()) } diff --git a/core/src/shred.rs b/core/src/shred.rs index 3f5cb9ae5..3e31af092 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -133,6 +133,13 @@ impl Shred { Shred { headers, payload } } + pub fn new_empty_data_shred() -> Self { + let mut payload = vec![0; PACKET_DATA_SIZE]; + payload[0] = DATA_SHRED; + let headers = DataShredHeader::default(); + Shred { headers, payload } + } + fn header(&self) -> &ShredCommonHeader { if self.is_data() { &self.headers.data_header @@ -281,12 +288,6 @@ impl Write for Shredder { } } -#[derive(Default, Debug, PartialEq)] -pub struct RecoveryResult { - pub recovered_data: Vec, - pub recovered_code: Vec, -} - impl Shredder { pub fn new( slot: u64, @@ -492,54 +493,33 @@ impl Shredder { } fn fill_in_missing_shreds( - shred: &Shred, num_data: usize, num_coding: usize, - slot: u64, - first_index: usize, + first_index_in_fec_set: usize, expected_index: usize, + index_found: usize, present: &mut [bool], - ) -> (Vec>, usize) { - let index = Self::get_shred_index(shred, num_data); - + ) -> Vec> { + let end_index = index_found.saturating_sub(1); // The index of current shred must be within the range of shreds that are being // recovered - if !(first_index..first_index + num_data + num_coding).contains(&index) { - return (vec![], index); + if !(first_index_in_fec_set..first_index_in_fec_set + num_data + num_coding) + .contains(&end_index) + { + return vec![]; } - let missing_blocks: Vec> = (expected_index..index) + let missing_blocks: Vec> = (expected_index..index_found) .map(|missing| { - present[missing.saturating_sub(first_index)] = false; - Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) + present[missing.saturating_sub(first_index_in_fec_set)] = false; + if missing < first_index_in_fec_set + num_data { + Shred::new_empty_data_shred().payload + } else { + vec![0; PACKET_DATA_SIZE] + } }) .collect(); - (missing_blocks, index) - } - - fn new_empty_missing_shred( - num_data: usize, - num_coding: usize, - slot: u64, - first_index: usize, - missing: usize, - ) -> Vec { - let header = if missing < first_index + num_data { - let mut header = DataShredHeader::default(); - header.data_header.slot = slot; - header.data_header.index = missing as u32; - header - } else { - Self::new_coding_shred_header( - slot, - missing.saturating_sub(num_data) as u32, - num_data, - num_coding, - missing - first_index - num_data, - ) - }; - let shred = Shred::new_empty_from_header(header); - shred.payload + missing_blocks } pub fn try_recovery( @@ -548,9 +528,8 @@ impl Shredder { num_coding: usize, first_index: usize, slot: u64, - ) -> Result { + ) -> Result, reed_solomon_erasure::Error> { let mut recovered_data = vec![]; - let mut recovered_code = vec![]; let fec_set_size = num_data + num_coding; if num_coding > 0 && shreds.len() < fec_set_size { let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; @@ -561,30 +540,32 @@ impl Shredder { let mut shred_bufs: Vec> = shreds .into_iter() .flat_map(|shred| { - let (mut blocks, last_index) = Self::fill_in_missing_shreds( - &shred, + let index = Self::get_shred_index(&shred, num_data); + let mut blocks = Self::fill_in_missing_shreds( num_data, num_coding, - slot, first_index, next_expected_index, + index, &mut present, ); blocks.push(shred.payload); - next_expected_index = last_index + 1; + next_expected_index = index + 1; blocks }) .collect(); // Insert any other missing shreds after the last shred we have received in the // current FEC block - let mut pending_shreds: Vec> = (next_expected_index - ..first_index + fec_set_size) - .map(|missing| { - present[missing.saturating_sub(first_index)] = false; - Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) - }) - .collect(); + let mut pending_shreds = Self::fill_in_missing_shreds( + num_data, + num_coding, + first_index, + next_expected_index, + first_index + fec_set_size, + &mut present, + ); + shred_bufs.append(&mut pending_shreds); if shred_bufs.len() != fec_set_size { @@ -605,7 +586,7 @@ impl Shredder { .iter() .enumerate() .for_each(|(position, was_present)| { - if !was_present { + if !*was_present && position < num_data { let drain_this = position - num_drained; let shred_buf = shred_bufs.remove(drain_this); num_drained += 1; @@ -613,18 +594,9 @@ impl Shredder { let shred_index = shred.index() as usize; // Valid shred must be in the same slot as the original shreds if shred.slot() == slot { - // Data shreds are "positioned" at the start of the iterator. First num_data - // shreds are expected to be the data shreds. - if position < num_data - && (first_index..first_index + num_data).contains(&shred_index) - { - // Also, a valid data shred must be indexed between first_index and first+num_data index + // A valid data shred must be indexed between first_index and first+num_data index + if (first_index..first_index + num_data).contains(&shred_index) { recovered_data.push(shred) - } else if (first_index..first_index + num_coding) - .contains(&shred_index) - { - // A valid coding shred must be indexed between first_index and first+num_coding index - recovered_code.push(shred) } } } @@ -632,10 +604,7 @@ impl Shredder { }); } - Ok(RecoveryResult { - recovered_data, - recovered_code, - }) + Ok(recovered_data) } /// Combines all shreds to recreate the original buffer @@ -957,7 +926,7 @@ mod tests { ); // Test1: Try recovery/reassembly with only data shreds. Hint: should work - let result = Shredder::try_recovery( + let recovered_data = Shredder::try_recovery( shred_infos[..4].to_vec(), expected_shred_count / 2, expected_shred_count / 2, @@ -965,9 +934,7 @@ mod tests { slot, ) .unwrap(); - assert_ne!(RecoveryResult::default(), result); - assert!(result.recovered_data.is_empty()); - assert!(!result.recovered_code.is_empty()); + assert!(recovered_data.is_empty()); let result = Shredder::deshred(&shred_infos[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -980,7 +947,7 @@ mod tests { .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None }) .collect(); - let mut result = Shredder::try_recovery( + let mut recovered_data = Shredder::try_recovery( shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, @@ -988,26 +955,16 @@ mod tests { slot, ) .unwrap(); - assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing - let recovered_shred = result.recovered_data.remove(0); + assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 1, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(1, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 3, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(3, recovered_shred); - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 1, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 1))); - - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 3, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 3))); - let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1020,7 +977,7 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let mut result = Shredder::try_recovery( + let mut recovered_data = Shredder::try_recovery( shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, @@ -1028,26 +985,16 @@ mod tests { slot, ) .unwrap(); - assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing - let recovered_shred = result.recovered_data.remove(0); + assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(0, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(2, recovered_shred); - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 0, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 2, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); - let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1083,7 +1030,7 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let mut result = Shredder::try_recovery( + let mut recovered_data = Shredder::try_recovery( shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, @@ -1091,26 +1038,16 @@ mod tests { slot, ) .unwrap(); - assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing - let recovered_shred = result.recovered_data.remove(0); + assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 0, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(0, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); + let recovered_shred = recovered_data.remove(0); verify_test_data_shred(&recovered_shred, 2, slot, slot - 5, &keypair.pubkey(), true); shred_info.insert(2, recovered_shred); - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 0, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 2, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); - let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1166,7 +1103,7 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let mut result = Shredder::try_recovery( + let mut recovered_data = Shredder::try_recovery( shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, @@ -1174,10 +1111,9 @@ mod tests { slot, ) .unwrap(); - assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing - let recovered_shred = result.recovered_data.remove(0); + assert_eq!(recovered_data.len(), 2); // Data shreds 0, 2 were missing + let recovered_shred = recovered_data.remove(0); verify_test_data_shred( &recovered_shred, 25, @@ -1188,7 +1124,7 @@ mod tests { ); shred_info.insert(0, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); + let recovered_shred = recovered_data.remove(0); verify_test_data_shred( &recovered_shred, 27, @@ -1199,21 +1135,12 @@ mod tests { ); shred_info.insert(2, recovered_shred); - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 25, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 0))); - - let recovered_shred = result.recovered_code.remove(0); - verify_test_code_shred(&recovered_shred, 27, slot, &keypair.pubkey(), false); - assert_eq!(recovered_shred.coding_params(), Some((4, 4, 2))); - let result = Shredder::deshred(&shred_info[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds - let result = Shredder::try_recovery( + let recovered_data = Shredder::try_recovery( shred_info.clone(), expected_shred_count / 2, expected_shred_count / 2, @@ -1221,7 +1148,7 @@ mod tests { slot + 1, ) .unwrap(); - assert!(result.recovered_data.is_empty()); + assert!(recovered_data.is_empty()); // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds assert_matches!(