From 38fcfb754217d50ba5576ea80dcde8d74c51f74d Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com>
Date: Tue, 30 Nov 2021 09:18:36 -0800
Subject: [PATCH] Persist coding shreds (#21214)

---
 ledger/src/blockstore.rs | 65 +++++++++++++---------
 1 file changed, 21 insertions(+), 44 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 5fd17b895..eb6d01620 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -842,10 +842,11 @@ impl Blockstore {
                     };
                 }
                 ShredType::Code => {
-                    self.check_cache_coding_shred(
+                    self.check_insert_coding_shred(
                         shred,
                         &mut erasure_metas,
                         &mut index_working_set,
+                        &mut write_batch,
                         &mut just_inserted_coding_shreds,
                         &mut index_meta_time,
                         handle_duplicate,
@@ -924,16 +925,6 @@ impl Blockstore {
         start.stop();
         metrics.shred_recovery_elapsed += start.as_us();
 
-        metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
-        for shred in just_inserted_coding_shreds.into_values() {
-            self.check_insert_coding_shred(
-                shred,
-                &mut index_working_set,
-                &mut write_batch,
-                &mut index_meta_time,
-            );
-        }
-
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
@@ -1022,39 +1013,18 @@ impl Blockstore {
         )
     }
 
-    fn check_insert_coding_shred(
-        &self,
-        shred: Shred,
-        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
-        write_batch: &mut WriteBatch,
-        index_meta_time: &mut u64,
-    ) -> bool {
-        let slot = shred.slot();
-
-        let index_meta_working_set_entry =
-            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
-
-        let index_meta = &mut index_meta_working_set_entry.index;
-        // This gives the index of first coding shred in this FEC block
-        // So, all coding shreds in a given FEC block will have the same set index
-        self.insert_coding_shred(index_meta, &shred, write_batch)
-            .map(|_| {
-                index_meta_working_set_entry.did_insert_occur = true;
-            })
-            .is_ok()
-    }
-
     fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
         shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
             || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn check_cache_coding_shred<F>(
+    fn check_insert_coding_shred<F>(
         &self,
         shred: Shred,
         erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
+        write_batch: &mut WriteBatch,
         just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
         index_meta_time: &mut u64,
         handle_duplicate: &F,
@@ -1138,19 +1108,23 @@ impl Blockstore {
             e.num_repaired += 1;
         }
 
-        // Should be safe to modify index_meta here. Two cases
-        // 1) Recovery happens: Then all inserted erasure metas are removed
-        // from just_received_coding_shreds, and nothing will be committed by
-        // `check_insert_coding_shred`, so the coding index meta will not be
-        // committed
-        index_meta.coding_mut().set_present(shred_index, true);
+        // insert coding shred into rocks
+        let result = self
+            .insert_coding_shred(index_meta, &shred, write_batch)
+            .is_ok();
+
+        if result {
+            index_meta_working_set_entry.did_insert_occur = true;
+            metrics.num_inserted += 1;
+        }
 
         if let HashMapEntry::Vacant(entry) = just_received_coding_shreds.entry((slot, shred_index))
         {
             metrics.num_coding_shreds_inserted += 1;
             entry.insert(shred);
         }
-        true
+
+        result
     }
 
     fn find_conflicting_coding_shred(
@@ -5573,7 +5547,7 @@ pub mod tests {
     }
 
     #[test]
-    pub fn test_check_cache_coding_shred() {
+    pub fn test_check_insert_coding_shred() {
         let ledger_path = get_tmp_ledger_path_auto_delete!();
         let blockstore = Blockstore::open(ledger_path.path()).unwrap();
 
@@ -5584,11 +5558,13 @@ pub mod tests {
         let mut erasure_metas = HashMap::new();
         let mut index_working_set = HashMap::new();
         let mut just_received_coding_shreds = HashMap::new();
+        let mut write_batch = blockstore.db.batch().unwrap();
         let mut index_meta_time = 0;
-        assert!(blockstore.check_cache_coding_shred(
+        assert!(blockstore.check_insert_coding_shred(
             coding_shred.clone(),
             &mut erasure_metas,
             &mut index_working_set,
+            &mut write_batch,
             &mut just_received_coding_shreds,
             &mut index_meta_time,
             &|_shred| {
@@ -5602,10 +5578,11 @@ pub mod tests {
         // insert again fails on dupe
         use std::sync::atomic::{AtomicUsize, Ordering};
         let counter = AtomicUsize::new(0);
-        assert!(!blockstore.check_cache_coding_shred(
+        assert!(!blockstore.check_insert_coding_shred(
            coding_shred,
            &mut erasure_metas,
            &mut index_working_set,
+           &mut write_batch,
            &mut just_received_coding_shreds,
            &mut index_meta_time,
            &|_shred| {