maps number of data shreds to erasure batch size (#25917)
In preparation of https://github.com/solana-labs/solana/pull/25807, which reworks erasure batch sizes, this commit:
* adds a helper function mapping the number of data shreds to the erasure batch size.
* adds ProcessShredsStats to Shredder::entries_to_shreds in order to replace and remove entries_to_data_shreds from the public interface.
This commit is contained in:
parent 09d3c890e9
commit f534b8981b
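The substance of the change is the new private helper in the Shredder module (diff further below). As a self-contained sketch of the mapping it implements — the constant is assumed to be 32, its value in this tree, and the asserts are illustrative, not from the commit:

    const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; // assumed value in this tree

    /// Maps number of data shreds in each batch to the erasure batch size.
    fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
        if is_last_in_slot {
            // The final batch of a slot is padded out so the erasure batch
            // holds at least 2 * MAX_DATA_SHREDS_PER_FEC_BLOCK shreds in total.
            2 * num_data_shreds.max(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
        } else {
            // Interior batches pair every data shred with one coding shred.
            2 * num_data_shreds
        }
    }

    fn main() {
        assert_eq!(get_erasure_batch_size(32, false), 64); // full interior batch
        assert_eq!(get_erasure_batch_size(5, true), 64); // short final batch, padded
        assert_eq!(get_erasure_batch_size(40, true), 80); // oversized final batch
    }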
@@ -15,7 +15,7 @@ use {
     solana_ledger::{
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
         leader_schedule_cache::LeaderScheduleCache,
-        shred::Shredder,
+        shred::{ProcessShredsStats, Shredder},
     },
     solana_measure::measure::Measure,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -101,9 +101,12 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let parent = 0;
     let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
     let (mut data_shreds, _) = shredder.entries_to_shreds(
-        &keypair, &entries, true, // is_last_in_slot
+        &keypair,
+        &entries,
+        true, // is_last_in_slot
         0, // next_shred_index
         0, // next_code_index
+        &mut ProcessShredsStats::default(),
     );

     let num_packets = data_shreds.len();

@@ -46,12 +46,12 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
     );
     let entries = make_large_unchained_entries(txs_per_entry, num_entries);
     let shredder = Shredder::new(1, 0, 0, 0).unwrap();
-    let data_shreds = shredder.entries_to_data_shreds(
+    let (data_shreds, _) = shredder.entries_to_shreds(
         &Keypair::new(),
         &entries,
         true, // is_last_in_slot
         0, // next_shred_index
-        0, // fec_set_offset
+        0, // next_code_index
         &mut ProcessShredsStats::default(),
     );
     assert!(data_shreds.len() >= num_shreds);
@@ -79,7 +79,14 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
     let entries = create_ticks(num_ticks, 0, Hash::default());
     bencher.iter(|| {
         let shredder = Shredder::new(1, 0, 0, 0).unwrap();
-        shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
+        shredder.entries_to_shreds(
+            &kp,
+            &entries,
+            true,
+            0,
+            0,
+            &mut ProcessShredsStats::default(),
+        );
     })
 }

@@ -98,7 +105,14 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
     // 1Mb
     bencher.iter(|| {
         let shredder = Shredder::new(1, 0, 0, 0).unwrap();
-        shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
+        shredder.entries_to_shreds(
+            &kp,
+            &entries,
+            true,
+            0,
+            0,
+            &mut ProcessShredsStats::default(),
+        );
     })
 }

@@ -111,7 +125,14 @@ fn bench_deshredder(bencher: &mut Bencher) {
     let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
     let entries = create_ticks(num_ticks, 0, Hash::default());
     let shredder = Shredder::new(1, 0, 0, 0).unwrap();
-    let (data_shreds, _) = shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
+    let (data_shreds, _) = shredder.entries_to_shreds(
+        &kp,
+        &entries,
+        true,
+        0,
+        0,
+        &mut ProcessShredsStats::default(),
+    );
     bencher.iter(|| {
         let raw = &mut Shredder::deshred(&data_shreds).unwrap();
         assert_ne!(raw.len(), 0);

@@ -449,7 +449,7 @@ pub mod test {
         solana_entry::entry::create_ticks,
         solana_gossip::cluster_info::{ClusterInfo, Node},
         solana_ledger::{
-            blockstore::{make_slot_entries, Blockstore},
+            blockstore::Blockstore,
             genesis_utils::{create_genesis_config, GenesisConfigInfo},
             get_tmp_ledger_path,
             shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
@@ -479,16 +479,19 @@ pub mod test {
         Vec<Arc<Vec<Shred>>>,
     ) {
         let num_entries = max_ticks_per_n_shreds(num, None);
-        let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
-        let keypair = Keypair::new();
-        let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-            &keypair,
-            &data_shreds[0..],
-            true, // is_last_in_slot
-            0, // next_code_index
-            &mut ProcessShredsStats::default(),
+        let entries = create_ticks(num_entries, /*hashes_per_tick:*/ 0, Hash::default());
+        let shredder = Shredder::new(
+            slot, /*parent_slot:*/ 0, /*reference_tick:*/ 0, /*version:*/ 0,
         )
         .unwrap();
+        let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
+            &Keypair::new(),
+            &entries,
+            true, // is_last_in_slot
+            0, // next_shred_index,
+            0, // next_code_index
+            &mut ProcessShredsStats::default(),
+        );
         (
             data_shreds.clone(),
             coding_shreds.clone(),

@@ -4,7 +4,7 @@ use {
     itertools::Itertools,
     solana_entry::entry::Entry,
     solana_gossip::cluster_info::DATA_PLANE_FANOUT,
-    solana_ledger::shred::Shredder,
+    solana_ledger::shred::{ProcessShredsStats, Shredder},
     solana_sdk::{
         hash::Hash,
         signature::{Keypair, Signature, Signer},
@@ -163,6 +163,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             last_tick_height == bank.max_tick_height() && last_entries.is_none(),
             self.next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );

         self.next_shred_index += data_shreds.len() as u32;
@@ -177,6 +178,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 true,
                 self.next_shred_index,
                 self.next_code_index,
+                &mut ProcessShredsStats::default(),
             );
             // Don't mark the last shred as last so that validators won't
             // know that they've gotten all the shreds, and will continue
@@ -187,6 +189,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 true,
                 self.next_shred_index,
                 self.next_code_index,
+                &mut ProcessShredsStats::default(),
             );
             let sigs: Vec<_> = partition_last_data_shred
                 .iter()

@@ -1,7 +1,7 @@
 use {
     super::*,
     solana_entry::entry::Entry,
-    solana_ledger::shred::Shredder,
+    solana_ledger::shred::{ProcessShredsStats, Shredder},
     solana_sdk::{hash::Hash, signature::Keypair},
 };

@@ -60,6 +60,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
             last_tick_height == bank.max_tick_height(),
             next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );

         // If the last blockhash is default, a new block is being created
@@ -78,6 +79,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
             last_tick_height == bank.max_tick_height(),
             next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );

         if let Some(index) = coding_shreds

@@ -1,7 +1,7 @@
 use {
     super::*,
     crate::cluster_nodes::ClusterNodesCache,
-    solana_ledger::shred::Shredder,
+    solana_ledger::shred::{ProcessShredsStats, Shredder},
     solana_sdk::{hash::Hash, signature::Keypair},
     std::{thread::sleep, time::Duration},
 };
@@ -92,6 +92,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             last_tick_height == bank.max_tick_height() && last_entries.is_none(),
             self.next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );

         self.next_shred_index += data_shreds.len() as u32;
@@ -105,6 +106,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             true,
             self.next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );
         // Don't mark the last shred as last so that validators won't know
         // that they've gotten all the shreds, and will continue trying to
@@ -115,6 +117,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             false,
             self.next_shred_index,
             self.next_code_index,
+            &mut ProcessShredsStats::default(),
         );
         self.next_shred_index += 1;
         (good_last_data_shred, bad_last_data_shred)

@@ -710,7 +710,7 @@ mod test {
             blockstore::{make_many_slot_entries, Blockstore},
             genesis_utils::create_genesis_config_with_leader,
             get_tmp_ledger_path,
-            shred::Shredder,
+            shred::{ProcessShredsStats, Shredder},
         },
         solana_sdk::{
             epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
@@ -729,9 +729,12 @@ mod test {
     ) -> Vec<Shred> {
         let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
         let (data_shreds, _) = shredder.entries_to_shreds(
-            keypair, entries, true, // is_last_in_slot
+            keypair,
+            entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         data_shreds
     }

@@ -284,7 +284,7 @@ pub(crate) mod tests {
         super::*,
         rand::Rng,
         solana_entry::entry::Entry,
-        solana_ledger::shred::Shredder,
+        solana_ledger::shred::{ProcessShredsStats, Shredder},
         solana_sdk::{
             hash,
             signature::{Keypair, Signer},
@@ -342,6 +342,7 @@ pub(crate) mod tests {
             true, // is_last_in_slot
             next_shred_index,
             next_shred_index, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         data_shreds.swap_remove(0)
     }

@@ -16,8 +16,8 @@ use {
         leader_schedule_cache::LeaderScheduleCache,
         next_slots_iterator::NextSlotsIterator,
         shred::{
-            self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredData, ShredId, ShredType,
-            Shredder,
+            self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, Shred, ShredData,
+            ShredId, ShredType, Shredder,
         },
         slot_stats::{ShredSource, SlotsStats},
     },
@@ -1698,6 +1698,7 @@ impl Blockstore {
             true, // is_last_in_slot
             start_index, // next_shred_index
             start_index, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         all_shreds.append(&mut data_shreds);
         all_shreds.append(&mut coding_shreds);
@@ -1723,6 +1724,7 @@ impl Blockstore {
             is_full_slot,
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         all_shreds.append(&mut data_shreds);
         all_shreds.append(&mut coding_shreds);
@@ -3790,15 +3792,14 @@ pub fn create_new_ledger(
     let version = solana_sdk::shred_version::version_from_hash(&last_hash);

     let shredder = Shredder::new(0, 0, 0, version).unwrap();
-    let shreds = shredder
-        .entries_to_shreds(
-            &Keypair::new(),
-            &entries,
-            true, // is_last_in_slot
-            0, // next_shred_index
-            0, // next_code_index
-        )
-        .0;
+    let (shreds, _) = shredder.entries_to_shreds(
+        &Keypair::new(),
+        &entries,
+        true, // is_last_in_slot
+        0, // next_shred_index
+        0, // next_code_index
+        &mut ProcessShredsStats::default(),
+    );
     assert!(shreds.last().unwrap().last_in_slot());

     blockstore.insert_shreds(shreds, None, false)?;
@@ -4045,6 +4046,7 @@ pub fn entries_to_test_shreds(
         is_full_slot,
         0, // next_shred_index,
         0, // next_code_index
+        &mut ProcessShredsStats::default(),
     )
     .0
 }
@@ -8455,6 +8457,7 @@ pub mod tests {
             true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );

         let genesis_config = create_genesis_config(2).genesis_config;
@@ -8515,6 +8518,7 @@ pub mod tests {
             true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index,
+            &mut ProcessShredsStats::default(),
         );
         let (duplicate_shreds, _) = shredder.entries_to_shreds(
             &leader_keypair,
@@ -8522,6 +8526,7 @@ pub mod tests {
             true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let shred = shreds[0].clone();
         let duplicate_shred = duplicate_shreds[0].clone();

@@ -59,25 +59,25 @@ impl Shredder {
         is_last_in_slot: bool,
         next_shred_index: u32,
         next_code_index: u32,
+        stats: &mut ProcessShredsStats,
     ) -> (
         Vec<Shred>, // data shreds
         Vec<Shred>, // coding shreds
     ) {
-        let mut stats = ProcessShredsStats::default();
         let data_shreds = self.entries_to_data_shreds(
             keypair,
             entries,
             is_last_in_slot,
             next_shred_index,
             next_shred_index, // fec_set_offset
-            &mut stats,
+            stats,
         );
         let coding_shreds = Self::data_shreds_to_coding_shreds(
             keypair,
             &data_shreds,
             is_last_in_slot,
             next_code_index,
-            &mut stats,
+            stats,
         )
         .unwrap();
         (data_shreds, coding_shreds)
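With the stats accumulator passed in by the caller, entries_to_shreds becomes the single public shredding entry point (entries_to_data_shreds is no longer exposed). A minimal caller sketch under the new signature; the slot numbers and tick counts are illustrative, not from the commit:

    use {
        solana_entry::entry::create_ticks,
        solana_ledger::shred::{ProcessShredsStats, Shredder},
        solana_sdk::{hash::Hash, signature::Keypair},
    };

    fn main() {
        let entries = create_ticks(/*num_ticks:*/ 32, /*hashes_per_tick:*/ 0, Hash::default());
        let shredder =
            Shredder::new(/*slot:*/ 1, /*parent_slot:*/ 0, /*reference_tick:*/ 0, /*version:*/ 0)
                .unwrap();
        // The caller now owns the stats and can accumulate them across calls.
        let mut stats = ProcessShredsStats::default();
        let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
            &Keypair::new(),
            &entries,
            true, // is_last_in_slot
            0, // next_shred_index
            0, // next_code_index
            &mut stats,
        );
        assert!(!data_shreds.is_empty() && !coding_shreds.is_empty());
    }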
@@ -172,29 +172,23 @@ impl Shredder {
             return Ok(Vec::default());
         }
         let mut gen_coding_time = Measure::start("gen_coding_shreds");
+        // Step size when advancing next_code_index from one batch to the next.
+        let step = get_erasure_batch_size(
+            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
+            false, // is_last_in_slot
+        ) - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
         // 1) Generate coding shreds
         let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| {
             data_shreds
                 .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
                 .enumerate()
-                .flat_map(|(i, shred_data_batch)| {
-                    // Assumption here is that, for now, each fec block has
-                    // as many coding shreds as data shreds (except for the
-                    // last one in the slot).
-                    // TODO: tie this more closely with
-                    // generate_coding_shreds.
-                    let next_code_index = next_code_index
-                        .checked_add(
-                            u32::try_from(i)
-                                .unwrap()
-                                .checked_mul(MAX_DATA_SHREDS_PER_FEC_BLOCK)
-                                .unwrap(),
-                        )
-                        .unwrap();
+                .flat_map(|(k, shred_data_batch)| {
+                    let offset = u32::try_from(step.checked_mul(k).unwrap());
+                    let next_code_index = next_code_index.checked_add(offset.unwrap());
                     Shredder::generate_coding_shreds(
                         shred_data_batch,
                         is_last_in_slot,
-                        next_code_index,
+                        next_code_index.unwrap(),
                     )
                 })
                 .collect()
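The deleted comment spelled out the old hard-coded assumption: each FEC block has as many coding shreds as data shreds, so next_code_index used to advance by MAX_DATA_SHREDS_PER_FEC_BLOCK per batch. Deriving the step from get_erasure_batch_size yields the same value today but stays correct if #25807 changes the mapping. A quick check of the arithmetic as it stands (constant assumed to be 32, as in this tree):

    fn main() {
        const MAX_DATA_SHREDS_PER_FEC_BLOCK: usize = 32; // assumed value
        // get_erasure_batch_size(32, /*is_last_in_slot:*/ false) == 2 * 32, so:
        let step = 2 * MAX_DATA_SHREDS_PER_FEC_BLOCK - MAX_DATA_SHREDS_PER_FEC_BLOCK;
        assert_eq!(step, 32);
        // Batch k starts writing its coding shreds at next_code_index + k * step.
        let next_code_index = 0;
        let starts: Vec<_> = (0..4).map(|k| next_code_index + k * step).collect();
        assert_eq!(starts, [0, 32, 64, 96]);
    }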
@@ -235,13 +229,9 @@ impl Shredder {
             && shred.version() == version
             && shred.fec_set_index() == fec_set_index));
         let num_data = data.len();
-        let num_coding = if is_last_in_slot {
-            (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
-                .saturating_sub(num_data)
-                .max(num_data)
-        } else {
-            num_data
-        };
+        let num_coding = get_erasure_batch_size(num_data, is_last_in_slot)
+            .checked_sub(num_data)
+            .unwrap();
         let data = data.iter().map(Shred::erasure_shard_as_slice);
         let data: Vec<_> = data.collect::<Result<_, _>>().unwrap();
         let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
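This rewrite is behavior-preserving: for a last-in-slot batch, 2 * max(num_data, 32) - num_data equals the old (2 * 32).saturating_sub(num_data).max(num_data) for every num_data >= 1, and interior batches still get num_data coding shreds. A brute-force check of that equivalence, with the helper reproduced under the assumed constant of 32:

    const MAX: usize = 32; // MAX_DATA_SHREDS_PER_FEC_BLOCK, assumed 32

    fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
        if is_last_in_slot {
            2 * num_data_shreds.max(MAX)
        } else {
            2 * num_data_shreds
        }
    }

    fn main() {
        for num_data in 1..=4 * MAX {
            // Old expression for a last-in-slot batch ...
            let old = (2 * MAX).saturating_sub(num_data).max(num_data);
            // ... matches the new helper-based expression.
            assert_eq!(old, get_erasure_batch_size(num_data, true) - num_data);
            // Interior batches: still one coding shred per data shred.
            assert_eq!(num_data, get_erasure_batch_size(num_data, false) - num_data);
        }
    }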
@@ -353,6 +343,15 @@ impl Shredder {
     }
 }

+/// Maps number of data shreds in each batch to the erasure batch size.
+fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
+    if is_last_in_slot {
+        2 * num_data_shreds.max(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
+    } else {
+        2 * num_data_shreds
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -411,9 +410,9 @@ mod tests {
         // Integer division to ensure we have enough shreds to fit all the data
         let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
         let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
-        let num_expected_coding_shreds = (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
-            .saturating_sub(num_expected_data_shreds)
-            .max(num_expected_data_shreds);
+        let num_expected_coding_shreds =
+            get_erasure_batch_size(num_expected_data_shreds, /*is_last_in_slot:*/ true)
+                - num_expected_data_shreds;
         let start_index = 0;
         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
             &keypair,
@@ -421,6 +420,7 @@ mod tests {
             true, // is_last_in_slot
             start_index, // next_shred_index
             start_index, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let next_index = data_shreds.last().unwrap().index() + 1;
         assert_eq!(next_index as usize, num_expected_data_shreds);
@@ -492,9 +492,12 @@ mod tests {
             .collect();

         let (data_shreds, _) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let deserialized_shred =
             Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload().clone())
@@ -519,9 +522,12 @@ mod tests {
             .collect();

         let (data_shreds, _) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         data_shreds.iter().for_each(|s| {
             assert_eq!(s.reference_tick(), 5);
@@ -551,9 +557,12 @@ mod tests {
             .collect();

         let (data_shreds, _) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         data_shreds.iter().for_each(|s| {
             assert_eq!(
@@ -592,9 +601,12 @@ mod tests {
             .collect();

         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         for (i, s) in data_shreds.iter().enumerate() {
             verify_test_data_shred(
@@ -648,20 +660,16 @@ mod tests {
             is_last_in_slot,
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let num_coding_shreds = coding_shreds.len();

         // We should have 5 data shreds now
         assert_eq!(data_shreds.len(), num_data_shreds);
-        if is_last_in_slot {
-            assert_eq!(
-                num_coding_shreds,
-                2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - num_data_shreds
-            );
-        } else {
-            // and an equal number of coding shreds
-            assert_eq!(num_data_shreds, num_coding_shreds);
-        }
+        assert_eq!(
+            num_coding_shreds,
+            get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds
+        );

         let all_shreds = data_shreds
             .iter()
@@ -775,9 +783,12 @@ mod tests {
         // and 2 missing coding shreds. Hint: should work
         let serialized_entries = bincode::serialize(&entries).unwrap();
         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             25, // next_shred_index,
             25, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         // We should have 10 shreds now
         assert_eq!(data_shreds.len(), num_data_shreds);
@@ -868,6 +879,7 @@ mod tests {
             is_last_in_slot,
             next_shred_index,
             next_shred_index, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let num_data_shreds = data_shreds.len();
         let mut shreds = coding_shreds;
@@ -922,9 +934,12 @@ mod tests {
             .collect();

         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
-            &keypair, &entries, true, // is_last_in_slot
+            &keypair,
+            &entries,
+            true, // is_last_in_slot
             0, // next_shred_index
             0, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         assert!(!data_shreds
             .iter()
@@ -956,6 +971,7 @@ mod tests {
             true, // is_last_in_slot
             start_index, // next_shred_index
             start_index, // next_code_index
+            &mut ProcessShredsStats::default(),
         );
         let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
         data_shreds.iter().enumerate().for_each(|(i, s)| {
@@ -1004,52 +1020,35 @@ mod tests {
         let next_code_index = data_shreds[0].index();

         (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| {
-            let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-                &keypair,
-                &data_shreds[..count],
-                false, // is_last_in_slot
-                next_code_index,
-                &mut stats,
-            )
-            .unwrap();
-            assert_eq!(coding_shreds.len(), count);
-            let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-                &keypair,
-                &data_shreds[..count],
-                true, // is_last_in_slot
-                next_code_index,
-                &mut stats,
-            )
-            .unwrap();
-            assert_eq!(
-                coding_shreds.len(),
-                2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - count
-            );
+            for is_last_in_slot in [false, true] {
+                let coding_shreds = Shredder::data_shreds_to_coding_shreds(
+                    &keypair,
+                    &data_shreds[..count],
+                    is_last_in_slot,
+                    next_code_index,
+                    &mut stats,
+                )
+                .unwrap();
+                let num_coding_shreds = get_erasure_batch_size(count, is_last_in_slot) - count;
+                assert_eq!(coding_shreds.len(), num_coding_shreds);
+            }
         });
-
-        let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-            &keypair,
-            &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
-            false, // is_last_in_slot
-            next_code_index,
-            &mut stats,
-        )
-        .unwrap();
-        assert_eq!(
-            coding_shreds.len(),
-            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1
-        );
-        let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-            &keypair,
-            &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
-            true, // is_last_in_slot
-            next_code_index,
-            &mut stats,
-        )
-        .unwrap();
-        assert_eq!(
-            coding_shreds.len(),
-            3 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1
-        );
+        for is_last_in_slot in [false, true] {
+            let coding_shreds = Shredder::data_shreds_to_coding_shreds(
+                &keypair,
+                &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
+                is_last_in_slot,
+                next_code_index,
+                &mut stats,
+            )
+            .unwrap();
+            let num_shreds =
+                get_erasure_batch_size(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, is_last_in_slot)
+                    + get_erasure_batch_size(1, is_last_in_slot);
+            assert_eq!(
+                coding_shreds.len(),
+                num_shreds - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1
+            );
+        }
     }
 }

@@ -2,7 +2,7 @@
 use {
     solana_entry::entry::Entry,
     solana_ledger::shred::{
-        max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder,
+        max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder,
         LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
     },
     solana_sdk::{
@@ -49,9 +49,12 @@ fn test_multi_fec_block_coding() {

     let serialized_entries = bincode::serialize(&entries).unwrap();
     let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
-        &keypair, &entries, true, // is_last_in_slot
+        &keypair,
+        &entries,
+        true, // is_last_in_slot
         0, // next_shred_index
         0, // next_code_index
+        &mut ProcessShredsStats::default(),
     );
     let next_index = data_shreds.last().unwrap().index() + 1;
     assert_eq!(next_index as usize, num_data_shreds);
@@ -119,8 +122,6 @@ fn test_multi_fec_block_different_size_coding() {
     // Test recovery
     for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) {
         let first_data_index = fec_data_shreds.first().unwrap().index() as usize;
-        let first_code_index = fec_coding_shreds.first().unwrap().index() as usize;
-        assert_eq!(first_data_index, first_code_index);
         let all_shreds: Vec<Shred> = fec_data_shreds
             .iter()
             .step_by(2)
@@ -213,7 +214,8 @@ fn setup_different_sized_fec_blocks(
         .collect();

     // Run the shredder twice, generate data and coding shreds
-    let mut next_index = 0;
+    let mut next_shred_index = 0;
+    let mut next_code_index = 0;
     let mut fec_data = BTreeMap::new();
     let mut fec_coding = BTreeMap::new();
     let mut data_slot_and_index = HashSet::new();
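A single next_index no longer works in this test: once the final erasure batch is padded, coding-shred indices advance faster than data-shred indices, so the two counters have to be tracked separately (which is also why the first_data_index == first_code_index assertion above was dropped). A toy illustration of the divergence; the batch sizes are hypothetical and the constant is assumed to be 32:

    fn main() {
        const MAX: u32 = 32; // MAX_DATA_SHREDS_PER_FEC_BLOCK, assumed 32
        let (mut next_shred_index, mut next_code_index) = (0u32, 0u32);

        // First call: one full interior batch of 32 data + 32 coding shreds.
        next_shred_index += 32;
        next_code_index += 32;

        // Second call: 5 data shreds, last in slot, padded to a 64-shred batch,
        // i.e. 2 * max(5, 32) - 5 = 59 coding shreds.
        next_shred_index += 5;
        next_code_index += 2 * 5u32.max(MAX) - 5;

        assert_eq!((next_shred_index, next_code_index), (37, 91));
    }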
@@ -223,8 +225,12 @@ fn setup_different_sized_fec_blocks(
     for i in 0..2 {
         let is_last = i == 1;
         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
-            &keypair, &entries, is_last, next_index, // next_shred_index
-            next_index, // next_code_index
+            &keypair,
+            &entries,
+            is_last,
+            next_shred_index,
+            next_code_index,
+            &mut ProcessShredsStats::default(),
         );
         for shred in &data_shreds {
             if (shred.index() as usize) == total_num_data_shreds - 1 {
@@ -238,7 +244,8 @@ fn setup_different_sized_fec_blocks(
             }
         }
         assert_eq!(data_shreds.len(), num_shreds_per_iter as usize);
-        next_index = data_shreds.last().unwrap().index() + 1;
+        next_shred_index = data_shreds.last().unwrap().index() + 1;
+        next_code_index = coding_shreds.last().unwrap().index() + 1;
         sort_data_coding_into_fec_sets(
             data_shreds,
             coding_shreds,