Make sure scanner database is accessed using the correct types (#8112)
* impl TryFrom<zcash_primitives::BlockHeight> for Height * Add type-safe read and write database methods * Only allow typed access to the scanner DB * Update docs * Implement a common method as a trait * Fix imports * Tidy state imports * Activate tracing logging macros in the whole scanner crate * Fix dead code warnings
This commit is contained in:
parent
39830b0b55
commit
4b5838c500
|
@ -112,6 +112,15 @@ impl From<Height> for BlockHeight {
|
|||
}
|
||||
}
|
||||
|
||||
impl TryFrom<BlockHeight> for Height {
|
||||
type Error = &'static str;
|
||||
|
||||
/// Checks that the `height` is within the valid [`Height`] range.
|
||||
fn try_from(height: BlockHeight) -> Result<Self, Self::Error> {
|
||||
Self::try_from(u32::from(height))
|
||||
}
|
||||
}
|
||||
|
||||
/// A difference between two [`Height`]s, possibly negative.
|
||||
///
|
||||
/// This can represent the difference between any height values,
|
||||
|
|
|
@ -4,6 +4,9 @@
|
|||
#![doc(html_logo_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-icon.png")]
|
||||
#![doc(html_root_url = "https://docs.rs/zebra_scan")]
|
||||
|
||||
#[macro_use]
|
||||
extern crate tracing;
|
||||
|
||||
pub mod config;
|
||||
pub mod init;
|
||||
pub mod scan;
|
||||
|
|
|
@ -9,7 +9,6 @@ use std::{
|
|||
use color_eyre::{eyre::eyre, Report};
|
||||
use itertools::Itertools;
|
||||
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
|
||||
use tracing::info;
|
||||
|
||||
use zcash_client_backend::{
|
||||
data_api::ScannedBlock,
|
||||
|
|
|
@ -6,9 +6,7 @@ use zebra_chain::{
|
|||
block::Height,
|
||||
parameters::{Network, NetworkUpgrade},
|
||||
};
|
||||
use zebra_state::{
|
||||
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, TransactionIndex, TransactionLocation,
|
||||
};
|
||||
use zebra_state::TransactionIndex;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
|
@ -17,11 +15,9 @@ pub mod db;
|
|||
// Public types and APIs
|
||||
pub use db::{SaplingScannedResult, SaplingScanningKey};
|
||||
|
||||
use self::db::ScannerWriteBatch;
|
||||
|
||||
/// We insert an empty results entry to the database every this interval for each stored key,
|
||||
/// so we can track progress.
|
||||
const INSERT_CONTROL_INTERVAL: u32 = 1_000;
|
||||
pub const INSERT_CONTROL_INTERVAL: u32 = 1_000;
|
||||
|
||||
/// Store key info and results of the scan.
|
||||
///
|
||||
|
@ -84,11 +80,7 @@ impl Storage {
|
|||
|
||||
// It's ok to write some keys and not others during shutdown, so each key can get its own
|
||||
// batch. (They will be re-written on startup anyway.)
|
||||
let mut batch = ScannerWriteBatch::default();
|
||||
|
||||
batch.insert_sapling_key(self, sapling_key, birthday);
|
||||
|
||||
self.write_batch(batch);
|
||||
self.insert_sapling_key(sapling_key, birthday);
|
||||
}
|
||||
|
||||
/// Returns all the keys and their last scanned heights.
|
||||
|
@ -104,6 +96,9 @@ impl Storage {
|
|||
/// Add the sapling results for `height` to the storage. The results can be any map of
|
||||
/// [`TransactionIndex`] to [`SaplingScannedResult`].
|
||||
///
|
||||
/// All the results for the same height must be written at the same time, to avoid partial
|
||||
/// writes during shutdown.
|
||||
///
|
||||
/// Also adds empty progress tracking entries every `INSERT_CONTROL_INTERVAL` blocks if needed.
|
||||
///
|
||||
/// # Performance / Hangs
|
||||
|
@ -116,37 +111,7 @@ impl Storage {
|
|||
height: Height,
|
||||
sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
|
||||
) {
|
||||
// We skip heights that have one or more results, so the results for each height must be
|
||||
// in a single batch.
|
||||
let mut batch = ScannerWriteBatch::default();
|
||||
|
||||
// Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
|
||||
// so we can track progress made in the last interval even if no transaction was yet found.
|
||||
let needs_control_entry =
|
||||
height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();
|
||||
|
||||
// Add scanner progress tracking entry for key.
|
||||
// Defensive programming: add the tracking entry first, so that we don't accidentally
|
||||
// overwrite real results with it. (This is currently prevented by the empty check.)
|
||||
if needs_control_entry {
|
||||
batch.insert_sapling_height(self, sapling_key, height);
|
||||
}
|
||||
|
||||
for (index, sapling_result) in sapling_results {
|
||||
let index = SaplingScannedDatabaseIndex {
|
||||
sapling_key: sapling_key.clone(),
|
||||
tx_loc: TransactionLocation::from_parts(height, index),
|
||||
};
|
||||
|
||||
let entry = SaplingScannedDatabaseEntry {
|
||||
index,
|
||||
value: Some(sapling_result),
|
||||
};
|
||||
|
||||
batch.insert_sapling_result(self, entry);
|
||||
}
|
||||
|
||||
self.write_batch(batch);
|
||||
self.insert_sapling_results(sapling_key, height, sapling_results)
|
||||
}
|
||||
|
||||
/// Returns all the results for a sapling key, for every scanned block height.
|
||||
|
|
|
@ -5,7 +5,6 @@ use std::path::Path;
|
|||
use semver::Version;
|
||||
|
||||
use zebra_chain::parameters::Network;
|
||||
use zebra_state::{DiskWriteBatch, ReadDisk};
|
||||
|
||||
use crate::Config;
|
||||
|
||||
|
@ -86,7 +85,7 @@ impl Storage {
|
|||
// Report where we are for each key in the database.
|
||||
let keys = new_storage.sapling_keys_last_heights();
|
||||
for (key_num, (_key, height)) in keys.iter().enumerate() {
|
||||
tracing::info!(
|
||||
info!(
|
||||
"Last scanned height for key number {} is {}, resuming at {}",
|
||||
key_num,
|
||||
height.as_usize(),
|
||||
|
@ -94,7 +93,7 @@ impl Storage {
|
|||
);
|
||||
}
|
||||
|
||||
tracing::info!("loaded Zebra scanner cache");
|
||||
info!("loaded Zebra scanner cache");
|
||||
|
||||
new_storage
|
||||
}
|
||||
|
@ -134,28 +133,6 @@ impl Storage {
|
|||
/// Returns true if the database is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
// Any column family that is populated at (or near) startup can be used here.
|
||||
self.db.zs_is_empty(&self.sapling_tx_ids_cf())
|
||||
}
|
||||
}
|
||||
|
||||
// General writing
|
||||
|
||||
/// Wrapper type for scanner database writes.
|
||||
#[must_use = "batches must be written to the database"]
|
||||
#[derive(Default)]
|
||||
pub struct ScannerWriteBatch(pub DiskWriteBatch);
|
||||
|
||||
// Redirect method calls to DiskWriteBatch for convenience.
|
||||
impl std::ops::Deref for ScannerWriteBatch {
|
||||
type Target = DiskWriteBatch;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for ScannerWriteBatch {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
self.sapling_tx_ids_cf().zs_is_empty()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,9 @@
|
|||
//!
|
||||
//! The sapling scanner database has the following format:
|
||||
//!
|
||||
//! | name | key | value |
|
||||
//! |------------------|-------------------------------|--------------------------|
|
||||
//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Option<SaplingScannedResult>` |
|
||||
//! | name | Reading & Writing Key/Values |
|
||||
//! |--------------------|-------------------------------------------------|
|
||||
//! | [`SAPLING_TX_IDS`] | [`SaplingTxIdsCf`] & [`WriteSaplingTxIdsBatch`] |
|
||||
//!
|
||||
//! And types:
|
||||
//! `SaplingScannedResult`: same as `transaction::Hash`, but with bytes in display order.
|
||||
|
@ -30,32 +30,42 @@ use itertools::Itertools;
|
|||
|
||||
use zebra_chain::block::Height;
|
||||
use zebra_state::{
|
||||
AsColumnFamilyRef, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
|
||||
SaplingScannedResult, SaplingScanningKey, TransactionIndex, WriteDisk,
|
||||
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
|
||||
SaplingScanningKey, TransactionIndex, TransactionLocation, TypedColumnFamily, WriteTypedBatch,
|
||||
};
|
||||
|
||||
use crate::storage::Storage;
|
||||
|
||||
use super::ScannerWriteBatch;
|
||||
use crate::storage::{Storage, INSERT_CONTROL_INTERVAL};
|
||||
|
||||
/// The name of the sapling transaction IDs result column family.
|
||||
///
|
||||
/// This constant should be used so the compiler can detect typos.
|
||||
pub const SAPLING_TX_IDS: &str = "sapling_tx_ids";
|
||||
|
||||
/// The type for reading sapling transaction IDs results from the database.
|
||||
///
|
||||
/// This constant should be used so the compiler can detect incorrectly typed accesses to the
|
||||
/// column family.
|
||||
pub type SaplingTxIdsCf<'cf> =
|
||||
TypedColumnFamily<'cf, SaplingScannedDatabaseIndex, Option<SaplingScannedResult>>;
|
||||
|
||||
/// The type for writing sapling transaction IDs results from the database.
|
||||
///
|
||||
/// This constant should be used so the compiler can detect incorrectly typed accesses to the
|
||||
/// column family.
|
||||
pub type WriteSaplingTxIdsBatch<'cf> =
|
||||
WriteTypedBatch<'cf, SaplingScannedDatabaseIndex, Option<SaplingScannedResult>>;
|
||||
|
||||
impl Storage {
|
||||
// Reading Sapling database entries
|
||||
|
||||
/// Returns the result for a specific database index (key, block height, transaction index).
|
||||
/// Returns `None` if the result is missing or an empty marker for a birthday or progress
|
||||
/// height.
|
||||
//
|
||||
// TODO: add tests for this method
|
||||
pub fn sapling_result_for_index(
|
||||
&self,
|
||||
index: &SaplingScannedDatabaseIndex,
|
||||
) -> Option<SaplingScannedResult> {
|
||||
self.db.zs_get(&self.sapling_tx_ids_cf(), &index).flatten()
|
||||
self.sapling_tx_ids_cf().zs_get(index).flatten()
|
||||
}
|
||||
|
||||
/// Returns the results for a specific key and block height.
|
||||
|
@ -102,10 +112,7 @@ impl Storage {
|
|||
let sapling_tx_ids = self.sapling_tx_ids_cf();
|
||||
let mut keys = HashMap::new();
|
||||
|
||||
let mut last_stored_record: Option<(
|
||||
SaplingScannedDatabaseIndex,
|
||||
Option<SaplingScannedResult>,
|
||||
)> = self.db.zs_last_key_value(&sapling_tx_ids);
|
||||
let mut last_stored_record = sapling_tx_ids.zs_last_key_value();
|
||||
|
||||
while let Some((last_stored_record_index, _result)) = last_stored_record {
|
||||
let sapling_key = last_stored_record_index.sapling_key.clone();
|
||||
|
@ -119,8 +126,7 @@ impl Storage {
|
|||
);
|
||||
|
||||
// Skip all the results until the next key.
|
||||
last_stored_record = self.db.zs_prev_key_value_strictly_before(
|
||||
&sapling_tx_ids,
|
||||
last_stored_record = sapling_tx_ids.zs_prev_key_value_strictly_before(
|
||||
&SaplingScannedDatabaseIndex::min_for_key(&sapling_key),
|
||||
);
|
||||
}
|
||||
|
@ -135,43 +141,63 @@ impl Storage {
|
|||
&self,
|
||||
range: impl RangeBounds<SaplingScannedDatabaseIndex>,
|
||||
) -> BTreeMap<SaplingScannedDatabaseIndex, Option<SaplingScannedResult>> {
|
||||
self.db
|
||||
.zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), range)
|
||||
self.sapling_tx_ids_cf().zs_items_in_range_ordered(range)
|
||||
}
|
||||
|
||||
// Column family convenience methods
|
||||
|
||||
/// Returns a handle to the `sapling_tx_ids` column family.
|
||||
pub(crate) fn sapling_tx_ids_cf(&self) -> impl AsColumnFamilyRef + '_ {
|
||||
self.db
|
||||
.cf_handle(SAPLING_TX_IDS)
|
||||
/// Returns a typed handle to the `sapling_tx_ids` column family.
|
||||
pub(crate) fn sapling_tx_ids_cf(&self) -> SaplingTxIdsCf {
|
||||
SaplingTxIdsCf::new(&self.db, SAPLING_TX_IDS)
|
||||
.expect("column family was created when database was created")
|
||||
}
|
||||
|
||||
// Writing batches
|
||||
// Writing database entries
|
||||
//
|
||||
// To avoid exposing internal types, and accidentally forgetting to write a batch,
|
||||
// each pub(crate) write method should write an entire batch.
|
||||
|
||||
/// Write `batch` to the database for this storage.
|
||||
pub(crate) fn write_batch(&self, batch: ScannerWriteBatch) {
|
||||
// Just panic on errors for now.
|
||||
self.db
|
||||
.write_batch(batch.0)
|
||||
.expect("unexpected database error")
|
||||
}
|
||||
}
|
||||
|
||||
// Writing database entries
|
||||
//
|
||||
// TODO: split the write type into state and scanner, so we can't call state write methods on
|
||||
// scanner databases
|
||||
impl ScannerWriteBatch {
|
||||
/// Inserts a scanned sapling result for a key and height.
|
||||
/// If a result already exists for that key and height, it is replaced.
|
||||
pub(crate) fn insert_sapling_result(
|
||||
/// Inserts a batch of scanned sapling result for a key and height.
|
||||
/// If a result already exists for that key, height, and index, it is replaced.
|
||||
pub(crate) fn insert_sapling_results(
|
||||
&mut self,
|
||||
storage: &Storage,
|
||||
entry: SaplingScannedDatabaseEntry,
|
||||
sapling_key: &SaplingScanningKey,
|
||||
height: Height,
|
||||
sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
|
||||
) {
|
||||
self.zs_insert(&storage.sapling_tx_ids_cf(), entry.index, entry.value);
|
||||
// We skip key heights that have one or more results, so the results for each key height
|
||||
// must be in a single batch.
|
||||
let mut batch = self.sapling_tx_ids_cf().for_writing();
|
||||
|
||||
// Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
|
||||
// so we can track progress made in the last interval even if no transaction was yet found.
|
||||
let needs_control_entry =
|
||||
height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();
|
||||
|
||||
// Add scanner progress tracking entry for key.
|
||||
// Defensive programming: add the tracking entry first, so that we don't accidentally
|
||||
// overwrite real results with it. (This is currently prevented by the empty check.)
|
||||
if needs_control_entry {
|
||||
batch = batch.insert_sapling_height(sapling_key, height);
|
||||
}
|
||||
|
||||
for (index, sapling_result) in sapling_results {
|
||||
let index = SaplingScannedDatabaseIndex {
|
||||
sapling_key: sapling_key.clone(),
|
||||
tx_loc: TransactionLocation::from_parts(height, index),
|
||||
};
|
||||
|
||||
let entry = SaplingScannedDatabaseEntry {
|
||||
index,
|
||||
value: Some(sapling_result),
|
||||
};
|
||||
|
||||
batch = batch.zs_insert(entry.index, entry.value);
|
||||
}
|
||||
|
||||
batch
|
||||
.write_batch()
|
||||
.expect("unexpected database write failure");
|
||||
}
|
||||
|
||||
/// Insert a sapling scanning `key`, and mark all heights before `birthday_height` so they
|
||||
|
@ -184,11 +210,10 @@ impl ScannerWriteBatch {
|
|||
/// TODO: ignore incorrect changes to birthday heights
|
||||
pub(crate) fn insert_sapling_key(
|
||||
&mut self,
|
||||
storage: &Storage,
|
||||
sapling_key: &SaplingScanningKey,
|
||||
birthday_height: Option<Height>,
|
||||
) {
|
||||
let min_birthday_height = storage.min_sapling_birthday_height();
|
||||
let min_birthday_height = self.min_sapling_birthday_height();
|
||||
|
||||
// The birthday height must be at least the minimum height for that pool.
|
||||
let birthday_height = birthday_height
|
||||
|
@ -197,19 +222,33 @@ impl ScannerWriteBatch {
|
|||
// And we want to skip up to the height before it.
|
||||
let skip_up_to_height = birthday_height.previous().unwrap_or(Height::MIN);
|
||||
|
||||
let index =
|
||||
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
|
||||
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
|
||||
}
|
||||
|
||||
/// Insert sapling height with no results
|
||||
pub(crate) fn insert_sapling_height(
|
||||
&mut self,
|
||||
storage: &Storage,
|
||||
sapling_key: &SaplingScanningKey,
|
||||
height: Height,
|
||||
) {
|
||||
let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
|
||||
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
|
||||
// It's ok to write some keys and not others during shutdown, so each key can get its own
|
||||
// batch. (They will be re-written on startup anyway.)
|
||||
//
|
||||
// TODO: ignore incorrect changes to birthday heights,
|
||||
// and redundant birthday heights
|
||||
self.sapling_tx_ids_cf()
|
||||
.for_writing()
|
||||
.insert_sapling_height(sapling_key, skip_up_to_height)
|
||||
.write_batch()
|
||||
.expect("unexpected database write failure");
|
||||
}
|
||||
}
|
||||
|
||||
/// Utility trait for inserting sapling heights into a WriteSaplingTxIdsBatch.
|
||||
trait InsertSaplingHeight {
|
||||
fn insert_sapling_height(self, sapling_key: &SaplingScanningKey, height: Height) -> Self;
|
||||
}
|
||||
|
||||
impl<'cf> InsertSaplingHeight for WriteSaplingTxIdsBatch<'cf> {
|
||||
/// Insert sapling height with no results.
|
||||
///
|
||||
/// If a result already exists for the coinbase transaction at that height,
|
||||
/// it is replaced with an empty result. This should never happen.
|
||||
fn insert_sapling_height(self, sapling_key: &SaplingScanningKey, height: Height) -> Self {
|
||||
let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
|
||||
|
||||
// TODO: assert that we don't overwrite any entries here.
|
||||
self.zs_insert(index, None)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,16 +59,20 @@ pub use service::{
|
|||
OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
|
||||
};
|
||||
|
||||
#[cfg(feature = "shielded-scan")]
|
||||
pub use rocksdb::AsColumnFamilyRef;
|
||||
#[cfg(feature = "shielded-scan")]
|
||||
pub use service::finalized_state::{
|
||||
FromDisk, IntoDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
|
||||
SaplingScannedResult, SaplingScanningKey, ZebraDb,
|
||||
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
|
||||
SaplingScanningKey,
|
||||
};
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))]
|
||||
pub use service::finalized_state::{DiskWriteBatch, ReadDisk, WriteDisk};
|
||||
pub use service::{
|
||||
finalized_state::{
|
||||
DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk,
|
||||
WriteTypedBatch, ZebraDb,
|
||||
},
|
||||
ReadStateService,
|
||||
};
|
||||
|
||||
#[cfg(feature = "getblocktemplate-rpcs")]
|
||||
pub use response::GetBlockTemplateChainInfo;
|
||||
|
@ -78,9 +82,12 @@ pub use service::{
|
|||
arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
|
||||
chain_tip::{ChainTipBlock, ChainTipSender},
|
||||
finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT},
|
||||
init_test, init_test_services, ReadStateService,
|
||||
init_test, init_test_services,
|
||||
};
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
pub use constants::latest_version_for_adding_subtrees;
|
||||
|
||||
#[cfg(not(any(test, feature = "proptest-impl")))]
|
||||
#[allow(unused_imports)]
|
||||
pub(crate) use config::hidden::{
|
||||
|
@ -92,7 +99,4 @@ pub use config::hidden::{
|
|||
write_database_format_version_to_disk, write_state_database_format_version_to_disk,
|
||||
};
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
pub use constants::latest_version_for_adding_subtrees;
|
||||
|
||||
pub(crate) use request::ContextuallyVerifiedBlock;
|
||||
|
|
|
@ -28,6 +28,8 @@ use crate::{
|
|||
BoxError, CheckpointVerifiedBlock, CloneError, Config,
|
||||
};
|
||||
|
||||
pub mod column_family;
|
||||
|
||||
mod disk_db;
|
||||
mod disk_format;
|
||||
mod zebra_db;
|
||||
|
@ -38,6 +40,8 @@ mod arbitrary;
|
|||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
pub use column_family::{TypedColumnFamily, WriteTypedBatch};
|
||||
#[allow(unused_imports)]
|
||||
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
|
||||
#[allow(unused_imports)]
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
//! Type-safe column family access.
|
||||
|
||||
// When these types aren't exported, they become dead code.
|
||||
#![cfg_attr(
|
||||
not(any(test, feature = "proptest-impl", feature = "shielded-scan")),
|
||||
allow(dead_code)
|
||||
)]
|
||||
|
||||
use std::{
|
||||
any::type_name,
|
||||
collections::{BTreeMap, HashMap},
|
||||
fmt::Debug,
|
||||
hash::Hash,
|
||||
marker::PhantomData,
|
||||
ops::RangeBounds,
|
||||
};
|
||||
|
||||
use crate::service::finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, WriteDisk};
|
||||
|
||||
use super::DiskDb;
|
||||
|
||||
/// A type-safe read-only column family reference.
|
||||
///
|
||||
/// Use this struct instead of raw [`ReadDisk`] access, because it is type-safe.
|
||||
/// So you only have to define the types once, and you can't accidentally use different types for
|
||||
/// reading and writing. (Which is a source of subtle database bugs.)
|
||||
#[derive(Clone)]
|
||||
pub struct TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
/// The database.
|
||||
db: DiskDb,
|
||||
|
||||
/// The column family reference in the database.
|
||||
cf: rocksdb::ColumnFamilyRef<'cf>,
|
||||
|
||||
/// The column family name, only used for debugging and equality checking.
|
||||
_cf_name: String,
|
||||
|
||||
/// A marker type used to bind the key and value types to the struct.
|
||||
_marker: PhantomData<(Key, Value)>,
|
||||
}
|
||||
|
||||
/// A type-safe and drop-safe batch write to a column family.
|
||||
///
|
||||
/// Use this struct instead of raw [`WriteDisk`] access, because it is type-safe.
|
||||
/// So you only have to define the types once, and you can't accidentally use different types for
|
||||
/// reading and writing. (Which is a source of subtle database bugs.)
|
||||
///
|
||||
/// This type is also drop-safe: unwritten batches have to be specifically ignored.
|
||||
#[must_use = "batches must be written to the database"]
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub struct WriteTypedBatch<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
inner: TypedColumnFamily<'cf, Key, Value>,
|
||||
|
||||
batch: DiskWriteBatch,
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> Debug for TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct(&format!(
|
||||
"TypedColumnFamily<{}, {}>",
|
||||
type_name::<Key>(),
|
||||
type_name::<Value>()
|
||||
))
|
||||
.field("db", &self.db)
|
||||
.field("cf", &self._cf_name)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> PartialEq for TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.db == other.db && self._cf_name == other._cf_name
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> Eq for TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
// Creation
|
||||
|
||||
/// Returns a new typed column family, if it exists in the database.
|
||||
pub fn new(db: &'cf DiskDb, cf_name: &str) -> Option<Self> {
|
||||
let cf = db.cf_handle(cf_name)?;
|
||||
|
||||
Some(Self {
|
||||
db: db.clone(),
|
||||
cf,
|
||||
_cf_name: cf_name.to_string(),
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a new writeable typed column family for this column family.
|
||||
///
|
||||
/// This is the only way to get a writeable column family, which ensures
|
||||
/// that the read and write types are consistent.
|
||||
pub fn for_writing(self) -> WriteTypedBatch<'cf, Key, Value> {
|
||||
WriteTypedBatch {
|
||||
inner: self,
|
||||
batch: DiskWriteBatch::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reading
|
||||
|
||||
/// Returns true if this rocksdb column family does not contain any entries.
|
||||
pub fn zs_is_empty(&self) -> bool {
|
||||
self.db.zs_is_empty(&self.cf)
|
||||
}
|
||||
|
||||
/// Returns the value for `key` in this rocksdb column family, if present.
|
||||
pub fn zs_get(&self, key: &Key) -> Option<Value> {
|
||||
self.db.zs_get(&self.cf, key)
|
||||
}
|
||||
|
||||
/// Check if this rocksdb column family contains the serialized form of `key`.
|
||||
pub fn zs_contains(&self, key: &Key) -> bool {
|
||||
self.db.zs_contains(&self.cf, key)
|
||||
}
|
||||
|
||||
/// Returns the lowest key in this column family, and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if this column family is empty.
|
||||
pub fn zs_first_key_value(&self) -> Option<(Key, Value)> {
|
||||
self.db.zs_first_key_value(&self.cf)
|
||||
}
|
||||
|
||||
/// Returns the highest key in this column family, and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if this column family is empty.
|
||||
pub fn zs_last_key_value(&self) -> Option<(Key, Value)> {
|
||||
self.db.zs_last_key_value(&self.cf)
|
||||
}
|
||||
|
||||
/// Returns the first key greater than or equal to `lower_bound` in this column family,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys greater than or equal to `lower_bound`.
|
||||
pub fn zs_next_key_value_from(&self, lower_bound: &Key) -> Option<(Key, Value)> {
|
||||
self.db.zs_next_key_value_from(&self.cf, lower_bound)
|
||||
}
|
||||
|
||||
/// Returns the first key strictly greater than `lower_bound` in this column family,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys greater than `lower_bound`.
|
||||
pub fn zs_next_key_value_strictly_after(&self, lower_bound: &Key) -> Option<(Key, Value)> {
|
||||
self.db
|
||||
.zs_next_key_value_strictly_after(&self.cf, lower_bound)
|
||||
}
|
||||
|
||||
/// Returns the first key less than or equal to `upper_bound` in this column family,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys less than or equal to `upper_bound`.
|
||||
pub fn zs_prev_key_value_back_from(&self, upper_bound: &Key) -> Option<(Key, Value)> {
|
||||
self.db.zs_prev_key_value_back_from(&self.cf, upper_bound)
|
||||
}
|
||||
|
||||
/// Returns the first key strictly less than `upper_bound` in this column family,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys less than `upper_bound`.
|
||||
pub fn zs_prev_key_value_strictly_before(&self, upper_bound: &Key) -> Option<(Key, Value)> {
|
||||
self.db
|
||||
.zs_prev_key_value_strictly_before(&self.cf, upper_bound)
|
||||
}
|
||||
|
||||
/// Returns a forward iterator over the items in this column family in `range`.
|
||||
///
|
||||
/// Holding this iterator open might delay block commit transactions.
|
||||
pub fn zs_forward_range_iter<Range>(
|
||||
&self,
|
||||
range: Range,
|
||||
) -> impl Iterator<Item = (Key, Value)> + '_
|
||||
where
|
||||
Range: RangeBounds<Key>,
|
||||
{
|
||||
self.db.zs_forward_range_iter(&self.cf, range)
|
||||
}
|
||||
|
||||
/// Returns a reverse iterator over the items in this column family in `range`.
|
||||
///
|
||||
/// Holding this iterator open might delay block commit transactions.
|
||||
pub fn zs_reverse_range_iter<Range>(
|
||||
&self,
|
||||
range: Range,
|
||||
) -> impl Iterator<Item = (Key, Value)> + '_
|
||||
where
|
||||
Range: RangeBounds<Key>,
|
||||
{
|
||||
self.db.zs_reverse_range_iter(&self.cf, range)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug + Ord,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
/// Returns the keys and values in this column family in `range`, in an ordered `BTreeMap`.
|
||||
///
|
||||
/// Holding this iterator open might delay block commit transactions.
|
||||
pub fn zs_items_in_range_ordered<Range>(&self, range: Range) -> BTreeMap<Key, Value>
|
||||
where
|
||||
Range: RangeBounds<Key>,
|
||||
{
|
||||
self.db.zs_items_in_range_ordered(&self.cf, range)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug + Hash + Eq,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
/// Returns the keys and values in this column family in `range`, in an unordered `HashMap`.
|
||||
///
|
||||
/// Holding this iterator open might delay block commit transactions.
|
||||
pub fn zs_items_in_range_unordered<Range>(&self, range: Range) -> HashMap<Key, Value>
|
||||
where
|
||||
Range: RangeBounds<Key>,
|
||||
{
|
||||
self.db.zs_items_in_range_unordered(&self.cf, range)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cf, Key, Value> WriteTypedBatch<'cf, Key, Value>
|
||||
where
|
||||
Key: IntoDisk + FromDisk + Debug,
|
||||
Value: IntoDisk + FromDisk,
|
||||
{
|
||||
// Writing batches
|
||||
|
||||
/// Writes this batch to this column family in the database.
|
||||
pub fn write_batch(self) -> Result<(), rocksdb::Error> {
|
||||
self.inner.db.write(self.batch)
|
||||
}
|
||||
|
||||
// Batching before writing
|
||||
|
||||
/// Serialize and insert the given key and value into this column family,
|
||||
/// overwriting any existing `value` for `key`.
|
||||
pub fn zs_insert(mut self, key: Key, value: Value) -> Self {
|
||||
self.batch.zs_insert(&self.inner.cf, key, value);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Remove the given key from this column family, if it exists.
|
||||
pub fn zs_delete(mut self, key: Key) -> Self {
|
||||
self.batch.zs_delete(&self.inner.cf, key);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Delete the given key range from this rocksdb column family, if it exists, including `from`
|
||||
/// and excluding `until_strictly_before`.
|
||||
//.
|
||||
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
|
||||
// see zs_range_iter() for an example of the edge cases
|
||||
pub fn zs_delete_range(mut self, from: Key, until_strictly_before: Key) -> Self {
|
||||
self.batch
|
||||
.zs_delete_range(&self.inner.cf, from, until_strictly_before);
|
||||
|
||||
self
|
||||
}
|
||||
}
|
|
@ -30,6 +30,10 @@ use crate::{
|
|||
Config,
|
||||
};
|
||||
|
||||
// Doc-only imports
|
||||
#[allow(unused_imports)]
|
||||
use super::{TypedColumnFamily, WriteTypedBatch};
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
mod tests;
|
||||
|
||||
|
@ -107,10 +111,30 @@ pub struct DiskWriteBatch {
|
|||
batch: rocksdb::WriteBatch,
|
||||
}
|
||||
|
||||
/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
|
||||
/// defined format
|
||||
impl Debug for DiskWriteBatch {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DiskWriteBatch")
|
||||
.field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DiskWriteBatch {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.batch.data() == other.batch.data()
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for DiskWriteBatch {}
|
||||
|
||||
/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
|
||||
///
|
||||
/// # Deprecation
|
||||
///
|
||||
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
|
||||
//
|
||||
// TODO: just implement these methods directly on WriteBatch
|
||||
// TODO: replace uses of this trait with WriteTypedBatch,
|
||||
// implement these methods directly on WriteTypedBatch, and delete the trait.
|
||||
pub trait WriteDisk {
|
||||
/// Serialize and insert the given key and value into a rocksdb column family,
|
||||
/// overwriting any existing `value` for `key`.
|
||||
|
@ -120,20 +144,29 @@ pub trait WriteDisk {
|
|||
K: IntoDisk + Debug,
|
||||
V: IntoDisk;
|
||||
|
||||
/// Remove the given key from rocksdb column family if it exists.
|
||||
/// Remove the given key from a rocksdb column family, if it exists.
|
||||
fn zs_delete<C, K>(&mut self, cf: &C, key: K)
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + Debug;
|
||||
|
||||
/// Deletes the given key range from rocksdb column family if it exists, including `from` and
|
||||
/// excluding `to`.
|
||||
fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
|
||||
/// Delete the given key range from a rocksdb column family, if it exists, including `from`
|
||||
/// and excluding `until_strictly_before`.
|
||||
//
|
||||
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
|
||||
// see zs_range_iter() for an example of the edge cases
|
||||
fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + Debug;
|
||||
}
|
||||
|
||||
/// # Deprecation
|
||||
///
|
||||
/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
|
||||
//
|
||||
// TODO: replace uses of these impls with WriteTypedBatch,
|
||||
// implement these methods directly on WriteTypedBatch, and delete the trait.
|
||||
impl WriteDisk for DiskWriteBatch {
|
||||
fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
|
||||
where
|
||||
|
@ -156,23 +189,27 @@ impl WriteDisk for DiskWriteBatch {
|
|||
}
|
||||
|
||||
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
|
||||
// see zs_forward_range_iter() for an example of the edge cases
|
||||
fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
|
||||
// see zs_range_iter() for an example of the edge cases
|
||||
fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + Debug,
|
||||
{
|
||||
let from_bytes = from.as_bytes();
|
||||
let to_bytes = to.as_bytes();
|
||||
self.batch.delete_range_cf(cf, from_bytes, to_bytes);
|
||||
let until_strictly_before_bytes = until_strictly_before.as_bytes();
|
||||
self.batch
|
||||
.delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper trait for retrieving values from rocksdb column familys with a consistently
|
||||
/// defined format
|
||||
/// Helper trait for retrieving and deserializing values from rocksdb column families.
|
||||
///
|
||||
/// # Deprecation
|
||||
///
|
||||
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
|
||||
//
|
||||
// TODO: just implement these methods directly on DiskDb
|
||||
// move this trait, its methods, and support methods to another module
|
||||
// TODO: replace uses of this trait with TypedColumnFamily,
|
||||
// implement these methods directly on DiskDb, and delete the trait.
|
||||
pub trait ReadDisk {
|
||||
/// Returns true if a rocksdb column family `cf` does not contain any entries.
|
||||
fn zs_is_empty<C>(&self, cf: &C) -> bool
|
||||
|
@ -292,6 +329,12 @@ impl PartialEq for DiskDb {
|
|||
|
||||
impl Eq for DiskDb {}
|
||||
|
||||
/// # Deprecation
|
||||
///
|
||||
/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
|
||||
//
|
||||
// TODO: replace uses of these impls with TypedColumnFamily,
|
||||
// implement these methods directly on DiskDb, and delete the trait.
|
||||
impl ReadDisk for DiskDb {
|
||||
fn zs_is_empty<C>(&self, cf: &C) -> bool
|
||||
where
|
||||
|
@ -740,8 +783,18 @@ impl DiskDb {
|
|||
self.db.path()
|
||||
}
|
||||
|
||||
/// Returns the low-level rocksdb inner database.
|
||||
#[allow(dead_code)]
|
||||
fn inner(&self) -> &Arc<DB> {
|
||||
&self.db
|
||||
}
|
||||
|
||||
/// Returns the column family handle for `cf_name`.
|
||||
pub fn cf_handle(&self, cf_name: &str) -> Option<impl rocksdb::AsColumnFamilyRef + '_> {
|
||||
pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
|
||||
// Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
|
||||
// the shorter of &self and &str, but RocksDB clones column family names internally, so it
|
||||
// should just be &self. To avoid this restriction, clone the string before passing it to
|
||||
// this method. Currently Zebra uses static strings, so this doesn't matter.
|
||||
self.db.cf_handle(cf_name)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue