diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 7af18a1a52..4a9c051d69 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -86,6 +86,8 @@ fn check_shreds( if shred1.slot() != shred2.slot() { Err(Error::SlotMismatch) } else if shred1.index() != shred2.index() { + // TODO: Should also allow two coding shreds with different indices but + // same fec-set-index and mismatching erasure-config. Err(Error::ShredIndexMismatch) } else if shred1.common_header.shred_type != shred2.common_header.shred_type { Err(Error::ShredTypeMismatch) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 69ea35527c..c84b6ba6ac 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -623,140 +623,103 @@ impl Blockstore { Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot)) } - fn get_recovery_data_shreds( - index: &mut Index, - set_index: u64, + fn get_recovery_data_shreds<'a>( + index: &'a Index, slot: Slot, - erasure_meta: &ErasureMeta, - available_shreds: &mut Vec, - prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, - data_cf: &LedgerColumn, - ) { - (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| { - if index.data().is_present(i) { - if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| { - let some_data = data_cf - .get_bytes((slot, i)) - .expect("Database failure, could not fetch data shred"); - if let Some(data) = some_data { - Shred::new_from_serialized_shred(data).ok() - } else { - warn!("Data shred deleted while reading for recovery"); - None - } - }) { - available_shreds.push(shred); - } + erasure_meta: &'a ErasureMeta, + prev_inserted_datas: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + data_cf: &'a LedgerColumn, + ) -> impl Iterator + 'a { + erasure_meta.data_shreds_indices().filter_map(move |i| { + if let Some(shred) = prev_inserted_datas.remove(&(slot, i)) { + return Some(shred); } - }); + if !index.data().is_present(i) { + return None; + } + match data_cf.get_bytes((slot, i)).unwrap() { + None => { + warn!("Data shred deleted while reading for recovery"); + None + } + Some(data) => Shred::new_from_serialized_shred(data).ok(), + } + }) } - fn get_recovery_coding_shreds( - index: &mut Index, + fn get_recovery_coding_shreds<'a>( + index: &'a mut Index, slot: Slot, - erasure_meta: &ErasureMeta, - available_shreds: &mut Vec, - prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, - code_cf: &LedgerColumn, - ) { - (erasure_meta.set_index..erasure_meta.set_index + erasure_meta.config.num_coding() as u64) - .for_each(|i| { - if let Some(shred) = prev_inserted_codes - .remove(&(slot, i)) - .map(|s| { - // Remove from the index so it doesn't get committed. We know - // this is safe to do because everything in - // `prev_inserted_codes` does not yet exist in blockstore - // (guaranteed by `check_cache_coding_shred`) - index.coding_mut().set_present(i, false); - s - }) - .or_else(|| { - if index.coding().is_present(i) { - let some_code = code_cf - .get_bytes((slot, i)) - .expect("Database failure, could not fetch code shred"); - if let Some(code) = some_code { - Shred::new_from_serialized_shred(code).ok() - } else { - warn!("Code shred deleted while reading for recovery"); - None - } - } else { - None - } - }) - { - available_shreds.push(shred); + erasure_meta: &'a ErasureMeta, + prev_inserted_codes: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + code_cf: &'a LedgerColumn, + ) -> impl Iterator + 'a { + erasure_meta.coding_shreds_indices().filter_map(move |i| { + if let Some(shred) = prev_inserted_codes.remove(&(slot, i)) { + // Remove from the index so it doesn't get committed. We know + // this is safe to do because everything in + // `prev_inserted_codes` does not yet exist in blockstore + // (guaranteed by `check_cache_coding_shred`) + index.coding_mut().set_present(i, false); + return Some(shred); + } + if !index.coding().is_present(i) { + return None; + } + match code_cf.get_bytes((slot, i)).unwrap() { + None => { + warn!("Code shred deleted while reading for recovery"); + None } - }); + Some(code) => Shred::new_from_serialized_shred(code).ok(), + } + }) } fn recover_shreds( index: &mut Index, - set_index: u64, erasure_meta: &ErasureMeta, - prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, - prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, + prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_codes: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, recovered_data_shreds: &mut Vec, data_cf: &LedgerColumn, code_cf: &LedgerColumn, ) { // Find shreds for this erasure set and try recovery let slot = index.slot; - let mut available_shreds = vec![]; - - Self::get_recovery_data_shreds( - index, - set_index, - slot, - erasure_meta, - &mut available_shreds, - prev_inserted_datas, - data_cf, - ); - - Self::get_recovery_coding_shreds( + let mut available_shreds: Vec<_> = + Self::get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_datas, data_cf) + .collect(); + available_shreds.extend(Self::get_recovery_coding_shreds( index, slot, erasure_meta, - &mut available_shreds, prev_inserted_codes, code_cf, - ); - + )); if let Ok(mut result) = Shredder::try_recovery(available_shreds) { - Self::submit_metrics( - slot, - set_index, - erasure_meta, - true, - "complete".into(), - result.len(), - ); + Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len()); recovered_data_shreds.append(&mut result); } else { - Self::submit_metrics(slot, set_index, erasure_meta, true, "incomplete".into(), 0); + Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0); } } fn submit_metrics( slot: Slot, - set_index: u64, erasure_meta: &ErasureMeta, attempted: bool, status: String, recovered: usize, ) { + let mut data_shreds_indices = erasure_meta.data_shreds_indices(); + let start_index = data_shreds_indices.next().unwrap_or_default(); + let end_index = data_shreds_indices.last().unwrap_or(start_index); datapoint_debug!( "blockstore-erasure", ("slot", slot as i64, i64), - ("start_index", set_index as i64, i64), - ( - "end_index", - (erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64, - i64 - ), + ("start_index", start_index, i64), + ("end_index", end_index + 1, i64), ("recovery_attempted", attempted, bool), ("recovery_status", status, String), ("recovered", recovered as i64, i64), @@ -765,10 +728,10 @@ impl Blockstore { fn try_shred_recovery( db: &Database, - erasure_metas: &HashMap<(u64, u64), ErasureMeta>, + erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, index_working_set: &mut HashMap, - prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, - prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, + prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_codes: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); @@ -778,14 +741,13 @@ impl Blockstore { // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery // 3. Before trying recovery, check if enough number of shreds have been received // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data - for (&(slot, set_index), erasure_meta) in erasure_metas.iter() { + for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() { let index_meta_entry = index_working_set.get_mut(&slot).expect("Index"); let index = &mut index_meta_entry.index; match erasure_meta.status(index) { ErasureMetaStatus::CanRecover => { Self::recover_shreds( index, - set_index, erasure_meta, prev_inserted_datas, prev_inserted_codes, @@ -795,31 +757,21 @@ impl Blockstore { ); } ErasureMetaStatus::DataFull => { - (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( - |i| { - // Remove saved coding shreds. We don't need these for future recovery. - if prev_inserted_codes.remove(&(slot, i)).is_some() { - // Remove from the index so it doesn't get committed. We know - // this is safe to do because everything in - // `prev_inserted_codes` does not yet exist in blockstore - // (guaranteed by `check_cache_coding_shred`) - index.coding_mut().set_present(i, false); - } - }, - ); - Self::submit_metrics( - slot, - set_index, - erasure_meta, - false, - "complete".into(), - 0, - ); + for i in erasure_meta.coding_shreds_indices() { + // Remove saved coding shreds. We don't need these for future recovery. + if prev_inserted_codes.remove(&(slot, i)).is_some() { + // Remove from the index so it doesn't get committed. We know + // this is safe to do because everything in + // `prev_inserted_codes` does not yet exist in blockstore + // (guaranteed by `check_cache_coding_shred`) + index.coding_mut().set_present(i, false); + } + } + Self::submit_metrics(slot, erasure_meta, false, "complete".into(), 0); } ErasureMetaStatus::StillNeed(needed) => { Self::submit_metrics( slot, - set_index, erasure_meta, false, format!("still need: {}", needed), @@ -1110,9 +1062,9 @@ impl Blockstore { fn check_cache_coding_shred( &self, shred: Shred, - erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, index_working_set: &mut HashMap, - just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>, + just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, index_meta_time: &mut u64, handle_duplicate: &F, is_trusted: bool, @@ -1153,13 +1105,14 @@ impl Blockstore { ); let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { - self.erasure_meta_cf - .get((slot, set_index)) + self.erasure_meta(slot, set_index) .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::new(set_index, erasure_config)) }); - if erasure_config != erasure_meta.config { + // TODO: handle_duplicate is not invoked and so duplicate shreds are + // not gossiped to the rest of cluster. + if erasure_config != erasure_meta.config() { metrics.num_coding_shreds_invalid_erasure_config += 1; let conflicting_shred = self.find_conflicting_coding_shred( &shred, @@ -1182,7 +1135,7 @@ impl Blockstore { warn!("Received multiple erasure configs for the same erasure set!!!"); warn!( "Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", - slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config + slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), erasure_config ); return false; @@ -1214,14 +1167,12 @@ impl Blockstore { shred: &Shred, slot: Slot, erasure_meta: &ErasureMeta, - just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>, + just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, ) -> Option> { // Search for the shred which set the initial erasure config, either inserted, // or in the current batch in just_received_coding_shreds. - let coding_indices = erasure_meta.set_index - ..erasure_meta.set_index + erasure_meta.config.num_coding() as u64; let mut conflicting_shred = None; - for coding_index in coding_indices { + for coding_index in erasure_meta.coding_shreds_indices() { let maybe_shred = self.get_coding_shred(slot, coding_index); if let Ok(Some(shred_data)) = maybe_shred { let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap(); @@ -1246,11 +1197,11 @@ impl Blockstore { fn check_insert_data_shred( &self, shred: Shred, - erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, + just_inserted_data_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, index_meta_time: &mut u64, is_trusted: bool, handle_duplicate: &F, @@ -1315,14 +1266,9 @@ impl Blockstore { just_inserted_data_shreds.insert((slot, shred_index), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; - if let std::collections::hash_map::Entry::Vacant(_) = erasure_metas.entry((slot, set_index)) - { - if let Some(meta) = self - .erasure_meta_cf - .get((slot, set_index)) - .expect("Expect database get to succeed") - { - erasure_metas.insert((slot, set_index), meta); + if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) { + if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() { + entry.insert(meta); } } Ok(newly_completed_data_sets) @@ -1370,7 +1316,7 @@ impl Blockstore { fn get_data_shred_from_just_inserted_or_db<'a>( &'a self, - just_inserted_data_shreds: &'a HashMap<(u64, u64), Shred>, + just_inserted_data_shreds: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>, slot: Slot, index: u64, ) -> Cow<'a, Vec> { @@ -1387,7 +1333,7 @@ impl Blockstore { &self, shred: &Shred, slot_meta: &SlotMeta, - just_inserted_data_shreds: &HashMap<(u64, u64), Shred>, + just_inserted_data_shreds: &HashMap<(Slot, /*shred index:*/ u64), Shred>, last_root: &RwLock, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 480bed61f3..a1187d048a 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -1,7 +1,12 @@ -use crate::erasure::ErasureConfig; -use serde::{Deserialize, Serialize}; -use solana_sdk::{clock::Slot, hash::Hash}; -use std::{collections::BTreeSet, ops::RangeBounds}; +use { + crate::erasure::ErasureConfig, + serde::{Deserialize, Serialize}, + solana_sdk::{clock::Slot, hash::Hash}, + std::{ + collections::BTreeSet, + ops::{Range, RangeBounds}, + }, +}; #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] // The Meta column family @@ -50,7 +55,7 @@ pub struct ShredIndex { /// Erasure coding information pub struct ErasureMeta { /// Which erasure set in the slot this is - pub set_index: u64, + set_index: u64, /// Deprecated field. #[serde(rename = "first_coding_index")] __unused_first_coding_index: u64, @@ -58,7 +63,7 @@ pub struct ErasureMeta { #[serde(rename = "size")] __unused_size: usize, /// Erasure configuration for this erasure set - pub config: ErasureConfig, + config: ErasureConfig, } #[derive(Deserialize, Serialize)] @@ -213,7 +218,7 @@ impl SlotMeta { } impl ErasureMeta { - pub fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta { + pub(crate) fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta { ErasureMeta { set_index, config, @@ -222,14 +227,27 @@ impl ErasureMeta { } } - pub fn status(&self, index: &Index) -> ErasureMetaStatus { + pub(crate) fn config(&self) -> ErasureConfig { + self.config + } + + pub(crate) fn data_shreds_indices(&self) -> Range { + let num_data = self.config.num_data() as u64; + self.set_index..self.set_index + num_data + } + + pub(crate) fn coding_shreds_indices(&self) -> Range { + let num_coding = self.config.num_coding() as u64; + self.set_index..self.set_index + num_coding + } + + pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus { use ErasureMetaStatus::*; - let coding_indices = self.set_index..self.set_index + self.config.num_coding() as u64; - let num_coding = index.coding().present_in_bounds(coding_indices); - let num_data = index - .data() - .present_in_bounds(self.set_index..self.set_index + self.config.num_data() as u64); + let num_coding = index + .coding() + .present_in_bounds(self.coding_shreds_indices()); + let num_data = index.data().present_in_bounds(self.data_shreds_indices()); let (data_missing, num_needed) = ( self.config.num_data().saturating_sub(num_data),