Erasure statistics for shreds (#5676)

Pankaj Garg 2019-08-27 11:22:06 -07:00 committed by GitHub
parent 7aaf5bc02c
commit 12ad95eb5e
1 changed file with 75 additions and 43 deletions


@@ -342,55 +342,83 @@ impl Blocktree {
         // 3. Before trying recovery, check if enough number of shreds have been received
         // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
         for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
+            let submit_metrics = |attempted: bool, status: String| {
+                datapoint_info!(
+                    "blocktree-erasure",
+                    ("slot", slot as i64, i64),
+                    ("start_index", set_index as i64, i64),
+                    ("end_index", erasure_meta.end_indexes().0 as i64, i64),
+                    ("recovery_attempted", attempted, bool),
+                    ("recovery_status", status, String),
+                );
+            };
             let index = index_working_set.get(&slot).expect("Index");
-            if let ErasureMetaStatus::CanRecover = erasure_meta.status(&index) {
-                // Find shreds for this erasure set and try recovery
-                let slot = index.slot;
-                let mut available_shreds = vec![];
-                (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
-                    if index.data().is_present(i) {
-                        if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
-                            let some_data = data_cf
-                                .get_bytes((slot, i))
-                                .expect("Database failure, could not fetch data shred");
-                            if let Some(data) = some_data {
-                                bincode::deserialize(&data).ok()
-                            } else {
-                                warn!("Data shred deleted while reading for recovery");
-                                None
-                            }
-                        }) {
-                            available_shreds.push(shred);
-                        }
-                    }
-                });
-                (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(|i| {
-                    if index.coding().is_present(i) {
-                        if let Some(shred) = prev_inserted_codes.remove(&(slot, i)).or_else(|| {
-                            let some_code = code_cf
-                                .get_bytes((slot, i))
-                                .expect("Database failure, could not fetch code shred");
-                            if let Some(code) = some_code {
-                                bincode::deserialize(&code).ok()
-                            } else {
-                                warn!("Code shred deleted while reading for recovery");
-                                None
-                            }
-                        }) {
-                            available_shreds.push(shred);
-                        }
-                    }
-                });
-                if let Ok(mut result) = Shredder::try_recovery(
-                    &available_shreds,
-                    erasure_meta.config.num_data(),
-                    erasure_meta.config.num_coding(),
-                    set_index as usize,
-                    slot,
-                ) {
-                    recovered_data_shreds.append(&mut result.recovered_data);
-                }
-            }
+            match erasure_meta.status(&index) {
+                ErasureMetaStatus::CanRecover => {
+                    // Find shreds for this erasure set and try recovery
+                    let slot = index.slot;
+                    let mut available_shreds = vec![];
+                    (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
+                        if index.data().is_present(i) {
+                            if let Some(shred) =
+                                prev_inserted_datas.remove(&(slot, i)).or_else(|| {
+                                    let some_data = data_cf
+                                        .get_bytes((slot, i))
+                                        .expect("Database failure, could not fetch data shred");
+                                    if let Some(data) = some_data {
+                                        bincode::deserialize(&data).ok()
+                                    } else {
+                                        warn!("Data shred deleted while reading for recovery");
+                                        None
+                                    }
+                                })
+                            {
+                                available_shreds.push(shred);
+                            }
+                        }
+                    });
+                    (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
+                        |i| {
+                            if index.coding().is_present(i) {
+                                if let Some(shred) =
+                                    prev_inserted_codes.remove(&(slot, i)).or_else(|| {
+                                        let some_code = code_cf
+                                            .get_bytes((slot, i))
+                                            .expect("Database failure, could not fetch code shred");
+                                        if let Some(code) = some_code {
+                                            bincode::deserialize(&code).ok()
+                                        } else {
+                                            warn!("Code shred deleted while reading for recovery");
+                                            None
+                                        }
+                                    })
+                                {
+                                    available_shreds.push(shred);
+                                }
+                            }
+                        },
+                    );
+                    if let Ok(mut result) = Shredder::try_recovery(
+                        &available_shreds,
+                        erasure_meta.config.num_data(),
+                        erasure_meta.config.num_coding(),
+                        set_index as usize,
+                        slot,
+                    ) {
+                        submit_metrics(true, "complete".into());
+                        recovered_data_shreds.append(&mut result.recovered_data);
+                    } else {
+                        submit_metrics(true, "incomplete".into());
+                    }
+                }
+                ErasureMetaStatus::DataFull => {
+                    submit_metrics(false, "complete".into());
+                }
+                ErasureMetaStatus::StillNeed(needed) => {
+                    submit_metrics(false, format!("still need: {}", needed));
+                }
+            };
         }
         recovered_data_shreds
     }
@@ -475,6 +503,10 @@ impl Blocktree {
             &mut write_batch,
         )?;
+        for ((slot, set_index), erasure_meta) in erasure_metas {
+            write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
+        }
         for (&slot, index) in index_working_set.iter() {
             write_batch.put::<cf::Index>(slot, index)?;
         }
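
The refactor from `if let` to a full `match` on `ErasureMetaStatus` is what enables the statistics: every erasure set now emits a datapoint, whether recovery was attempted ("complete" or "incomplete"), unnecessary (DataFull), or blocked (StillNeed). Below is a minimal, self-contained sketch of that reporting logic; the enum and the `metrics_for` helper are simplified stand-ins for illustration, not the actual blocktree API.

/// Simplified stand-in for blocktree's ErasureMetaStatus.
enum ErasureMetaStatus {
    CanRecover,
    DataFull,
    StillNeed(usize),
}

/// Returns the (recovery_attempted, recovery_status) pair that the
/// "blocktree-erasure" datapoint reports for one erasure set.
fn metrics_for(status: &ErasureMetaStatus, recovery_succeeded: bool) -> (bool, String) {
    match status {
        // Enough shreds arrived: recovery is attempted, and the outcome
        // decides between "complete" and "incomplete".
        ErasureMetaStatus::CanRecover => {
            if recovery_succeeded {
                (true, "complete".to_string())
            } else {
                (true, "incomplete".to_string())
            }
        }
        // All data shreds are already present; nothing to attempt.
        ErasureMetaStatus::DataFull => (false, "complete".to_string()),
        // Too few shreds so far; report how many are still missing.
        ErasureMetaStatus::StillNeed(needed) => (false, format!("still need: {}", needed)),
    }
}

fn main() {
    // Per the "3a" condition above, a set needs more than num_data shreds
    // (data + coding combined) before recovery can be attempted.
    let (attempted, status) = metrics_for(&ErasureMetaStatus::StillNeed(2), false);
    assert!(!attempted);
    assert_eq!(status, "still need: 2");
    assert_eq!(metrics_for(&ErasureMetaStatus::CanRecover, true).1, "complete");
}

Emitting a datapoint even when no recovery is attempted is the point of the change: the "recovery_attempted" flag lets the metrics distinguish sets that were already whole from sets still waiting on shreds.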