From 4ceb2689f5337706ae42c27a54794bce9ae29443 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 14 Dec 2021 17:34:02 +0000 Subject: [PATCH] adds ShredId uniquely identifying each shred (#21820) --- core/src/retransmit_stage.rs | 6 +- core/src/window_service.rs | 9 +-- ledger/src/blockstore.rs | 134 +++++++++++++++++------------------ ledger/src/shred.rs | 19 +++++ 4 files changed, 89 insertions(+), 79 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index ee91be1cd..5093bb253 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -24,7 +24,7 @@ use { solana_ledger::{ blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, - shred::{Shred, ShredType}, + shred::{Shred, ShredId}, }, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, @@ -145,13 +145,13 @@ impl RetransmitStats { } // Map of shred (slot, index, type) => list of hash values seen for that key. -type ShredFilter = LruCache<(Slot, u32, ShredType), Vec>; +type ShredFilter = LruCache>; type ShredFilterAndHasher = (ShredFilter, PacketHasher); // Returns true if shred is already received and should skip retransmit. 
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex) -> bool { - let key = (shred.slot(), shred.index(), shred.shred_type()); + let key = shred.id(); let mut shreds_received = shreds_received.lock().unwrap(); let (cache, hasher) = shreds_received.deref_mut(); match cache.get_mut(&key) { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 978d39c3e..7f20dc89c 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -217,12 +217,9 @@ fn run_check_duplicate( let check_duplicate = |shred: Shred| -> Result<()> { let shred_slot = shred.slot(); if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { - if let Some(existing_shred_payload) = blockstore.is_shred_duplicate( - shred_slot, - shred.index(), - shred.payload.clone(), - shred.shred_type(), - ) { + if let Some(existing_shred_payload) = + blockstore.is_shred_duplicate(shred.id(), shred.payload.clone()) + { cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; blockstore.store_duplicate_slot( shred_slot, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index a7553f5a1..ab9e4e790 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -13,7 +13,7 @@ use { leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ - max_ticks_per_n_shreds, Result as ShredResult, Shred, ShredType, Shredder, + max_ticks_per_n_shreds, Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE, }, }, @@ -629,12 +629,13 @@ impl Blockstore { index: &'a Index, slot: Slot, erasure_meta: &'a ErasureMeta, - prev_inserted_datas: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_shreds: &'a HashMap, data_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.data_shreds_indices().filter_map(move |i| { - if let Some(shred) = prev_inserted_datas.remove(&(slot, i)) { - return Some(shred); + let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data); + if let 
Some(shred) = prev_inserted_shreds.get(&key) { + return Some(shred.clone()); } if !index.data().is_present(i) { return None; @@ -650,14 +651,15 @@ impl Blockstore { } fn get_recovery_coding_shreds<'a>( - index: &'a mut Index, + index: &'a Index, slot: Slot, erasure_meta: &'a ErasureMeta, - prev_inserted_codes: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_shreds: &'a HashMap, code_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.coding_shreds_indices().filter_map(move |i| { - if let Some(shred) = prev_inserted_codes.get(&(slot, i)) { + let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code); + if let Some(shred) = prev_inserted_shreds.get(&key) { return Some(shred.clone()); } if !index.coding().is_present(i) { @@ -676,24 +678,28 @@ impl Blockstore { fn recover_shreds( index: &mut Index, erasure_meta: &ErasureMeta, - prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, - prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_shreds: &HashMap, recovered_data_shreds: &mut Vec, data_cf: &LedgerColumn, code_cf: &LedgerColumn, ) { // Find shreds for this erasure set and try recovery let slot = index.slot; - let mut available_shreds: Vec<_> = - Self::get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_datas, data_cf) - .collect(); - available_shreds.extend(Self::get_recovery_coding_shreds( + let available_shreds: Vec<_> = Self::get_recovery_data_shreds( index, slot, erasure_meta, - prev_inserted_codes, + prev_inserted_shreds, + data_cf, + ) + .chain(Self::get_recovery_coding_shreds( + index, + slot, + erasure_meta, + prev_inserted_shreds, code_cf, - )); + )) + .collect(); if let Ok(mut result) = Shredder::try_recovery(available_shreds) { Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len()); recovered_data_shreds.append(&mut result); @@ -727,8 +733,7 @@ impl Blockstore { db: &Database, erasure_metas: &HashMap<(Slot, /*fec set index:*/ 
u64), ErasureMeta>, index_working_set: &mut HashMap, - prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, - prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>, + prev_inserted_shreds: &HashMap, ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); @@ -746,8 +751,7 @@ impl Blockstore { Self::recover_shreds( index, erasure_meta, - prev_inserted_datas, - prev_inserted_codes, + prev_inserted_shreds, &mut recovered_data_shreds, &data_cf, &code_cf, @@ -793,8 +797,7 @@ impl Blockstore { let db = &*self.db; let mut write_batch = db.batch()?; - let mut just_inserted_coding_shreds = HashMap::new(); - let mut just_inserted_data_shreds = HashMap::new(); + let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); @@ -818,7 +821,7 @@ impl Blockstore { &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, - &mut just_inserted_data_shreds, + &mut just_inserted_shreds, &mut index_meta_time, is_trusted, handle_duplicate, @@ -846,7 +849,7 @@ impl Blockstore { &mut erasure_metas, &mut index_working_set, &mut write_batch, - &mut just_inserted_coding_shreds, + &mut just_inserted_shreds, &mut index_meta_time, handle_duplicate, is_trusted, @@ -865,8 +868,7 @@ impl Blockstore { db, &erasure_metas, &mut index_working_set, - &mut just_inserted_data_shreds, - &just_inserted_coding_shreds, + &just_inserted_shreds, ); metrics.num_recovered += recovered_data_shreds.len(); @@ -885,7 +887,7 @@ impl Blockstore { &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, - &mut just_inserted_data_shreds, + &mut just_inserted_shreds, &mut index_meta_time, is_trusted, &handle_duplicate, @@ -1013,6 +1015,8 @@ impl Blockstore { } fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool { + // TODO should also compare first-coding-index once position field is + // populated across 
cluster. shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds } @@ -1024,7 +1028,7 @@ impl Blockstore { erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + just_received_shreds: &mut HashMap, index_meta_time: &mut u64, handle_duplicate: &F, is_trusted: bool, @@ -1073,7 +1077,7 @@ impl Blockstore { &shred, slot, erasure_meta, - just_received_coding_shreds, + just_received_shreds, ); if let Some(conflicting_shred) = conflicting_shred { if self @@ -1112,8 +1116,7 @@ impl Blockstore { metrics.num_inserted += 1; } - if let HashMapEntry::Vacant(entry) = just_received_coding_shreds.entry((slot, shred_index)) - { + if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) { metrics.num_coding_shreds_inserted += 1; entry.insert(shred); } @@ -1126,30 +1129,27 @@ impl Blockstore { shred: &Shred, slot: Slot, erasure_meta: &ErasureMeta, - just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + just_received_shreds: &HashMap, ) -> Option> { // Search for the shred which set the initial erasure config, either inserted, - // or in the current batch in just_received_coding_shreds. - let mut conflicting_shred = None; + // or in the current batch in just_received_shreds. 
for coding_index in erasure_meta.coding_shreds_indices() { let maybe_shred = self.get_coding_shred(slot, coding_index); if let Ok(Some(shred_data)) = maybe_shred { let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap(); if Self::erasure_mismatch(&potential_shred, shred) { - conflicting_shred = Some(potential_shred.payload); + return Some(potential_shred.payload); } - break; - } else if let Some(potential_shred) = - just_received_coding_shreds.get(&(slot, coding_index)) - { + } else if let Some(potential_shred) = { + let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code); + just_received_shreds.get(&key) + } { if Self::erasure_mismatch(potential_shred, shred) { - conflicting_shred = Some(potential_shred.payload.clone()); + return Some(potential_shred.payload.clone()); } - break; } } - - conflicting_shred + None } #[allow(clippy::too_many_arguments)] @@ -1160,7 +1160,7 @@ impl Blockstore { index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_data_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>, + just_inserted_shreds: &mut HashMap, index_meta_time: &mut u64, is_trusted: bool, handle_duplicate: &F, @@ -1211,7 +1211,7 @@ impl Blockstore { if !self.should_insert_data_shred( &shred, slot_meta, - just_inserted_data_shreds, + just_inserted_shreds, &self.last_root, leader_schedule, shred_source.clone(), @@ -1228,7 +1228,7 @@ impl Blockstore { write_batch, shred_source, )?; - just_inserted_data_shreds.insert((slot, shred_index), shred); + just_inserted_shreds.insert(shred.id(), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) { @@ -1272,11 +1272,12 @@ impl Blockstore { fn get_data_shred_from_just_inserted_or_db<'a>( &'a self, - just_inserted_data_shreds: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>, + just_inserted_shreds: 
&'a HashMap, slot: Slot, index: u64, ) -> Cow<'a, Vec> { - if let Some(shred) = just_inserted_data_shreds.get(&(slot, index)) { + let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data); + if let Some(shred) = just_inserted_shreds.get(&key) { Cow::Borrowed(&shred.payload) } else { // If it doesn't exist in the just inserted set, it must exist in @@ -1289,7 +1290,7 @@ impl Blockstore { &self, shred: &Shred, slot_meta: &SlotMeta, - just_inserted_data_shreds: &HashMap<(Slot, /*shred index:*/ u64), Shred>, + just_inserted_shreds: &HashMap, last_root: &RwLock, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, @@ -1346,7 +1347,7 @@ impl Blockstore { .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_data_shreds, + just_inserted_shreds, slot, last_index.unwrap(), ); @@ -1382,7 +1383,7 @@ impl Blockstore { .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_data_shreds, + just_inserted_shreds, slot, slot_meta.received - 1, ); @@ -3003,13 +3004,8 @@ impl Blockstore { // Returns the existing shred if `new_shred` is not equal to the existing shred at the // given slot and index as this implies the leader generated two different shreds with // the same slot and index - pub fn is_shred_duplicate( - &self, - slot: u64, - index: u32, - mut payload: Vec, - shred_type: ShredType, - ) -> Option> { + pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec) -> Option> { + let (slot, index, shred_type) = shred.unwrap(); let existing_shred = match shred_type { ShredType::Data => self.get_data_shred(slot, index as u64), ShredType::Code => self.get_coding_shred(slot, index as u64), @@ -5564,7 +5560,7 @@ pub mod tests { let mut erasure_metas = HashMap::new(); let mut index_working_set = HashMap::new(); - let mut 
just_received_coding_shreds = HashMap::new(); + let mut just_received_shreds = HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); let mut index_meta_time = 0; assert!(blockstore.check_insert_coding_shred( @@ -5572,7 +5568,7 @@ pub mod tests { &mut erasure_metas, &mut index_working_set, &mut write_batch, - &mut just_received_coding_shreds, + &mut just_received_shreds, &mut index_meta_time, &|_shred| { panic!("no dupes"); @@ -5590,7 +5586,7 @@ pub mod tests { &mut erasure_metas, &mut index_working_set, &mut write_batch, - &mut just_received_coding_shreds, + &mut just_received_shreds, &mut index_meta_time, &|_shred| { counter.fetch_add(1, Ordering::Relaxed); @@ -8096,19 +8092,15 @@ pub mod tests { // Check if shreds are duplicated assert_eq!( blockstore.is_shred_duplicate( - slot, - 0, + ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()), duplicate_shred.payload.clone(), - duplicate_shred.shred_type(), ), Some(shred.payload.to_vec()) ); assert!(blockstore .is_shred_duplicate( - slot, - 0, - non_duplicate_shred.payload.clone(), - non_duplicate_shred.shred_type(), + ShredId::new(slot, /*index:*/ 0, non_duplicate_shred.shred_type()), + non_duplicate_shred.payload, ) .is_none()); @@ -8576,10 +8568,12 @@ pub mod tests { std::u8::MAX - even_smaller_last_shred_duplicate.payload[0]; assert!(blockstore .is_shred_duplicate( - slot, - even_smaller_last_shred_duplicate.index(), + ShredId::new( + slot, + even_smaller_last_shred_duplicate.index(), + ShredType::Data + ), even_smaller_last_shred_duplicate.payload.clone(), - ShredType::Data, ) .is_some()); blockstore diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index e8b6ae242..172b65408 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -236,6 +236,20 @@ pub struct Shred { pub payload: Vec, } +/// Tuple which should uniquely identify a shred if it exists. 
+#[derive(Clone, Copy, Eq, Hash, PartialEq)] +pub struct ShredId(Slot, /*shred index:*/ u32, ShredType); + +impl ShredId { + pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId { + ShredId(slot, index, shred_type) + } + + pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) { + (self.0, self.1, self.2) + } +} + impl Shred { fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T> where @@ -438,6 +452,11 @@ impl Shred { ) } + /// Unique identifier for each shred. + pub fn id(&self) -> ShredId { + ShredId(self.slot(), self.index(), self.shred_type()) + } + pub fn slot(&self) -> Slot { self.common_header.slot }