removes buffering when generating coding shreds in broadcast (#25807)

Given the 32:32 erasure recovery schema, current implementation requires
exactly 32 data shreds to generate coding shreds for the batch (except
for the final erasure batch in each slot).
As a result, when serializing ledger entries to data shreds, if the
number of data shreds is not a multiple of 32, the coding shreds for the
last batch cannot be generated until there are more data shreds to
complete the batch to 32 data shreds. This adds latency in generating
and broadcasting coding shreds.

In addition, with Merkle variants for shreds, data shreds cannot be
signed and broadcast until coding shreds are also generated. As a
result *both* code and data shreds will be delayed before broadcast if
we still require exactly 32 data shreds for each batch.

This commit instead always generates and broadcasts coding shreds as
soon as any number of data shreds is available. When serializing
entries to shreds:
* if the number of resulting data shreds is less than 32, then more
  coding shreds will be generated so that the resulting erasure batch
  has the same recovery probabilities as a 32:32 batch.
* if the number of data shreds is more than 32, then the data shreds are
  split uniformly into erasure batches with _at least_ 32 data shreds in
  each batch. Each erasure batch will have the same number of code and
  data shreds.

For example:
* If there are 19 data shreds, 27 coding shreds are generated. The
  resulting 19(data):27(code) erasure batch has the same recovery
  probabilities as a 32:32 batch.
* If there are 107 data shreds, they are split into 3 batches of 36:36,
  36:36 and 35:35 data:code shreds each.

A consequence of this change is that code and data shred indices will
no longer align, as there will be more coding shreds than data shreds
(not only in the last batch in each slot but also in the intermediate
ones).
This commit is contained in:
behzad nouri 2022-08-11 12:44:27 +00:00 committed by GitHub
parent ce230035fe
commit ac91cdab74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 203 additions and 254 deletions

View File

@ -9,7 +9,7 @@ use {
solana_entry::entry::{create_ticks, Entry}, solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{ solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags, max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
}, },
solana_perf::test_tx, solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@ -153,13 +153,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
#[bench] #[bench]
fn bench_shredder_coding(bencher: &mut Bencher) { fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count); let data_shreds = make_shreds(symbol_count);
bencher.iter(|| { bencher.iter(|| {
Shredder::generate_coding_shreds( Shredder::generate_coding_shreds(
&data_shreds[..symbol_count], &data_shreds[..symbol_count],
true, // is_last_in_slot 0, // next_code_index
0, // next_code_index
) )
.len(); .len();
}) })
@ -167,12 +166,11 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
#[bench] #[bench]
fn bench_shredder_decoding(bencher: &mut Bencher) { fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count); let data_shreds = make_shreds(symbol_count);
let coding_shreds = Shredder::generate_coding_shreds( let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count], &data_shreds[..symbol_count],
true, // is_last_in_slot 0, // next_code_index
0, // next_code_index
); );
bencher.iter(|| { bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap(); Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
@ -181,18 +179,18 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
#[bench] #[bench]
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) { fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK; let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize); let data = make_concatenated_shreds(symbol_count);
bencher.iter(|| { bencher.iter(|| {
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16); let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
encoder.get_encoded_packets(symbol_count); encoder.get_encoded_packets(symbol_count as u32);
}) })
} }
#[bench] #[bench]
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) { fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK; let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize); let data = make_concatenated_shreds(symbol_count);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16); let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
let mut packets = encoder.get_encoded_packets(symbol_count as u32); let mut packets = encoder.get_encoded_packets(symbol_count as u32);
packets.shuffle(&mut rand::thread_rng()); packets.shuffle(&mut rand::thread_rng());

View File

@ -2,7 +2,6 @@ use {
crate::result::Result, crate::result::Result,
crossbeam_channel::Receiver, crossbeam_channel::Receiver,
solana_entry::entry::Entry, solana_entry::entry::Entry,
solana_ledger::shred::Shred,
solana_poh::poh_recorder::WorkingBankEntry, solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank, solana_runtime::bank::Bank,
solana_sdk::clock::Slot, solana_sdk::clock::Slot,
@ -25,9 +24,6 @@ pub struct UnfinishedSlotInfo {
pub(crate) next_code_index: u32, pub(crate) next_code_index: u32,
pub slot: Slot, pub slot: Slot,
pub parent: Slot, pub parent: Slot,
// Data shreds buffered to make a batch of size
// MAX_DATA_SHREDS_PER_FEC_BLOCK.
pub(crate) data_shreds_buffer: Vec<Shred>,
} }
/// This parameter tunes how many entries are received in one iteration of recv loop /// This parameter tunes how many entries are received in one iteration of recv loop

View File

@ -9,9 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache, broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
}, },
solana_entry::entry::Entry, solana_entry::entry::Entry,
solana_ledger::shred::{ solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
ProcessShredsStats, Shred, ShredFlags, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
},
solana_sdk::{ solana_sdk::{
signature::Keypair, signature::Keypair,
timing::{duration_as_us, AtomicInterval}, timing::{duration_as_us, AtomicInterval},
@ -68,41 +66,27 @@ impl StandardBroadcastRun {
None => Vec::default(), None => Vec::default(),
Some(ref state) if state.slot == current_slot => Vec::default(), Some(ref state) if state.slot == current_slot => Vec::default(),
Some(ref mut state) => { Some(ref mut state) => {
let parent_offset = state.slot - state.parent;
let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK; let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK;
let fec_set_offset = state let shredder =
.data_shreds_buffer Shredder::new(state.slot, state.parent, reference_tick, self.shred_version)
.first() .unwrap();
.map(Shred::index) let (mut shreds, coding_shreds) = shredder.entries_to_shreds(
.unwrap_or(state.next_shred_index);
let fec_set_index = Shredder::fec_set_index(state.next_shred_index, fec_set_offset);
let mut shred = Shred::new_from_data(
state.slot,
state.next_shred_index,
parent_offset as u16,
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
reference_tick,
self.shred_version,
fec_set_index.unwrap(),
);
shred.sign(keypair);
state.data_shreds_buffer.push(shred.clone());
let mut shreds = make_coding_shreds(
keypair, keypair,
&mut self.unfinished_slot, &[], // entries
true, // is_last_in_slot true, // is_last_in_slot,
state.next_shred_index,
state.next_code_index,
stats, stats,
); );
shreds.insert(0, shred);
self.report_and_reset_stats(true); self.report_and_reset_stats(true);
self.unfinished_slot = None; self.unfinished_slot = None;
shreds.extend(coding_shreds);
shreds shreds
} }
} }
} }
fn entries_to_data_shreds( fn entries_to_shreds(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
entries: &[Entry], entries: &[Entry],
@ -110,10 +94,13 @@ impl StandardBroadcastRun {
reference_tick: u8, reference_tick: u8,
is_slot_end: bool, is_slot_end: bool,
process_stats: &mut ProcessShredsStats, process_stats: &mut ProcessShredsStats,
) -> Vec<Shred> { ) -> (
Vec<Shred>, // data shreds
Vec<Shred>, // coding shreds
) {
let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
let next_shred_index = match &self.unfinished_slot { let (next_shred_index, next_code_index) = match &self.unfinished_slot {
Some(state) => state.next_shred_index, Some(state) => (state.next_shred_index, state.next_code_index),
None => { None => {
// If the blockstore has shreds for the slot, it should not // If the blockstore has shreds for the slot, it should not
// recreate the slot: // recreate the slot:
@ -123,46 +110,37 @@ impl StandardBroadcastRun {
process_stats.num_extant_slots += 1; process_stats.num_extant_slots += 1;
// This is a faulty situation that should not happen. // This is a faulty situation that should not happen.
// Refrain from generating shreds for the slot. // Refrain from generating shreds for the slot.
return Vec::default(); return (Vec::default(), Vec::default());
} }
} }
0u32 (0u32, 0u32)
} }
}; };
let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version) let shredder =
.unwrap() Shredder::new(slot, parent_slot, reference_tick, self.shred_version).unwrap();
.entries_to_data_shreds( let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair, keypair,
entries, entries,
is_slot_end, is_slot_end,
next_shred_index, next_shred_index,
0, // fec_set_offset next_code_index,
process_stats, process_stats,
); );
let mut data_shreds_buffer = match &mut self.unfinished_slot {
Some(state) => {
assert_eq!(state.slot, slot);
std::mem::take(&mut state.data_shreds_buffer)
}
None => Vec::default(),
};
data_shreds_buffer.extend(data_shreds.clone());
let next_shred_index = match data_shreds.iter().map(Shred::index).max() { let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1, Some(index) => index + 1,
None => next_shred_index, None => next_shred_index,
}; };
let next_code_index = match &self.unfinished_slot { let next_code_index = match coding_shreds.iter().map(Shred::index).max() {
Some(state) => state.next_code_index, Some(index) => index + 1,
None => 0, None => next_code_index,
}; };
self.unfinished_slot = Some(UnfinishedSlotInfo { self.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index, next_shred_index,
next_code_index, next_code_index,
slot, slot,
parent: parent_slot, parent: parent_slot,
data_shreds_buffer,
}); });
data_shreds (data_shreds, coding_shreds)
} }
#[cfg(test)] #[cfg(test)]
@ -228,7 +206,7 @@ impl StandardBroadcastRun {
// 2) Convert entries to shreds and coding shreds // 2) Convert entries to shreds and coding shreds
let is_last_in_slot = last_tick_height == bank.max_tick_height(); let is_last_in_slot = last_tick_height == bank.max_tick_height();
let reference_tick = bank.tick_height() % bank.ticks_per_slot(); let reference_tick = bank.tick_height() % bank.ticks_per_slot();
let data_shreds = self.entries_to_data_shreds( let (data_shreds, coding_shreds) = self.entries_to_shreds(
keypair, keypair,
&receive_results.entries, &receive_results.entries,
blockstore, blockstore,
@ -300,13 +278,7 @@ impl StandardBroadcastRun {
socket_sender.send((data_shreds.clone(), batch_info.clone()))?; socket_sender.send((data_shreds.clone(), batch_info.clone()))?;
blockstore_sender.send((data_shreds, batch_info.clone()))?; blockstore_sender.send((data_shreds, batch_info.clone()))?;
// Create and send coding shreds // Send coding shreds
let coding_shreds = make_coding_shreds(
keypair,
&mut self.unfinished_slot,
is_last_in_slot,
&mut process_stats,
);
let coding_shreds = Arc::new(coding_shreds); let coding_shreds = Arc::new(coding_shreds);
debug_assert!(coding_shreds debug_assert!(coding_shreds
.iter() .iter()
@ -435,49 +407,6 @@ impl StandardBroadcastRun {
} }
} }
// Consumes data_shreds_buffer returning corresponding coding shreds.
fn make_coding_shreds(
keypair: &Keypair,
unfinished_slot: &mut Option<UnfinishedSlotInfo>,
is_slot_end: bool,
stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
let unfinished_slot = match unfinished_slot {
None => return Vec::default(),
Some(state) => state,
};
let data_shreds: Vec<_> = {
let size = unfinished_slot.data_shreds_buffer.len();
// Consume a multiple of 32, unless this is the slot end.
let offset = if is_slot_end {
0
} else {
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
};
unfinished_slot
.data_shreds_buffer
.drain(0..size - offset)
.collect()
};
let shreds = Shredder::data_shreds_to_coding_shreds(
keypair,
&data_shreds,
is_slot_end,
unfinished_slot.next_code_index,
stats,
)
.unwrap();
if let Some(index) = shreds
.iter()
.filter(|shred| shred.is_code())
.map(Shred::index)
.max()
{
unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1);
}
shreds
}
impl BroadcastRun for StandardBroadcastRun { impl BroadcastRun for StandardBroadcastRun {
fn run( fn run(
&mut self, &mut self,
@ -591,7 +520,6 @@ mod test {
next_code_index: 17, next_code_index: 17,
slot, slot,
parent, parent,
data_shreds_buffer: Vec::default(),
}); });
run.slot_broadcast_start = Some(Instant::now()); run.slot_broadcast_start = Some(Instant::now());
@ -776,19 +704,15 @@ mod test {
while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) { while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
shreds.extend(recv_shreds.deref().clone()); shreds.extend(recv_shreds.deref().clone());
} }
assert!(shreds.len() < 32, "shreds.len(): {}", shreds.len()); // At least as many coding shreds as data shreds.
assert!(shreds.iter().all(|shred| shred.is_data())); assert!(shreds.len() >= 29 * 2);
assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 29);
process_ticks(75); process_ticks(75);
while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) { while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
shreds.extend(recv_shreds.deref().clone()); shreds.extend(recv_shreds.deref().clone());
} }
assert!(shreds.len() > 64, "shreds.len(): {}", shreds.len()); assert!(shreds.len() >= 33 * 2);
let num_coding_shreds = shreds.iter().filter(|shred| shred.is_code()).count(); assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 33);
assert_eq!(
num_coding_shreds, 32,
"num coding shreds: {}",
num_coding_shreds
);
} }
#[test] #[test]

View File

@ -293,8 +293,7 @@ mod tests {
)); ));
let coding = solana_ledger::shred::Shredder::generate_coding_shreds( let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred], &[shred],
false, // is_last_in_slot 3, // next_code_index
3, // next_code_index
); );
coding[0].copy_to_packet(&mut packet); coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet( assert!(!should_discard_packet(

View File

@ -8969,14 +8969,8 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let coding1 = Shredder::generate_coding_shreds( let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0);
&shreds, false, // is_last_in_slot let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1);
0, // next_code_index
);
let coding2 = Shredder::generate_coding_shreds(
&shreds, true, // is_last_in_slot
0, // next_code_index
);
for shred in &shreds { for shred in &shreds {
info!("shred {:?}", shred); info!("shred {:?}", shred);
} }

View File

@ -99,7 +99,11 @@ const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; // Shreds are uniformly split into erasure batches with a "target" number of
// data shreds per each batch as below. The actual number of data shreds in
// each erasure batch depends on the number of shreds obtained from serializing
// a &[Entry].
pub const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;
// For legacy tests and benchmarks. // For legacy tests and benchmarks.
const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051); const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051);

View File

@ -3,7 +3,7 @@ use {
common::dispatch, common::dispatch,
legacy, merkle, legacy, merkle,
traits::{Shred, ShredCode as ShredCodeTrait}, traits::{Shred, ShredCode as ShredCodeTrait},
CodingShredHeader, Error, ShredCommonHeader, ShredType, MAX_DATA_SHREDS_PER_FEC_BLOCK, CodingShredHeader, Error, ShredCommonHeader, ShredType, DATA_SHREDS_PER_FEC_BLOCK,
MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE, MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE,
}, },
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature}, solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature},
@ -132,8 +132,8 @@ pub(super) fn sanitize<T: ShredCodeTrait>(shred: &T) -> Result<(), Error> {
common_header.index, common_header.index,
)); ));
} }
let num_coding_shreds = u32::from(coding_header.num_coding_shreds); let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK { if num_coding_shreds > 8 * DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds( return Err(Error::InvalidNumCodingShreds(
coding_header.num_coding_shreds, coding_header.num_coding_shreds,
)); ));

View File

@ -1,6 +1,6 @@
use { use {
crate::shred::{ crate::shred::{
Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK, Error, ProcessShredsStats, Shred, ShredData, ShredFlags, DATA_SHREDS_PER_FEC_BLOCK,
}, },
itertools::Itertools, itertools::Itertools,
lazy_static::lazy_static, lazy_static::lazy_static,
@ -24,6 +24,15 @@ lazy_static! {
.unwrap(); .unwrap();
} }
// Maps number of data shreds to the optimal erasure batch size which has the
// same recovery probabilities as a 32:32 erasure batch.
const ERASURE_BATCH_SIZE: [usize; 33] = [
0, 18, 20, 22, 23, 25, 27, 28, 30, // 8
32, 33, 35, 36, 38, 39, 41, 42, // 16
43, 45, 46, 48, 49, 51, 52, 53, // 24
55, 56, 58, 59, 60, 62, 63, 64, // 32
];
type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>; type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>;
#[derive(Debug)] #[derive(Debug)]
@ -65,45 +74,20 @@ impl Shredder {
Vec<Shred>, // data shreds Vec<Shred>, // data shreds
Vec<Shred>, // coding shreds Vec<Shred>, // coding shreds
) { ) {
let data_shreds = self.entries_to_data_shreds( let data_shreds =
keypair, self.entries_to_data_shreds(keypair, entries, is_last_in_slot, next_shred_index, stats);
entries, let coding_shreds =
is_last_in_slot, Self::data_shreds_to_coding_shreds(keypair, &data_shreds, next_code_index, stats)
next_shred_index, .unwrap();
next_shred_index, // fec_set_offset
stats,
);
let coding_shreds = Self::data_shreds_to_coding_shreds(
keypair,
&data_shreds,
is_last_in_slot,
next_code_index,
stats,
)
.unwrap();
(data_shreds, coding_shreds) (data_shreds, coding_shreds)
} }
/// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds. fn entries_to_data_shreds(
/// "FEC set index" is the index of first data shred in that FEC block.
/// **Data** shreds with the same value of:
/// (data_shred.index() - fec_set_offset) / MAX_DATA_SHREDS_PER_FEC_BLOCK
/// belong to the same FEC set.
/// Coding shreds inherit their fec_set_index from the data shreds that
/// they are generated from.
pub fn fec_set_index(data_shred_index: u32, fec_set_offset: u32) -> Option<u32> {
let diff = data_shred_index.checked_sub(fec_set_offset)?;
Some(data_shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
}
pub fn entries_to_data_shreds(
&self, &self,
keypair: &Keypair, keypair: &Keypair,
entries: &[Entry], entries: &[Entry],
is_last_in_slot: bool, is_last_in_slot: bool,
next_shred_index: u32, next_shred_index: u32,
// Shred index offset at which FEC sets are generated.
fec_set_offset: u32,
process_stats: &mut ProcessShredsStats, process_stats: &mut ProcessShredsStats,
) -> Vec<Shred> { ) -> Vec<Shred> {
let mut serialize_time = Measure::start("shred_serialize"); let mut serialize_time = Measure::start("shred_serialize");
@ -119,7 +103,7 @@ impl Shredder {
let num_shreds = (serialized_shreds.len() + data_buffer_size - 1) / data_buffer_size; let num_shreds = (serialized_shreds.len() + data_buffer_size - 1) / data_buffer_size;
let last_shred_index = next_shred_index + num_shreds as u32 - 1; let last_shred_index = next_shred_index + num_shreds as u32 - 1;
// 1) Generate data shreds // 1) Generate data shreds
let make_data_shred = |shred_index: u32, data| { let make_data_shred = |data, shred_index: u32, fec_set_index: u32| {
let flags = if shred_index != last_shred_index { let flags = if shred_index != last_shred_index {
ShredFlags::empty() ShredFlags::empty()
} else if is_last_in_slot { } else if is_last_in_slot {
@ -129,7 +113,6 @@ impl Shredder {
ShredFlags::DATA_COMPLETE_SHRED ShredFlags::DATA_COMPLETE_SHRED
}; };
let parent_offset = self.slot - self.parent_slot; let parent_offset = self.slot - self.parent_slot;
let fec_set_index = Self::fec_set_index(shred_index, fec_set_offset);
let mut shred = Shred::new_from_data( let mut shred = Shred::new_from_data(
self.slot, self.slot,
shred_index, shred_index,
@ -138,18 +121,24 @@ impl Shredder {
flags, flags,
self.reference_tick, self.reference_tick,
self.version, self.version,
fec_set_index.unwrap(), fec_set_index,
); );
shred.sign(keypair); shred.sign(keypair);
shred shred
}; };
let data_shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| { let shreds: Vec<&[u8]> = serialized_shreds.chunks(data_buffer_size).collect();
serialized_shreds let fec_set_offsets: Vec<usize> =
.par_chunks(data_buffer_size) get_fec_set_offsets(shreds.len(), DATA_SHREDS_PER_FEC_BLOCK).collect();
assert_eq!(shreds.len(), fec_set_offsets.len());
let shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| {
shreds
.into_par_iter()
.zip(fec_set_offsets)
.enumerate() .enumerate()
.map(|(i, shred_data)| { .map(|(i, (shred, offset))| {
let shred_index = next_shred_index + i as u32; let shred_index = next_shred_index + i as u32;
make_data_shred(shred_index, shred_data) let fec_set_index = next_shred_index + offset as u32;
make_data_shred(shred, shred_index, fec_set_index)
}) })
.collect() .collect()
}); });
@ -157,15 +146,14 @@ impl Shredder {
process_stats.serialize_elapsed += serialize_time.as_us(); process_stats.serialize_elapsed += serialize_time.as_us();
process_stats.gen_data_elapsed += gen_data_time.as_us(); process_stats.gen_data_elapsed += gen_data_time.as_us();
process_stats.record_num_data_shreds(data_shreds.len()); process_stats.record_num_data_shreds(shreds.len());
data_shreds shreds
} }
pub fn data_shreds_to_coding_shreds( fn data_shreds_to_coding_shreds(
keypair: &Keypair, keypair: &Keypair,
data_shreds: &[Shred], data_shreds: &[Shred],
is_last_in_slot: bool,
next_code_index: u32, next_code_index: u32,
process_stats: &mut ProcessShredsStats, process_stats: &mut ProcessShredsStats,
) -> Result<Vec<Shred>, Error> { ) -> Result<Vec<Shred>, Error> {
@ -185,8 +173,7 @@ impl Shredder {
.iter() .iter()
.scan(next_code_index, |next_code_index, chunk| { .scan(next_code_index, |next_code_index, chunk| {
let num_data_shreds = chunk.len(); let num_data_shreds = chunk.len();
let erasure_batch_size = let erasure_batch_size = get_erasure_batch_size(num_data_shreds);
get_erasure_batch_size(num_data_shreds, is_last_in_slot);
*next_code_index += (erasure_batch_size - num_data_shreds) as u32; *next_code_index += (erasure_batch_size - num_data_shreds) as u32;
Some(*next_code_index) Some(*next_code_index)
}), }),
@ -198,7 +185,7 @@ impl Shredder {
.into_par_iter() .into_par_iter()
.zip(next_code_index) .zip(next_code_index)
.flat_map(|(shreds, next_code_index)| { .flat_map(|(shreds, next_code_index)| {
Shredder::generate_coding_shreds(&shreds, is_last_in_slot, next_code_index) Shredder::generate_coding_shreds(&shreds, next_code_index)
}) })
.collect() .collect()
}); });
@ -221,7 +208,6 @@ impl Shredder {
/// Generates coding shreds for the data shreds in the current FEC set /// Generates coding shreds for the data shreds in the current FEC set
pub fn generate_coding_shreds<T: Borrow<Shred>>( pub fn generate_coding_shreds<T: Borrow<Shred>>(
data: &[T], data: &[T],
is_last_in_slot: bool,
next_code_index: u32, next_code_index: u32,
) -> Vec<Shred> { ) -> Vec<Shred> {
let (slot, index, version, fec_set_index) = { let (slot, index, version, fec_set_index) = {
@ -241,9 +227,10 @@ impl Shredder {
&& shred.version() == version && shred.version() == version
&& shred.fec_set_index() == fec_set_index)); && shred.fec_set_index() == fec_set_index));
let num_data = data.len(); let num_data = data.len();
let num_coding = get_erasure_batch_size(num_data, is_last_in_slot) let num_coding = get_erasure_batch_size(num_data)
.checked_sub(num_data) .checked_sub(num_data)
.unwrap(); .unwrap();
assert!(num_coding > 0);
let data: Vec<_> = data let data: Vec<_> = data
.iter() .iter()
.map(Borrow::borrow) .map(Borrow::borrow)
@ -360,12 +347,31 @@ impl Shredder {
} }
/// Maps number of data shreds in each batch to the erasure batch size. /// Maps number of data shreds in each batch to the erasure batch size.
fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize { fn get_erasure_batch_size(num_data_shreds: usize) -> usize {
if is_last_in_slot { ERASURE_BATCH_SIZE
2 * num_data_shreds.max(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) .get(num_data_shreds)
} else { .copied()
2 * num_data_shreds .unwrap_or(2 * num_data_shreds)
} }
// Returns offsets to fec_set_index when spliting shreds into erasure batches.
fn get_fec_set_offsets(
mut num_shreds: usize,
min_chunk_size: usize,
) -> impl Iterator<Item = usize> {
let mut offset = 0;
std::iter::from_fn(move || {
if num_shreds == 0 {
return None;
}
let num_chunks = (num_shreds / min_chunk_size).max(1);
let chunk_size = (num_shreds + num_chunks - 1) / num_chunks;
let offsets = std::iter::repeat(offset).take(chunk_size);
num_shreds -= chunk_size;
offset += chunk_size;
Some(offsets)
})
.flatten()
} }
#[cfg(test)] #[cfg(test)]
@ -427,8 +433,7 @@ mod tests {
let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_coding_shreds = let num_expected_coding_shreds =
get_erasure_batch_size(num_expected_data_shreds, /*is_last_in_slot:*/ true) get_erasure_batch_size(num_expected_data_shreds) - num_expected_data_shreds;
- num_expected_data_shreds;
let start_index = 0; let start_index = 0;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds( let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair, &keypair,
@ -684,7 +689,7 @@ mod tests {
assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds);
assert_eq!( assert_eq!(
num_coding_shreds, num_coding_shreds,
get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds get_erasure_batch_size(num_data_shreds) - num_data_shreds
); );
let all_shreds = data_shreds let all_shreds = data_shreds
@ -989,19 +994,34 @@ mod tests {
start_index, // next_code_index start_index, // next_code_index
&mut ProcessShredsStats::default(), &mut ProcessShredsStats::default(),
); );
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK;
data_shreds.iter().enumerate().for_each(|(i, s)| { let chunks: Vec<_> = data_shreds
let expected_fec_set_index = start_index + (i - i % max_per_block) as u32; .iter()
assert_eq!(s.fec_set_index(), expected_fec_set_index); .group_by(|shred| shred.fec_set_index())
}); .into_iter()
.map(|(fec_set_index, chunk)| (fec_set_index, chunk.count()))
coding_shreds.iter().enumerate().for_each(|(i, s)| { .collect();
let mut expected_fec_set_index = start_index + (i - i % max_per_block) as u32; assert!(chunks
while expected_fec_set_index as usize - start_index as usize > data_shreds.len() { .iter()
expected_fec_set_index -= max_per_block as u32; .all(|(_, chunk_size)| *chunk_size >= MIN_CHUNK_SIZE));
} assert!(chunks
assert_eq!(s.fec_set_index(), expected_fec_set_index); .iter()
}); .all(|(_, chunk_size)| *chunk_size < 2 * MIN_CHUNK_SIZE));
assert_eq!(chunks[0].0, start_index);
assert!(chunks.iter().tuple_windows().all(
|((fec_set_index, chunk_size), (next_fec_set_index, _chunk_size))| fec_set_index
+ *chunk_size as u32
== *next_fec_set_index
));
assert!(coding_shreds.len() >= data_shreds.len());
assert!(coding_shreds
.iter()
.zip(&data_shreds)
.all(|(code, data)| code.fec_set_index() == data.fec_set_index()));
assert_eq!(
coding_shreds.last().unwrap().fec_set_index(),
data_shreds.last().unwrap().fec_set_index()
);
} }
#[test] #[test]
@ -1028,43 +1048,61 @@ mod tests {
&entries, &entries,
true, // is_last_in_slot true, // is_last_in_slot
start_index, start_index,
start_index, // fec_set_offset
&mut stats, &mut stats,
); );
assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize);
let next_code_index = data_shreds[0].index(); let next_code_index = data_shreds[0].index();
(1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| { for size in (1..data_shreds.len()).step_by(5) {
for is_last_in_slot in [false, true] { let data_shreds = &data_shreds[..size];
let coding_shreds = Shredder::data_shreds_to_coding_shreds(
&keypair,
&data_shreds[..count],
is_last_in_slot,
next_code_index,
&mut stats,
)
.unwrap();
let num_coding_shreds = get_erasure_batch_size(count, is_last_in_slot) - count;
assert_eq!(coding_shreds.len(), num_coding_shreds);
}
});
for is_last_in_slot in [false, true] {
let coding_shreds = Shredder::data_shreds_to_coding_shreds( let coding_shreds = Shredder::data_shreds_to_coding_shreds(
&keypair, &keypair,
&data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], data_shreds,
is_last_in_slot,
next_code_index, next_code_index,
&mut stats, &mut stats,
) )
.unwrap(); .unwrap();
let num_shreds = let num_shreds: usize = data_shreds
get_erasure_batch_size(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, is_last_in_slot) .iter()
+ get_erasure_batch_size(1, is_last_in_slot); .group_by(|shred| shred.fec_set_index())
.into_iter()
.map(|(_, chunk)| get_erasure_batch_size(chunk.count()))
.sum();
assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len());
}
}
#[test]
fn test_get_fec_set_offsets() {
const MIN_CHUNK_SIZE: usize = 32usize;
for num_shreds in 0usize..MIN_CHUNK_SIZE {
let offsets: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE).collect();
assert_eq!(offsets, vec![0usize; num_shreds]);
}
for num_shreds in MIN_CHUNK_SIZE..MIN_CHUNK_SIZE * 8 {
let chunks: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE)
.group_by(|offset| *offset)
.into_iter()
.map(|(offset, chunk)| (offset, chunk.count()))
.collect();
assert_eq!( assert_eq!(
coding_shreds.len(), chunks
num_shreds - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1 .iter()
.map(|(_offset, chunk_size)| chunk_size)
.sum::<usize>(),
num_shreds
); );
assert!(chunks
.iter()
.all(|(_offset, chunk_size)| *chunk_size >= MIN_CHUNK_SIZE));
assert!(chunks
.iter()
.all(|(_offset, chunk_size)| *chunk_size < 2 * MIN_CHUNK_SIZE));
assert_eq!(chunks[0].0, 0);
assert!(chunks.iter().tuple_windows().all(
|((offset, chunk_size), (next_offset, _chunk_size))| offset + chunk_size
== *next_offset
));
} }
} }
} }

View File

@ -3,7 +3,7 @@ use {
solana_entry::entry::Entry, solana_entry::entry::Entry,
solana_ledger::shred::{ solana_ledger::shred::{
max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder, max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder,
LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
}, },
solana_sdk::{ solana_sdk::{
clock::Slot, clock::Slot,
@ -26,7 +26,7 @@ fn test_multi_fec_block_coding() {
let slot = 0x1234_5678_9abc_def0; let slot = 0x1234_5678_9abc_def0;
let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap(); let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap();
let num_fec_sets = 100; let num_fec_sets = 100;
let num_data_shreds = (MAX_DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets) as usize; let num_data_shreds = DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets;
let keypair0 = Keypair::new(); let keypair0 = Keypair::new();
let keypair1 = Keypair::new(); let keypair1 = Keypair::new();
let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
@ -67,8 +67,8 @@ fn test_multi_fec_block_coding() {
let mut all_shreds = vec![]; let mut all_shreds = vec![];
for i in 0..num_fec_sets { for i in 0..num_fec_sets {
let shred_start_index = (MAX_DATA_SHREDS_PER_FEC_BLOCK * i) as usize; let shred_start_index = DATA_SHREDS_PER_FEC_BLOCK * i;
let end_index = shred_start_index + MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1; let end_index = shred_start_index + DATA_SHREDS_PER_FEC_BLOCK - 1;
let fec_set_shreds = data_shreds[shred_start_index..=end_index] let fec_set_shreds = data_shreds[shred_start_index..=end_index]
.iter() .iter()
.cloned() .cloned()
@ -99,11 +99,7 @@ fn test_multi_fec_block_coding() {
shred_info.insert(i * 2, recovered_shred); shred_info.insert(i * 2, recovered_shred);
} }
all_shreds.extend( all_shreds.extend(shred_info.into_iter().take(DATA_SHREDS_PER_FEC_BLOCK));
shred_info
.into_iter()
.take(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize),
);
} }
let result = Shredder::deshred(&all_shreds[..]).unwrap(); let result = Shredder::deshred(&all_shreds[..]).unwrap();
@ -193,11 +189,11 @@ fn setup_different_sized_fec_blocks(
let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
let entry = Entry::new(&Hash::default(), 1, vec![tx0]); let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
// Make enough entries for `MAX_DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one // Make enough entries for `DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one
// fec set will have `MAX_DATA_SHREDS_PER_FEC_BLOCK` shreds and the next // fec set will have `DATA_SHREDS_PER_FEC_BLOCK` shreds and the next
// will have 2 shreds. // will have 2 shreds.
assert!(MAX_DATA_SHREDS_PER_FEC_BLOCK > 2); assert!(DATA_SHREDS_PER_FEC_BLOCK > 2);
let num_shreds_per_iter = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 2; let num_shreds_per_iter = DATA_SHREDS_PER_FEC_BLOCK + 2;
let num_entries = max_entries_per_n_shred( let num_entries = max_entries_per_n_shred(
&entry, &entry,
num_shreds_per_iter as u64, num_shreds_per_iter as u64,