Refactor blockstore recovery code (#10008)

sakridge 2020-05-13 10:09:38 -07:00 committed by GitHub
parent 1e80044e93
commit 9575afc8fa
1 changed file with 174 additions and 86 deletions


@@ -547,6 +547,154 @@ impl Blockstore {
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
}
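// Gather the data shreds of this erasure set into `available_shreds`,
// taking each shred from `prev_inserted_datas` if it is still pending,
// otherwise reading it back from the data column.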
fn get_recovery_data_shreds(
index: &mut Index,
set_index: u64,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
) {
(set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
if index.data().is_present(i) {
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
}
}) {
available_shreds.push(shred);
}
}
});
}
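// Gather the coding shreds of this erasure set into `available_shreds`.
// Shreds taken from `prev_inserted_codes` are also cleared from the index
// so they are not committed; anything not pending is read back from the
// code column.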
fn get_recovery_coding_shreds(
index: &mut Index,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
(erasure_meta.first_coding_index
..erasure_meta.first_coding_index + erasure_meta.config.num_coding() as u64)
.for_each(|i| {
if let Some(shred) = prev_inserted_codes
.remove(&(slot, i))
.map(|s| {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
s
})
.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Shred::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
}
} else {
None
}
})
{
available_shreds.push(shred);
}
});
}
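// Collect the available data and coding shreds for this erasure set,
// attempt recovery with `Shredder::try_recovery`, append any recovered
// data shreds to `recovered_data_shreds`, and report the outcome.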
fn recover_shreds(
index: &mut Index,
set_index: u64,
erasure_meta: &ErasureMeta,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
recovered_data_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
let mut available_shreds = vec![];
Self::get_recovery_data_shreds(
index,
set_index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_datas,
data_cf,
);
Self::get_recovery_coding_shreds(
index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_codes,
code_cf,
);
if let Ok(mut result) = Shredder::try_recovery(
available_shreds,
erasure_meta.config.num_data(),
erasure_meta.config.num_coding(),
set_index as usize,
erasure_meta.first_coding_index as usize,
slot,
) {
Self::submit_metrics(
slot,
set_index,
erasure_meta,
true,
"complete".into(),
result.len(),
);
recovered_data_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, set_index, erasure_meta, true, "incomplete".into(), 0);
}
}
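// Report a `blockstore-erasure` datapoint describing the erasure set
// (slot and shred index range) and the result of the recovery attempt.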
fn submit_metrics(
slot: Slot,
set_index: u64,
erasure_meta: &ErasureMeta,
attempted: bool,
status: String,
recovered: usize,
) {
datapoint_debug!(
"blockstore-erasure",
("slot", slot as i64, i64),
("start_index", set_index as i64, i64),
(
"end_index",
(erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
i64
),
("recovery_attempted", attempted, bool),
("recovery_status", status, String),
("recovered", recovered as i64, i64),
);
}
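// Walk the erasure sets tracked in `erasure_metas` and, for each set that
// can be recovered, rebuild the missing data shreds using the helpers above.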
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
@@ -563,94 +711,20 @@ impl Blockstore {
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
let submit_metrics = |attempted: bool, status: String, recovered: usize| {
datapoint_debug!(
"blockstore-erasure",
("slot", slot as i64, i64),
("start_index", set_index as i64, i64),
(
"end_index",
(erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
i64
),
("recovery_attempted", attempted, bool),
("recovery_status", status, String),
("recovered", recovered as i64, i64),
);
};
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
let mut available_shreds = vec![];
(set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
if index.data().is_present(i) {
if let Some(shred) =
prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
}
})
{
available_shreds.push(shred);
}
}
});
(erasure_meta.first_coding_index
..erasure_meta.first_coding_index
+ erasure_meta.config.num_coding() as u64)
.for_each(|i| {
if let Some(shred) = prev_inserted_codes
.remove(&(slot, i))
.map(|s| {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
s
})
.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Shred::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
}
} else {
None
}
})
{
available_shreds.push(shred);
}
});
if let Ok(mut result) = Shredder::try_recovery(
available_shreds,
erasure_meta.config.num_data(),
erasure_meta.config.num_coding(),
set_index as usize,
erasure_meta.first_coding_index as usize,
slot,
) {
submit_metrics(true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
} else {
submit_metrics(true, "incomplete".into(), 0);
}
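// Gather this set's shreds and attempt recovery via the helpers above.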
Self::recover_shreds(
index,
set_index,
erasure_meta,
prev_inserted_datas,
prev_inserted_codes,
&mut recovered_data_shreds,
&data_cf,
&code_cf,
);
}
ErasureMetaStatus::DataFull => {
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
@@ -665,10 +739,24 @@ impl Blockstore {
}
},
);
submit_metrics(false, "complete".into(), 0);
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
"complete".into(),
0,
);
}
ErasureMetaStatus::StillNeed(needed) => {
submit_metrics(false, format!("still need: {}", needed), 0);
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
format!("still need: {}", needed),
0,
);
}
};
}