From a02b307353207c2de11bad0800e0b663545bc696 Mon Sep 17 00:00:00 2001
From: Arya
Date: Mon, 30 Sep 2024 22:18:48 -0400
Subject: [PATCH] updates nullifiers column families to include revealing transaction locations in db format upgrade

---
 .../finalized_state/disk_format/upgrade.rs    |   9 +-
 .../upgrade/track_tx_locs_by_inputs.rs        |  77 -------------
 .../upgrade/track_tx_locs_by_spends.rs        | 101 ++++++++++++++++++
 3 files changed, 104 insertions(+), 83 deletions(-)
 delete mode 100644 zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_inputs.rs
 create mode 100644 zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs

diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
index e0a8821c6..056cca253 100644
--- a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
+++ b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
@@ -27,7 +27,7 @@ use crate::{
 pub(crate) mod add_subtrees;
 pub(crate) mod cache_genesis_roots;
 pub(crate) mod fix_tree_key_type;
-pub(crate) mod track_tx_locs_by_inputs;
+pub(crate) mod track_tx_locs_by_spends;
 
 /// The kind of database format change or validity check we're performing.
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -559,14 +559,11 @@
         if older_disk_version < &version_for_tx_loc_by_inputs_and_revealed_nullifiers {
             let timer = CodeTimer::start();
 
-            track_tx_locs_by_inputs::run(initial_tip_height, db, cancel_receiver)?;
-
-            // TODO: Add a db format upgrade for indexing spending tx ids (transaction locations) by
-            // spent outpoints (output locations) in the finalized state
+            track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
 
             // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
             // database is marked, so the upgrade MUST be complete at this point.
-            Self::mark_as_upgraded_to(db, &version_for_tree_keys_and_caches);
+            Self::mark_as_upgraded_to(db, &version_for_tx_loc_by_inputs_and_revealed_nullifiers);
 
             timer.finish(module_path!(), line!(), "tree keys and caches upgrade");
         }
diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_inputs.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_inputs.rs
deleted file mode 100644
index 47a03364d..000000000
--- a/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_inputs.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-//! Tracks transaction locations by their inputs and revealed nullifiers.
-
-use std::sync::mpsc;
-
-use zebra_chain::{block::Height, transaction::Transaction};
-
-use crate::{service::finalized_state::ZebraDb, TransactionIndex, TransactionLocation};
-
-use super::CancelFormatChange;
-
-/// The number of transactions to process before writing a batch to disk.
-const WRITE_BATCH_SIZE: usize = 1_000;
-
-/// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers.
-///
-/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled.
-#[allow(clippy::unwrap_in_result)]
-#[instrument(skip(zebra_db, cancel_receiver))]
-pub fn run(
-    initial_tip_height: Height,
-    zebra_db: &ZebraDb,
-    cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
-) -> Result<(), CancelFormatChange> {
-    let db = zebra_db.db();
-    let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
-    let end_bound = TransactionLocation::from_parts(initial_tip_height, TransactionIndex::MAX);
-
-    let transactions_iter = db
-        .zs_forward_range_iter::<_, _, Transaction, _>(tx_by_loc, ..=end_bound)
-        .filter(|(_, tx)| !tx.is_coinbase())
-        .map(|(tx_loc, tx)| (tx_loc, tx.inputs().to_vec()));
-
-    let new_batch = || {
-        zebra_db
-            .tx_loc_by_spent_output_loc_cf()
-            .new_batch_for_writing()
-    };
-
-    let mut batch = new_batch();
-    let mut num_inputs_processed_since_write: usize = 0;
-    for (tx_loc, inputs) in transactions_iter {
-        // Return before I/O calls if the upgrade is cancelled.
-        if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
-            return Err(CancelFormatChange);
-        }
-
-        for input in inputs {
-            let spent_outpoint = input
-                .outpoint()
-                .expect("should filter out coinbase transactions");
-            let spent_output_location = zebra_db
-                .output_location(&spent_outpoint)
-                .expect("should have location for spent outpoint");
-
-            batch = batch.zs_insert(&spent_output_location, &tx_loc);
-            num_inputs_processed_since_write += 1;
-        }
-
-        // Return before I/O calls if the upgrade is cancelled.
-        if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
-            return Err(CancelFormatChange);
-        }
-
-        if num_inputs_processed_since_write >= WRITE_BATCH_SIZE {
-            std::mem::replace(&mut batch, new_batch())
-                .write_batch()
-                .expect("unexpected database write failure");
-            num_inputs_processed_since_write = 0;
-        }
-    }
-
-    batch
-        .write_batch()
-        .expect("unexpected database write failure");
-
-    Ok(())
-}
diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs
new file mode 100644
index 000000000..1ffb97fa6
--- /dev/null
+++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs
@@ -0,0 +1,101 @@
+//! Tracks transaction locations by their inputs and revealed nullifiers.
+
+use std::sync::mpsc;
+
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+use zebra_chain::{block::Height, transaction::Transaction};
+
+use crate::{service::finalized_state::ZebraDb, TransactionIndex, TransactionLocation};
+
+use super::super::super::{DiskWriteBatch, WriteDisk};
+use super::CancelFormatChange;
+
+/// The number of database operations to batch before writing to disk.
+const WRITE_BATCH_SIZE: usize = 10_000;
+
+/// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers.
+///
+/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled.
+#[allow(clippy::unwrap_in_result)]
+#[instrument(skip(zebra_db, cancel_receiver))]
+pub fn run(
+    initial_tip_height: Height,
+    zebra_db: &ZebraDb,
+    cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
+) -> Result<(), CancelFormatChange> {
+    let db = zebra_db.db();
+    let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
+    let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap();
+    let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap();
+    let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap();
+    let end_bound = TransactionLocation::from_parts(initial_tip_height, TransactionIndex::MAX);
+
+    let transactions_iter = db
+        .zs_forward_range_iter::<_, _, Transaction, _>(tx_by_loc, ..=end_bound)
+        .filter(|(_, tx)| !tx.is_coinbase());
+
+    let mut batch = DiskWriteBatch::new();
+    let mut num_ops_in_write_batch: usize = 0;
+    for (tx_loc, tx) in transactions_iter {
+        // Return before I/O calls if the upgrade is cancelled.
+        if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
+            return Err(CancelFormatChange);
+        }
+
+        // read spent outpoints' output locations in parallel
+        let spent_output_locations: Vec<_> = tx
+            .inputs()
+            .par_iter()
+            .map(|input| {
+                let spent_outpoint = input
+                    .outpoint()
+                    .expect("should filter out coinbase transactions");
+
+                zebra_db
+                    .output_location(&spent_outpoint)
+                    .expect("should have location for spent outpoint")
+            })
+            .collect();
+
+        for spent_output_location in spent_output_locations {
+            let _ = zebra_db
+                .tx_loc_by_spent_output_loc_cf()
+                .with_batch_for_writing(&mut batch)
+                .zs_insert(&spent_output_location, &tx_loc);
+            num_ops_in_write_batch += 1;
+        }
+
+        // Mark sprout, sapling and orchard nullifiers as spent
+        for sprout_nullifier in tx.sprout_nullifiers() {
+            batch.zs_insert(&sprout_nullifiers, sprout_nullifier, tx_loc);
+            num_ops_in_write_batch += 1;
+        }
+
+        for sapling_nullifier in tx.sapling_nullifiers() {
+            batch.zs_insert(&sapling_nullifiers, sapling_nullifier, tx_loc);
+            num_ops_in_write_batch += 1;
+        }
+
+        for orchard_nullifier in tx.orchard_nullifiers() {
+            batch.zs_insert(&orchard_nullifiers, orchard_nullifier, tx_loc);
+            num_ops_in_write_batch += 1;
+        }
+
+        // Return before I/O calls if the upgrade is cancelled.
+        if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
+            return Err(CancelFormatChange);
+        }
+
+        // write batches after processing all items in a transaction
+        if num_ops_in_write_batch >= WRITE_BATCH_SIZE {
+            db.write(std::mem::take(&mut batch))
+                .expect("unexpected database write failure");
+            num_ops_in_write_batch = 0;
+        }
+    }
+
+    // write any remaining items in the batch to the db
+    db.write(batch).expect("unexpected database write failure");
+
+    Ok(())
+}
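
After this patch, the sprout, sapling, and orchard nullifier column families map each nullifier to the location of the transaction that revealed it, rather than only recording that the nullifier was seen. The sketch below is a minimal, std-only illustration of that key/value shape and is not Zebra code: the `TransactionLocation` struct is a simplified stand-in for Zebra's type, and a `HashMap` stands in for the RocksDB column family.

use std::collections::HashMap;

/// Simplified stand-in for Zebra's `TransactionLocation`: a block height plus
/// the transaction's index within that block.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct TransactionLocation {
    height: u32,
    index: u16,
}

fn main() {
    // A `HashMap` stands in for a RocksDB nullifier column family
    // (e.g. "sapling_nullifiers"); the key is the nullifier bytes.
    let mut sapling_nullifiers: HashMap<[u8; 32], TransactionLocation> = HashMap::new();

    // Hypothetical nullifiers revealed by transaction 3 of block 419_200.
    let tx_loc = TransactionLocation { height: 419_200, index: 3 };
    let revealed_nullifiers: [[u8; 32]; 2] = [[0xAA; 32], [0xBB; 32]];

    // After the upgrade, the stored value is the revealing transaction's
    // location instead of a bare "seen" marker.
    for nullifier in revealed_nullifiers {
        sapling_nullifiers.insert(nullifier, tx_loc);
    }

    // A lookup now answers "which transaction revealed this nullifier?".
    assert_eq!(sapling_nullifiers.get(&[0xAA; 32]), Some(&tx_loc));
    println!("revealed at {:?}", sapling_nullifiers[&[0xAA; 32]]);
}

In the upgrade itself, the same nullifier-to-location pairs are written through `DiskWriteBatch::zs_insert` for each of the three column families, as shown in `track_tx_locs_by_spends.rs` above.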