add merkle root meta column to blockstore (#33979)

* add merkle root meta column to blockstore

* pr feedback: remove write/reads to column

* pr feedback: u64 -> u32 + revert

* pr feedback: fec_set_index u32, use Self::Index

* pr feedback: key size 16 -> 12
This commit is contained in:
Ashwin Sekar 2023-11-11 21:14:18 -05:00 committed by GitHub
parent 04e4efd8ae
commit e457c02879
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 0 deletions

View File

@ -215,6 +215,7 @@ pub struct Blockstore {
bank_hash_cf: LedgerColumn<cf::BankHash>,
optimistic_slots_cf: LedgerColumn<cf::OptimisticSlots>,
max_root: AtomicU64,
merkle_root_meta_cf: LedgerColumn<cf::MerkleRootMeta>,
insert_shreds_lock: Mutex<()>,
new_shreds_signals: Mutex<Vec<Sender<bool>>>,
completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
@ -315,6 +316,7 @@ impl Blockstore {
let program_costs_cf = db.column();
let bank_hash_cf = db.column();
let optimistic_slots_cf = db.column();
let merkle_root_meta_cf = db.column();
let db = Arc::new(db);
@ -352,6 +354,7 @@ impl Blockstore {
program_costs_cf,
bank_hash_cf,
optimistic_slots_cf,
merkle_root_meta_cf,
new_shreds_signals: Mutex::default(),
completed_slots_senders: Mutex::default(),
shred_timing_point_sender: None,
@ -711,6 +714,7 @@ impl Blockstore {
self.program_costs_cf.submit_rocksdb_cf_metrics();
self.bank_hash_cf.submit_rocksdb_cf_metrics();
self.optimistic_slots_cf.submit_rocksdb_cf_metrics();
self.merkle_root_meta_cf.submit_rocksdb_cf_metrics();
}
fn try_shred_recovery(

View File

@ -220,6 +220,10 @@ impl Blockstore {
& self
.db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
.is_ok();
match purge_type {
PurgeType::Exact => {
@ -329,6 +333,10 @@ impl Blockstore {
.db
.delete_file_in_range_cf::<cf::OptimisticSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::MerkleRootMeta>(from_slot, to_slot)
.is_ok()
}
/// Returns true if the special columns, TransactionStatus and

View File

@ -2,6 +2,7 @@ pub use rocksdb::Direction as IteratorDirection;
use {
crate::{
blockstore_meta,
blockstore_meta::MerkleRootMeta,
blockstore_metrics::{
maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf,
BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET,
@ -103,6 +104,8 @@ const BLOCK_HEIGHT_CF: &str = "block_height";
const PROGRAM_COSTS_CF: &str = "program_costs";
/// Column family for optimistic slots
const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots";
/// Column family for merkle roots
const MERKLE_ROOT_META_CF: &str = "merkle_root_meta";
#[derive(Error, Debug)]
pub enum BlockstoreError {
@ -339,6 +342,19 @@ pub mod columns {
/// * value type: [`blockstore_meta::OptimisticSlotMetaVersioned`]
pub struct OptimisticSlots;
#[derive(Debug)]
/// The merkle root meta column
///
/// Each merkle shred is part of a merkle tree for
/// its FEC set. This column stores that merkle root and associated
/// meta information about the first shred received.
///
/// Its index type is (Slot, fec_set_index).
///
/// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u32)`
/// * value type: [`blockstore_meta::MerkleRootMeta`]`
pub struct MerkleRootMeta;
// When adding a new column ...
// - Add struct below and implement `Column` and `ColumnName` traits
// - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns()
@ -474,6 +490,7 @@ impl Rocks {
new_cf_descriptor::<BlockHeight>(options, oldest_slot),
new_cf_descriptor::<ProgramCosts>(options, oldest_slot),
new_cf_descriptor::<OptimisticSlots>(options, oldest_slot),
new_cf_descriptor::<MerkleRootMeta>(options, oldest_slot),
]
}
@ -501,6 +518,7 @@ impl Rocks {
BlockHeight::NAME,
ProgramCosts::NAME,
OptimisticSlots::NAME,
MerkleRootMeta::NAME,
]
}
@ -1227,6 +1245,39 @@ impl TypedColumn for columns::OptimisticSlots {
type Type = blockstore_meta::OptimisticSlotMetaVersioned;
}
impl Column for columns::MerkleRootMeta {
type Index = (Slot, /*fec_set_index:*/ u32);
fn index(key: &[u8]) -> Self::Index {
let slot = BigEndian::read_u64(&key[..8]);
let fec_set_index = BigEndian::read_u32(&key[8..]);
(slot, fec_set_index)
}
fn key((slot, fec_set_index): Self::Index) -> Vec<u8> {
let mut key = vec![0; 12];
BigEndian::write_u64(&mut key[..8], slot);
BigEndian::write_u32(&mut key[8..], fec_set_index);
key
}
fn slot((slot, _fec_set_index): Self::Index) -> Slot {
slot
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl ColumnName for columns::MerkleRootMeta {
const NAME: &'static str = MERKLE_ROOT_META_CF;
}
impl TypedColumn for columns::MerkleRootMeta {
type Type = MerkleRootMeta;
}
#[derive(Debug)]
pub struct Database {
backend: Arc<Rocks>,

View File

@ -138,6 +138,16 @@ pub(crate) struct ErasureConfig {
num_coding: usize,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MerkleRootMeta {
/// The merkle root
merkle_root: Hash,
/// The first received shred index
first_received_shred_index: u32,
/// The shred type of the first received shred
first_received_shred_type: ShredType,
}
#[derive(Deserialize, Serialize)]
pub struct DuplicateSlotProof {
#[serde(with = "serde_bytes")]