refactor(state): allow shared read access to the finalized state database (#3846)

* Move database read methods to a new ZebraDb wrapper type

* Rename struct fields
teor 2022-03-12 06:23:32 +10:00 committed by GitHub
parent 9ad47d1081
commit 6fb426ef93
17 changed files with 328 additions and 240 deletions

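Before the per-file diffs, it may help to see the shape this refactor is aiming for: read access moves behind a cloneable `ZebraDb` wrapper, and the new `ReadStateService` holds a clone of that wrapper plus a `watch::Receiver` for the best non-finalized chain. The sketch below is a minimal illustration of that pattern only — the `HashMap`-backed handle, `Chain`, and the helper methods are placeholders, not Zebra's real API — and it assumes the `tokio` crate for the `watch` channel.

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::watch;

/// Stand-in for the best non-finalized chain (simplified).
#[derive(Clone, Debug)]
struct Chain {
    tip_height: u32,
}

/// Stand-in for the shared low-level database handle.
/// Cloning it is cheap, and every clone reads the same data.
#[derive(Clone, Debug)]
struct DiskDb {
    blocks: Arc<HashMap<u32, String>>,
}

/// Read-only wrapper: it exposes read methods but no write methods,
/// so services that hold it can never modify the finalized state.
#[derive(Clone, Debug)]
struct ZebraDb {
    db: DiskDb,
}

impl ZebraDb {
    fn block(&self, height: u32) -> Option<String> {
        self.db.blocks.get(&height).cloned()
    }

    fn finalized_tip_height(&self) -> Option<u32> {
        self.db.blocks.keys().copied().max()
    }
}

/// Shape of the new read-only service: a database clone plus a watch
/// channel that tracks the current best in-memory chain.
struct ReadStateService {
    db: ZebraDb,
    best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
}

impl ReadStateService {
    /// Prefer the in-memory chain, then fall back to the finalized database.
    fn best_tip_height(&self) -> Option<u32> {
        let mem_tip = self
            .best_chain_receiver
            .borrow()
            .as_ref()
            .map(|chain| chain.tip_height);

        mem_tip.or_else(|| self.db.finalized_tip_height())
    }
}

fn main() {
    let db = DiskDb {
        blocks: Arc::new(HashMap::from([(0, "genesis".to_string())])),
    };
    let (best_chain_sender, best_chain_receiver) = watch::channel(None);

    // The writer keeps the original handle; readers get cheap clones.
    let reader = ReadStateService {
        db: ZebraDb { db: db.clone() },
        best_chain_receiver,
    };

    assert_eq!(reader.best_tip_height(), Some(0));
    assert_eq!(reader.db.block(0).as_deref(), Some("genesis"));

    // When the best in-memory chain advances, readers see it via the channel.
    best_chain_sender
        .send(Some(Arc::new(Chain { tip_height: 5 })))
        .expect("receiver is still alive");
    assert_eq!(reader.best_tip_height(), Some(5));
}
```

Because the wrapper exposes no write methods, handing clones of it to other tasks cannot corrupt the finalized state, while RocksDB itself already allows concurrent reads through a shared reference, as the updated `ReadStateService` docs in the first hunk note.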
@@ -39,7 +39,7 @@ use crate::{
 request::HashOrHeight,
 service::{
 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
-finalized_state::{DiskDb, FinalizedState},
+finalized_state::{FinalizedState, ZebraDb},
 non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
 pending_utxos::PendingUtxos,
 },
@@ -132,18 +132,18 @@ pub(crate) struct StateService {
 pub struct ReadStateService {
 /// The shared inner on-disk database for the finalized state.
 ///
-/// RocksDB allows reads and writes via a shared reference.
-/// TODO: prevent write access via this type.
+/// RocksDB allows reads and writes via a shared reference,
+/// but [`ZebraDb`] doesn't expose any write methods or types.
 ///
 /// This chain is updated concurrently with requests,
 /// so it might include some block data that is also in `best_mem`.
-disk: DiskDb,
+db: ZebraDb,
 /// A watch channel for the current best in-memory chain.
 ///
 /// This chain is only updated between requests,
 /// so it might include some block data that is also on `disk`.
-best_mem: watch::Receiver<Option<Arc<Chain>>>,
+best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
 /// The configured Zcash network.
 network: Network,
@@ -161,6 +161,7 @@ impl StateService {
 ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
 let disk = FinalizedState::new(&config, network);
 let initial_tip = disk
+.db()
 .tip_block()
 .map(FinalizedBlock::from)
 .map(ChainTipBlock::from);
@@ -243,7 +244,8 @@ impl StateService {
 tracing::debug!(block = %prepared.block, "queueing block for contextual verification");
 let parent_hash = prepared.block.header.previous_block_hash;
-if self.mem.any_chain_contains(&prepared.hash) || self.disk.hash(prepared.height).is_some()
+if self.mem.any_chain_contains(&prepared.hash)
+|| self.disk.db().hash(prepared.height).is_some()
 {
 let (rsp_tx, rsp_rx) = oneshot::channel();
 let _ = rsp_tx.send(Err("block is already committed to the state".into()));
@@ -282,7 +284,7 @@ impl StateService {
 );
 }
-let finalized_tip_height = self.disk.finalized_tip_height().expect(
+let finalized_tip_height = self.disk.db().finalized_tip_height().expect(
 "Finalized state must have at least one block before committing non-finalized state",
 );
 self.queued_blocks.prune_by_height(finalized_tip_height);
@@ -326,10 +328,10 @@ impl StateService {
 self.check_contextual_validity(&prepared)?;
 let parent_hash = prepared.block.header.previous_block_hash;
-if self.disk.finalized_tip_hash() == parent_hash {
-self.mem.commit_new_chain(prepared, &self.disk)?;
+if self.disk.db().finalized_tip_hash() == parent_hash {
+self.mem.commit_new_chain(prepared, self.disk.db())?;
 } else {
-self.mem.commit_block(prepared, &self.disk)?;
+self.mem.commit_block(prepared, self.disk.db())?;
 }
 Ok(())
@@ -337,7 +339,7 @@ impl StateService {
 /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
-self.mem.any_chain_contains(hash) || &self.disk.finalized_tip_hash() == hash
+self.mem.any_chain_contains(hash) || &self.disk.db().finalized_tip_hash() == hash
 }
 /// Attempt to validate and commit all queued blocks whose parents have
@@ -400,11 +402,11 @@ impl StateService {
 check::block_is_valid_for_recent_chain(
 prepared,
 self.network,
-self.disk.finalized_tip_height(),
+self.disk.db().finalized_tip_height(),
 relevant_chain,
 )?;
-check::nullifier::no_duplicates_in_finalized_chain(prepared, &self.disk)?;
+check::nullifier::no_duplicates_in_finalized_chain(prepared, self.disk.db())?;
 Ok(())
 }
@@ -427,7 +429,7 @@ impl StateService {
 /// Return the tip of the current best chain.
 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
-self.mem.best_tip().or_else(|| self.disk.tip())
+self.mem.best_tip().or_else(|| self.disk.db().tip())
 }
 /// Return the depth of block `hash` in the current best chain.
@@ -436,7 +438,7 @@ impl StateService {
 let height = self
 .mem
 .best_height_by_hash(hash)
-.or_else(|| self.disk.height(hash))?;
+.or_else(|| self.disk.db().height(hash))?;
 Some(tip.0 - height.0)
 }
@@ -447,7 +449,7 @@ impl StateService {
 self.mem
 .best_block(hash_or_height)
 .map(|contextual| contextual.block)
-.or_else(|| self.disk.block(hash_or_height))
+.or_else(|| self.disk.db().block(hash_or_height))
 }
 /// Return the transaction identified by `hash` if it exists in the current
@@ -455,14 +457,14 @@ impl StateService {
 pub fn best_transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
 self.mem
 .best_transaction(hash)
-.or_else(|| self.disk.transaction(hash))
+.or_else(|| self.disk.db().transaction(hash))
 }
 /// Return the hash for the block at `height` in the current best chain.
 pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
 self.mem
 .best_hash(height)
-.or_else(|| self.disk.hash(height))
+.or_else(|| self.disk.db().hash(height))
 }
 /// Return true if `hash` is in the current best chain.
@@ -474,14 +476,14 @@ impl StateService {
 pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
 self.mem
 .best_height_by_hash(hash)
-.or_else(|| self.disk.height(hash))
+.or_else(|| self.disk.db().height(hash))
 }
 /// Return the height for the block at `hash` in any chain.
 pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
 self.mem
 .any_height_by_hash(hash)
-.or_else(|| self.disk.height(hash))
+.or_else(|| self.disk.db().height(hash))
 }
 /// Return the [`Utxo`] pointed to by `outpoint` if it exists in any chain.
@@ -489,7 +491,7 @@ impl StateService {
 self.mem
 .any_utxo(outpoint)
 .or_else(|| self.queued_blocks.utxo(outpoint))
-.or_else(|| self.disk.utxo(outpoint))
+.or_else(|| self.disk.db().utxo(outpoint))
 }
 /// Return an iterator over the relevant chain of the block identified by
@@ -674,8 +676,8 @@ impl ReadStateService {
 let (best_chain_sender, best_chain_receiver) = watch::channel(None);
 let read_only_service = Self {
-disk: disk.db().clone(),
-best_mem: best_chain_receiver,
+db: disk.db().clone(),
+best_chain_receiver,
 network: disk.network(),
 };

@@ -48,7 +48,7 @@ impl Iter<'_> {
 IterState::Finished => unreachable!(),
 };
-if let Some(block) = service.disk.block(hash_or_height) {
+if let Some(block) = service.disk.db().block(hash_or_height) {
 let height = block
 .coinbase_height()
 .expect("valid blocks have a coinbase height");

@@ -6,7 +6,7 @@ use std::collections::HashMap;
 use zebra_chain::sprout;
 use crate::{
-service::{finalized_state::FinalizedState, non_finalized_state::Chain},
+service::{finalized_state::ZebraDb, non_finalized_state::Chain},
 PreparedBlock, ValidateContextError,
 };
@@ -20,7 +20,7 @@ use crate::{
 /// treestate of any prior `JoinSplit` _within the same transaction_.
 #[tracing::instrument(skip(finalized_state, parent_chain, prepared))]
 pub(crate) fn anchors_refer_to_earlier_treestates(
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 parent_chain: &Chain,
 prepared: &PreparedBlock,
 ) -> Result<(), ValidateContextError> {

@@ -5,7 +5,7 @@ use std::collections::HashSet;
 use tracing::trace;
 use crate::{
-error::DuplicateNullifierError, service::finalized_state::FinalizedState, PreparedBlock,
+error::DuplicateNullifierError, service::finalized_state::ZebraDb, PreparedBlock,
 ValidateContextError,
 };
@@ -26,7 +26,7 @@ use crate::{
 #[tracing::instrument(skip(prepared, finalized_state))]
 pub(crate) fn no_duplicates_in_finalized_chain(
 prepared: &PreparedBlock,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<(), ValidateContextError> {
 for nullifier in prepared.block.sprout_nullifiers() {
 if finalized_state.contains_sprout_nullifier(nullifier) {

@@ -9,7 +9,7 @@ use zebra_chain::{
 use crate::{
 constants::MIN_TRANSPARENT_COINBASE_MATURITY,
-service::finalized_state::FinalizedState,
+service::finalized_state::ZebraDb,
 PreparedBlock,
 ValidateContextError::{
 self, DuplicateTransparentSpend, EarlyTransparentSpend, ImmatureTransparentCoinbaseSpend,
@@ -39,7 +39,7 @@ pub fn transparent_spend(
 prepared: &PreparedBlock,
 non_finalized_chain_unspent_utxos: &HashMap<transparent::OutPoint, transparent::Utxo>,
 non_finalized_chain_spent_utxos: &HashSet<transparent::OutPoint>,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<HashMap<transparent::OutPoint, transparent::Utxo>, ValidateContextError> {
 let mut block_spends = HashMap::new();
@@ -117,7 +117,7 @@ fn transparent_spend_chain_order(
 block_new_outputs: &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
 non_finalized_chain_unspent_utxos: &HashMap<transparent::OutPoint, transparent::Utxo>,
 non_finalized_chain_spent_utxos: &HashSet<transparent::OutPoint>,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<transparent::Utxo, ValidateContextError> {
 if let Some(output) = block_new_outputs.get(&spend) {
 // reject the spend if it uses an output from this block,

@@ -2,8 +2,8 @@
 //!
 //! Zebra's database is implemented in 4 layers:
 //! - [`FinalizedState`]: queues, validates, and commits blocks, using...
-//! - [`zebra_db`]: reads and writes [`zebra_chain`] types to the database, using...
-//! - [`disk_db`]: reads and writes format-specific types to the database, using...
+//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the database, using...
+//! - [`DiskDb`]: reads and writes format-specific types to the database, using...
 //! - [`disk_format`]: converts types to raw database bytes.
 //!
 //! These layers allow us to split [`zebra_chain`] types for efficient database storage.
@@ -20,7 +20,7 @@ use std::{
 path::Path,
 };
-use zebra_chain::{block, history_tree::HistoryTree, parameters::Network};
+use zebra_chain::{block, parameters::Network};
 use crate::{
 service::{check, QueuedFinalized},
@@ -37,15 +37,13 @@ mod arbitrary;
 #[cfg(test)]
 mod tests;
-// TODO: replace with a ZebraDb struct
-pub(super) use disk_db::DiskDb;
+pub(super) use zebra_db::ZebraDb;
 /// The finalized part of the chain state, stored in the db.
 #[derive(Debug)]
 pub struct FinalizedState {
 /// The underlying database.
-// TODO: replace with a ZebraDb struct
-db: DiskDb,
+db: ZebraDb,
 /// Queued blocks that arrived out of order, indexed by their parent block hash.
 queued_by_prev_hash: HashMap<block::Hash, QueuedFinalized>,
@@ -67,7 +65,7 @@ pub struct FinalizedState {
 impl FinalizedState {
 pub fn new(config: &Config, network: Network) -> Self {
-let db = DiskDb::new(config, network);
+let db = ZebraDb::new(config, network);
 let new_state = Self {
 queued_by_prev_hash: HashMap::new(),
@@ -77,12 +75,13 @@ impl FinalizedState {
 network,
 };
-if let Some(tip_height) = new_state.finalized_tip_height() {
+// TODO: move debug_stop_at_height into a task in the start command (#3442)
+if let Some(tip_height) = new_state.db.finalized_tip_height() {
 if new_state.is_at_stop_height(tip_height) {
 let debug_stop_at_height = new_state
 .debug_stop_at_height
 .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
-let tip_hash = new_state.finalized_tip_hash();
+let tip_hash = new_state.db.finalized_tip_hash();
 if tip_height > debug_stop_at_height {
 tracing::error!(
@@ -108,7 +107,7 @@ impl FinalizedState {
 }
 }
-tracing::info!(tip = ?new_state.tip(), "loaded Zebra state cache");
+tracing::info!(tip = ?new_state.db.tip(), "loaded Zebra state cache");
 new_state
 }
@@ -124,7 +123,7 @@ impl FinalizedState {
 }
 /// Returns a reference to the inner database instance.
-pub(crate) fn db(&self) -> &DiskDb {
+pub(crate) fn db(&self) -> &ZebraDb {
 &self.db
 }
@@ -146,7 +145,10 @@ impl FinalizedState {
 let height = queued.0.height;
 self.queued_by_prev_hash.insert(prev_hash, queued);
-while let Some(queued_block) = self.queued_by_prev_hash.remove(&self.finalized_tip_hash()) {
+while let Some(queued_block) = self
+.queued_by_prev_hash
+.remove(&self.db.finalized_tip_hash())
+{
 if let Ok(finalized) = self.commit_finalized(queued_block) {
 highest_queue_commit = Some(finalized);
 } else {
@@ -232,11 +234,11 @@ impl FinalizedState {
 finalized: FinalizedBlock,
 source: &str,
 ) -> Result<block::Hash, BoxError> {
-let committed_tip_hash = self.finalized_tip_hash();
-let committed_tip_height = self.finalized_tip_height();
+let committed_tip_hash = self.db.finalized_tip_hash();
+let committed_tip_height = self.db.finalized_tip_height();
 // Assert that callers (including unit tests) get the chain order correct
-if self.is_empty() {
+if self.db.is_empty() {
 assert_eq!(
 committed_tip_hash, finalized.block.header.previous_block_hash,
 "the first block added to an empty state must be a genesis block, source: {}",
@@ -270,7 +272,7 @@ impl FinalizedState {
 // the history tree root. While it _is_ checked during contextual validation,
 // that is not called by the checkpoint verifier, and keeping a history tree there
 // would be harder to implement.
-let history_tree = self.history_tree();
+let history_tree = self.db.history_tree();
 check::finalized_block_commitment_is_valid_for_chain_history(
 &finalized,
 self.network,
@@ -280,7 +282,9 @@ impl FinalizedState {
 let finalized_height = finalized.height;
 let finalized_hash = finalized.hash;
-let result = self.write_block(finalized, history_tree, source);
+let result = self
+.db
+.write_block(finalized, history_tree, self.network, source);
 // TODO: move the stop height check to the syncer (#3442)
 if result.is_ok() && self.is_at_stop_height(finalized_height) {
@@ -300,54 +304,6 @@ impl FinalizedState {
 result
 }
-/// Write `finalized` to the finalized state.
-///
-/// Uses:
-/// - `history_tree`: the current tip's history tree
-/// - `source`: the source of the block in log messages
-///
-/// # Errors
-///
-/// - Propagates any errors from writing to the DB
-/// - Propagates any errors from updating history and note commitment trees
-fn write_block(
-&mut self,
-finalized: FinalizedBlock,
-history_tree: HistoryTree,
-source: &str,
-) -> Result<block::Hash, BoxError> {
-let finalized_hash = finalized.hash;
-// Get a list of the spent UTXOs, before we delete any from the database
-let all_utxos_spent_by_block = finalized
-.block
-.transactions
-.iter()
-.flat_map(|tx| tx.inputs().iter())
-.flat_map(|input| input.outpoint())
-.flat_map(|outpoint| self.utxo(&outpoint).map(|utxo| (outpoint, utxo)))
-.collect();
-let mut batch = disk_db::DiskWriteBatch::new();
-// In case of errors, propagate and do not write the batch.
-batch.prepare_block_batch(
-&self.db,
-finalized,
-self.network,
-all_utxos_spent_by_block,
-self.note_commitment_trees(),
-history_tree,
-self.finalized_value_pool(),
-)?;
-self.db.write(batch)?;
-tracing::trace!(?source, "committed block from");
-Ok(finalized_hash)
-}
 /// Stop the process if `block_height` is greater than or equal to the
 /// configured stop height.
 fn is_at_stop_height(&self, block_height: block::Height) -> bool {

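The module docs in the hunks above describe the database as four layers (`FinalizedState` → `ZebraDb` → `DiskDb` → `disk_format`). As a rough sketch of how those layers nest and delegate — every type and method below is a simplified placeholder, not the real definition:

```rust
// Layer 4: disk_format — convert typed values to and from raw bytes.
fn height_to_bytes(height: u32) -> [u8; 4] {
    height.to_be_bytes()
}
fn height_from_bytes(bytes: [u8; 4]) -> u32 {
    u32::from_be_bytes(bytes)
}

// Layer 3: DiskDb — stores raw bytes (placeholder for the RocksDB wrapper).
struct DiskDb {
    tip_height_bytes: Option<[u8; 4]>,
}

// Layer 2: ZebraDb — typed reads built on top of the raw byte store.
struct ZebraDb {
    db: DiskDb,
}
impl ZebraDb {
    fn finalized_tip_height(&self) -> Option<u32> {
        self.db.tip_height_bytes.map(height_from_bytes)
    }
}

// Layer 1: FinalizedState — queueing, validation, and commit logic,
// delegating all reads to its inner `db`, as this commit does.
struct FinalizedState {
    db: ZebraDb,
}
impl FinalizedState {
    fn is_empty(&self) -> bool {
        self.db.finalized_tip_height().is_none()
    }
}

fn main() {
    let state = FinalizedState {
        db: ZebraDb {
            db: DiskDb {
                tip_height_bytes: Some(height_to_bytes(419_200)),
            },
        },
    };
    assert!(!state.is_empty());
    assert_eq!(state.db.finalized_tip_height(), Some(419_200));
}
```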
@@ -2,16 +2,22 @@
 #![allow(dead_code)]
-use std::sync::Arc;
+use std::{ops::Deref, sync::Arc};
-use zebra_chain::{amount::NonNegative, block::Block, sprout, value_balance::ValueBalance};
 use crate::service::finalized_state::{
-disk_db::{DiskWriteBatch, WriteDisk},
 disk_format::{FromDisk, IntoDisk},
-FinalizedState,
+FinalizedState, ZebraDb,
 };
+// Enable older test code to automatically access the inner database via Deref coercion.
+impl Deref for FinalizedState {
+type Target = ZebraDb;
+fn deref(&self) -> &Self::Target {
+self.db()
+}
+}
 pub fn round_trip<T>(input: T) -> T
 where
 T: IntoDisk + FromDisk,
@@ -74,47 +80,3 @@ where
 assert_round_trip_arc(Arc::new(input.clone()));
 assert_round_trip(input);
 }
-impl FinalizedState {
-/// Allow to set up a fake value pool in the database for testing purposes.
-pub fn set_finalized_value_pool(&self, fake_value_pool: ValueBalance<NonNegative>) {
-let mut batch = DiskWriteBatch::new();
-let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap();
-batch.zs_insert(value_pool_cf, (), fake_value_pool);
-self.db.write(batch).unwrap();
-}
-/// Artificially prime the note commitment tree anchor sets with anchors
-/// referenced in a block, for testing purposes _only_.
-pub fn populate_with_anchors(&self, block: &Block) {
-let mut batch = DiskWriteBatch::new();
-let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap();
-let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap();
-let orchard_anchors = self.db.cf_handle("orchard_anchors").unwrap();
-for transaction in block.transactions.iter() {
-// Sprout
-for joinsplit in transaction.sprout_groth16_joinsplits() {
-batch.zs_insert(
-sprout_anchors,
-joinsplit.anchor,
-sprout::tree::NoteCommitmentTree::default(),
-);
-}
-// Sapling
-for anchor in transaction.sapling_anchors() {
-batch.zs_insert(sapling_anchors, anchor, ());
-}
-// Orchard
-if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() {
-batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ());
-}
-}
-self.db.write(batch).unwrap();
-}
-}

@@ -182,6 +182,8 @@ impl DiskDb {
 /// stdio (3), and other OS facilities (2+).
 const RESERVED_FILE_COUNT: u64 = 48;
+/// Opens or creates the database at `config.path` for `network`,
+/// and returns a shared low-level database wrapper.
 pub fn new(config: &Config, network: Network) -> DiskDb {
 let path = config.db_path(network);
 let db_options = DiskDb::options();

@@ -2,7 +2,7 @@
 #![allow(dead_code)]
-use crate::service::finalized_state::DiskDb;
+use crate::service::finalized_state::disk_db::DiskDb;
 impl DiskDb {
 /// Returns a list of column family names in this database.

@@ -9,8 +9,51 @@
 //! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
 //! be incremented each time the database format (column, serialization, etc) changes.
+use std::path::Path;
+use zebra_chain::parameters::Network;
+use crate::{service::finalized_state::disk_db::DiskDb, Config};
 pub mod block;
 pub mod chain;
 pub mod metrics;
 pub mod shielded;
 pub mod transparent;
+#[cfg(any(test, feature = "proptest-impl"))]
+pub mod arbitrary;
+/// Wrapper struct to ensure high-level typed database access goes through the correct API.
+#[derive(Clone, Debug)]
+pub struct ZebraDb {
+/// The inner low-level database wrapper for the RocksDB database.
+/// This wrapper can be cloned and shared.
+db: DiskDb,
+}
+impl ZebraDb {
+/// Opens or creates the database at `config.path` for `network`,
+/// and returns a shared high-level typed database wrapper.
+pub fn new(config: &Config, network: Network) -> ZebraDb {
+ZebraDb {
+db: DiskDb::new(config, network),
+}
+}
+/// Returns the `Path` where the files used by this database are located.
+pub fn path(&self) -> &Path {
+self.db.path()
+}
+/// Shut down the database, cleaning up background tasks and ephemeral data.
+///
+/// If `force` is true, clean up regardless of any shared references.
+/// `force` can cause errors accessing the database from other shared references.
+/// It should only be used in debugging or test code, immediately before a manual shutdown.
+///
+/// See [`DiskDb::shutdown`] for details.
+pub(crate) fn shutdown(&mut self, force: bool) {
+self.db.shutdown(force);
+}
+}

@@ -0,0 +1,73 @@
+//! Arbitrary value generation and test harnesses for high-level typed database access.
+#![allow(dead_code)]
+use std::ops::Deref;
+use zebra_chain::{amount::NonNegative, block::Block, sprout, value_balance::ValueBalance};
+use crate::service::finalized_state::{
+disk_db::{DiskDb, DiskWriteBatch, WriteDisk},
+ZebraDb,
+};
+// Enable older test code to automatically access the inner database via Deref coercion.
+impl Deref for ZebraDb {
+type Target = DiskDb;
+fn deref(&self) -> &Self::Target {
+self.db()
+}
+}
+impl ZebraDb {
+/// Returns the inner database.
+///
+/// This is a test-only method, because it allows write access
+/// and raw read access to the RocksDB instance.
+pub fn db(&self) -> &DiskDb {
+&self.db
+}
+/// Allow to set up a fake value pool in the database for testing purposes.
+pub fn set_finalized_value_pool(&self, fake_value_pool: ValueBalance<NonNegative>) {
+let mut batch = DiskWriteBatch::new();
+let value_pool_cf = self.db().cf_handle("tip_chain_value_pool").unwrap();
+batch.zs_insert(value_pool_cf, (), fake_value_pool);
+self.db().write(batch).unwrap();
+}
+/// Artificially prime the note commitment tree anchor sets with anchors
+/// referenced in a block, for testing purposes _only_.
+pub fn populate_with_anchors(&self, block: &Block) {
+let mut batch = DiskWriteBatch::new();
+let sprout_anchors = self.db().cf_handle("sprout_anchors").unwrap();
+let sapling_anchors = self.db().cf_handle("sapling_anchors").unwrap();
+let orchard_anchors = self.db().cf_handle("orchard_anchors").unwrap();
+for transaction in block.transactions.iter() {
+// Sprout
+for joinsplit in transaction.sprout_groth16_joinsplits() {
+batch.zs_insert(
+sprout_anchors,
+joinsplit.anchor,
+sprout::tree::NoteCommitmentTree::default(),
+);
+}
+// Sapling
+for anchor in transaction.sapling_anchors() {
+batch.zs_insert(sapling_anchors, anchor, ());
+}
+// Orchard
+if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() {
+batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ());
+}
+}
+self.db().write(batch).unwrap();
+}
+}

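Both `arbitrary` modules above add a test-only `Deref` impl (`FinalizedState` derefs to `ZebraDb`, and `ZebraDb` derefs to `DiskDb`), so existing test code that called these methods on the outer type keeps compiling: Rust method resolution follows `Deref` automatically. Below is a minimal sketch of that mechanism, with placeholder types and a placeholder `cf_names` method rather than the real ones.

```rust
use std::ops::Deref;

/// Placeholder inner database with a read method (not Zebra's real API).
struct DiskDb;

impl DiskDb {
    fn cf_names(&self) -> Vec<String> {
        vec!["default".to_string()]
    }
}

/// Read-only wrapper, as in this commit.
struct ZebraDb {
    db: DiskDb,
}

// Test-only Deref: lets older test code call low-level `DiskDb` methods
// directly on `ZebraDb` without being rewritten.
impl Deref for ZebraDb {
    type Target = DiskDb;

    fn deref(&self) -> &Self::Target {
        &self.db
    }
}

fn main() {
    let zebra_db = ZebraDb { db: DiskDb };
    // Method resolution auto-derefs, so this call finds `DiskDb::cf_names`.
    assert_eq!(zebra_db.cf_names(), vec!["default".to_string()]);
}
```

Keeping these impls behind the test/proptest `arbitrary` modules means production code never gets the implicit low-level access; only test code does.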
@@ -25,8 +25,8 @@ use crate::{
 service::finalized_state::{
 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
 disk_format::{FromDisk, TransactionLocation},
-zebra_db::shielded::NoteCommitmentTrees,
-FinalizedBlock, FinalizedState,
+zebra_db::{metrics::block_precommit_metrics, shielded::NoteCommitmentTrees, ZebraDb},
+FinalizedBlock,
 },
 BoxError, HashOrHeight,
 };
@@ -34,7 +34,7 @@ use crate::{
 #[cfg(test)]
 mod tests;
-impl FinalizedState {
+impl ZebraDb {
 // Read block methods
 /// Returns true if the database is empty.
@@ -115,6 +115,58 @@ impl FinalizedState {
 block.transactions[index.as_usize()].clone()
 })
 }
+// Write block methods
+/// Write `finalized` to the finalized state.
+///
+/// Uses:
+/// - `history_tree`: the current tip's history tree
+/// - `network`: the configured network
+/// - `source`: the source of the block in log messages
+///
+/// # Errors
+///
+/// - Propagates any errors from writing to the DB
+/// - Propagates any errors from updating history and note commitment trees
+pub(in super::super) fn write_block(
+&mut self,
+finalized: FinalizedBlock,
+history_tree: HistoryTree,
+network: Network,
+source: &str,
+) -> Result<block::Hash, BoxError> {
+let finalized_hash = finalized.hash;
+// Get a list of the spent UTXOs, before we delete any from the database
+let all_utxos_spent_by_block = finalized
+.block
+.transactions
+.iter()
+.flat_map(|tx| tx.inputs().iter())
+.flat_map(|input| input.outpoint())
+.flat_map(|outpoint| self.utxo(&outpoint).map(|utxo| (outpoint, utxo)))
+.collect();
+let mut batch = DiskWriteBatch::new();
+// In case of errors, propagate and do not write the batch.
+batch.prepare_block_batch(
+&self.db,
+finalized,
+network,
+all_utxos_spent_by_block,
+self.note_commitment_trees(),
+history_tree,
+self.finalized_value_pool(),
+)?;
+self.db.write(batch)?;
+tracing::trace!(?source, "committed block from");
+Ok(finalized_hash)
+}
 }
 impl DiskWriteBatch {
@@ -180,7 +232,7 @@ impl DiskWriteBatch {
 self.prepare_chain_value_pools_batch(db, &finalized, all_utxos_spent_by_block, value_pool)?;
 // The block has passed contextual validation, so update the metrics
-FinalizedState::block_precommit_metrics(block, *hash, *height);
+block_precommit_metrics(block, *hash, *height);
 Ok(())
 }

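Note the visibility on the relocated `write_block`: `pub(in super::super)` limits it to the `finalized_state` module tree, so code outside that module, including the new `ReadStateService`, only ever sees `ZebraDb`'s read methods. A small, simplified sketch of how that restricted visibility works (module layout and methods below are illustrative only):

```rust
mod finalized_state {
    pub mod zebra_db {
        /// Read-only wrapper (simplified stand-in, not Zebra's real type).
        pub struct ZebraDb {
            data: Vec<u8>,
        }

        impl ZebraDb {
            pub fn new() -> ZebraDb {
                ZebraDb { data: Vec::new() }
            }
        }

        pub mod block {
            impl super::ZebraDb {
                /// Read access: visible everywhere the type is visible.
                pub fn len(&self) -> usize {
                    self.data.len()
                }

                /// Write access: `pub(in super::super)` here resolves to
                /// `finalized_state`, so only that module tree can call it.
                pub(in super::super) fn write_byte(&mut self, byte: u8) {
                    self.data.push(byte);
                }
            }
        }
    }

    /// Code inside `finalized_state` can still write through the wrapper.
    pub fn commit(db: &mut zebra_db::ZebraDb) {
        db.write_byte(42);
    }
}

fn main() {
    let mut db = finalized_state::zebra_db::ZebraDb::new();
    // db.write_byte(1); // does not compile here: only `finalized_state` can write
    finalized_state::commit(&mut db);
    assert_eq!(db.len(), 1);
}
```

Uncommenting the `write_byte` call in `main` fails to compile, which is the compile-time guarantee this commit relies on instead of the old `TODO: prevent write access via this type`.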
@@ -25,12 +25,13 @@ use zebra_chain::{
 use crate::{
 service::finalized_state::{
 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
-FinalizedBlock, FinalizedState,
+zebra_db::ZebraDb,
+FinalizedBlock,
 },
 BoxError,
 };
-impl FinalizedState {
+impl ZebraDb {
 /// Returns the ZIP-221 history tree of the finalized tip or `None`
 /// if it does not exist yet in the state (pre-Heartwood).
 pub fn history_tree(&self) -> HistoryTree {

@@ -2,9 +2,6 @@
 use zebra_chain::block::{self, Block};
-use crate::service::finalized_state::FinalizedState;
-impl FinalizedState {
 /// Update metrics before committing a block.
 ///
 /// The metrics are updated after contextually validating a block,
@@ -82,4 +79,3 @@ impl FinalizedState {
 orchard_nullifier_count as u64
 );
 }
-}

@@ -20,7 +20,8 @@ use zebra_chain::{
 use crate::{
 service::finalized_state::{
 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
-FinalizedBlock, FinalizedState,
+zebra_db::ZebraDb,
+FinalizedBlock,
 },
 BoxError,
 };
@@ -33,7 +34,7 @@ pub struct NoteCommitmentTrees {
 orchard: orchard::tree::NoteCommitmentTree,
 }
-impl FinalizedState {
+impl ZebraDb {
 // Read shielded methods
 /// Returns `true` if the finalized state contains `sprout_nullifier`.

@@ -19,12 +19,13 @@ use crate::{
 service::finalized_state::{
 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
 disk_format::transparent::OutputLocation,
-FinalizedBlock, FinalizedState,
+zebra_db::ZebraDb,
+FinalizedBlock,
 },
 BoxError,
 };
-impl FinalizedState {
+impl ZebraDb {
 // Read transparent methods
 /// Returns the `transparent::Output` pointed to by the given

@@ -2,14 +2,6 @@
 //!
 //! [RFC0005]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html
-mod chain;
-mod queued_blocks;
-#[cfg(test)]
-mod tests;
-pub use queued_blocks::QueuedBlocks;
 use std::{collections::BTreeSet, mem, sync::Arc};
 use zebra_chain::{
@@ -23,13 +15,20 @@ use zebra_chain::{
 };
 use crate::{
-request::ContextuallyValidBlock, FinalizedBlock, HashOrHeight, PreparedBlock,
-ValidateContextError,
+request::ContextuallyValidBlock,
+service::{check, finalized_state::ZebraDb},
+FinalizedBlock, HashOrHeight, PreparedBlock, ValidateContextError,
 };
-pub(crate) use self::chain::Chain;
-use super::{check, finalized_state::FinalizedState};
+mod chain;
+mod queued_blocks;
+#[cfg(test)]
+mod tests;
+pub use queued_blocks::QueuedBlocks;
+pub(crate) use chain::Chain;
 /// The state of the chains in memory, including queued blocks.
 #[derive(Debug, Clone)]
@@ -138,7 +137,7 @@ impl NonFinalizedState {
 pub fn commit_block(
 &mut self,
 prepared: PreparedBlock,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<(), ValidateContextError> {
 let parent_hash = prepared.block.header.previous_block_hash;
 let (height, hash) = (prepared.height, prepared.hash);
@@ -174,7 +173,7 @@ impl NonFinalizedState {
 pub fn commit_new_chain(
 &mut self,
 prepared: PreparedBlock,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<(), ValidateContextError> {
 let chain = Chain::new(
 self.network,
@@ -206,7 +205,7 @@ impl NonFinalizedState {
 &self,
 new_chain: Arc<Chain>,
 prepared: PreparedBlock,
-finalized_state: &FinalizedState,
+finalized_state: &ZebraDb,
 ) -> Result<Arc<Chain>, ValidateContextError> {
 let spent_utxos = check::utxo::transparent_spend(
 &prepared,