Generate coding shreds on the fly based on erasure limits (#5852)

* Generate coding shreds on the fly based on erasure limits

* fix uncle
Pankaj Garg 2019-09-09 17:26:51 -07:00 committed by GitHub
parent 61fe1aa9cf
commit 7682db4826
4 changed files with 67 additions and 76 deletions

View File

@ -910,7 +910,7 @@ impl Blocktree {
if remaining_ticks_in_slot == 0 {
shredder.finalize_slot();
} else {
shredder.finalize_fec_block();
shredder.finalize_data();
}
}
@ -1060,7 +1060,7 @@ impl Blocktree {
let mut shred_chunk = vec![];
while look_for_last_shred && !shreds.is_empty() {
let shred = shreds.remove(0);
if let Shred::LastInFECSet(_) = shred {
if let Shred::DataComplete(_) = shred {
look_for_last_shred = false;
} else if let Shred::LastInSlot(_) = shred {
look_for_last_shred = false;
@ -1706,7 +1706,7 @@ pub fn entries_to_test_shreds(
if is_full_slot {
shredder.finalize_slot();
} else {
shredder.finalize_fec_block();
shredder.finalize_data();
}
let shreds: Vec<Shred> = shredder

View File

@ -98,7 +98,7 @@ pub(super) fn entries_to_shreds(
if i == (num_ventries - 1) && last_tick == bank_max_tick {
shredder.finalize_slot();
} else {
shredder.finalize_fec_block();
shredder.finalize_data();
}
let mut shreds: Vec<Shred> = shredder

View File

@ -153,7 +153,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
let golden: Hash = "HVUt3uy4it4NgbGgYiG9C3gnvxDCzkgqwzzSCM9N4sSt"
let golden: Hash = "2rAoJANqvtAVcPDcKjBficfUN3NA1fRbCjd3Y7YYGqnu"
.parse()
.unwrap();

View File

@ -15,20 +15,25 @@ use std::{cmp, io};
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub enum Shred {
FirstInSlot(DataShred),
FirstInFECSet(DataShred),
Data(DataShred),
LastInFECSet(DataShred),
DataComplete(DataShred),
LastInSlot(DataShred),
Coding(CodingShred),
}
/// This limit comes from reed solomon library, but unfortunately they don't have
/// a public constant defined for it.
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 128;
const LAST_SHRED_IN_SLOT: u8 = 1;
const DATA_COMPLETE_SHRED: u8 = 2;
impl Shred {
pub fn slot(&self) -> u64 {
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.header.common_header.slot,
Shred::Coding(s) => s.header.common_header.slot,
}
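
A note on the new MAX_DATA_SHREDS_PER_FEC_BLOCK constant above: the cap exists because the Reed-Solomon backend bounds how many shards one erasure session can hold. A minimal sketch of that constraint, assuming the usual GF(2^8) total of 256 shards (an assumption about the library, not stated in this diff):

const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 128;
const RS_MAX_TOTAL_SHARDS: u32 = 256; // assumed GF(2^8) shard limit of the erasure library

// A full FEC set of data shreds plus its coding shreds must stay within the
// library's total-shard bound for any fec_rate <= 1.0.
fn fec_set_fits(num_data: u32, fec_rate: f32) -> bool {
    let num_coding = (num_data as f32 * fec_rate) as u32;
    num_data <= MAX_DATA_SHREDS_PER_FEC_BLOCK && num_data + num_coding <= RS_MAX_TOTAL_SHARDS
}
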
@ -37,9 +42,8 @@ impl Shred {
pub fn parent(&self) -> u64 {
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => {
s.header.common_header.slot - u64::from(s.header.parent_offset)
}
@ -51,9 +55,8 @@ impl Shred {
let parent = self.parent();
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => {
s.header.common_header.slot = slot;
s.header.parent_offset = (slot - parent) as u16;
@ -65,9 +68,8 @@ impl Shred {
pub fn index(&self) -> u32 {
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.header.common_header.index,
Shred::Coding(s) => s.header.common_header.index,
}
@ -76,9 +78,8 @@ impl Shred {
pub fn set_index(&mut self, index: u32) {
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.header.common_header.index = index,
Shred::Coding(s) => s.header.common_header.index = index,
};
@ -87,9 +88,8 @@ impl Shred {
pub fn signature(&self) -> Signature {
match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.header.common_header.signature,
Shred::Coding(s) => s.header.common_header.signature,
}
@ -100,9 +100,8 @@ impl Shred {
let seed_len = seed.len();
let sig = match self {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => &s.header.common_header.signature,
Shred::Coding(s) => &s.header.common_header.signature,
}
@ -120,9 +119,8 @@ impl Shred {
pub fn fast_verify(&self, shred_buf: &[u8], pubkey: &Pubkey) -> bool {
let signed_payload_offset = match self {
Shred::FirstInSlot(_)
| Shred::FirstInFECSet(_)
| Shred::Data(_)
| Shred::LastInFECSet(_)
| Shred::DataComplete(_)
| Shred::LastInSlot(_) => CodingShred::overhead(),
Shred::Coding(_) => {
CodingShred::overhead()
@ -149,7 +147,7 @@ pub struct DataShredHeader {
_reserved: CodingShredHeader,
pub common_header: ShredCommonHeader,
pub parent_offset: u16,
pub last_in_slot: u8,
pub flags: u8,
}
/// The coding shred header has FEC information
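
The header change above (last_in_slot becoming flags) turns the single-purpose byte into a small bitmask, so one field can mark both "data complete" and "last in slot". A short sketch of how the two bits defined earlier are set and tested, mirroring the recovery logic further down in this file:

const LAST_SHRED_IN_SLOT: u8 = 1;
const DATA_COMPLETE_SHRED: u8 = 2;

// Classify a data shred from its header flags, checking the slot bit first,
// just as the recovery path does.
fn classify(flags: u8) -> &'static str {
    if flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT {
        "LastInSlot"
    } else if flags & DATA_COMPLETE_SHRED == DATA_COMPLETE_SHRED {
        "DataComplete"
    } else {
        "Data"
    }
}
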
@ -262,12 +260,13 @@ impl ShredCommon for CodingShred {
pub struct Shredder {
slot: u64,
pub index: u32,
fec_set_index: u32,
parent_offset: u16,
fec_rate: f32,
signer: Arc<Keypair>,
pub shreds: Vec<Vec<u8>>,
pub active_shred: Option<Shred>,
pub active_offset: usize,
active_shred: Option<Shred>,
active_offset: usize,
}
impl Write for Shredder {
@ -281,7 +280,7 @@ impl Write for Shredder {
Shred::FirstInSlot(self.new_data_shred())
} else {
// Else, it is the first shred in FEC set
Shred::FirstInFECSet(self.new_data_shred())
Shred::Data(self.new_data_shred())
})
})
.unwrap();
@ -289,9 +288,8 @@ impl Write for Shredder {
let written = self.active_offset;
let (slice_len, capacity) = match current_shred.borrow_mut() {
Shred::FirstInSlot(s)
| Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::DataComplete(s)
| Shred::LastInSlot(s) => s.write_at(written, buf),
Shred::Coding(s) => s.write_at(written, buf),
};
@ -308,6 +306,9 @@ impl Write for Shredder {
};
self.active_shred = Some(active_shred);
if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK {
self.generate_coding_shreds();
}
Ok(slice_len)
}
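
This is the core of the change: write() now tracks where the current FEC set started (fec_set_index) and generates coding shreds as soon as the set reaches MAX_DATA_SHREDS_PER_FEC_BLOCK, instead of waiting for an explicit finalize call. A simplified sketch of that bookkeeping (FecTracker is illustrative, not a type from this commit):

struct FecTracker {
    index: u32,         // index of the next data shred in the slot
    fec_set_index: u32, // index of the first data shred in the current FEC set
}

impl FecTracker {
    // Returns true when the current FEC set is full and coding shreds
    // should be generated before more data is accepted.
    fn data_shred_completed(&mut self) -> bool {
        self.index += 1;
        if self.index - self.fec_set_index >= 128 {
            // generate_coding_shreds() runs here, then a new set begins
            self.fec_set_index = self.index;
            true
        } else {
            false
        }
    }
}
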
@ -358,6 +359,7 @@ impl Shredder {
Ok(Shredder {
slot,
index,
fec_set_index: index,
parent_offset: (slot - parent) as u16,
fec_rate,
signer: signer.clone(),
@ -414,7 +416,7 @@ impl Shredder {
/// Generates coding shreds for the data shreds in the current FEC set
fn generate_coding_shreds(&mut self) {
if self.fec_rate != 0.0 {
let num_data = self.shreds.len();
let num_data = (self.index - self.fec_set_index) as usize;
let num_coding = (self.fec_rate * num_data as f32) as usize;
let session =
Session::new(num_data, num_coding).expect("Failed to create erasure session");
@ -462,7 +464,8 @@ impl Shredder {
// Finalize the coding blocks (sign and append to the shred list)
coding_shreds
.into_iter()
.for_each(|code| self.finalize_shred(code, coding_header_offset))
.for_each(|code| self.finalize_shred(code, coding_header_offset));
self.fec_set_index = self.index;
}
}
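
With fec_set_index in hand, the coding shred count is now derived from the data shreds written since the current set began, rather than from everything buffered in self.shreds. A quick worked example (the 0.25 rate is illustrative, not a value fixed by this commit):

let fec_rate = 0.25f32;
let num_data = 128usize;                        // self.index - self.fec_set_index
let num_coding = (fec_rate * num_data as f32) as usize;
assert_eq!(num_coding, 32);                     // one coding shred per four data shreds
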
@ -470,33 +473,32 @@ impl Shredder {
/// If there's an active data shred, morph it into the final shred
/// If the current active data shred is first in slot, finalize it and create a new shred
fn make_final_data_shred(&mut self) -> DataShred {
self.active_shred.take().map_or(
self.new_data_shred(),
|current_shred| match current_shred {
let mut shred = self
.active_shred
.take()
.map_or(self.new_data_shred(), |current_shred| match current_shred {
Shred::FirstInSlot(s) => {
self.finalize_data_shred(Shred::FirstInSlot(s));
self.new_data_shred()
}
Shred::FirstInFECSet(s)
| Shred::Data(s)
| Shred::LastInFECSet(s)
| Shred::LastInSlot(s) => s,
Shred::Data(s) | Shred::DataComplete(s) | Shred::LastInSlot(s) => s,
Shred::Coding(_) => self.new_data_shred(),
},
)
});
shred.header.flags |= DATA_COMPLETE_SHRED;
shred
}
/// Finalize the current FEC block, and generate coding shreds
pub fn finalize_fec_block(&mut self) {
pub fn finalize_data(&mut self) {
let final_shred = self.make_final_data_shred();
self.finalize_data_shred(Shred::LastInFECSet(final_shred));
self.finalize_data_shred(Shred::DataComplete(final_shred));
self.generate_coding_shreds();
}
/// Finalize the current slot (i.e. add last slot shred) and generate coding shreds
pub fn finalize_slot(&mut self) {
let mut final_shred = self.make_final_data_shred();
final_shred.header.last_in_slot = 1;
final_shred.header.flags |= LAST_SHRED_IN_SLOT;
self.finalize_data_shred(Shred::LastInSlot(final_shred));
self.generate_coding_shreds();
}
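
Worth noting: finalize_slot goes through make_final_data_shred first, so as this diff reads, the slot's final shred carries both flag bits:

let mut flags: u8 = 0;
flags |= DATA_COMPLETE_SHRED; // applied in make_final_data_shred
flags |= LAST_SHRED_IN_SLOT;  // applied in finalize_slot
assert!(flags & DATA_COMPLETE_SHRED != 0);
assert!(flags & LAST_SHRED_IN_SLOT != 0);
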
@ -545,11 +547,7 @@ impl Shredder {
let mut data_shred = DataShred::default();
data_shred.header.common_header.slot = slot;
data_shred.header.common_header.index = missing as u32;
if missing == first_index + num_data - 1 {
Shred::LastInFECSet(data_shred)
} else {
Shred::Data(data_shred)
}
Shred::Data(data_shred)
} else {
Shred::Coding(Self::new_coding_shred(
slot,
@ -637,17 +635,15 @@ impl Shredder {
// Check if the last recovered data shred is also last in Slot.
// If so, it needs to be morphed into the correct type
let shred = if let Shred::Data(s) = shred {
if s.header.last_in_slot == 1 {
if s.header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT {
Shred::LastInSlot(s)
} else if s.header.flags & DATA_COMPLETE_SHRED
== DATA_COMPLETE_SHRED
{
Shred::DataComplete(s)
} else {
Shred::Data(s)
}
} else if let Shred::LastInFECSet(s) = shred {
if s.header.last_in_slot == 1 {
Shred::LastInSlot(s)
} else {
Shred::LastInFECSet(s)
}
} else {
shred
};
@ -669,17 +665,13 @@ impl Shredder {
}
/// Combines all shreds to recreate the original buffer
/// If the shreds include coding shreds, and if not all shreds are present, it tries
/// to reconstruct missing shreds using erasure
/// Note: The shreds are expected to be sorted
/// (lower to higher index, and data shreds before coding shreds)
pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, reed_solomon_erasure::Error> {
let num_data = shreds.len();
let data_shred_bufs = {
let first_index = Shredder::get_shred_index(shreds.first().unwrap(), num_data);
let first_index = shreds.first().unwrap().index() as usize;
let last_index = match shreds.last().unwrap() {
Shred::LastInFECSet(s) | Shred::LastInSlot(s) => {
Shred::DataComplete(s) | Shred::LastInSlot(s) => {
s.header.common_header.index as usize
}
_ => 0,
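
The deshred doc comment above states that shreds are expected sorted lower-to-higher index, with data shreds before coding shreds. A hypothetical helper illustrating that ordering assumption (not part of this commit):

// Data shred indices handed to deshred should be contiguous and ascending,
// ending at the DataComplete / LastInSlot shred.
fn indices_are_contiguous(indices: &[u32]) -> bool {
    indices.windows(2).all(|w| w[1] == w[0] + 1)
}
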
@ -796,8 +788,8 @@ mod tests {
assert!(shredder.shreds.is_empty());
// Test6: Let's finalize the FEC block. That should result in the current shred to morph into
// a signed LastInFECSetData shred
shredder.finalize_fec_block();
// a signed LastInFECBlock shred
shredder.finalize_data();
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
@ -807,7 +799,7 @@ mod tests {
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
assert_matches!(deserialized_shred, Shred::DataComplete(_));
assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
@ -825,12 +817,11 @@ mod tests {
// We should have a new signed shred
assert!(!shredder.shreds.is_empty());
// Must be FirstInFECSet
let shred = shredder.shreds.pop().unwrap();
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::FirstInFECSet(_));
assert_matches!(deserialized_shred, Shred::Data(_));
assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
@ -895,9 +886,9 @@ mod tests {
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
shredder.finalize_fec_block();
shredder.finalize_data();
// We should have 2 shreds now (FirstInSlot, and LastInFECSet)
// We should have 2 shreds now (FirstInSlot, and LastInFECBlock)
assert_eq!(shredder.shreds.len(), 2);
let shred = shredder.shreds.remove(0);
@ -912,7 +903,7 @@ mod tests {
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
assert_matches!(deserialized_shred, Shred::DataComplete(_));
assert_eq!(deserialized_shred.index(), 1);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
@ -932,14 +923,14 @@ mod tests {
// We should have 0 shreds now
assert_eq!(shredder.shreds.len(), 0);
shredder.finalize_fec_block();
shredder.finalize_data();
// We should have 1 shred now (LastInFECSet)
// We should have 1 shred now (LastInFECBlock)
assert_eq!(shredder.shreds.len(), 1);
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
assert_matches!(deserialized_shred, Shred::DataComplete(_));
assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
@ -970,7 +961,7 @@ mod tests {
// We should have 2 shreds now
assert_eq!(shredder.shreds.len(), 2);
shredder.finalize_fec_block();
shredder.finalize_data();
// Finalize must have created 1 final data shred and 3 coding shreds
// assert_eq!(shredder.shreds.len(), 6);
@ -995,7 +986,7 @@ mod tests {
let shred = shredder.shreds.remove(0);
assert_eq!(shred.len(), PACKET_DATA_SIZE);
let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap();
assert_matches!(deserialized_shred, Shred::LastInFECSet(_));
assert_matches!(deserialized_shred, Shred::DataComplete(_));
assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
@ -1053,7 +1044,7 @@ mod tests {
);
assert_eq!(offset, data.len());
shredder.finalize_fec_block();
shredder.finalize_data();
// We should have 10 shreds now (one additional final shred, and equal number of coding shreds)
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
@ -1206,7 +1197,7 @@ mod tests {
shreds.insert(2, recovered_shred);
let recovered_shred = result.recovered_data.remove(0);
assert_matches!(recovered_shred, Shred::LastInFECSet(_));
assert_matches!(recovered_shred, Shred::DataComplete(_));
assert_eq!(recovered_shred.index(), 4);
assert_eq!(recovered_shred.slot(), slot);
assert_eq!(recovered_shred.parent(), slot - 5);