From c1880e3f3e5a0ba2b416139bf511d37ef65d9a37 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Sun, 15 Sep 2019 10:37:12 -0700 Subject: [PATCH] Reduce number of shreds per FEC block (#5908) --- core/src/shred.rs | 189 ++++++++++++++++++++++++---------------------- 1 file changed, 99 insertions(+), 90 deletions(-) diff --git a/core/src/shred.rs b/core/src/shred.rs index 9e0a8909a..878b33031 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -47,7 +47,7 @@ pub enum Shred { /// This limit comes from reed solomon library, but unfortunately they don't have /// a public constant defined for it. -const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 128; +const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 4; const LAST_SHRED_IN_SLOT: u8 = 1; const DATA_COMPLETE_SHRED: u8 = 2; @@ -807,7 +807,7 @@ mod tests { assert_eq!(shredder.active_offset, 0); // Test3: Assert that the first shred in slot was created (since we gave a parent to shredder) - let (_, shred) = shredder.shred_tuples.pop().unwrap(); + let (_, shred) = &shredder.shred_tuples[0]; assert_eq!(shred.len(), PACKET_DATA_SIZE); info!("Len: {}", shred.len()); info!("{:?}", shred); @@ -828,9 +828,6 @@ mod tests { // Test5: Write left over data, and assert that a data shred is being created shredder.write(&data[offset..]).unwrap(); - // It shouldn't generate a signed shred - assert!(shredder.shred_tuples.is_empty()); - // Test6: Let's finalize the FEC block. That should result in the current shred to morph into // a signed LastInFECBlock shred shredder.finalize_data(); @@ -839,7 +836,7 @@ mod tests { assert!(!shredder.shred_tuples.is_empty()); // Must be Last in FEC Set - let (_, shred) = shredder.shred_tuples.pop().unwrap(); + let (_, shred) = &shredder.shred_tuples[1]; assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -861,7 +858,7 @@ mod tests { // We should have a new signed shred assert!(!shredder.shred_tuples.is_empty()); - let (_, shred) = shredder.shred_tuples.pop().unwrap(); + let (_, shred) = &shredder.shred_tuples[2]; assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -878,7 +875,7 @@ mod tests { assert!(!shredder.shred_tuples.is_empty()); // Must be a Data shred - let (_, shred) = shredder.shred_tuples.pop().unwrap(); + let (_, shred) = &shredder.shred_tuples[3]; assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -898,7 +895,7 @@ mod tests { assert!(!shredder.shred_tuples.is_empty()); // Must be LastInSlot - let (_, shred) = shredder.shred_tuples.pop().unwrap(); + let (_, shred) = &shredder.shred_tuples[4]; assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); @@ -1065,7 +1062,7 @@ mod tests { assert!(shredder.shred_tuples.is_empty()); assert_eq!(shredder.active_offset, 0); - let data: Vec<_> = (0..5000).collect(); + let data: Vec<_> = (0..4000).collect(); let data: Vec = data.iter().map(|x| *x as u8).collect(); let mut offset = shredder.write(&data).unwrap(); let approx_shred_payload_size = offset; @@ -1106,7 +1103,7 @@ mod tests { // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail assert_matches!( Shredder::try_recovery( - shred_meta_bufs[..4].to_vec(), + shred_meta_bufs[..3].to_vec(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1117,7 +1114,7 @@ mod tests { // Test1: Try recovery/reassembly with only data shreds. Hint: should work let result = Shredder::try_recovery( - shred_meta_bufs[..5].to_vec(), + shred_meta_bufs[..4].to_vec(), expected_shred_count / 2, expected_shred_count / 2, 0, @@ -1127,7 +1124,7 @@ mod tests { assert_ne!(RecoveryResult::default(), result); assert!(result.recovered_data.is_empty()); assert!(!result.recovered_code.is_empty()); - let result = Shredder::deshred(&shreds[..5]).unwrap(); + let result = Shredder::deshred(&shreds[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1173,40 +1170,32 @@ mod tests { shreds.insert(1, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::Data(_)); + assert_matches!(recovered_shred, Shred::DataComplete(_)); assert_eq!(recovered_shred.index(), 3); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5); assert!(recovered_shred.verify(&keypair.pubkey())); shreds.insert(3, recovered_shred); - assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing + assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 0); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 1); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 0); + assert_eq!(code.header.common_header.index, 1); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 2); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 3); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 2); - } - let recovered_shred = result.recovered_code.remove(0); - if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 4); - assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 4); + assert_eq!(code.header.common_header.index, 3); } - let result = Shredder::deshred(&shreds[..5]).unwrap(); + let result = Shredder::deshred(&shreds[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1242,7 +1231,7 @@ mod tests { .unwrap(); assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing + assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::FirstInSlot(_)); assert_eq!(recovered_shred.index(), 0); @@ -1259,33 +1248,25 @@ mod tests { assert!(recovered_shred.verify(&keypair.pubkey())); shreds.insert(2, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::DataComplete(_)); - assert_eq!(recovered_shred.index(), 4); - assert_eq!(recovered_shred.slot(), slot); - assert_eq!(recovered_shred.parent(), slot - 5); - assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(4, recovered_shred); - - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing + assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 1); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 0); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 1); + assert_eq!(code.header.common_header.index, 0); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 3); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 2); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 3); + assert_eq!(code.header.common_header.index, 2); } - let result = Shredder::deshred(&shreds[..5]).unwrap(); + let result = Shredder::deshred(&shreds[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1344,7 +1325,7 @@ mod tests { .unwrap(); assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing + assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::FirstInSlot(_)); assert_eq!(recovered_shred.index(), 0); @@ -1361,33 +1342,25 @@ mod tests { assert!(recovered_shred.verify(&keypair.pubkey())); shreds.insert(2, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::LastInSlot(_)); - assert_eq!(recovered_shred.index(), 4); - assert_eq!(recovered_shred.slot(), slot); - assert_eq!(recovered_shred.parent(), slot - 5); - assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(4, recovered_shred); - - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing + assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 1); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 0); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 1); + assert_eq!(code.header.common_header.index, 0); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 3); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 2); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 3); + assert_eq!(code.header.common_header.index, 2); } - let result = Shredder::deshred(&shreds[..5]).unwrap(); + let result = Shredder::deshred(&shreds[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1397,7 +1370,7 @@ mod tests { .iter() .enumerate() .filter_map(|(i, (s, _))| { - if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) { + if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) { Some(s.clone()) } else { None @@ -1466,7 +1439,7 @@ mod tests { .unwrap(); assert_ne!(RecoveryResult::default(), result); - assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing + assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing let recovered_shred = result.recovered_data.remove(0); assert_matches!(recovered_shred, Shred::Data(_)); assert_eq!(recovered_shred.index(), 25); @@ -1483,33 +1456,25 @@ mod tests { assert!(recovered_shred.verify(&keypair.pubkey())); shreds.insert(2, recovered_shred); - let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::LastInSlot(_)); - assert_eq!(recovered_shred.index(), 29); - assert_eq!(recovered_shred.slot(), slot); - assert_eq!(recovered_shred.parent(), slot - 5); - assert!(recovered_shred.verify(&keypair.pubkey())); - shreds.insert(4, recovered_shred); - - assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing + assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 1); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 0); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 26); + assert_eq!(code.header.common_header.index, 25); } let recovered_shred = result.recovered_code.remove(0); if let Shred::Coding(code) = recovered_shred { - assert_eq!(code.header.num_data_shreds, 5); - assert_eq!(code.header.num_coding_shreds, 5); - assert_eq!(code.header.position, 3); + assert_eq!(code.header.num_data_shreds, 4); + assert_eq!(code.header.num_coding_shreds, 4); + assert_eq!(code.header.position, 2); assert_eq!(code.header.common_header.slot, slot); - assert_eq!(code.header.common_header.index, 28); + assert_eq!(code.header.common_header.index, 27); } - let result = Shredder::deshred(&shreds[..5]).unwrap(); + let result = Shredder::deshred(&shreds[..4]).unwrap(); assert!(result.len() >= data.len()); assert_eq!(data[..], result[..data.len()]); @@ -1548,4 +1513,48 @@ mod tests { Err(reed_solomon_erasure::Error::TooFewShardsPresent) ); } + + #[test] + fn test_multi_fec_block_coding() { + let keypair = Arc::new(Keypair::new()); + let slot = 0x123456789abcdef0; + let mut shredder = + Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); + + assert!(shredder.shred_tuples.is_empty()); + assert_eq!(shredder.active_offset, 0); + + let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect(); + let data: Vec = data.iter().map(|x| *x as u8).collect(); + let mut offset = shredder.write(&data).unwrap(); + let approx_shred_payload_size = offset; + while offset < data.len() { + offset += shredder.write(&data[offset..]).unwrap(); + } + + // We should have some shreds now + assert!(shredder.shred_tuples.len() > data.len() / approx_shred_payload_size); + assert_eq!(offset, data.len()); + + shredder.finalize_data(); + let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; + assert_eq!(shredder.shred_tuples.len(), expected_shred_count); + + let mut index = 0; + + while index < shredder.shred_tuples.len() { + let num_data_shreds = cmp::min( + MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, + (shredder.shred_tuples.len() - index) / 2, + ); + let coding_start = index + num_data_shreds; + shredder.shred_tuples[index..coding_start] + .iter() + .for_each(|(s, _)| assert!(s.is_data())); + index = coding_start + num_data_shreds; + shredder.shred_tuples[coding_start..index] + .iter() + .for_each(|(s, _)| assert!(!s.is_data())); + } + } }