refactor db format upgrade and prepare_nullifiers_batch() to use ZebraDb instead of DiskDb, checks cancel_receiver before every db operation

This commit is contained in:
Arya 2024-10-04 18:52:17 -04:00
parent 511ff82396
commit ecb345aebb
3 changed files with 64 additions and 80 deletions

View File

@ -2,17 +2,13 @@
use std::sync::mpsc; use std::sync::mpsc;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use zebra_chain::block::Height;
use zebra_chain::{block::Height, transaction::Transaction};
use crate::{service::finalized_state::ZebraDb, TransactionIndex, TransactionLocation}; use crate::{service::finalized_state::ZebraDb, TransactionIndex, TransactionLocation};
use super::super::super::{DiskWriteBatch, WriteDisk}; use super::super::super::DiskWriteBatch;
use super::CancelFormatChange; use super::CancelFormatChange;
/// The number of transactions to process before writing a batch to disk.
const WRITE_BATCH_SIZE: usize = 10_000;
/// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers. /// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers.
/// ///
/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled. /// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled.
@ -23,79 +19,53 @@ pub fn run(
zebra_db: &ZebraDb, zebra_db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>, cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<(), CancelFormatChange> { ) -> Result<(), CancelFormatChange> {
let db = zebra_db.db();
let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap();
let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap();
let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap();
let end_bound = TransactionLocation::from_parts(initial_tip_height, TransactionIndex::MAX);
let transactions_iter = db
.zs_forward_range_iter::<_, _, Transaction, _>(tx_by_loc, ..=end_bound)
.filter(|(_, tx)| !tx.is_coinbase());
let mut batch = DiskWriteBatch::new();
let mut num_ops_in_write_batch: usize = 0;
for (tx_loc, tx) in transactions_iter {
// Return before I/O calls if the upgrade is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange); return Err(CancelFormatChange);
} }
// read spent outpoints' output locations in parallel let end_bound = TransactionLocation::from_parts(initial_tip_height, TransactionIndex::MAX);
let spent_output_locations: Vec<_> = tx
.inputs() for (tx_loc, tx) in zebra_db
.par_iter() .transactions_by_location_range(..=end_bound)
.map(|input| { .filter(|(_, tx)| !tx.is_coinbase())
{
let mut batch = DiskWriteBatch::new();
for input in tx.inputs() {
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange);
}
let spent_outpoint = input let spent_outpoint = input
.outpoint() .outpoint()
.expect("should filter out coinbase transactions"); .expect("should filter out coinbase transactions");
zebra_db let spent_output_location = zebra_db
.output_location(&spent_outpoint) .output_location(&spent_outpoint)
.expect("should have location for spent outpoint") .expect("should have location for spent outpoint");
})
.collect();
for spent_output_location in spent_output_locations {
let _ = zebra_db let _ = zebra_db
.tx_loc_by_spent_output_loc_cf() .tx_loc_by_spent_output_loc_cf()
.with_batch_for_writing(&mut batch) .with_batch_for_writing(&mut batch)
.zs_insert(&spent_output_location, &tx_loc); .zs_insert(&spent_output_location, &tx_loc);
num_ops_in_write_batch += 1;
} }
// Mark sprout, sapling and orchard nullifiers as spent batch
for sprout_nullifier in tx.sprout_nullifiers() { .prepare_nullifier_batch(zebra_db, &tx, tx_loc)
batch.zs_insert(&sprout_nullifiers, sprout_nullifier, tx_loc); .expect("should not return an error");
num_ops_in_write_batch += 1;
}
for sapling_nullifier in tx.sapling_nullifiers() {
batch.zs_insert(&sapling_nullifiers, sapling_nullifier, tx_loc);
num_ops_in_write_batch += 1;
}
for orchard_nullifier in tx.orchard_nullifiers() {
batch.zs_insert(&orchard_nullifiers, orchard_nullifier, tx_loc);
num_ops_in_write_batch += 1;
}
// Return before I/O calls if the upgrade is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange); return Err(CancelFormatChange);
} }
// write batches after processing all items in a transaction zebra_db
if num_ops_in_write_batch >= WRITE_BATCH_SIZE { .write_batch(batch)
db.write(std::mem::take(&mut batch))
.expect("unexpected database write failure"); .expect("unexpected database write failure");
num_ops_in_write_batch = 0;
}
}
// write any remaining items in the batch to the db if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
db.write(batch).expect("unexpected database write failure"); return Err(CancelFormatChange);
}
}
Ok(()) Ok(())
} }

View File

@ -11,6 +11,7 @@
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
ops::RangeBounds,
sync::Arc, sync::Arc,
}; };
@ -212,6 +213,33 @@ impl ZebraDb {
// Read transaction methods // Read transaction methods
/// Returns the [`Transaction`] with [`transaction::Hash`], and its [`Height`],
/// if a transaction with that hash exists in the finalized chain.
#[allow(clippy::unwrap_in_result)]
pub fn transaction(&self, hash: transaction::Hash) -> Option<(Arc<Transaction>, Height)> {
let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
let transaction_location = self.transaction_location(hash)?;
self.db
.zs_get(&tx_by_loc, &transaction_location)
.map(|tx| (tx, transaction_location.height))
}
/// Returns the [`Transaction`] with [`transaction::Hash`], and its [`Height`],
/// if a transaction with that hash exists in the finalized chain.
#[allow(clippy::unwrap_in_result)]
pub fn transactions_by_location_range<R>(
&self,
range: R,
) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_
where
R: RangeBounds<TransactionLocation>,
{
let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
self.db.zs_forward_range_iter(tx_by_loc, range)
}
/// Returns the [`TransactionLocation`] for [`transaction::Hash`], /// Returns the [`TransactionLocation`] for [`transaction::Hash`],
/// if it exists in the finalized chain. /// if it exists in the finalized chain.
#[allow(clippy::unwrap_in_result)] #[allow(clippy::unwrap_in_result)]
@ -242,21 +270,6 @@ impl ZebraDb {
self.transaction_hash(tx_loc) self.transaction_hash(tx_loc)
} }
/// Returns the [`Transaction`] with [`transaction::Hash`], and its [`Height`],
/// if a transaction with that hash exists in the finalized chain.
//
// TODO: move this method to the start of the section
#[allow(clippy::unwrap_in_result)]
pub fn transaction(&self, hash: transaction::Hash) -> Option<(Arc<Transaction>, Height)> {
let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
let transaction_location = self.transaction_location(hash)?;
self.db
.zs_get(&tx_by_loc, &transaction_location)
.map(|tx| (tx, transaction_location.height))
}
/// Returns the [`transaction::Hash`]es in the block with `hash_or_height`, /// Returns the [`transaction::Hash`]es in the block with `hash_or_height`,
/// if it exists in this chain. /// if it exists in this chain.
/// ///
@ -482,7 +495,7 @@ impl DiskWriteBatch {
// which is already present from height 1 to the first shielded transaction. // which is already present from height 1 to the first shielded transaction.
// //
// In Zebra we include the nullifiers and note commitments in the genesis block because it simplifies our code. // In Zebra we include the nullifiers and note commitments in the genesis block because it simplifies our code.
self.prepare_shielded_transaction_batch(db, finalized)?; self.prepare_shielded_transaction_batch(zebra_db, finalized)?;
self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees)?; self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees)?;
// # Consensus // # Consensus

View File

@ -29,7 +29,7 @@ use zebra_chain::{
use crate::{ use crate::{
request::{FinalizedBlock, Treestate}, request::{FinalizedBlock, Treestate},
service::finalized_state::{ service::finalized_state::{
disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}, disk_db::{DiskWriteBatch, ReadDisk, WriteDisk},
disk_format::RawBytes, disk_format::RawBytes,
zebra_db::ZebraDb, zebra_db::ZebraDb,
}, },
@ -470,7 +470,7 @@ impl DiskWriteBatch {
/// - Propagates any errors from updating note commitment trees /// - Propagates any errors from updating note commitment trees
pub fn prepare_shielded_transaction_batch( pub fn prepare_shielded_transaction_batch(
&mut self, &mut self,
db: &DiskDb, zebra_db: &ZebraDb,
finalized: &FinalizedBlock, finalized: &FinalizedBlock,
) -> Result<(), BoxError> { ) -> Result<(), BoxError> {
let FinalizedBlock { block, height, .. } = finalized; let FinalizedBlock { block, height, .. } = finalized;
@ -478,7 +478,7 @@ impl DiskWriteBatch {
// Index each transaction's shielded data // Index each transaction's shielded data
for (tx_index, transaction) in block.transactions.iter().enumerate() { for (tx_index, transaction) in block.transactions.iter().enumerate() {
let tx_loc = TransactionLocation::from_usize(*height, tx_index); let tx_loc = TransactionLocation::from_usize(*height, tx_index);
self.prepare_nullifier_batch(db, transaction, tx_loc)?; self.prepare_nullifier_batch(zebra_db, transaction, tx_loc)?;
} }
Ok(()) Ok(())
@ -493,10 +493,11 @@ impl DiskWriteBatch {
#[allow(clippy::unwrap_in_result)] #[allow(clippy::unwrap_in_result)]
pub fn prepare_nullifier_batch( pub fn prepare_nullifier_batch(
&mut self, &mut self,
db: &DiskDb, zebra_db: &ZebraDb,
transaction: &Transaction, transaction: &Transaction,
transaction_location: TransactionLocation, transaction_location: TransactionLocation,
) -> Result<(), BoxError> { ) -> Result<(), BoxError> {
let db = &zebra_db.db;
let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap(); let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap();
let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap(); let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap();
let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap(); let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap();