diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 7d9c57f0d..76d407d6c 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -387,9 +387,9 @@ impl Blocktree { }); (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( |i| { - if index.coding().is_present(i) { - if let Some(shred) = - prev_inserted_codes.remove(&(slot, i)).or_else(|| { + if let Some(shred) = + prev_inserted_codes.remove(&(slot, i)).or_else(|| { + if index.coding().is_present(i) { let some_code = code_cf .get_bytes((slot, i)) .expect("Database failure, could not fetch code shred"); @@ -399,10 +399,12 @@ impl Blocktree { warn!("Code shred deleted while reading for recovery"); None } - }) - { - available_shreds.push(shred); - } + } else { + None + } + }) + { + available_shreds.push(shred); } }, ); @@ -420,6 +422,12 @@ impl Blocktree { } } ErasureMetaStatus::DataFull => { + (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( + |i| { + // Remove saved coding shreds. We don't need these for future recovery + let _ = prev_inserted_codes.remove(&(slot, i)); + }, + ); submit_metrics(false, "complete".into(), 0); } ErasureMetaStatus::StillNeed(needed) => { @@ -466,11 +474,10 @@ impl Blocktree { &mut index_meta_time, ) } else if shred.is_code() { - self.check_insert_coding_shred( + self.check_cache_coding_shred( shred, &mut erasure_metas, &mut index_working_set, - &mut write_batch, &mut just_inserted_coding_shreds, &mut index_meta_time, ) @@ -515,6 +522,17 @@ impl Blocktree { start.stop(); let shred_recovery_elapsed = start.as_us(); + just_inserted_coding_shreds + .into_iter() + .for_each(|((_, _), shred)| { + self.check_insert_coding_shred( + shred, + &mut index_working_set, + &mut write_batch, + &mut index_meta_time, + ); + }); + let mut start = Measure::start("Shred recovery"); // Handle chaining for the members of the slot_meta_working_set that were inserted into, // drop the others @@ -573,10 +591,31 @@ impl Blocktree { fn check_insert_coding_shred( &self, shred: Shred, - erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, - just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, + index_meta_time: &mut u64, + ) -> bool { + let slot = shred.slot(); + + let index_meta_working_set_entry = + get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time); + + let index_meta = &mut index_meta_working_set_entry.index; + // This gives the index of first coding shred in this FEC block + // So, all coding shreds in a given FEC block will have the same set index + self.insert_coding_shred(index_meta, &shred, write_batch) + .map(|_| { + index_meta_working_set_entry.did_insert_occur = true; + }) + .is_ok() + } + + fn check_cache_coding_shred( + &self, + shred: Shred, + erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + index_working_set: &mut HashMap, + just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>, index_meta_time: &mut u64, ) -> bool { let slot = shred.slot(); @@ -589,16 +628,33 @@ impl Blocktree { // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) { - self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) - .map(|_| { - // Insert was a success! - just_inserted_coding_shreds - .entry((slot, shred_index)) - .or_insert_with(|| shred); + let set_index = shred_index - u64::from(shred.coding_header.position); + let erasure_config = ErasureConfig::new( + shred.coding_header.num_data_shreds as usize, + shred.coding_header.num_coding_shreds as usize, + ); - index_meta_working_set_entry.did_insert_occur = true; - }) - .is_ok() + let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { + self.erasure_meta_cf + .get((slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config)) + }); + + if erasure_config != erasure_meta.config { + // ToDo: This is a potential slashing condition + warn!("Received multiple erasure configs for the same erasure set!!!"); + warn!( + "Stored config: {:#?}, new config: {:#?}", + erasure_meta.config, erasure_config + ); + } + + just_received_coding_shreds + .entry((slot, shred_index)) + .or_insert_with(|| shred); + + true } else { false } @@ -668,7 +724,6 @@ impl Blocktree { fn insert_coding_shred( &self, - erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_meta: &mut Index, shred: &Shred, write_batch: &mut WriteBatch, @@ -685,28 +740,6 @@ impl Blocktree { ))); } - let set_index = shred_index - u64::from(shred.coding_header.position); - let erasure_config = ErasureConfig::new( - shred.coding_header.num_data_shreds as usize, - shred.coding_header.num_coding_shreds as usize, - ); - - let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { - self.erasure_meta_cf - .get((slot, set_index)) - .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config)) - }); - - if erasure_config != erasure_meta.config { - // ToDo: This is a potential slashing condition - warn!("Received multiple erasure configs for the same erasure set!!!"); - warn!( - "Stored config: {:#?}, new config: {:#?}", - erasure_meta.config, erasure_config - ); - } - // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. write_batch.put_bytes::((slot, shred_index), &shred.payload)?;