Better dupe detection (#13992)

This commit is contained in:
sakridge 2020-12-09 23:14:31 -08:00 committed by GitHub
parent 8d1651c8ad
commit c5fe076432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 380 additions and 114 deletions

View File

@ -125,8 +125,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let data_shreds = make_shreds(symbol_count);
bencher.iter(|| {
Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0)
.len();
Shredder::generate_coding_shreds(
0,
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
0,
symbol_count,
)
.len();
})
}
@ -134,8 +140,13 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let data_shreds = make_shreds(symbol_count);
let coding_shreds =
Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0);
let coding_shreds = Shredder::generate_coding_shreds(
0,
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
0,
symbol_count,
);
bencher.iter(|| {
Shredder::try_recovery(
coding_shreds[..].to_vec(),

View File

@ -989,29 +989,43 @@ impl ReplayStage {
// errors related to the slot being purged
let slot = bank.slot();
warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
if let BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount) = err {
datapoint_info!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
} else {
datapoint_error!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
}
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
let is_serious = matches!(
err,
BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount)
);
Self::mark_dead_slot(blockstore, bank_progress, slot, &err, is_serious);
err
})?;
Ok(tx_count)
}
fn mark_dead_slot(
blockstore: &Blockstore,
bank_progress: &mut ForkProgress,
slot: Slot,
err: &BlockstoreProcessorError,
is_serious: bool,
) {
if is_serious {
datapoint_error!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
} else {
datapoint_info!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
}
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
}
#[allow(clippy::too_many_arguments)]
fn handle_votable_bank(
bank: &Arc<Bank>,
@ -1323,23 +1337,40 @@ impl ReplayStage {
}
assert_eq!(*bank_slot, bank.slot());
if bank.is_complete() {
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true;
info!("bank frozen: {}", bank.slot());
bank.freeze();
heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
if !blockstore.has_duplicate_shreds_in_slot(bank.slot()) {
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true;
info!("bank frozen: {}", bank.slot());
bank.freeze();
heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| {
warn!("bank_notification_sender failed: {:?}", err)
});
}
Self::record_rewards(&bank, &rewards_recorder_sender);
Self::record_rewards(&bank, &rewards_recorder_sender);
} else {
Self::mark_dead_slot(
blockstore,
bank_progress,
bank.slot(),
&BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock),
true,
);
warn!(
"{} duplicate shreds detected, not freezing bank {}",
my_pubkey,
bank.slot()
);
}
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",

View File

@ -305,7 +305,7 @@ mod tests {
assert!(!packet.meta.discard);
let coding =
solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10);
solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10, 1);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,

View File

@ -26,4 +26,7 @@ pub enum BlockError {
/// that each block has the same number of hashes
#[error("trailing entry")]
TrailingEntry,
#[error("duplicate block")]
DuplicateBlock,
}

View File

@ -809,6 +809,7 @@ impl Blockstore {
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
);
} else {
@ -1012,15 +1013,24 @@ impl Blockstore {
.is_ok()
}
fn check_cache_coding_shred(
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
}
fn check_cache_coding_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
handle_duplicate: &F,
is_trusted: bool,
) -> bool {
) -> bool
where
F: Fn(Shred),
{
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@ -1028,52 +1038,109 @@ impl Blockstore {
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if is_trusted
|| Blockstore::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
{
let set_index = u64::from(shred.common_header.fec_set_index);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
let first_coding_index =
u64::from(shred.index()) - u64::from(shred.coding_header.position);
self.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| {
ErasureMeta::new(set_index, first_coding_index, &erasure_config)
})
});
if erasure_config != erasure_meta.config {
// ToDo: This is a potential slashing condition
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config
);
if !is_trusted {
if index_meta.coding().is_present(shred_index) {
handle_duplicate(shred);
return false;
}
// Should be safe to modify index_meta here. Two cases
// 1) Recovery happens: Then all inserted erasure metas are removed
// from just_received_coding_shreds, and nothing will be committed by
// `check_insert_coding_shred`, so the coding index meta will not be
// committed
index_meta.coding_mut().set_present(shred_index, true);
just_received_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
true
} else {
false
if !Blockstore::should_insert_coding_shred(&shred, &self.last_root) {
return false;
}
}
let set_index = u64::from(shred.common_header.fec_set_index);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
let first_coding_index =
u64::from(shred.index()) - u64::from(shred.coding_header.position);
self.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, first_coding_index, &erasure_config))
});
if erasure_config != erasure_meta.config {
let conflicting_shred = self.find_conflicting_coding_shred(
&shred,
slot,
erasure_meta,
just_received_coding_shreds,
);
if let Some(conflicting_shred) = conflicting_shred {
if self
.store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone())
.is_err()
{
warn!("bad duplicate store..");
}
} else {
datapoint_info!("bad-conflict-shred", ("slot", slot, i64));
}
// ToDo: This is a potential slashing condition
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config
);
return false;
}
// Should be safe to modify index_meta here. Two cases
// 1) Recovery happens: Then all inserted erasure metas are removed
// from just_received_coding_shreds, and nothing will be committed by
// `check_insert_coding_shred`, so the coding index meta will not be
// committed
index_meta.coding_mut().set_present(shred_index, true);
just_received_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
true
}
fn find_conflicting_coding_shred(
&self,
shred: &Shred,
slot: Slot,
erasure_meta: &ErasureMeta,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
) -> Option<Vec<u8>> {
// Search for the shred which set the initial erasure config, either inserted,
// or in the current batch in just_received_coding_shreds.
let coding_start = erasure_meta.first_coding_index;
let coding_end = coding_start + erasure_meta.config.num_coding() as u64;
let mut conflicting_shred = None;
for coding_index in coding_start..coding_end {
let maybe_shred = self.get_coding_shred(slot, coding_index);
if let Ok(Some(shred_data)) = maybe_shred {
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
if Self::erasure_mismatch(&potential_shred, &shred) {
conflicting_shred = Some(potential_shred.payload);
}
break;
} else if let Some(potential_shred) =
just_received_coding_shreds.get(&(slot, coding_index))
{
if Self::erasure_mismatch(&potential_shred, &shred) {
conflicting_shred = Some(potential_shred.payload.clone());
}
break;
}
}
conflicting_shred
}
#[allow(clippy::too_many_arguments)]
@ -1110,7 +1177,7 @@ impl Blockstore {
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
handle_duplicate(shred);
return Err(InsertDataShredError::Exists);
} else if !Blockstore::should_insert_data_shred(
} else if !self.should_insert_data_shred(
&shred,
slot_meta,
&self.last_root,
@ -1139,11 +1206,7 @@ impl Blockstore {
Ok(newly_completed_data_sets)
}
fn should_insert_coding_shred(
shred: &Shred,
coding_index: &ShredIndex,
last_root: &RwLock<u64>,
) -> bool {
fn should_insert_coding_shred(shred: &Shred, last_root: &RwLock<u64>) -> bool {
let slot = shred.slot();
let shred_index = shred.index();
@ -1152,11 +1215,13 @@ impl Blockstore {
}
let set_index = shred.common_header.fec_set_index;
!(shred.coding_header.num_coding_shreds == 0
|| shred.coding_header.position >= shred.coding_header.num_coding_shreds
|| std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1
|| coding_index.is_present(u64::from(shred_index))
|| slot <= *last_root.read().unwrap())
|| slot <= *last_root.read().unwrap()
|| shred.coding_header.num_coding_shreds as u32
> (8 * crate::shred::MAX_DATA_SHREDS_PER_FEC_BLOCK))
}
fn insert_coding_shred(
@ -1187,6 +1252,7 @@ impl Blockstore {
}
fn should_insert_data_shred(
&self,
shred: &Shred,
slot_meta: &SlotMeta,
last_root: &RwLock<u64>,
@ -1209,6 +1275,15 @@ impl Blockstore {
let leader_pubkey = leader_schedule
.map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
.unwrap_or(None);
let ending_shred = self.get_data_shred(slot, last_index).unwrap().unwrap();
if self
.store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
.is_err()
{
warn!("store duplicate error");
}
datapoint_error!(
"blockstore_error",
(
@ -1228,6 +1303,18 @@ impl Blockstore {
let leader_pubkey = leader_schedule
.map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
.unwrap_or(None);
let ending_shred = self
.get_data_shred(slot, slot_meta.received - 1)
.unwrap()
.unwrap();
if self
.store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
.is_err()
{
warn!("store duplicate error");
}
datapoint_error!(
"blockstore_error",
(
@ -2599,6 +2686,19 @@ impl Blockstore {
self.dead_slots_cf.put(slot, &true)
}
pub fn store_duplicate_if_not_existing(
&self,
slot: Slot,
shred1: Vec<u8>,
shred2: Vec<u8>,
) -> Result<()> {
if !self.has_duplicate_shreds_in_slot(slot) {
self.store_duplicate_slot(slot, shred1, shred2)
} else {
Ok(())
}
}
pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
@ -5067,6 +5167,7 @@ pub mod tests {
#[test]
pub fn test_should_insert_data_shred() {
solana_logger::setup();
let (mut shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
@ -5094,9 +5195,10 @@ pub mod tests {
}
};
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
false
);
assert!(blockstore.has_duplicate_shreds_in_slot(0));
// Insert all pending shreds
let mut shred8 = shreds[8].clone();
@ -5110,7 +5212,7 @@ pub mod tests {
panic!("Shred in unexpected format")
}
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
false
);
}
@ -5154,12 +5256,56 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_check_cache_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let slot = 1;
let (shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 11, 10, 0);
let coding_shred =
Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
let mut erasure_metas = HashMap::new();
let mut index_working_set = HashMap::new();
let mut just_received_coding_shreds = HashMap::new();
let mut index_meta_time = 0;
assert!(blockstore.check_cache_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut just_received_coding_shreds,
&mut index_meta_time,
&|_shred| {
panic!("no dupes");
},
false,
));
// insert again fails on dupe
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
assert!(!blockstore.check_cache_coding_shred(
coding_shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_received_coding_shreds,
&mut index_meta_time,
&|_shred| {
counter.fetch_add(1, Ordering::Relaxed);
},
false,
));
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
}
#[test]
pub fn test_should_insert_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
let last_root = RwLock::new(0);
let slot = 1;
@ -5174,7 +5320,6 @@ pub mod tests {
// Insert a good coding shred
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
Index::new(slot).coding(),
&last_root
));
@ -5183,12 +5328,11 @@ pub mod tests {
.insert_shreds(vec![coding_shred.clone()], None, false)
.unwrap();
// Trying to insert the same shred again should fail
// Trying to insert the same shred again should pass since this doesn't check for
// duplicate index
{
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(!Blockstore::should_insert_coding_shred(
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5202,10 +5346,8 @@ pub mod tests {
DataShredHeader::default(),
coding.clone(),
);
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5220,10 +5362,8 @@ pub mod tests {
let index = coding_shred.coding_header.position - 1;
coding_shred.set_index(index as u32);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5236,10 +5376,8 @@ pub mod tests {
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5252,10 +5390,8 @@ pub mod tests {
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5272,10 +5408,14 @@ pub mod tests {
coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.common_header.index = std::u32::MAX - 1;
coding_shred.coding_header.position = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
coding_shred.coding_header.num_coding_shreds = 2000;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
@ -5283,7 +5423,6 @@ pub mod tests {
coding_shred.coding_header.num_coding_shreds = 2;
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
@ -5297,11 +5436,9 @@ pub mod tests {
{
let mut coding_shred =
Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
@ -5312,6 +5449,7 @@ pub mod tests {
#[test]
pub fn test_insert_multiple_is_last() {
solana_logger::setup();
let (shreds, _) = make_slot_entries(0, 0, 20);
let num_shreds = shreds.len() as u64;
let blockstore_path = get_tmp_ledger_path!();
@ -5334,6 +5472,8 @@ pub mod tests {
assert_eq!(slot_meta.last_index, num_shreds - 1);
assert!(slot_meta.is_full());
assert!(blockstore.has_duplicate_shreds_in_slot(0));
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
@ -7327,4 +7467,75 @@ pub mod tests {
assert!(stored_shred.last_in_slot());
assert_eq!(entries, ledger.get_any_valid_slot_entries(0, 0));
}
fn make_large_tx_entry(num_txs: usize) -> Entry {
let txs: Vec<_> = (0..num_txs)
.into_iter()
.map(|_| {
let keypair0 = Keypair::new();
let to = solana_sdk::pubkey::new_rand();
solana_sdk::system_transaction::transfer(&keypair0, &to, 1, Hash::default())
})
.collect();
Entry::new(&Hash::default(), 1, txs)
}
#[test]
fn erasure_multiple_config() {
solana_logger::setup();
let slot = 1;
let parent = 0;
let num_txs = 20;
let entry = make_large_tx_entry(num_txs);
let shreds = entries_to_test_shreds(vec![entry], slot, parent, true, 0);
assert!(shreds.len() > 1);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
let coding1 = Shredder::generate_coding_shreds(slot, 0.5f32, &shreds, 0x42, usize::MAX);
let coding2 = Shredder::generate_coding_shreds(slot, 1.0f32, &shreds, 0x42, usize::MAX);
for shred in &shreds {
info!("shred {:?}", shred);
}
for shred in &coding1 {
info!("coding1 {:?}", shred);
}
for shred in &coding2 {
info!("coding2 {:?}", shred);
}
ledger
.insert_shreds(shreds[..shreds.len() - 2].to_vec(), None, false)
.unwrap();
ledger
.insert_shreds(vec![coding1[0].clone(), coding2[1].clone()], None, false)
.unwrap();
assert!(ledger.has_duplicate_shreds_in_slot(slot));
}
#[test]
fn test_large_num_coding() {
solana_logger::setup();
let slot = 1;
let (_data_shreds, mut coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
coding_shreds[1].coding_header.num_coding_shreds = u16::MAX;
blockstore
.insert_shreds(
vec![coding_shreds[1].clone()],
Some(&leader_schedule_cache),
false,
)
.unwrap();
// Check no coding shreds are inserted
let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap();
assert!(res.is_empty());
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}

View File

@ -612,6 +612,7 @@ impl Shredder {
self.fec_rate,
shred_data_batch,
self.version,
shred_data_batch.len(),
)
})
.collect()
@ -675,12 +676,14 @@ impl Shredder {
fec_rate: f32,
data_shred_batch: &[Shred],
version: u16,
max_coding_shreds: usize,
) -> Vec<Shred> {
assert!(!data_shred_batch.is_empty());
if fec_rate != 0.0 {
let num_data = data_shred_batch.len();
// always generate at least 1 coding shred even if the fec_rate doesn't allow it
let num_coding = Self::calculate_num_coding_shreds(num_data, fec_rate);
let num_coding =
Self::calculate_num_coding_shreds(num_data, fec_rate, max_coding_shreds);
let session =
Session::new(num_data, num_coding).expect("Failed to create erasure session");
let start_index = data_shred_batch[0].common_header.index;
@ -748,11 +751,15 @@ impl Shredder {
}
}
fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize {
fn calculate_num_coding_shreds(
num_data_shreds: usize,
fec_rate: f32,
max_coding_shreds: usize,
) -> usize {
if num_data_shreds == 0 {
0
} else {
num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
max_coding_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
}
}
@ -1095,8 +1102,11 @@ pub mod tests {
let size = serialized_size(&entries).unwrap();
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
let num_expected_coding_shreds =
Shredder::calculate_num_coding_shreds(num_expected_data_shreds as usize, fec_rate);
let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds(
num_expected_data_shreds as usize,
fec_rate,
num_expected_data_shreds as usize,
);
let start_index = 0;
let (data_shreds, coding_shreds, next_index) =