diff --git a/Cargo.toml b/Cargo.toml index 14e9d0423..983e50e24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,6 @@ panic = 'abort' codegen-units = 1 [patch.crates-io] -incrementalmerkletree = { git = "https://github.com/zcash/incrementalmerkletree.git", rev = "67111e29403c33f2e36d6924167f1d5f04ad0fc2" } -shardtree = { git = "https://github.com/zcash/incrementalmerkletree.git", rev = "67111e29403c33f2e36d6924167f1d5f04ad0fc2" } +incrementalmerkletree = { git = "https://github.com/zcash/incrementalmerkletree.git", rev = "bae25ad89c0c192bee625252d2d419bd56638e48" } +shardtree = { git = "https://github.com/zcash/incrementalmerkletree.git", rev = "bae25ad89c0c192bee625252d2d419bd56638e48" } orchard = { git = "https://github.com/zcash/orchard.git", rev = "6ef89d5f154de2cf7b7dd87edb8d8c49158beebb" } diff --git a/zcash_client_backend/CHANGELOG.md b/zcash_client_backend/CHANGELOG.md index 02297d45a..1d77875dd 100644 --- a/zcash_client_backend/CHANGELOG.md +++ b/zcash_client_backend/CHANGELOG.md @@ -17,8 +17,9 @@ and this library adheres to Rust's notion of - `ShieldedProtocol` - `WalletCommitmentTrees` - `WalletRead::{block_metadata, block_fully_scanned, suggest_scan_ranges}` - - `WalletWrite::put_blocks` + - `WalletWrite::{put_blocks, update_chain_tip}` - `chain::CommitmentTreeRoot` + - `scanning`: A new module containing types required for `suggest_scan_ranges` - `testing::MockWalletDb::new` - `wallet::input_selection::Proposal::{min_target_height, min_anchor_height}`: - `zcash_client_backend::wallet::WalletSaplingOutput::note_commitment_tree_position` @@ -36,8 +37,10 @@ and this library adheres to Rust's notion of and its signature has changed; it now subsumes the removed `WalletRead::get_all_nullifiers`. - `WalletRead::get_target_and_anchor_heights` now takes its argument as a `NonZeroU32` - `chain::scan_cached_blocks` now takes a `from_height` argument that - permits the caller to control the starting position of the scan range - In addition, the `limit` parameter is now required. + permits the caller to control the starting position of the scan range. + In addition, the `limit` parameter is now required and has type `usize`. + - `chain::BlockSource::with_blocks` now takes its limit as an `Option<usize>` + instead of `Option<u32>`. - A new `CommitmentTree` variant has been added to `data_api::error::Error` - `data_api::wallet::{create_spend_to_address, create_proposed_transaction, shield_transparent_funds}` all now require that `WalletCommitmentTrees` be @@ -67,7 +70,8 @@ and this library adheres to Rust's notion of method now takes an optional `BlockMetadata` argument instead of a base commitment tree and incremental witnesses for each previously-known note. In addition, the return type has now been updated to return a `Result`. - +- `proto/service.proto` has been updated to include the new gRPC endpoints + supported by lightwalletd v0.4.15 ### Removed - `zcash_client_backend::data_api`: @@ -81,8 +85,8 @@ and this library adheres to Rust's notion of feature flag, has been modified by the addition of a `sapling_tree` property. - `wallet::input_selection`: - `Proposal::target_height` (use `Proposal::min_target_height` instead). -- `zcash_client_backend::data_api::chain::validate_chain` TODO: document how - to handle validation given out-of-order blocks. +- `zcash_client_backend::data_api::chain::validate_chain` (logic merged into + `chain::scan_cached_blocks`.)
- `zcash_client_backend::data_api::chain::error::{ChainError, Cause}` have been replaced by `zcash_client_backend::scanning::ScanError` - `zcash_client_backend::wallet::WalletSaplingOutput::{witness, witness_mut}` diff --git a/zcash_client_backend/proto/service.proto b/zcash_client_backend/proto/service.proto index d7f11dcd6..094566147 100644 --- a/zcash_client_backend/proto/service.proto +++ b/zcash_client_backend/proto/service.proto @@ -118,6 +118,22 @@ message TreeState { string orchardTree = 6; // orchard commitment tree state } +enum ShieldedProtocol { + sapling = 0; + orchard = 1; +} + +message GetSubtreeRootsArg { + uint32 startIndex = 1; // Index identifying where to start returning subtree roots + ShieldedProtocol shieldedProtocol = 2; // Shielded protocol to return subtree roots for + uint32 maxEntries = 3; // Maximum number of entries to return, or 0 for all entries. +} +message SubtreeRoot { + bytes rootHash = 2; // The 32-byte Merkle root of the subtree. + bytes completingBlockHash = 3; // The hash of the block that completed this subtree. + uint64 completingBlockHeight = 4; // The height of the block that completed this subtree in the main chain. +} + // Results are sorted by height, which makes it easy to issue another // request that picks up from where the previous left off. message GetAddressUtxosArg { @@ -142,8 +158,12 @@ service CompactTxStreamer { rpc GetLatestBlock(ChainSpec) returns (BlockID) {} // Return the compact block corresponding to the given block identifier rpc GetBlock(BlockID) returns (CompactBlock) {} + // Same as GetBlock except actions contain only nullifiers + rpc GetBlockNullifiers(BlockID) returns (CompactBlock) {} // Return a list of consecutive compact blocks rpc GetBlockRange(BlockRange) returns (stream CompactBlock) {} + // Same as GetBlockRange except actions contain only nullifiers + rpc GetBlockRangeNullifiers(BlockRange) returns (stream CompactBlock) {} // Return the requested full (not compact) transaction (as from zcashd) rpc GetTransaction(TxFilter) returns (RawTransaction) {} @@ -177,6 +197,10 @@ service CompactTxStreamer { rpc GetTreeState(BlockID) returns (TreeState) {} rpc GetLatestTreeState(Empty) returns (TreeState) {} + // Returns a stream of information about roots of subtrees of the Sapling and Orchard + // note commitment trees. + rpc GetSubtreeRoots(GetSubtreeRootsArg) returns (stream SubtreeRoot) {} + rpc GetAddressUtxos(GetAddressUtxosArg) returns (GetAddressUtxosReplyList) {} rpc GetAddressUtxosStream(GetAddressUtxosArg) returns (stream GetAddressUtxosReply) {} diff --git a/zcash_client_backend/src/data_api.rs b/zcash_client_backend/src/data_api.rs index c62c3b57c..ac6118e7c 100644 --- a/zcash_client_backend/src/data_api.rs +++ b/zcash_client_backend/src/data_api.rs @@ -1,9 +1,9 @@ //! Interfaces for wallet data persistence & low-level wallet utilities. +use std::cmp; use std::collections::HashMap; use std::fmt::Debug; use std::num::NonZeroU32; -use std::{cmp, ops::Range}; use incrementalmerkletree::Retention; use secrecy::SecretVec; @@ -29,9 +29,11 @@ use crate::{ }; use self::chain::CommitmentTreeRoot; +use self::scanning::ScanRange; pub mod chain; pub mod error; +pub mod scanning; pub mod wallet; pub const SAPLING_SHARD_HEIGHT: u8 = sapling::NOTE_COMMITMENT_TREE_DEPTH / 2; @@ -87,12 +89,10 @@ pub trait WalletRead { /// tree size information for each block; or else the scan is likely to fail if notes belonging /// to the wallet are detected. 
/// + /// The returned range(s) may include block heights beyond the current chain tip. + /// /// [`CompactBlock`]: crate::proto::compact_formats::CompactBlock - fn suggest_scan_ranges( - &self, - batch_size: usize, - limit: usize, - ) -> Result<Vec<Range<BlockHeight>>, Self::Error>; + fn suggest_scan_ranges(&self) -> Result<Vec<ScanRange>, Self::Error>; /// Returns the default target height (for the block in which a new /// transaction would be mined) and anchor height (to use for a new @@ -501,6 +501,15 @@ pub trait WalletWrite: WalletRead { block: Vec<ScannedBlock<sapling::Nullifier>>, ) -> Result<Vec<Self::NoteRef>, Self::Error>; + /// Updates the wallet's view of the blockchain. + /// + /// This method is used to provide the wallet with information about the state of the + /// blockchain, and detect any previously scanned data that needs to be re-validated + /// before proceeding with scanning. It should be called at wallet startup prior to calling + /// [`WalletRead::suggest_scan_ranges`] in order to provide the wallet with the information it + /// needs to correctly prioritize scanning operations. + fn update_chain_tip(&mut self, tip_height: BlockHeight) -> Result<(), Self::Error>; + /// Caches a decrypted transaction in the persistent wallet store. fn store_decrypted_tx( &mut self, @@ -569,7 +578,7 @@ pub mod testing { use incrementalmerkletree::Address; use secrecy::{ExposeSecret, SecretVec}; use shardtree::{memory::MemoryShardStore, ShardTree, ShardTreeError}; - use std::{collections::HashMap, convert::Infallible, ops::Range}; + use std::{collections::HashMap, convert::Infallible}; use zcash_primitives::{ block::BlockHash, @@ -591,9 +600,9 @@ pub mod testing { }; use super::{ - chain::CommitmentTreeRoot, BlockMetadata, DecryptedTransaction, NullifierQuery, - ScannedBlock, SentTransaction, WalletCommitmentTrees, WalletRead, WalletWrite, - SAPLING_SHARD_HEIGHT, + chain::CommitmentTreeRoot, scanning::ScanRange, BlockMetadata, DecryptedTransaction, + NullifierQuery, ScannedBlock, SentTransaction, WalletCommitmentTrees, WalletRead, + WalletWrite, SAPLING_SHARD_HEIGHT, }; pub struct MockWalletDb { @@ -634,11 +643,7 @@ Ok(None) } - fn suggest_scan_ranges( - &self, - _batch_size: usize, - _limit: usize, - ) -> Result<Vec<Range<BlockHeight>>, Self::Error> { + fn suggest_scan_ranges(&self) -> Result<Vec<ScanRange>, Self::Error> { Ok(vec![]) } @@ -780,6 +785,10 @@ Ok(vec![]) } + fn update_chain_tip(&mut self, _tip_height: BlockHeight) -> Result<(), Self::Error> { + Ok(()) + } + fn store_decrypted_tx( &mut self, _received_tx: DecryptedTransaction, diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index 7b45bb090..0f6f94545 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -7,18 +7,21 @@ //! # #[cfg(feature = "test-dependencies")] //! # { //! use zcash_primitives::{ -//! consensus::{BlockHeight, Network, Parameters} +//! consensus::{BlockHeight, Network, Parameters}, +//! sapling //! }; //! //! use zcash_client_backend::{ //! data_api::{ -//! WalletRead, WalletWrite, +//! WalletRead, WalletWrite, WalletCommitmentTrees, //! chain::{ //! BlockSource, +//! CommitmentTreeRoot, //! error::Error, //! scan_cached_blocks, //! testing as chain_testing, //! }, +//! scanning::ScanPriority, //! testing, //! }, //! }; @@ -32,20 +35,111 @@ //! # fn test() -> Result<(), Error<(), Infallible>> { //! let network = Network::TestNetwork; //! let block_source = chain_testing::MockBlockSource; -//! let mut db_data = testing::MockWalletDb::new(Network::TestNetwork); +//! 
let mut wallet_db = testing::MockWalletDb::new(Network::TestNetwork); //! -//! // 1) Download new CompactBlocks into block_source. -//! // -//! // 2) FIXME: Obtain necessary block metadata for continuity checking? -//! // -//! // 3) Scan cached blocks. -//! // -//! // FIXME: update documentation on how to detect when a rewind is required. -//! // -//! // At this point, the cache and scanned data are locally consistent (though not -//! // necessarily consistent with the latest chain tip - this would be discovered the -//! // next time this codepath is executed after new blocks are received). -//! scan_cached_blocks(&network, &block_source, &mut db_data, BlockHeight::from(0), 10) +//! // 1) Download note commitment tree data from lightwalletd +//! let roots: Vec> = unimplemented!(); +//! +//! // 2) Pass the commitment tree data to the database. +//! wallet_db.put_sapling_subtree_roots(0, &roots).unwrap(); +//! +//! // 3) Download chain tip metadata from lightwalletd +//! let tip_height: BlockHeight = unimplemented!(); +//! +//! // 4) Notify the wallet of the updated chain tip. +//! wallet_db.update_chain_tip(tip_height).map_err(Error::Wallet)?; +//! +//! // 5) Get the suggested scan ranges from the wallet database +//! let mut scan_ranges = wallet_db.suggest_scan_ranges().map_err(Error::Wallet)?; +//! +//! // 6) Run the following loop until the wallet's view of the chain tip as of the previous wallet +//! // session is valid. +//! loop { +//! // If there is a range of blocks that needs to be verified, it will always be returned as +//! // the first element of the vector of suggested ranges. +//! match scan_ranges.first() { +//! Some(scan_range) if scan_range.priority() == ScanPriority::Verify => { +//! // Download the blocks in `scan_range` into the block source, overwriting any +//! // existing blocks in this range. +//! unimplemented!(); +//! +//! // Scan the downloaded blocks +//! let scan_result = scan_cached_blocks( +//! &network, +//! &block_source, +//! &mut wallet_db, +//! scan_range.block_range().start, +//! scan_range.len() +//! ); +//! +//! // Check for scanning errors that indicate that the wallet's chain tip is out of +//! // sync with blockchain history. +//! match scan_result { +//! Ok(_) => { +//! // At this point, the cache and scanned data are locally consistent (though +//! // not necessarily consistent with the latest chain tip - this would be +//! // discovered the next time this codepath is executed after new blocks are +//! // received) so we can break out of the loop. +//! break; +//! } +//! Err(Error::Scan(err)) if err.is_continuity_error() => { +//! // Pick a height to rewind to, which must be at least one block before +//! // the height at which the error occurred, but may be an earlier height +//! // determined based on heuristics such as the platform, available bandwidth, +//! // size of recent CompactBlocks, etc. +//! let rewind_height = err.at_height().saturating_sub(10); +//! +//! // Rewind to the chosen height. +//! wallet_db.truncate_to_height(rewind_height).map_err(Error::Wallet)?; +//! +//! // Delete cached blocks from rewind_height onwards. +//! // +//! // This does imply that assumed-valid blocks will be re-downloaded, but it +//! // is also possible that in the intervening time, a chain reorg has +//! // occurred that orphaned some of those blocks. +//! unimplemented!(); +//! } +//! Err(other) => { +//! // Handle or return other errors +//! } +//! } +//! +//! // In case we updated the suggested scan ranges, now re-request. +//! 
scan_ranges = wallet_db.suggest_scan_ranges().map_err(Error::Wallet)?; +//! } +//! _ => { +//! // Nothing to verify; break out of the loop +//! break; +//! } +//! } +//! } +//! +//! // 7) Loop over the remaining suggested scan ranges, retrieving the requested data and calling +//! // `scan_cached_blocks` on each range. Periodically, or if a continuity error is +//! // encountered, this process should be repeated starting at step (3). +//! let scan_ranges = wallet_db.suggest_scan_ranges().map_err(Error::Wallet)?; +//! for scan_range in scan_ranges { +//! // Download the blocks in `scan_range` into the block source. While in this example this +//! // step is performed in-line, it's fine for the download of scan ranges to be asynchronous +//! // and for the scanner to process the downloaded ranges as they become available in a +//! // separate thread. The scan ranges should also be broken down into smaller chunks as +//! // appropriate, and for ranges with priority `Historic` it can be useful to download and +//! // scan the range in reverse order (to discover more recent unspent notes sooner), or from +//! // the start and end of the range inwards. +//! unimplemented!(); +//! +//! // Scan the downloaded blocks. +//! let scan_result = scan_cached_blocks( +//! &network, +//! &block_source, +//! &mut wallet_db, +//! scan_range.block_range().start, +//! scan_range.len() +//! )?; +//! +//! // Handle scan errors, etc. +//! } +//! # Ok(()) //! # } //! # } //! ``` @@ -58,14 +152,12 @@ use zcash_primitives::{ }; use crate::{ - data_api::{NullifierQuery, WalletWrite}, + data_api::{BlockMetadata, NullifierQuery, WalletWrite}, proto::compact_formats::CompactBlock, scan::BatchRunner, scanning::{add_block_to_runner, check_continuity, scan_block_with_runner}, }; -use super::BlockMetadata; - pub mod error; use error::Error; @@ -114,7 +206,7 @@ pub trait BlockSource { fn with_blocks( &self, from_height: Option, - limit: Option, + limit: Option, with_row: F, ) -> Result<(), error::Error> where @@ -145,7 +237,7 @@ pub fn scan_cached_blocks( block_source: &BlockSourceT, data_db: &mut DbT, from_height: BlockHeight, - limit: u32, + limit: usize, ) -> Result<(), Error> where ParamsT: consensus::Parameters + Send + 'static, @@ -292,7 +384,7 @@ pub mod testing { fn with_blocks( &self, _from_height: Option, - _limit: Option, + _limit: Option, _with_row: F, ) -> Result<(), Error> where diff --git a/zcash_client_backend/src/data_api/scanning.rs b/zcash_client_backend/src/data_api/scanning.rs new file mode 100644 index 000000000..bb91ba35a --- /dev/null +++ b/zcash_client_backend/src/data_api/scanning.rs @@ -0,0 +1,186 @@ +use std::fmt; +use std::ops::Range; + +use zcash_primitives::consensus::BlockHeight; + +/// Scanning range priority levels. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum ScanPriority { + /// Block ranges that have already been scanned have lowest priority. + Scanned, + /// Block ranges to be scanned to advance the fully-scanned height. + Historic, + /// Block ranges adjacent to heights at which the user opened the wallet. + OpenAdjacent, + /// Blocks that must be scanned to complete note commitment tree shards adjacent to found notes. + FoundNote, + /// Blocks that must be scanned to complete the latest note commitment tree shard. + ChainTip, + /// A previously scanned range that must be verified to check it is still in the + /// main chain, has highest priority. + Verify, +} + +/// A range of blocks to be scanned, along with its associated priority. 
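+///
+/// For example (an illustrative sketch; the heights are arbitrary):
+///
+/// ```ignore
+/// use zcash_primitives::consensus::BlockHeight;
+/// use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange};
+///
+/// let range = ScanRange::from_parts(
+///     BlockHeight::from(10)..BlockHeight::from(20),
+///     ScanPriority::ChainTip,
+/// );
+/// assert_eq!(range.len(), 10);
+/// assert!(!range.is_empty());
+/// ```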
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ScanRange { + block_range: Range, + priority: ScanPriority, +} + +impl fmt::Display for ScanRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:?}({}..{})", + self.priority, self.block_range.start, self.block_range.end, + ) + } +} + +impl ScanRange { + /// Constructs a scan range from its constituent parts. + pub fn from_parts(block_range: Range, priority: ScanPriority) -> Self { + assert!( + block_range.end >= block_range.start, + "{:?} is invalid for ScanRange({:?})", + block_range, + priority, + ); + ScanRange { + block_range, + priority, + } + } + + /// Returns the range of block heights to be scanned. + pub fn block_range(&self) -> &Range { + &self.block_range + } + + /// Returns the priority with which the scan range should be scanned. + pub fn priority(&self) -> ScanPriority { + self.priority + } + + /// Returns whether or not the scan range is empty. + pub fn is_empty(&self) -> bool { + self.block_range.is_empty() + } + + /// Returns the number of blocks in the scan range. + pub fn len(&self) -> usize { + usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start)) + .unwrap() + } + + /// Shifts the start of the block range to the right if `block_height > + /// self.block_range().start`. Returns `None` if the resulting range would + /// be empty (or the range was already empty). + pub fn truncate_start(&self, block_height: BlockHeight) -> Option { + if block_height >= self.block_range.end || self.is_empty() { + None + } else { + Some(ScanRange { + block_range: self.block_range.start.max(block_height)..self.block_range.end, + priority: self.priority, + }) + } + } + + /// Shifts the end of the block range to the left if `block_height < + /// self.block_range().end`. Returns `None` if the resulting range would + /// be empty (or the range was already empty). + pub fn truncate_end(&self, block_height: BlockHeight) -> Option { + if block_height <= self.block_range.start || self.is_empty() { + None + } else { + Some(ScanRange { + block_range: self.block_range.start..self.block_range.end.min(block_height), + priority: self.priority, + }) + } + } + + /// Splits this scan range at the specified height, such that the provided height becomes the + /// end of the first range returned and the start of the second. Returns `None` if + /// `p <= self.block_range().start || p >= self.block_range().end`. 
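+    ///
+    /// An illustrative sketch (the heights mirror the unit tests below):
+    ///
+    /// ```ignore
+    /// let r = ScanRange::from_parts(
+    ///     BlockHeight::from(5)..BlockHeight::from(8),
+    ///     ScanPriority::Historic,
+    /// );
+    /// let (left, right) = r.split_at(BlockHeight::from(6)).unwrap();
+    /// assert_eq!(left.block_range(), &(BlockHeight::from(5)..BlockHeight::from(6)));
+    /// assert_eq!(right.block_range(), &(BlockHeight::from(6)..BlockHeight::from(8)));
+    /// ```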
+ pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> { + (p > self.block_range.start && p < self.block_range.end).then_some(( + ScanRange { + block_range: self.block_range.start..p, + priority: self.priority, + }, + ScanRange { + block_range: p..self.block_range.end, + priority: self.priority, + }, + )) + } +} + +#[cfg(test)] +mod tests { + use super::{ScanPriority, ScanRange}; + + fn scan_range(start: u32, end: u32) -> ScanRange { + ScanRange::from_parts((start.into())..(end.into()), ScanPriority::Scanned) + } + + #[test] + fn truncate_start() { + let r = scan_range(5, 8); + + assert_eq!(r.truncate_start(4.into()), Some(scan_range(5, 8))); + assert_eq!(r.truncate_start(5.into()), Some(scan_range(5, 8))); + assert_eq!(r.truncate_start(6.into()), Some(scan_range(6, 8))); + assert_eq!(r.truncate_start(7.into()), Some(scan_range(7, 8))); + assert_eq!(r.truncate_start(8.into()), None); + assert_eq!(r.truncate_start(9.into()), None); + + let empty = scan_range(5, 5); + assert_eq!(empty.truncate_start(4.into()), None); + assert_eq!(empty.truncate_start(5.into()), None); + assert_eq!(empty.truncate_start(6.into()), None); + } + + #[test] + fn truncate_end() { + let r = scan_range(5, 8); + + assert_eq!(r.truncate_end(9.into()), Some(scan_range(5, 8))); + assert_eq!(r.truncate_end(8.into()), Some(scan_range(5, 8))); + assert_eq!(r.truncate_end(7.into()), Some(scan_range(5, 7))); + assert_eq!(r.truncate_end(6.into()), Some(scan_range(5, 6))); + assert_eq!(r.truncate_end(5.into()), None); + assert_eq!(r.truncate_end(4.into()), None); + + let empty = scan_range(5, 5); + assert_eq!(empty.truncate_end(4.into()), None); + assert_eq!(empty.truncate_end(5.into()), None); + assert_eq!(empty.truncate_end(6.into()), None); + } + + #[test] + fn split_at() { + let r = scan_range(5, 8); + + assert_eq!(r.split_at(4.into()), None); + assert_eq!(r.split_at(5.into()), None); + assert_eq!( + r.split_at(6.into()), + Some((scan_range(5, 6), scan_range(6, 8))) + ); + assert_eq!( + r.split_at(7.into()), + Some((scan_range(5, 7), scan_range(7, 8))) + ); + assert_eq!(r.split_at(8.into()), None); + assert_eq!(r.split_at(9.into()), None); + + let empty = scan_range(5, 5); + assert_eq!(empty.split_at(4.into()), None); + assert_eq!(empty.split_at(5.into()), None); + assert_eq!(empty.split_at(6.into()), None); + } +} diff --git a/zcash_client_backend/src/proto/service.rs b/zcash_client_backend/src/proto/service.rs index 38b15abdb..677e43e30 100644 --- a/zcash_client_backend/src/proto/service.rs +++ b/zcash_client_backend/src/proto/service.rs @@ -187,6 +187,32 @@ pub struct TreeState { #[prost(string, tag = "6")] pub orchard_tree: ::prost::alloc::string::String, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSubtreeRootsArg { + /// Index identifying where to start returning subtree roots + #[prost(uint32, tag = "1")] + pub start_index: u32, + /// Shielded protocol to return subtree roots for + #[prost(enumeration = "ShieldedProtocol", tag = "2")] + pub shielded_protocol: i32, + /// Maximum number of entries to return, or 0 for all entries. + #[prost(uint32, tag = "3")] + pub max_entries: u32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubtreeRoot { + /// The 32-byte Merkle root of the subtree. + #[prost(bytes = "vec", tag = "2")] + pub root_hash: ::prost::alloc::vec::Vec, + /// The hash of the block that completed this subtree. 
+ #[prost(bytes = "vec", tag = "3")] + pub completing_block_hash: ::prost::alloc::vec::Vec, + /// The height of the block that completed this subtree in the main chain. + #[prost(uint64, tag = "4")] + pub completing_block_height: u64, +} /// Results are sorted by height, which makes it easy to issue another /// request that picks up from where the previous left off. #[allow(clippy::derive_partial_eq_without_eq)] @@ -222,6 +248,32 @@ pub struct GetAddressUtxosReplyList { #[prost(message, repeated, tag = "1")] pub address_utxos: ::prost::alloc::vec::Vec, } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ShieldedProtocol { + Sapling = 0, + Orchard = 1, +} +impl ShieldedProtocol { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ShieldedProtocol::Sapling => "sapling", + ShieldedProtocol::Orchard => "orchard", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "sapling" => Some(Self::Sapling), + "orchard" => Some(Self::Orchard), + _ => None, + } + } +} /// Generated client implementations. pub mod compact_tx_streamer_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -366,6 +418,37 @@ pub mod compact_tx_streamer_client { ); self.inner.unary(req, path, codec).await } + /// Same as GetBlock except actions contain only nullifiers + pub async fn get_block_nullifiers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetBlockNullifiers", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "cash.z.wallet.sdk.rpc.CompactTxStreamer", + "GetBlockNullifiers", + ), + ); + self.inner.unary(req, path, codec).await + } /// Return a list of consecutive compact blocks pub async fn get_block_range( &mut self, @@ -399,6 +482,39 @@ pub mod compact_tx_streamer_client { ); self.inner.server_streaming(req, path, codec).await } + /// Same as GetBlockRange except actions contain only nullifiers + pub async fn get_block_range_nullifiers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response< + tonic::codec::Streaming, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetBlockRangeNullifiers", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "cash.z.wallet.sdk.rpc.CompactTxStreamer", + "GetBlockRangeNullifiers", + ), + ); + self.inner.server_streaming(req, path, codec).await + } /// Return the requested full (not compact) transaction (as from zcashd) pub async fn 
get_transaction( &mut self, @@ -671,6 +787,38 @@ pub mod compact_tx_streamer_client { ); self.inner.unary(req, path, codec).await } + /// Returns a stream of information about roots of subtrees of the Sapling and Orchard + /// note commitment trees. + pub async fn get_subtree_roots( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetSubtreeRoots", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "cash.z.wallet.sdk.rpc.CompactTxStreamer", + "GetSubtreeRoots", + ), + ); + self.inner.server_streaming(req, path, codec).await + } pub async fn get_address_utxos( &mut self, request: impl tonic::IntoRequest, diff --git a/zcash_client_backend/src/scanning.rs b/zcash_client_backend/src/scanning.rs index d2f419acd..302cf1843 100644 --- a/zcash_client_backend/src/scanning.rs +++ b/zcash_client_backend/src/scanning.rs @@ -110,7 +110,7 @@ impl ScanningKey for SaplingIvk { } /// Errors that may occur in chain scanning -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub enum ScanError { /// The hash of the parent block given by a proposed new chain tip does not match the hash of /// the current chain tip. @@ -141,21 +141,46 @@ pub enum ScanError { }, } +impl ScanError { + /// Returns whether this error is the result of a failed continuity check + pub fn is_continuity_error(&self) -> bool { + use ScanError::*; + match self { + PrevHashMismatch { .. } => true, + BlockHeightDiscontinuity { .. } => true, + TreeSizeMismatch { .. } => true, + TreeSizeUnknown { .. } => false, + } + } + + /// Returns the block height at which the scan error occurred + pub fn at_height(&self) -> BlockHeight { + use ScanError::*; + match self { + PrevHashMismatch { at_height } => *at_height, + BlockHeightDiscontinuity { new_height, .. } => *new_height, + TreeSizeMismatch { at_height, .. } => *at_height, + TreeSizeUnknown { at_height, .. 
} => *at_height, + } + } +} + impl fmt::Display for ScanError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use ScanError::*; match &self { - ScanError::PrevHashMismatch { at_height } => write!( + PrevHashMismatch { at_height } => write!( f, "The parent hash of proposed block does not correspond to the block hash at height {}.", at_height ), - ScanError::BlockHeightDiscontinuity { prev_height, new_height } => { - write!(f, "Block height discontinuity at height {}; next height is : {}", prev_height, new_height) + BlockHeightDiscontinuity { prev_height, new_height } => { + write!(f, "Block height discontinuity at height {}; previous height was: {}", new_height, prev_height) } - ScanError::TreeSizeMismatch { protocol, at_height, given, computed } => { + TreeSizeMismatch { protocol, at_height, given, computed } => { write!(f, "The {:?} note commitment tree size provided by a compact block did not match the expected size at height {}; given {}, expected {}", protocol, at_height, given, computed) } - ScanError::TreeSizeUnknown { protocol, at_height } => { + TreeSizeUnknown { protocol, at_height } => { write!(f, "Unable to determine {:?} note commitment tree size at height {}", protocol, at_height) } } diff --git a/zcash_client_sqlite/src/chain.rs b/zcash_client_sqlite/src/chain.rs index 46478fbc9..8c8a1a272 100644 --- a/zcash_client_sqlite/src/chain.rs +++ b/zcash_client_sqlite/src/chain.rs @@ -29,7 +29,7 @@ pub mod migrations; pub(crate) fn blockdb_with_blocks( block_source: &BlockDb, from_height: Option, - limit: Option, + limit: Option, mut with_row: F, ) -> Result<(), Error> where @@ -52,7 +52,9 @@ where let mut rows = stmt_blocks .query(params![ from_height.map_or(0u32, u32::from), - limit.unwrap_or(u32::max_value()), + limit + .and_then(|l| u32::try_from(l).ok()) + .unwrap_or(u32::MAX) ]) .map_err(to_chain_error)?; @@ -198,7 +200,7 @@ pub(crate) fn blockmetadb_find_block( pub(crate) fn fsblockdb_with_blocks( cache: &FsBlockDb, from_height: Option, - limit: Option, + limit: Option, mut with_block: F, ) -> Result<(), Error> where @@ -223,7 +225,9 @@ where .query_map( params![ from_height.map_or(0u32, u32::from), - limit.unwrap_or(u32::max_value()), + limit + .and_then(|l| u32::try_from(l).ok()) + .unwrap_or(u32::MAX) ], |row| { Ok(BlockMeta { @@ -279,11 +283,12 @@ mod tests { use zcash_client_backend::{ address::RecipientAddress, data_api::{ - chain::scan_cached_blocks, + chain::{error::Error, scan_cached_blocks}, wallet::{input_selection::GreedyInputSelector, spend}, WalletRead, WalletWrite, }, fees::{zip317::SingleOutputChangeStrategy, DustOutputPolicy}, + scanning::ScanError, wallet::OvkPolicy, zip321::{Payment, TransactionRequest}, }; @@ -315,12 +320,9 @@ mod tests { assert_matches!(db_data.get_max_height_hash(), Ok(None)); // Create a fake CompactBlock sending value to the address - let fake_block_hash = BlockHash([0; 32]); - let fake_block_height = sapling_activation_height(); - let (cb, _) = fake_compact_block( - fake_block_height, - fake_block_hash, + sapling_activation_height(), + BlockHash([0; 32]), &dfvk, AddressType::DefaultExternal, Amount::from_u64(5).unwrap(), @@ -348,17 +350,20 @@ mod tests { Amount::from_u64(7).unwrap(), 1, ); + insert_into_cache(&db_cache, &cb2); - // Scan the cache again - scan_cached_blocks( - &tests::network(), - &db_cache, - &mut db_data, - sapling_activation_height() + 1, - 1, - ) - .unwrap(); + // Scanning should detect no inconsistencies + assert_matches!( + scan_cached_blocks( + &tests::network(), + &db_cache, + &mut db_data, + 
sapling_activation_height() + 1, + 1, + ), + Ok(()) + ); } #[test] @@ -394,15 +399,17 @@ mod tests { insert_into_cache(&db_cache, &cb); insert_into_cache(&db_cache, &cb2); - // Scan the cache - scan_cached_blocks( - &tests::network(), - &db_cache, - &mut db_data, - sapling_activation_height(), - 2, - ) - .unwrap(); + // Scanning the cache should find no inconsistencies + assert_matches!( + scan_cached_blocks( + &tests::network(), + &db_cache, + &mut db_data, + sapling_activation_height(), + 2, + ), + Ok(()) + ); // Create more fake CompactBlocks that don't connect to the scanned ones let (cb3, _) = fake_compact_block( @@ -433,83 +440,8 @@ mod tests { sapling_activation_height() + 2, 2 ), - Err(_) // FIXME: check error result more closely - ); - } - - #[test] - fn invalid_chain_cache_reorg() { - let cache_file = NamedTempFile::new().unwrap(); - let db_cache = BlockDb::for_path(cache_file.path()).unwrap(); - init_cache_database(&db_cache).unwrap(); - - let data_file = NamedTempFile::new().unwrap(); - let mut db_data = WalletDb::for_path(data_file.path(), tests::network()).unwrap(); - init_wallet_db(&mut db_data, Some(Secret::new(vec![]))).unwrap(); - - // Add an account to the wallet - let (dfvk, _taddr) = init_test_accounts_table(&mut db_data); - - // Create some fake CompactBlocks - let (cb, _) = fake_compact_block( - sapling_activation_height(), - BlockHash([0; 32]), - &dfvk, - AddressType::DefaultExternal, - Amount::from_u64(5).unwrap(), - 0, - ); - let (cb2, _) = fake_compact_block( - sapling_activation_height() + 1, - cb.hash(), - &dfvk, - AddressType::DefaultExternal, - Amount::from_u64(7).unwrap(), - 1, - ); - insert_into_cache(&db_cache, &cb); - insert_into_cache(&db_cache, &cb2); - - // Scan the cache - scan_cached_blocks( - &tests::network(), - &db_cache, - &mut db_data, - sapling_activation_height(), - 2, - ) - .unwrap(); - - // Create more fake CompactBlocks that contain a reorg - let (cb3, _) = fake_compact_block( - sapling_activation_height() + 2, - cb2.hash(), - &dfvk, - AddressType::DefaultExternal, - Amount::from_u64(8).unwrap(), - 2, - ); - let (cb4, _) = fake_compact_block( - sapling_activation_height() + 3, - BlockHash([1; 32]), - &dfvk, - AddressType::DefaultExternal, - Amount::from_u64(3).unwrap(), - 3, - ); - insert_into_cache(&db_cache, &cb3); - insert_into_cache(&db_cache, &cb4); - - // Data+cache chain should be invalid inside the cache - assert_matches!( - scan_cached_blocks( - &tests::network(), - &db_cache, - &mut db_data, - sapling_activation_height() + 2, - 2 - ), - Err(_) // FIXME: check error result more closely + Err(Error::Scan(ScanError::PrevHashMismatch { at_height })) + if at_height == sapling_activation_height() + 2 ); } diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index 1686145ad..7dad65e90 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -36,7 +36,6 @@ use either::Either; use rusqlite::{self, Connection}; use secrecy::{ExposeSecret, SecretVec}; use std::{borrow::Borrow, collections::HashMap, convert::AsRef, fmt, io, ops::Range, path::Path}; -use wallet::commitment_tree::put_shard_roots; use incrementalmerkletree::Position; use shardtree::{ShardTree, ShardTreeError}; @@ -58,6 +57,7 @@ use zcash_client_backend::{ data_api::{ self, chain::{BlockSource, CommitmentTreeRoot}, + scanning::{ScanPriority, ScanRange}, BlockMetadata, DecryptedTransaction, NullifierQuery, PoolType, Recipient, ScannedBlock, SentTransaction, ShieldedProtocol, WalletCommitmentTrees, WalletRead, WalletWrite, 
SAPLING_SHARD_HEIGHT, @@ -80,13 +80,18 @@ use { pub mod chain; pub mod error; pub mod serialization; + pub mod wallet; +use wallet::commitment_tree::put_shard_roots; /// The maximum number of blocks the wallet is allowed to rewind. This is /// consistent with the bound in zcashd, and allows block data deeper than /// this delta from the chain tip to be pruned. pub(crate) const PRUNING_DEPTH: u32 = 100; +/// The number of blocks to verify ahead when the chain tip is updated. +pub(crate) const VERIFY_LOOKAHEAD: u32 = 10; + pub(crate) const SAPLING_TABLES_PREFIX: &str = "sapling"; /// A newtype wrapper for sqlite primary key values for the notes @@ -167,12 +172,9 @@ impl, P: consensus::Parameters> WalletRead for W wallet::block_fully_scanned(self.conn.borrow()) } - fn suggest_scan_ranges( - &self, - _batch_size: usize, - _limit: usize, - ) -> Result>, Self::Error> { - todo!() + fn suggest_scan_ranges(&self) -> Result, Self::Error> { + wallet::scanning::suggest_scan_ranges(self.conn.borrow(), ScanPriority::Historic) + .map_err(SqliteClientError::from) } fn get_min_unspent_height(&self) -> Result, Self::Error> { @@ -401,18 +403,24 @@ impl WalletWrite for WalletDb blocks: Vec>, ) -> Result, Self::Error> { self.transactionally(|wdb| { - let start_position = blocks.first().map(|block| { - Position::from( - u64::from(block.metadata().sapling_tree_size()) - - u64::try_from(block.sapling_commitments().len()).unwrap(), + let start_positions = blocks.first().map(|block| { + ( + block.height(), + Position::from( + u64::from(block.metadata().sapling_tree_size()) + - u64::try_from(block.sapling_commitments().len()).unwrap(), + ), ) }); let mut wallet_note_ids = vec![]; let mut sapling_commitments = vec![]; - let mut end_height = None; - + let mut last_scanned_height = None; + let mut note_positions = vec![]; for block in blocks.into_iter() { - if end_height.iter().any(|prev| block.height() != *prev + 1) { + if last_scanned_height + .iter() + .any(|prev| block.height() != *prev + 1) + { return Err(SqliteClientError::NonSequentialBlocks); } @@ -442,13 +450,21 @@ impl WalletWrite for WalletDb } } - end_height = Some(block.height()); + note_positions.extend(block.transactions().iter().flat_map(|wtx| { + wtx.sapling_outputs + .iter() + .map(|out| out.note_commitment_tree_position()) + })); + + last_scanned_height = Some(block.height()); sapling_commitments.extend(block.into_sapling_commitments().into_iter()); } - // We will have a start position and an end height in all cases where `blocks` is - // non-empty. - if let Some((start_position, end_height)) = start_position.zip(end_height) { + // We will have a start position and a last scanned height in all cases where + // `blocks` is non-empty. + if let Some(((start_height, start_position), last_scanned_height)) = + start_positions.zip(last_scanned_height) + { // Update the Sapling note commitment tree with all newly read note commitments let mut sapling_commitments = sapling_commitments.into_iter(); wdb.with_sapling_tree_mut::<_, _, SqliteClientError>(move |sapling_tree| { @@ -457,13 +473,30 @@ impl WalletWrite for WalletDb })?; // Update now-expired transactions that didn't get mined. 
- wallet::update_expired_notes(wdb.conn.0, end_height)?; + wallet::update_expired_notes(wdb.conn.0, last_scanned_height)?; + + wallet::scanning::scan_complete( + wdb.conn.0, + &wdb.params, + Range { + start: start_height, + end: last_scanned_height + 1, + }, + ¬e_positions, + )?; } Ok(wallet_note_ids) }) } + fn update_chain_tip(&mut self, tip_height: BlockHeight) -> Result<(), Self::Error> { + let tx = self.conn.transaction()?; + wallet::scanning::update_chain_tip(&tx, &self.params, tip_height)?; + tx.commit()?; + Ok(()) + } + fn store_decrypted_tx( &mut self, d_tx: DecryptedTransaction, @@ -750,7 +783,7 @@ impl BlockSource for BlockDb { fn with_blocks( &self, from_height: Option, - limit: Option, + limit: Option, with_row: F, ) -> Result<(), data_api::chain::error::Error> where @@ -928,7 +961,7 @@ impl BlockSource for FsBlockDb { fn with_blocks( &self, from_height: Option, - limit: Option, + limit: Option, with_row: F, ) -> Result<(), data_api::chain::error::Error> where diff --git a/zcash_client_sqlite/src/serialization.rs b/zcash_client_sqlite/src/serialization.rs index eb1176465..a847d8672 100644 --- a/zcash_client_sqlite/src/serialization.rs +++ b/zcash_client_sqlite/src/serialization.rs @@ -4,7 +4,7 @@ use byteorder::{ReadBytesExt, WriteBytesExt}; use core::ops::Deref; use shardtree::{Node, PrunableTree, RetentionFlags, Tree}; use std::io::{self, Read, Write}; -use std::rc::Rc; +use std::sync::Arc; use zcash_encoding::Optional; use zcash_primitives::merkle_tree::HashSer; @@ -53,7 +53,7 @@ pub fn write_shard(writer: &mut W, tree: &PrunableTree) fn read_shard_v1(mut reader: &mut R) -> io::Result> { match reader.read_u8()? { PARENT_TAG => { - let ann = Optional::read(&mut reader, ::read)?.map(Rc::new); + let ann = Optional::read(&mut reader, ::read)?.map(Arc::new); let left = read_shard_v1(reader)?; let right = read_shard_v1(reader)?; Ok(Tree::parent(ann, left, right)) diff --git a/zcash_client_sqlite/src/wallet.rs b/zcash_client_sqlite/src/wallet.rs index 6f90151bb..285ba352b 100644 --- a/zcash_client_sqlite/src/wallet.rs +++ b/zcash_client_sqlite/src/wallet.rs @@ -68,6 +68,8 @@ use rusqlite::{self, named_params, OptionalExtension, ToSql}; use std::collections::HashMap; use std::convert::TryFrom; use std::io::{self, Cursor}; +use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; + use zcash_client_backend::data_api::ShieldedProtocol; use zcash_primitives::transaction::TransactionData; @@ -91,10 +93,13 @@ use zcash_client_backend::{ wallet::WalletTx, }; +use crate::VERIFY_LOOKAHEAD; use crate::{ error::SqliteClientError, SqlTransaction, WalletCommitmentTrees, WalletDb, PRUNING_DEPTH, }; +use self::scanning::replace_queue_entries; + #[cfg(feature = "transparent-inputs")] use { crate::UtxoId, @@ -109,6 +114,7 @@ use { pub(crate) mod commitment_tree; pub mod init; pub(crate) mod sapling; +pub(crate) mod scanning; pub(crate) const BLOCK_SAPLING_FRONTIER_ABSENT: &[u8] = &[0x0]; @@ -629,8 +635,8 @@ pub(crate) fn block_metadata( ) -> Result, SqliteClientError> { conn.query_row( "SELECT height, hash, sapling_commitment_tree_size, sapling_tree - FROM blocks - WHERE height = :block_height", + FROM blocks + WHERE height = :block_height", named_params![":block_height": u32::from(block_height)], |row| { let height: u32 = row.get(0)?; @@ -742,7 +748,7 @@ pub(crate) fn truncate_to_height( ) -> Result<(), SqliteClientError> { let sapling_activation_height = params .activation_height(NetworkUpgrade::Sapling) - .expect("Sapling activation height mutst be available."); + 
.expect("Sapling activation height must be available."); // Recall where we synced up to previously. let last_scanned_height = conn.query_row("SELECT MAX(height) FROM blocks", [], |row| { @@ -800,7 +806,8 @@ pub(crate) fn truncate_to_height( // Un-mine transactions. conn.execute( - "UPDATE transactions SET block = NULL, tx_index = NULL WHERE block IS NOT NULL AND block > ?", + "UPDATE transactions SET block = NULL, tx_index = NULL + WHERE block IS NOT NULL AND block > ?", [u32::from(block_height)], )?; @@ -809,6 +816,27 @@ pub(crate) fn truncate_to_height( "DELETE FROM blocks WHERE height > ?", [u32::from(block_height)], )?; + + // Delete from the scanning queue any range with a start height greater than the + // truncation height, and then truncate any remaining range by setting the end + // equal to the truncation height + 1. + conn.execute( + "DELETE FROM scan_queue + WHERE block_range_start > :block_height", + named_params![":block_height": u32::from(block_height)], + )?; + + conn.execute( + "UPDATE scan_queue + SET block_range_end = :end_height + WHERE block_range_end > :end_height", + named_params![":end_height": u32::from(block_height + 1)], + )?; + + // Prioritize the range starting at the height we just rewound to for verification + let query_range = block_height..(block_height + VERIFY_LOOKAHEAD); + let scan_range = ScanRange::from_parts(query_range.clone(), ScanPriority::Verify); + replace_queue_entries(conn, &query_range, Some(scan_range).into_iter())?; } Ok(()) diff --git a/zcash_client_sqlite/src/wallet/commitment_tree.rs b/zcash_client_sqlite/src/wallet/commitment_tree.rs index 12cf24333..be5c38b42 100644 --- a/zcash_client_sqlite/src/wallet/commitment_tree.rs +++ b/zcash_client_sqlite/src/wallet/commitment_tree.rs @@ -4,7 +4,7 @@ use std::{ collections::BTreeSet, io::{self, Cursor}, marker::PhantomData, - rc::Rc, + sync::Arc, }; use zcash_client_backend::data_api::chain::CommitmentTreeRoot; @@ -281,7 +281,7 @@ pub(crate) fn get_shard( let located_tree = LocatedPrunableTree::from_parts(shard_root_addr, shard_tree); if let Some(root_hash_data) = root_hash { let root_hash = H::read(Cursor::new(root_hash_data)).map_err(Either::Left)?; - Ok(located_tree.reannotate_root(Some(Rc::new(root_hash)))) + Ok(located_tree.reannotate_root(Some(Arc::new(root_hash)))) } else { Ok(located_tree) } @@ -772,12 +772,44 @@ pub(crate) fn put_shard_roots< return Ok(()); } - // We treat the cap as a DEPTH-SHARD_HEIGHT tree so that we can make a batch insertion of - // root data using `Position::from(start_index)` as the starting position and treating the - // roots as level-0 leaves. + // We treat the cap as a tree with `DEPTH - SHARD_HEIGHT` levels, so that we can make a + // batch insertion of root data using `Position::from(start_index)` as the starting position + // and treating the roots as level-0 leaves. 
+ #[derive(Clone, Debug, PartialEq, Eq)] + struct LevelShifter(H); + impl Hashable for LevelShifter { + fn empty_leaf() -> Self { + Self(H::empty_root(SHARD_HEIGHT.into())) + } + + fn combine(level: Level, a: &Self, b: &Self) -> Self { + Self(H::combine(level + SHARD_HEIGHT, &a.0, &b.0)) + } + + fn empty_root(level: Level) -> Self + where + Self: Sized, + { + Self(H::empty_root(level + SHARD_HEIGHT)) + } + } + impl HashSer for LevelShifter { + fn read(reader: R) -> io::Result + where + Self: Sized, + { + H::read(reader).map(Self) + } + + fn write(&self, writer: W) -> io::Result<()> { + self.0.write(writer) + } + } + let cap = LocatedTree::from_parts( Address::from_parts((DEPTH - SHARD_HEIGHT).into(), 0), - get_cap(conn, table_prefix).map_err(ShardTreeError::Storage)?, + get_cap::>(conn, table_prefix) + .map_err(ShardTreeError::Storage)?, ); let cap_result = cap @@ -785,7 +817,7 @@ pub(crate) fn put_shard_roots< Position::from(start_index), roots.iter().map(|r| { ( - r.root_hash().clone(), + LevelShifter(r.root_hash().clone()), Retention::Checkpoint { id: (), is_marked: false, diff --git a/zcash_client_sqlite/src/wallet/init.rs b/zcash_client_sqlite/src/wallet/init.rs index 66efda12e..013581d81 100644 --- a/zcash_client_sqlite/src/wallet/init.rs +++ b/zcash_client_sqlite/src/wallet/init.rs @@ -2,6 +2,7 @@ use either::Either; use incrementalmerkletree::Retention; use std::{collections::HashMap, fmt, io}; +use tracing::debug; use rusqlite::{self, types::ToSql}; use schemer::{Migrator, MigratorError}; @@ -318,6 +319,7 @@ pub fn init_blocks_table( )?; if let Some(nonempty_frontier) = block_end_tree.to_frontier().value() { + debug!("Inserting frontier into ShardTree: {:?}", nonempty_frontier); let shard_store = SqliteShardStore::<_, sapling::Node, SAPLING_SHARD_HEIGHT>::from_connection( wdb.conn.0, @@ -463,6 +465,16 @@ mod tests { FOREIGN KEY (block) REFERENCES blocks(height), CONSTRAINT witness_height UNIQUE (note, block) )", + "CREATE TABLE scan_queue ( + block_range_start INTEGER NOT NULL, + block_range_end INTEGER NOT NULL, + priority INTEGER NOT NULL, + CONSTRAINT range_start_uniq UNIQUE (block_range_start), + CONSTRAINT range_end_uniq UNIQUE (block_range_end), + CONSTRAINT range_bounds_order CHECK ( + block_range_start < block_range_end + ) + )", "CREATE TABLE schemer_migrations ( id blob PRIMARY KEY )", diff --git a/zcash_client_sqlite/src/wallet/init/migrations/shardtree_support.rs b/zcash_client_sqlite/src/wallet/init/migrations/shardtree_support.rs index 8a238d00e..f8c4a61af 100644 --- a/zcash_client_sqlite/src/wallet/init/migrations/shardtree_support.rs +++ b/zcash_client_sqlite/src/wallet/init/migrations/shardtree_support.rs @@ -8,10 +8,14 @@ use incrementalmerkletree::Retention; use rusqlite::{self, named_params, params}; use schemer; use schemer_rusqlite::RusqliteMigration; -use shardtree::ShardTree; +use shardtree::{caching::CachingShardStore, ShardTree, ShardTreeError}; +use tracing::{debug, trace}; use uuid::Uuid; -use zcash_client_backend::data_api::SAPLING_SHARD_HEIGHT; +use zcash_client_backend::data_api::{ + scanning::{ScanPriority, ScanRange}, + SAPLING_SHARD_HEIGHT, +}; use zcash_primitives::{ consensus::BlockHeight, merkle_tree::{read_commitment_tree, read_incremental_witness}, @@ -20,8 +24,10 @@ use zcash_primitives::{ use crate::{ wallet::{ + block_height_extrema, commitment_tree::SqliteShardStore, init::{migrations::received_notes_nullable_nf, WalletMigrationError}, + scanning::insert_queue_entries, }, PRUNING_DEPTH, SAPLING_TABLES_PREFIX, }; @@ -56,6 +62,7 @@ impl 
RusqliteMigration for Migration { fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), WalletMigrationError> { // Add commitment tree sizes to block metadata. + debug!("Adding new columns"); transaction.execute_batch( "ALTER TABLE blocks ADD COLUMN sapling_commitment_tree_size INTEGER; ALTER TABLE blocks ADD COLUMN orchard_commitment_tree_size INTEGER; @@ -63,6 +70,7 @@ impl RusqliteMigration for Migration { )?; // Add shard persistence + debug!("Creating tables for shard persistence"); transaction.execute_batch( "CREATE TABLE sapling_tree_shards ( shard_index INTEGER PRIMARY KEY, @@ -81,6 +89,7 @@ impl RusqliteMigration for Migration { )?; // Add checkpoint persistence + debug!("Creating tables for checkpoint persistence"); transaction.execute_batch( "CREATE TABLE sapling_tree_checkpoints ( checkpoint_id INTEGER PRIMARY KEY, @@ -99,6 +108,7 @@ impl RusqliteMigration for Migration { transaction, SAPLING_TABLES_PREFIX, )?; + let shard_store = CachingShardStore::load(shard_store).map_err(ShardTreeError::Storage)?; let mut shard_tree: ShardTree< _, { sapling::NOTE_COMMITMENT_TREE_DEPTH }, @@ -128,22 +138,42 @@ impl RusqliteMigration for Migration { ) })?; + if block_height % 1000 == 0 { + debug!(height = block_height, "Migrating tree data to shardtree"); + } + trace!( + height = block_height, + size = block_end_tree.size(), + "Storing Sapling commitment tree size" + ); stmt_update_block_sapling_tree_size .execute(params![block_end_tree.size(), block_height])?; if let Some(nonempty_frontier) = block_end_tree.to_frontier().value() { - shard_tree.insert_frontier_nodes( - nonempty_frontier.clone(), - Retention::Checkpoint { - id: BlockHeight::from(block_height), - is_marked: false, - }, - )?; + trace!( + height = block_height, + frontier = ?nonempty_frontier, + "Inserting frontier nodes", + ); + shard_tree + .insert_frontier_nodes( + nonempty_frontier.clone(), + Retention::Checkpoint { + id: BlockHeight::from(block_height), + is_marked: false, + }, + ) + .map_err(|e| match e { + ShardTreeError::Query(e) => ShardTreeError::Query(e), + ShardTreeError::Insert(e) => ShardTreeError::Insert(e), + ShardTreeError::Storage(_) => unreachable!(), + })? } } } // Insert all the tree information that we can get from existing incremental witnesses + debug!("Migrating witness data to shardtree"); { let mut stmt_blocks = transaction.prepare("SELECT note, block, witness FROM sapling_witnesses")?; @@ -180,10 +210,50 @@ impl RusqliteMigration for Migration { updated_note_positions.insert(witnessed_position); } - shard_tree.insert_witness_nodes(witness, BlockHeight::from(block_height))?; + shard_tree + .insert_witness_nodes(witness, BlockHeight::from(block_height)) + .map_err(|e| match e { + ShardTreeError::Query(e) => ShardTreeError::Query(e), + ShardTreeError::Insert(e) => ShardTreeError::Insert(e), + ShardTreeError::Storage(_) => unreachable!(), + })?; } } + shard_tree + .into_store() + .flush() + .map_err(ShardTreeError::Storage)?; + + // Establish the scan queue & wallet history table. + // block_range_end is exclusive. + debug!("Creating table for scan queue"); + transaction.execute_batch( + "CREATE TABLE scan_queue ( + block_range_start INTEGER NOT NULL, + block_range_end INTEGER NOT NULL, + priority INTEGER NOT NULL, + CONSTRAINT range_start_uniq UNIQUE (block_range_start), + CONSTRAINT range_end_uniq UNIQUE (block_range_end), + CONSTRAINT range_bounds_order CHECK ( + block_range_start < block_range_end + ) + );", + )?; + + if let Some((start, end)) = block_height_extrema(transaction)? 
{ + // `ScanRange` uses an exclusive upper bound. + let chain_end = end + 1; + insert_queue_entries( + transaction, + Some(ScanRange::from_parts( + start..chain_end, + ScanPriority::Historic, + )) + .iter(), + )?; + } + Ok(()) } diff --git a/zcash_client_sqlite/src/wallet/scanning.rs b/zcash_client_sqlite/src/wallet/scanning.rs new file mode 100644 index 000000000..d3f456e1a --- /dev/null +++ b/zcash_client_sqlite/src/wallet/scanning.rs @@ -0,0 +1,1215 @@ +use rusqlite::{self, named_params, types::Value, OptionalExtension}; +use std::cmp::{max, min, Ordering}; +use std::collections::BTreeSet; +use std::ops::{Not, Range}; +use std::rc::Rc; +use tracing::{debug, trace}; +use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; + +use incrementalmerkletree::{Address, Position}; +use zcash_primitives::consensus::{self, BlockHeight, NetworkUpgrade}; + +use zcash_client_backend::data_api::SAPLING_SHARD_HEIGHT; + +use crate::error::SqliteClientError; +use crate::{PRUNING_DEPTH, VERIFY_LOOKAHEAD}; + +use super::block_height_extrema; + +#[derive(Debug, Clone, Copy)] +enum Insert { + Left, + Right, +} + +impl Not for Insert { + type Output = Self; + + fn not(self) -> Self::Output { + match self { + Insert::Left => Insert::Right, + Insert::Right => Insert::Left, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Dominance { + Left, + Right, + Equal, +} + +impl From for Dominance { + fn from(value: Insert) -> Self { + match value { + Insert::Left => Dominance::Left, + Insert::Right => Dominance::Right, + } + } +} + +pub(crate) fn parse_priority_code(code: i64) -> Option { + use ScanPriority::*; + match code { + 10 => Some(Scanned), + 20 => Some(Historic), + 30 => Some(OpenAdjacent), + 40 => Some(FoundNote), + 50 => Some(ChainTip), + 60 => Some(Verify), + _ => None, + } +} + +pub(crate) fn priority_code(priority: &ScanPriority) -> i64 { + use ScanPriority::*; + match priority { + Scanned => 10, + Historic => 20, + OpenAdjacent => 30, + FoundNote => 40, + ChainTip => 50, + Verify => 60, + } +} + +pub(crate) fn suggest_scan_ranges( + conn: &rusqlite::Connection, + min_priority: ScanPriority, +) -> Result, SqliteClientError> { + let mut stmt_scan_ranges = conn.prepare_cached( + "SELECT block_range_start, block_range_end, priority + FROM scan_queue + WHERE priority >= :min_priority + ORDER BY priority DESC, block_range_end DESC", + )?; + + let mut rows = + stmt_scan_ranges.query(named_params![":min_priority": priority_code(&min_priority)])?; + + let mut result = vec![]; + while let Some(row) = rows.next()? { + let range = Range { + start: row.get::<_, u32>(0).map(BlockHeight::from)?, + end: row.get::<_, u32>(1).map(BlockHeight::from)?, + }; + let code = row.get::<_, i64>(2)?; + let priority = parse_priority_code(code).ok_or_else(|| { + SqliteClientError::CorruptedData(format!("scan priority not recognized: {}", code)) + })?; + + result.push(ScanRange::from_parts(range, priority)); + } + + Ok(result) +} + +// This implements the dominance rule for range priority. If the inserted range's priority is +// `Verify`, this replaces any existing priority. Otherwise, if the current priority is +// `Scanned`, it remains as `Scanned`; and if the new priority is `Scanned`, it +// overrides any existing priority. 
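+//
+// For example (a sketch of the rule; each case matches an arm below):
+//   dominance(&Historic, &Verify,   Insert::Right) == Dominance::Right (`Verify` replaces)
+//   dominance(&Scanned,  &ChainTip, Insert::Right) == Dominance::Left  (`Scanned` is retained)
+//   dominance(&ChainTip, &Scanned,  Insert::Right) == Dominance::Right (a new `Scanned` overrides)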
+fn dominance(current: &ScanPriority, inserted: &ScanPriority, insert: Insert) -> Dominance { + match (current.cmp(inserted), (current, inserted)) { + (Ordering::Equal, _) => Dominance::Equal, + (_, (_, ScanPriority::Verify | ScanPriority::Scanned)) => Dominance::from(insert), + (_, (ScanPriority::Scanned, _)) => Dominance::from(!insert), + (Ordering::Less, _) => Dominance::from(insert), + (Ordering::Greater, _) => Dominance::from(!insert), + } +} + +/// In the comments for each alternative, `()` represents the left range and `[]` represents the right range. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RangeOrdering { + /// `( ) [ ]` + LeftFirstDisjoint, + /// `( [ ) ]` + LeftFirstOverlap, + /// `[ ( ) ]` + LeftContained, + /// ```text + /// ( ) + /// [ ] + /// ``` + Equal, + /// `( [ ] )` + RightContained, + /// `[ ( ] )` + RightFirstOverlap, + /// `[ ] ( )` + RightFirstDisjoint, +} + +impl RangeOrdering { + fn cmp(a: &Range, b: &Range) -> Self { + use Ordering::*; + assert!(a.start <= a.end && b.start <= b.end); + match (a.start.cmp(&b.start), a.end.cmp(&b.end)) { + _ if a.end <= b.start => RangeOrdering::LeftFirstDisjoint, + _ if b.end <= a.start => RangeOrdering::RightFirstDisjoint, + (Less, Less) => RangeOrdering::LeftFirstOverlap, + (Equal, Less) | (Greater, Less) | (Greater, Equal) => RangeOrdering::LeftContained, + (Equal, Equal) => RangeOrdering::Equal, + (Equal, Greater) | (Less, Greater) | (Less, Equal) => RangeOrdering::RightContained, + (Greater, Greater) => RangeOrdering::RightFirstOverlap, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum Joined { + One(ScanRange), + Two(ScanRange, ScanRange), + Three(ScanRange, ScanRange, ScanRange), +} + +fn join_nonoverlapping(left: ScanRange, right: ScanRange) -> Joined { + assert!(left.block_range().end <= right.block_range().start); + + if left.block_range().end == right.block_range().start { + if left.priority() == right.priority() { + Joined::One(ScanRange::from_parts( + left.block_range().start..right.block_range().end, + left.priority(), + )) + } else { + Joined::Two(left, right) + } + } else { + // there is a gap that will need to be filled + let gap = ScanRange::from_parts( + left.block_range().end..right.block_range().start, + ScanPriority::Historic, + ); + + match join_nonoverlapping(left, gap) { + Joined::One(merged) => join_nonoverlapping(merged, right), + Joined::Two(left, gap) => match join_nonoverlapping(gap, right) { + Joined::One(merged) => Joined::Two(left, merged), + Joined::Two(gap, right) => Joined::Three(left, gap, right), + _ => unreachable!(), + }, + _ => unreachable!(), + } + } +} + +fn insert(current: ScanRange, to_insert: ScanRange) -> Joined { + fn join_overlapping(left: ScanRange, right: ScanRange, insert: Insert) -> Joined { + assert!( + left.block_range().start <= right.block_range().start + && left.block_range().end > right.block_range().start + ); + + // recompute the range dominance based upon the queue entry priorities + let dominance = match insert { + Insert::Left => dominance(&right.priority(), &left.priority(), insert), + Insert::Right => dominance(&left.priority(), &right.priority(), insert), + }; + + match dominance { + Dominance::Left => { + if let Some(right) = right.truncate_start(left.block_range().end) { + Joined::Two(left, right) + } else { + Joined::One(left) + } + } + Dominance::Equal => Joined::One(ScanRange::from_parts( + left.block_range().start..max(left.block_range().end, right.block_range().end), + left.priority(), + )), + Dominance::Right => match ( + 
left.truncate_end(right.block_range().start), + left.truncate_start(right.block_range().end), + ) { + (Some(before), Some(after)) => Joined::Three(before, right, after), + (Some(before), None) => Joined::Two(before, right), + (None, Some(after)) => Joined::Two(right, after), + (None, None) => Joined::One(right), + }, + } + } + + use RangeOrdering::*; + match RangeOrdering::cmp(to_insert.block_range(), current.block_range()) { + LeftFirstDisjoint => join_nonoverlapping(to_insert, current), + LeftFirstOverlap | RightContained => join_overlapping(to_insert, current, Insert::Left), + Equal => Joined::One(ScanRange::from_parts( + to_insert.block_range().clone(), + match dominance(&current.priority(), &to_insert.priority(), Insert::Right) { + Dominance::Left | Dominance::Equal => current.priority(), + Dominance::Right => to_insert.priority(), + }, + )), + RightFirstOverlap | LeftContained => join_overlapping(current, to_insert, Insert::Right), + RightFirstDisjoint => join_nonoverlapping(current, to_insert), + } +} + +#[derive(Debug, Clone)] +enum SpanningTree { + Leaf(ScanRange), + Parent { + span: Range<BlockHeight>, + left: Box<SpanningTree>, + right: Box<SpanningTree>, + }, +} + +impl SpanningTree { + fn span(&self) -> Range<BlockHeight> { + match self { + SpanningTree::Leaf(entry) => entry.block_range().clone(), + SpanningTree::Parent { span, .. } => span.clone(), + } + } + + fn from_joined(joined: Joined) -> Self { + match joined { + Joined::One(entry) => SpanningTree::Leaf(entry), + Joined::Two(left, right) => SpanningTree::Parent { + span: left.block_range().start..right.block_range().end, + left: Box::new(SpanningTree::Leaf(left)), + right: Box::new(SpanningTree::Leaf(right)), + }, + Joined::Three(left, mid, right) => SpanningTree::Parent { + span: left.block_range().start..right.block_range().end, + left: Box::new(SpanningTree::Leaf(left)), + right: Box::new(SpanningTree::Parent { + span: mid.block_range().start..right.block_range().end, + left: Box::new(SpanningTree::Leaf(mid)), + right: Box::new(SpanningTree::Leaf(right)), + }), + }, + } + } + + fn from_insert( + left: Box<Self>, + right: Box<Self>, + to_insert: ScanRange, + insert: Insert, + ) -> Self { + let (left, right) = match insert { + Insert::Left => (Box::new(left.insert(to_insert)), right), + Insert::Right => (left, Box::new(right.insert(to_insert))), + }; + SpanningTree::Parent { + span: left.span().start..right.span().end, + left, + right, + } + } + + fn from_split(left: Self, right: Self, to_insert: ScanRange, split_point: BlockHeight) -> Self { + let (l_insert, r_insert) = to_insert + .split_at(split_point) + .expect("Split point is within the range of to_insert"); + let left = Box::new(left.insert(l_insert)); + let right = Box::new(right.insert(r_insert)); + SpanningTree::Parent { + span: left.span().start..right.span().end, + left, + right, + } + } + + fn insert(self, to_insert: ScanRange) -> Self { + match self { + SpanningTree::Leaf(cur) => Self::from_joined(insert(cur, to_insert)), + SpanningTree::Parent { span, left, right } => { + // This algorithm always preserves the existing partition point, and does not do + // any rebalancing or unification of ranges within the tree. This should be okay + // because `into_vec` performs such unification, and the tree being unbalanced + // should be fine given the relatively small number of ranges we should ordinarily + // be concerned with.
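+ // + // For example (illustrative): if this parent partitions its span at height 6 and we + // insert a range covering 4..8, the range is split at 6; 4..6 is inserted into the + // left subtree and 6..8 into the right, leaving the partition point unchanged.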
+ use RangeOrdering::*; + match RangeOrdering::cmp(&span, to_insert.block_range()) { + LeftFirstDisjoint => { + // extend the right-hand branch + Self::from_insert(left, right, to_insert, Insert::Right) + } + LeftFirstOverlap => { + let split_point = left.span().end; + if split_point > to_insert.block_range().start { + Self::from_split(*left, *right, to_insert, split_point) + } else { + // to_insert is fully contained in or equals the right child + Self::from_insert(left, right, to_insert, Insert::Right) + } + } + RightContained => { + // to_insert is fully contained within the current span, so we will insert + // into one or both sides + let split_point = left.span().end; + if to_insert.block_range().start >= split_point { + // to_insert is fully contained in the right + Self::from_insert(left, right, to_insert, Insert::Right) + } else if to_insert.block_range().end <= split_point { + // to_insert is fully contained in the left + Self::from_insert(left, right, to_insert, Insert::Left) + } else { + // to_insert must be split. + Self::from_split(*left, *right, to_insert, split_point) + } + } + Equal => { + let split_point = left.span().end; + if split_point > to_insert.block_range().start { + Self::from_split(*left, *right, to_insert, split_point) + } else { + // to_insert is fully contained in the right subtree + right.insert(to_insert) + } + } + LeftContained => { + // the current span is fully contained within to_insert, so we will extend + // or overwrite both sides + let split_point = left.span().end; + Self::from_split(*left, *right, to_insert, split_point) + } + RightFirstOverlap => { + let split_point = left.span().end; + if split_point < to_insert.block_range().end { + Self::from_split(*left, *right, to_insert, split_point) + } else { + // to_insert is fully contained in or equals the left child + Self::from_insert(left, right, to_insert, Insert::Left) + } + } + RightFirstDisjoint => { + // extend the left-hand branch + Self::from_insert(left, right, to_insert, Insert::Left) + } + } + } + } + } + + fn into_vec(self) -> Vec<ScanRange> { + fn go(acc: &mut Vec<ScanRange>, tree: SpanningTree) { + match tree { + SpanningTree::Leaf(entry) => { + if let Some(top) = acc.pop() { + match join_nonoverlapping(top, entry) { + Joined::One(merged) => acc.push(merged), + Joined::Two(l, r) => { + acc.push(l); + acc.push(r); + } + _ => unreachable!(), + } + } else { + acc.push(entry); + } + } + SpanningTree::Parent { left, right, ..
} => { + go(acc, *left); + go(acc, *right); + } + } + } + + let mut acc = vec![]; + go(&mut acc, self); + acc + } +} + +pub(crate) fn insert_queue_entries<'a>( + conn: &rusqlite::Connection, + entries: impl Iterator<Item = &'a ScanRange>, +) -> Result<(), rusqlite::Error> { + let mut stmt = conn.prepare_cached( + "INSERT INTO scan_queue (block_range_start, block_range_end, priority) + VALUES (:block_range_start, :block_range_end, :priority)", + )?; + + for entry in entries { + trace!("Inserting queue entry {}", entry); + if !entry.is_empty() { + stmt.execute(named_params![ + ":block_range_start": u32::from(entry.block_range().start), + ":block_range_end": u32::from(entry.block_range().end), + ":priority": priority_code(&entry.priority()) + ])?; + } + } + + Ok(()) +} + +pub(crate) fn replace_queue_entries( + conn: &rusqlite::Transaction<'_>, + query_range: &Range<BlockHeight>, + entries: impl Iterator<Item = ScanRange>, +) -> Result<(), SqliteClientError> { + let (to_create, to_delete_ends) = { + let mut suggested_stmt = conn.prepare_cached( + "SELECT block_range_start, block_range_end, priority + FROM scan_queue + WHERE ( + -- the start is contained within the range + :start >= block_range_start + AND :start < block_range_end + ) + OR ( + -- the end is contained within the range + :end > block_range_start + AND :end <= block_range_end + ) + OR ( + -- start..end contains the entire range + block_range_start >= :start + AND block_range_end <= :end + ) + ORDER BY block_range_end", + )?; + + let mut rows = suggested_stmt.query(named_params![ + ":start": u32::from(query_range.start), + ":end": u32::from(query_range.end), + ])?; + + // Iterate over the ranges in the scan queue that overlap the range that we have + // identified as needing to be fully scanned. For each such range add it to the + // spanning tree (these should all be nonoverlapping ranges, but we might coalesce + // some in the process). + let mut to_create: Option<SpanningTree> = None; + let mut to_delete_ends: Vec<Value> = vec![]; + while let Some(row) = rows.next()? { + let entry = ScanRange::from_parts( + Range { + start: BlockHeight::from(row.get::<_, u32>(0)?), + end: BlockHeight::from(row.get::<_, u32>(1)?), + }, + { + let code = row.get::<_, i64>(2)?; + parse_priority_code(code).ok_or_else(|| { + SqliteClientError::CorruptedData(format!( + "scan priority not recognized: {}", + code + )) + })? + }, + ); + to_delete_ends.push(Value::from(u32::from(entry.block_range().end))); + to_create = if let Some(cur) = to_create { + Some(cur.insert(entry)) + } else { + Some(SpanningTree::Leaf(entry)) + }; + } + + // Update the tree that we read from the database, or, if we didn't find any ranges, + // start with the scanned range. + for entry in entries { + to_create = if let Some(cur) = to_create { + Some(cur.insert(entry)) + } else { + Some(SpanningTree::Leaf(entry)) + }; + } + + (to_create, to_delete_ends) + }; + + if let Some(tree) = to_create { + let ends_ptr = Rc::new(to_delete_ends); + conn.execute( + "DELETE FROM scan_queue WHERE block_range_end IN rarray(:ends)", + named_params![":ends": ends_ptr], + )?; + + let scan_ranges = tree.into_vec(); + insert_queue_entries(conn, scan_ranges.iter())?; + } + + Ok(()) +} + +pub(crate) fn scan_complete<P: consensus::Parameters>( + conn: &rusqlite::Transaction<'_>, + params: &P, + range: Range<BlockHeight>, + wallet_note_positions: &[Position], +) -> Result<(), SqliteClientError> { + // Determine the range of block heights for which we will be updating the scan queue.
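+ // + // For example (illustrative heights, mirroring the `scan_complete` test below): if the + // scanned range is 310..320 and a discovered note lies in a subtree whose previous shard + // ended at height 300, the queue update will cover 300..320, with 300..310 marked + // `FoundNote`.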
+ let extended_range = { + // If notes have been detected in the scan, we need to extend any adjacent un-scanned ranges to + // include the blocks needed to complete the note commitment tree subtrees containing the + // positions of the discovered notes. We will query by subtree index to find these bounds. + let required_subtrees = wallet_note_positions + .iter() + .map(|p| Address::above_position(SAPLING_SHARD_HEIGHT.into(), *p).index()) + .collect::<BTreeSet<_>>(); + + // we'll either have both min and max bounds, or we'll have neither + let subtree_bounds = required_subtrees + .iter() + .min() + .zip(required_subtrees.iter().max()); + + let mut sapling_shard_end_stmt = conn.prepare_cached( + "SELECT subtree_end_height + FROM sapling_tree_shards + WHERE shard_index = :shard_index", + )?; + + let mut sapling_shard_end = |index: u64| -> Result<Option<BlockHeight>, rusqlite::Error> { + Ok(sapling_shard_end_stmt + .query_row(named_params![":shard_index": index], |row| { + row.get::<_, Option<u32>>(0) + .map(|opt| opt.map(BlockHeight::from)) + }) + .optional()? + .flatten()) + }; + + // If no notes belonging to the wallet were found, we don't need to extend the scanning + // range suggestions to include the associated subtrees, and our bounds are just the + // scanned range. + subtree_bounds + .map(|(min_idx, max_idx)| { + let range_min = if *min_idx > 0 { + // get the block height of the end of the previous shard + sapling_shard_end(*min_idx - 1)? + } else { + // our lower bound is going to be the Sapling activation height + params.activation_height(NetworkUpgrade::Sapling) + }; + + // get the block height for the end of the current shard + let range_max = sapling_shard_end(*max_idx)?; + + Ok::<Range<BlockHeight>, rusqlite::Error>(Range { + start: range.start.min(range_min.unwrap_or(range.start)), + end: range.end.max(range_max.unwrap_or(range.end)), + }) + }) + .transpose() + .map_err(SqliteClientError::from) + }?; + + let query_range = extended_range.clone().unwrap_or_else(|| range.clone()); + + let scanned = ScanRange::from_parts(range.clone(), ScanPriority::Scanned); + let extensions = if let Some(extended) = extended_range { + vec![ + ScanRange::from_parts(extended.start..range.start, ScanPriority::FoundNote), + ScanRange::from_parts(range.end..extended.end, ScanPriority::FoundNote), + ] + } else { + vec![] + }; + + replace_queue_entries( + conn, + &query_range, + Some(scanned).into_iter().chain(extensions.into_iter()), + )?; + + Ok(()) +} + +pub(crate) fn update_chain_tip<P: consensus::Parameters>( + conn: &rusqlite::Transaction<'_>, + params: &P, + new_tip: BlockHeight, +) -> Result<(), SqliteClientError> { + // Read the previous tip height from the blocks table. + let prior_tip = block_height_extrema(conn)?.map(|(_, prior_tip)| prior_tip); + + // If the chain tip is below the prior tip height, then the caller has caught the + // chain in the middle of a reorg. Do nothing; the caller will continue using the old + // scan ranges and either: + // - encounter an error trying to fetch the blocks (and thus trigger the same handling + // logic as if this happened with the old linear scanning code); or + // - encounter a discontinuity error in `scan_cached_blocks`, at which point they will + // call `WalletDb::truncate_to_height` as part of their reorg handling which will + // resolve the problem. + // + // We don't check the shard height, as normal usage would have the caller update the + // shard state prior to this call, so it is possible and expected to be in a situation + // where we should update the tip-related scan ranges but not the shard-related ones.
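+ // + // For example (illustrative heights): if the wallet's prior tip is 1_000_000 and the + // caller reports a new tip of 999_995, the early return below leaves the scan queue + // untouched; fresh ranges will be established once the caller rewinds and re-scans.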
+ if let Some(h) = prior_tip { + if new_tip < h { + return Ok(()); + } + } + + // `ScanRange` uses an exclusive upper bound. + let chain_end = new_tip + 1; + + // Read the maximum height from the shards table. + let shard_start_height = conn.query_row( + "SELECT MAX(subtree_end_height) + FROM sapling_tree_shards", + [], + |row| Ok(row.get::<_, Option<u32>>(0)?.map(BlockHeight::from)), + )?; + + // Create a scanning range for the fragment of the last shard leading up to the new tip. + let shard_entry = shard_start_height + .filter(|h| h < &chain_end) + .map(|h| ScanRange::from_parts(h..chain_end, ScanPriority::ChainTip)); + + // Create scanning ranges to either validate potentially invalid blocks at the wallet's view + // of the chain tip, or connect the prior tip to the new tip. + let tip_entry = prior_tip.map(|prior_tip| { + // If we don't have shard metadata, this means we're doing linear scanning, so create a + // scan range from the prior tip to the current tip with `Historic` priority. + if shard_entry.is_none() { + ScanRange::from_parts(prior_tip..chain_end, ScanPriority::Historic) + } else { + // Determine the height to which we expect blocks retrieved from the block source to be stable + // and not subject to being reorg'ed. + let stable_height = new_tip.saturating_sub(PRUNING_DEPTH); + + // If the wallet's prior tip is at or above the stable height, prioritize the range between + // it and the new tip as `ChainTip`. Otherwise, prioritize the `VERIFY_LOOKAHEAD` + // blocks above the wallet's prior tip as `Verify`. Since `scan_cached_blocks` + // retrieves the metadata for the block being connected to, the connectivity to the + // prior tip will always be checked. Since `Verify` ranges have maximum priority, even + // if the block source begins downloading blocks from the shard scan range (which ends + // at the stable height) the scanner should not attempt to scan those blocks until the + // tip range has been completely checked and any required rewinds have been performed. + if prior_tip >= stable_height { + // This may overlap the `shard_entry` range and if so will be coalesced with it. + ScanRange::from_parts(prior_tip..chain_end, ScanPriority::ChainTip) + } else { + // The prior tip is in the range that we now expect to be stable, so we need to verify + // and advance it up to at most the stable height. The shard entry will then cover + // the range to the new tip at the lower `ChainTip` priority. + ScanRange::from_parts( + prior_tip..min(stable_height, prior_tip + VERIFY_LOOKAHEAD), + ScanPriority::Verify, + ) + } + } + }); + if let Some(entry) = &shard_entry { + debug!("{} will update latest shard", entry); + } + if let Some(entry) = &tip_entry { + debug!("{} will connect prior tip to new tip", entry); + } + + let query_range = match (shard_entry.as_ref(), tip_entry.as_ref()) { + (Some(se), Some(te)) => Some(Range { + start: min(se.block_range().start, te.block_range().start), + end: max(se.block_range().end, te.block_range().end), + }), + (Some(se), None) => Some(se.block_range().clone()), + (None, Some(te)) => Some(te.block_range().clone()), + (None, None) => None, + }; + + if let Some(query_range) = query_range { + replace_queue_entries( + conn, + &query_range, + shard_entry.into_iter().chain(tip_entry.into_iter()), + )?; + } else { + // If we have neither shard data nor any existing block data in the database, we should also + // have no existing scan queue entries and can fall back to linear scanning from Sapling + // activation.
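+ // + // For example: a freshly-initialized wallet database, with empty `blocks` and + // `sapling_tree_shards` tables, will take this branch and end up with a single + // `Historic` entry covering sapling_activation..chain_end.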
+ if let Some(sapling_activation) = params.activation_height(NetworkUpgrade::Sapling) { + // If the caller provided a chain tip that is before Sapling activation, do + // nothing. + if sapling_activation < chain_end { + let scan_range = + ScanRange::from_parts(sapling_activation..chain_end, ScanPriority::Historic); + insert_queue_entries(conn, Some(scan_range).iter())?; + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use incrementalmerkletree::{Hashable, Level}; + use rusqlite::Connection; + use secrecy::Secret; + use tempfile::NamedTempFile; + use zcash_client_backend::data_api::{ + chain::{scan_cached_blocks, CommitmentTreeRoot}, + scanning::{ScanPriority, ScanRange}, + WalletCommitmentTrees, WalletRead, WalletWrite, + }; + use zcash_primitives::{ + block::BlockHash, consensus::BlockHeight, sapling::Node, transaction::components::Amount, + }; + + use crate::{ + chain::init::init_cache_database, + tests::{ + self, fake_compact_block, init_test_accounts_table, insert_into_cache, + sapling_activation_height, AddressType, + }, + wallet::{init::init_wallet_db, scanning::suggest_scan_ranges}, + BlockDb, WalletDb, + }; + + use super::{join_nonoverlapping, Joined, RangeOrdering, SpanningTree}; + + #[test] + fn test_join_nonoverlapping() { + fn test_range(left: ScanRange, right: ScanRange, expected_joined: Joined) { + let joined = join_nonoverlapping(left, right); + + assert_eq!(joined, expected_joined); + } + + macro_rules! range { + ( $start:expr, $end:expr; $priority:ident ) => { + ScanRange::from_parts( + BlockHeight::from($start)..BlockHeight::from($end), + ScanPriority::$priority, + ) + }; + } + + macro_rules! joined { + ( + ($a_start:expr, $a_end:expr; $a_priority:ident) + ) => { + Joined::One( + range!($a_start, $a_end; $a_priority) + ) + }; + ( + ($a_start:expr, $a_end:expr; $a_priority:ident), + ($b_start:expr, $b_end:expr; $b_priority:ident) + ) => { + Joined::Two( + range!($a_start, $a_end; $a_priority), + range!($b_start, $b_end; $b_priority) + ) + }; + ( + ($a_start:expr, $a_end:expr; $a_priority:ident), + ($b_start:expr, $b_end:expr; $b_priority:ident), + ($c_start:expr, $c_end:expr; $c_priority:ident) + + ) => { + Joined::Three( + range!($a_start, $a_end; $a_priority), + range!($b_start, $b_end; $b_priority), + range!($c_start, $c_end; $c_priority) + ) + }; + } + + // Scan ranges have the same priority and + // line up. + test_range( + range!(1, 9; OpenAdjacent), + range!(9, 15; OpenAdjacent), + joined!( + (1, 15; OpenAdjacent) + ), + ); + + // Scan ranges have different priorities, + // so we cannot merge them even though they + // line up. + test_range( + range!(1, 9; OpenAdjacent), + range!(9, 15; ChainTip), + joined!( + (1, 9; OpenAdjacent), + (9, 15; ChainTip) + ), + ); + + // Scan ranges have the same priority but + // do not line up. 
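+ // The gap between them (9..13) is filled in with a `Historic` entry, so the joined + // result still covers one contiguous span.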
+ test_range( + range!(1, 9; OpenAdjacent), + range!(13, 15; OpenAdjacent), + joined!( + (1, 9; OpenAdjacent), + (9, 13; Historic), + (13, 15; OpenAdjacent) + ), + ); + + test_range( + range!(1, 9; Historic), + range!(13, 15; OpenAdjacent), + joined!( + (1, 13; Historic), + (13, 15; OpenAdjacent) + ), + ); + + test_range( + range!(1, 9; OpenAdjacent), + range!(13, 15; Historic), + joined!( + (1, 9; OpenAdjacent), + (9, 15; Historic) + ), + ); + } + + #[test] + fn range_ordering() { + use super::RangeOrdering::*; + // Equal + assert_eq!(RangeOrdering::cmp(&(0..1), &(0..1)), Equal); + + // Disjoint or contiguous + assert_eq!(RangeOrdering::cmp(&(0..1), &(1..2)), LeftFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(1..2), &(0..1)), RightFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(0..1), &(2..3)), LeftFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(2..3), &(0..1)), RightFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(1..2), &(2..2)), LeftFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(2..2), &(1..2)), RightFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(1..1), &(1..2)), LeftFirstDisjoint); + assert_eq!(RangeOrdering::cmp(&(1..2), &(1..1)), RightFirstDisjoint); + + // Contained + assert_eq!(RangeOrdering::cmp(&(1..2), &(0..3)), LeftContained); + assert_eq!(RangeOrdering::cmp(&(0..3), &(1..2)), RightContained); + assert_eq!(RangeOrdering::cmp(&(0..1), &(0..3)), LeftContained); + assert_eq!(RangeOrdering::cmp(&(0..3), &(0..1)), RightContained); + assert_eq!(RangeOrdering::cmp(&(2..3), &(0..3)), LeftContained); + assert_eq!(RangeOrdering::cmp(&(0..3), &(2..3)), RightContained); + + // Overlap + assert_eq!(RangeOrdering::cmp(&(0..2), &(1..3)), LeftFirstOverlap); + assert_eq!(RangeOrdering::cmp(&(1..3), &(0..2)), RightFirstOverlap); + } + + fn scan_range(range: Range<u32>, priority: ScanPriority) -> ScanRange { + ScanRange::from_parts( + BlockHeight::from(range.start)..BlockHeight::from(range.end), + priority, + ) + } + + fn spanning_tree(to_insert: &[(Range<u32>, ScanPriority)]) -> Option<SpanningTree> { + to_insert.iter().fold(None, |acc, (range, priority)| { + let scan_range = scan_range(range.clone(), *priority); + match acc { + None => Some(SpanningTree::Leaf(scan_range)), + Some(t) => Some(t.insert(scan_range)), + } + }) + } + + #[test] + fn spanning_tree_insert_adjacent() { + use ScanPriority::*; + + let t = spanning_tree(&[ + (0..3, Historic), + (3..6, Scanned), + (6..8, ChainTip), + (8..10, ChainTip), + ]) + .unwrap(); + + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..3, Historic), + scan_range(3..6, Scanned), + scan_range(6..10, ChainTip), + ] + ); + } + + #[test] + fn spanning_tree_insert_overlaps() { + use ScanPriority::*; + + let t = spanning_tree(&[ + (0..3, Historic), + (2..5, Scanned), + (6..8, ChainTip), + (7..10, Scanned), + ]) + .unwrap(); + + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..2, Historic), + scan_range(2..5, Scanned), + scan_range(5..6, Historic), + scan_range(6..7, ChainTip), + scan_range(7..10, Scanned), + ] + ); + } + + #[test] + fn spanning_tree_insert_rfd_span() { + use ScanPriority::*; + + // This sequence of insertions causes a RightFirstDisjoint on the last insertion, + // which originally had a bug that caused the parent's span to only cover its left + // child. The bug was otherwise unobservable, as the insertion logic was able to + // heal this specific kind of corruption.
+ let t = spanning_tree(&[ + // 6..8 + (6..8, Scanned), + // 6..12 + // 6..8 8..12 + // 8..10 10..12 + (10..12, ChainTip), + // 3..12 + // 3..8 8..12 + // 3..6 6..8 8..10 10..12 + (3..6, Historic), + ]) + .unwrap(); + + assert_eq!(t.span(), (3.into())..(12.into())); + assert_eq!( + t.into_vec(), + vec![ + scan_range(3..6, Historic), + scan_range(6..8, Scanned), + scan_range(8..10, Historic), + scan_range(10..12, ChainTip), + ] + ); + } + + #[test] + fn spanning_tree_dominance() { + use ScanPriority::*; + + let t = spanning_tree(&[(0..3, Verify), (2..8, Scanned), (6..10, Verify)]).unwrap(); + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..2, Verify), + scan_range(2..6, Scanned), + scan_range(6..10, Verify), + ] + ); + + let t = spanning_tree(&[(0..3, Verify), (2..8, Historic), (6..10, Verify)]).unwrap(); + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..3, Verify), + scan_range(3..6, Historic), + scan_range(6..10, Verify), + ] + ); + + let t = spanning_tree(&[(0..3, Scanned), (2..8, Verify), (6..10, Scanned)]).unwrap(); + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..2, Scanned), + scan_range(2..6, Verify), + scan_range(6..10, Scanned), + ] + ); + + let t = spanning_tree(&[(0..3, Scanned), (2..8, Historic), (6..10, Scanned)]).unwrap(); + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..3, Scanned), + scan_range(3..6, Historic), + scan_range(6..10, Scanned), + ] + ); + + // a `ChainTip` insertion should not overwrite a scanned range. + let mut t = spanning_tree(&[(0..3, ChainTip), (3..5, Scanned), (5..7, ChainTip)]).unwrap(); + t = t.insert(scan_range(0..7, ChainTip)); + assert_eq!( + t.into_vec(), + vec![ + scan_range(0..3, ChainTip), + scan_range(3..5, Scanned), + scan_range(5..7, ChainTip), + ] + ); + + let mut t = + spanning_tree(&[(280300..280310, FoundNote), (280310..280320, Scanned)]).unwrap(); + assert_eq!( + t.clone().into_vec(), + vec![ + scan_range(280300..280310, FoundNote), + scan_range(280310..280320, Scanned) + ] + ); + t = t.insert(scan_range(280300..280340, ChainTip)); + assert_eq!( + t.into_vec(), + vec![ + scan_range(280300..280310, ChainTip), + scan_range(280310..280320, Scanned), + scan_range(280320..280340, ChainTip) + ] + ); + } + + #[test] + fn spanning_tree_insert_coalesce_scanned() { + use ScanPriority::*; + + let mut t = spanning_tree(&[ + (0..3, Historic), + (2..5, Scanned), + (6..8, ChainTip), + (7..10, Scanned), + ]) + .unwrap(); + + t = t.insert(scan_range(0..3, Scanned)); + t = t.insert(scan_range(5..8, Scanned)); + + assert_eq!(t.into_vec(), vec![scan_range(0..10, Scanned)]); + } + + #[test] + fn scan_complete() { + use ScanPriority::*; + + let cache_file = NamedTempFile::new().unwrap(); + let db_cache = BlockDb(Connection::open(cache_file.path()).unwrap()); + init_cache_database(&db_cache).unwrap(); + + let data_file = NamedTempFile::new().unwrap(); + let mut db_data = WalletDb::for_path(data_file.path(), tests::network()).unwrap(); + init_wallet_db(&mut db_data, Some(Secret::new(vec![]))).unwrap(); + + // Add an account to the wallet. + let (dfvk, _taddr) = init_test_accounts_table(&mut db_data); + + assert_matches!( + // In the following, we don't care what the root hashes are, they just need to be + // distinct. 
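+ // (`Node::empty_root(Level::from(n))` yields a different hash for each level `n`, + // which is sufficient here.)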
+ db_data.put_sapling_subtree_roots( + 0, + &[ + CommitmentTreeRoot::from_parts( + sapling_activation_height() + 100, + Node::empty_root(Level::from(0)) + ), + CommitmentTreeRoot::from_parts( + sapling_activation_height() + 200, + Node::empty_root(Level::from(1)) + ), + CommitmentTreeRoot::from_parts( + sapling_activation_height() + 300, + Node::empty_root(Level::from(2)) + ), + ] + ), + Ok(()) + ); + + // We'll start inserting leaf notes 5 notes after the end of the third subtree, with a gap + // of 10 blocks. After `scan_cached_blocks`, the scan queue should have a requested scan + // range of 300..310 with `FoundNote` priority, and 310..320 with `Scanned` priority. + let initial_sapling_tree_size = (0x1 << 16) * 3 + 5; + let initial_height = sapling_activation_height() + 310; + + let value = Amount::from_u64(50000).unwrap(); + let (mut cb, _) = fake_compact_block( + initial_height, + BlockHash([0; 32]), + &dfvk, + AddressType::DefaultExternal, + value, + initial_sapling_tree_size, + ); + insert_into_cache(&db_cache, &cb); + + for i in 1..=10 { + cb = fake_compact_block( + initial_height + i, + cb.hash(), + &dfvk, + AddressType::DefaultExternal, + Amount::from_u64(10000).unwrap(), + initial_sapling_tree_size + i, + ) + .0; + insert_into_cache(&db_cache, &cb); + } + + assert_matches!( + scan_cached_blocks( + &tests::network(), + &db_cache, + &mut db_data, + initial_height, + 10, + ), + Ok(()) + ); + + // Verify that the adjacent range needed to make the note spendable has been prioritized. + let sap_active = u32::from(sapling_activation_height()); + assert_matches!( + db_data.suggest_scan_ranges(), + Ok(scan_ranges) if scan_ranges == vec![ + scan_range((sap_active + 300)..(sap_active + 310), FoundNote) + ] + ); + + // Check that the scanned range has been properly persisted. + assert_matches!( + suggest_scan_ranges(&db_data.conn, Scanned), + Ok(scan_ranges) if scan_ranges == vec![ + scan_range((sap_active + 300)..(sap_active + 310), FoundNote), + scan_range((sap_active + 310)..(sap_active + 320), Scanned) + ] + ); + + // Simulate the wallet going offline for a bit; update the chain tip to 20 blocks in the + // future. + assert_matches!( + db_data.update_chain_tip(sapling_activation_height() + 340), + Ok(()) + ); + + // Check the scan range again; we should see a `ChainTip` range for the period we've been + // offline. + assert_matches!( + db_data.suggest_scan_ranges(), + Ok(scan_ranges) if scan_ranges == vec![ + scan_range((sap_active + 320)..(sap_active + 341), ChainTip), + scan_range((sap_active + 300)..(sap_active + 310), ChainTip) + ] + ); + + // Now simulate a jump ahead of more than 100 blocks. + assert_matches!( + db_data.update_chain_tip(sapling_activation_height() + 450), + Ok(()) + ); + + // Check the scan range again; we should see a `Verify` range for the previous wallet + // tip, and then a `ChainTip` range for the remainder.
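+ // (The `Verify` range covers at most `VERIFY_LOOKAHEAD` blocks starting at the wallet's + // prior tip; the remainder of the gap up to the new tip is left at the lower `ChainTip` + // priority.)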
+ assert_matches!( + db_data.suggest_scan_ranges(), + Ok(scan_ranges) if scan_ranges == vec![ + scan_range((sap_active + 319)..(sap_active + 329), Verify), + scan_range((sap_active + 329)..(sap_active + 451), ChainTip), + scan_range((sap_active + 300)..(sap_active + 310), ChainTip) + ] + ); + } +} diff --git a/zcash_primitives/CHANGELOG.md b/zcash_primitives/CHANGELOG.md index f2404ea94..44f903ca0 100644 --- a/zcash_primitives/CHANGELOG.md +++ b/zcash_primitives/CHANGELOG.md @@ -8,6 +8,7 @@ and this library adheres to Rust's notion of ## [Unreleased] ### Added +- `zcash_primitives::consensus::BlockHeight::saturating_sub` - `zcash_primitives::transaction::builder`: - `Builder::add_orchard_spend` - `Builder::add_orchard_output` diff --git a/zcash_primitives/src/consensus.rs b/zcash_primitives/src/consensus.rs index 563c69806..8c421c76a 100644 --- a/zcash_primitives/src/consensus.rs +++ b/zcash_primitives/src/consensus.rs @@ -23,6 +23,12 @@ impl BlockHeight { pub const fn from_u32(v: u32) -> BlockHeight { BlockHeight(v) } + + /// Subtracts the provided value from this height, returning `H0` if this would result in + /// underflow of the wrapped `u32`. + pub fn saturating_sub(self, v: u32) -> BlockHeight { + BlockHeight(self.0.saturating_sub(v)) + } } impl fmt::Display for BlockHeight { diff --git a/zcash_primitives/src/sapling/tree.rs b/zcash_primitives/src/sapling/tree.rs index 5bacb51e2..bd403d6c2 100644 --- a/zcash_primitives/src/sapling/tree.rs +++ b/zcash_primitives/src/sapling/tree.rs @@ -2,6 +2,8 @@ use bitvec::{order::Lsb0, view::AsBits}; use group::{ff::PrimeField, Curve}; use incrementalmerkletree::{Hashable, Level}; use lazy_static::lazy_static; + +use std::fmt; use std::io::{self, Read, Write}; use super::{ @@ -64,11 +66,19 @@ pub fn merkle_hash(depth: usize, lhs: &[u8; 32], rhs: &[u8; 32]) -> [u8; 32] { } /// A node within the Sapling commitment tree. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq)] pub struct Node { pub(super) repr: [u8; 32], } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Node") + .field("repr", &hex::encode(self.repr)) + .finish() + } +} + impl Node { #[cfg(test)] pub(crate) fn new(repr: [u8; 32]) -> Self {