From 1b5a24a65580fa17739e311060123329412a0a7a Mon Sep 17 00:00:00 2001 From: Kris Nuttycombe Date: Wed, 5 Jul 2023 23:08:00 -0600 Subject: [PATCH 1/2] Batch data store writes of `put_block` Instead of calling `put_block` for each block scanned, `scan_cached_blocks` will now defer the block writes until the scan of a batch is complete and will perform the block writes and note commitment tree updates all within a single transaction. This should ordinarily be fine in terms of memory consumption, because the block data being saved is pruned to only the spend and output information that is related to transactions in the wallet, which will normally be sparse enough that the block range size that is appropriate for a given platform to run within a batch scanner adequately bounds the memory consumption of this pruned representation. --- zcash_client_backend/CHANGELOG.md | 6 +- zcash_client_backend/src/data_api.rs | 8 +- zcash_client_backend/src/data_api/chain.rs | 4 +- zcash_client_sqlite/src/lib.rs | 86 +++++++++++++--------- zcash_client_sqlite/src/wallet.rs | 4 +- 5 files changed, 62 insertions(+), 46 deletions(-) diff --git a/zcash_client_backend/CHANGELOG.md b/zcash_client_backend/CHANGELOG.md index 618bf2c88..02297d45a 100644 --- a/zcash_client_backend/CHANGELOG.md +++ b/zcash_client_backend/CHANGELOG.md @@ -15,9 +15,9 @@ and this library adheres to Rust's notion of - `NullifierQuery` for use with `WalletRead::get_sapling_nullifiers` - `ScannedBlock` - `ShieldedProtocol` - - `WalletRead::{block_metadata, block_fully_scanned, suggest_scan_ranges}` - - `WalletWrite::put_block` - `WalletCommitmentTrees` + - `WalletRead::{block_metadata, block_fully_scanned, suggest_scan_ranges}` + - `WalletWrite::put_blocks` - `chain::CommitmentTreeRoot` - `testing::MockWalletDb::new` - `wallet::input_sellection::Proposal::{min_target_height, min_anchor_height}`: @@ -75,7 +75,7 @@ and this library adheres to Rust's notion of - `WalletRead::{get_commitment_tree, get_witnesses}` have 
been removed without replacement. The utility of these methods is now subsumed by those available from the `WalletCommitmentTrees` trait. - - `WalletWrite::advance_by_block` (use `WalletWrite::put_block` instead). + - `WalletWrite::advance_by_block` (use `WalletWrite::put_blocks` instead). - `PrunedBlock` has been replaced by `ScannedBlock` - `testing::MockWalletDb`, which is available under the `test-dependencies` feature flag, has been modified by the addition of a `sapling_tree` property. diff --git a/zcash_client_backend/src/data_api.rs b/zcash_client_backend/src/data_api.rs index 216fc7174..ceae06764 100644 --- a/zcash_client_backend/src/data_api.rs +++ b/zcash_client_backend/src/data_api.rs @@ -494,9 +494,9 @@ pub trait WalletWrite: WalletRead { /// Updates the state of the wallet database by persisting the provided block information, /// along with the note commitments that were detected when scanning the block for transactions /// pertaining to this wallet. - fn put_block( + fn put_blocks( &mut self, - block: ScannedBlock, + block: Vec>, ) -> Result, Self::Error>; /// Caches a decrypted transaction in the persistent wallet store. 
@@ -771,9 +771,9 @@ pub mod testing { } #[allow(clippy::type_complexity)] - fn put_block( + fn put_blocks( &mut self, - _block: ScannedBlock, + _blocks: Vec>, ) -> Result, Self::Error> { Ok(vec![]) } diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index 49d0f9005..7b45bb090 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -237,6 +237,7 @@ where batch_runner.flush(); + let mut scanned_blocks = vec![]; block_source.with_blocks::<_, DbT::Error>( Some(from_height), Some(limit), @@ -265,11 +266,12 @@ where })); prior_block_metadata = Some(*scanned_block.metadata()); - data_db.put_block(scanned_block).map_err(Error::Wallet)?; + scanned_blocks.push(scanned_block); Ok(()) }, )?; + data_db.put_blocks(scanned_blocks).map_err(Error::Wallet)?; Ok(()) } diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index c3d9b04bd..4bb84e596 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -394,53 +394,67 @@ impl WalletWrite for WalletDb ) } - #[tracing::instrument(skip_all, fields(height = u32::from(block.height())))] + #[tracing::instrument(skip_all, fields(height = blocks.first().map(|b| u32::from(b.height()))))] #[allow(clippy::type_complexity)] - fn put_block( + fn put_blocks( &mut self, - block: ScannedBlock, + blocks: Vec>, ) -> Result, Self::Error> { self.transactionally(|wdb| { - // Insert the block into the database. 
- wallet::put_block( - wdb.conn.0, - block.height(), - block.block_hash(), - block.block_time(), - block.metadata().sapling_tree_size(), - )?; - + let start_position = blocks.first().map(|block| { + Position::from( + u64::from(block.metadata().sapling_tree_size()) + - u64::try_from(block.sapling_commitments().len()).unwrap(), + ) + }); let mut wallet_note_ids = vec![]; - for tx in block.transactions() { - let tx_row = wallet::put_tx_meta(wdb.conn.0, tx, block.height())?; + let mut sapling_commitments = vec![]; + let mut end_height = None; - // Mark notes as spent and remove them from the scanning cache - for spend in &tx.sapling_spends { - wallet::sapling::mark_sapling_note_spent(wdb.conn.0, tx_row, spend.nf())?; + for block in blocks.into_iter() { + // Insert the block into the database. + wallet::put_block( + wdb.conn.0, + block.height(), + block.block_hash(), + block.block_time(), + block.metadata().sapling_tree_size(), + )?; + + for tx in block.transactions() { + let tx_row = wallet::put_tx_meta(wdb.conn.0, tx, block.height())?; + + // Mark notes as spent and remove them from the scanning cache + for spend in &tx.sapling_spends { + wallet::sapling::mark_sapling_note_spent(wdb.conn.0, tx_row, spend.nf())?; + } + + for output in &tx.sapling_outputs { + let received_note_id = + wallet::sapling::put_received_note(wdb.conn.0, output, tx_row)?; + + // Save witness for note. + wallet_note_ids.push(received_note_id); + } } - for output in &tx.sapling_outputs { - let received_note_id = - wallet::sapling::put_received_note(wdb.conn.0, output, tx_row)?; - - // Save witness for note. 
- wallet_note_ids.push(received_note_id); - } + end_height = Some(block.height()); + sapling_commitments.extend(block.into_sapling_commitments().into_iter()); } - let block_height = block.height(); - let sapling_tree_size = block.metadata().sapling_tree_size(); - let sapling_commitments_len = block.sapling_commitments().len(); - let mut sapling_commitments = block.into_sapling_commitments().into_iter(); - wdb.with_sapling_tree_mut::<_, _, SqliteClientError>(move |sapling_tree| { - let start_position = Position::from(u64::from(sapling_tree_size)) - - u64::try_from(sapling_commitments_len).unwrap(); - sapling_tree.batch_insert(start_position, &mut sapling_commitments)?; - Ok(()) - })?; + // We will have a start position and an end height in all cases where `blocks` is + // non-empty. + if let Some((start_position, end_height)) = start_position.zip(end_height) { + // Update the Sapling note commitment tree with all newly read note commitments + let mut sapling_commitments = sapling_commitments.into_iter(); + wdb.with_sapling_tree_mut::<_, _, SqliteClientError>(move |sapling_tree| { + sapling_tree.batch_insert(start_position, &mut sapling_commitments)?; + Ok(()) + })?; - // Update now-expired transactions that didn't get mined. - wallet::update_expired_notes(wdb.conn.0, block_height)?; + // Update now-expired transactions that didn't get mined. + wallet::update_expired_notes(wdb.conn.0, end_height)?; + } Ok(wallet_note_ids) }) diff --git a/zcash_client_sqlite/src/wallet.rs b/zcash_client_sqlite/src/wallet.rs index d70fea082..1f67e8d4d 100644 --- a/zcash_client_sqlite/src/wallet.rs +++ b/zcash_client_sqlite/src/wallet.rs @@ -1100,7 +1100,7 @@ pub(crate) fn put_legacy_transparent_utxo( /// as expired, up to the given block height. 
pub(crate) fn update_expired_notes( conn: &rusqlite::Connection, - height: BlockHeight, + expiry_height: BlockHeight, ) -> Result<(), SqliteClientError> { let mut stmt_update_expired = conn.prepare_cached( "UPDATE sapling_received_notes SET spent = NULL WHERE EXISTS ( @@ -1108,7 +1108,7 @@ pub(crate) fn update_expired_notes( WHERE id_tx = sapling_received_notes.spent AND block IS NULL AND expiry_height < ? )", )?; - stmt_update_expired.execute([u32::from(height)])?; + stmt_update_expired.execute([u32::from(expiry_height)])?; Ok(()) } From d55fa094644f989371275dd5480372141e2f9fed Mon Sep 17 00:00:00 2001 From: Kris Nuttycombe Date: Thu, 6 Jul 2023 08:18:57 -0600 Subject: [PATCH 2/2] Add a check to ensure that blocks passed to `put_blocks` are sequential. --- zcash_client_backend/src/data_api.rs | 2 ++ zcash_client_sqlite/src/error.rs | 4 ++++ zcash_client_sqlite/src/lib.rs | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/zcash_client_backend/src/data_api.rs b/zcash_client_backend/src/data_api.rs index ceae06764..ad7a8bdda 100644 --- a/zcash_client_backend/src/data_api.rs +++ b/zcash_client_backend/src/data_api.rs @@ -494,6 +494,8 @@ pub trait WalletWrite: WalletRead { /// Updates the state of the wallet database by persisting the provided block information, /// along with the note commitments that were detected when scanning the block for transactions /// pertaining to this wallet. + /// + /// `blocks` must be sequential, in order of increasing block height fn put_blocks( &mut self, block: Vec>, diff --git a/zcash_client_sqlite/src/error.rs b/zcash_client_sqlite/src/error.rs index 1dd14f1c2..9058359d4 100644 --- a/zcash_client_sqlite/src/error.rs +++ b/zcash_client_sqlite/src/error.rs @@ -57,6 +57,9 @@ pub enum SqliteClientError { /// different hash. This indicates that a required rewind was not performed. 
BlockConflict(BlockHeight), + /// A range of blocks provided to the database as a unit was non-sequential + NonSequentialBlocks, + /// A requested rewind would violate invariants of the storage layer. The payload returned with /// this error is (safe rewind height, requested height). RequestedRewindInvalid(BlockHeight, BlockHeight), @@ -118,6 +121,7 @@ impl fmt::Display for SqliteClientError { SqliteClientError::Io(e) => write!(f, "{}", e), SqliteClientError::InvalidMemo(e) => write!(f, "{}", e), SqliteClientError::BlockConflict(h) => write!(f, "A block hash conflict occurred at height {}; rewind required.", u32::from(*h)), + SqliteClientError::NonSequentialBlocks => write!(f, "`put_blocks` requires that the provided block range be sequential"), SqliteClientError::DiversifierIndexOutOfRange => write!(f, "The space of available diversifier indices is exhausted"), SqliteClientError::KeyDerivationError(acct_id) => write!(f, "Key derivation failed for account {:?}", acct_id), SqliteClientError::AccountIdDiscontinuity => write!(f, "Wallet account identifiers must be sequential."), diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index 4bb84e596..1686145ad 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -412,6 +412,10 @@ impl WalletWrite for WalletDb let mut end_height = None; for block in blocks.into_iter() { + if end_height.iter().any(|prev| block.height() != *prev + 1) { + return Err(SqliteClientError::NonSequentialBlocks); + } + // Insert the block into the database. wallet::put_block( wdb.conn.0,