Insert coding shreds to blocktree only if needed in future (#6836)

* Insert coding shreds to blocktree only if needed in future

* fixes
Pankaj Garg 2019-11-11 13:12:55 -08:00 committed by GitHub
parent 816b2d7ff8
commit e8e13fdeeb
1 changed file with 76 additions and 43 deletions
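
The net effect of the diff below: instead of writing every incoming coding shred straight to the database, the insert path now caches them in memory, runs erasure recovery against that cache, and persists only the shreds that may still be needed for future recovery. A minimal sketch of that flow, using simplified stand-in types rather than the real Blocktree API:

    // Sketch only: `Shred`, the cache, and the "persist" step below are
    // simplified stand-ins for the Blocktree types in this commit.
    use std::collections::HashMap;

    type SlotIndex = (u64, u64); // (slot, shred index)

    struct Shred {
        payload: Vec<u8>,
    }

    fn main() {
        // Phase 1: cache incoming coding shreds instead of writing them directly.
        let mut just_received: HashMap<SlotIndex, Shred> = HashMap::new();
        just_received.insert((1, 0), Shred { payload: vec![0xaa] });
        just_received.insert((1, 1), Shred { payload: vec![0xbb] });

        // Phase 2: recovery consumes the cache; sets whose data is already
        // complete have their cached codes dropped rather than stored.
        let data_full = true;
        if data_full {
            just_received.remove(&(1, 0));
            just_received.remove(&(1, 1));
        }

        // Phase 3: whatever is left may still be needed later, so persist it.
        let mut persisted: Vec<SlotIndex> = Vec::new();
        for (key, _shred) in just_received.into_iter() {
            persisted.push(key); // stands in for write_batch.put_bytes(...)
        }
        assert!(persisted.is_empty());
    }
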


@@ -387,9 +387,9 @@ impl Blocktree {
                 });
                 (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
                     |i| {
-                        if index.coding().is_present(i) {
-                            if let Some(shred) =
-                                prev_inserted_codes.remove(&(slot, i)).or_else(|| {
+                        if let Some(shred) =
+                            prev_inserted_codes.remove(&(slot, i)).or_else(|| {
+                                if index.coding().is_present(i) {
                                     let some_code = code_cf
                                         .get_bytes((slot, i))
                                         .expect("Database failure, could not fetch code shred");
@@ -399,10 +399,12 @@
                                         warn!("Code shred deleted while reading for recovery");
                                         None
                                     }
-                                })
-                            {
-                                available_shreds.push(shred);
-                            }
-                        }
+                                } else {
+                                    None
+                                }
+                            })
+                        {
+                            available_shreds.push(shred);
+                        }
                     },
                 );
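
These two hunks reorder the recovery lookup: the in-memory cache of just-received codes is consulted first, and the database read inside or_else only runs on a cache miss, and only when the index says the shred was ever stored. A small sketch of the same cache-then-store pattern, with stand-in types:

    // Sketch only: `cache`, `store`, and `is_present` stand in for
    // prev_inserted_codes, the code column family, and the coding index.
    use std::collections::HashMap;

    fn lookup(
        cache: &mut HashMap<(u64, u64), Vec<u8>>,
        store: &HashMap<(u64, u64), Vec<u8>>,
        is_present: bool,
        key: (u64, u64),
    ) -> Option<Vec<u8>> {
        cache.remove(&key).or_else(|| {
            if is_present {
                store.get(&key).cloned() // fall back to the backing store
            } else {
                None // not on disk either; nothing to recover with
            }
        })
    }

    fn main() {
        let mut cache = HashMap::new();
        cache.insert((3, 7), vec![1u8, 2, 3]);
        let store = HashMap::new();
        // Cache hit: the store is never consulted.
        assert_eq!(lookup(&mut cache, &store, false, (3, 7)), Some(vec![1, 2, 3]));
        // Cache miss, and absent from the store: no shred available.
        assert_eq!(lookup(&mut cache, &store, true, (3, 7)), None);
    }

Because or_else takes a closure, the store read is evaluated lazily and never happens on a cache hit.
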
@@ -420,6 +422,12 @@
                     }
                 }
                 ErasureMetaStatus::DataFull => {
+                    (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
+                        |i| {
+                            // Remove saved coding shreds. We don't need these for future recovery
+                            let _ = prev_inserted_codes.remove(&(slot, i));
+                        },
+                    );
                     submit_metrics(false, "complete".into(), 0);
                 }
                 ErasureMetaStatus::StillNeed(needed) => {
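
The DataFull arm above evicts every cached coding shred belonging to the completed erasure set, since the data shreds are all present and nothing more will ever need recovering from it. A sketch of that eviction, with illustrative names:

    // Sketch only: evict all cached codes for one FEC set
    // (set_index..set_index + num_coding), as the hunk above does.
    use std::collections::HashMap;

    fn evict_set(
        cache: &mut HashMap<(u64, u64), Vec<u8>>,
        slot: u64,
        set_index: u64,
        num_coding: u64,
    ) {
        (set_index..set_index + num_coding).for_each(|i| {
            let _ = cache.remove(&(slot, i));
        });
    }

    fn main() {
        let mut cache = HashMap::new();
        for i in 0..4u64 {
            cache.insert((9, 30 + i), vec![0u8]);
        }
        evict_set(&mut cache, 9, 30, 4);
        assert!(cache.is_empty());
    }

A HashMap::retain call would also work, but it scans the whole map; iterating the set's index range touches only the keys that can actually be present.
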
@@ -466,11 +474,10 @@
                     &mut index_meta_time,
                 )
             } else if shred.is_code() {
-                self.check_insert_coding_shred(
+                self.check_cache_coding_shred(
                     shred,
                     &mut erasure_metas,
                     &mut index_working_set,
-                    &mut write_batch,
                     &mut just_inserted_coding_shreds,
                     &mut index_meta_time,
                 )
@@ -515,6 +522,17 @@
         start.stop();
         let shred_recovery_elapsed = start.as_us();
 
+        just_inserted_coding_shreds
+            .into_iter()
+            .for_each(|((_, _), shred)| {
+                self.check_insert_coding_shred(
+                    shred,
+                    &mut index_working_set,
+                    &mut write_batch,
+                    &mut index_meta_time,
+                );
+            });
+
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
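
This new pass runs after recovery has had its chance: whatever survived in the cache is drained by value and written out through the regular insert path. A sketch of the drain, where persist stands in for check_insert_coding_shred and a Vec for the write batch:

    // Sketch only: into_iter() consumes the map, so each surviving shred's
    // payload moves into the write path without being cloned.
    use std::collections::HashMap;

    struct Shred {
        payload: Vec<u8>,
    }

    fn persist(batch: &mut Vec<((u64, u64), Vec<u8>)>, key: (u64, u64), shred: Shred) {
        batch.push((key, shred.payload)); // stands in for write_batch.put_bytes
    }

    fn main() {
        let mut leftover: HashMap<(u64, u64), Shred> = HashMap::new();
        leftover.insert((5, 2), Shred { payload: vec![7u8] });

        let mut batch = Vec::new();
        leftover
            .into_iter()
            .for_each(|(key, shred)| persist(&mut batch, key, shred));
        assert_eq!(batch.len(), 1);
    }
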
@ -573,10 +591,31 @@ impl Blocktree {
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool {
let slot = shred.slot();
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
self.insert_coding_shred(index_meta, &shred, write_batch)
.map(|_| {
index_meta_working_set_entry.did_insert_occur = true;
})
.is_ok()
}
fn check_cache_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool {
let slot = shred.slot();
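
The slimmed-down check_insert_coding_shred above collapses its fallible insert into the bool the caller expects via .map(...).is_ok(). A tiny sketch of that idiom, where insert is a stand-in for the real database call:

    // Sketch only: run a side effect on success, then turn the Result
    // into a bool, mirroring the .map(...).is_ok() chain above.
    fn insert(fail: bool) -> Result<(), String> {
        if fail { Err("db error".into()) } else { Ok(()) }
    }

    fn check_insert(fail: bool) -> bool {
        let mut did_insert_occur = false;
        let ok = insert(fail)
            .map(|_| {
                did_insert_occur = true; // side effect only when the insert worked
            })
            .is_ok();
        assert_eq!(ok, did_insert_occur);
        ok
    }

    fn main() {
        assert!(check_insert(false));
        assert!(!check_insert(true));
    }
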
@@ -589,16 +628,33 @@
         // This gives the index of first coding shred in this FEC block
         // So, all coding shreds in a given FEC block will have the same set index
         if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
-            self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
-                .map(|_| {
-                    // Insert was a success!
-                    just_inserted_coding_shreds
-                        .entry((slot, shred_index))
-                        .or_insert_with(|| shred);
-                    index_meta_working_set_entry.did_insert_occur = true;
-                })
-                .is_ok()
+            let set_index = shred_index - u64::from(shred.coding_header.position);
+            let erasure_config = ErasureConfig::new(
+                shred.coding_header.num_data_shreds as usize,
+                shred.coding_header.num_coding_shreds as usize,
+            );
+
+            let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
+                self.erasure_meta_cf
+                    .get((slot, set_index))
+                    .expect("Expect database get to succeed")
+                    .unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config))
+            });
+
+            if erasure_config != erasure_meta.config {
+                // ToDo: This is a potential slashing condition
+                warn!("Received multiple erasure configs for the same erasure set!!!");
+                warn!(
+                    "Stored config: {:#?}, new config: {:#?}",
+                    erasure_meta.config, erasure_config
+                );
+            }
+
+            just_received_coding_shreds
+                .entry((slot, shred_index))
+                .or_insert_with(|| shred);
+
+            true
         } else {
             false
         }
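
check_cache_coding_shred now also performs the erasure-config consistency check that previously lived in insert_coding_shred (the final two hunks below remove it from there). A sketch of that check, with a simplified stand-in for ErasureConfig:

    // Sketch only: compare the config derived from an incoming shred's
    // header against the one already stored for the erasure set; the diff
    // flags a mismatch as a potential slashing condition.
    use std::collections::HashMap;

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct ErasureConfig {
        num_data: usize,
        num_coding: usize,
    }

    fn main() {
        let mut erasure_metas: HashMap<(u64, u64), ErasureConfig> = HashMap::new();
        let stored = *erasure_metas
            .entry((2, 64))
            .or_insert(ErasureConfig { num_data: 8, num_coding: 8 });

        // A second shred for the same set claims a different geometry.
        let incoming = ErasureConfig { num_data: 8, num_coding: 4 };
        if incoming != stored {
            eprintln!(
                "Received multiple erasure configs for the same erasure set! \
                 stored: {:?}, new: {:?}",
                stored, incoming
            );
        }
    }
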
@ -668,7 +724,6 @@ impl Blocktree {
fn insert_coding_shred(
&self,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_meta: &mut Index,
shred: &Shred,
write_batch: &mut WriteBatch,
@ -685,28 +740,6 @@ impl Blocktree {
)));
}
let set_index = shred_index - u64::from(shred.coding_header.position);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config))
});
if erasure_config != erasure_meta.config {
// ToDo: This is a potential slashing condition
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Stored config: {:#?}, new config: {:#?}",
erasure_meta.config, erasure_config
);
}
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
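
The surviving comment marks the commit step: all staged changes are applied together or not at all. A sketch of that all-or-nothing batching, with a toy stand-in for the RocksDB WriteBatch:

    // Sketch only: mutations are staged in a batch and applied in one step,
    // standing in for an atomic database write.
    use std::collections::HashMap;

    #[derive(Default)]
    struct WriteBatch {
        puts: Vec<((u64, u64), Vec<u8>)>,
    }

    impl WriteBatch {
        fn put_bytes(&mut self, key: (u64, u64), value: &[u8]) {
            self.puts.push((key, value.to_vec())); // staged, not yet visible
        }
    }

    fn commit(db: &mut HashMap<(u64, u64), Vec<u8>>, batch: WriteBatch) {
        for (key, value) in batch.puts {
            db.insert(key, value);
        }
    }

    fn main() {
        let mut db = HashMap::new();
        let mut batch = WriteBatch::default();
        batch.put_bytes((4, 11), &[1, 2, 3]);
        batch.put_bytes((4, 12), &[4, 5, 6]);
        assert!(db.is_empty()); // nothing visible until the commit step
        commit(&mut db, batch);
        assert_eq!(db.len(), 2);
    }
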