diff --git a/zcash_client_sqlite/CHANGELOG.md b/zcash_client_sqlite/CHANGELOG.md
index 751eb92eb..b52c31270 100644
--- a/zcash_client_sqlite/CHANGELOG.md
+++ b/zcash_client_sqlite/CHANGELOG.md
@@ -25,13 +25,21 @@ and this library adheres to Rust's notion of
 - An `unstable` feature flag; this is added to parts of the API that may change
   in any release. It enables `zcash_client_backend`'s `unstable` feature flag.
 - New summary views that may be directly accessed in the sqlite database.
-  The structure of these views should be considered unstable; they may
+  The structure of these views should be considered unstable; they may be
   replaced by accessors provided by the data access API at some point in the
   future:
   - `v_transactions`
   - `v_tx_received`
   - `v_tx_sent`
 - `zcash_client_sqlite::wallet::init::WalletMigrationError`
+- A filesystem-backed `BlockSource` implementation
+  `zcash_client_sqlite::FsBlockDb`. This block source expects blocks to be
+  stored on disk in individual files named following the pattern
+  `<fsblockdb_root>/blocks/<block_height>-<block_hash>-compactblock`. A SQLite
+  database stored at `<fsblockdb_root>/blockmeta.sqlite` stores metadata for
+  this block source.
+  - `zcash_client_sqlite::chain::init::init_blockmeta_db` creates the required
+    metadata cache database.
 
 ### Changed
 - Various **BREAKING CHANGES** have been made to the database tables. These will
@@ -85,6 +93,8 @@ and this library adheres to Rust's notion of
 - `get_extended_full_viewing_keys` (use
   `zcash_client_backend::data_api::WalletRead::get_unified_full_viewing_keys`
   instead).
+- `zcash_client_sqlite::with_blocks` (use
+  `zcash_client_backend::data_api::BlockSource::with_blocks` instead)
 
 ### Fixed
 - The `zcash_client_backend::data_api::WalletRead::get_address` implementation
diff --git a/zcash_client_sqlite/src/chain.rs b/zcash_client_sqlite/src/chain.rs
index 4fceac181..e97a7e7dd 100644
--- a/zcash_client_sqlite/src/chain.rs
+++ b/zcash_client_sqlite/src/chain.rs
@@ -9,7 +9,17 @@ use zcash_client_backend::{data_api::error::Error, proto::compact_formats::Compa
 
 use crate::{error::SqliteClientError, BlockDb};
 
+#[cfg(feature = "unstable")]
+use {
+    crate::{BlockHash, FsBlockDb},
+    rusqlite::{Connection, OptionalExtension, NO_PARAMS},
+    std::fs::File,
+    std::io::BufReader,
+    std::path::{Path, PathBuf},
+};
+
 pub mod init;
+pub mod migrations;
 
 struct CompactBlockRow {
     height: BlockHeight,
@@ -18,13 +28,12 @@ struct CompactBlockRow {
 
 /// Implements a traversal of `limit` blocks of the block cache database.
 ///
-/// Starting at `from_height`, the `with_row` callback is invoked
-/// with each block retrieved from the backing store. If the `limit`
-/// value provided is `None`, all blocks are traversed up to the
-/// maximum height.
-pub fn with_blocks<F>(
+/// Starting at the next block above `last_scanned_height`, the `with_row` callback is invoked with
+/// each block retrieved from the backing store. If the `limit` value provided is `None`, all
+/// blocks are traversed up to the maximum height.
+pub(crate) fn blockdb_with_blocks<F>(
     cache: &BlockDb,
-    from_height: BlockHeight,
+    last_scanned_height: BlockHeight,
     limit: Option<u32>,
     mut with_row: F,
 ) -> Result<(), SqliteClientError>
@@ -37,7 +46,10 @@ where
     )?;
 
     let rows = stmt_blocks.query_map(
-        params![u32::from(from_height), limit.unwrap_or(u32::max_value()),],
+        params![
+            u32::from(last_scanned_height),
+            limit.unwrap_or(u32::max_value()),
+        ],
         |row| {
             Ok(CompactBlockRow {
                 height: BlockHeight::from_u32(row.get(0)?),
@@ -64,6 +76,148 @@ where
     Ok(())
 }
 
+/// Data structure representing a row in the block metadata database.
+#[cfg(feature = "unstable")] +pub struct BlockMeta { + pub height: BlockHeight, + pub block_hash: BlockHash, + pub block_time: u32, + pub sapling_outputs_count: u32, + pub orchard_actions_count: u32, +} + +#[cfg(feature = "unstable")] +impl BlockMeta { + pub fn block_file_path>(&self, blocks_dir: &P) -> PathBuf { + blocks_dir.as_ref().join(Path::new(&format!( + "{}-{}-compactblock", + self.height, self.block_hash + ))) + } +} + +/// Inserts a batch of rows into the block metadata database. +#[cfg(feature = "unstable")] +pub(crate) fn blockmetadb_insert( + conn: &Connection, + block_meta: &[BlockMeta], +) -> Result<(), rusqlite::Error> { + let mut stmt_insert = conn.prepare( + "INSERT INTO compactblocks_meta (height, blockhash, time, sapling_outputs_count, orchard_actions_count) + VALUES (?, ?, ?, ?, ?)" + )?; + + conn.execute("BEGIN IMMEDIATE", NO_PARAMS)?; + let result = block_meta + .iter() + .map(|m| { + stmt_insert.execute(params![ + u32::from(m.height), + &m.block_hash.0[..], + m.block_time, + m.sapling_outputs_count, + m.orchard_actions_count, + ]) + }) + .collect::, _>>(); + match result { + Ok(_) => { + conn.execute("COMMIT", NO_PARAMS)?; + Ok(()) + } + Err(error) => { + match conn.execute("ROLLBACK", NO_PARAMS) { + Ok(_) => Err(error), + Err(e) => + // Panicking here is probably the right thing to do, because it + // means the database is corrupt. + panic!( + "Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.", + e, + error + ) + } + } + } +} + +#[cfg(feature = "unstable")] +pub(crate) fn blockmetadb_get_max_cached_height( + conn: &Connection, +) -> Result, rusqlite::Error> { + conn.query_row( + "SELECT MAX(height) FROM compactblocks_meta", + NO_PARAMS, + |row| { + let h: u32 = row.get(0)?; + Ok(BlockHeight::from(h)) + }, + ) + .optional() +} + +/// Implements a traversal of `limit` blocks of the filesystem-backed +/// block cache. +/// +/// Starting at the next block height above `last_scanned_height`, the `with_row` callback is +/// invoked with each block retrieved from the backing store. If the `limit` value provided is +/// `None`, all blocks are traversed up to the maximum height for which metadata is available. +#[cfg(feature = "unstable")] +pub(crate) fn fsblockdb_with_blocks( + cache: &FsBlockDb, + last_scanned_height: BlockHeight, + limit: Option, + mut with_block: F, +) -> Result<(), SqliteClientError> +where + F: FnMut(CompactBlock) -> Result<(), SqliteClientError>, +{ + // Fetch the CompactBlocks we need to scan + let mut stmt_blocks = cache.conn.prepare( + "SELECT height, blockhash, time, sapling_outputs_count, orchard_actions_count + FROM compactblocks_meta + WHERE height > ? 
+        ORDER BY height ASC LIMIT ?",
+    )?;
+
+    let rows = stmt_blocks.query_map(
+        params![
+            u32::from(last_scanned_height),
+            limit.unwrap_or(u32::max_value()),
+        ],
+        |row| {
+            Ok(BlockMeta {
+                height: BlockHeight::from_u32(row.get(0)?),
+                block_hash: BlockHash::from_slice(&row.get::<_, Vec<_>>(1)?),
+                block_time: row.get(2)?,
+                sapling_outputs_count: row.get(3)?,
+                orchard_actions_count: row.get(4)?,
+            })
+        },
+    )?;
+
+    for row_result in rows {
+        let cbr = row_result?;
+        let block_file = File::open(cbr.block_file_path(&cache.blocks_dir))?;
+        let mut buf_reader = BufReader::new(block_file);
+
+        let block: CompactBlock =
+            Message::parse_from_reader(&mut buf_reader).map_err(Error::from)?;
+
+        if block.height() != cbr.height {
+            return Err(SqliteClientError::CorruptedData(format!(
+                "Block height {} did not match row's height field value {}",
+                block.height(),
+                cbr.height
+            )));
+        }
+
+        with_block(block)?;
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 #[allow(deprecated)]
 mod tests {
diff --git a/zcash_client_sqlite/src/chain/init.rs b/zcash_client_sqlite/src/chain/init.rs
index ed60b452e..b5f88e29c 100644
--- a/zcash_client_sqlite/src/chain/init.rs
+++ b/zcash_client_sqlite/src/chain/init.rs
@@ -1,9 +1,16 @@
 //! Functions for initializing the various databases.
-
 use rusqlite::NO_PARAMS;
 
 use crate::BlockDb;
 
+#[cfg(feature = "unstable")]
+use {
+    super::migrations,
+    crate::FsBlockDb,
+    schemer::{Migrator, MigratorError},
+    schemer_rusqlite::RusqliteAdapter,
+};
+
 /// Sets up the internal structure of the cache database.
 ///
 /// # Examples
@@ -29,3 +36,32 @@ pub fn init_cache_database(db_cache: &BlockDb) -> Result<(), rusqlite::Error> {
     )?;
     Ok(())
 }
+
+/// Sets up the internal structure of the metadata cache database.
+///
+/// # Examples
+///
+/// ```
+/// use tempfile::{tempdir, NamedTempFile};
+/// use zcash_client_sqlite::{
+///     FsBlockDb,
+///     chain::init::init_blockmeta_db,
+/// };
+///
+/// let cache_file = NamedTempFile::new().unwrap();
+/// let blocks_dir = tempdir().unwrap();
+/// let mut db = FsBlockDb::for_path(blocks_dir.path()).unwrap();
+/// init_blockmeta_db(&mut db).unwrap();
+/// ```
+#[cfg(feature = "unstable")]
+pub fn init_blockmeta_db(db: &mut FsBlockDb) -> Result<(), MigratorError<rusqlite::Error>> {
+    let adapter = RusqliteAdapter::new(&mut db.conn, Some("schemer_migrations".to_string()));
+    adapter.init().expect("Migrations table setup succeeds.");
+
+    let mut migrator = Migrator::new(adapter);
+    migrator
+        .register_multiple(migrations::blockmeta::all_migrations())
+        .expect("Migration registration should have been successful.");
+    migrator.up(None)?;
+    Ok(())
+}
diff --git a/zcash_client_sqlite/src/chain/migrations.rs b/zcash_client_sqlite/src/chain/migrations.rs
new file mode 100644
index 000000000..0bc8f92a3
--- /dev/null
+++ b/zcash_client_sqlite/src/chain/migrations.rs
@@ -0,0 +1 @@
+pub mod blockmeta;
diff --git a/zcash_client_sqlite/src/chain/migrations/blockmeta.rs b/zcash_client_sqlite/src/chain/migrations/blockmeta.rs
new file mode 100644
index 000000000..854ccd6fc
--- /dev/null
+++ b/zcash_client_sqlite/src/chain/migrations/blockmeta.rs
@@ -0,0 +1,53 @@
+use schemer_rusqlite::RusqliteMigration;
+
+pub fn all_migrations() -> Vec<Box<dyn RusqliteMigration<Error = rusqlite::Error>>> {
+    vec![Box::new(init::Migration {})]
+}
+
+pub mod init {
+    use rusqlite::{self};
+    use schemer::{self, migration};
+    use schemer_rusqlite::RusqliteMigration;
+    use uuid::Uuid;
+
+    pub struct Migration;
+
+    /// The migration that added the `compactblocks_meta` table.
+    ///
+    /// 68525b40-36e5-46aa-a765-720f8389b99d
+    pub const MIGRATION_ID: Uuid = Uuid::from_fields(
+        0x68525b40,
+        0x36e5,
+        0x46aa,
+        b"\xa7\x65\x72\x0f\x83\x89\xb9\x9d",
+    );
+
+    migration!(
+        Migration,
+        &format!("{}", MIGRATION_ID),
+        [],
+        "Initialize the cachemeta database."
+    );
+
+    impl RusqliteMigration for Migration {
+        type Error = rusqlite::Error;
+
+        fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
+            transaction.execute_batch(
+                "CREATE TABLE compactblocks_meta (
+                    height INTEGER PRIMARY KEY,
+                    blockhash BLOB NOT NULL,
+                    time INTEGER NOT NULL,
+                    sapling_outputs_count INTEGER NOT NULL,
+                    orchard_actions_count INTEGER NOT NULL
+                )",
+            )?;
+            Ok(())
+        }
+
+        fn down(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
+            transaction.execute_batch("DROP TABLE compactblocks_meta;")?;
+            Ok(())
+        }
+    }
+}
diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs
index 8ee156e74..054e98456 100644
--- a/zcash_client_sqlite/src/lib.rs
+++ b/zcash_client_sqlite/src/lib.rs
@@ -73,6 +73,13 @@ use {
     zcash_primitives::legacy::TransparentAddress,
 };
 
+#[cfg(feature = "unstable")]
+use {
+    crate::chain::{fsblockdb_with_blocks, BlockMeta},
+    std::path::PathBuf,
+    std::{fs, io},
+};
+
 mod prepared;
 pub use prepared::DataConnStmtCache;
 
@@ -717,7 +724,7 @@ impl<'a, P: consensus::Parameters> WalletWriteTransparent for DataConnStmtCache<
     }
 }
 
-/// A wrapper for the SQLite connection to the block cache database.
+/// A handle for the SQLite block source.
 pub struct BlockDb(Connection);
 
 impl BlockDb {
@@ -739,7 +746,141 @@ impl BlockSource for BlockDb {
     where
         F: FnMut(CompactBlock) -> Result<(), Self::Error>,
     {
-        chain::with_blocks(self, from_height, limit, with_row)
+        chain::blockdb_with_blocks(self, from_height, limit, with_row)
     }
 }
 
+/// A block source that reads block data from disk and block metadata from a SQLite database.
+///
+/// This block source expects each compact block to be stored on disk in the `blocks` subdirectory
+/// of the `blockstore_root` path provided at the time of construction. Each block should be
+/// written, as the serialized bytes of its protobuf representation, where the path for each block
+/// has the pattern:
+///
+/// `<blockstore_root>/blocks/<block_height>-<block_hash>-compactblock`
+///
+/// where `<block_height>` is the decimal value of the height at which the block was mined, and
+/// `<block_hash>` is the hexadecimal representation of the block hash, as produced by the
+/// [`Display`] implementation for [`zcash_primitives::block::BlockHash`].
+///
+/// This block source is intended to be used with the following data flow:
+/// * When the cache is being filled:
+///   * The caller requests the current maximum height at which cached data is available
+///     using [`FsBlockDb::get_max_cached_height`]. If no cached data is available, the caller
+///     can use the wallet's synced-to height for the following operations instead.
+///   * (recommended for privacy) the caller should round the returned height down to some 100- /
+///     1000-block boundary.
+///   * The caller uses the lightwalletd's `getblock` gRPC method to obtain a stream of blocks.
+///     For each block returned, the caller writes the compact block to `blocks_dir` using the
+///     path format specified above. It is fine to overwrite an existing block, since block hashes
+///     are immutable and collision-resistant.
+///   * Once a caller-determined number of blocks have been successfully written to disk, the
+///     caller should invoke [`FsBlockDb::write_block_metadata`] with the metadata for each block
+///     written to disk.
+/// * The cache can then be scanned using the [`BlockSource`] implementation, providing the
+///   wallet's synced-to-height as a starting point.
+/// * When part of the cache is no longer needed:
+///   * The caller determines some height `H` that is the earliest block data it needs to preserve.
+///     This might be determined based on where the wallet is fully-synced to, or other heuristics.
+///   * The caller searches the defined filesystem folder for all files beginning in `HEIGHT-*`
+///     where `HEIGHT < H`, and deletes those files.
+///
+/// Note: This API is unstable, and may change in the future. In particular, the [`BlockSource`]
+/// API and the above description currently assume that scanning is performed in linear block
+/// order; this assumption is likely to be weakened and/or removed in a future update.
+#[cfg(feature = "unstable")]
+pub struct FsBlockDb {
+    conn: Connection,
+    blocks_dir: PathBuf,
+}
+
+/// Errors that can be generated by the filesystem/sqlite-backed
+/// block source.
+#[derive(Debug)]
+#[cfg(feature = "unstable")]
+pub enum FsBlockDbError {
+    FsError(io::Error),
+    DbError(rusqlite::Error),
+    InvalidBlockstoreRoot(PathBuf),
+    InvalidBlockPath(PathBuf),
+    CorruptedData(String),
+}
+
+#[cfg(feature = "unstable")]
+impl From<io::Error> for FsBlockDbError {
+    fn from(err: io::Error) -> Self {
+        FsBlockDbError::FsError(err)
+    }
+}
+
+#[cfg(feature = "unstable")]
+impl From<rusqlite::Error> for FsBlockDbError {
+    fn from(err: rusqlite::Error) -> Self {
+        FsBlockDbError::DbError(err)
+    }
+}
+
+#[cfg(feature = "unstable")]
+impl FsBlockDb {
+    /// Creates a filesystem-backed block store at the given path.
+    ///
+    /// This will construct or open a SQLite database at the path
+    /// `<fsblockdb_root>/blockmeta.sqlite` and will ensure that a directory exists at
+    /// `<fsblockdb_root>/blocks` where this block store will expect to find serialized block
+    /// files as described for [`FsBlockDb`].
+    pub fn for_path<P: AsRef<Path>>(fsblockdb_root: P) -> Result<Self, FsBlockDbError> {
+        let meta = fs::metadata(&fsblockdb_root).map_err(FsBlockDbError::FsError)?;
+        if meta.is_dir() {
+            let db_path = fsblockdb_root.as_ref().join("blockmeta.sqlite");
+            let blocks_dir = fsblockdb_root.as_ref().join("blocks");
+            fs::create_dir_all(&blocks_dir)?;
+            Ok(FsBlockDb {
+                conn: Connection::open(db_path).map_err(FsBlockDbError::DbError)?,
+                blocks_dir,
+            })
+        } else {
+            Err(FsBlockDbError::InvalidBlockstoreRoot(
+                fsblockdb_root.as_ref().to_path_buf(),
+            ))
+        }
+    }
+
+    /// Returns the maximum height of blocks known to the block metadata database.
+    pub fn get_max_cached_height(&self) -> Result<Option<BlockHeight>, FsBlockDbError> {
+        Ok(chain::blockmetadb_get_max_cached_height(&self.conn)?)
+    }
+
+    /// Adds a set of block metadata entries to the metadata database.
+    ///
+    /// This will return an error if any block file corresponding to one of these metadata records
+    /// is absent from the blocks directory.
+    pub fn write_block_metadata(&self, block_meta: &[BlockMeta]) -> Result<(), FsBlockDbError> {
+        for m in block_meta {
+            let block_path = m.block_file_path(&self.blocks_dir);
+            let meta = fs::metadata(&block_path)?;
+            if !meta.is_file() {
+                return Err(FsBlockDbError::InvalidBlockPath(block_path));
+            }
+        }
+
+        Ok(chain::blockmetadb_insert(&self.conn, block_meta)?)
+    }
+}
+
+#[cfg(feature = "unstable")]
+impl BlockSource for FsBlockDb {
+    type Error = SqliteClientError;
+
+    fn with_blocks<F>(
+        &self,
+        from_height: BlockHeight,
+        limit: Option<u32>,
+        with_row: F,
+    ) -> Result<(), Self::Error>
+    where
+        F: FnMut(CompactBlock) -> Result<(), Self::Error>,
+    {
+        fsblockdb_with_blocks(self, from_height, limit, with_row)
+    }
+}
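
Reviewer note (not part of the patch): as a quick orientation, here is a minimal caller-side sketch of the cache-filling flow that the `FsBlockDb` doc comment above describes. It assumes the `unstable` feature is enabled and uses `tempfile` as in the doctest; the height, hash, timestamp, output counts, and placeholder block bytes are invented for illustration, and in practice the file contents would be the protobuf-serialized `CompactBlock` bytes streamed from lightwalletd.

```rust
use std::fs;

use tempfile::tempdir;
use zcash_client_sqlite::{
    chain::{init::init_blockmeta_db, BlockMeta},
    FsBlockDb,
};
use zcash_primitives::{block::BlockHash, consensus::BlockHeight};

fn main() {
    // Create the cache root; `for_path` creates `<root>/blocks` and opens (or creates)
    // `<root>/blockmeta.sqlite`, then `init_blockmeta_db` applies the `compactblocks_meta`
    // migration.
    let fsblockdb_root = tempdir().unwrap();
    let mut db = FsBlockDb::for_path(fsblockdb_root.path()).unwrap();
    init_blockmeta_db(&mut db).unwrap();

    // For each block obtained from lightwalletd, write its serialized bytes to
    // `<root>/blocks/<height>-<hash>-compactblock`. The metadata values and the file
    // contents below are placeholders.
    let meta = BlockMeta {
        height: BlockHeight::from_u32(500_000),
        block_hash: BlockHash([0u8; 32]),
        block_time: 1_600_000_000,
        sapling_outputs_count: 2,
        orchard_actions_count: 0,
    };
    let blocks_dir = fsblockdb_root.path().join("blocks");
    fs::write(meta.block_file_path(&blocks_dir), b"<compact block bytes>").unwrap();

    // Record the metadata so that the `BlockSource` implementation can locate the file,
    // then resume future downloads from the highest cached height.
    db.write_block_metadata(&[meta]).unwrap();
    assert_eq!(
        db.get_max_cached_height().unwrap(),
        Some(BlockHeight::from_u32(500_000))
    );
}
```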
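A corresponding sketch of the scanning and pruning side of the same flow, under stated assumptions: the `BlockSource` trait is imported from the path given in the changelog entry above (`zcash_client_backend::data_api::BlockSource`), the cache directory has already been filled with valid compact block files, and the scanning closure and pruning threshold are placeholders for the wallet's real logic.

```rust
use std::fs;
use std::path::Path;

use zcash_client_backend::data_api::BlockSource;
use zcash_client_sqlite::FsBlockDb;
use zcash_primitives::consensus::BlockHeight;

fn scan_and_prune(fsblockdb_root: &Path, wallet_synced_to: BlockHeight) {
    let cache = FsBlockDb::for_path(fsblockdb_root).unwrap();

    // Scan every cached block above the wallet's synced-to height; a real wallet would
    // feed each CompactBlock into its scanning routine instead of printing it.
    cache
        .with_blocks(wallet_synced_to, None, |block| {
            println!("scanned block at height {}", block.height());
            Ok(())
        })
        .unwrap();

    // Prune block files strictly below a height that is no longer needed, relying on the
    // `<height>-<hash>-compactblock` naming convention described in the doc comment.
    let prune_below = u32::from(wallet_synced_to);
    for entry in fs::read_dir(fsblockdb_root.join("blocks")).unwrap() {
        let path = entry.unwrap().path();
        let is_stale = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.split('-').next())
            .and_then(|height| height.parse::<u32>().ok())
            .map_or(false, |height| height < prune_below);
        if is_stale {
            fs::remove_file(path).unwrap();
        }
    }
}
```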