diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index cac54c6f35..97b9718b2e 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -342,55 +342,83 @@ impl Blocktree { // 3. Before trying recovery, check if enough number of shreds have been received // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data for (&(slot, set_index), erasure_meta) in erasure_metas.iter() { + let submit_metrics = |attempted: bool, status: String| { + datapoint_info!( + "blocktree-erasure", + ("slot", slot as i64, i64), + ("start_index", set_index as i64, i64), + ("end_index", erasure_meta.end_indexes().0 as i64, i64), + ("recovery_attempted", attempted, bool), + ("recovery_status", status, String), + ); + }; + let index = index_working_set.get(&slot).expect("Index"); - if let ErasureMetaStatus::CanRecover = erasure_meta.status(&index) { - // Find shreds for this erasure set and try recovery - let slot = index.slot; - let mut available_shreds = vec![]; - (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| { - if index.data().is_present(i) { - if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| { - let some_data = data_cf - .get_bytes((slot, i)) - .expect("Database failure, could not fetch data shred"); - if let Some(data) = some_data { - bincode::deserialize(&data).ok() - } else { - warn!("Data shred deleted while reading for recovery"); - None + match erasure_meta.status(&index) { + ErasureMetaStatus::CanRecover => { + // Find shreds for this erasure set and try recovery + let slot = index.slot; + let mut available_shreds = vec![]; + (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| { + if index.data().is_present(i) { + if let Some(shred) = + prev_inserted_datas.remove(&(slot, i)).or_else(|| { + let some_data = data_cf + .get_bytes((slot, i)) + .expect("Database failure, could not fetch data shred"); + if let Some(data) = some_data { + bincode::deserialize(&data).ok() + } else { + warn!("Data shred deleted while reading for recovery"); + None + } + }) + { + available_shreds.push(shred); } - }) { - available_shreds.push(shred); } - } - }); - (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(|i| { - if index.coding().is_present(i) { - if let Some(shred) = prev_inserted_codes.remove(&(slot, i)).or_else(|| { - let some_code = code_cf - .get_bytes((slot, i)) - .expect("Database failure, could not fetch code shred"); - if let Some(code) = some_code { - bincode::deserialize(&code).ok() - } else { - warn!("Code shred deleted while reading for recovery"); - None + }); + (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( + |i| { + if index.coding().is_present(i) { + if let Some(shred) = + prev_inserted_codes.remove(&(slot, i)).or_else(|| { + let some_code = code_cf + .get_bytes((slot, i)) + .expect("Database failure, could not fetch code shred"); + if let Some(code) = some_code { + bincode::deserialize(&code).ok() + } else { + warn!("Code shred deleted while reading for recovery"); + None + } + }) + { + available_shreds.push(shred); + } } - }) { - available_shreds.push(shred); - } + }, + ); + if let Ok(mut result) = Shredder::try_recovery( + &available_shreds, + erasure_meta.config.num_data(), + erasure_meta.config.num_coding(), + set_index as usize, + slot, + ) { + submit_metrics(true, "complete".into()); + recovered_data_shreds.append(&mut result.recovered_data); + } else { + submit_metrics(true, "incomplete".into()); } - }); - if let Ok(mut result) = Shredder::try_recovery( - &available_shreds, - erasure_meta.config.num_data(), - erasure_meta.config.num_coding(), - set_index as usize, - slot, - ) { - recovered_data_shreds.append(&mut result.recovered_data); } - } + ErasureMetaStatus::DataFull => { + submit_metrics(false, "complete".into()); + } + ErasureMetaStatus::StillNeed(needed) => { + submit_metrics(false, format!("still need: {}", needed)); + } + }; } recovered_data_shreds } @@ -475,6 +503,10 @@ impl Blocktree { &mut write_batch, )?; + for ((slot, set_index), erasure_meta) in erasure_metas { + write_batch.put::((slot, set_index), &erasure_meta)?; + } + for (&slot, index) in index_working_set.iter() { write_batch.put::(slot, index)?; }