Rework get_index_meta (#6636)

This commit is contained in:
carllin 2019-10-30 16:48:59 -07:00 committed by GitHub
parent 7bb224f54a
commit 6454bfe754
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 51 additions and 29 deletions

View File

@ -61,6 +61,11 @@ pub struct Blocktree {
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>, pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
} }
pub struct IndexMetaWorkingSetEntry {
index: Index,
did_insert_occur: bool,
}
pub struct BlocktreeInsertionMetrics { pub struct BlocktreeInsertionMetrics {
pub num_shreds: usize, pub num_shreds: usize,
pub insert_lock_elapsed: u64, pub insert_lock_elapsed: u64,
@ -72,6 +77,7 @@ pub struct BlocktreeInsertionMetrics {
pub total_elapsed: u64, pub total_elapsed: u64,
pub num_inserted: u64, pub num_inserted: u64,
pub num_recovered: usize, pub num_recovered: usize,
pub index_meta_time: u64,
} }
impl BlocktreeInsertionMetrics { impl BlocktreeInsertionMetrics {
@ -305,7 +311,7 @@ impl Blocktree {
fn try_shred_recovery( fn try_shred_recovery(
db: &Database, db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>, erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &HashMap<u64, Index>, index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
) -> Vec<Shred> { ) -> Vec<Shred> {
@ -330,7 +336,8 @@ impl Blocktree {
); );
}; };
let index = index_working_set.get(&slot).expect("Index"); let index_meta_entry = index_working_set.get(&slot).expect("Index");
let index = &index_meta_entry.index;
match erasure_meta.status(&index) { match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => { ErasureMetaStatus::CanRecover => {
// Find shreds for this erasure set and try recovery // Find shreds for this erasure set and try recovery
@ -423,6 +430,7 @@ impl Blocktree {
let num_shreds = shreds.len(); let num_shreds = shreds.len();
let mut start = Measure::start("Shred insertion"); let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0; let mut num_inserted = 0;
let mut index_meta_time = 0;
shreds.into_iter().for_each(|shred| { shreds.into_iter().for_each(|shred| {
let insert_success = { let insert_success = {
if shred.is_data() { if shred.is_data() {
@ -432,6 +440,7 @@ impl Blocktree {
&mut slot_meta_working_set, &mut slot_meta_working_set,
&mut write_batch, &mut write_batch,
&mut just_inserted_data_shreds, &mut just_inserted_data_shreds,
&mut index_meta_time,
) )
} else if shred.is_code() { } else if shred.is_code() {
self.check_insert_coding_shred( self.check_insert_coding_shred(
@ -440,6 +449,7 @@ impl Blocktree {
&mut index_working_set, &mut index_working_set,
&mut write_batch, &mut write_batch,
&mut just_inserted_coding_shreds, &mut just_inserted_coding_shreds,
&mut index_meta_time,
) )
} else { } else {
panic!("There should be no other case"); panic!("There should be no other case");
@ -473,6 +483,7 @@ impl Blocktree {
&mut slot_meta_working_set, &mut slot_meta_working_set,
&mut write_batch, &mut write_batch,
&mut just_inserted_coding_shreds, &mut just_inserted_coding_shreds,
&mut index_meta_time,
); );
} }
} }
@ -498,8 +509,10 @@ impl Blocktree {
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?; write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
} }
for (&slot, index) in index_working_set.iter() { for (&slot, index_working_set_entry) in index_working_set.iter() {
write_batch.put::<cf::Index>(slot, index)?; if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
}
} }
start.stop(); start.stop();
let commit_working_sets_elapsed = start.as_us(); let commit_working_sets_elapsed = start.as_us();
@ -535,6 +548,7 @@ impl Blocktree {
write_batch_elapsed, write_batch_elapsed,
num_inserted, num_inserted,
num_recovered, num_recovered,
index_meta_time,
}) })
} }
@ -542,26 +556,29 @@ impl Blocktree {
&self, &self,
shred: Shred, shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>, index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool { ) -> bool {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
let (index_meta, mut new_index_meta) = let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set); get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); let index_meta = &mut index_meta_working_set_entry.index;
// This gives the index of first coding shred in this FEC block // This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index // So, all coding shreds in a given FEC block will have the same set index
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) { if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
.map(|_| { .map(|_| {
// Insert was a success!
just_inserted_coding_shreds just_inserted_coding_shreds
.entry((slot, shred_index)) .entry((slot, shred_index))
.or_insert_with(|| shred); .or_insert_with(|| shred);
new_index_meta.map(|n| index_working_set.insert(slot, n))
index_meta_working_set_entry.did_insert_occur = true;
}) })
.is_ok() .is_ok()
} else { } else {
@ -572,20 +589,23 @@ impl Blocktree {
fn check_insert_data_shred( fn check_insert_data_shred(
&self, &self,
shred: Shred, shred: Shred,
index_working_set: &mut HashMap<u64, Index>, index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>, slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool { ) -> bool {
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
let (index_meta, mut new_index_meta) =
get_index_meta_entry(&self.db, slot, index_working_set); let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
let (slot_meta_entry, mut new_slot_meta_entry) = let (slot_meta_entry, mut new_slot_meta_entry) =
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
let insert_success = { let insert_success = {
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
let mut slot_meta = entry.0.borrow_mut(); let mut slot_meta = entry.0.borrow_mut();
@ -602,7 +622,7 @@ impl Blocktree {
write_batch, write_batch,
) { ) {
just_inserted_data_shreds.insert((slot, shred_index), shred); just_inserted_data_shreds.insert((slot, shred_index), shred);
new_index_meta.map(|n| index_working_set.insert(slot, n)); index_meta_working_set_entry.did_insert_occur = true;
true true
} else { } else {
false false
@ -1314,22 +1334,24 @@ fn update_slot_meta(
fn get_index_meta_entry<'a>( fn get_index_meta_entry<'a>(
db: &Database, db: &Database,
slot: u64, slot: u64,
index_working_set: &'a mut HashMap<u64, Index>, index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
) -> (Option<&'a mut Index>, Option<Index>) { index_meta_time: &mut u64,
) -> &'a mut IndexMetaWorkingSetEntry {
let index_cf = db.column::<cf::Index>(); let index_cf = db.column::<cf::Index>();
index_working_set let mut total_start = Measure::start("Total elapsed");
.get_mut(&slot) let res = index_working_set.entry(slot).or_insert_with(|| {
.map(|i| (Some(i), None)) let newly_inserted_meta = index_cf
.unwrap_or_else(|| { .get(slot)
let newly_inserted_meta = Some( .unwrap()
index_cf .unwrap_or_else(|| Index::new(slot));
.get(slot) IndexMetaWorkingSetEntry {
.unwrap() index: newly_inserted_meta,
.unwrap_or_else(|| Index::new(slot)), did_insert_occur: false,
); }
});
(None, newly_inserted_meta) total_start.stop();
}) *index_meta_time += total_start.as_us();
res
} }
fn get_slot_meta_entry<'a>( fn get_slot_meta_entry<'a>(