Perform erasure recovery when repaired data shreds are received (#7463)

automerge
This commit is contained in:
Pankaj Garg 2019-12-12 17:50:28 -08:00 committed by Grimes
parent 49396a69bf
commit 75d1aa5135
1 changed files with 15 additions and 1 deletions

View File

@ -527,6 +527,7 @@ impl Blocktree {
if shred.is_data() { if shred.is_data() {
if self.check_insert_data_shred( if self.check_insert_data_shred(
shred, shred,
&mut erasure_metas,
&mut index_working_set, &mut index_working_set,
&mut slot_meta_working_set, &mut slot_meta_working_set,
&mut write_batch, &mut write_batch,
@ -569,6 +570,7 @@ impl Blocktree {
if shred.verify(&leader) { if shred.verify(&leader) {
self.check_insert_data_shred( self.check_insert_data_shred(
shred, shred,
&mut erasure_metas,
&mut index_working_set, &mut index_working_set,
&mut slot_meta_working_set, &mut slot_meta_working_set,
&mut write_batch, &mut write_batch,
@ -739,6 +741,7 @@ impl Blocktree {
fn check_insert_data_shred( fn check_insert_data_shred(
&self, &self,
shred: Shred, shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>, index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>, slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
@ -766,12 +769,22 @@ impl Blocktree {
&self.last_root, &self.last_root,
) )
{ {
let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) = if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{ {
just_inserted_data_shreds.insert((slot, shred_index), shred); just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true; index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
}
true true
} else { } else {
false false
@ -793,7 +806,7 @@ impl Blocktree {
return false; return false;
} }
let set_index = shred_index - u32::from(shred.coding_header.position); let set_index = shred.common_header.fec_set_index;
!(shred.coding_header.num_coding_shreds == 0 !(shred.coding_header.num_coding_shreds == 0
|| shred.coding_header.position >= shred.coding_header.num_coding_shreds || shred.coding_header.position >= shred.coding_header.num_coding_shreds
|| std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1 || std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1
@ -3851,6 +3864,7 @@ pub mod tests {
DataShredHeader::default(), DataShredHeader::default(),
coding.clone(), coding.clone(),
); );
coding_shred.common_header.fec_set_index = std::u32::MAX - 1;
coding_shred.coding_header.num_coding_shreds = 3; coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.common_header.index = std::u32::MAX - 1; coding_shred.common_header.index = std::u32::MAX - 1;
coding_shred.coding_header.position = 0; coding_shred.coding_header.position = 0;