Persist coding shreds (#21214)
This commit is contained in:
parent
22ada18957
commit
38fcfb7542
|
@ -842,10 +842,11 @@ impl Blockstore {
|
|||
};
|
||||
}
|
||||
ShredType::Code => {
|
||||
self.check_cache_coding_shred(
|
||||
self.check_insert_coding_shred(
|
||||
shred,
|
||||
&mut erasure_metas,
|
||||
&mut index_working_set,
|
||||
&mut write_batch,
|
||||
&mut just_inserted_coding_shreds,
|
||||
&mut index_meta_time,
|
||||
handle_duplicate,
|
||||
|
@ -924,16 +925,6 @@ impl Blockstore {
|
|||
start.stop();
|
||||
metrics.shred_recovery_elapsed += start.as_us();
|
||||
|
||||
metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
|
||||
for shred in just_inserted_coding_shreds.into_values() {
|
||||
self.check_insert_coding_shred(
|
||||
shred,
|
||||
&mut index_working_set,
|
||||
&mut write_batch,
|
||||
&mut index_meta_time,
|
||||
);
|
||||
}
|
||||
|
||||
let mut start = Measure::start("Shred recovery");
|
||||
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
|
||||
// drop the others
|
||||
|
@ -1022,39 +1013,18 @@ impl Blockstore {
|
|||
)
|
||||
}
|
||||
|
||||
fn check_insert_coding_shred(
|
||||
&self,
|
||||
shred: Shred,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
write_batch: &mut WriteBatch,
|
||||
index_meta_time: &mut u64,
|
||||
) -> bool {
|
||||
let slot = shred.slot();
|
||||
|
||||
let index_meta_working_set_entry =
|
||||
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
|
||||
|
||||
let index_meta = &mut index_meta_working_set_entry.index;
|
||||
// This gives the index of first coding shred in this FEC block
|
||||
// So, all coding shreds in a given FEC block will have the same set index
|
||||
self.insert_coding_shred(index_meta, &shred, write_batch)
|
||||
.map(|_| {
|
||||
index_meta_working_set_entry.did_insert_occur = true;
|
||||
})
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
|
||||
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|
||||
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn check_cache_coding_shred<F>(
|
||||
fn check_insert_coding_shred<F>(
|
||||
&self,
|
||||
shred: Shred,
|
||||
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
write_batch: &mut WriteBatch,
|
||||
just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
||||
index_meta_time: &mut u64,
|
||||
handle_duplicate: &F,
|
||||
|
@ -1138,19 +1108,23 @@ impl Blockstore {
|
|||
e.num_repaired += 1;
|
||||
}
|
||||
|
||||
// Should be safe to modify index_meta here. Two cases
|
||||
// 1) Recovery happens: Then all inserted erasure metas are removed
|
||||
// from just_received_coding_shreds, and nothing will be committed by
|
||||
// `check_insert_coding_shred`, so the coding index meta will not be
|
||||
// committed
|
||||
index_meta.coding_mut().set_present(shred_index, true);
|
||||
// insert coding shred into rocks
|
||||
let result = self
|
||||
.insert_coding_shred(index_meta, &shred, write_batch)
|
||||
.is_ok();
|
||||
|
||||
if result {
|
||||
index_meta_working_set_entry.did_insert_occur = true;
|
||||
metrics.num_inserted += 1;
|
||||
}
|
||||
|
||||
if let HashMapEntry::Vacant(entry) = just_received_coding_shreds.entry((slot, shred_index))
|
||||
{
|
||||
metrics.num_coding_shreds_inserted += 1;
|
||||
entry.insert(shred);
|
||||
}
|
||||
true
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn find_conflicting_coding_shred(
|
||||
|
@ -5573,7 +5547,7 @@ pub mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_check_cache_coding_shred() {
|
||||
pub fn test_check_insert_coding_shred() {
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
|
||||
|
||||
|
@ -5584,11 +5558,13 @@ pub mod tests {
|
|||
let mut erasure_metas = HashMap::new();
|
||||
let mut index_working_set = HashMap::new();
|
||||
let mut just_received_coding_shreds = HashMap::new();
|
||||
let mut write_batch = blockstore.db.batch().unwrap();
|
||||
let mut index_meta_time = 0;
|
||||
assert!(blockstore.check_cache_coding_shred(
|
||||
assert!(blockstore.check_insert_coding_shred(
|
||||
coding_shred.clone(),
|
||||
&mut erasure_metas,
|
||||
&mut index_working_set,
|
||||
&mut write_batch,
|
||||
&mut just_received_coding_shreds,
|
||||
&mut index_meta_time,
|
||||
&|_shred| {
|
||||
|
@ -5602,10 +5578,11 @@ pub mod tests {
|
|||
// insert again fails on dupe
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
let counter = AtomicUsize::new(0);
|
||||
assert!(!blockstore.check_cache_coding_shred(
|
||||
assert!(!blockstore.check_insert_coding_shred(
|
||||
coding_shred,
|
||||
&mut erasure_metas,
|
||||
&mut index_working_set,
|
||||
&mut write_batch,
|
||||
&mut just_received_coding_shreds,
|
||||
&mut index_meta_time,
|
||||
&|_shred| {
|
||||
|
|
Loading…
Reference in New Issue