pads last erasure batch with empty data shreds (#639)

To prevent duplicate blocks we want to verify that the last erasure
batch was sufficiently propagated through turbine. This requires
additional bookkeeping because, depending on the erasure coding scheme,
the entire batch might be recovered from only a few coding shreds.
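
For intuition: shreds use Reed-Solomon coding, which is maximum-distance-separable, so any num_data of a batch's num_data + num_coding shards reconstruct the entire batch. A minimal sketch of what that means for a tiny trailing batch (illustrative sketch, not agave code; the 1:17 split is an example of a small batch, while 32:32 is the shape of a full FEC set):

    // Illustrative sketch, not agave code. With Reed-Solomon, any
    // `num_data` of a batch's `num_data + num_coding` shards suffice
    // to rebuild all of them.
    fn recovery_threshold(num_data: usize, num_coding: usize) -> f64 {
        num_data as f64 / (num_data + num_coding) as f64
    }

    fn main() {
        // A last batch holding a single data shred (say 1 data + 17
        // coding) is recoverable from any one shred, ~6% of the batch,
        // so "the batch was recovered" says almost nothing about how
        // widely it propagated:
        println!("{:.2}", recovery_threshold(1, 17)); // 0.06
        // A padded batch of 32 data + 32 coding shreds needs any 32 of
        // its 64 shreds, i.e. half of the batch:
        println!("{:.2}", recovery_threshold(32, 32)); // 0.50
    }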

In order to simplify the above, this commit instead ensures that the
last erasure batch has >= 32 data shreds, so that the batch cannot be
recovered unless 32+ shreds are received from turbine or repair.
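
A minimal sketch of the resulting chunking, mirroring the iterator chain in the make_shreds_from_data diff below (DATA_SHREDS_PER_FEC_BLOCK is 32; the chunk_data helper here is illustrative, not the real API):

    const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

    // Split `data` into shred-sized chunks; for the last batch in the
    // slot, pad the chunk count with empty chunks up to 32.
    fn chunk_data(data: &[u8], data_buffer_size: usize, is_last_in_slot: bool) -> Vec<&[u8]> {
        let min_num_data_shreds = if is_last_in_slot {
            DATA_SHREDS_PER_FEC_BLOCK
        } else {
            1
        };
        let num_data_shreds =
            ((data.len() + data_buffer_size - 1) / data_buffer_size).max(min_num_data_shreds);
        data.chunks(data_buffer_size)
            .chain(std::iter::repeat(&[][..]))
            .take(num_data_shreds)
            .collect()
    }

    fn main() {
        // Mid-slot: small data still yields a single shred's worth.
        assert_eq!(chunk_data(&[7u8; 10], 100, false).len(), 1);
        // Last in slot: padded with empty data shreds up to 32.
        let chunks = chunk_data(&[7u8; 10], 100, true);
        assert_eq!(chunks.len(), 32);
        assert!(chunks[1..].iter().all(|chunk| chunk.is_empty()));
    }
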
behzad nouri 2024-04-11 14:50:43 +00:00 committed by GitHub
parent 4fc6b0bd96
commit 293414f482
9 changed files with 89 additions and 85 deletions

View File

@ -270,7 +270,7 @@ pub mod test {
&mut processed_slots,
1,
);
assert_eq!(repairs, [ShredRepairType::Shred(1, 4)]);
assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
}
fn add_tree_with_missing_shreds(

View File

@ -1665,7 +1665,7 @@ mod test {
vec![
ShredRepairType::Shred(2, 0),
ShredRepairType::HighestShred(82, 0),
ShredRepairType::HighestShred(7, 3),
ShredRepairType::Shred(7, 3),
],
);
}

View File

@ -2163,7 +2163,7 @@ mod tests {
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
shreds.retain(|shred| shred.slot() != 1);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
@ -2192,7 +2192,7 @@ mod tests {
let expected = vec![repair_response::repair_response_packet(
&blockstore,
2,
0,
31, // shred_index
&socketaddr_any!(),
nonce,
)

View File

@ -595,31 +595,15 @@ pub(crate) mod tests {
),
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
// With Merkle shreds, last erasure batch is padded with
// empty data shreds.
next_shred_index + if merkle_variant { 30 } else { 1 },
&shredder,
&leader,
merkle_variant,
false,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
),
(
new_rand_data_shred(
&mut rng,
@ -638,26 +622,8 @@ pub(crate) mod tests {
true,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
true,
),
),
];
for (shred1, shred2) in test_cases.into_iter() {
for (shred1, shred2) in test_cases.iter().flat_map(|(a, b)| [(a, b), (b, a)]) {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
@ -670,8 +636,8 @@ pub(crate) mod tests {
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
assert_eq!(shred1, &shred3);
assert_eq!(shred2, &shred4);
}
}

View File

@ -7665,7 +7665,7 @@ pub mod tests {
assert_eq!(slot_meta.last_index, Some(num_shreds - 1));
assert!(slot_meta.is_full());
let (shreds, _) = make_slot_entries(0, 0, 22, /*merkle_variant:*/ true);
let (shreds, _) = make_slot_entries(0, 0, 600, /*merkle_variant:*/ true);
assert!(shreds.len() > num_shreds as usize);
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
@ -10240,15 +10240,13 @@ pub mod tests {
.flat_map(|x| x.0)
.collect();
blockstore.insert_shreds(shreds, None, false).unwrap();
// Should only be one shred in slot 9
assert!(blockstore
.get_data_shred(unconfirmed_slot, 0)
.unwrap()
.is_some());
assert!(blockstore
.get_data_shred(unconfirmed_slot, 1)
.unwrap()
.is_none());
// There are 32 data shreds in slot 9.
for index in 0..32 {
assert_matches!(
blockstore.get_data_shred(unconfirmed_slot, index as u64),
Ok(Some(_))
);
}
blockstore.set_dead_slot(unconfirmed_slot).unwrap();
// Purge the slot
@ -10281,10 +10279,10 @@ pub mod tests {
.into_iter()
.flat_map(|x| x.0)
.collect();
assert_eq!(shreds.len(), 2);
assert_eq!(shreds.len(), 2 * 32);
// Save off unconfirmed_slot for later, just one shred at shreds[1]
let unconfirmed_slot_shreds = vec![shreds[1].clone()];
// Save off unconfirmed_slot for later, just one shred at shreds[32]
let unconfirmed_slot_shreds = vec![shreds[32].clone()];
assert_eq!(unconfirmed_slot_shreds[0].slot(), unconfirmed_slot);
// Insert into slot 9

View File

@ -127,6 +127,10 @@ impl Shred {
dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
dispatch!(fn proof_size(&self) -> Result<u8, Error>);
fn fec_set_index(&self) -> u32 {
self.common_header().fec_set_index
}
fn index(&self) -> u32 {
self.common_header().index
}
@ -1098,17 +1102,29 @@ pub(super) fn make_shreds_from_data(
// If shreds.is_empty() then the data argument was empty. In that case we
// want to generate one data shred with empty data.
if !data.is_empty() || shreds.is_empty() {
// Should generate at least one data shred (which may have no data).
// Last erasure batch should also be padded with empty data shreds to
make >= 32 data shreds. This guarantees that the batch cannot be
// recovered unless 32+ shreds are received from turbine or repair.
let min_num_data_shreds = if is_last_in_slot {
DATA_SHREDS_PER_FEC_BLOCK
} else {
1
};
// Find the Merkle proof_size and data_buffer_size
// which can embed the remaining data.
let (proof_size, data_buffer_size) = (1u8..32)
let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32)
.find_map(|proof_size| {
let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?;
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
let num_data_shreds = num_data_shreds.max(1);
let num_data_shreds = num_data_shreds.max(min_num_data_shreds);
let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
(proof_size == get_proof_size(erasure_batch_size))
.then_some((proof_size, data_buffer_size))
(proof_size == get_proof_size(erasure_batch_size)).then_some((
proof_size,
data_buffer_size,
num_data_shreds,
))
})
.ok_or(Error::UnknownProofSize)?;
common_header.shred_variant = ShredVariant::MerkleData {
@ -1117,13 +1133,11 @@ pub(super) fn make_shreds_from_data(
resigned,
};
common_header.fec_set_index = common_header.index;
let chunks = if data.is_empty() {
// Generate one data shred with empty data.
Either::Left(std::iter::once(data))
} else {
Either::Right(data.chunks(data_buffer_size))
};
for shred in chunks {
for shred in data
.chunks(data_buffer_size)
.chain(std::iter::repeat(&[][..]))
.take(num_data_shreds)
{
let shred = new_shred_data(common_header, data_header, shred);
shreds.push(shred);
common_header.index += 1;
@ -1132,12 +1146,17 @@ pub(super) fn make_shreds_from_data(
stats.data_buffer_residual += data_buffer_size - shred.data()?.len();
}
}
// Only the very last shred may have residual data buffer.
debug_assert!(shreds.iter().rev().skip(1).all(|shred| {
let proof_size = shred.proof_size().unwrap();
let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
shred.data().unwrap().len() == capacity
}));
// Only the trailing data shreds may have residual data buffer.
debug_assert!(shreds
.iter()
.rev()
.skip_while(|shred| is_last_in_slot && shred.data().unwrap().is_empty())
.skip(1)
.all(|shred| {
let proof_size = shred.proof_size().unwrap();
let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
shred.data().unwrap().len() == capacity
}));
// Adjust flags for the very last shred.
if let Some(shred) = shreds.last_mut() {
shred.data_header.flags |= if is_last_in_slot {
@ -1890,6 +1909,18 @@ mod test {
.contains(ShredFlags::LAST_SHRED_IN_SLOT),
is_last_in_slot
);
// Assert that the last erasure batch has 32+ data shreds.
if is_last_in_slot {
let fec_set_index = shreds.iter().map(Shred::fec_set_index).max().unwrap();
assert!(
shreds
.iter()
.filter(|shred| shred.fec_set_index() == fec_set_index)
.filter(|shred| shred.shred_type() == ShredType::Data)
.count()
>= 32
)
}
// Assert that data shreds can be recovered from coding shreds.
let recovered_data_shreds: Vec<_> = shreds
.iter()

View File

@ -513,7 +513,7 @@ mod tests {
assert_eq!(verify, shred.verify(pk));
}
fn run_test_data_shredder(slot: Slot, chained: bool) {
fn run_test_data_shredder(slot: Slot, chained: bool, is_last_in_slot: bool) {
let keypair = Arc::new(Keypair::new());
// Test that parent cannot be > current slot
@ -538,11 +538,15 @@ mod tests {
})
.collect();
let is_last_in_slot = true;
let size = serialized_size(&entries).unwrap() as usize;
// Integer division to ensure we have enough shreds to fit all the data
let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_data_shreds = num_expected_data_shreds.max(if is_last_in_slot {
DATA_SHREDS_PER_FEC_BLOCK
} else {
1
});
let num_expected_coding_shreds =
get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot)
- num_expected_data_shreds;
@ -574,8 +578,8 @@ mod tests {
slot,
parent_slot,
&keypair.pubkey(),
true,
is_last,
true, // verify
is_last && is_last_in_slot,
is_last,
);
assert!(!data_shred_indexes.contains(&index));
@ -607,10 +611,12 @@ mod tests {
assert_eq!(entries, deshred_entries);
}
#[test_case(false)]
#[test_case(true)]
fn test_data_shredder(chained: bool) {
run_test_data_shredder(0x1234_5678_9abc_def0, chained);
#[test_case(false, false)]
#[test_case(false, true)]
#[test_case(true, false)]
#[test_case(true, true)]
fn test_data_shredder(chained: bool, is_last_in_slot: bool) {
run_test_data_shredder(0x1234_5678_9abc_def0, chained, is_last_in_slot);
}
#[test_case(false)]

View File

@ -3769,6 +3769,7 @@ fn test_kill_partition_switch_threshold_progress() {
#[test]
#[serial]
#[ignore]
#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader() {
run_duplicate_shreds_broadcast_leader(true);

View File

@ -237,9 +237,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sigs,
);
assert_eq!(original_last_data_shred.len(), 1);
assert_eq!(partition_last_data_shred.len(), 1);
self.next_shred_index += 1;
assert_eq!(
original_last_data_shred.len(),
partition_last_data_shred.len()
);
self.next_shred_index += u32::try_from(original_last_data_shred.len()).unwrap();
(original_last_data_shred, partition_last_data_shred)
});