Merge pull request #596 from nuttycom/wallet/fs_block_source

Add a BlockSource implementation that reads blocks from files in the filesystem.
Kris Nuttycombe 2022-09-15 09:55:14 -06:00 committed by GitHub
commit d3add8cca4
6 changed files with 406 additions and 11 deletions

View File

@@ -25,13 +25,21 @@ and this library adheres to Rust's notion of
- An `unstable` feature flag; this is added to parts of the API that may change
in any release. It enables `zcash_client_backend`'s `unstable` feature flag.
- New summary views that may be directly accessed in the sqlite database.
The structure of these views should be considered unstable; they may
be replaced by accessors provided by the data access API at some point
in the future:
- `v_transactions`
- `v_tx_received`
- `v_tx_sent`
- `zcash_client_sqlite::wallet::init::WalletMigrationError`
- A filesystem-backed `BlockSource` implementation
`zcash_client_sqlite::FsBlockDb`. This block source expects blocks to be
stored on disk in individual files named following the pattern
`<blockmeta_root>/blocks/<blockheight>-<blockhash>-compactblock`. A SQLite
database stored at `<blockmeta_root>/blockmeta.sqlite` stores metadata for
this block source (see the layout sketch below).
- `zcash_client_sqlite::chain::init::init_blockmeta_db` creates the required
metadata cache database.
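
For illustration, a minimal sketch (not part of the changelog text itself) of the resulting layout under a hypothetical `<blockmeta_root>`:

```rust
// Hypothetical cache root:
let root = std::path::Path::new("/var/cache/zcash-wallet");
// The metadata database maintained by `FsBlockDb`:
let meta_db = root.join("blockmeta.sqlite");
// One file per compact block; the height and `<blockhash>` here are placeholders:
let block_file = root.join("blocks").join("1000000-<blockhash>-compactblock");
```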
### Changed
- Various **BREAKING CHANGES** have been made to the database tables. These will
@@ -85,6 +93,8 @@ and this library adheres to Rust's notion of
- `get_extended_full_viewing_keys` (use
`zcash_client_backend::data_api::WalletRead::get_unified_full_viewing_keys`
instead).
- `zcash_client_sqlite::with_blocks` (use
`zcash_client_backend::data_api::BlockSource::with_blocks` instead)
### Fixed
- The `zcash_client_backend::data_api::WalletRead::get_address` implementation

View File

@@ -9,7 +9,17 @@ use zcash_client_backend::{data_api::error::Error, proto::compact_formats::Compa
use crate::{error::SqliteClientError, BlockDb};
#[cfg(feature = "unstable")]
use {
crate::{BlockHash, FsBlockDb},
rusqlite::{Connection, OptionalExtension, NO_PARAMS},
std::fs::File,
std::io::BufReader,
std::path::{Path, PathBuf},
};
pub mod init;
pub mod migrations;
struct CompactBlockRow {
height: BlockHeight,
@@ -18,13 +28,12 @@ struct CompactBlockRow {
/// Implements a traversal of `limit` blocks of the block cache database.
///
/// Starting at the next block above `last_scanned_height`, the `with_row` callback is invoked with
/// each block retrieved from the backing store. If the `limit` value provided is `None`, all
/// blocks are traversed up to the maximum height.
pub(crate) fn blockdb_with_blocks<F>(
cache: &BlockDb,
last_scanned_height: BlockHeight,
limit: Option<u32>,
mut with_row: F,
) -> Result<(), SqliteClientError>
@@ -37,7 +46,10 @@ where
)?;
let rows = stmt_blocks.query_map(
params![
u32::from(last_scanned_height),
limit.unwrap_or(u32::max_value()),
],
|row| {
Ok(CompactBlockRow {
height: BlockHeight::from_u32(row.get(0)?),
@@ -64,6 +76,148 @@ where
Ok(())
}
/// Data structure representing a row in the block metadata database.
#[cfg(feature = "unstable")]
pub struct BlockMeta {
pub height: BlockHeight,
pub block_hash: BlockHash,
pub block_time: u32,
pub sapling_outputs_count: u32,
pub orchard_actions_count: u32,
}
#[cfg(feature = "unstable")]
impl BlockMeta {
pub fn block_file_path<P: AsRef<Path>>(&self, blocks_dir: &P) -> PathBuf {
blocks_dir.as_ref().join(Path::new(&format!(
"{}-{}-compactblock",
self.height, self.block_hash
)))
}
}
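// Illustration (not part of this diff): for a `BlockMeta` with height 500000,
// `block_file_path(&"/cache/blocks")` produces a path of the form
// `/cache/blocks/500000-<block_hash hex>-compactblock`, matching the layout
// documented on `FsBlockDb`.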
/// Inserts a batch of rows into the block metadata database.
#[cfg(feature = "unstable")]
pub(crate) fn blockmetadb_insert(
conn: &Connection,
block_meta: &[BlockMeta],
) -> Result<(), rusqlite::Error> {
let mut stmt_insert = conn.prepare(
"INSERT INTO compactblocks_meta (height, blockhash, time, sapling_outputs_count, orchard_actions_count)
VALUES (?, ?, ?, ?, ?)"
)?;
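// `BEGIN IMMEDIATE` takes SQLite's write lock up front, so the batch of
// inserts below either commits atomically or is rolled back as a unit.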
conn.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
let result = block_meta
.iter()
.map(|m| {
stmt_insert.execute(params![
u32::from(m.height),
&m.block_hash.0[..],
m.block_time,
m.sapling_outputs_count,
m.orchard_actions_count,
])
})
.collect::<Result<Vec<_>, _>>();
match result {
Ok(_) => {
conn.execute("COMMIT", NO_PARAMS)?;
Ok(())
}
Err(error) => {
match conn.execute("ROLLBACK", NO_PARAMS) {
Ok(_) => Err(error),
Err(e) =>
// Panicking here is probably the right thing to do, because it
// means the database is corrupt.
panic!(
"Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.",
e,
error
)
}
}
}
}
#[cfg(feature = "unstable")]
pub(crate) fn blockmetadb_get_max_cached_height(
conn: &Connection,
) -> Result<Option<BlockHeight>, rusqlite::Error> {
conn.query_row(
"SELECT MAX(height) FROM compactblocks_meta",
NO_PARAMS,
|row| {
// `MAX(height)` yields a single row containing NULL when the table is
// empty, so read the column as an `Option` rather than failing on NULL.
let h: Option<u32> = row.get(0)?;
Ok(h.map(BlockHeight::from))
},
)
}
/// Implements a traversal of `limit` blocks of the filesystem-backed
/// block cache.
///
/// Starting at the next block height above `last_scanned_height`, the `with_row` callback is
/// invoked with each block retrieved from the backing store. If the `limit` value provided is
/// `None`, all blocks are traversed up to the maximum height for which metadata is available.
#[cfg(feature = "unstable")]
pub(crate) fn fsblockdb_with_blocks<F>(
cache: &FsBlockDb,
last_scanned_height: BlockHeight,
limit: Option<u32>,
mut with_block: F,
) -> Result<(), SqliteClientError>
where
F: FnMut(CompactBlock) -> Result<(), SqliteClientError>,
{
// Fetch the CompactBlocks we need to scan
let mut stmt_blocks = cache.conn.prepare(
"SELECT height, blockhash, time, sapling_outputs_count, orchard_actions_count
FROM compactblocks_meta
WHERE height > ?
ORDER BY height ASC LIMIT ?",
)?;
let rows = stmt_blocks.query_map(
params![
u32::from(last_scanned_height),
limit.unwrap_or(u32::max_value()),
],
|row| {
Ok(BlockMeta {
height: BlockHeight::from_u32(row.get(0)?),
block_hash: BlockHash::from_slice(&row.get::<_, Vec<_>>(1)?),
block_time: row.get(2)?,
sapling_outputs_count: row.get(3)?,
orchard_actions_count: row.get(4)?,
})
},
)?;
for row_result in rows {
let cbr = row_result?;
let block_file = File::open(cbr.block_file_path(&cache.blocks_dir))?;
let mut buf_reader = BufReader::new(block_file);
let block: CompactBlock =
Message::parse_from_reader(&mut buf_reader).map_err(Error::from)?;
if block.height() != cbr.height {
return Err(SqliteClientError::CorruptedData(format!(
"Block height {} did not match row's height field value {}",
block.height(),
cbr.height
)));
}
with_block(block)?;
}
Ok(())
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {

View File

@@ -1,9 +1,16 @@
//! Functions for initializing the various databases.
use rusqlite::NO_PARAMS;
use crate::BlockDb;
#[cfg(feature = "unstable")]
use {
super::migrations,
crate::FsBlockDb,
schemer::{Migrator, MigratorError},
schemer_rusqlite::RusqliteAdapter,
};
/// Sets up the internal structure of the cache database.
///
/// # Examples
@@ -29,3 +36,32 @@ pub fn init_cache_database(db_cache: &BlockDb) -> Result<(), rusqlite::Error> {
)?;
Ok(())
}
/// Sets up the internal structure of the metadata cache database.
///
/// # Examples
///
/// ```
/// use tempfile::tempdir;
/// use zcash_client_sqlite::{
/// FsBlockDb,
/// chain::init::init_blockmeta_db,
/// };
///
/// let blocks_dir = tempdir().unwrap();
/// let mut db = FsBlockDb::for_path(blocks_dir.path()).unwrap();
/// init_blockmeta_db(&mut db).unwrap();
/// ```
#[cfg(feature = "unstable")]
pub fn init_blockmeta_db(db: &mut FsBlockDb) -> Result<(), MigratorError<rusqlite::Error>> {
let adapter = RusqliteAdapter::new(&mut db.conn, Some("schemer_migrations".to_string()));
adapter.init().expect("Migrations table setup succeeds.");
let mut migrator = Migrator::new(adapter);
migrator
.register_multiple(migrations::blockmeta::all_migrations())
.expect("Migration registration should have been successful.");
migrator.up(None)?;
Ok(())
}

View File

@@ -0,0 +1 @@
pub mod blockmeta;

View File

@@ -0,0 +1,53 @@
use schemer_rusqlite::RusqliteMigration;
pub fn all_migrations() -> Vec<Box<dyn RusqliteMigration<Error = rusqlite::Error>>> {
vec![Box::new(init::Migration {})]
}
pub mod init {
use rusqlite::{self};
use schemer::{self, migration};
use schemer_rusqlite::RusqliteMigration;
use uuid::Uuid;
pub struct Migration;
/// The migration that added the `compactblocks_meta` table.
///
/// 68525b40-36e5-46aa-a765-720f8389b99d
pub const MIGRATION_ID: Uuid = Uuid::from_fields(
0x68525b40,
0x36e5,
0x46aa,
b"\xa7\x65\x72\x0f\x83\x89\xb9\x9d",
);
migration!(
Migration,
&format!("{}", MIGRATION_ID),
[],
"Initialize the cachemeta database."
);
impl RusqliteMigration for Migration {
type Error = rusqlite::Error;
fn up(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
transaction.execute_batch(
"CREATE TABLE compactblocks_meta (
height INTEGER PRIMARY KEY,
blockhash BLOB NOT NULL,
time INTEGER NOT NULL,
sapling_outputs_count INTEGER NOT NULL,
orchard_actions_count INTEGER NOT NULL
)",
)?;
Ok(())
}
fn down(&self, transaction: &rusqlite::Transaction) -> Result<(), Self::Error> {
transaction.execute_batch("DROP TABLE compactblocks_meta;")?;
Ok(())
}
}
}
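
As a quick sanity check (illustrative, not part of this diff), the `Uuid::from_fields` value above renders as exactly the UUID quoted in the doc comment:

```rust
assert_eq!(
    MIGRATION_ID.to_string(),
    "68525b40-36e5-46aa-a765-720f8389b99d"
);
```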

View File

@@ -73,6 +73,13 @@ use {
zcash_primitives::legacy::TransparentAddress,
};
#[cfg(feature = "unstable")]
use {
crate::chain::{fsblockdb_with_blocks, BlockMeta},
std::path::PathBuf,
std::{fs, io},
};
mod prepared;
pub use prepared::DataConnStmtCache;
@@ -717,7 +724,7 @@ impl<'a, P: consensus::Parameters> WalletWriteTransparent for DataConnStmtCache<
}
}
/// A handle for the SQLite block source.
pub struct BlockDb(Connection);
impl BlockDb {
@@ -739,7 +746,141 @@ impl BlockSource for BlockDb
where
F: FnMut(CompactBlock) -> Result<(), Self::Error>,
{
chain::blockdb_with_blocks(self, from_height, limit, with_row)
}
}
/// A block source that reads block data from disk and block metadata from a SQLite database.
///
/// This block source expects each compact block to be stored on disk in the `blocks` subdirectory
/// of the `blockstore_root` path provided at the time of construction. Each block should be
/// written as the serialized bytes of its protobuf representation, to a path of the form:
///
/// `<blockstore_root>/blocks/<block_height>-<block_hash>-compactblock`
///
/// where `<block_height>` is the decimal value of the height at which the block was mined, and
/// `<block_hash>` is the hexadecimal representation of the block hash, as produced by the
/// [`Display`] implementation for [`zcash_primitives::block::BlockHash`].
///
/// This block source is intended to be used with the following data flow:
/// * When the cache is being filled:
/// * The caller requests the current maximum height at which cached data is available
/// using [`FsBlockDb::get_max_cached_height`]. If no cached data is available, the caller
/// can use the wallet's synced-to height for the following operations instead.
/// * (recommended for privacy) the caller should round the returned height down to some 100- /
/// 1000-block boundary.
/// * The caller uses the lightwalletd's `getblock` gRPC method to obtain a stream of blocks.
/// For each block returned, the caller writes the compact block to `blocks_dir` using the
/// path format specified above. It is fine to overwrite an existing block, since block hashes
/// are immutable and collision-resistant.
/// * Once a caller-determined number of blocks have been successfully written to disk, the
/// caller should invoke [`FsBlockDb::write_block_metadata`] with the metadata for each block
/// written to disk.
/// * The cache can then be scanned using the [`BlockSource`] implementation, providing the
/// wallet's synced-to-height as a starting point.
/// * When part of the cache is no longer needed:
/// * The caller determines some height `H` that is the earliest block data it needs to preserve.
/// This might be determined based on where the wallet is fully-synced to, or other heuristics.
/// * The caller searches the `blocks` directory for all files whose names begin with a height
/// less than `H` (i.e. files matching `<block_height>-*` with `<block_height> < H`), and
/// deletes those files.
///
/// Note: This API is unstable, and may change in the future. In particular, the [`BlockSource`]
/// API and the above description currently assume that scanning is performed in linear block
/// order; this assumption is likely to be weakened and/or removed in a future update.
#[cfg(feature = "unstable")]
pub struct FsBlockDb {
conn: Connection,
blocks_dir: PathBuf,
}
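
To make the documented data flow concrete, here is a minimal sketch of the fill cycle (illustrative only, not part of this diff); the cache root is hypothetical, and `fetched` stands in for compact blocks already obtained from lightwalletd, as serialized protobuf bytes paired with their `BlockMeta` values:

```rust
use zcash_client_sqlite::{
    chain::{init::init_blockmeta_db, BlockMeta},
    FsBlockDb,
};

// Hypothetical cache root; `for_path` requires an existing directory.
let root = std::path::Path::new("/var/cache/zcash-wallet");
let mut db = FsBlockDb::for_path(root).expect("valid blockstore root");
init_blockmeta_db(&mut db).expect("metadata schema initialized");

// Resume above the cache tip when cached data already exists.
let _tip = db.get_max_cached_height().expect("metadata query succeeds");

// Blocks fetched from lightwalletd; left empty to keep the sketch self-contained.
let fetched: Vec<(BlockMeta, Vec<u8>)> = Vec::new();

let mut batch = Vec::new();
for (meta, block_bytes) in fetched {
    // Overwriting an existing block file is safe: block hashes are immutable.
    std::fs::write(meta.block_file_path(&root.join("blocks")), &block_bytes)
        .expect("block file written");
    batch.push(meta);
}
// Record metadata only after the files exist on disk; `write_block_metadata`
// verifies that each corresponding block file is present.
db.write_block_metadata(&batch).expect("metadata rows inserted");
```

Scanning then proceeds through the `BlockSource` implementation further below.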
/// Errors that can be generated by the filesystem/sqlite-backed
/// block source.
#[derive(Debug)]
#[cfg(feature = "unstable")]
pub enum FsBlockDbError {
FsError(io::Error),
DbError(rusqlite::Error),
InvalidBlockstoreRoot(PathBuf),
InvalidBlockPath(PathBuf),
CorruptedData(String),
}
#[cfg(feature = "unstable")]
impl From<io::Error> for FsBlockDbError {
fn from(err: io::Error) -> Self {
FsBlockDbError::FsError(err)
}
}
#[cfg(feature = "unstable")]
impl From<rusqlite::Error> for FsBlockDbError {
fn from(err: rusqlite::Error) -> Self {
FsBlockDbError::DbError(err)
}
}
#[cfg(feature = "unstable")]
impl FsBlockDb {
/// Creates a filesystem-backed block store at the given path.
///
/// This will construct or open a SQLite database at the path
/// `<fsblockdb_root>/blockmeta.sqlite` and will ensure that a directory exists at
/// `<fsblockdb_root>/blocks` where this block store will expect to find serialized block
/// files as described for [`FsBlockDb`].
pub fn for_path<P: AsRef<Path>>(fsblockdb_root: P) -> Result<Self, FsBlockDbError> {
let meta = fs::metadata(&fsblockdb_root).map_err(FsBlockDbError::FsError)?;
if meta.is_dir() {
let db_path = fsblockdb_root.as_ref().join("blockmeta.sqlite");
let blocks_dir = fsblockdb_root.as_ref().join("blocks");
fs::create_dir_all(&blocks_dir)?;
Ok(FsBlockDb {
conn: Connection::open(db_path).map_err(FsBlockDbError::DbError)?,
blocks_dir,
})
} else {
Err(FsBlockDbError::InvalidBlockstoreRoot(
fsblockdb_root.as_ref().to_path_buf(),
))
}
}
/// Returns the maximum height of blocks known to the block metadata database.
pub fn get_max_cached_height(&self) -> Result<Option<BlockHeight>, FsBlockDbError> {
Ok(chain::blockmetadb_get_max_cached_height(&self.conn)?)
}
/// Adds a set of block metadata entries to the metadata database.
///
/// This will return an error if any block file corresponding to one of these metadata records
/// is absent from the blocks directory.
pub fn write_block_metadata(&self, block_meta: &[BlockMeta]) -> Result<(), FsBlockDbError> {
for m in block_meta {
let block_path = m.block_file_path(&self.blocks_dir);
let meta = fs::metadata(&block_path)?;
if !meta.is_file() {
return Err(FsBlockDbError::InvalidBlockPath(block_path));
}
}
Ok(chain::blockmetadb_insert(&self.conn, block_meta)?)
}
}
#[cfg(feature = "unstable")]
impl BlockSource for FsBlockDb {
type Error = SqliteClientError;
fn with_blocks<F>(
&self,
from_height: BlockHeight,
limit: Option<u32>,
with_row: F,
) -> Result<(), Self::Error>
where
F: FnMut(CompactBlock) -> Result<(), Self::Error>,
{
fsblockdb_with_blocks(self, from_height, limit, with_row)
}
}
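
A matching sketch of the scan side (again illustrative, not part of this diff), assuming `db` is an initialized `FsBlockDb` and `wallet_height` is the wallet's fully-synced height:

```rust
use zcash_client_backend::data_api::BlockSource;

// Traverse up to 1000 blocks above `wallet_height`, in ascending height order.
db.with_blocks(wallet_height, Some(1000), |block| {
    // Hand each `CompactBlock` to the wallet scanner here.
    println!("scanning block at height {}", u32::from(block.height()));
    Ok(())
})
.expect("cache traversal succeeds");
```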