From 4f82b897bc1756b6e0123c6c13762bb4b5eadea4 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 23 Mar 2021 14:52:38 +0000 Subject: [PATCH] buffers data shreds to make larger erasure coded sets (#15849) Broadcast stage batches up to 8 entries: https://github.com/solana-labs/solana/blob/79280b304/core/src/broadcast_stage/broadcast_utils.rs#L26-L29 which will be serialized into some number of shreds and chunked into FEC sets of at most 32 shreds each: https://github.com/solana-labs/solana/blob/79280b304/ledger/src/shred.rs#L576-L597 So depending on the size of entries, FEC sets can be small, which may aggravate loss rate. For example 16 FEC sets of 2:2 data/code shreds each have higher loss rate than one 32:32 set. This commit broadcasts data shreds immediately, but also buffers them until it has a batch of 32 data shreds, at which point 32 coding shreds are generated and broadcasted. --- core/benches/shredder.rs | 12 +- core/src/broadcast_stage.rs | 14 +- core/src/broadcast_stage/broadcast_utils.rs | 8 +- .../broadcast_stage/standard_broadcast_run.rs | 290 ++++++++++++------ core/src/shred_fetch_stage.rs | 14 +- ledger/src/blockstore.rs | 4 +- ledger/src/shred.rs | 170 ++++++---- 7 files changed, 333 insertions(+), 179 deletions(-) diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 18bd9a90f7..2bfd080a80 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -42,7 +42,13 @@ fn make_shreds(num_shreds: usize) -> Vec { let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, Arc::new(Keypair::new()), 0, 0).unwrap(); let data_shreds = shredder - .entries_to_data_shreds(&entries, true, 0, &mut ProcessShredsStats::default()) + .entries_to_data_shreds( + &entries, + true, // is_last_in_slot + 0, // next_shred_index + 0, // fec_set_offset + &mut ProcessShredsStats::default(), + ) .0; assert!(data_shreds.len() >= num_shreds); data_shreds @@ -127,10 +133,8 @@ fn bench_shredder_coding(bencher: &mut Bencher) { let data_shreds = make_shreds(symbol_count); bencher.iter(|| { Shredder::generate_coding_shreds( - 0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], - 0, symbol_count, ) .len(); @@ -142,10 +146,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let data_shreds = make_shreds(symbol_count); let coding_shreds = Shredder::generate_coding_shreds( - 0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], - 0, symbol_count, ); bencher.iter(|| { diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index b018600367..6cfc635f1b 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -472,12 +472,14 @@ pub mod test { ) { let num_entries = max_ticks_per_n_shreds(num, None); let (data_shreds, _) = make_slot_entries(slot, 0, num_entries); - let keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0) - .expect("Expected to create a new shredder"); - - let coding_shreds = shredder - .data_shreds_to_coding_shreds(&data_shreds[0..], &mut ProcessShredsStats::default()); + let keypair = Keypair::new(); + let coding_shreds = Shredder::data_shreds_to_coding_shreds( + &keypair, + &data_shreds[0..], + RECOMMENDED_FEC_RATE, + &mut ProcessShredsStats::default(), + ) + .unwrap(); ( data_shreds.clone(), coding_shreds.clone(), diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index df5140401b..b421a2a524 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ 
b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,6 +1,6 @@ use crate::poh_recorder::WorkingBankEntry; use crate::result::Result; -use solana_ledger::entry::Entry; +use solana_ledger::{entry::Entry, shred::Shred}; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use std::{ @@ -16,11 +16,15 @@ pub(super) struct ReceiveResults { pub last_tick_height: u64, } -#[derive(Copy, Clone)] +#[derive(Clone)] pub struct UnfinishedSlotInfo { pub next_shred_index: u32, pub slot: Slot, pub parent: Slot, + // Data shreds buffered to make a batch of size + // MAX_DATA_SHREDS_PER_FEC_BLOCK. + pub(crate) data_shreds_buffer: Vec, + pub(crate) fec_set_offset: u32, // See Shredder::fec_set_index. } /// This parameter tunes how many entries are received in one iteration of recv loop diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index d91da296e4..f642de2531 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -7,12 +7,13 @@ use super::{ use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; use solana_ledger::{ entry::Entry, - shred::{ProcessShredsStats, Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK}, + shred::{ + ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, + SHRED_TICK_REFERENCE_MASK, + }, }; use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; -use std::collections::HashMap; -use std::sync::RwLock; -use std::time::Duration; +use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration}; #[derive(Clone)] pub struct StandardBroadcastRun { @@ -40,94 +41,114 @@ impl StandardBroadcastRun { pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { Self { process_shreds_stats: ProcessShredsStats::default(), - transmit_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())), - insert_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())), + transmit_shreds_stats: Arc::default(), + insert_shreds_stats: Arc::default(), unfinished_slot: None, current_slot_and_parent: None, slot_broadcast_start: None, keypair, shred_version, - last_datapoint_submit: Arc::new(AtomicU64::new(0)), + last_datapoint_submit: Arc::default(), num_batches: 0, - broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())), - last_peer_update: Arc::new(AtomicU64::new(0)), + broadcast_peer_cache: Arc::default(), + last_peer_update: Arc::default(), } } - fn check_for_interrupted_slot(&mut self, max_ticks_in_slot: u8) -> Option { - let (slot, _) = self.current_slot_and_parent.unwrap(); - let mut last_unfinished_slot_shred = self - .unfinished_slot - .map(|last_unfinished_slot| { - if last_unfinished_slot.slot != slot { - self.report_and_reset_stats(); - Some(Shred::new_from_data( - last_unfinished_slot.slot, - last_unfinished_slot.next_shred_index, - (last_unfinished_slot.slot - last_unfinished_slot.parent) as u16, - None, - true, - true, - max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK, - self.shred_version, - last_unfinished_slot.next_shred_index, - )) - } else { - None - } - }) - .unwrap_or(None); - - // This shred should only be Some if the previous slot was interrupted - if let Some(ref mut shred) = last_unfinished_slot_shred { - Shredder::sign_shred(&self.keypair, shred); - self.unfinished_slot = None; + // If the current slot has changed, generates an empty shred indicating + // last shred in the previous slot, along with coding shreds for the 
data + // shreds buffered. + fn finish_prev_slot( + &mut self, + max_ticks_in_slot: u8, + stats: &mut ProcessShredsStats, + ) -> Vec { + let (current_slot, _) = self.current_slot_and_parent.unwrap(); + match self.unfinished_slot { + None => Vec::default(), + Some(ref state) if state.slot == current_slot => Vec::default(), + Some(ref mut state) => { + let parent_offset = state.slot - state.parent; + let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK; + let fec_set_index = + Shredder::fec_set_index(state.next_shred_index, state.fec_set_offset); + let mut shred = Shred::new_from_data( + state.slot, + state.next_shred_index, + parent_offset as u16, + None, // data + true, // is_last_in_fec_set + true, // is_last_in_slot + reference_tick, + self.shred_version, + fec_set_index.unwrap(), + ); + Shredder::sign_shred(self.keypair.deref(), &mut shred); + state.data_shreds_buffer.push(shred.clone()); + let mut shreds = make_coding_shreds( + self.keypair.deref(), + &mut self.unfinished_slot, + true, // is_last_in_slot + stats, + ); + shreds.insert(0, shred); + self.report_and_reset_stats(); + self.unfinished_slot = None; + shreds + } } + } - last_unfinished_slot_shred - } - fn init_shredder(&self, blockstore: &Blockstore, reference_tick: u8) -> (Shredder, u32) { - let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); - let next_shred_index = self - .unfinished_slot - .map(|s| s.next_shred_index) - .unwrap_or_else(|| { - blockstore - .meta(slot) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0) as u32 - }); - ( - Shredder::new( - slot, - parent_slot, - RECOMMENDED_FEC_RATE, - self.keypair.clone(), - reference_tick, - self.shred_version, - ) - .expect("Expected to create a new shredder"), - next_shred_index, - ) - } fn entries_to_data_shreds( &mut self, - shredder: &Shredder, - next_shred_index: u32, entries: &[Entry], + blockstore: &Blockstore, + reference_tick: u8, is_slot_end: bool, process_stats: &mut ProcessShredsStats, ) -> Vec { - let (data_shreds, new_next_shred_index) = - shredder.entries_to_data_shreds(entries, is_slot_end, next_shred_index, process_stats); - + let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); + let (next_shred_index, fec_set_offset) = match &self.unfinished_slot { + Some(state) => (state.next_shred_index, state.fec_set_offset), + None => match blockstore.meta(slot).unwrap() { + Some(slot_meta) => { + let shreds_consumed = slot_meta.consumed as u32; + (shreds_consumed, shreds_consumed) + } + None => (0, 0), + }, + }; + let (data_shreds, next_shred_index) = Shredder::new( + slot, + parent_slot, + RECOMMENDED_FEC_RATE, + self.keypair.clone(), + reference_tick, + self.shred_version, + ) + .unwrap() + .entries_to_data_shreds( + entries, + is_slot_end, + next_shred_index, + fec_set_offset, + process_stats, + ); + let mut data_shreds_buffer = match &mut self.unfinished_slot { + Some(state) => { + assert_eq!(state.slot, slot); + std::mem::take(&mut state.data_shreds_buffer) + } + None => Vec::default(), + }; + data_shreds_buffer.extend(data_shreds.clone()); self.unfinished_slot = Some(UnfinishedSlotInfo { - next_shred_index: new_next_shred_index, - slot: shredder.slot, - parent: shredder.parent_slot, + next_shred_index, + slot, + parent: parent_slot, + data_shreds_buffer, + fec_set_offset, }); - data_shreds } @@ -184,19 +205,16 @@ impl StandardBroadcastRun { let mut to_shreds_time = Measure::start("broadcast_to_shreds"); // 1) Check if slot was interrupted - let last_unfinished_slot_shred = - 
self.check_for_interrupted_slot(bank.ticks_per_slot() as u8); + let prev_slot_shreds = + self.finish_prev_slot(bank.ticks_per_slot() as u8, &mut process_stats); // 2) Convert entries to shreds and coding shreds - let (shredder, next_shred_index) = self.init_shredder( - blockstore, - (bank.tick_height() % bank.ticks_per_slot()) as u8, - ); let is_last_in_slot = last_tick_height == bank.max_tick_height(); + let reference_tick = bank.tick_height() % bank.ticks_per_slot(); let data_shreds = self.entries_to_data_shreds( - &shredder, - next_shred_index, &receive_results.entries, + blockstore, + reference_tick as u8, is_last_in_slot, &mut process_stats, ); @@ -208,27 +226,25 @@ impl StandardBroadcastRun { .insert_shreds(first, None, true) .expect("Failed to insert shreds in blockstore"); } - let last_data_shred = data_shreds.len(); to_shreds_time.stop(); let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule"); let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); - let stakes = bank.epoch_staked_nodes(bank_epoch); - let stakes = stakes.map(Arc::new); + let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new); // Broadcast the last shred of the interrupted slot if necessary - if let Some(last_shred) = last_unfinished_slot_shred { + if !prev_slot_shreds.is_empty() { let batch_info = Some(BroadcastShredBatchInfo { - slot: last_shred.slot(), + slot: prev_slot_shreds[0].slot(), num_expected_batches: Some(old_num_batches + 1), slot_start_ts: old_broadcast_start.expect( "Old broadcast start time for previous slot must exist if the previous slot was interrupted", ), }); - let last_shred = Arc::new(vec![last_shred]); - socket_sender.send(((stakes.clone(), last_shred.clone()), batch_info.clone()))?; - blockstore_sender.send((last_shred, batch_info))?; + let shreds = Arc::new(prev_slot_shreds); + socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?; + blockstore_sender.send((shreds, batch_info))?; } // Increment by two batches, one for the data batch, one for the coding batch. 
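Note (illustration only, not part of the patch): the fec_set_offset field introduced above feeds Shredder::fec_set_index, which is defined later in the ledger/src/shred.rs hunk. When the broadcast stage has no unfinished-slot state, entries_to_data_shreds sets both next_shred_index and fec_set_offset to the blockstore's consumed count, so FEC sets are aligned to the point where shredding resumed rather than to multiples of 32 from shred index zero. Below is a minimal standalone sketch of that arithmetic, assuming a hypothetical offset of 70 already-consumed shreds; the constant 32 stands in for MAX_DATA_SHREDS_PER_FEC_BLOCK:

    // Mirrors the arithmetic of Shredder::fec_set_index in the shred.rs hunk below.
    const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;

    fn fec_set_index(shred_index: u32, fec_set_offset: u32) -> Option<u32> {
        // The FEC set index is the index of the first data shred in the
        // 32-shred block that `shred_index` falls into, counted from
        // `fec_set_offset` rather than from zero.
        let diff = shred_index.checked_sub(fec_set_offset)?;
        Some(shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
    }

    fn main() {
        let fec_set_offset = 70; // hypothetical: 70 shreds already in the blockstore
        assert_eq!(fec_set_index(70, fec_set_offset), Some(70));   // first set: 70..=101
        assert_eq!(fec_set_index(101, fec_set_offset), Some(70));
        assert_eq!(fec_set_index(102, fec_set_offset), Some(102)); // next set: 102..=133
        assert_eq!(fec_set_index(69, fec_set_offset), None);       // below the offset
    }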
@@ -255,11 +271,15 @@ impl StandardBroadcastRun { // Send data shreds let data_shreds = Arc::new(data_shreds); socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?; - blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?; + blockstore_sender.send((data_shreds, batch_info.clone()))?; // Create and send coding shreds - let coding_shreds = shredder - .data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred], &mut process_stats); + let coding_shreds = make_coding_shreds( + self.keypair.deref(), + &mut self.unfinished_slot, + is_last_in_slot, + &mut process_stats, + ); let coding_shreds = Arc::new(coding_shreds); socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?; blockstore_sender.send((coding_shreds, batch_info))?; @@ -378,15 +398,15 @@ impl StandardBroadcastRun { fn report_and_reset_stats(&mut self) { let stats = &self.process_shreds_stats; - assert!(self.unfinished_slot.is_some()); + let unfinished_slot = self.unfinished_slot.as_ref().unwrap(); datapoint_info!( "broadcast-process-shreds-stats", - ("slot", self.unfinished_slot.unwrap().slot as i64, i64), + ("slot", unfinished_slot.slot as i64, i64), ("shredding_time", stats.shredding_elapsed, i64), ("receive_time", stats.receive_elapsed, i64), ( "num_data_shreds", - i64::from(self.unfinished_slot.unwrap().next_shred_index), + unfinished_slot.next_shred_index as i64, i64 ), ( @@ -409,6 +429,33 @@ impl StandardBroadcastRun { } } +// Consumes data_shreds_buffer returning corresponding coding shreds. +fn make_coding_shreds( + keypair: &Keypair, + unfinished_slot: &mut Option, + is_slot_end: bool, + stats: &mut ProcessShredsStats, +) -> Vec { + let data_shreds = match unfinished_slot { + None => Vec::default(), + Some(unfinished_slot) => { + let size = unfinished_slot.data_shreds_buffer.len(); + // Consume a multiple of 32, unless this is the slot end. + let offset = if is_slot_end { + 0 + } else { + size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + }; + unfinished_slot + .data_shreds_buffer + .drain(0..size - offset) + .collect() + } + }; + Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, RECOMMENDED_FEC_RATE, stats) + .unwrap() +} + impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, @@ -418,6 +465,8 @@ impl BroadcastRun for StandardBroadcastRun { blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { let receive_results = broadcast_utils::recv_slot_entries(receiver)?; + // TODO: Confirm that last chunk of coding shreds + // will not be lost or delayed for too long. 
self.process_receive_results( blockstore, socket_sender, @@ -508,6 +557,8 @@ mod test { next_shred_index, slot, parent, + data_shreds_buffer: Vec::default(), + fec_set_offset: next_shred_index, }); run.slot_broadcast_start = Some(Instant::now()); @@ -515,8 +566,9 @@ mod test { run.current_slot_and_parent = Some((4, 2)); // Slot 2 interrupted slot 1 - let shred = run - .check_for_interrupted_slot(0) + let shreds = run.finish_prev_slot(0, &mut ProcessShredsStats::default()); + let shred = shreds + .get(0) .expect("Expected a shred that signals an interrupt"); // Validate the shred @@ -642,6 +694,50 @@ mod test { ); } + #[test] + fn test_buffer_data_shreds() { + let num_shreds_per_slot = 2; + let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket) = + setup(num_shreds_per_slot); + let (bsend, brecv) = channel(); + let (ssend, _srecv) = channel(); + let mut last_tick_height = 0; + let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0); + let mut process_ticks = |num_ticks| { + let ticks = create_ticks(num_ticks, 0, genesis_config.hash()); + last_tick_height += (ticks.len() - 1) as u64; + let receive_results = ReceiveResults { + entries: ticks, + time_elapsed: Duration::new(1, 0), + bank: bank.clone(), + last_tick_height, + }; + standard_broadcast_run + .process_receive_results(&blockstore, &ssend, &bsend, receive_results) + .unwrap(); + }; + for i in 0..3 { + process_ticks((i + 1) * 100); + } + let mut shreds = Vec::::new(); + while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) { + shreds.extend(recv_shreds.deref().clone()); + } + assert!(shreds.len() < 32, "shreds.len(): {}", shreds.len()); + assert!(shreds.iter().all(|shred| shred.is_data())); + process_ticks(75); + while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) { + shreds.extend(recv_shreds.deref().clone()); + } + assert!(shreds.len() > 64, "shreds.len(): {}", shreds.len()); + let num_coding_shreds = shreds.iter().filter(|shred| shred.is_code()).count(); + assert_eq!( + num_coding_shreds, 32, + "num coding shreds: {}", + num_coding_shreds + ); + } + #[test] fn test_slot_finish() { // Setup diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 2cacdbff05..2a5e1b7fd3 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -236,7 +236,16 @@ mod tests { let mut stats = ShredFetchStats::default(); let slot = 1; - let shred = Shred::new_from_data(slot, 3, 0, None, true, true, 0, 0, 0); + let shred = Shred::new_from_data( + slot, 3, // shred index + 0, // parent offset + None, // data + true, // is_last_in_fec_set + true, // is_last_in_slot + 0, // reference_tick + 0, // version + 3, // fec_set_index + ); shred.copy_to_packet(&mut packet); let hasher = PacketHasher::default(); @@ -256,8 +265,7 @@ mod tests { ); assert!(!packet.meta.discard); - let coding = - solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10, 1); + let coding = solana_ledger::shred::Shredder::generate_coding_shreds(1.0f32, &[shred], 1); coding[0].copy_to_packet(&mut packet); ShredFetchStage::process_packet( &mut packet, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 01f6fcd6b4..1de8aecceb 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -7450,8 +7450,8 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let ledger = Blockstore::open(&ledger_path).unwrap(); - let coding1 = Shredder::generate_coding_shreds(slot, 0.5f32, &shreds, 0x42, usize::MAX); 
- let coding2 = Shredder::generate_coding_shreds(slot, 1.0f32, &shreds, 0x42, usize::MAX); + let coding1 = Shredder::generate_coding_shreds(0.5f32, &shreds, usize::MAX); + let coding2 = Shredder::generate_coding_shreds(1.0f32, &shreds, usize::MAX); for shred in &shreds { info!("shred {:?}", shred); } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index d83416694a..57eb2dbf7b 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -22,7 +22,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; -use std::{mem::size_of, sync::Arc}; +use std::{mem::size_of, ops::Deref, sync::Arc}; use thiserror::Error; @@ -559,17 +559,40 @@ impl Shredder { next_shred_index: u32, ) -> (Vec, Vec, u32) { let mut stats = ProcessShredsStats::default(); - let (data_shreds, last_shred_index) = - self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index, &mut stats); - let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds, &mut stats); + let (data_shreds, last_shred_index) = self.entries_to_data_shreds( + entries, + is_last_in_slot, + next_shred_index, + next_shred_index, // fec_set_offset + &mut stats, + ); + let coding_shreds = Self::data_shreds_to_coding_shreds( + self.keypair.deref(), + &data_shreds, + self.fec_rate, + &mut stats, + ) + .unwrap(); (data_shreds, coding_shreds, last_shred_index) } + // Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds. + // "FEC set index" is the index of first data shred in that FEC block. + // Shred indices with the same value of: + // (shred_index - fec_set_offset) / MAX_DATA_SHREDS_PER_FEC_BLOCK + // belong to the same FEC set. + pub fn fec_set_index(shred_index: u32, fec_set_offset: u32) -> Option { + let diff = shred_index.checked_sub(fec_set_offset)?; + Some(shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK) + } + pub fn entries_to_data_shreds( &self, entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, + // Shred index offset at which FEC sets are generated. 
+ fec_set_offset: u32, process_stats: &mut ProcessShredsStats, ) -> (Vec, u32) { let mut serialize_time = Measure::start("shred_serialize"); @@ -578,11 +601,29 @@ impl Shredder { serialize_time.stop(); let mut gen_data_time = Measure::start("shred_gen_data_time"); - let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size; let last_shred_index = next_shred_index + num_shreds as u32 - 1; // 1) Generate data shreds + let make_data_shred = |shred_index: u32, data| { + let is_last_data = shred_index == last_shred_index; + let is_last_in_slot = is_last_data && is_last_in_slot; + let parent_offset = self.slot - self.parent_slot; + let fec_set_index = Self::fec_set_index(shred_index, fec_set_offset); + let mut shred = Shred::new_from_data( + self.slot, + shred_index, + parent_offset as u16, + Some(data), + is_last_data, + is_last_in_slot, + self.reference_tick, + self.version, + fec_set_index.unwrap(), + ); + Shredder::sign_shred(self.keypair.deref(), &mut shred); + shred + }; let data_shreds: Vec = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { serialized_shreds @@ -590,34 +631,7 @@ impl Shredder { .enumerate() .map(|(i, shred_data)| { let shred_index = next_shred_index + i as u32; - - // Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds - // "FEC set index" is the index of first data shred in that FEC block - let fec_set_index = - shred_index - (i % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) as u32; - - let (is_last_data, is_last_in_slot) = { - if shred_index == last_shred_index { - (true, is_last_in_slot) - } else { - (false, false) - } - }; - - let mut shred = Shred::new_from_data( - self.slot, - shred_index, - (self.slot - self.parent_slot) as u16, - Some(shred_data), - is_last_data, - is_last_in_slot, - self.reference_tick, - self.version, - fec_set_index, - ); - - Shredder::sign_shred(&self.keypair, &mut shred); - shred + make_data_shred(shred_index, shred_data) }) .collect() }) @@ -631,10 +645,17 @@ impl Shredder { } pub fn data_shreds_to_coding_shreds( - &self, + keypair: &Keypair, data_shreds: &[Shred], + fec_rate: f32, process_stats: &mut ProcessShredsStats, - ) -> Vec { + ) -> Result> { + if !(0.0..=1.0).contains(&fec_rate) { + return Err(ShredError::InvalidFecRate(fec_rate)); + } + if data_shreds.is_empty() { + return Ok(Vec::default()); + } let mut gen_coding_time = Measure::start("gen_coding_shreds"); // 2) Generate coding shreds let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| { @@ -643,11 +664,9 @@ impl Shredder { .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) .flat_map(|shred_data_batch| { Shredder::generate_coding_shreds( - self.slot, - self.fec_rate, + fec_rate, shred_data_batch, - self.version, - shred_data_batch.len(), + shred_data_batch.len(), // max_coding_shreds ) }) .collect() @@ -660,7 +679,7 @@ impl Shredder { PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { coding_shreds.par_iter_mut().for_each(|mut coding_shred| { - Shredder::sign_shred(&self.keypair, &mut coding_shred); + Shredder::sign_shred(keypair, &mut coding_shred); }) }) }); @@ -668,7 +687,7 @@ impl Shredder { process_stats.gen_coding_elapsed += gen_coding_time.as_us(); process_stats.sign_coding_elapsed += sign_coding_time.as_us(); - coding_shreds + Ok(coding_shreds) } pub fn sign_shred(signer: &Keypair, shred: &mut Shred) { @@ -707,10 +726,8 @@ impl Shredder { /// Generates coding shreds for the data shreds in the current FEC set pub fn 
generate_coding_shreds( - slot: Slot, fec_rate: f32, data_shred_batch: &[Shred], - version: u16, max_coding_shreds: usize, ) -> Vec { assert!(!data_shred_batch.is_empty()); @@ -721,8 +738,19 @@ impl Shredder { Self::calculate_num_coding_shreds(num_data, fec_rate, max_coding_shreds); let session = Session::new(num_data, num_coding).expect("Failed to create erasure session"); - let start_index = data_shred_batch[0].common_header.index; - + let ShredCommonHeader { + slot, + index: start_index, + version, + fec_set_index, + .. + } = data_shred_batch[0].common_header; + assert_eq!(fec_set_index, start_index); + assert!(data_shred_batch + .iter() + .all(|shred| shred.common_header.slot == slot + && shred.common_header.version == version + && shred.common_header.fec_set_index == fec_set_index)); // All information after coding shred field in a data shred is encoded let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; let data_ptrs: Vec<_> = data_shred_batch @@ -731,19 +759,20 @@ impl Shredder { .collect(); // Create empty coding shreds, with correctly populated headers - let mut coding_shreds = Vec::with_capacity(num_coding); - (0..num_coding).for_each(|i| { - let shred = Shred::new_empty_coding( - slot, - start_index + i as u32, - start_index, - num_data, - num_coding, - i, - version, - ); - coding_shreds.push(shred.payload); - }); + let mut coding_shreds: Vec<_> = (0..num_coding) + .map(|i| { + Shred::new_empty_coding( + slot, + start_index + i as u32, + fec_set_index, + num_data, + num_coding, + i, // position + version, + ) + .payload + }) + .collect(); // Grab pointers for the coding blocks let coding_block_offset = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; @@ -1765,21 +1794,34 @@ pub mod tests { let mut stats = ProcessShredsStats::default(); let start_index = 0x12; - let (data_shreds, _next_index) = - shredder.entries_to_data_shreds(&entries, true, start_index, &mut stats); + let (data_shreds, _next_index) = shredder.entries_to_data_shreds( + &entries, + true, // is_last_in_slot + start_index, + start_index, // fec_set_offset + &mut stats, + ); assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize); (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| { - let coding_shreds = - shredder.data_shreds_to_coding_shreds(&data_shreds[..count], &mut stats); + let coding_shreds = Shredder::data_shreds_to_coding_shreds( + shredder.keypair.deref(), + &data_shreds[..count], + shredder.fec_rate, + &mut stats, + ) + .unwrap(); assert_eq!(coding_shreds.len(), count); }); - let coding_shreds = shredder.data_shreds_to_coding_shreds( + let coding_shreds = Shredder::data_shreds_to_coding_shreds( + shredder.keypair.deref(), &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], + shredder.fec_rate, &mut stats, - ); + ) + .unwrap(); assert_eq!( coding_shreds.len(), MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1
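Two standalone sketches follow; neither is part of the patch.

First, how many buffered data shreds make_coding_shreds consumes per call in the standard_broadcast_run.rs hunk above: a whole multiple of 32 (standing in for MAX_DATA_SHREDS_PER_FEC_BLOCK), except at slot end, when the remaining partial set is flushed as well. A minimal sketch of that drain arithmetic, with hypothetical buffer sizes:

    // Mirrors the drain(0..size - offset) logic in make_coding_shreds.
    fn num_drained(buffered: usize, is_slot_end: bool) -> usize {
        let offset = if is_slot_end { 0 } else { buffered % 32 };
        buffered - offset
    }

    fn main() {
        assert_eq!(num_drained(20, false), 0);  // keep buffering; no coding shreds yet
        assert_eq!(num_drained(45, false), 32); // one full 32-shred FEC set is encoded
        assert_eq!(num_drained(45, true), 45);  // slot end: flush the partial set too
    }

Second, a back-of-the-envelope check of the commit message's claim that many small FEC sets lose more data than one large set. It assumes each shred is lost independently with probability p and that an n:n (data:coding) Reed-Solomon set is recoverable whenever at least n of its 2n shreds arrive; the value of p is purely illustrative:

    // Probability that a single n:n FEC set is unrecoverable, i.e. that more
    // than n of its 2n shreds are lost.
    fn binomial(n: u64, k: u64) -> f64 {
        (0..k).fold(1.0, |acc, i| acc * (n - i) as f64 / (i + 1) as f64)
    }

    fn set_failure(n: u64, p: f64) -> f64 {
        (n + 1..=2 * n)
            .map(|lost| {
                binomial(2 * n, lost)
                    * p.powi(lost as i32)
                    * (1.0 - p).powi((2 * n - lost) as i32)
            })
            .sum()
    }

    fn main() {
        let p = 0.2; // assumed per-shred loss probability
        // At least one of 16 independent 2:2 sets is unrecoverable ...
        let many_small = 1.0 - (1.0 - set_failure(2, p)).powi(16);
        // ... versus the single 32:32 set being unrecoverable.
        let one_large = set_failure(32, p);
        println!("16 x 2:2 sets: {:.3}   one 32:32 set: {:.2e}", many_small, one_large);
        assert!(many_small > one_large);
    }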