blockstore: populate merkle root metas column (#34097)

This commit is contained in:
Ashwin Sekar 2023-11-29 11:14:24 -05:00 committed by GitHub
parent 1f00f8e29c
commit e1165aaf00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 430 additions and 9 deletions

View File

@ -467,6 +467,10 @@ impl Blockstore {
self.erasure_meta_cf.get(erasure_set.store_key())
}
fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result<Option<MerkleRootMeta>> {
self.merkle_root_meta_cf.get(erasure_set.key())
}
/// Check whether the specified slot is an orphan slot which does not
/// have a parent slot.
///
@ -801,6 +805,9 @@ impl Blockstore {
/// - [`cf::ErasureMeta`]: the associated ErasureMeta of the coding and data
/// shreds inside `shreds` will be updated and committed to
/// `cf::ErasureMeta`.
/// - [`cf::MerkleRootMeta`]: the associated MerkleRootMeta of the coding and data
/// shreds inside `shreds` will be updated and committed to
/// `cf::MerkleRootMeta`.
/// - [`cf::Index`]: stores (slot id, index to the index_working_set_entry)
/// pair to the `cf::Index` column family for each index_working_set_entry
/// which insert did occur in this function call.
@ -843,6 +850,7 @@ impl Blockstore {
let mut just_inserted_shreds = HashMap::with_capacity(shreds.len());
let mut erasure_metas = HashMap::new();
let mut merkle_root_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let mut duplicate_shreds = vec![];
@ -862,6 +870,7 @@ impl Blockstore {
match self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
@ -899,6 +908,7 @@ impl Blockstore {
self.check_insert_coding_shred(
shred,
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_inserted_shreds,
@ -945,6 +955,7 @@ impl Blockstore {
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
@ -1010,6 +1021,10 @@ impl Blockstore {
write_batch.put::<cf::ErasureMeta>(erasure_set.store_key(), &erasure_meta)?;
}
for (erasure_set, merkle_root_meta) in merkle_root_metas {
write_batch.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)?;
}
for (&slot, index_working_set_entry) in index_working_set.iter() {
if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
@ -1167,6 +1182,7 @@ impl Blockstore {
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
@ -1183,10 +1199,16 @@ impl Blockstore {
self.get_index_meta_entry(slot, index_working_set, index_meta_time_us);
let index_meta = &mut index_meta_working_set_entry.index;
let erasure_set = shred.erasure_set();
if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
}
}
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if !is_trusted {
if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1;
@ -1200,7 +1222,6 @@ impl Blockstore {
}
}
let erasure_set = shred.erasure_set();
let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
@ -1263,6 +1284,10 @@ impl Blockstore {
if result {
index_meta_working_set_entry.did_insert_occur = true;
metrics.num_inserted += 1;
merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
}
if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
@ -1311,8 +1336,8 @@ impl Blockstore {
///
/// The resulting `write_batch` may include updates to [`cf::DeadSlots`]
/// and [`cf::ShredData`]. Note that it will also update the in-memory copy
/// of `erasure_metas` and `index_working_set`, which will later be
/// used to update other column families such as [`cf::ErasureMeta`] and
/// of `erasure_metas`, `merkle_root_metas`, and `index_working_set`, which will
/// later be used to update other column families such as [`cf::ErasureMeta`] and
/// [`cf::Index`].
///
/// Arguments:
@ -1320,6 +1345,9 @@ impl Blockstore {
/// - `erasure_metas`: the in-memory hash-map that maintains the dirty
/// copy of the erasure meta. It will later be written to
/// `cf::ErasureMeta` in insert_shreds_handle_duplicate().
/// - `merkle_root_metas`: the in-memory hash-map that maintains the dirty
/// copy of the merkle root meta. It will later be written to
/// `cf::MerkleRootMeta` in `insert_shreds_handle_duplicate()`.
/// - `index_working_set`: the in-memory hash-map that maintains the
/// dirty copy of the index meta. It will later be written to
/// `cf::Index` in insert_shreds_handle_duplicate().
@ -1343,6 +1371,7 @@ impl Blockstore {
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
@ -1368,6 +1397,12 @@ impl Blockstore {
);
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
let erasure_set = shred.erasure_set();
if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
}
}
if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) {
@ -1402,7 +1437,6 @@ impl Blockstore {
}
}
let erasure_set = shred.erasure_set();
let newly_completed_data_sets = self.insert_data_shred(
slot_meta,
index_meta.data_mut(),
@ -1410,6 +1444,9 @@ impl Blockstore {
write_batch,
shred_source,
)?;
merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
@ -6735,6 +6772,374 @@ pub mod tests {
),);
}
#[test]
fn test_merkle_root_metas_coding() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let parent_slot = 0;
let slot = 1;
let index = 0;
let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10);
let coding_shred = coding_shreds[index as usize].clone();
let mut erasure_metas = HashMap::new();
let mut merkle_root_metas = HashMap::new();
let mut index_working_set = HashMap::new();
let mut just_received_shreds = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
let mut index_meta_time_us = 0;
assert!(blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
&mut vec![],
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
assert_eq!(merkle_root_metas.len(), 1);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.merkle_root(),
coding_shred.merkle_root().ok(),
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.first_received_shred_type(),
ShredType::Code,
);
for (erasure_set, merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
// Add a shred with different merkle root and index
let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10);
let new_coding_shred = coding_shreds[(index + 1) as usize].clone();
erasure_metas.clear();
index_working_set.clear();
just_received_shreds.clear();
let mut merkle_root_metas = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
let mut duplicates = vec![];
assert!(blockstore.check_insert_coding_shred(
new_coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
&mut duplicates,
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
// Verify that we still have the merkle root meta from the original shred
assert_eq!(merkle_root_metas.len(), 1);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
// Blockstore should also have the merkle root meta of the original shred
assert_eq!(
blockstore
.merkle_root_meta(coding_shred.erasure_set())
.unwrap()
.unwrap()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
blockstore
.merkle_root_meta(coding_shred.erasure_set())
.unwrap()
.unwrap()
.first_received_shred_index(),
index
);
// Add a shred from different fec set
let new_index = index + 31;
let (_, coding_shreds, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, new_index);
let new_coding_shred = coding_shreds[0].clone();
assert!(blockstore.check_insert_coding_shred(
new_coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
&mut vec![],
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
// Verify that we still have the merkle root meta for the original shred
// and the new shred
assert_eq!(merkle_root_metas.len(), 2);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.merkle_root(),
new_coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
new_index
);
}
#[test]
fn test_merkle_root_metas_data() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let parent_slot = 0;
let slot = 1;
let index = 11;
let fec_set_index = 11;
let (data_shreds, _, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let data_shred = data_shreds[0].clone();
let mut erasure_metas = HashMap::new();
let mut merkle_root_metas = HashMap::new();
let mut index_working_set = HashMap::new();
let mut just_received_shreds = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
let mut index_meta_time_us = 0;
blockstore
.check_insert_data_shred(
data_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
false,
&mut vec![],
None,
ShredSource::Turbine,
)
.unwrap();
assert_eq!(merkle_root_metas.len(), 1);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.first_received_shred_type(),
ShredType::Data,
);
for (erasure_set, merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
// Add a shred with different merkle root and index
let (data_shreds, _, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let new_data_shred = data_shreds[1].clone();
erasure_metas.clear();
index_working_set.clear();
just_received_shreds.clear();
let mut merkle_root_metas = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
let mut duplicates = vec![];
assert!(blockstore
.check_insert_data_shred(
new_data_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
false,
&mut duplicates,
None,
ShredSource::Turbine,
)
.is_ok());
// Verify that we still have the merkle root meta from the original shred
assert_eq!(merkle_root_metas.len(), 1);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
// Blockstore should also have the merkle root meta of the original shred
assert_eq!(
blockstore
.merkle_root_meta(data_shred.erasure_set())
.unwrap()
.unwrap()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
blockstore
.merkle_root_meta(data_shred.erasure_set())
.unwrap()
.unwrap()
.first_received_shred_index(),
index
);
// Add a shred from different fec set
let new_index = fec_set_index + 31;
let new_data_shred = Shred::new_from_data(
slot,
new_index,
1, // parent_offset
&[3, 3, 3], // data
ShredFlags::empty(),
0, // reference_tick,
0, // version
fec_set_index + 30,
);
blockstore
.check_insert_data_shred(
new_data_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
false,
&mut vec![],
None,
ShredSource::Turbine,
)
.unwrap();
// Verify that we still have the merkle root meta for the original shred
// and the new shred
assert_eq!(merkle_root_metas.len(), 2);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.merkle_root(),
new_data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.first_received_shred_index(),
new_index
);
}
#[test]
fn test_check_insert_coding_shred() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -6753,6 +7158,7 @@ pub mod tests {
);
let mut erasure_metas = HashMap::new();
let mut merkle_root_metas = HashMap::new();
let mut index_working_set = HashMap::new();
let mut just_received_shreds = HashMap::new();
let mut write_batch = blockstore.db.batch().unwrap();
@ -6760,6 +7166,7 @@ pub mod tests {
assert!(blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
@ -6775,6 +7182,7 @@ pub mod tests {
assert!(!blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
@ -9262,6 +9670,15 @@ pub mod tests {
slot: u64,
parent_slot: u64,
num_entries: u64,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
setup_erasure_shreds_with_index(slot, parent_slot, num_entries, 0)
}
fn setup_erasure_shreds_with_index(
slot: u64,
parent_slot: u64,
num_entries: u64,
fec_set_index: u32,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
@ -9269,10 +9686,10 @@ pub mod tests {
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&leader_keypair,
&entries,
true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
true, // is_last_in_slot
fec_set_index, // next_shred_index
fec_set_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);

View File

@ -287,6 +287,10 @@ impl ErasureSetId {
pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) {
(self.0, u64::from(self.1))
}
pub(crate) fn key(&self) -> (Slot, /*fec_set_index:*/ u32) {
(self.0, self.1)
}
}
macro_rules! dispatch {