From 4b5838c5008e75988102e5e8f8f1fddc5db9eba0 Mon Sep 17 00:00:00 2001
From: teor
Date: Tue, 19 Dec 2023 03:33:49 +1100
Subject: [PATCH] Make sure scanner database is accessed using the correct
 types (#8112)

* impl TryFrom<BlockHeight> for Height
* Add type-safe read and write database methods
* Only allow typed access to the scanner DB
* Update docs
* Implement a common method as a trait
* Fix imports
* Tidy state imports
* Activate tracing logging macros in the whole scanner crate
* Fix dead code warnings
---
 zebra-chain/src/block/height.rs              |   9 +
 zebra-scan/src/lib.rs                        |   3 +
 zebra-scan/src/scan.rs                       |   1 -
 zebra-scan/src/storage.rs                    |  49 +--
 zebra-scan/src/storage/db.rs                 |  29 +-
 zebra-scan/src/storage/db/sapling.rs         | 159 ++++++----
 zebra-state/src/lib.rs                       |  22 +-
 zebra-state/src/service/finalized_state.rs   |   4 +
 .../service/finalized_state/column_family.rs | 293 ++++++++++++++++++
 .../src/service/finalized_state/disk_db.rs   |  85 ++++-
 10 files changed, 500 insertions(+), 154 deletions(-)
 create mode 100644 zebra-state/src/service/finalized_state/column_family.rs

diff --git a/zebra-chain/src/block/height.rs b/zebra-chain/src/block/height.rs
index e13f03f08..71f664c75 100644
--- a/zebra-chain/src/block/height.rs
+++ b/zebra-chain/src/block/height.rs
@@ -112,6 +112,15 @@ impl From<Height> for BlockHeight {
     }
 }
 
+impl TryFrom<BlockHeight> for Height {
+    type Error = &'static str;
+
+    /// Checks that the `height` is within the valid [`Height`] range.
+    fn try_from(height: BlockHeight) -> Result<Self, Self::Error> {
+        Self::try_from(u32::from(height))
+    }
+}
+
 /// A difference between two [`Height`]s, possibly negative.
 ///
 /// This can represent the difference between any height values,
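A minimal sketch of a call site for the new conversion, assuming `BlockHeight` is the `zcash_primitives::consensus::BlockHeight` used by the `From<Height> for BlockHeight` impl above; the function name and error handling are illustrative, not part of this patch:

    use zcash_primitives::consensus::BlockHeight;
    use zebra_chain::block::Height;

    /// Illustrative only: converts a scanned `BlockHeight` into a Zebra
    /// `Height`, rejecting out-of-range values instead of silently wrapping.
    fn to_zebra_height(height: BlockHeight) -> Result<Height, &'static str> {
        Height::try_from(height)
    }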
diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs
index eee89247c..1426aa2d5 100644
--- a/zebra-scan/src/lib.rs
+++ b/zebra-scan/src/lib.rs
@@ -4,6 +4,9 @@
 #![doc(html_logo_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-icon.png")]
 #![doc(html_root_url = "https://docs.rs/zebra_scan")]
 
+#[macro_use]
+extern crate tracing;
+
 pub mod config;
 pub mod init;
 pub mod scan;

diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs
index 7c4f96b73..158fadadd 100644
--- a/zebra-scan/src/scan.rs
+++ b/zebra-scan/src/scan.rs
@@ -9,7 +9,6 @@ use std::{
 use color_eyre::{eyre::eyre, Report};
 use itertools::Itertools;
 use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
-use tracing::info;
 
 use zcash_client_backend::{
     data_api::ScannedBlock,

diff --git a/zebra-scan/src/storage.rs b/zebra-scan/src/storage.rs
index 745d09830..61a51a29d 100644
--- a/zebra-scan/src/storage.rs
+++ b/zebra-scan/src/storage.rs
@@ -6,9 +6,7 @@ use zebra_chain::{
     block::Height,
     parameters::{Network, NetworkUpgrade},
 };
-use zebra_state::{
-    SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, TransactionIndex, TransactionLocation,
-};
+use zebra_state::TransactionIndex;
 
 use crate::config::Config;
 
@@ -17,11 +15,9 @@ pub mod db;
 // Public types and APIs
 pub use db::{SaplingScannedResult, SaplingScanningKey};
 
-use self::db::ScannerWriteBatch;
-
 /// We insert an empty results entry to the database every this interval for each stored key,
 /// so we can track progress.
-const INSERT_CONTROL_INTERVAL: u32 = 1_000;
+pub const INSERT_CONTROL_INTERVAL: u32 = 1_000;
 
 /// Store key info and results of the scan.
 ///
@@ -84,11 +80,7 @@ impl Storage {
         // It's ok to write some keys and not others during shutdown, so each key can get its own
         // batch. (They will be re-written on startup anyway.)
-        let mut batch = ScannerWriteBatch::default();
-
-        batch.insert_sapling_key(self, sapling_key, birthday);
-
-        self.write_batch(batch);
+        self.insert_sapling_key(sapling_key, birthday);
     }
 
     /// Returns all the keys and their last scanned heights.
@@ -104,6 +96,9 @@ impl Storage {
     /// Add the sapling results for `height` to the storage. The results can be any map of
     /// [`TransactionIndex`] to [`SaplingScannedResult`].
     ///
+    /// All the results for the same height must be written at the same time, to avoid partial
+    /// writes during shutdown.
+    ///
     /// Also adds empty progress tracking entries every `INSERT_CONTROL_INTERVAL` blocks if needed.
     ///
     /// # Performance / Hangs
@@ -116,37 +111,7 @@ impl Storage {
         height: Height,
         sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
     ) {
-        // We skip heights that have one or more results, so the results for each height must be
-        // in a single batch.
-        let mut batch = ScannerWriteBatch::default();
-
-        // Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
-        // so we can track progress made in the last interval even if no transaction was yet found.
-        let needs_control_entry =
-            height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();
-
-        // Add scanner progress tracking entry for key.
-        // Defensive programming: add the tracking entry first, so that we don't accidentally
-        // overwrite real results with it. (This is currently prevented by the empty check.)
-        if needs_control_entry {
-            batch.insert_sapling_height(self, sapling_key, height);
-        }
-
-        for (index, sapling_result) in sapling_results {
-            let index = SaplingScannedDatabaseIndex {
-                sapling_key: sapling_key.clone(),
-                tx_loc: TransactionLocation::from_parts(height, index),
-            };
-
-            let entry = SaplingScannedDatabaseEntry {
-                index,
-                value: Some(sapling_result),
-            };
-
-            batch.insert_sapling_result(self, entry);
-        }
-
-        self.write_batch(batch);
+        self.insert_sapling_results(sapling_key, height, sapling_results)
     }
 
     /// Returns all the results for a sapling key, for every scanned block height.
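A hedged sketch of how a caller is expected to use `add_sapling_results` under the amended docs: all of a height's results go in one call, so they land in a single database batch. The `store_block_results` name is illustrative, and the `zebra_scan::storage` paths assume the crate's public module layout:

    use std::collections::BTreeMap;

    use zebra_chain::block::Height;
    use zebra_scan::storage::{SaplingScannedResult, SaplingScanningKey, Storage};
    use zebra_state::TransactionIndex;

    /// Illustrative only: stores one block's scan results in one call, so a
    /// shutdown can't leave a partially written height behind.
    fn store_block_results(
        storage: &mut Storage,
        key: &SaplingScanningKey,
        height: Height,
        results: BTreeMap<TransactionIndex, SaplingScannedResult>,
    ) {
        storage.add_sapling_results(key, height, results);
    }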
diff --git a/zebra-scan/src/storage/db.rs b/zebra-scan/src/storage/db.rs
index a99278bcd..784ffb3d7 100644
--- a/zebra-scan/src/storage/db.rs
+++ b/zebra-scan/src/storage/db.rs
@@ -5,7 +5,6 @@ use std::path::Path;
 use semver::Version;
 
 use zebra_chain::parameters::Network;
-use zebra_state::{DiskWriteBatch, ReadDisk};
 
 use crate::Config;
 
@@ -86,7 +85,7 @@ impl Storage {
         // Report where we are for each key in the database.
         let keys = new_storage.sapling_keys_last_heights();
         for (key_num, (_key, height)) in keys.iter().enumerate() {
-            tracing::info!(
+            info!(
                 "Last scanned height for key number {} is {}, resuming at {}",
                 key_num,
                 height.as_usize(),
@@ -94,7 +93,7 @@ impl Storage {
             );
         }
 
-        tracing::info!("loaded Zebra scanner cache");
+        info!("loaded Zebra scanner cache");
 
         new_storage
     }
@@ -134,28 +133,6 @@ impl Storage {
     /// Returns true if the database is empty.
     pub fn is_empty(&self) -> bool {
         // Any column family that is populated at (or near) startup can be used here.
-        self.db.zs_is_empty(&self.sapling_tx_ids_cf())
-    }
-}
-
-// General writing
-
-/// Wrapper type for scanner database writes.
-#[must_use = "batches must be written to the database"]
-#[derive(Default)]
-pub struct ScannerWriteBatch(pub DiskWriteBatch);
-
-// Redirect method calls to DiskWriteBatch for convenience.
-impl std::ops::Deref for ScannerWriteBatch {
-    type Target = DiskWriteBatch;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl std::ops::DerefMut for ScannerWriteBatch {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
+        self.sapling_tx_ids_cf().zs_is_empty()
    }
 }
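A short sketch of reading the same progress information through the public API, assuming an already-opened `Storage`; note that outside `zebra-scan` the `tracing::info!` macro has to be imported or fully qualified, since the `#[macro_use]` above only covers this crate:

    /// Illustrative only: reports each key's last scanned height, like the
    /// startup logging above, but from outside the crate.
    fn report_scan_progress(storage: &zebra_scan::storage::Storage) {
        for (key_num, (_key, height)) in storage.sapling_keys_last_heights().iter().enumerate() {
            tracing::info!("key {key_num} last scanned height {}", height.as_usize());
        }
    }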
diff --git a/zebra-scan/src/storage/db/sapling.rs b/zebra-scan/src/storage/db/sapling.rs
index eb561a11c..f49550dc5 100644
--- a/zebra-scan/src/storage/db/sapling.rs
+++ b/zebra-scan/src/storage/db/sapling.rs
@@ -2,9 +2,9 @@
 //!
 //! The sapling scanner database has the following format:
 //!
-//! | name             | key                           | value                          |
-//! |------------------|-------------------------------|--------------------------------|
-//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Option<SaplingScannedResult>` |
+//! | name               | Reading & Writing Key/Values                    |
+//! |--------------------|-------------------------------------------------|
+//! | [`SAPLING_TX_IDS`] | [`SaplingTxIdsCf`] & [`WriteSaplingTxIdsBatch`] |
 //!
 //! And types:
 //! `SaplingScannedResult`: same as `transaction::Hash`, but with bytes in display order.
@@ -30,32 +30,42 @@ use itertools::Itertools;
 
 use zebra_chain::block::Height;
 use zebra_state::{
-    AsColumnFamilyRef, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
-    SaplingScannedResult, SaplingScanningKey, TransactionIndex, WriteDisk,
+    SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
+    SaplingScanningKey, TransactionIndex, TransactionLocation, TypedColumnFamily, WriteTypedBatch,
 };
 
-use crate::storage::Storage;
-
-use super::ScannerWriteBatch;
+use crate::storage::{Storage, INSERT_CONTROL_INTERVAL};
 
 /// The name of the sapling transaction IDs result column family.
 ///
 /// This constant should be used so the compiler can detect typos.
 pub const SAPLING_TX_IDS: &str = "sapling_tx_ids";
 
+/// The type for reading sapling transaction IDs results from the database.
+///
+/// This type should be used so the compiler can detect incorrectly typed accesses to the
+/// column family.
+pub type SaplingTxIdsCf<'cf> =
+    TypedColumnFamily<'cf, SaplingScannedDatabaseIndex, Option<SaplingScannedResult>>;
+
+/// The type for writing sapling transaction IDs results to the database.
+///
+/// This type should be used so the compiler can detect incorrectly typed accesses to the
+/// column family.
+pub type WriteSaplingTxIdsBatch<'cf> =
+    WriteTypedBatch<'cf, SaplingScannedDatabaseIndex, Option<SaplingScannedResult>>;
+
 impl Storage {
     // Reading Sapling database entries
 
     /// Returns the result for a specific database index (key, block height, transaction index).
     /// Returns `None` if the result is missing or an empty marker for a birthday or progress
     /// height.
-    //
-    // TODO: add tests for this method
     pub fn sapling_result_for_index(
         &self,
         index: &SaplingScannedDatabaseIndex,
     ) -> Option<SaplingScannedResult> {
-        self.db.zs_get(&self.sapling_tx_ids_cf(), &index).flatten()
+        self.sapling_tx_ids_cf().zs_get(index).flatten()
     }
 
     /// Returns the results for a specific key and block height.
@@ -102,10 +112,7 @@ impl Storage {
         let sapling_tx_ids = self.sapling_tx_ids_cf();
         let mut keys = HashMap::new();
 
-        let mut last_stored_record: Option<(
-            SaplingScannedDatabaseIndex,
-            Option<SaplingScannedResult>,
-        )> = self.db.zs_last_key_value(&sapling_tx_ids);
+        let mut last_stored_record = sapling_tx_ids.zs_last_key_value();
 
         while let Some((last_stored_record_index, _result)) = last_stored_record {
             let sapling_key = last_stored_record_index.sapling_key.clone();
@@ -119,8 +126,7 @@ impl Storage {
             );
 
             // Skip all the results until the next key.
-            last_stored_record = self.db.zs_prev_key_value_strictly_before(
-                &sapling_tx_ids,
+            last_stored_record = sapling_tx_ids.zs_prev_key_value_strictly_before(
                 &SaplingScannedDatabaseIndex::min_for_key(&sapling_key),
             );
         }
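The loop above visits one entry per key rather than every entry in the column family. A hedged restatement of the seek pattern in isolation; the function and variable names are illustrative, and the `tx_loc.height` field access assumes `TransactionLocation`'s public layout used elsewhere in Zebra:

    /// Illustrative only: collects each key's highest scanned height in
    /// O(number of keys) seeks, not O(number of entries).
    fn last_heights(cf: &SaplingTxIdsCf<'_>) -> Vec<(SaplingScanningKey, Height)> {
        let mut heights = Vec::new();
        let mut record = cf.zs_last_key_value();

        while let Some((index, _result)) = record {
            // `index` is the highest (height, tx index) entry for this key.
            heights.push((index.sapling_key.clone(), index.tx_loc.height));

            // Seek strictly before the key's smallest possible index: that is
            // the previous key's highest entry, or None when keys run out.
            record = cf.zs_prev_key_value_strictly_before(
                &SaplingScannedDatabaseIndex::min_for_key(&index.sapling_key),
            );
        }

        heights
    }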
@@ -135,43 +141,63 @@ impl Storage {
         &self,
         range: impl RangeBounds<SaplingScannedDatabaseIndex>,
     ) -> BTreeMap<SaplingScannedDatabaseIndex, Option<SaplingScannedResult>> {
-        self.db
-            .zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), range)
+        self.sapling_tx_ids_cf().zs_items_in_range_ordered(range)
     }
 
     // Column family convenience methods
 
-    /// Returns a handle to the `sapling_tx_ids` column family.
-    pub(crate) fn sapling_tx_ids_cf(&self) -> impl AsColumnFamilyRef + '_ {
-        self.db
-            .cf_handle(SAPLING_TX_IDS)
+    /// Returns a typed handle to the `sapling_tx_ids` column family.
+    pub(crate) fn sapling_tx_ids_cf(&self) -> SaplingTxIdsCf {
+        SaplingTxIdsCf::new(&self.db, SAPLING_TX_IDS)
             .expect("column family was created when database was created")
     }
 
-    // Writing batches
+    // Writing database entries
+    //
+    // To avoid exposing internal types, and accidentally forgetting to write a batch,
+    // each pub(crate) write method should write an entire batch.
 
-    /// Write `batch` to the database for this storage.
-    pub(crate) fn write_batch(&self, batch: ScannerWriteBatch) {
-        // Just panic on errors for now.
-        self.db
-            .write_batch(batch.0)
-            .expect("unexpected database error")
-    }
-}
-
-// Writing database entries
-//
-// TODO: split the write type into state and scanner, so we can't call state write methods on
-// scanner databases
-impl ScannerWriteBatch {
-    /// Inserts a scanned sapling result for a key and height.
-    /// If a result already exists for that key and height, it is replaced.
-    pub(crate) fn insert_sapling_result(
+    /// Inserts a batch of scanned sapling results for a key and height.
+    /// If a result already exists for that key, height, and index, it is replaced.
+    pub(crate) fn insert_sapling_results(
         &mut self,
-        storage: &Storage,
-        entry: SaplingScannedDatabaseEntry,
+        sapling_key: &SaplingScanningKey,
+        height: Height,
+        sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
     ) {
-        self.zs_insert(&storage.sapling_tx_ids_cf(), entry.index, entry.value);
+        // We skip key heights that have one or more results, so the results for each key height
+        // must be in a single batch.
+        let mut batch = self.sapling_tx_ids_cf().for_writing();
+
+        // Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
+        // so we can track progress made in the last interval even if no transaction was yet found.
+        let needs_control_entry =
+            height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();
+
+        // Add scanner progress tracking entry for key.
+        // Defensive programming: add the tracking entry first, so that we don't accidentally
+        // overwrite real results with it. (This is currently prevented by the empty check.)
+        if needs_control_entry {
+            batch = batch.insert_sapling_height(sapling_key, height);
+        }
+
+        for (index, sapling_result) in sapling_results {
+            let index = SaplingScannedDatabaseIndex {
+                sapling_key: sapling_key.clone(),
+                tx_loc: TransactionLocation::from_parts(height, index),
+            };
+
+            let entry = SaplingScannedDatabaseEntry {
+                index,
+                value: Some(sapling_result),
+            };
+
+            batch = batch.zs_insert(entry.index, entry.value);
+        }
+
+        batch
+            .write_batch()
+            .expect("unexpected database write failure");
     }
 
     /// Insert a sapling scanning `key`, and mark all heights before `birthday_height` so they
@@ -184,11 +210,10 @@ impl ScannerWriteBatch {
     /// TODO: ignore incorrect changes to birthday heights
     pub(crate) fn insert_sapling_key(
         &mut self,
-        storage: &Storage,
         sapling_key: &SaplingScanningKey,
         birthday_height: Option<Height>,
     ) {
-        let min_birthday_height = storage.min_sapling_birthday_height();
+        let min_birthday_height = self.min_sapling_birthday_height();
 
         // The birthday height must be at least the minimum height for that pool.
         let birthday_height = birthday_height
@@ -197,19 +222,33 @@ impl ScannerWriteBatch {
         // And we want to skip up to the height before it.
         let skip_up_to_height = birthday_height.previous().unwrap_or(Height::MIN);
 
-        let index =
-            SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
-        self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
-    }
-
-    /// Insert sapling height with no results
-    pub(crate) fn insert_sapling_height(
-        &mut self,
-        storage: &Storage,
-        sapling_key: &SaplingScanningKey,
-        height: Height,
-    ) {
-        let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
-        self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
+        // It's ok to write some keys and not others during shutdown, so each key can get its own
+        // batch. (They will be re-written on startup anyway.)
+        //
+        // TODO: ignore incorrect changes to birthday heights,
+        //       and redundant birthday heights
+        self.sapling_tx_ids_cf()
+            .for_writing()
+            .insert_sapling_height(sapling_key, skip_up_to_height)
+            .write_batch()
+            .expect("unexpected database write failure");
+    }
+}
+
+/// Utility trait for inserting sapling heights into a WriteSaplingTxIdsBatch.
+trait InsertSaplingHeight {
+    fn insert_sapling_height(self, sapling_key: &SaplingScanningKey, height: Height) -> Self;
+}
+
+impl<'cf> InsertSaplingHeight for WriteSaplingTxIdsBatch<'cf> {
+    /// Insert sapling height with no results.
+    ///
+    /// If a result already exists for the coinbase transaction at that height,
+    /// it is replaced with an empty result. This should never happen.
+    fn insert_sapling_height(self, sapling_key: &SaplingScanningKey, height: Height) -> Self {
+        let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
+
+        // TODO: assert that we don't overwrite any entries here.
+        self.zs_insert(index, None)
+    }
+}
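For clarity, what the progress markers written above look like as typed entries: a `None` value at the minimum transaction index for the height, distinguishing "scanned, nothing found" from a real result. A hedged sketch, written as if it were another pub(crate) method in this module; the name and values are illustrative:

    /// Illustrative only: writes a single progress marker for `sapling_key` at
    /// `height`, mirroring what insert_sapling_height() does above.
    fn write_progress_marker(storage: &mut Storage, sapling_key: &SaplingScanningKey, height: Height) {
        let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);

        // `None` marks "scanned, nothing found", as opposed to a real result.
        storage
            .sapling_tx_ids_cf()
            .for_writing()
            .zs_insert(index, None)
            .write_batch()
            .expect("unexpected database write failure");
    }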
diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs
index 6dbffd9ac..686aeb8c3 100644
--- a/zebra-state/src/lib.rs
+++ b/zebra-state/src/lib.rs
@@ -59,16 +59,20 @@ pub use service::{
     OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
 };
 
-#[cfg(feature = "shielded-scan")]
-pub use rocksdb::AsColumnFamilyRef;
 #[cfg(feature = "shielded-scan")]
 pub use service::finalized_state::{
-    FromDisk, IntoDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
-    SaplingScannedResult, SaplingScanningKey, ZebraDb,
+    SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
+    SaplingScanningKey,
 };
 
 #[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))]
-pub use service::finalized_state::{DiskWriteBatch, ReadDisk, WriteDisk};
+pub use service::{
+    finalized_state::{
+        DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk,
+        WriteTypedBatch, ZebraDb,
+    },
+    ReadStateService,
+};
 
 #[cfg(feature = "getblocktemplate-rpcs")]
 pub use response::GetBlockTemplateChainInfo;
@@ -78,9 +82,12 @@ pub use service::{
     arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
     chain_tip::{ChainTipBlock, ChainTipSender},
     finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT},
-    init_test, init_test_services, ReadStateService,
+    init_test, init_test_services,
 };
 
+#[cfg(any(test, feature = "proptest-impl"))]
+pub use constants::latest_version_for_adding_subtrees;
+
 #[cfg(not(any(test, feature = "proptest-impl")))]
 #[allow(unused_imports)]
 pub(crate) use config::hidden::{
@@ -92,7 +99,4 @@ pub use config::hidden::{
     write_database_format_version_to_disk, write_state_database_format_version_to_disk,
 };
 
-#[cfg(any(test, feature = "proptest-impl"))]
-pub use constants::latest_version_for_adding_subtrees;
-
 pub(crate) use request::ContextuallyVerifiedBlock;

diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs
index 895a043be..0fc76b8a3 100644
--- a/zebra-state/src/service/finalized_state.rs
+++ b/zebra-state/src/service/finalized_state.rs
@@ -28,6 +28,8 @@ use crate::{
     BoxError, CheckpointVerifiedBlock, CloneError, Config,
 };
 
+pub mod column_family;
+
 mod disk_db;
 mod disk_format;
 mod zebra_db;
@@ -38,6 +40,8 @@ mod arbitrary;
 #[cfg(test)]
 mod tests;
 
+#[allow(unused_imports)]
+pub use column_family::{TypedColumnFamily, WriteTypedBatch};
 #[allow(unused_imports)]
 pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
 #[allow(unused_imports)]
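The exports above let other database code adopt the same pattern as the scanner. A hedged sketch of wrapping a hypothetical column family; the `hash_by_height` name and key/value pairing are illustrative, not part of this patch, and assume those types implement `IntoDisk`/`FromDisk` as they do in zebra-state:

    use zebra_chain::block;
    use zebra_state::TypedColumnFamily;

    /// Illustrative only: the column family name, defined once to catch typos.
    pub const HASH_BY_HEIGHT: &str = "hash_by_height";

    /// Illustrative only: the key and value types, defined once so every read
    /// and write is checked against them.
    pub type HashByHeightCf<'cf> = TypedColumnFamily<'cf, block::Height, block::Hash>;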
diff --git a/zebra-state/src/service/finalized_state/column_family.rs b/zebra-state/src/service/finalized_state/column_family.rs
new file mode 100644
index 000000000..bf7eea031
--- /dev/null
+++ b/zebra-state/src/service/finalized_state/column_family.rs
@@ -0,0 +1,293 @@
+//! Type-safe column family access.
+
+// When these types aren't exported, they become dead code.
+#![cfg_attr(
+    not(any(test, feature = "proptest-impl", feature = "shielded-scan")),
+    allow(dead_code)
+)]
+
+use std::{
+    any::type_name,
+    collections::{BTreeMap, HashMap},
+    fmt::Debug,
+    hash::Hash,
+    marker::PhantomData,
+    ops::RangeBounds,
+};
+
+use crate::service::finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, WriteDisk};
+
+use super::DiskDb;
+
+/// A type-safe read-only column family reference.
+///
+/// Use this struct instead of raw [`ReadDisk`] access, because it is type-safe.
+/// So you only have to define the types once, and you can't accidentally use different types for
+/// reading and writing. (Which is a source of subtle database bugs.)
+#[derive(Clone)]
+pub struct TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    /// The database.
+    db: DiskDb,
+
+    /// The column family reference in the database.
+    cf: rocksdb::ColumnFamilyRef<'cf>,
+
+    /// The column family name, only used for debugging and equality checking.
+    _cf_name: String,
+
+    /// A marker type used to bind the key and value types to the struct.
+    _marker: PhantomData<(Key, Value)>,
+}
+
+/// A type-safe and drop-safe batch write to a column family.
+///
+/// Use this struct instead of raw [`WriteDisk`] access, because it is type-safe.
+/// So you only have to define the types once, and you can't accidentally use different types for
+/// reading and writing. (Which is a source of subtle database bugs.)
+///
+/// This type is also drop-safe: unwritten batches have to be specifically ignored.
+#[must_use = "batches must be written to the database"]
+#[derive(Debug, Eq, PartialEq)]
+pub struct WriteTypedBatch<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    inner: TypedColumnFamily<'cf, Key, Value>,
+
+    batch: DiskWriteBatch,
+}
+
+impl<'cf, Key, Value> Debug for TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct(&format!(
+            "TypedColumnFamily<{}, {}>",
+            type_name::<Key>(),
+            type_name::<Value>()
+        ))
+        .field("db", &self.db)
+        .field("cf", &self._cf_name)
+        .finish()
+    }
+}
+
+impl<'cf, Key, Value> PartialEq for TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.db == other.db && self._cf_name == other._cf_name
+    }
+}
+
+impl<'cf, Key, Value> Eq for TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+}
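A sketch of why the `PhantomData` field matters: the key and value types are pinned once, at the single place where the handle is created, so every later read or write is checked against them. `open_hash_by_height` and the alias are the illustrative names from the sketch above, not part of this patch:

    /// Illustrative only: the types are fixed here, where the handle is made.
    fn open_hash_by_height(db: &DiskDb) -> HashByHeightCf<'_> {
        TypedColumnFamily::new(db, HASH_BY_HEIGHT)
            .expect("column family was created when database was created")
    }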
+
+impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    // Creation
+
+    /// Returns a new typed column family, if it exists in the database.
+    pub fn new(db: &'cf DiskDb, cf_name: &str) -> Option<Self> {
+        let cf = db.cf_handle(cf_name)?;
+
+        Some(Self {
+            db: db.clone(),
+            cf,
+            _cf_name: cf_name.to_string(),
+            _marker: PhantomData,
+        })
+    }
+
+    /// Returns a new writeable typed column family for this column family.
+    ///
+    /// This is the only way to get a writeable column family, which ensures
+    /// that the read and write types are consistent.
+    pub fn for_writing(self) -> WriteTypedBatch<'cf, Key, Value> {
+        WriteTypedBatch {
+            inner: self,
+            batch: DiskWriteBatch::new(),
+        }
+    }
+
+    // Reading
+
+    /// Returns true if this rocksdb column family does not contain any entries.
+    pub fn zs_is_empty(&self) -> bool {
+        self.db.zs_is_empty(&self.cf)
+    }
+
+    /// Returns the value for `key` in this rocksdb column family, if present.
+    pub fn zs_get(&self, key: &Key) -> Option<Value> {
+        self.db.zs_get(&self.cf, key)
+    }
+
+    /// Check if this rocksdb column family contains the serialized form of `key`.
+    pub fn zs_contains(&self, key: &Key) -> bool {
+        self.db.zs_contains(&self.cf, key)
+    }
+
+    /// Returns the lowest key in this column family, and the corresponding value.
+    ///
+    /// Returns `None` if this column family is empty.
+    pub fn zs_first_key_value(&self) -> Option<(Key, Value)> {
+        self.db.zs_first_key_value(&self.cf)
+    }
+
+    /// Returns the highest key in this column family, and the corresponding value.
+    ///
+    /// Returns `None` if this column family is empty.
+    pub fn zs_last_key_value(&self) -> Option<(Key, Value)> {
+        self.db.zs_last_key_value(&self.cf)
+    }
+
+    /// Returns the first key greater than or equal to `lower_bound` in this column family,
+    /// and the corresponding value.
+    ///
+    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
+    pub fn zs_next_key_value_from(&self, lower_bound: &Key) -> Option<(Key, Value)> {
+        self.db.zs_next_key_value_from(&self.cf, lower_bound)
+    }
+
+    /// Returns the first key strictly greater than `lower_bound` in this column family,
+    /// and the corresponding value.
+    ///
+    /// Returns `None` if there are no keys greater than `lower_bound`.
+    pub fn zs_next_key_value_strictly_after(&self, lower_bound: &Key) -> Option<(Key, Value)> {
+        self.db
+            .zs_next_key_value_strictly_after(&self.cf, lower_bound)
+    }
+
+    /// Returns the first key less than or equal to `upper_bound` in this column family,
+    /// and the corresponding value.
+    ///
+    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
+    pub fn zs_prev_key_value_back_from(&self, upper_bound: &Key) -> Option<(Key, Value)> {
+        self.db.zs_prev_key_value_back_from(&self.cf, upper_bound)
+    }
+
+    /// Returns the first key strictly less than `upper_bound` in this column family,
+    /// and the corresponding value.
+    ///
+    /// Returns `None` if there are no keys less than `upper_bound`.
+    pub fn zs_prev_key_value_strictly_before(&self, upper_bound: &Key) -> Option<(Key, Value)> {
+        self.db
+            .zs_prev_key_value_strictly_before(&self.cf, upper_bound)
+    }
+
+    /// Returns a forward iterator over the items in this column family in `range`.
+    ///
+    /// Holding this iterator open might delay block commit transactions.
+    pub fn zs_forward_range_iter<Range>(
+        &self,
+        range: Range,
+    ) -> impl Iterator<Item = (Key, Value)> + '_
+    where
+        Range: RangeBounds<Key>,
+    {
+        self.db.zs_forward_range_iter(&self.cf, range)
+    }
+
+    /// Returns a reverse iterator over the items in this column family in `range`.
+    ///
+    /// Holding this iterator open might delay block commit transactions.
+    pub fn zs_reverse_range_iter<Range>(
+        &self,
+        range: Range,
+    ) -> impl Iterator<Item = (Key, Value)> + '_
+    where
+        Range: RangeBounds<Key>,
+    {
+        self.db.zs_reverse_range_iter(&self.cf, range)
+    }
+}
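A hedged sketch of the read API on the same illustrative handle from the earlier sketches; the range bounds are typed keys, so mixing in another column family's key type is a compile error:

    use zebra_chain::block::{self, Height};

    /// Illustrative only: typed point reads and range iteration.
    fn sample_reads(cf: &HashByHeightCf<'_>) {
        let _genesis: Option<(Height, block::Hash)> = cf.zs_first_key_value();
        let _tip = cf.zs_last_key_value();

        // Iterates heights 0..=99 in ascending order; the bounds are `Height`s.
        for (_height, _hash) in cf.zs_forward_range_iter(Height(0)..Height(100)) {
            // ... use the typed (height, hash) pair ...
        }
    }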
+
+impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug + Ord,
+    Value: IntoDisk + FromDisk,
+{
+    /// Returns the keys and values in this column family in `range`, in an ordered `BTreeMap`.
+    ///
+    /// Holding this iterator open might delay block commit transactions.
+    pub fn zs_items_in_range_ordered<Range>(&self, range: Range) -> BTreeMap<Key, Value>
+    where
+        Range: RangeBounds<Key>,
+    {
+        self.db.zs_items_in_range_ordered(&self.cf, range)
+    }
+}
+
+impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug + Hash + Eq,
+    Value: IntoDisk + FromDisk,
+{
+    /// Returns the keys and values in this column family in `range`, in an unordered `HashMap`.
+    ///
+    /// Holding this iterator open might delay block commit transactions.
+    pub fn zs_items_in_range_unordered<Range>(&self, range: Range) -> HashMap<Key, Value>
+    where
+        Range: RangeBounds<Key>,
+    {
+        self.db.zs_items_in_range_unordered(&self.cf, range)
+    }
+}
+
+impl<'cf, Key, Value> WriteTypedBatch<'cf, Key, Value>
+where
+    Key: IntoDisk + FromDisk + Debug,
+    Value: IntoDisk + FromDisk,
+{
+    // Writing batches
+
+    /// Writes this batch to this column family in the database.
+    pub fn write_batch(self) -> Result<(), rocksdb::Error> {
+        self.inner.db.write(self.batch)
+    }
+
+    // Batching before writing
+
+    /// Serialize and insert the given key and value into this column family,
+    /// overwriting any existing `value` for `key`.
+    pub fn zs_insert(mut self, key: Key, value: Value) -> Self {
+        self.batch.zs_insert(&self.inner.cf, key, value);
+
+        self
+    }
+
+    /// Remove the given key from this column family, if it exists.
+    pub fn zs_delete(mut self, key: Key) -> Self {
+        self.batch.zs_delete(&self.inner.cf, key);
+
+        self
+    }
+
+    /// Delete the given key range from this rocksdb column family, if it exists, including `from`
+    /// and excluding `until_strictly_before`.
+    //
+    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
+    //       see zs_range_iter() for an example of the edge cases
+    pub fn zs_delete_range(mut self, from: Key, until_strictly_before: Key) -> Self {
+        self.batch
+            .zs_delete_range(&self.inner.cf, from, until_strictly_before);
+
+        self
+    }
+}
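The write half on the same illustrative handle: each method consumes and returns the batch, and nothing reaches disk until `write_batch()`. Because of the `#[must_use]` attribute on the struct, using the chain as a bare statement without writing it is a compiler warning rather than silent data loss. A hedged sketch with illustrative names:

    use zebra_chain::block::{self, Height};

    /// Illustrative only: batches an insert and a delete, then writes both
    /// atomically.
    fn replace_tip(cf: HashByHeightCf<'_>, height: Height, hash: block::Hash) {
        cf.for_writing()
            .zs_insert(height, hash)
            .zs_delete(Height(height.0.saturating_sub(1)))
            .write_batch()
            .expect("unexpected database write failure");
    }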
diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs
index c835971bf..78d543aaf 100644
--- a/zebra-state/src/service/finalized_state/disk_db.rs
+++ b/zebra-state/src/service/finalized_state/disk_db.rs
@@ -30,6 +30,10 @@ use crate::{
     Config,
 };
 
+// Doc-only imports
+#[allow(unused_imports)]
+use super::{TypedColumnFamily, WriteTypedBatch};
+
 #[cfg(any(test, feature = "proptest-impl"))]
 mod tests;
 
@@ -107,10 +111,30 @@ pub struct DiskWriteBatch {
     batch: rocksdb::WriteBatch,
 }
 
-/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
-/// defined format
+impl Debug for DiskWriteBatch {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DiskWriteBatch")
+            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
+            .finish()
+    }
+}
+
+impl PartialEq for DiskWriteBatch {
+    fn eq(&self, other: &Self) -> bool {
+        self.batch.data() == other.batch.data()
+    }
+}
+
+impl Eq for DiskWriteBatch {}
+
+/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
+///
+/// # Deprecation
+///
+/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
 //
-// TODO: just implement these methods directly on WriteBatch
+// TODO: replace uses of this trait with WriteTypedBatch,
+//       implement these methods directly on WriteTypedBatch, and delete the trait.
 pub trait WriteDisk {
     /// Serialize and insert the given key and value into a rocksdb column family,
     /// overwriting any existing `value` for `key`.
     fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug,
         V: IntoDisk;
 
-    /// Remove the given key from rocksdb column family if it exists.
+    /// Remove the given key from a rocksdb column family, if it exists.
     fn zs_delete<C, K>(&mut self, cf: &C, key: K)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug;
 
-    /// Deletes the given key range from rocksdb column family if it exists, including `from` and
-    /// excluding `to`.
-    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
+    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
+    /// and excluding `until_strictly_before`.
+    //
+    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
+    //       see zs_range_iter() for an example of the edge cases
+    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug;
 }
 
+/// # Deprecation
+///
+/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
+//
+// TODO: replace uses of these impls with WriteTypedBatch,
+//       implement these methods directly on WriteTypedBatch, and delete the trait.
 impl WriteDisk for DiskWriteBatch {
     fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug,
         V: IntoDisk,
     {
@@ -156,23 +189,27 @@ impl WriteDisk for DiskWriteBatch {
     }
 
     // TODO: convert zs_delete_range() to take std::ops::RangeBounds
-    //       see zs_forward_range_iter() for an example of the edge cases
-    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
+    //       see zs_range_iter() for an example of the edge cases
+    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug,
     {
         let from_bytes = from.as_bytes();
-        let to_bytes = to.as_bytes();
-        self.batch.delete_range_cf(cf, from_bytes, to_bytes);
+        let until_strictly_before_bytes = until_strictly_before.as_bytes();
+        self.batch
+            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
     }
 }
 
-/// Helper trait for retrieving values from rocksdb column familys with a consistently
-/// defined format
+/// Helper trait for retrieving and deserializing values from rocksdb column families.
+///
+/// # Deprecation
+///
+/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
 //
-// TODO: just implement these methods directly on DiskDb
-//       move this trait, its methods, and support methods to another module
+// TODO: replace uses of this trait with TypedColumnFamily,
+//       implement these methods directly on DiskDb, and delete the trait.
 pub trait ReadDisk {
     /// Returns true if a rocksdb column family `cf` does not contain any entries.
     fn zs_is_empty<C>(&self, cf: &C) -> bool
     where
@@ -292,6 +329,12 @@ impl PartialEq for DiskDb {
 
 impl Eq for DiskDb {}
 
+/// # Deprecation
+///
+/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
+//
+// TODO: replace uses of these impls with TypedColumnFamily,
+//       implement these methods directly on DiskDb, and delete the trait.
 impl ReadDisk for DiskDb {
     fn zs_is_empty<C>(&self, cf: &C) -> bool
     where
@@ -740,8 +783,18 @@ impl DiskDb {
         self.db.path()
     }
 
+    /// Returns the low-level rocksdb inner database.
+    #[allow(dead_code)]
+    fn inner(&self) -> &Arc<DB> {
+        &self.db
+    }
+
     /// Returns the column family handle for `cf_name`.
-    pub fn cf_handle(&self, cf_name: &str) -> Option<impl rocksdb::AsColumnFamilyRef + '_> {
+    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
+        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
+        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
+        // should just be &self. To avoid this restriction, clone the string before passing it to
+        // this method. Currently Zebra uses static strings, so this doesn't matter.
         self.db.cf_handle(cf_name)
     }
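A hedged sketch of the workaround described in the note above: with a non-static name, keep the `String` alive for as long as the returned handle is used, so the borrow the handle carries stays valid. The `shard_id` variable and column family name are hypothetical:

    // Illustrative only: `cf_name` outlives every use of `cf` below.
    let cf_name: String = format!("shard_{shard_id}");
    let cf = db.cf_handle(&cf_name).expect("column family exists");
    // ... use `cf` while `cf_name` is still in scope ...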