From 8183f286368941018f1f35431da48e6373c406fe Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 16 Dec 2021 14:18:55 +0000 Subject: [PATCH] adds ErasureSetId identifying erasure coding sets of shreds (#21928) --- ledger/src/blockstore.rs | 37 +++++++++++++++++++------------------ ledger/src/shred.rs | 22 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c4dc4e74c0..30b232d268 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -13,8 +13,8 @@ use { leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ - max_ticks_per_n_shreds, Result as ShredResult, Shred, ShredId, ShredType, Shredder, - SHRED_PAYLOAD_SIZE, + max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType, + Shredder, SHRED_PAYLOAD_SIZE, }, }, bincode::deserialize, @@ -551,8 +551,8 @@ impl Blockstore { false } - pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result> { - self.erasure_meta_cf.get((slot, set_index)) + fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result> { + self.erasure_meta_cf.get(erasure_set.store_key()) } pub fn orphan(&self, slot: Slot) -> Result> { @@ -731,7 +731,7 @@ impl Blockstore { fn try_shred_recovery( db: &Database, - erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, + erasure_metas: &HashMap, index_working_set: &mut HashMap, prev_inserted_shreds: &HashMap, ) -> Vec { @@ -743,7 +743,8 @@ impl Blockstore { // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery // 3. Before trying recovery, check if enough number of shreds have been received // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data - for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() { + for (erasure_set, erasure_meta) in erasure_metas.iter() { + let slot = erasure_set.slot(); let index_meta_entry = index_working_set.get_mut(&slot).expect("Index"); let index = &mut index_meta_entry.index; match erasure_meta.status(index) { @@ -940,8 +941,8 @@ impl Blockstore { &mut write_batch, )?; - for ((slot, set_index), erasure_meta) in erasure_metas { - write_batch.put::((slot, set_index), &erasure_meta)?; + for (erasure_set, erasure_meta) in erasure_metas { + write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } for (&slot, index_working_set_entry) in index_working_set.iter() { @@ -1032,7 +1033,7 @@ impl Blockstore { fn check_insert_coding_shred( &self, shred: Shred, - erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, + erasure_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1069,9 +1070,9 @@ impl Blockstore { } } - let set_index = u64::from(shred.fec_set_index()); - let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { - self.erasure_meta(slot, set_index) + let erasure_set = shred.erasure_set(); + let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| { + self.erasure_meta(erasure_set) .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); @@ -1100,8 +1101,8 @@ impl Blockstore { // ToDo: This is a potential slashing condition warn!("Received multiple erasure configs for the same erasure set!!!"); warn!( - "Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", - slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header, + "Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", + slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header, ); return false; @@ -1200,7 +1201,7 @@ impl Blockstore { fn check_insert_data_shred( &self, shred: Shred, - erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, + erasure_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1264,7 +1265,7 @@ impl Blockstore { } } - let set_index = u64::from(shred.fec_set_index()); + let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), @@ -1275,8 +1276,8 @@ impl Blockstore { just_inserted_shreds.insert(shred.id(), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; - if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) { - if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() { + if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) { + if let Some(meta) = self.erasure_meta(erasure_set).unwrap() { entry.insert(meta); } } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 172b654082..f2a6fe2b75 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -236,7 +236,7 @@ pub struct Shred { pub payload: Vec, } -/// Tuple which should uniquely identify a shred if it exists. +/// Tuple which uniquely identifies a shred should it exists. #[derive(Clone, Copy, Eq, Hash, PartialEq)] pub struct ShredId(Slot, /*shred index:*/ u32, ShredType); @@ -250,6 +250,21 @@ impl ShredId { } } +/// Tuple which identifies erasure coding set that the shred belongs to. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub(crate) struct ErasureSetId(Slot, /*fec_set_index:*/ u32); + +impl ErasureSetId { + pub(crate) fn slot(&self) -> Slot { + self.0 + } + + // Storage key for ErasureMeta in blockstore db. + pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) { + (self.0, u64::from(self.1)) + } +} + impl Shred { fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result where @@ -518,6 +533,11 @@ impl Shred { self.common_header.version } + // Identifier for the erasure coding set that the shred belongs to. + pub(crate) fn erasure_set(&self) -> ErasureSetId { + ErasureSetId(self.slot(), self.fec_set_index()) + } + // Returns the block index within the erasure coding set. fn erasure_block_index(&self) -> Option { let index = self.index().checked_sub(self.fec_set_index())?;