Rework get_slot_meta (#6642)
* Assert slotmeta is not orphan * Clean up get_slot_meta functionality * Add test
This commit is contained in:
parent
e8e5ddc55d
commit
c52830980a
|
@ -42,7 +42,6 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
|
|||
|
||||
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
||||
|
||||
pub type SlotMetaWorkingSetEntry = (Rc<RefCell<SlotMeta>>, Option<SlotMeta>);
|
||||
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
|
||||
|
||||
// ledger window
|
||||
|
@ -63,6 +62,16 @@ pub struct Blocktree {
|
|||
|
||||
pub struct IndexMetaWorkingSetEntry {
|
||||
index: Index,
|
||||
// true only if at least one shred for this Index was inserted since the time this
|
||||
// struct was created
|
||||
did_insert_occur: bool,
|
||||
}
|
||||
|
||||
pub struct SlotMetaWorkingSetEntry {
|
||||
new_slot_meta: Rc<RefCell<SlotMeta>>,
|
||||
old_slot_meta: Option<SlotMeta>,
|
||||
// True only if at least one shred for this SlotMeta was inserted since the time this
|
||||
// struct was created.
|
||||
did_insert_occur: bool,
|
||||
}
|
||||
|
||||
|
@ -80,6 +89,16 @@ pub struct BlocktreeInsertionMetrics {
|
|||
pub index_meta_time: u64,
|
||||
}
|
||||
|
||||
impl SlotMetaWorkingSetEntry {
|
||||
fn new(new_slot_meta: Rc<RefCell<SlotMeta>>, old_slot_meta: Option<SlotMeta>) -> Self {
|
||||
Self {
|
||||
new_slot_meta,
|
||||
old_slot_meta,
|
||||
did_insert_occur: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlocktreeInsertionMetrics {
|
||||
pub fn report_metrics(&self, metric_name: &'static str) {
|
||||
datapoint_debug!(
|
||||
|
@ -460,8 +479,8 @@ impl Blocktree {
|
|||
}
|
||||
});
|
||||
start.stop();
|
||||
let insert_shreds_elapsed = start.as_us();
|
||||
|
||||
let insert_shreds_elapsed = start.as_us();
|
||||
let mut start = Measure::start("Shred recovery");
|
||||
let mut num_recovered = 0;
|
||||
if let Some(leader_schedule_cache) = leader_schedule {
|
||||
|
@ -493,12 +512,13 @@ impl Blocktree {
|
|||
let shred_recovery_elapsed = start.as_us();
|
||||
|
||||
let mut start = Measure::start("Shred recovery");
|
||||
// Handle chaining for the working set
|
||||
handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?;
|
||||
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
|
||||
// drop the others
|
||||
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
|
||||
start.stop();
|
||||
let chaining_elapsed = start.as_us();
|
||||
|
||||
let mut start = Measure::start("Commit Worknig Sets");
|
||||
let mut start = Measure::start("Commit Working Sets");
|
||||
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
|
||||
&slot_meta_working_set,
|
||||
&self.completed_slots_senders,
|
||||
|
@ -602,41 +622,30 @@ impl Blocktree {
|
|||
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
|
||||
|
||||
let index_meta = &mut index_meta_working_set_entry.index;
|
||||
let (slot_meta_entry, mut new_slot_meta_entry) =
|
||||
let slot_meta_entry =
|
||||
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
|
||||
|
||||
let insert_success = {
|
||||
let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
|
||||
let mut slot_meta = entry.0.borrow_mut();
|
||||
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
|
||||
|
||||
if Blocktree::should_insert_data_shred(
|
||||
&shred,
|
||||
&slot_meta,
|
||||
index_meta.data(),
|
||||
&self.last_root,
|
||||
) {
|
||||
if let Ok(()) = self.insert_data_shred(
|
||||
&mut slot_meta,
|
||||
index_meta.data_mut(),
|
||||
&shred,
|
||||
write_batch,
|
||||
) {
|
||||
just_inserted_data_shreds.insert((slot, shred_index), shred);
|
||||
index_meta_working_set_entry.did_insert_occur = true;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
if Blocktree::should_insert_data_shred(
|
||||
&shred,
|
||||
slot_meta,
|
||||
index_meta.data(),
|
||||
&self.last_root,
|
||||
) {
|
||||
if let Ok(()) =
|
||||
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
|
||||
{
|
||||
just_inserted_data_shreds.insert((slot, shred_index), shred);
|
||||
index_meta_working_set_entry.did_insert_occur = true;
|
||||
slot_meta_entry.did_insert_occur = true;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if insert_success {
|
||||
new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
|
||||
} else {
|
||||
false
|
||||
}
|
||||
|
||||
insert_success
|
||||
}
|
||||
|
||||
fn should_insert_coding_shred(
|
||||
|
@ -775,7 +784,6 @@ impl Blocktree {
|
|||
) -> Result<()> {
|
||||
let slot = shred.slot();
|
||||
let index = u64::from(shred.index());
|
||||
let parent = shred.parent();
|
||||
|
||||
let last_in_slot = if shred.last_in_slot() {
|
||||
debug!("got last in slot");
|
||||
|
@ -791,9 +799,8 @@ impl Blocktree {
|
|||
false
|
||||
};
|
||||
|
||||
if is_orphan(slot_meta) {
|
||||
slot_meta.parent_slot = parent;
|
||||
}
|
||||
// Parent for slot meta should have been set by this point
|
||||
assert!(!is_orphan(slot_meta));
|
||||
|
||||
let data_cf = self.db.column::<cf::ShredData>();
|
||||
|
||||
|
@ -1351,39 +1358,30 @@ fn get_slot_meta_entry<'a>(
|
|||
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
slot: u64,
|
||||
parent_slot: u64,
|
||||
) -> (
|
||||
Option<&'a mut SlotMetaWorkingSetEntry>,
|
||||
Option<SlotMetaWorkingSetEntry>,
|
||||
) {
|
||||
) -> &'a mut SlotMetaWorkingSetEntry {
|
||||
let meta_cf = db.column::<cf::SlotMeta>();
|
||||
|
||||
// Check if we've already inserted the slot metadata for this blob's slot
|
||||
slot_meta_working_set
|
||||
.get_mut(&slot)
|
||||
.map(|s| (Some(s), None))
|
||||
.unwrap_or_else(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if is_orphan(&meta) {
|
||||
meta.parent_slot = parent_slot;
|
||||
}
|
||||
|
||||
(None, Some((Rc::new(RefCell::new(meta)), backup)))
|
||||
} else {
|
||||
(
|
||||
None,
|
||||
Some((
|
||||
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
|
||||
None,
|
||||
)),
|
||||
)
|
||||
slot_meta_working_set.entry(slot).or_insert_with(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if is_orphan(&meta) {
|
||||
meta.parent_slot = parent_slot;
|
||||
}
|
||||
})
|
||||
|
||||
SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
|
||||
} else {
|
||||
SlotMetaWorkingSetEntry::new(
|
||||
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool {
|
||||
|
@ -1437,8 +1435,11 @@ fn commit_slot_meta_working_set(
|
|||
|
||||
// Check if any metadata was changed, if so, insert the new version of the
|
||||
// metadata into the write batch
|
||||
for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() {
|
||||
let meta: &SlotMeta = &RefCell::borrow(&*meta);
|
||||
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
|
||||
// Any slot that wasn't written to should have been filtered out by now.
|
||||
assert!(slot_meta_entry.did_insert_occur);
|
||||
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
|
||||
let meta_backup = &slot_meta_entry.old_slot_meta;
|
||||
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
|
||||
newly_completed_slots.push(*slot);
|
||||
}
|
||||
|
@ -1498,8 +1499,8 @@ fn find_slot_meta_in_cached_state<'a>(
|
|||
chained_slots: &'a HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot: u64,
|
||||
) -> Result<Option<Rc<RefCell<SlotMeta>>>> {
|
||||
if let Some((entry, _)) = working_set.get(&slot) {
|
||||
Ok(Some(entry.clone()))
|
||||
if let Some(entry) = working_set.get(&slot) {
|
||||
Ok(Some(entry.new_slot_meta.clone()))
|
||||
} else if let Some(entry) = chained_slots.get(&slot) {
|
||||
Ok(Some(entry.clone()))
|
||||
} else {
|
||||
|
@ -1511,12 +1512,14 @@ fn find_slot_meta_in_cached_state<'a>(
|
|||
fn handle_chaining(
|
||||
db: &Database,
|
||||
write_batch: &mut WriteBatch,
|
||||
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
) -> Result<()> {
|
||||
// Handle chaining for all the SlotMetas that were inserted into
|
||||
working_set.retain(|_, entry| entry.did_insert_occur);
|
||||
let mut new_chained_slots = HashMap::new();
|
||||
let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect();
|
||||
let working_set_slots: Vec<_> = working_set.keys().collect();
|
||||
for slot in working_set_slots {
|
||||
handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, slot)?;
|
||||
handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?;
|
||||
}
|
||||
|
||||
// Write all the newly changed slots in new_chained_slots to the write_batch
|
||||
|
@ -1534,10 +1537,13 @@ fn handle_chaining_for_slot(
|
|||
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
||||
slot: u64,
|
||||
) -> Result<()> {
|
||||
let (meta, meta_backup) = working_set
|
||||
let slot_meta_entry = working_set
|
||||
.get(&slot)
|
||||
.expect("Slot must exist in the working_set hashmap");
|
||||
|
||||
let meta = &slot_meta_entry.new_slot_meta;
|
||||
let meta_backup = &slot_meta_entry.old_slot_meta;
|
||||
|
||||
{
|
||||
let mut meta_mut = meta.borrow_mut();
|
||||
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
|
||||
|
@ -3772,5 +3778,37 @@ pub mod tests {
|
|||
.expect("Expected successful write of shreds");
|
||||
assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
|
||||
}
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_no_insert_but_modify_slot_meta() {
|
||||
// This tests correctness of the SlotMeta in various cases in which a shred
|
||||
// that gets filtered out by checks
|
||||
let (shreds0, _) = make_slot_entries(0, 0, 200);
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||
|
||||
// Insert the first 5 shreds, we don't have a "is_last" shred yet
|
||||
blocktree
|
||||
.insert_shreds(shreds0[0..5].to_vec(), None)
|
||||
.unwrap();
|
||||
|
||||
// Insert a repetitive shred for slot 's', should get ignored, but also
|
||||
// insert shreds that chains to 's', should see the update in the SlotMeta
|
||||
// for 's'.
|
||||
let (mut shreds2, _) = make_slot_entries(2, 0, 200);
|
||||
let (mut shreds3, _) = make_slot_entries(3, 0, 200);
|
||||
shreds2.push(shreds0[1].clone());
|
||||
shreds3.insert(0, shreds0[1].clone());
|
||||
blocktree.insert_shreds(shreds2, None).unwrap();
|
||||
let slot_meta = blocktree.meta(0).unwrap().unwrap();
|
||||
assert_eq!(slot_meta.next_slots, vec![2]);
|
||||
blocktree.insert_shreds(shreds3, None).unwrap();
|
||||
let slot_meta = blocktree.meta(0).unwrap().unwrap();
|
||||
assert_eq!(slot_meta.next_slots, vec![2, 3]);
|
||||
}
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue