Reduce number of shreds per FEC block (#5908)

This commit is contained in:
Pankaj Garg 2019-09-15 10:37:12 -07:00 committed by GitHub
parent c490a50c91
commit c1880e3f3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 99 additions and 90 deletions

View File

@ -47,7 +47,7 @@ pub enum Shred {
/// This limit comes from reed solomon library, but unfortunately they don't have
/// a public constant defined for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 128;
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 4;
const LAST_SHRED_IN_SLOT: u8 = 1;
const DATA_COMPLETE_SHRED: u8 = 2;
@ -807,7 +807,7 @@ mod tests {
assert_eq!(shredder.active_offset, 0);
// Test3: Assert that the first shred in slot was created (since we gave a parent to shredder)
let (_, shred) = shredder.shred_tuples.pop().unwrap();
let (_, shred) = &shredder.shred_tuples[0];
assert_eq!(shred.len(), PACKET_DATA_SIZE);
info!("Len: {}", shred.len());
info!("{:?}", shred);
@ -828,9 +828,6 @@ mod tests {
// Test5: Write left over data, and assert that a data shred is being created
shredder.write(&data[offset..]).unwrap();
// It shouldn't generate a signed shred
assert!(shredder.shred_tuples.is_empty());
// Test6: Let's finalize the FEC block. That should result in the current shred to morph into
// a signed LastInFECBlock shred
shredder.finalize_data();
@ -839,7 +836,7 @@ mod tests {
assert!(!shredder.shred_tuples.is_empty());
// Must be Last in FEC Set
let (_, shred) = shredder.shred_tuples.pop().unwrap();
let (_, shred) = &shredder.shred_tuples[1];
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -861,7 +858,7 @@ mod tests {
// We should have a new signed shred
assert!(!shredder.shred_tuples.is_empty());
let (_, shred) = shredder.shred_tuples.pop().unwrap();
let (_, shred) = &shredder.shred_tuples[2];
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -878,7 +875,7 @@ mod tests {
assert!(!shredder.shred_tuples.is_empty());
// Must be a Data shred
let (_, shred) = shredder.shred_tuples.pop().unwrap();
let (_, shred) = &shredder.shred_tuples[3];
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -898,7 +895,7 @@ mod tests {
assert!(!shredder.shred_tuples.is_empty());
// Must be LastInSlot
let (_, shred) = shredder.shred_tuples.pop().unwrap();
let (_, shred) = &shredder.shred_tuples[4];
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
@ -1065,7 +1062,7 @@ mod tests {
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..5000).collect();
let data: Vec<_> = (0..4000).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let mut offset = shredder.write(&data).unwrap();
let approx_shred_payload_size = offset;
@ -1106,7 +1103,7 @@ mod tests {
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
assert_matches!(
Shredder::try_recovery(
shred_meta_bufs[..4].to_vec(),
shred_meta_bufs[..3].to_vec(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1117,7 +1114,7 @@ mod tests {
// Test1: Try recovery/reassembly with only data shreds. Hint: should work
let result = Shredder::try_recovery(
shred_meta_bufs[..5].to_vec(),
shred_meta_bufs[..4].to_vec(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1127,7 +1124,7 @@ mod tests {
assert_ne!(RecoveryResult::default(), result);
assert!(result.recovered_data.is_empty());
assert!(!result.recovered_code.is_empty());
let result = Shredder::deshred(&shreds[..5]).unwrap();
let result = Shredder::deshred(&shreds[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1173,40 +1170,32 @@ mod tests {
shreds.insert(1, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert_matches!(recovered_shred, Shred::DataComplete(_));
assert_eq!(recovered_shred.index(), 3);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(3, recovered_shred);
assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 5, 7 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 0);
assert_eq!(code.header.common_header.index, 1);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 2);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 4);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 4);
assert_eq!(code.header.common_header.index, 3);
}
let result = Shredder::deshred(&shreds[..5]).unwrap();
let result = Shredder::deshred(&shreds[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1242,7 +1231,7 @@ mod tests {
.unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::FirstInSlot(_));
assert_eq!(recovered_shred.index(), 0);
@ -1259,33 +1248,25 @@ mod tests {
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::DataComplete(_));
assert_eq!(recovered_shred.index(), 4);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(4, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 1);
assert_eq!(code.header.common_header.index, 0);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 3);
assert_eq!(code.header.common_header.index, 2);
}
let result = Shredder::deshred(&shreds[..5]).unwrap();
let result = Shredder::deshred(&shreds[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1344,7 +1325,7 @@ mod tests {
.unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::FirstInSlot(_));
assert_eq!(recovered_shred.index(), 0);
@ -1361,33 +1342,25 @@ mod tests {
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::LastInSlot(_));
assert_eq!(recovered_shred.index(), 4);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(4, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 1);
assert_eq!(code.header.common_header.index, 0);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 3);
assert_eq!(code.header.common_header.index, 2);
}
let result = Shredder::deshred(&shreds[..5]).unwrap();
let result = Shredder::deshred(&shreds[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1397,7 +1370,7 @@ mod tests {
.iter()
.enumerate()
.filter_map(|(i, (s, _))| {
if (i < 5 && i % 2 != 0) || (i >= 5 && i % 2 == 0) {
if (i < 4 && i % 2 != 0) || (i >= 4 && i % 2 == 0) {
Some(s.clone())
} else {
None
@ -1466,7 +1439,7 @@ mod tests {
.unwrap();
assert_ne!(RecoveryResult::default(), result);
assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing
assert_eq!(result.recovered_data.len(), 2); // Data shreds 0, 2 were missing
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::Data(_));
assert_eq!(recovered_shred.index(), 25);
@ -1483,33 +1456,25 @@ mod tests {
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(2, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::LastInSlot(_));
assert_eq!(recovered_shred.index(), 29);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);
assert!(recovered_shred.verify(&keypair.pubkey()));
shreds.insert(4, recovered_shred);
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing
assert_eq!(result.recovered_code.len(), 2); // Coding shreds 4, 6 were missing
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 1);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 0);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 26);
assert_eq!(code.header.common_header.index, 25);
}
let recovered_shred = result.recovered_code.remove(0);
if let Shred::Coding(code) = recovered_shred {
assert_eq!(code.header.num_data_shreds, 5);
assert_eq!(code.header.num_coding_shreds, 5);
assert_eq!(code.header.position, 3);
assert_eq!(code.header.num_data_shreds, 4);
assert_eq!(code.header.num_coding_shreds, 4);
assert_eq!(code.header.position, 2);
assert_eq!(code.header.common_header.slot, slot);
assert_eq!(code.header.common_header.index, 28);
assert_eq!(code.header.common_header.index, 27);
}
let result = Shredder::deshred(&shreds[..5]).unwrap();
let result = Shredder::deshred(&shreds[..4]).unwrap();
assert!(result.len() >= data.len());
assert_eq!(data[..], result[..data.len()]);
@ -1548,4 +1513,48 @@ mod tests {
Err(reed_solomon_erasure::Error::TooFewShardsPresent)
);
}
#[test]
fn test_multi_fec_block_coding() {
let keypair = Arc::new(Keypair::new());
let slot = 0x123456789abcdef0;
let mut shredder =
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..MAX_DATA_SHREDS_PER_FEC_BLOCK * 1200 * 3).collect();
let data: Vec<u8> = data.iter().map(|x| *x as u8).collect();
let mut offset = shredder.write(&data).unwrap();
let approx_shred_payload_size = offset;
while offset < data.len() {
offset += shredder.write(&data[offset..]).unwrap();
}
// We should have some shreds now
assert!(shredder.shred_tuples.len() > data.len() / approx_shred_payload_size);
assert_eq!(offset, data.len());
shredder.finalize_data();
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let mut index = 0;
while index < shredder.shred_tuples.len() {
let num_data_shreds = cmp::min(
MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
(shredder.shred_tuples.len() - index) / 2,
);
let coding_start = index + num_data_shreds;
shredder.shred_tuples[index..coding_start]
.iter()
.for_each(|(s, _)| assert!(s.is_data()));
index = coding_start + num_data_shreds;
shredder.shred_tuples[coding_start..index]
.iter()
.for_each(|(s, _)| assert!(!s.is_data()));
}
}
}