From 06c1772692c8f545a516500e0fbbe16060ec733f Mon Sep 17 00:00:00 2001 From: Kris Nuttycombe Date: Tue, 18 Aug 2020 16:33:34 -0600 Subject: [PATCH] Move traversal of cached blocks to CacheOps --- zcash_client_backend/src/data_api/mod.rs | 9 + zcash_client_sqlite/src/lib.rs | 12 + zcash_client_sqlite/src/scan.rs | 387 ++++++++++++----------- 3 files changed, 224 insertions(+), 184 deletions(-) diff --git a/zcash_client_backend/src/data_api/mod.rs b/zcash_client_backend/src/data_api/mod.rs index c766cdeaa..c3b130bd7 100644 --- a/zcash_client_backend/src/data_api/mod.rs +++ b/zcash_client_backend/src/data_api/mod.rs @@ -121,4 +121,13 @@ pub trait CacheOps { ) -> Result, Self::Error> where F: Fn(&CompactBlock, &CompactBlock) -> Result<(), Self::Error>; + + fn with_cached_blocks( + &self, + from_height: BlockHeight, + limit: Option, + with_row: F, + ) -> Result<(), Self::Error> + where + F: FnMut(BlockHeight, CompactBlock) -> Result<(), Self::Error>; } diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index 56214963f..929523dbe 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -195,6 +195,18 @@ impl CacheOps for CacheConnection { { chain::validate_chain(self, from_height, validate) } + + fn with_cached_blocks( + &self, + from_height: BlockHeight, + limit: Option, + with_row: F, + ) -> Result<(), Self::Error> + where + F: FnMut(BlockHeight, CompactBlock) -> Result<(), Self::Error>, + { + scan::with_cached_blocks(self, from_height, limit, with_row) + } } fn address_from_extfvk( diff --git a/zcash_client_sqlite/src/scan.rs b/zcash_client_sqlite/src/scan.rs index a10183b09..f51f040d7 100644 --- a/zcash_client_sqlite/src/scan.rs +++ b/zcash_client_sqlite/src/scan.rs @@ -9,7 +9,7 @@ use zcash_client_backend::{ address::RecipientAddress, data_api::{ error::{ChainInvalid, Error}, - DBOps, + CacheOps, DBOps, }, decrypt_transaction, encoding::decode_extended_full_viewing_key, @@ -107,6 +107,7 @@ pub fn scan_cached_blocks( // Get most recent incremental witnesses for the notes we are tracking let mut witnesses = data.get_witnesses(last_height)?; + // Get the nullifiers for the notes we are tracking let mut nullifiers = data.get_nullifiers()?; // Prepare per-block SQL statements @@ -154,13 +155,210 @@ pub fn scan_cached_blocks( )", )?; + cache.with_cached_blocks( + last_height, + limit, + |height: BlockHeight, block: CompactBlock| { + // Start an SQL transaction for this block. + data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?; + + // Scanned blocks MUST be height-sequential. + if height != (last_height + 1) { + return Err(SqliteClientError(ChainInvalid::block_height_mismatch( + last_height + 1, + height, + ))); + } + last_height = height; + + let block_hash = block.hash.clone(); + let block_time = block.time; + + let txs = { + let nf_refs: Vec<_> = nullifiers + .iter() + .map(|(nf, acc)| (&nf[..], acc.0 as usize)) + .collect(); + let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect(); + scan_block( + params, + block, + &extfvks[..], + &nf_refs, + &mut tree, + &mut witness_refs[..], + ) + }; + + // Enforce that all roots match. This is slow, so only include in debug builds. + #[cfg(debug_assertions)] + { + let cur_root = tree.root(); + for row in &witnesses { + if row.1.root() != cur_root { + return Err(SqliteClientError(Error::InvalidWitnessAnchor( + row.0, + last_height, + ))); + } + } + for tx in &txs { + for output in tx.shielded_outputs.iter() { + if output.witness.root() != cur_root { + return Err(Error::InvalidNewWitnessAnchor( + output.index, + tx.txid, + last_height, + output.witness.root(), + ) + .into()); + } + } + } + } + + // Insert the block into the database. + let mut encoded_tree = Vec::new(); + tree.write(&mut encoded_tree) + .expect("Should be able to write to a Vec"); + stmt_insert_block.execute(&[ + u32::from(height).to_sql()?, + block_hash.to_sql()?, + block_time.to_sql()?, + encoded_tree.to_sql()?, + ])?; + + for tx in txs { + // First try update an existing transaction in the database. + let txid = tx.txid.0.to_vec(); + let tx_row = if stmt_update_tx.execute(&[ + u32::from(height).to_sql()?, + (tx.index as i64).to_sql()?, + txid.to_sql()?, + ])? == 0 + { + // It isn't there, so insert our transaction into the database. + stmt_insert_tx.execute(&[ + txid.to_sql()?, + u32::from(height).to_sql()?, + (tx.index as i64).to_sql()?, + ])?; + data.0.last_insert_rowid() + } else { + // It was there, so grab its row number. + stmt_select_tx.query_row(&[txid], |row| row.get(0))? + }; + + // Mark notes as spent and remove them from the scanning cache + for spend in &tx.shielded_spends { + stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?; + } + + nullifiers.retain(|(nf, _acc)| { + tx.shielded_spends + .iter() + .find(|spend| &spend.nf == nf) + .is_none() + }); + + for output in tx.shielded_outputs { + let rcm = output.note.rcm().to_repr(); + let nf = output.note.nf( + &extfvks[output.account].fvk.vk, + output.witness.position() as u64, + ); + + // Assumptions: + // - A transaction will not contain more than 2^63 shielded outputs. + // - A note value will never exceed 2^63 zatoshis. + + // First try updating an existing received note into the database. + let note_id = if stmt_update_note.execute(&[ + (output.account as i64).to_sql()?, + output.to.diversifier().0.to_sql()?, + (output.note.value as i64).to_sql()?, + rcm.as_ref().to_sql()?, + nf.to_sql()?, + output.is_change.to_sql()?, + tx_row.to_sql()?, + (output.index as i64).to_sql()?, + ])? == 0 + { + // It isn't there, so insert our note into the database. + stmt_insert_note.execute(&[ + tx_row.to_sql()?, + (output.index as i64).to_sql()?, + (output.account as i64).to_sql()?, + output.to.diversifier().0.to_sql()?, + (output.note.value as i64).to_sql()?, + rcm.as_ref().to_sql()?, + nf.to_sql()?, + output.is_change.to_sql()?, + ])?; + NoteId(data.0.last_insert_rowid()) + } else { + // It was there, so grab its row number. + stmt_select_note.query_row( + &[tx_row.to_sql()?, (output.index as i64).to_sql()?], + |row| row.get(0).map(NoteId), + )? + }; + + // Save witness for note. + witnesses.push((note_id, output.witness)); + + // Cache nullifier for note (to detect subsequent spends in this scan). + nullifiers.push((nf, AccountId(output.account as u32))); + } + } + + // Insert current witnesses into the database. + let mut encoded = Vec::new(); + for witness_row in witnesses.iter() { + encoded.clear(); + witness_row + .1 + .write(&mut encoded) + .expect("Should be able to write to a Vec"); + stmt_insert_witness.execute(&[ + (witness_row.0).0.to_sql()?, + u32::from(last_height).to_sql()?, + encoded.to_sql()?, + ])?; + } + + // Prune the stored witnesses (we only expect rollbacks of at most 100 blocks). + stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?; + + // Update now-expired transactions that didn't get mined. + stmt_update_expired.execute(&[u32::from(last_height)])?; + + // Commit the SQL transaction, writing this block's data atomically. + data.0.execute("COMMIT", NO_PARAMS)?; + + Ok(()) + }, + )?; + + Ok(()) +} + +pub fn with_cached_blocks( + cache: &CacheConnection, + from_height: BlockHeight, + limit: Option, + mut with_row: F, +) -> Result<(), SqliteClientError> +where + F: FnMut(BlockHeight, CompactBlock) -> Result<(), SqliteClientError>, +{ // Fetch the CompactBlocks we need to scan let mut stmt_blocks = cache.0.prepare( "SELECT height, data FROM compactblocks WHERE height > ? ORDER BY height ASC LIMIT ?", )?; let rows = stmt_blocks.query_map( &[ - u32::from(last_height).to_sql()?, + u32::from(from_height).to_sql()?, limit.unwrap_or(u32::max_value()).to_sql()?, ], |row| { @@ -171,188 +369,9 @@ pub fn scan_cached_blocks( }, )?; - for row in rows { - let row = row?; - - // Start an SQL transaction for this block. - data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?; - - // Scanned blocks MUST be height-sequential. - if row.height != (last_height + 1) { - return Err(SqliteClientError(ChainInvalid::block_height_mismatch( - last_height + 1, - row.height, - ))); - } - last_height = row.height; - - let block: CompactBlock = parse_from_bytes(&row.data)?; - let block_hash = block.hash.clone(); - let block_time = block.time; - - let txs = { - let nf_refs: Vec<_> = nullifiers - .iter() - .map(|(nf, acc)| (&nf[..], acc.0 as usize)) - .collect(); - let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect(); - scan_block( - params, - block, - &extfvks[..], - &nf_refs, - &mut tree, - &mut witness_refs[..], - ) - }; - - // Enforce that all roots match. This is slow, so only include in debug builds. - #[cfg(debug_assertions)] - { - let cur_root = tree.root(); - for row in &witnesses { - if row.1.root() != cur_root { - return Err(SqliteClientError(Error::InvalidWitnessAnchor( - row.0, - last_height, - ))); - } - } - for tx in &txs { - for output in tx.shielded_outputs.iter() { - if output.witness.root() != cur_root { - return Err(Error::InvalidNewWitnessAnchor( - output.index, - tx.txid, - last_height, - output.witness.root(), - ) - .into()); - } - } - } - } - - // Insert the block into the database. - let mut encoded_tree = Vec::new(); - tree.write(&mut encoded_tree) - .expect("Should be able to write to a Vec"); - stmt_insert_block.execute(&[ - u32::from(row.height).to_sql()?, - block_hash.to_sql()?, - block_time.to_sql()?, - encoded_tree.to_sql()?, - ])?; - - for tx in txs { - // First try update an existing transaction in the database. - let txid = tx.txid.0.to_vec(); - let tx_row = if stmt_update_tx.execute(&[ - u32::from(row.height).to_sql()?, - (tx.index as i64).to_sql()?, - txid.to_sql()?, - ])? == 0 - { - // It isn't there, so insert our transaction into the database. - stmt_insert_tx.execute(&[ - txid.to_sql()?, - u32::from(row.height).to_sql()?, - (tx.index as i64).to_sql()?, - ])?; - data.0.last_insert_rowid() - } else { - // It was there, so grab its row number. - stmt_select_tx.query_row(&[txid], |row| row.get(0))? - }; - - // Mark notes as spent and remove them from the scanning cache - for spend in &tx.shielded_spends { - stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?; - } - nullifiers = nullifiers - .into_iter() - .filter(|(nf, _acc)| { - tx.shielded_spends - .iter() - .find(|spend| &spend.nf == nf) - .is_none() - }) - .collect(); - - for output in tx.shielded_outputs { - let rcm = output.note.rcm().to_repr(); - let nf = output.note.nf( - &extfvks[output.account].fvk.vk, - output.witness.position() as u64, - ); - - // Assumptions: - // - A transaction will not contain more than 2^63 shielded outputs. - // - A note value will never exceed 2^63 zatoshis. - - // First try updating an existing received note into the database. - let note_id = if stmt_update_note.execute(&[ - (output.account as i64).to_sql()?, - output.to.diversifier().0.to_sql()?, - (output.note.value as i64).to_sql()?, - rcm.as_ref().to_sql()?, - nf.to_sql()?, - output.is_change.to_sql()?, - tx_row.to_sql()?, - (output.index as i64).to_sql()?, - ])? == 0 - { - // It isn't there, so insert our note into the database. - stmt_insert_note.execute(&[ - tx_row.to_sql()?, - (output.index as i64).to_sql()?, - (output.account as i64).to_sql()?, - output.to.diversifier().0.to_sql()?, - (output.note.value as i64).to_sql()?, - rcm.as_ref().to_sql()?, - nf.to_sql()?, - output.is_change.to_sql()?, - ])?; - NoteId(data.0.last_insert_rowid()) - } else { - // It was there, so grab its row number. - stmt_select_note.query_row( - &[tx_row.to_sql()?, (output.index as i64).to_sql()?], - |row| row.get(0).map(NoteId), - )? - }; - - // Save witness for note. - witnesses.push((note_id, output.witness)); - - // Cache nullifier for note (to detect subsequent spends in this scan). - nullifiers.push((nf, AccountId(output.account as u32))); - } - } - - // Insert current witnesses into the database. - let mut encoded = Vec::new(); - for witness_row in witnesses.iter() { - encoded.clear(); - witness_row - .1 - .write(&mut encoded) - .expect("Should be able to write to a Vec"); - stmt_insert_witness.execute(&[ - (witness_row.0).0.to_sql()?, - u32::from(last_height).to_sql()?, - encoded.to_sql()?, - ])?; - } - - // Prune the stored witnesses (we only expect rollbacks of at most 100 blocks). - stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?; - - // Update now-expired transactions that didn't get mined. - stmt_update_expired.execute(&[u32::from(last_height)])?; - - // Commit the SQL transaction, writing this block's data atomically. - data.0.execute("COMMIT", NO_PARAMS)?; + for row_result in rows { + let row = row_result?; + with_row(row.height, parse_from_bytes(&row.data)?)?; } Ok(())