Add a BlockSource implementation that reads blocks from files in the filesystem.

Also, this adds functionality to create and insert records into the
block metadata db that is used as the source for which blocks to read.
This commit is contained in:
Kris Nuttycombe 2022-07-28 17:21:04 -06:00
parent 3bc8627e2b
commit f8fd27653c
6 changed files with 406 additions and 11 deletions

View File

@ -25,13 +25,21 @@ and this library adheres to Rust's notion of
- An `unstable` feature flag; this is added to parts of the API that may change
  in any release. It enables `zcash_client_backend`'s `unstable` feature flag.
- New summary views that may be directly accessed in the sqlite database.
  The structure of these views should be considered unstable; they may
  be replaced by accessors provided by the data access API at some point
  in the future:
  - `v_transactions`
  - `v_tx_received`
  - `v_tx_sent`
- `zcash_client_sqlite::wallet::init::WalletMigrationError`
- A filesystem-backed `BlockSource` implementation
`zcash_client_sqlite::FsBlockDb`. This block source expects blocks to be
stored on disk in individual files named following the pattern
`<blockmeta_root>/blocks/<blockheight>-<blockhash>-compactblock`. A SQLite
  database stored at `<blockmeta_root>/blockmeta.sqlite` stores metadata for
this block source.
- `zcash_client_sqlite::chain::init::init_blockmeta_db` creates the required
metadata cache database.
### Changed
- Various **BREAKING CHANGES** have been made to the database tables. These will
@ -85,6 +93,8 @@ and this library adheres to Rust's notion of
  - `get_extended_full_viewing_keys` (use
    `zcash_client_backend::data_api::WalletRead::get_unified_full_viewing_keys`
    instead).
- `zcash_client_sqlite::with_blocks` (use
`zcash_client_backend::data_api::BlockSource::with_blocks` instead)
### Fixed
- The `zcash_client_backend::data_api::WalletRead::get_address` implementation

View File

@ -9,7 +9,17 @@ use zcash_client_backend::{data_api::error::Error, proto::compact_formats::Compa
use crate::{error::SqliteClientError, BlockDb}; use crate::{error::SqliteClientError, BlockDb};
#[cfg(feature = "unstable")]
use {
crate::{BlockHash, FsBlockDb},
rusqlite::{Connection, OptionalExtension, NO_PARAMS},
std::fs::File,
std::io::BufReader,
std::path::{Path, PathBuf},
};
pub mod init; pub mod init;
pub mod migrations;
struct CompactBlockRow { struct CompactBlockRow {
height: BlockHeight, height: BlockHeight,
@ -18,13 +28,12 @@ struct CompactBlockRow {
/// Implements a traversal of `limit` blocks of the block cache database. /// Implements a traversal of `limit` blocks of the block cache database.
/// ///
/// Starting at `from_height`, the `with_row` callback is invoked /// Starting at the next block above `last_scanned_height`, the `with_row` callback is invoked with
/// with each block retrieved from the backing store. If the `limit` /// each block retrieved from the backing store. If the `limit` value provided is `None`, all
/// value provided is `None`, all blocks are traversed up to the /// blocks are traversed up to the maximum height.
/// maximum height. pub(crate) fn blockdb_with_blocks<F>(
pub fn with_blocks<F>(
cache: &BlockDb, cache: &BlockDb,
from_height: BlockHeight, last_scanned_height: BlockHeight,
limit: Option<u32>, limit: Option<u32>,
mut with_row: F, mut with_row: F,
) -> Result<(), SqliteClientError> ) -> Result<(), SqliteClientError>
@ -37,7 +46,10 @@ where
)?; )?;
let rows = stmt_blocks.query_map( let rows = stmt_blocks.query_map(
params![u32::from(from_height), limit.unwrap_or(u32::max_value()),], params![
u32::from(last_scanned_height),
limit.unwrap_or(u32::max_value()),
],
|row| { |row| {
Ok(CompactBlockRow { Ok(CompactBlockRow {
height: BlockHeight::from_u32(row.get(0)?), height: BlockHeight::from_u32(row.get(0)?),
@ -64,6 +76,148 @@ where
Ok(()) Ok(())
} }
/// Data structure representing a row in the block metadata database.
#[cfg(feature = "unstable")]
pub struct BlockMeta {
    /// The height at which the block was mined.
    pub height: BlockHeight,
    /// The hash of the block; combined with `height` to form the on-disk
    /// file name for the block (see [`BlockMeta::block_file_path`]).
    pub block_hash: BlockHash,
    /// The block's time field. NOTE(review): presumably the `time` value from
    /// the compact block header — confirm against the code that writes rows.
    pub block_time: u32,
    /// Count of Sapling outputs in the block, as recorded by the writer.
    pub sapling_outputs_count: u32,
    /// Count of Orchard actions in the block, as recorded by the writer.
    pub orchard_actions_count: u32,
}
#[cfg(feature = "unstable")]
impl BlockMeta {
    /// Returns the path, within `blocks_dir`, of the file in which the
    /// compact block described by this metadata entry is expected to be
    /// stored.
    ///
    /// The file name follows the pattern `<height>-<blockhash>-compactblock`,
    /// using the `Display` representations of the height and hash.
    pub fn block_file_path<P: AsRef<Path>>(&self, blocks_dir: &P) -> PathBuf {
        // `Path::join` accepts any `AsRef<Path>`, so the formatted `String`
        // may be passed directly; wrapping it in `Path::new` was redundant.
        blocks_dir
            .as_ref()
            .join(format!("{}-{}-compactblock", self.height, self.block_hash))
    }
}
/// Inserts a batch of rows into the block metadata database.
///
/// The batch is applied atomically: all inserts occur within a single
/// `BEGIN IMMEDIATE` / `COMMIT` transaction, and on any insertion failure the
/// transaction is rolled back and the first error is returned.
#[cfg(feature = "unstable")]
pub(crate) fn blockmetadb_insert(
    conn: &Connection,
    block_meta: &[BlockMeta],
) -> Result<(), rusqlite::Error> {
    let mut stmt_insert = conn.prepare(
        "INSERT INTO compactblocks_meta (height, blockhash, time, sapling_outputs_count, orchard_actions_count)
        VALUES (?, ?, ?, ?, ?)"
    )?;

    // NOTE(review): the transaction is driven with explicit SQL statements
    // rather than `Connection::transaction()`, presumably because the latter
    // requires `&mut Connection` while the prepared statement borrows `conn`
    // immutably — confirm.
    conn.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
    let result = block_meta
        .iter()
        .map(|m| {
            stmt_insert.execute(params![
                u32::from(m.height),
                // Store the raw hash bytes as a BLOB.
                &m.block_hash.0[..],
                m.block_time,
                m.sapling_outputs_count,
                m.orchard_actions_count,
            ])
        })
        // Collecting into `Result` short-circuits on the first failed insert.
        .collect::<Result<Vec<_>, _>>();

    match result {
        Ok(_) => {
            conn.execute("COMMIT", NO_PARAMS)?;
            Ok(())
        }
        Err(error) => {
            match conn.execute("ROLLBACK", NO_PARAMS) {
                Ok(_) => Err(error),
                Err(e) =>
                // Panicking here is probably the right thing to do, because it
                // means the database is corrupt.
                panic!(
                    "Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.",
                    e,
                    error
                )
            }
        }
    }
}
/// Returns the maximum block height stored in the block metadata database,
/// or `Ok(None)` if the database contains no rows.
#[cfg(feature = "unstable")]
pub(crate) fn blockmetadb_get_max_cached_height(
    conn: &Connection,
) -> Result<Option<BlockHeight>, rusqlite::Error> {
    conn.query_row(
        "SELECT MAX(height) FROM compactblocks_meta",
        NO_PARAMS,
        |row| {
            // `MAX(height)` over an empty table produces a single row whose
            // value is NULL — not zero rows — so `OptionalExtension::optional`
            // alone would not map the empty case to `None`; instead, reading a
            // non-nullable `u32` would fail with an `InvalidColumnType` error.
            // Read the column as an `Option<u32>` to handle the NULL.
            let h: Option<u32> = row.get(0)?;
            Ok(h.map(BlockHeight::from))
        },
    )
}
/// Implements a traversal of `limit` blocks of the filesystem-backed
/// block cache.
///
/// Starting at the next block height above `last_scanned_height`, the `with_row` callback is
/// invoked with each block retrieved from the backing store. If the `limit` value provided is
/// `None`, all blocks are traversed up to the maximum height for which metadata is available.
#[cfg(feature = "unstable")]
pub(crate) fn fsblockdb_with_blocks<F>(
    cache: &FsBlockDb,
    last_scanned_height: BlockHeight,
    limit: Option<u32>,
    mut with_block: F,
) -> Result<(), SqliteClientError>
where
    F: FnMut(CompactBlock) -> Result<(), SqliteClientError>,
{
    // Fetch the CompactBlocks we need to scan
    let mut stmt_blocks = cache.conn.prepare(
        "SELECT height, blockhash, time, sapling_outputs_count, orchard_actions_count
        FROM compactblocks_meta
        WHERE height > ?
        ORDER BY height ASC LIMIT ?",
    )?;

    let rows = stmt_blocks.query_map(
        params![
            u32::from(last_scanned_height),
            // A `None` limit is represented as the maximum `u32`, i.e.
            // effectively unlimited.
            limit.unwrap_or(u32::max_value()),
        ],
        |row| {
            // Reconstruct the metadata row; the actual block bytes live on
            // disk and are read below.
            Ok(BlockMeta {
                height: BlockHeight::from_u32(row.get(0)?),
                block_hash: BlockHash::from_slice(&row.get::<_, Vec<_>>(1)?),
                block_time: row.get(2)?,
                sapling_outputs_count: row.get(3)?,
                orchard_actions_count: row.get(4)?,
            })
        },
    )?;

    for row_result in rows {
        let cbr = row_result?;
        // Read and parse the serialized protobuf `CompactBlock` from its
        // file, located via the metadata row's height and hash.
        let block_file = File::open(cbr.block_file_path(&cache.blocks_dir))?;
        let mut buf_reader = BufReader::new(block_file);
        let block: CompactBlock =
            Message::parse_from_reader(&mut buf_reader).map_err(Error::from)?;

        // Consistency check: the height stored inside the block data must
        // agree with the metadata row that pointed us at the file.
        if block.height() != cbr.height {
            return Err(SqliteClientError::CorruptedData(format!(
                "Block height {} did not match row's height field value {}",
                block.height(),
                cbr.height
            )));
        }

        with_block(block)?;
    }

    Ok(())
}
#[cfg(test)] #[cfg(test)]
#[allow(deprecated)] #[allow(deprecated)]
mod tests { mod tests {

View File

@ -1,9 +1,16 @@
//! Functions for initializing the various databases. //! Functions for initializing the various databases.
use rusqlite::NO_PARAMS; use rusqlite::NO_PARAMS;
use crate::BlockDb; use crate::BlockDb;
#[cfg(feature = "unstable")]
use {
super::migrations,
crate::FsBlockDb,
schemer::{Migrator, MigratorError},
schemer_rusqlite::RusqliteAdapter,
};
/// Sets up the internal structure of the cache database. /// Sets up the internal structure of the cache database.
/// ///
/// # Examples /// # Examples
@ -29,3 +36,32 @@ pub fn init_cache_database(db_cache: &BlockDb) -> Result<(), rusqlite::Error> {
)?; )?;
Ok(()) Ok(())
} }
/// Sets up the internal structure of the metadata cache database.
///
/// Applies all registered `blockmeta` migrations (creating the
/// `schemer_migrations` bookkeeping table first if necessary), bringing the
/// metadata database schema up to date.
///
/// # Examples
///
/// ```
/// use tempfile::{tempdir, NamedTempFile};
/// use zcash_client_sqlite::{
///     FsBlockDb,
///     chain::init::init_blockmeta_db,
/// };
///
/// let cache_file = NamedTempFile::new().unwrap();
/// let blocks_dir = tempdir().unwrap();
/// let mut db = FsBlockDb::for_path(blocks_dir.path()).unwrap();
/// init_blockmeta_db(&mut db).unwrap();
/// ```
#[cfg(feature = "unstable")]
pub fn init_blockmeta_db(db: &mut FsBlockDb) -> Result<(), MigratorError<rusqlite::Error>> {
    // Initializing the adapter creates the migrations bookkeeping table if it
    // does not already exist.
    let adapter = RusqliteAdapter::new(&mut db.conn, Some("schemer_migrations".to_string()));
    adapter.init().expect("Migrations table setup succeeds.");

    let mut migrator = Migrator::new(adapter);
    migrator
        .register_multiple(migrations::blockmeta::all_migrations())
        .expect("Migration registration should have been successful.");
    // `up(None)` applies every registered migration not yet recorded as run.
    migrator.up(None)?;
    Ok(())
}

View File

@ -0,0 +1 @@
pub mod blockmeta;

View File

@ -0,0 +1,53 @@
use schemer_rusqlite::RusqliteMigration;
/// Returns the complete set of migrations defining the block metadata
/// database schema, for registration with a `schemer` migrator.
pub fn all_migrations() -> Vec<Box<dyn RusqliteMigration<Error = rusqlite::Error>>> {
    vec![Box::new(init::Migration {})]
}
/// The initial migration for the block metadata database.
pub mod init {
    use rusqlite::{self};
    use schemer::{self, migration};
    use schemer_rusqlite::RusqliteMigration;
    use uuid::Uuid;

    /// Marker type for the initial `compactblocks_meta` migration.
    pub struct Migration;

    /// The migration that added the `compactblocks_meta` table.
    ///
    /// 68525b40-36e5-46aa-a765-720f8389b99d
    pub const MIGRATION_ID: Uuid = Uuid::from_fields(
        0x68525b40,
        0x36e5,
        0x46aa,
        b"\xa7\x65\x72\x0f\x83\x89\xb9\x9d",
    );

    // Associates `Migration` with its identifier, (empty) dependency list,
    // and human-readable description via `schemer`'s `migration!` macro.
    migration!(
        Migration,
        &format!("{}", MIGRATION_ID),
        [],
        "Initialize the cachemeta database."
    );

    impl RusqliteMigration for Migration {
        type Error = rusqlite::Error;

        // Creates the `compactblocks_meta` table, keyed by block height.
        fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
            transaction.execute_batch(
                "CREATE TABLE compactblocks_meta (
                    height INTEGER PRIMARY KEY,
                    blockhash BLOB NOT NULL,
                    time INTEGER NOT NULL,
                    sapling_outputs_count INTEGER NOT NULL,
                    orchard_actions_count INTEGER NOT NULL
                )",
            )?;
            Ok(())
        }

        // Reverses `up` by dropping the table.
        fn down(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
            transaction.execute_batch("DROP TABLE compactblocks_meta;")?;
            Ok(())
        }
    }
}

View File

@ -73,6 +73,13 @@ use {
zcash_primitives::legacy::TransparentAddress, zcash_primitives::legacy::TransparentAddress,
}; };
#[cfg(feature = "unstable")]
use {
crate::chain::{fsblockdb_with_blocks, BlockMeta},
std::path::PathBuf,
std::{fs, io},
};
mod prepared; mod prepared;
pub use prepared::DataConnStmtCache; pub use prepared::DataConnStmtCache;
@ -717,7 +724,7 @@ impl<'a, P: consensus::Parameters> WalletWriteTransparent for DataConnStmtCache<
} }
} }
/// A wrapper for the SQLite connection to the block cache database. /// A handle for the SQLite block source.
pub struct BlockDb(Connection); pub struct BlockDb(Connection);
impl BlockDb { impl BlockDb {
@ -739,7 +746,141 @@ impl BlockSource for BlockDb {
where where
F: FnMut(CompactBlock) -> Result<(), Self::Error>, F: FnMut(CompactBlock) -> Result<(), Self::Error>,
{ {
chain::with_blocks(self, from_height, limit, with_row) chain::blockdb_with_blocks(self, from_height, limit, with_row)
}
}
/// A block source that reads block data from disk and block metadata from a SQLite database.
///
/// This block source expects each compact block to be stored on disk in the `blocks` subdirectory
/// of the `blockstore_root` path provided at the time of construction. Each block should be
/// written, as the serialized bytes of its protobuf representation, where the path for each block
/// has the pattern:
///
/// `<blockstore_root>/blocks/<block_height>-<block_hash>-compactblock`
///
/// where `<block_height>` is the decimal value of the height at which the block was mined, and
/// `<block_hash>` is the hexadecimal representation of the block hash, as produced by the
/// [`Display`] implementation for [`zcash_primitives::block::BlockHash`].
///
/// This block source is intended to be used with the following data flow:
/// * When the cache is being filled:
///   * The caller requests the current maximum height at which cached data is available
///     using [`FsBlockDb::get_max_cached_height`]. If no cached data is available, the caller
///     can use the wallet's synced-to height for the following operations instead.
///   * (recommended for privacy) the caller should round the returned height down to some 100- /
///     1000-block boundary.
///   * The caller uses the lightwalletd's `getblock` gRPC method to obtain a stream of blocks.
///     For each block returned, the caller writes the compact block to `blocks_dir` using the
///     path format specified above. It is fine to overwrite an existing block, since block hashes
///     are immutable and collision-resistant.
///   * Once a caller-determined number of blocks have been successfully written to disk, the
///     caller should invoke [`FsBlockDb::write_block_metadata`] with the metadata for each block
///     written to disk.
/// * The cache can then be scanned using the [`BlockSource`] implementation, providing the
///   wallet's synced-to-height as a starting point.
/// * When part of the cache is no longer needed:
///   * The caller determines some height `H` that is the earliest block data it needs to preserve.
///     This might be determined based on where the wallet is fully-synced to, or other heuristics.
///   * The caller searches the defined filesystem folder for all files beginning in `HEIGHT-*` where
///     `HEIGHT < H`, and deletes those files.
///
/// Note: This API is unstable, and may change in the future. In particular, the [`BlockSource`]
/// API and the above description currently assume that scanning is performed in linear block
/// order; this assumption is likely to be weakened and/or removed in a future update.
#[cfg(feature = "unstable")]
pub struct FsBlockDb {
    // Connection to the `blockmeta.sqlite` metadata database.
    conn: Connection,
    // Directory containing the individual serialized compact block files.
    blocks_dir: PathBuf,
}
/// Errors that can be generated by the filesystem/sqlite-backed
/// block source.
#[derive(Debug)]
#[cfg(feature = "unstable")]
pub enum FsBlockDbError {
    /// Wraps an underlying filesystem I/O error.
    FsError(io::Error),
    /// Wraps an error from the SQLite metadata database.
    DbError(rusqlite::Error),
    /// The provided blockstore root path exists but is not a directory.
    InvalidBlockstoreRoot(PathBuf),
    /// A block file path recorded in metadata does not refer to a regular file.
    InvalidBlockPath(PathBuf),
    /// Stored data failed an internal consistency check.
    /// NOTE(review): not constructed within this view of the file — confirm
    /// where this variant is produced.
    CorruptedData(String),
}
#[cfg(feature = "unstable")]
impl From<io::Error> for FsBlockDbError {
    /// Wraps a filesystem I/O error in the corresponding error variant.
    fn from(e: io::Error) -> Self {
        Self::FsError(e)
    }
}
#[cfg(feature = "unstable")]
impl From<rusqlite::Error> for FsBlockDbError {
    /// Wraps a SQLite error in the corresponding error variant.
    fn from(e: rusqlite::Error) -> Self {
        Self::DbError(e)
    }
}
#[cfg(feature = "unstable")]
impl FsBlockDb {
    /// Creates a filesystem-backed block store at the given path.
    ///
    /// This will construct or open a SQLite database at the path
    /// `<fsblockdb_root>/blockmeta.sqlite` and will ensure that a directory exists at
    /// `<fsblockdb_root>/blocks` where this block store will expect to find serialized block
    /// files as described for [`FsBlockDb`].
    pub fn for_path<P: AsRef<Path>>(fsblockdb_root: P) -> Result<Self, FsBlockDbError> {
        // The root itself must already exist as a directory; only the
        // `blocks` subdirectory is created on demand.
        let meta = fs::metadata(&fsblockdb_root)?;
        if meta.is_dir() {
            let db_path = fsblockdb_root.as_ref().join("blockmeta.sqlite");
            let blocks_dir = fsblockdb_root.as_ref().join("blocks");
            fs::create_dir_all(&blocks_dir)?;
            Ok(FsBlockDb {
                // `?` converts errors via the `From<io::Error>` /
                // `From<rusqlite::Error>` impls for `FsBlockDbError`, so the
                // explicit `map_err` calls were redundant.
                conn: Connection::open(db_path)?,
                blocks_dir,
            })
        } else {
            Err(FsBlockDbError::InvalidBlockstoreRoot(
                fsblockdb_root.as_ref().to_path_buf(),
            ))
        }
    }

    /// Returns the maximum height of blocks known to the block metadata database.
    pub fn get_max_cached_height(&self) -> Result<Option<BlockHeight>, FsBlockDbError> {
        Ok(chain::blockmetadb_get_max_cached_height(&self.conn)?)
    }

    /// Adds a set of block metadata entries to the metadata database.
    ///
    /// This will return an error if any block file corresponding to one of these metadata records
    /// is absent from the blocks directory.
    pub fn write_block_metadata(&self, block_meta: &[BlockMeta]) -> Result<(), FsBlockDbError> {
        // Verify every referenced block file exists before inserting any
        // metadata, so that the database never references a missing file.
        // NOTE(review): this check is inherently racy with respect to
        // concurrent file deletion — acceptable for a single-writer cache.
        for m in block_meta {
            let block_path = m.block_file_path(&self.blocks_dir);
            let meta = fs::metadata(&block_path)?;
            if !meta.is_file() {
                return Err(FsBlockDbError::InvalidBlockPath(block_path));
            }
        }

        Ok(chain::blockmetadb_insert(&self.conn, block_meta)?)
    }
}
#[cfg(feature = "unstable")]
impl BlockSource for FsBlockDb {
    type Error = SqliteClientError;

    /// Scans cached blocks above `from_height`, delegating to the
    /// filesystem-backed traversal in the `chain` module.
    fn with_blocks<F>(
        &self,
        from_height: BlockHeight,
        limit: Option<u32>,
        with_row: F,
    ) -> Result<(), Self::Error>
    where
        F: FnMut(CompactBlock) -> Result<(), Self::Error>,
    {
        fsblockdb_with_blocks(self, from_height, limit, with_row)
    }
}