diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index a943701e3..e27085cb3 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -910,7 +910,7 @@ impl Blocktree { if remaining_ticks_in_slot == 0 { shredder.finalize_slot(); } else { - shredder.finalize_fec_block(); + shredder.finalize_data(); } } @@ -1060,7 +1060,7 @@ impl Blocktree { let mut shred_chunk = vec![]; while look_for_last_shred && !shreds.is_empty() { let shred = shreds.remove(0); - if let Shred::LastInFECSet(_) = shred { + if let Shred::DataComplete(_) = shred { look_for_last_shred = false; } else if let Shred::LastInSlot(_) = shred { look_for_last_shred = false; @@ -1706,7 +1706,7 @@ pub fn entries_to_test_shreds( if is_full_slot { shredder.finalize_slot(); } else { - shredder.finalize_fec_block(); + shredder.finalize_data(); } let shreds: Vec = shredder diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 8229d941f..5f26f3880 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -98,7 +98,7 @@ pub(super) fn entries_to_shreds( if i == (num_ventries - 1) && last_tick == bank_max_tick { shredder.finalize_slot(); } else { - shredder.finalize_fec_block(); + shredder.finalize_data(); } let mut shreds: Vec = shredder diff --git a/core/src/chacha.rs b/core/src/chacha.rs index 91534c72e..81d11b58c 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -153,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "HVUt3uy4it4NgbGgYiG9C3gnvxDCzkgqwzzSCM9N4sSt" + let golden: Hash = "2rAoJANqvtAVcPDcKjBficfUN3NA1fRbCjd3Y7YYGqnu" .parse() .unwrap(); diff --git a/core/src/shred.rs b/core/src/shred.rs index 3f48cc4be..b8cdc8d6f 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -15,20 +15,25 @@ use std::{cmp, io}; #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub enum Shred { FirstInSlot(DataShred), - FirstInFECSet(DataShred), Data(DataShred), - LastInFECSet(DataShred), + DataComplete(DataShred), LastInSlot(DataShred), Coding(CodingShred), } +/// This limit comes from reed solomon library, but unfortunately they don't have +/// a public constant defined for it. +const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 128; + +const LAST_SHRED_IN_SLOT: u8 = 1; +const DATA_COMPLETE_SHRED: u8 = 2; + impl Shred { pub fn slot(&self) -> u64 { match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => s.header.common_header.slot, Shred::Coding(s) => s.header.common_header.slot, } @@ -37,9 +42,8 @@ impl Shred { pub fn parent(&self) -> u64 { match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => { s.header.common_header.slot - u64::from(s.header.parent_offset) } @@ -51,9 +55,8 @@ impl Shred { let parent = self.parent(); match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => { s.header.common_header.slot = slot; s.header.parent_offset = (slot - parent) as u16; @@ -65,9 +68,8 @@ impl Shred { pub fn index(&self) -> u32 { match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => s.header.common_header.index, Shred::Coding(s) => s.header.common_header.index, } @@ -76,9 +78,8 @@ impl Shred { pub fn set_index(&mut self, index: u32) { match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => s.header.common_header.index = index, Shred::Coding(s) => s.header.common_header.index = index, }; @@ -87,9 +88,8 @@ impl Shred { pub fn signature(&self) -> Signature { match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => s.header.common_header.signature, Shred::Coding(s) => s.header.common_header.signature, } @@ -100,9 +100,8 @@ impl Shred { let seed_len = seed.len(); let sig = match self { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => &s.header.common_header.signature, Shred::Coding(s) => &s.header.common_header.signature, } @@ -120,9 +119,8 @@ impl Shred { pub fn fast_verify(&self, shred_buf: &[u8], pubkey: &Pubkey) -> bool { let signed_payload_offset = match self { Shred::FirstInSlot(_) - | Shred::FirstInFECSet(_) | Shred::Data(_) - | Shred::LastInFECSet(_) + | Shred::DataComplete(_) | Shred::LastInSlot(_) => CodingShred::overhead(), Shred::Coding(_) => { CodingShred::overhead() @@ -149,7 +147,7 @@ pub struct DataShredHeader { _reserved: CodingShredHeader, pub common_header: ShredCommonHeader, pub parent_offset: u16, - pub last_in_slot: u8, + pub flags: u8, } /// The coding shred header has FEC information @@ -262,12 +260,13 @@ impl ShredCommon for CodingShred { pub struct Shredder { slot: u64, pub index: u32, + fec_set_index: u32, parent_offset: u16, fec_rate: f32, signer: Arc, pub shreds: Vec>, - pub active_shred: Option, - pub active_offset: usize, + active_shred: Option, + active_offset: usize, } impl Write for Shredder { @@ -281,7 +280,7 @@ impl Write for Shredder { Shred::FirstInSlot(self.new_data_shred()) } else { // Else, it is the first shred in FEC set - Shred::FirstInFECSet(self.new_data_shred()) + Shred::Data(self.new_data_shred()) }) }) .unwrap(); @@ -289,9 +288,8 @@ impl Write for Shredder { let written = self.active_offset; let (slice_len, capacity) = match current_shred.borrow_mut() { Shred::FirstInSlot(s) - | Shred::FirstInFECSet(s) | Shred::Data(s) - | Shred::LastInFECSet(s) + | Shred::DataComplete(s) | Shred::LastInSlot(s) => s.write_at(written, buf), Shred::Coding(s) => s.write_at(written, buf), }; @@ -308,6 +306,9 @@ impl Write for Shredder { }; self.active_shred = Some(active_shred); + if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK { + self.generate_coding_shreds(); + } Ok(slice_len) } @@ -358,6 +359,7 @@ impl Shredder { Ok(Shredder { slot, index, + fec_set_index: index, parent_offset: (slot - parent) as u16, fec_rate, signer: signer.clone(), @@ -414,7 +416,7 @@ impl Shredder { /// Generates coding shreds for the data shreds in the current FEC set fn generate_coding_shreds(&mut self) { if self.fec_rate != 0.0 { - let num_data = self.shreds.len(); + let num_data = (self.index - self.fec_set_index) as usize; let num_coding = (self.fec_rate * num_data as f32) as usize; let session = Session::new(num_data, num_coding).expect("Failed to create erasure session"); @@ -462,7 +464,8 @@ impl Shredder { // Finalize the coding blocks (sign and append to the shred list) coding_shreds .into_iter() - .for_each(|code| self.finalize_shred(code, coding_header_offset)) + .for_each(|code| self.finalize_shred(code, coding_header_offset)); + self.fec_set_index = self.index; } } @@ -470,33 +473,32 @@ impl Shredder { /// If there's an active data shred, morph it into the final shred /// If the current active data shred is first in slot, finalize it and create a new shred fn make_final_data_shred(&mut self) -> DataShred { - self.active_shred.take().map_or( - self.new_data_shred(), - |current_shred| match current_shred { + let mut shred = self + .active_shred + .take() + .map_or(self.new_data_shred(), |current_shred| match current_shred { Shred::FirstInSlot(s) => { self.finalize_data_shred(Shred::FirstInSlot(s)); self.new_data_shred() } - Shred::FirstInFECSet(s) - | Shred::Data(s) - | Shred::LastInFECSet(s) - | Shred::LastInSlot(s) => s, + Shred::Data(s) | Shred::DataComplete(s) | Shred::LastInSlot(s) => s, Shred::Coding(_) => self.new_data_shred(), - }, - ) + }); + shred.header.flags |= DATA_COMPLETE_SHRED; + shred } /// Finalize the current FEC block, and generate coding shreds - pub fn finalize_fec_block(&mut self) { + pub fn finalize_data(&mut self) { let final_shred = self.make_final_data_shred(); - self.finalize_data_shred(Shred::LastInFECSet(final_shred)); + self.finalize_data_shred(Shred::DataComplete(final_shred)); self.generate_coding_shreds(); } /// Finalize the current slot (i.e. add last slot shred) and generate coding shreds pub fn finalize_slot(&mut self) { let mut final_shred = self.make_final_data_shred(); - final_shred.header.last_in_slot = 1; + final_shred.header.flags |= LAST_SHRED_IN_SLOT; self.finalize_data_shred(Shred::LastInSlot(final_shred)); self.generate_coding_shreds(); } @@ -545,11 +547,7 @@ impl Shredder { let mut data_shred = DataShred::default(); data_shred.header.common_header.slot = slot; data_shred.header.common_header.index = missing as u32; - if missing == first_index + num_data - 1 { - Shred::LastInFECSet(data_shred) - } else { - Shred::Data(data_shred) - } + Shred::Data(data_shred) } else { Shred::Coding(Self::new_coding_shred( slot, @@ -637,17 +635,15 @@ impl Shredder { // Check if the last recovered data shred is also last in Slot. // If so, it needs to be morphed into the correct type let shred = if let Shred::Data(s) = shred { - if s.header.last_in_slot == 1 { + if s.header.flags & LAST_SHRED_IN_SLOT == LAST_SHRED_IN_SLOT { Shred::LastInSlot(s) + } else if s.header.flags & DATA_COMPLETE_SHRED + == DATA_COMPLETE_SHRED + { + Shred::DataComplete(s) } else { Shred::Data(s) } - } else if let Shred::LastInFECSet(s) = shred { - if s.header.last_in_slot == 1 { - Shred::LastInSlot(s) - } else { - Shred::LastInFECSet(s) - } } else { shred }; @@ -669,17 +665,13 @@ impl Shredder { } /// Combines all shreds to recreate the original buffer - /// If the shreds include coding shreds, and if not all shreds are present, it tries - /// to reconstruct missing shreds using erasure - /// Note: The shreds are expected to be sorted - /// (lower to higher index, and data shreds before coding shreds) pub fn deshred(shreds: &[Shred]) -> Result, reed_solomon_erasure::Error> { let num_data = shreds.len(); let data_shred_bufs = { - let first_index = Shredder::get_shred_index(shreds.first().unwrap(), num_data); + let first_index = shreds.first().unwrap().index() as usize; let last_index = match shreds.last().unwrap() { - Shred::LastInFECSet(s) | Shred::LastInSlot(s) => { + Shred::DataComplete(s) | Shred::LastInSlot(s) => { s.header.common_header.index as usize } _ => 0, @@ -796,8 +788,8 @@ mod tests { assert!(shredder.shreds.is_empty()); // Test6: Let's finalize the FEC block. That should result in the current shred to morph into - // a signed LastInFECSetData shred - shredder.finalize_fec_block(); + // a signed LastInFECBlock shred + shredder.finalize_data(); // We should have a new signed shred assert!(!shredder.shreds.is_empty()); @@ -807,7 +799,7 @@ mod tests { assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); - assert_matches!(deserialized_shred, Shred::LastInFECSet(_)); + assert_matches!(deserialized_shred, Shred::DataComplete(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.parent(), slot - 5); @@ -825,12 +817,11 @@ mod tests { // We should have a new signed shred assert!(!shredder.shreds.is_empty()); - // Must be FirstInFECSet let shred = shredder.shreds.pop().unwrap(); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); - assert_matches!(deserialized_shred, Shred::FirstInFECSet(_)); + assert_matches!(deserialized_shred, Shred::Data(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.parent(), slot - 5); @@ -895,9 +886,9 @@ mod tests { // We should have 0 shreds now assert_eq!(shredder.shreds.len(), 0); - shredder.finalize_fec_block(); + shredder.finalize_data(); - // We should have 2 shreds now (FirstInSlot, and LastInFECSet) + // We should have 2 shreds now (FirstInSlot, and LastInFECBlock) assert_eq!(shredder.shreds.len(), 2); let shred = shredder.shreds.remove(0); @@ -912,7 +903,7 @@ mod tests { let shred = shredder.shreds.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); - assert_matches!(deserialized_shred, Shred::LastInFECSet(_)); + assert_matches!(deserialized_shred, Shred::DataComplete(_)); assert_eq!(deserialized_shred.index(), 1); assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.parent(), slot - 5); @@ -932,14 +923,14 @@ mod tests { // We should have 0 shreds now assert_eq!(shredder.shreds.len(), 0); - shredder.finalize_fec_block(); + shredder.finalize_data(); - // We should have 1 shred now (LastInFECSet) + // We should have 1 shred now (LastInFECBlock) assert_eq!(shredder.shreds.len(), 1); let shred = shredder.shreds.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); - assert_matches!(deserialized_shred, Shred::LastInFECSet(_)); + assert_matches!(deserialized_shred, Shred::DataComplete(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.parent(), slot - 5); @@ -970,7 +961,7 @@ mod tests { // We should have 2 shreds now assert_eq!(shredder.shreds.len(), 2); - shredder.finalize_fec_block(); + shredder.finalize_data(); // Finalize must have created 1 final data shred and 3 coding shreds // assert_eq!(shredder.shreds.len(), 6); @@ -995,7 +986,7 @@ mod tests { let shred = shredder.shreds.remove(0); assert_eq!(shred.len(), PACKET_DATA_SIZE); let deserialized_shred: Shred = bincode::deserialize(&shred).unwrap(); - assert_matches!(deserialized_shred, Shred::LastInFECSet(_)); + assert_matches!(deserialized_shred, Shred::DataComplete(_)); assert_eq!(deserialized_shred.index(), 2); assert_eq!(deserialized_shred.slot(), slot); assert_eq!(deserialized_shred.parent(), slot - 5); @@ -1053,7 +1044,7 @@ mod tests { ); assert_eq!(offset, data.len()); - shredder.finalize_fec_block(); + shredder.finalize_data(); // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; @@ -1206,7 +1197,7 @@ mod tests { shreds.insert(2, recovered_shred); let recovered_shred = result.recovered_data.remove(0); - assert_matches!(recovered_shred, Shred::LastInFECSet(_)); + assert_matches!(recovered_shred, Shred::DataComplete(_)); assert_eq!(recovered_shred.index(), 4); assert_eq!(recovered_shred.slot(), slot); assert_eq!(recovered_shred.parent(), slot - 5);