Fix erasure (#10095)

* Fix bad FEC blocks

* Add test
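
The bug: when a slot produced more data shreds than one FEC block, `data_shreds_to_coding_shreds` passed `max_coding_shreds = true` down to `generate_coding_shreds`, which then sized every batch's coding shreds from `MAX_DATA_SHREDS_PER_FEC_BLOCK` instead of from the batch itself, so a small trailing FEC set could carry a full block's worth of coding shreds. The fix below sizes coding shreds from the actual batch and caps them at the batch's data-shred count. A minimal standalone sketch of the corrected rule (the constant's value of 32 and the `main` demo are illustrative assumptions, not part of the diff):

fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize {
    if num_data_shreds == 0 {
        0
    } else {
        // At least one coding shred, but never more coding shreds than
        // data shreds in the batch.
        num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
    }
}

fn main() {
    // A slot of 34 data shreds is shredded in FEC batches of [32, 2].
    // Old: the trailing batch was padded up to 32 coding shreds for only
    // 2 data shreds. New: it gets 2.
    assert_eq!(calculate_num_coding_shreds(32, 1.0), 32);
    assert_eq!(calculate_num_coding_shreds(2, 1.0), 2);
    assert_eq!(calculate_num_coding_shreds(0, 1.0), 0);
}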

Co-authored-by: Carl <carl@solana.com>
carllin 2020-05-19 16:13:12 -07:00 committed by GitHub
parent e66b5d09db
commit 439fd30840
3 changed files with 189 additions and 45 deletions

View File

@@ -293,13 +293,8 @@ mod tests {
         );
         assert!(!packet.meta.discard);
-        let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
-            slot,
-            1.0f32,
-            &[shred],
-            10,
-            false,
-        );
+        let coding =
+            solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10);
         coding[0].copy_to_packet(&mut packet);
 
         ShredFetchStage::process_packet(
             &mut packet,

View File

@@ -20,8 +20,8 @@ use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, Signature, Signer},
 };
-use std::mem::size_of;
-use std::{sync::Arc, time::Instant};
+use std::{mem::size_of, sync::Arc, time::Instant};
 use thiserror::Error;
 
 pub type Nonce = u32;
@@ -169,7 +169,7 @@ impl Shred {
         index: u32,
         parent_offset: u16,
         data: Option<&[u8]>,
-        is_last_data: bool,
+        is_last_in_fec_set: bool,
         is_last_in_slot: bool,
         reference_tick: u8,
         version: u16,
@@ -194,7 +194,7 @@
             size,
         };
 
-        if is_last_data {
+        if is_last_in_fec_set {
             data_header.flags |= DATA_COMPLETE_SHRED
         }
@@ -496,7 +496,6 @@ impl Shredder {
         let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
         let last_shred_index = next_shred_index + num_shreds as u32 - 1;
-
         // 1) Generate data shreds
         let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -511,7 +510,7 @@
                     let fec_set_index =
                         shred_index - (i % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) as u32;
-                    let (is_last_data, is_last_in_slot) = {
+                    let (is_last_in_fec_set, is_last_in_slot) = {
                         if shred_index == last_shred_index {
                             (true, is_last_in_slot)
                         } else {
@@ -524,7 +523,7 @@
                         shred_index,
                         (self.slot - self.parent_slot) as u16,
                         Some(shred_data),
-                        is_last_data,
+                        is_last_in_fec_set,
                         is_last_in_slot,
                         self.reference_tick,
                         self.version,
@@ -550,7 +549,6 @@
     pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec<Shred> {
         let now = Instant::now();
-        let max_coding_shreds = data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
         // 2) Generate coding shreds
         let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -562,7 +560,6 @@
                             self.fec_rate,
                             shred_data_batch,
                             self.version,
-                            max_coding_shreds,
                         )
                     })
                     .collect()
@@ -630,25 +627,12 @@
         fec_rate: f32,
         data_shred_batch: &[Shred],
         version: u16,
-        max_coding_shreds: bool,
     ) -> Vec<Shred> {
         assert!(!data_shred_batch.is_empty());
         if fec_rate != 0.0 {
             let num_data = data_shred_batch.len();
-            // always generate at least 1 coding shred even if the fec_rate doesn't allow it
-            let shred_count = if max_coding_shreds {
-                MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
-            } else {
-                num_data
-            };
-            let num_coding = Self::calculate_num_coding_shreds(shred_count as f32, fec_rate);
-            if num_coding > num_data {
-                trace!(
-                    "Generated more codes ({}) than data shreds ({})",
-                    num_coding,
-                    num_data
-                );
-            }
+            let num_coding = Self::calculate_num_coding_shreds(num_data, fec_rate);
             let session =
                 Session::new(num_data, num_coding).expect("Failed to create erasure session");
             let start_index = data_shred_batch[0].common_header.index;
@@ -716,8 +700,12 @@
         }
     }
 
-    fn calculate_num_coding_shreds(num_data_shreds: f32, fec_rate: f32) -> usize {
-        1.max((fec_rate * num_data_shreds) as usize)
+    fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize {
+        if num_data_shreds == 0 {
+            0
+        } else {
+            num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
+        }
     }
 
     fn fill_in_missing_shreds(
@@ -971,8 +959,7 @@
     use bincode::serialized_size;
     use matches::assert_matches;
     use solana_sdk::{hash::hash, shred_version, system_transaction};
-    use std::collections::HashSet;
-    use std::convert::TryInto;
+    use std::{collections::HashSet, convert::TryInto};
 
     #[test]
     fn test_shred_constants() {
@@ -1061,7 +1048,7 @@
         let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
         let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
         let num_expected_coding_shreds =
-            Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
+            Shredder::calculate_num_coding_shreds(num_expected_data_shreds as usize, fec_rate);
         let start_index = 0;
 
         let (data_shreds, coding_shreds, next_index) =
@@ -1640,7 +1627,7 @@
         );
         assert_eq!(
             coding_shreds.len(),
-            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize * 2
+            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1
         );
     }
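
The new expectation follows from the corrected sizing: `MAX_DATA_SHREDS_PER_FEC_BLOCK + 1` data shreds split into FEC batches of [MAX, 1], and at `fec_rate = 1.0` each batch now gets exactly as many coding shreds as it has data shreds, so the total drops from `MAX * 2` (the old padded behavior) to `MAX + 1`. A worked check of that arithmetic (illustrative sketch; 32 is assumed for the constant):

fn main() {
    let max = 32usize; // stand-in for MAX_DATA_SHREDS_PER_FEC_BLOCK
    let batches = [max, 1]; // MAX + 1 data shreds -> FEC sets of MAX and 1
    // New rule at fec_rate = 1.0: num_data.min(1.max(num_data)) == num_data
    let total_coding: usize = batches.iter().map(|&n| n.min(1.max(n))).sum();
    assert_eq!(total_coding, max + 1); // the old padding produced max * 2
}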

View File

@@ -3,13 +3,22 @@ use solana_ledger::shred::{
     max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder,
     MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
 };
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::{clock::Slot, hash::Hash, system_transaction};
-use std::convert::TryInto;
-use std::sync::Arc;
+use solana_sdk::{
+    clock::Slot,
+    hash::Hash,
+    signature::{Keypair, Signer},
+    system_transaction,
+};
+use std::{
+    collections::{BTreeMap, HashSet},
+    convert::TryInto,
+    sync::Arc,
+};
 
-fn run_test_multi_fec_block_coding(slot: Slot) {
+#[test]
+fn test_multi_fec_block_coding() {
     let keypair = Arc::new(Keypair::new());
+    let slot = 0x1234_5678_9abc_def0;
     let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0)
         .expect("Failed in creating shredder");
@@ -19,8 +28,11 @@ fn run_test_multi_fec_block_coding(slot: Slot) {
     let keypair1 = Keypair::new();
     let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
     let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
-    let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
-    let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size));
+    let num_entries = max_entries_per_n_shred(
+        &entry,
+        num_data_shreds as u64,
+        Some(SIZE_OF_DATA_SHRED_PAYLOAD),
+    );
 
     let entries: Vec<_> = (0..num_entries)
         .map(|_| {
@@ -96,6 +108,156 @@ fn run_test_multi_fec_block_coding(slot: Slot) {
 }
 
 #[test]
-fn test_multi_fec_block_coding() {
-    run_test_multi_fec_block_coding(0x1234_5678_9abc_def0);
+fn test_multi_fec_block_different_size_coding() {
+    let slot = 0x1234_5678_9abc_def0;
+    let parent_slot = slot - 5;
+    let keypair = Arc::new(Keypair::new());
+    let (fec_data, fec_coding, num_shreds_per_iter) =
+        setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone());
+
+    let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum();
+
+    // Test recovery
+    for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) {
+        let first_data_index = fec_data_shreds.first().unwrap().index() as usize;
+        let first_code_index = fec_coding_shreds.first().unwrap().index() as usize;
+        let num_data = fec_data_shreds.len();
+        let num_coding = fec_coding_shreds.len();
+        let all_shreds: Vec<Shred> = fec_data_shreds
+            .into_iter()
+            .step_by(2)
+            .chain(fec_coding_shreds.into_iter().step_by(2))
+            .cloned()
+            .collect();
+
+        let recovered_data = Shredder::try_recovery(
+            all_shreds,
+            num_data,
+            num_coding,
+            first_data_index,
+            first_code_index,
+            slot,
+        )
+        .unwrap();
+
+        // Necessary in order to ensure the last shred in the slot
+        // is part of the recovered set, and that the below `index`
+        // calculation in the loop is correct
+        assert!(fec_data_shreds.len() % 2 == 0);
+        for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
+            let index = first_data_index + (i * 2) + 1;
+            verify_test_data_shred(
+                &recovered_shred,
+                index.try_into().unwrap(),
+                slot,
+                parent_slot,
+                &keypair.pubkey(),
+                true,
+                index == total_num_data_shreds - 1,
+                index % num_shreds_per_iter == num_shreds_per_iter - 1,
+            );
+        }
+    }
 }
+
+fn sort_data_coding_into_fec_sets(
+    data_shreds: Vec<Shred>,
+    coding_shreds: Vec<Shred>,
+    fec_data: &mut BTreeMap<u32, Vec<Shred>>,
+    fec_coding: &mut BTreeMap<u32, Vec<Shred>>,
+    data_slot_and_index: &mut HashSet<(Slot, u32)>,
+    coding_slot_and_index: &mut HashSet<(Slot, u32)>,
+) {
+    for shred in data_shreds {
+        assert!(shred.is_data());
+        let key = (shred.slot(), shred.index());
+        // Make sure there are no duplicates for same key
+        assert!(!data_slot_and_index.contains(&key));
+        data_slot_and_index.insert(key);
+        let fec_entry = fec_data
+            .entry(shred.common_header.fec_set_index)
+            .or_insert(vec![]);
+        fec_entry.push(shred);
+    }
+
+    for shred in coding_shreds {
+        assert!(!shred.is_data());
+        let key = (shred.slot(), shred.index());
+        // Make sure there are no duplicates for same key
+        assert!(!coding_slot_and_index.contains(&key));
+        coding_slot_and_index.insert(key);
+        let fec_entry = fec_coding
+            .entry(shred.common_header.fec_set_index)
+            .or_insert(vec![]);
+        fec_entry.push(shred);
+    }
+}
+
+fn setup_different_sized_fec_blocks(
+    slot: Slot,
+    parent_slot: Slot,
+    keypair: Arc<Keypair>,
+) -> (BTreeMap<u32, Vec<Shred>>, BTreeMap<u32, Vec<Shred>>, usize) {
+    let shredder =
+        Shredder::new(slot, parent_slot, 1.0, keypair, 0, 0).expect("Failed in creating shredder");
+    let keypair0 = Keypair::new();
+    let keypair1 = Keypair::new();
+    let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+    let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
+
+    // Make enough entries for `MAX_DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one
+    // fec set will have `MAX_DATA_SHREDS_PER_FEC_BLOCK` shreds and the next
+    // will have 2 shreds.
+    assert!(MAX_DATA_SHREDS_PER_FEC_BLOCK > 2);
+    let num_shreds_per_iter = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 2;
+    let num_entries = max_entries_per_n_shred(
+        &entry,
+        num_shreds_per_iter as u64,
+        Some(SIZE_OF_DATA_SHRED_PAYLOAD),
+    );
+
+    let entries: Vec<_> = (0..num_entries)
+        .map(|_| {
+            let keypair0 = Keypair::new();
+            let keypair1 = Keypair::new();
+            let tx0 =
+                system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+            Entry::new(&Hash::default(), 1, vec![tx0])
+        })
+        .collect();
+
+    // Run the shredder twice, generate data and coding shreds
+    let mut next_index = 0;
+    let mut fec_data = BTreeMap::new();
+    let mut fec_coding = BTreeMap::new();
+    let mut data_slot_and_index = HashSet::new();
+    let mut coding_slot_and_index = HashSet::new();
+
+    let total_num_data_shreds: usize = 2 * num_shreds_per_iter;
+    for i in 0..2 {
+        let is_last = i == 1;
+        let (data_shreds, coding_shreds, new_next_index) =
+            shredder.entries_to_shreds(&entries, is_last, next_index);
+        for shred in &data_shreds {
+            if (shred.index() as usize) == total_num_data_shreds - 1 {
+                assert!(shred.data_complete());
+                assert!(shred.last_in_slot());
+            } else if (shred.index() as usize) % num_shreds_per_iter == num_shreds_per_iter - 1 {
+                assert!(shred.data_complete());
+            } else {
+                assert!(!shred.data_complete());
+                assert!(!shred.last_in_slot());
+            }
+        }
+        assert_eq!(data_shreds.len(), num_shreds_per_iter as usize);
+        next_index = new_next_index;
+        sort_data_coding_into_fec_sets(
+            data_shreds,
+            coding_shreds,
+            &mut fec_data,
+            &mut fec_coding,
+            &mut data_slot_and_index,
+            &mut coding_slot_and_index,
+        );
+    }
+
+    assert_eq!(fec_data.len(), fec_coding.len());
+    (fec_data, fec_coding, num_shreds_per_iter)
+}
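
A note on the recovery loop in `test_multi_fec_block_different_size_coding` above: `step_by(2)` keeps only the even-positioned shreds of each FEC set, so `try_recovery` reconstructs exactly the odd-positioned data shreds; that is why each set must contain an even number of data shreds and why the loop computes `index = first_data_index + (i * 2) + 1`. A tiny standalone illustration of that index arithmetic (hypothetical values):

fn main() {
    // A set of 4 data shreds starting at index 0: shreds 0 and 2 are kept,
    // so recovery returns the shreds at indices 1 and 3, in order.
    let first_data_index = 0usize;
    let recovered: Vec<usize> = (0..2).map(|i| first_data_index + i * 2 + 1).collect();
    assert_eq!(recovered, vec![1, 3]);
}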