From d16c124ffe51b444cb755c7ffdee4344a3885a9a Mon Sep 17 00:00:00 2001 From: Kris Nuttycombe Date: Thu, 20 Aug 2020 11:41:43 -0600 Subject: [PATCH] Abstract over data access in scan_cached_blocks. --- zcash_client_backend/src/data_api/chain.rs | 27 ++- zcash_client_backend/src/data_api/mod.rs | 87 ++++--- zcash_client_backend/src/wallet.rs | 5 + zcash_client_sqlite/src/lib.rs | 258 +++++++++++++++++++-- zcash_client_sqlite/src/scan.rs | 255 ++++++-------------- 5 files changed, 384 insertions(+), 248 deletions(-) diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index de2fcbb82..a2e912381 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -25,18 +25,17 @@ pub const ANCHOR_OFFSET: u32 = 10; /// - `Err(e)` if there was an error during validation unrelated to chain validity. /// /// This function does not mutate either of the databases. -pub fn validate_combined_chain< - E0, - N, +pub fn validate_combined_chain<'db, E0, N, E, P, C, D>( + parameters: &P, + cache: &C, + data: &'db D, +) -> Result<(), E> +where E: From>, P: consensus::Parameters, C: CacheOps, - D: DBOps, ->( - parameters: &P, - cache: &C, - data: &D, -) -> Result<(), E> { + &'db D: DBOps, +{ let sapling_activation_height = parameters .activation_height(NetworkUpgrade::Sapling) .ok_or(Error::SaplingNotActive)?; @@ -85,9 +84,13 @@ pub fn validate_combined_chain< /// Determines the target height for a transaction, and the height from which to /// select anchors, based on the current synchronised block chain. -pub fn get_target_and_anchor_heights>, D: DBOps>( - data: &D, -) -> Result<(BlockHeight, BlockHeight), E> { +pub fn get_target_and_anchor_heights<'db, E0, N, E, D>( + data: &'db D, +) -> Result<(BlockHeight, BlockHeight), E> +where + E: From>, + &'db D: DBOps, +{ data.block_height_extrema().and_then(|heights| { match heights { Some((min_height, max_height)) => { diff --git a/zcash_client_backend/src/data_api/mod.rs b/zcash_client_backend/src/data_api/mod.rs index c3b130bd7..27ca46527 100644 --- a/zcash_client_backend/src/data_api/mod.rs +++ b/zcash_client_backend/src/data_api/mod.rs @@ -8,17 +8,18 @@ use zcash_primitives::{ zip32::ExtendedFullViewingKey, }; -use crate::proto::compact_formats::CompactBlock; +use crate::{ + proto::compact_formats::CompactBlock, + wallet::{AccountId, WalletShieldedOutput, WalletTx}, +}; pub mod chain; pub mod error; pub trait DBOps { type Error; - type AccountId; - type NoteId; - // type TxRef; // Backend-specific transaction handle - // type NoteRef; // Backend-specific note identifier` + type NoteRef: Copy; // Backend-specific note identifier + type Mutator: DBUpdate; fn init_db(&self) -> Result<(), Self::Error>; @@ -49,19 +50,19 @@ pub trait DBOps { fn get_address( &self, params: &P, - account: Self::AccountId, + account: AccountId, ) -> Result, Self::Error>; - fn get_balance(&self, account: Self::AccountId) -> Result; + fn get_balance(&self, account: AccountId) -> Result; - fn get_verified_balance(&self, account: Self::AccountId) -> Result; + fn get_verified_balance(&self, account: AccountId) -> Result; fn get_received_memo_as_utf8( &self, - id_note: Self::NoteId, + id_note: Self::NoteRef, ) -> Result, Self::Error>; - fn get_sent_memo_as_utf8(&self, id_note: Self::NoteId) -> Result, Self::Error>; + fn get_sent_memo_as_utf8(&self, id_note: Self::NoteRef) -> Result, Self::Error>; fn get_extended_full_viewing_keys( &self, @@ -76,35 +77,57 @@ pub trait DBOps { fn get_witnesses( &self, block_height: BlockHeight, - ) -> Result)>, Self::Error>; + ) -> Result)>, Self::Error>; - fn get_nullifiers(&self) -> Result, Self::AccountId)>, Self::Error>; + fn get_nullifiers(&self) -> Result, AccountId)>, Self::Error>; + + fn get_mutator(&self) -> Result; + + fn transactionally(&self, mutator: &mut Self::Mutator, f: F) -> Result<(), Self::Error> + where + F: FnOnce(&mut Self::Mutator) -> Result<(), Self::Error>; - // fn get_witnesses(block_height: BlockHeight) -> Result>>, Self::Error>; - // - // fn get_nullifiers() -> Result<(Vec, Account), Self::Error>; - // - // fn create_block(block_height: BlockHeight, hash: BlockHash, time: u32, sapling_tree: CommitmentTree) -> Result<(), Self::Error>; - // - // fn put_transaction(transaction: Transaction, block_height: BlockHeight) -> Result; - // - // fn get_txref(txid: TxId) -> Result, Self::Error>; - // - // fn mark_spent(tx_ref: Self::TxRef, nullifier: Vec) -> Result<(), Self::Error>; - // - // fn put_note(output: WalletShieldedOutput, tx_ref: Self::TxRef, nullifier: Vec) -> Result<(), Self::Error>; - // - // fn get_note(tx_ref: Self::TxRef, output_index: i64) -> Result; - // - // fn prune_witnesses(to_height: BlockHeight) -> Result<(), Self::Error>; - // - // fn mark_expired_unspent(to_height: BlockHeight) -> Result<(), Self::Error>; - // // fn put_sent_note(tx_ref: Self::TxRef, output: DecryptedOutput) -> Result<(), Self::Error>; // // fn put_received_note(tx_ref: Self::TxRef, output: DecryptedOutput) -> Result<(), Self::Error>; } +pub trait DBUpdate { + type Error; + type TxRef: Copy; + type NoteRef: Copy; + + fn insert_block( + &mut self, + block_height: BlockHeight, + block_hash: BlockHash, + block_time: u32, + commitment_tree: &CommitmentTree, + ) -> Result<(), Self::Error>; + + fn put_tx(&mut self, tx: &WalletTx, height: BlockHeight) -> Result; + + fn mark_spent(&mut self, tx_ref: Self::TxRef, nf: &Vec) -> Result<(), Self::Error>; + + fn put_note( + &mut self, + output: &WalletShieldedOutput, + nf: &Vec, + tx_ref: Self::TxRef, + ) -> Result; + + fn insert_witness( + &mut self, + note_id: Self::NoteRef, + witness: &IncrementalWitness, + height: BlockHeight, + ) -> Result<(), Self::Error>; + + fn prune_witnesses(&mut self, from_height: BlockHeight) -> Result<(), Self::Error>; + + fn update_expired_notes(&mut self, from_height: BlockHeight) -> Result<(), Self::Error>; +} + pub trait CacheOps { type Error; diff --git a/zcash_client_backend/src/wallet.rs b/zcash_client_backend/src/wallet.rs index 6cbe21f8b..609edc189 100644 --- a/zcash_client_backend/src/wallet.rs +++ b/zcash_client_backend/src/wallet.rs @@ -8,6 +8,11 @@ use zcash_primitives::{ transaction::TxId, }; + +/// A type-safe wrapper for account identifiers. +#[derive(Debug, Copy, Clone)] +pub struct AccountId(pub u32); + /// A subset of a [`Transaction`] relevant to wallets and light clients. /// /// [`Transaction`]: zcash_primitives::transaction::Transaction diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index 929523dbe..ccd2c046f 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -27,7 +27,9 @@ use std::fmt; use std::path::Path; -use rusqlite::Connection; +use rusqlite::{types::ToSql, Connection, Statement, NO_PARAMS}; + +use ff::PrimeField; use zcash_primitives::{ block::BlockHash, @@ -40,9 +42,10 @@ use zcash_primitives::{ }; use zcash_client_backend::{ - data_api::{CacheOps, DBOps}, + data_api::{CacheOps, DBOps, DBUpdate}, encoding::encode_payment_address, proto::compact_formats::CompactBlock, + wallet::{AccountId, WalletShieldedOutput, WalletTx}, }; use crate::error::SqliteClientError; @@ -54,9 +57,6 @@ pub mod query; pub mod scan; pub mod transact; -#[derive(Debug, Copy, Clone)] -pub struct AccountId(pub u32); - #[derive(Debug, Copy, Clone)] pub struct NoteId(pub i64); @@ -74,10 +74,10 @@ impl DataConnection { } } -impl DBOps for DataConnection { +impl<'a> DBOps for &'a DataConnection { type Error = SqliteClientError; - type AccountId = AccountId; - type NoteId = NoteId; + type NoteRef = NoteId; + type Mutator = DBMutator<'a>; fn init_db(&self) -> Result<(), Self::Error> { init::init_data_database(self).map_err(SqliteClientError::from) @@ -120,27 +120,27 @@ impl DBOps for DataConnection { fn get_address( &self, params: &P, - account: Self::AccountId, + account: AccountId, ) -> Result, Self::Error> { query::get_address(self, params, account) } - fn get_balance(&self, account: Self::AccountId) -> Result { + fn get_balance(&self, account: AccountId) -> Result { query::get_balance(self, account) } - fn get_verified_balance(&self, account: Self::AccountId) -> Result { + fn get_verified_balance(&self, account: AccountId) -> Result { query::get_verified_balance(self, account) } fn get_received_memo_as_utf8( &self, - id_note: Self::NoteId, + id_note: Self::NoteRef, ) -> Result, Self::Error> { query::get_received_memo_as_utf8(self, id_note) } - fn get_sent_memo_as_utf8(&self, id_note: Self::NoteId) -> Result, Self::Error> { + fn get_sent_memo_as_utf8(&self, id_note: Self::NoteRef) -> Result, Self::Error> { query::get_sent_memo_as_utf8(self, id_note) } @@ -161,13 +161,241 @@ impl DBOps for DataConnection { fn get_witnesses( &self, block_height: BlockHeight, - ) -> Result)>, Self::Error> { + ) -> Result)>, Self::Error> { query::get_witnesses(self, block_height) } - fn get_nullifiers(&self) -> Result, Self::AccountId)>, Self::Error> { + fn get_nullifiers(&self) -> Result, AccountId)>, Self::Error> { query::get_nullifiers(self) } + + fn get_mutator(&self) -> Result { + Ok( + DBMutator { + conn: self, + stmt_insert_block: self.0.prepare( + "INSERT INTO blocks (height, hash, time, sapling_tree) + VALUES (?, ?, ?, ?)", + )?, + stmt_insert_tx: self.0.prepare( + "INSERT INTO transactions (txid, block, tx_index) + VALUES (?, ?, ?)", + )?, + stmt_update_tx: self.0.prepare( + "UPDATE transactions + SET block = ?, tx_index = ? WHERE txid = ?", + )?, + stmt_select_tx: self.0.prepare( + "SELECT id_tx FROM transactions WHERE txid = ?", + )?, + stmt_mark_spent_note: self.0.prepare( + "UPDATE received_notes SET spent = ? WHERE nf = ?" + )?, + stmt_update_note: self.0.prepare( + "UPDATE received_notes + SET account = ?, diversifier = ?, value = ?, rcm = ?, nf = ?, is_change = ? + WHERE tx = ? AND output_index = ?", + )?, + stmt_insert_note: self.0.prepare( + "INSERT INTO received_notes (tx, output_index, account, diversifier, value, rcm, nf, is_change) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + )?, + stmt_select_note: self.0.prepare( + "SELECT id_note FROM received_notes WHERE tx = ? AND output_index = ?" + )?, + stmt_insert_witness: self.0.prepare( + "INSERT INTO sapling_witnesses (note, block, witness) + VALUES (?, ?, ?)", + )?, + stmt_prune_witnesses: self.0.prepare( + "DELETE FROM sapling_witnesses WHERE block < ?" + )?, + stmt_update_expired: self.0.prepare( + "UPDATE received_notes SET spent = NULL WHERE EXISTS ( + SELECT id_tx FROM transactions + WHERE id_tx = received_notes.spent AND block IS NULL AND expiry_height < ? + )", + )?, + } + ) + } + + fn transactionally(&self, mutator: &mut Self::Mutator, f: F) -> Result<(), Self::Error> + where + F: FnOnce(&mut Self::Mutator) -> Result<(), Self::Error>, + { + self.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?; + match f(mutator) { + Ok(_) => { + self.0.execute("COMMIT", NO_PARAMS)?; + Ok(()) + } + Err(error) => { + match self.0.execute("ROLLBACK", NO_PARAMS) { + Ok(_) => Err(error), + // REVIEW: If rollback fails, what do we want to do? I think that + // panicking here is probably the right thing to do, because it + // means the database is corrupt? + Err(e) => panic!( + "Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.", + e, + error.0 + ) + } + } + } + } +} + +pub struct DBMutator<'a> { + conn: &'a DataConnection, + stmt_insert_block: Statement<'a>, + stmt_insert_tx: Statement<'a>, + stmt_update_tx: Statement<'a>, + stmt_select_tx: Statement<'a>, + stmt_mark_spent_note: Statement<'a>, + stmt_insert_note: Statement<'a>, + stmt_update_note: Statement<'a>, + stmt_select_note: Statement<'a>, + stmt_insert_witness: Statement<'a>, + stmt_prune_witnesses: Statement<'a>, + stmt_update_expired: Statement<'a>, +} + +impl<'a> DBUpdate for DBMutator<'a> { + type Error = SqliteClientError; + type TxRef = i64; + type NoteRef = NoteId; + + fn insert_block( + &mut self, + block_height: BlockHeight, + block_hash: BlockHash, + block_time: u32, + commitment_tree: &CommitmentTree, + ) -> Result<(), Self::Error> { + let mut encoded_tree = Vec::new(); + + commitment_tree + .write(&mut encoded_tree) + .expect("Should be able to write to a Vec"); + + self.stmt_insert_block.execute(&[ + u32::from(block_height).to_sql()?, + block_hash.0.to_sql()?, + block_time.to_sql()?, + encoded_tree.to_sql()?, + ])?; + + Ok(()) + } + + fn put_tx(&mut self, tx: &WalletTx, height: BlockHeight) -> Result { + let txid = tx.txid.0.to_vec(); + if self.stmt_update_tx.execute(&[ + u32::from(height).to_sql()?, + (tx.index as i64).to_sql()?, + txid.to_sql()?, + ])? == 0 + { + // It isn't there, so insert our transaction into the database. + self.stmt_insert_tx.execute(&[ + txid.to_sql()?, + u32::from(height).to_sql()?, + (tx.index as i64).to_sql()?, + ])?; + + Ok(self.conn.0.last_insert_rowid()) + } else { + // It was there, so grab its row number. + self.stmt_select_tx + .query_row(&[txid], |row| row.get(0)) + .map_err(SqliteClientError::from) + } + } + + fn mark_spent(&mut self, tx_ref: Self::TxRef, nf: &Vec) -> Result<(), Self::Error> { + self.stmt_mark_spent_note + .execute(&[tx_ref.to_sql()?, nf.to_sql()?])?; + Ok(()) + } + + fn put_note( + &mut self, + output: &WalletShieldedOutput, + nf: &Vec, + tx_ref: Self::TxRef, + ) -> Result { + let rcm = output.note.rcm().to_repr(); + // Assumptions: + // - A transaction will not contain more than 2^63 shielded outputs. + // - A note value will never exceed 2^63 zatoshis. + + // First try updating an existing received note into the database. + if self.stmt_update_note.execute(&[ + (output.account as i64).to_sql()?, + output.to.diversifier().0.to_sql()?, + (output.note.value as i64).to_sql()?, + rcm.as_ref().to_sql()?, + nf.to_sql()?, + output.is_change.to_sql()?, + tx_ref.to_sql()?, + (output.index as i64).to_sql()?, + ])? == 0 + { + // It isn't there, so insert our note into the database. + self.stmt_insert_note.execute(&[ + tx_ref.to_sql()?, + (output.index as i64).to_sql()?, + (output.account as i64).to_sql()?, + output.to.diversifier().0.to_sql()?, + (output.note.value as i64).to_sql()?, + rcm.as_ref().to_sql()?, + nf.to_sql()?, + output.is_change.to_sql()?, + ])?; + + Ok(NoteId(self.conn.0.last_insert_rowid())) + } else { + // It was there, so grab its row number. + self.stmt_select_note + .query_row( + &[tx_ref.to_sql()?, (output.index as i64).to_sql()?], + |row| row.get(0).map(NoteId), + ) + .map_err(SqliteClientError::from) + } + } + + fn insert_witness( + &mut self, + note_id: Self::NoteRef, + witness: &IncrementalWitness, + height: BlockHeight, + ) -> Result<(), Self::Error> { + let mut encoded = Vec::new(); + witness + .write(&mut encoded) + .expect("Should be able to write to a Vec"); + self.stmt_insert_witness.execute(&[ + note_id.0.to_sql()?, + u32::from(height).to_sql()?, + encoded.to_sql()?, + ])?; + + Ok(()) + } + + fn prune_witnesses(&mut self, below_height: BlockHeight) -> Result<(), Self::Error> { + self.stmt_prune_witnesses + .execute(&[u32::from(below_height)])?; + Ok(()) + } + + fn update_expired_notes(&mut self, height: BlockHeight) -> Result<(), Self::Error> { + self.stmt_update_expired.execute(&[u32::from(height)])?; + Ok(()) + } } pub struct CacheConnection(Connection); diff --git a/zcash_client_sqlite/src/scan.rs b/zcash_client_sqlite/src/scan.rs index f51f040d7..dbfa457ae 100644 --- a/zcash_client_sqlite/src/scan.rs +++ b/zcash_client_sqlite/src/scan.rs @@ -5,38 +5,33 @@ use protobuf::parse_from_bytes; use rusqlite::{types::ToSql, OptionalExtension, NO_PARAMS}; +use zcash_primitives::{ + block::BlockHash, + consensus::{self, BlockHeight, NetworkUpgrade}, + merkle_tree::CommitmentTree, + transaction::Transaction, +}; + use zcash_client_backend::{ address::RecipientAddress, data_api::{ error::{ChainInvalid, Error}, - CacheOps, DBOps, + CacheOps, DBOps, DBUpdate, }, decrypt_transaction, encoding::decode_extended_full_viewing_key, proto::compact_formats::CompactBlock, + wallet::WalletTx, welding_rig::scan_block, }; -use zcash_primitives::{ - consensus::{self, BlockHeight, NetworkUpgrade}, - merkle_tree::{CommitmentTree, IncrementalWitness}, - sapling::Node, - transaction::Transaction, -}; - -use crate::{error::SqliteClientError, AccountId, CacheConnection, DataConnection, NoteId}; +use crate::{error::SqliteClientError, AccountId, CacheConnection, DataConnection}; struct CompactBlockRow { height: BlockHeight, data: Vec, } -#[derive(Clone)] -struct WitnessRow { - id_note: i64, - witness: IncrementalWitness, -} - /// Scans at most `limit` new blocks added to the cache for any transactions received by /// the tracked accounts. /// @@ -79,12 +74,19 @@ struct WitnessRow { /// ``` /// /// [`init_blocks_table`]: crate::init::init_blocks_table -pub fn scan_cached_blocks( +pub fn scan_cached_blocks<'db, E, E0, N, P, C, D>( params: &P, - cache: &CacheConnection, - data: &DataConnection, + cache: &C, + data: &'db D, limit: Option, -) -> Result<(), SqliteClientError> { +) -> Result<(), E> +where + P: consensus::Parameters, + C: CacheOps, + &'db D: DBOps, + N: Copy, + E: From>, +{ let sapling_activation_height = params .activation_height(NetworkUpgrade::Sapling) .ok_or(Error::SaplingNotActive)?; @@ -110,71 +112,20 @@ pub fn scan_cached_blocks( // Get the nullifiers for the notes we are tracking let mut nullifiers = data.get_nullifiers()?; - // Prepare per-block SQL statements - let mut stmt_insert_block = data.0.prepare( - "INSERT INTO blocks (height, hash, time, sapling_tree) - VALUES (?, ?, ?, ?)", - )?; - let mut stmt_update_tx = data.0.prepare( - "UPDATE transactions - SET block = ?, tx_index = ? WHERE txid = ?", - )?; - let mut stmt_insert_tx = data.0.prepare( - "INSERT INTO transactions (txid, block, tx_index) - VALUES (?, ?, ?)", - )?; - let mut stmt_select_tx = data - .0 - .prepare("SELECT id_tx FROM transactions WHERE txid = ?")?; - let mut stmt_mark_spent_note = data - .0 - .prepare("UPDATE received_notes SET spent = ? WHERE nf = ?")?; - let mut stmt_update_note = data.0.prepare( - "UPDATE received_notes - SET account = ?, diversifier = ?, value = ?, rcm = ?, nf = ?, is_change = ? - WHERE tx = ? AND output_index = ?", - )?; - let mut stmt_insert_note = data.0.prepare( - "INSERT INTO received_notes (tx, output_index, account, diversifier, value, rcm, nf, is_change) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - )?; - let mut stmt_select_note = data - .0 - .prepare("SELECT id_note FROM received_notes WHERE tx = ? AND output_index = ?")?; - let mut stmt_insert_witness = data.0.prepare( - "INSERT INTO sapling_witnesses (note, block, witness) - VALUES (?, ?, ?)", - )?; - let mut stmt_prune_witnesses = data - .0 - .prepare("DELETE FROM sapling_witnesses WHERE block < ?")?; - let mut stmt_update_expired = data.0.prepare( - "UPDATE received_notes SET spent = NULL WHERE EXISTS ( - SELECT id_tx FROM transactions - WHERE id_tx = received_notes.spent AND block IS NULL AND expiry_height < ? - )", - )?; - cache.with_cached_blocks( last_height, limit, |height: BlockHeight, block: CompactBlock| { - // Start an SQL transaction for this block. - data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?; - // Scanned blocks MUST be height-sequential. if height != (last_height + 1) { - return Err(SqliteClientError(ChainInvalid::block_height_mismatch( - last_height + 1, - height, - ))); + return Err(ChainInvalid::block_height_mismatch(last_height + 1, height).into()); } last_height = height; - let block_hash = block.hash.clone(); + let block_hash = BlockHash::from_slice(&block.hash); let block_time = block.time; - let txs = { + let txs: Vec = { let nf_refs: Vec<_> = nullifiers .iter() .map(|(nf, acc)| (&nf[..], acc.0 as usize)) @@ -196,10 +147,7 @@ pub fn scan_cached_blocks( let cur_root = tree.root(); for row in &witnesses { if row.1.root() != cur_root { - return Err(SqliteClientError(Error::InvalidWitnessAnchor( - row.0, - last_height, - ))); + return Err(Error::InvalidWitnessAnchor(row.0, last_height).into()); } } for tx in &txs { @@ -217,126 +165,55 @@ pub fn scan_cached_blocks( } } - // Insert the block into the database. - let mut encoded_tree = Vec::new(); - tree.write(&mut encoded_tree) - .expect("Should be able to write to a Vec"); - stmt_insert_block.execute(&[ - u32::from(height).to_sql()?, - block_hash.to_sql()?, - block_time.to_sql()?, - encoded_tree.to_sql()?, - ])?; + // database updates for each block are transactional + data.transactionally(&mut data.get_mutator()?, |mutator| { + // Insert the block into the database. + mutator.insert_block(height, block_hash, block_time, &tree)?; - for tx in txs { - // First try update an existing transaction in the database. - let txid = tx.txid.0.to_vec(); - let tx_row = if stmt_update_tx.execute(&[ - u32::from(height).to_sql()?, - (tx.index as i64).to_sql()?, - txid.to_sql()?, - ])? == 0 - { - // It isn't there, so insert our transaction into the database. - stmt_insert_tx.execute(&[ - txid.to_sql()?, - u32::from(height).to_sql()?, - (tx.index as i64).to_sql()?, - ])?; - data.0.last_insert_rowid() - } else { - // It was there, so grab its row number. - stmt_select_tx.query_row(&[txid], |row| row.get(0))? - }; + for tx in txs { + let tx_row = mutator.put_tx(&tx, height)?; - // Mark notes as spent and remove them from the scanning cache - for spend in &tx.shielded_spends { - stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?; + // Mark notes as spent and remove them from the scanning cache + for spend in &tx.shielded_spends { + mutator.mark_spent(tx_row, &spend.nf)?; + } + + nullifiers.retain(|(nf, _acc)| { + tx.shielded_spends + .iter() + .find(|spend| &spend.nf == nf) + .is_none() + }); + + for output in tx.shielded_outputs { + let nf = output.note.nf( + &extfvks[output.account].fvk.vk, + output.witness.position() as u64, + ); + + let note_id = mutator.put_note(&output, &nf, tx_row)?; + + // Save witness for note. + witnesses.push((note_id, output.witness)); + + // Cache nullifier for note (to detect subsequent spends in this scan). + nullifiers.push((nf, AccountId(output.account as u32))); + } } - nullifiers.retain(|(nf, _acc)| { - tx.shielded_spends - .iter() - .find(|spend| &spend.nf == nf) - .is_none() - }); - - for output in tx.shielded_outputs { - let rcm = output.note.rcm().to_repr(); - let nf = output.note.nf( - &extfvks[output.account].fvk.vk, - output.witness.position() as u64, - ); - - // Assumptions: - // - A transaction will not contain more than 2^63 shielded outputs. - // - A note value will never exceed 2^63 zatoshis. - - // First try updating an existing received note into the database. - let note_id = if stmt_update_note.execute(&[ - (output.account as i64).to_sql()?, - output.to.diversifier().0.to_sql()?, - (output.note.value as i64).to_sql()?, - rcm.as_ref().to_sql()?, - nf.to_sql()?, - output.is_change.to_sql()?, - tx_row.to_sql()?, - (output.index as i64).to_sql()?, - ])? == 0 - { - // It isn't there, so insert our note into the database. - stmt_insert_note.execute(&[ - tx_row.to_sql()?, - (output.index as i64).to_sql()?, - (output.account as i64).to_sql()?, - output.to.diversifier().0.to_sql()?, - (output.note.value as i64).to_sql()?, - rcm.as_ref().to_sql()?, - nf.to_sql()?, - output.is_change.to_sql()?, - ])?; - NoteId(data.0.last_insert_rowid()) - } else { - // It was there, so grab its row number. - stmt_select_note.query_row( - &[tx_row.to_sql()?, (output.index as i64).to_sql()?], - |row| row.get(0).map(NoteId), - )? - }; - - // Save witness for note. - witnesses.push((note_id, output.witness)); - - // Cache nullifier for note (to detect subsequent spends in this scan). - nullifiers.push((nf, AccountId(output.account as u32))); + // Insert current witnesses into the database. + for (note_id, witness) in witnesses.iter() { + mutator.insert_witness(*note_id, witness, last_height)?; } - } - // Insert current witnesses into the database. - let mut encoded = Vec::new(); - for witness_row in witnesses.iter() { - encoded.clear(); - witness_row - .1 - .write(&mut encoded) - .expect("Should be able to write to a Vec"); - stmt_insert_witness.execute(&[ - (witness_row.0).0.to_sql()?, - u32::from(last_height).to_sql()?, - encoded.to_sql()?, - ])?; - } + // Prune the stored witnesses (we only expect rollbacks of at most 100 blocks). + mutator.prune_witnesses(last_height - 100)?; - // Prune the stored witnesses (we only expect rollbacks of at most 100 blocks). - stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?; + // Update now-expired transactions that didn't get mined. + mutator.update_expired_notes(last_height)?; - // Update now-expired transactions that didn't get mined. - stmt_update_expired.execute(&[u32::from(last_height)])?; - - // Commit the SQL transaction, writing this block's data atomically. - data.0.execute("COMMIT", NO_PARAMS)?; - - Ok(()) + Ok(()) + }) }, )?;