diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index c58b701da..3aa46559e 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -61,6 +61,11 @@ pub struct Blocktree { pub completed_slots_senders: Vec>>, } +pub struct IndexMetaWorkingSetEntry { + index: Index, + did_insert_occur: bool, +} + pub struct BlocktreeInsertionMetrics { pub num_shreds: usize, pub insert_lock_elapsed: u64, @@ -72,6 +77,7 @@ pub struct BlocktreeInsertionMetrics { pub total_elapsed: u64, pub num_inserted: u64, pub num_recovered: usize, + pub index_meta_time: u64, } impl BlocktreeInsertionMetrics { @@ -305,7 +311,7 @@ impl Blocktree { fn try_shred_recovery( db: &Database, erasure_metas: &HashMap<(u64, u64), ErasureMeta>, - index_working_set: &HashMap, + index_working_set: &HashMap, prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, ) -> Vec { @@ -330,7 +336,8 @@ impl Blocktree { ); }; - let index = index_working_set.get(&slot).expect("Index"); + let index_meta_entry = index_working_set.get(&slot).expect("Index"); + let index = &index_meta_entry.index; match erasure_meta.status(&index) { ErasureMetaStatus::CanRecover => { // Find shreds for this erasure set and try recovery @@ -423,6 +430,7 @@ impl Blocktree { let num_shreds = shreds.len(); let mut start = Measure::start("Shred insertion"); let mut num_inserted = 0; + let mut index_meta_time = 0; shreds.into_iter().for_each(|shred| { let insert_success = { if shred.is_data() { @@ -432,6 +440,7 @@ impl Blocktree { &mut slot_meta_working_set, &mut write_batch, &mut just_inserted_data_shreds, + &mut index_meta_time, ) } else if shred.is_code() { self.check_insert_coding_shred( @@ -440,6 +449,7 @@ impl Blocktree { &mut index_working_set, &mut write_batch, &mut just_inserted_coding_shreds, + &mut index_meta_time, ) } else { panic!("There should be no other case"); @@ -473,6 +483,7 @@ impl Blocktree { &mut slot_meta_working_set, &mut write_batch, &mut just_inserted_coding_shreds, + &mut index_meta_time, ); } } @@ -498,8 +509,10 @@ impl Blocktree { write_batch.put::((slot, set_index), &erasure_meta)?; } - for (&slot, index) in index_working_set.iter() { - write_batch.put::(slot, index)?; + for (&slot, index_working_set_entry) in index_working_set.iter() { + if index_working_set_entry.did_insert_occur { + write_batch.put::(slot, &index_working_set_entry.index)?; + } } start.stop(); let commit_working_sets_elapsed = start.as_us(); @@ -535,6 +548,7 @@ impl Blocktree { write_batch_elapsed, num_inserted, num_recovered, + index_meta_time, }) } @@ -542,26 +556,29 @@ impl Blocktree { &self, shred: Shred, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, - index_working_set: &mut HashMap, + index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, + index_meta_time: &mut u64, ) -> bool { let slot = shred.slot(); let shred_index = u64::from(shred.index()); - let (index_meta, mut new_index_meta) = - get_index_meta_entry(&self.db, slot, index_working_set); + let index_meta_working_set_entry = + get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time); - let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); + let index_meta = &mut index_meta_working_set_entry.index; // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) { self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) .map(|_| { + // Insert was a success! just_inserted_coding_shreds .entry((slot, shred_index)) .or_insert_with(|| shred); - new_index_meta.map(|n| index_working_set.insert(slot, n)) + + index_meta_working_set_entry.did_insert_occur = true; }) .is_ok() } else { @@ -572,20 +589,23 @@ impl Blocktree { fn check_insert_data_shred( &self, shred: Shred, - index_working_set: &mut HashMap, + index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, + index_meta_time: &mut u64, ) -> bool { let slot = shred.slot(); let shred_index = u64::from(shred.index()); - let (index_meta, mut new_index_meta) = - get_index_meta_entry(&self.db, slot, index_working_set); + + let index_meta_working_set_entry = + get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time); + + let index_meta = &mut index_meta_working_set_entry.index; let (slot_meta_entry, mut new_slot_meta_entry) = get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); let insert_success = { - let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); let mut slot_meta = entry.0.borrow_mut(); @@ -602,7 +622,7 @@ impl Blocktree { write_batch, ) { just_inserted_data_shreds.insert((slot, shred_index), shred); - new_index_meta.map(|n| index_working_set.insert(slot, n)); + index_meta_working_set_entry.did_insert_occur = true; true } else { false @@ -1314,22 +1334,24 @@ fn update_slot_meta( fn get_index_meta_entry<'a>( db: &Database, slot: u64, - index_working_set: &'a mut HashMap, -) -> (Option<&'a mut Index>, Option) { + index_working_set: &'a mut HashMap, + index_meta_time: &mut u64, +) -> &'a mut IndexMetaWorkingSetEntry { let index_cf = db.column::(); - index_working_set - .get_mut(&slot) - .map(|i| (Some(i), None)) - .unwrap_or_else(|| { - let newly_inserted_meta = Some( - index_cf - .get(slot) - .unwrap() - .unwrap_or_else(|| Index::new(slot)), - ); - - (None, newly_inserted_meta) - }) + let mut total_start = Measure::start("Total elapsed"); + let res = index_working_set.entry(slot).or_insert_with(|| { + let newly_inserted_meta = index_cf + .get(slot) + .unwrap() + .unwrap_or_else(|| Index::new(slot)); + IndexMetaWorkingSetEntry { + index: newly_inserted_meta, + did_insert_occur: false, + } + }); + total_start.stop(); + *index_meta_time += total_start.as_us(); + res } fn get_slot_meta_entry<'a>(