adds methods to obtain data/coding shreds indices from ErasureMeta

This commit is contained in:
behzad nouri 2021-11-12 14:16:48 -05:00
parent d25d9be555
commit 3fc858eb60
3 changed files with 123 additions and 157 deletions

View File

@ -86,6 +86,8 @@ fn check_shreds(
if shred1.slot() != shred2.slot() {
Err(Error::SlotMismatch)
} else if shred1.index() != shred2.index() {
// TODO: Should also allow two coding shreds with different indices but
// same fec-set-index and mismatching erasure-config.
Err(Error::ShredIndexMismatch)
} else if shred1.common_header.shred_type != shred2.common_header.shred_type {
Err(Error::ShredTypeMismatch)

View File

@ -623,140 +623,103 @@ impl Blockstore {
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
}
fn get_recovery_data_shreds(
index: &mut Index,
set_index: u64,
fn get_recovery_data_shreds<'a>(
index: &'a Index,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
) {
(set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
if index.data().is_present(i) {
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
}
}) {
available_shreds.push(shred);
}
erasure_meta: &'a ErasureMeta,
prev_inserted_datas: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
data_cf: &'a LedgerColumn<cf::ShredData>,
) -> impl Iterator<Item = Shred> + 'a {
erasure_meta.data_shreds_indices().filter_map(move |i| {
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)) {
return Some(shred);
}
});
if !index.data().is_present(i) {
return None;
}
match data_cf.get_bytes((slot, i)).unwrap() {
None => {
warn!("Data shred deleted while reading for recovery");
None
}
Some(data) => Shred::new_from_serialized_shred(data).ok(),
}
})
}
fn get_recovery_coding_shreds(
index: &mut Index,
fn get_recovery_coding_shreds<'a>(
index: &'a mut Index,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
(erasure_meta.set_index..erasure_meta.set_index + erasure_meta.config.num_coding() as u64)
.for_each(|i| {
if let Some(shred) = prev_inserted_codes
.remove(&(slot, i))
.map(|s| {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
s
})
.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Shred::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
}
} else {
None
}
})
{
available_shreds.push(shred);
erasure_meta: &'a ErasureMeta,
prev_inserted_codes: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
code_cf: &'a LedgerColumn<cf::ShredCode>,
) -> impl Iterator<Item = Shred> + 'a {
erasure_meta.coding_shreds_indices().filter_map(move |i| {
if let Some(shred) = prev_inserted_codes.remove(&(slot, i)) {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
return Some(shred);
}
if !index.coding().is_present(i) {
return None;
}
match code_cf.get_bytes((slot, i)).unwrap() {
None => {
warn!("Code shred deleted while reading for recovery");
None
}
});
Some(code) => Shred::new_from_serialized_shred(code).ok(),
}
})
}
fn recover_shreds(
index: &mut Index,
set_index: u64,
erasure_meta: &ErasureMeta,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
prev_inserted_codes: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
recovered_data_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
let mut available_shreds = vec![];
Self::get_recovery_data_shreds(
index,
set_index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_datas,
data_cf,
);
Self::get_recovery_coding_shreds(
let mut available_shreds: Vec<_> =
Self::get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_datas, data_cf)
.collect();
available_shreds.extend(Self::get_recovery_coding_shreds(
index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_codes,
code_cf,
);
));
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
Self::submit_metrics(
slot,
set_index,
erasure_meta,
true,
"complete".into(),
result.len(),
);
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, set_index, erasure_meta, true, "incomplete".into(), 0);
Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0);
}
}
fn submit_metrics(
slot: Slot,
set_index: u64,
erasure_meta: &ErasureMeta,
attempted: bool,
status: String,
recovered: usize,
) {
let mut data_shreds_indices = erasure_meta.data_shreds_indices();
let start_index = data_shreds_indices.next().unwrap_or_default();
let end_index = data_shreds_indices.last().unwrap_or(start_index);
datapoint_debug!(
"blockstore-erasure",
("slot", slot as i64, i64),
("start_index", set_index as i64, i64),
(
"end_index",
(erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
i64
),
("start_index", start_index, i64),
("end_index", end_index + 1, i64),
("recovery_attempted", attempted, bool),
("recovery_status", status, String),
("recovered", recovered as i64, i64),
@ -765,10 +728,10 @@ impl Blockstore {
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
prev_inserted_codes: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
@ -778,14 +741,13 @@ impl Blockstore {
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() {
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(index) {
ErasureMetaStatus::CanRecover => {
Self::recover_shreds(
index,
set_index,
erasure_meta,
prev_inserted_datas,
prev_inserted_codes,
@ -795,31 +757,21 @@ impl Blockstore {
);
}
ErasureMetaStatus::DataFull => {
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
// Remove saved coding shreds. We don't need these for future recovery.
if prev_inserted_codes.remove(&(slot, i)).is_some() {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
}
},
);
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
"complete".into(),
0,
);
for i in erasure_meta.coding_shreds_indices() {
// Remove saved coding shreds. We don't need these for future recovery.
if prev_inserted_codes.remove(&(slot, i)).is_some() {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blockstore
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
}
}
Self::submit_metrics(slot, erasure_meta, false, "complete".into(), 0);
}
ErasureMetaStatus::StillNeed(needed) => {
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
format!("still need: {}", needed),
@ -1110,9 +1062,9 @@ impl Blockstore {
fn check_cache_coding_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
index_meta_time: &mut u64,
handle_duplicate: &F,
is_trusted: bool,
@ -1153,13 +1105,14 @@ impl Blockstore {
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta_cf
.get((slot, set_index))
self.erasure_meta(slot, set_index)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, erasure_config))
});
if erasure_config != erasure_meta.config {
// TODO: handle_duplicate is not invoked and so duplicate shreds are
// not gossiped to the rest of cluster.
if erasure_config != erasure_meta.config() {
metrics.num_coding_shreds_invalid_erasure_config += 1;
let conflicting_shred = self.find_conflicting_coding_shred(
&shred,
@ -1182,7 +1135,7 @@ impl Blockstore {
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), erasure_config
);
return false;
@ -1214,14 +1167,12 @@ impl Blockstore {
shred: &Shred,
slot: Slot,
erasure_meta: &ErasureMeta,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
) -> Option<Vec<u8>> {
// Search for the shred which set the initial erasure config, either inserted,
// or in the current batch in just_received_coding_shreds.
let coding_indices = erasure_meta.set_index
..erasure_meta.set_index + erasure_meta.config.num_coding() as u64;
let mut conflicting_shred = None;
for coding_index in coding_indices {
for coding_index in erasure_meta.coding_shreds_indices() {
let maybe_shred = self.get_coding_shred(slot, coding_index);
if let Ok(Some(shred_data)) = maybe_shred {
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
@ -1246,11 +1197,11 @@ impl Blockstore {
fn check_insert_data_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
just_inserted_data_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
handle_duplicate: &F,
@ -1315,14 +1266,9 @@ impl Blockstore {
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if let std::collections::hash_map::Entry::Vacant(_) = erasure_metas.entry((slot, set_index))
{
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() {
entry.insert(meta);
}
}
Ok(newly_completed_data_sets)
@ -1370,7 +1316,7 @@ impl Blockstore {
fn get_data_shred_from_just_inserted_or_db<'a>(
&'a self,
just_inserted_data_shreds: &'a HashMap<(u64, u64), Shred>,
just_inserted_data_shreds: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>,
slot: Slot,
index: u64,
) -> Cow<'a, Vec<u8>> {
@ -1387,7 +1333,7 @@ impl Blockstore {
&self,
shred: &Shred,
slot_meta: &SlotMeta,
just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
just_inserted_data_shreds: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
last_root: &RwLock<u64>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,

View File

@ -1,7 +1,12 @@
use crate::erasure::ErasureConfig;
use serde::{Deserialize, Serialize};
use solana_sdk::{clock::Slot, hash::Hash};
use std::{collections::BTreeSet, ops::RangeBounds};
use {
crate::erasure::ErasureConfig,
serde::{Deserialize, Serialize},
solana_sdk::{clock::Slot, hash::Hash},
std::{
collections::BTreeSet,
ops::{Range, RangeBounds},
},
};
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
// The Meta column family
@ -50,7 +55,7 @@ pub struct ShredIndex {
/// Erasure coding information
pub struct ErasureMeta {
/// Which erasure set in the slot this is
pub set_index: u64,
set_index: u64,
/// Deprecated field.
#[serde(rename = "first_coding_index")]
__unused_first_coding_index: u64,
@ -58,7 +63,7 @@ pub struct ErasureMeta {
#[serde(rename = "size")]
__unused_size: usize,
/// Erasure configuration for this erasure set
pub config: ErasureConfig,
config: ErasureConfig,
}
#[derive(Deserialize, Serialize)]
@ -213,7 +218,7 @@ impl SlotMeta {
}
impl ErasureMeta {
pub fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta {
pub(crate) fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta {
ErasureMeta {
set_index,
config,
@ -222,14 +227,27 @@ impl ErasureMeta {
}
}
pub fn status(&self, index: &Index) -> ErasureMetaStatus {
/// Returns (a copy of) the erasure coding configuration for this erasure set.
/// Accessor added because the `config` field is no longer `pub` in this commit.
pub(crate) fn config(&self) -> ErasureConfig {
self.config
}
/// Returns the half-open range of data shred indices belonging to this
/// erasure set: `set_index..set_index + num_data`.
pub(crate) fn data_shreds_indices(&self) -> Range<u64> {
let num_data = self.config.num_data() as u64;
self.set_index..self.set_index + num_data
}
/// Returns the half-open range of coding shred indices belonging to this
/// erasure set: `set_index..set_index + num_coding`.
/// NOTE(review): coding indices start at `set_index` here, same as data
/// indices — relies on the deprecated `first_coding_index` field being unused.
pub(crate) fn coding_shreds_indices(&self) -> Range<u64> {
let num_coding = self.config.num_coding() as u64;
self.set_index..self.set_index + num_coding
}
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
use ErasureMetaStatus::*;
let coding_indices = self.set_index..self.set_index + self.config.num_coding() as u64;
let num_coding = index.coding().present_in_bounds(coding_indices);
let num_data = index
.data()
.present_in_bounds(self.set_index..self.set_index + self.config.num_data() as u64);
let num_coding = index
.coding()
.present_in_bounds(self.coding_shreds_indices());
let num_data = index.data().present_in_bounds(self.data_shreds_indices());
let (data_missing, num_needed) = (
self.config.num_data().saturating_sub(num_data),