makes last erasure batch size >= 64 shreds (#34330)

This commit is contained in:
behzad nouri 2023-12-13 06:48:00 +00:00 committed by GitHub
parent 70cab76495
commit 750023530c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 40 deletions

View File

@ -821,7 +821,8 @@ pub(super) fn make_shreds_from_data(
} }
} }
let now = Instant::now(); let now = Instant::now();
let erasure_batch_size = shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK); let erasure_batch_size =
shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK, is_last_in_slot);
let proof_size = get_proof_size(erasure_batch_size); let proof_size = get_proof_size(erasure_batch_size);
let data_buffer_size = ShredData::capacity(proof_size)?; let data_buffer_size = ShredData::capacity(proof_size)?;
let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size; let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size;
@ -872,7 +873,8 @@ pub(super) fn make_shreds_from_data(
let data_buffer_size = ShredData::capacity(proof_size).ok()?; let data_buffer_size = ShredData::capacity(proof_size).ok()?;
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size; let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
let num_data_shreds = num_data_shreds.max(1); let num_data_shreds = num_data_shreds.max(1);
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
(proof_size == get_proof_size(erasure_batch_size)) (proof_size == get_proof_size(erasure_batch_size))
.then_some((proof_size, data_buffer_size)) .then_some((proof_size, data_buffer_size))
}) })
@ -932,7 +934,8 @@ pub(super) fn make_shreds_from_data(
.scan(next_code_index, |next_code_index, chunk| { .scan(next_code_index, |next_code_index, chunk| {
let out = Some(*next_code_index); let out = Some(*next_code_index);
let num_data_shreds = chunk.len(); let num_data_shreds = chunk.len();
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
let num_coding_shreds = erasure_batch_size - num_data_shreds; let num_coding_shreds = erasure_batch_size - num_data_shreds;
*next_code_index += num_coding_shreds as u32; *next_code_index += num_coding_shreds as u32;
out out
@ -945,7 +948,13 @@ pub(super) fn make_shreds_from_data(
.into_iter() .into_iter()
.zip(next_code_index) .zip(next_code_index)
.map(|(shreds, next_code_index)| { .map(|(shreds, next_code_index)| {
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache) make_erasure_batch(
keypair,
shreds,
next_code_index,
is_last_in_slot,
reed_solomon_cache,
)
}) })
.collect() .collect()
} else { } else {
@ -954,7 +963,13 @@ pub(super) fn make_shreds_from_data(
.into_par_iter() .into_par_iter()
.zip(next_code_index) .zip(next_code_index)
.map(|(shreds, next_code_index)| { .map(|(shreds, next_code_index)| {
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache) make_erasure_batch(
keypair,
shreds,
next_code_index,
is_last_in_slot,
reed_solomon_cache,
)
}) })
.collect() .collect()
}) })
@ -969,10 +984,11 @@ fn make_erasure_batch(
keypair: &Keypair, keypair: &Keypair,
shreds: Vec<ShredData>, shreds: Vec<ShredData>,
next_code_index: u32, next_code_index: u32,
is_last_in_slot: bool,
reed_solomon_cache: &ReedSolomonCache, reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> { ) -> Result<Vec<Shred>, Error> {
let num_data_shreds = shreds.len(); let num_data_shreds = shreds.len();
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
let num_coding_shreds = erasure_batch_size - num_data_shreds; let num_coding_shreds = erasure_batch_size - num_data_shreds;
let proof_size = get_proof_size(erasure_batch_size); let proof_size = get_proof_size(erasure_batch_size);
debug_assert!(shreds debug_assert!(shreds
@ -1056,7 +1072,10 @@ mod test {
itertools::Itertools, itertools::Itertools,
rand::{seq::SliceRandom, CryptoRng, Rng}, rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::ThreadPoolBuilder, rayon::ThreadPoolBuilder,
solana_sdk::signature::{Keypair, Signer}, solana_sdk::{
packet::PACKET_DATA_SIZE,
signature::{Keypair, Signer},
},
std::{cmp::Ordering, iter::repeat_with}, std::{cmp::Ordering, iter::repeat_with},
test_case::test_case, test_case::test_case,
}; };
@ -1124,8 +1143,7 @@ mod test {
assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]); assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]);
} }
fn run_merkle_tree_round_trip(size: usize) { fn run_merkle_tree_round_trip<R: Rng>(rng: &mut R, size: usize) {
let mut rng = rand::thread_rng();
let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from); let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from);
let nodes: Vec<_> = nodes.take(size).collect(); let nodes: Vec<_> = nodes.take(size).collect();
let tree = make_merkle_tree(nodes.clone()); let tree = make_merkle_tree(nodes.clone());
@ -1145,8 +1163,9 @@ mod test {
#[test] #[test]
fn test_merkle_tree_round_trip() { fn test_merkle_tree_round_trip() {
for size in [1, 2, 3, 4, 5, 6, 7, 8, 9, 19, 37, 64, 79] { let mut rng = rand::thread_rng();
run_merkle_tree_round_trip(size); for size in 1..=143 {
run_merkle_tree_round_trip(&mut rng, size);
} }
} }
@ -1327,32 +1346,49 @@ mod test {
} }
} }
#[test_case(0)] #[test_case(0, false)]
#[test_case(15600)] #[test_case(0, true)]
#[test_case(31200)] #[test_case(15600, false)]
#[test_case(46800)] #[test_case(15600, true)]
fn test_make_shreds_from_data(data_size: usize) { #[test_case(31200, false)]
#[test_case(31200, true)]
#[test_case(46800, false)]
#[test_case(46800, true)]
fn test_make_shreds_from_data(data_size: usize, is_last_in_slot: bool) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let data_size = data_size.saturating_sub(16); let data_size = data_size.saturating_sub(16);
let reed_solomon_cache = ReedSolomonCache::default(); let reed_solomon_cache = ReedSolomonCache::default();
for data_size in data_size..data_size + 32 { for data_size in data_size..data_size + 32 {
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
} }
} }
#[test] #[test_case(false)]
fn test_make_shreds_from_data_rand() { #[test_case(true)]
fn test_make_shreds_from_data_rand(is_last_in_slot: bool) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default(); let reed_solomon_cache = ReedSolomonCache::default();
for _ in 0..32 { for _ in 0..32 {
let data_size = rng.gen_range(0..31200 * 7); let data_size = rng.gen_range(0..31200 * 7);
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
}
}
#[ignore]
#[test_case(false)]
#[test_case(true)]
fn test_make_shreds_from_data_paranoid(is_last_in_slot: bool) {
let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default();
for data_size in 0..=PACKET_DATA_SIZE * 4 * 64 {
run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
} }
} }
fn run_make_shreds_from_data<R: Rng>( fn run_make_shreds_from_data<R: Rng>(
rng: &mut R, rng: &mut R,
data_size: usize, data_size: usize,
is_last_in_slot: bool,
reed_solomon_cache: &ReedSolomonCache, reed_solomon_cache: &ReedSolomonCache,
) { ) {
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
@ -1373,7 +1409,7 @@ mod test {
parent_slot, parent_slot,
shred_version, shred_version,
reference_tick, reference_tick,
true, // is_last_in_slot is_last_in_slot,
next_shred_index, next_shred_index,
next_code_index, next_code_index,
reed_solomon_cache, reed_solomon_cache,
@ -1480,14 +1516,17 @@ mod test {
.flags .flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT)) .contains(ShredFlags::LAST_SHRED_IN_SLOT))
.count(), .count(),
1 if is_last_in_slot { 1 } else { 0 }
); );
assert!(data_shreds assert_eq!(
data_shreds
.last() .last()
.unwrap() .unwrap()
.data_header .data_header
.flags .flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT)); .contains(ShredFlags::LAST_SHRED_IN_SLOT),
is_last_in_slot
);
// Assert that data shreds can be recovered from coding shreds. // Assert that data shreds can be recovered from coding shreds.
let recovered_data_shreds: Vec<_> = shreds let recovered_data_shreds: Vec<_> = shreds
.iter() .iter()

View File

@ -207,7 +207,13 @@ impl Shredder {
.iter() .iter()
.scan(next_code_index, |next_code_index, chunk| { .scan(next_code_index, |next_code_index, chunk| {
let num_data_shreds = chunk.len(); let num_data_shreds = chunk.len();
let erasure_batch_size = get_erasure_batch_size(num_data_shreds); let is_last_in_slot = chunk
.last()
.copied()
.map(Shred::last_in_slot)
.unwrap_or(true);
let erasure_batch_size =
get_erasure_batch_size(num_data_shreds, is_last_in_slot);
*next_code_index += (erasure_batch_size - num_data_shreds) as u32; *next_code_index += (erasure_batch_size - num_data_shreds) as u32;
Some(*next_code_index) Some(*next_code_index)
}), }),
@ -276,7 +282,12 @@ impl Shredder {
&& shred.version() == version && shred.version() == version
&& shred.fec_set_index() == fec_set_index)); && shred.fec_set_index() == fec_set_index));
let num_data = data.len(); let num_data = data.len();
let num_coding = get_erasure_batch_size(num_data) let is_last_in_slot = data
.last()
.map(Borrow::borrow)
.map(Shred::last_in_slot)
.unwrap_or(true);
let num_coding = get_erasure_batch_size(num_data, is_last_in_slot)
.checked_sub(num_data) .checked_sub(num_data)
.unwrap(); .unwrap();
assert!(num_coding > 0); assert!(num_coding > 0);
@ -434,11 +445,16 @@ impl Default for ReedSolomonCache {
} }
/// Maps number of data shreds in each batch to the erasure batch size. /// Maps number of data shreds in each batch to the erasure batch size.
pub(crate) fn get_erasure_batch_size(num_data_shreds: usize) -> usize { pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
ERASURE_BATCH_SIZE let erasure_batch_size = ERASURE_BATCH_SIZE
.get(num_data_shreds) .get(num_data_shreds)
.copied() .copied()
.unwrap_or(2 * num_data_shreds) .unwrap_or(2 * num_data_shreds);
if is_last_in_slot {
erasure_batch_size.max(2 * DATA_SHREDS_PER_FEC_BLOCK)
} else {
erasure_batch_size
}
} }
// Returns offsets to fec_set_index when splitting shreds into erasure batches. // Returns offsets to fec_set_index when splitting shreds into erasure batches.
@ -518,17 +534,19 @@ mod tests {
}) })
.collect(); .collect();
let is_last_in_slot = true;
let size = serialized_size(&entries).unwrap() as usize; let size = serialized_size(&entries).unwrap() as usize;
// Integer division to ensure we have enough shreds to fit all the data // Integer division to ensure we have enough shreds to fit all the data
let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_coding_shreds = let num_expected_coding_shreds =
get_erasure_batch_size(num_expected_data_shreds) - num_expected_data_shreds; get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot)
- num_expected_data_shreds;
let start_index = 0; let start_index = 0;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds( let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair, &keypair,
&entries, &entries,
true, // is_last_in_slot is_last_in_slot,
start_index, // next_shred_index start_index, // next_shred_index
start_index, // next_code_index start_index, // next_code_index
true, // merkle_variant true, // merkle_variant
@ -792,7 +810,7 @@ mod tests {
assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds);
assert_eq!( assert_eq!(
num_coding_shreds, num_coding_shreds,
get_erasure_batch_size(num_data_shreds) - num_data_shreds get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds
); );
let all_shreds = data_shreds let all_shreds = data_shreds
@ -1189,7 +1207,10 @@ mod tests {
.iter() .iter()
.group_by(|shred| shred.fec_set_index()) .group_by(|shred| shred.fec_set_index())
.into_iter() .into_iter()
.map(|(_, chunk)| get_erasure_batch_size(chunk.count())) .map(|(_, chunk)| {
let chunk: Vec<_> = chunk.collect();
get_erasure_batch_size(chunk.len(), chunk.last().unwrap().last_in_slot())
})
.sum(); .sum();
assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len()); assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len());
} }
@ -1232,7 +1253,8 @@ mod tests {
#[test] #[test]
fn test_max_shreds_per_slot() { fn test_max_shreds_per_slot() {
for num_data_shreds in 32..128 { for num_data_shreds in 32..128 {
let num_coding_shreds = get_erasure_batch_size(num_data_shreds) let num_coding_shreds =
get_erasure_batch_size(num_data_shreds, /*is_last_in_slot:*/ false)
.checked_sub(num_data_shreds) .checked_sub(num_data_shreds)
.unwrap(); .unwrap();
assert!( assert!(

View File

@ -2562,6 +2562,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
#[test] #[test]
#[serial] #[serial]
#[ignore]
fn test_rpc_block_subscribe() { fn test_rpc_block_subscribe() {
let total_stake = 100 * DEFAULT_NODE_STAKE; let total_stake = 100 * DEFAULT_NODE_STAKE;
let leader_stake = total_stake; let leader_stake = total_stake;