adds ErasureSetId identifying erasure coding sets of shreds (#21928)

This commit is contained in:
behzad nouri 2021-12-16 14:18:55 +00:00 committed by GitHub
parent 49cb161203
commit 8183f28636
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 19 deletions

View File

@ -13,8 +13,8 @@ use {
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{
max_ticks_per_n_shreds, Result as ShredResult, Shred, ShredId, ShredType, Shredder,
SHRED_PAYLOAD_SIZE,
max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType,
Shredder, SHRED_PAYLOAD_SIZE,
},
},
bincode::deserialize,
@ -551,8 +551,8 @@ impl Blockstore {
false
}
pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get((slot, set_index))
fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get(erasure_set.store_key())
}
pub fn orphan(&self, slot: Slot) -> Result<Option<bool>> {
@ -731,7 +731,7 @@ impl Blockstore {
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
) -> Vec<Shred> {
@ -743,7 +743,8 @@ impl Blockstore {
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() {
for (erasure_set, erasure_meta) in erasure_metas.iter() {
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(index) {
@ -940,8 +941,8 @@ impl Blockstore {
&mut write_batch,
)?;
for ((slot, set_index), erasure_meta) in erasure_metas {
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
for (erasure_set, erasure_meta) in erasure_metas {
write_batch.put::<cf::ErasureMeta>(erasure_set.store_key(), &erasure_meta)?;
}
for (&slot, index_working_set_entry) in index_working_set.iter() {
@ -1032,7 +1033,7 @@ impl Blockstore {
fn check_insert_coding_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
@ -1069,9 +1070,9 @@ impl Blockstore {
}
}
let set_index = u64::from(shred.fec_set_index());
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta(slot, set_index)
let erasure_set = shred.erasure_set();
let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
});
@ -1100,8 +1101,8 @@ impl Blockstore {
// ToDo: This is a potential slashing condition
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
"Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
);
return false;
@ -1200,7 +1201,7 @@ impl Blockstore {
fn check_insert_data_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
@ -1264,7 +1265,7 @@ impl Blockstore {
}
}
let set_index = u64::from(shred.fec_set_index());
let erasure_set = shred.erasure_set();
let newly_completed_data_sets = self.insert_data_shred(
slot_meta,
index_meta.data_mut(),
@ -1275,8 +1276,8 @@ impl Blockstore {
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() {
if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
entry.insert(meta);
}
}

View File

@ -236,7 +236,7 @@ pub struct Shred {
pub payload: Vec<u8>,
}
/// Tuple which should uniquely identify a shred if it exists.
/// Tuple which uniquely identifies a shred should it exists.
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
@ -250,6 +250,21 @@ impl ShredId {
}
}
/// Tuple which identifies erasure coding set that the shred belongs to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct ErasureSetId(Slot, /*fec_set_index:*/ u32);
impl ErasureSetId {
pub(crate) fn slot(&self) -> Slot {
self.0
}
// Storage key for ErasureMeta in blockstore db.
pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) {
(self.0, u64::from(self.1))
}
}
impl Shred {
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
where
@ -518,6 +533,11 @@ impl Shred {
self.common_header.version
}
// Identifier for the erasure coding set that the shred belongs to.
pub(crate) fn erasure_set(&self) -> ErasureSetId {
ErasureSetId(self.slot(), self.fec_set_index())
}
// Returns the block index within the erasure coding set.
fn erasure_block_index(&self) -> Option<usize> {
let index = self.index().checked_sub(self.fec_set_index())?;