change(scan): Create a scanner storage database, but don't use it yet (#8031)

* Create an empty storage/db module

* Use ephemeral storage in tests

* Populate storage inside new() method

* Move scanner setup into an init() method

* Pass the network to scanner init

* Create a database but don't actually use it

* Skip shutdown format checks when skipping format upgrades

* Allow the scanner to skip launching format upgrades in production

* Refactor skipping format upgrades so it is consistent

* Allow checking configs for equality

* Restore Network import
This commit is contained in:
teor 2023-11-30 22:59:15 +10:00 committed by GitHub
parent 1708f9d946
commit 8c717c92dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 246 additions and 41 deletions

View File

@ -5804,6 +5804,7 @@ dependencies = [
"indexmap 2.1.0",
"jubjub",
"rand 0.8.5",
"semver 1.0.20",
"serde",
"tokio",
"tower",

View File

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
/// Configuration for parallel semantic verification:
/// <https://zebra.zfnd.org/dev/rfcs/0002-parallel-verification.html#definitions>
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(
deny_unknown_fields,
default,

View File

@ -46,7 +46,7 @@ pub use cache_dir::CacheDir;
const MAX_SINGLE_SEED_PEER_DNS_RETRIES: usize = 0;
/// Configuration for networking code.
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The address on which this node should listen for connections.

View File

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
pub mod mining;
/// RPC configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// IP address and port for the RPC server.

View File

@ -22,6 +22,7 @@ categories = ["cryptography::cryptocurrencies"]
color-eyre = "0.6.2"
indexmap = { version = "2.0.1", features = ["serde"] }
semver = "1.0.20"
serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = "1.34.0"
tower = "0.4.13"
@ -31,7 +32,7 @@ zcash_client_backend = "0.10.0-rc.1"
zcash_primitives = "0.13.0-rc.1"
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.31" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.31" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.31", features = ["shielded-scan"] }
[dev-dependencies]

View File

@ -3,6 +3,8 @@
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use zebra_state::Config as DbConfig;
use crate::storage::SaplingScanningKey;
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
@ -10,14 +12,37 @@ use crate::storage::SaplingScanningKey;
/// Configuration for scanning.
pub struct Config {
/// The sapling keys to scan for and the birthday height of each of them.
// TODO: any value below sapling activation as the birthday height should default to sapling activation.
//
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,
/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state, and create a common database config.
#[serde(flatten)]
db_config: DbConfig,
}
impl Default for Config {
    /// Returns a scanner config with no sapling keys to scan and the default database config.
    fn default() -> Self {
        let sapling_keys_to_scan = IndexMap::new();
        let db_config = DbConfig::default();

        Self {
            sapling_keys_to_scan,
            db_config,
        }
    }
}
impl Config {
    /// Returns a config for a temporary database that is deleted when it is dropped.
    ///
    /// Only the database config differs from [`Config::default()`]: it is replaced with
    /// `DbConfig::ephemeral()`.
    pub fn ephemeral() -> Self {
        let ephemeral_db_config = DbConfig::ephemeral();

        Self {
            db_config: ephemeral_db_config,
            ..Self::default()
        }
    }

    /// Returns a shared reference to the database-specific part of this config.
    pub fn db_config(&self) -> &DbConfig {
        let Self { db_config, .. } = self;

        db_config
    }
}

21
zebra-scan/src/init.rs Normal file
View File

@ -0,0 +1,21 @@
//! Initializing the scanner.
use color_eyre::Report;
use tokio::task::JoinHandle;
use tracing::Instrument;
use zebra_chain::parameters::Network;
use crate::{scan, storage::Storage, Config};
/// Initialize the scanner based on its config, and return the join handle of the scan task.
///
/// The scanner storage is opened from `config` and `network` before the task is spawned.
/// The spawned task runs [`scan::start`] with `state` and that storage, instrumented with the
/// span that is current when this function is called.
pub fn init(
    config: &Config,
    network: Network,
    state: scan::State,
) -> JoinHandle<Result<(), Report>> {
    // TODO: add more tasks here?
    let scan_task = scan::start(state, Storage::new(config, network)).in_current_span();

    tokio::spawn(scan_task)
}

View File

@ -5,8 +5,12 @@
#![doc(html_root_url = "https://docs.rs/zebra_scan")]
pub mod config;
pub mod init;
pub mod scan;
pub mod storage;
#[cfg(test)]
mod tests;
pub use config::Config;
pub use init::init;

View File

@ -21,7 +21,8 @@ use zebra_chain::{
use crate::storage::Storage;
type State = Buffer<
/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;
@ -35,7 +36,7 @@ const CHECK_INTERVAL: Duration = Duration::from_secs(10);
/// Start the scan task given state and storage.
///
/// - This function is a dummy at the moment. It just makes sure we can read the storage and the state.
/// - Modificatiuons here might have an impact in the `scan_task_starts` test.
/// - Modifications here might have an impact in the `scan_task_starts` test.
/// - Real scanning code functionality will be added in the future here.
pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
// We want to make sure the state has a tip height available before we start scanning.

View File

@ -1,16 +1,47 @@
//! Store viewing keys and results of the scan.
#![allow(dead_code)]
use std::collections::HashMap;
use zebra_chain::{block::Height, transaction::Hash};
use zebra_chain::{block::Height, parameters::Network, transaction::Hash};
use crate::config::Config;
pub mod db;
/// The type used in Zebra to store Sapling scanning keys.
/// It can represent a full viewing key or an individual viewing key.
pub type SaplingScanningKey = String;
/// Store key info and results of the scan.
#[allow(dead_code)]
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the scanner storage represent the same database instance.
/// When the final clone is dropped, the database is closed.
#[derive(Clone, Debug)]
pub struct Storage {
// Configuration
//
// This configuration cannot be modified after the database is initialized,
// because some clones would have different values.
//
// TODO: add config if needed?
// Owned State
//
// Everything contained in this state must be shared by all clones, or read-only.
//
/// The underlying database.
///
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
//
// This database is created but not actually used for results.
// TODO: replace the fields below with a database instance.
db: db::ScannerDb,
/// The sapling key and an optional birthday for it.
sapling_keys: HashMap<SaplingScanningKey, Option<Height>>,
@ -18,14 +49,23 @@ pub struct Storage {
sapling_results: HashMap<SaplingScanningKey, Vec<Hash>>,
}
#[allow(dead_code)]
impl Storage {
/// Create a new storage.
pub fn new() -> Self {
Self {
sapling_keys: HashMap::new(),
sapling_results: HashMap::new(),
/// Opens and returns the on-disk scanner results storage for `config` and `network`.
/// If there is no existing storage, creates a new storage on disk.
///
/// TODO:
/// New keys in `config` are inserted into the database with their birthday heights. Shielded
/// activation is the minimum birthday height.
///
/// Birthdays and scanner progress are marked by inserting an empty result for that height.
pub fn new(config: &Config, network: Network) -> Self {
let mut storage = Self::new_db(config, network);
for (key, birthday) in config.sapling_keys_to_scan.iter() {
storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday)));
}
storage
}
/// Add a sapling key to the storage.
@ -43,18 +83,18 @@ impl Storage {
}
/// Returns a copy of the results stored for the sapling `key`.
///
/// If nothing is stored for `key`, returns an empty list.
//
// TODO: Rust style - remove "get_" from these names
pub fn get_sapling_results(&self, key: &str) -> Vec<Hash> {
    match self.sapling_results.get(key) {
        Some(tx_ids) => tx_ids.clone(),
        None => Vec::new(),
    }
}
/// Get all keys and their birthdays.
///
/// Returns a copy of the stored map from each sapling key to its optional birthday height.
/// The whole map is cloned on every call.
//
// TODO: any value below sapling activation as the birthday height, or `None`, should default
// to sapling activation. This requires the configured network.
// Return Height not Option<Height>.
pub fn get_sapling_keys(&self) -> HashMap<String, Option<Height>> {
self.sapling_keys.clone()
}
}
impl Default for Storage {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,104 @@
//! Persistent storage for scanner results.
use std::{collections::HashMap, path::Path};
use semver::Version;
use zebra_chain::parameters::Network;
use crate::Config;
use super::Storage;
// Public types and APIs
pub use zebra_state::ZebraDb as ScannerDb;
/// The directory name used to distinguish the scanner database from Zebra's other databases or
/// flat files.
///
/// We use "private" in the name to warn users not to share this data.
///
/// This is passed as the database kind when the scanner database is opened.
pub const SCANNER_DATABASE_KIND: &str = "private-scan";
/// The column families supported by the running `zebra-scan` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
///
/// These names are converted to owned strings and passed to the database when it is opened.
pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
// Sapling
"sapling_tx_ids",
// Orchard
// TODO: add the Orchard column families
];
impl Storage {
    /// Opens the on-disk scanner results database for `config` and `network`, and returns a
    /// storage instance backed by it. A new database is created on disk if none exists yet.
    ///
    /// Keys in `config` are not inserted into the database by this method.
    pub(crate) fn new_db(config: &Config, network: Network) -> Self {
        // TODO: make format upgrades work with any database, then change this to `false`
        let debug_skip_format_upgrades = true;

        Self::new_with_debug(config, network, debug_skip_format_upgrades)
    }

    /// Opens the on-disk database using the supplied production and debug settings, and returns
    /// a storage instance backed by it. A new database is created on disk if none exists yet.
    ///
    /// Keys in `config` are not inserted into the database, and the in-memory key and result
    /// maps start out empty.
    ///
    /// This method is intended for use in tests.
    pub(crate) fn new_with_debug(
        config: &Config,
        network: Network,
        debug_skip_format_upgrades: bool,
    ) -> Self {
        let format_version_in_code = Self::database_format_version_in_code();
        let column_families_in_code = SCANNER_COLUMN_FAMILIES_IN_CODE
            .iter()
            .map(ToString::to_string);

        let db = ScannerDb::new(
            config.db_config(),
            SCANNER_DATABASE_KIND,
            &format_version_in_code,
            network,
            debug_skip_format_upgrades,
            column_families_in_code,
        );

        let sapling_keys = HashMap::new();
        let sapling_results = HashMap::new();
        let new_storage = Self {
            db,
            sapling_keys,
            sapling_results,
        };

        // TODO: report the last scanned height here?
        tracing::info!("loaded Zebra scanner cache");

        new_storage
    }

    /// Returns the database format version of the running scanner code.
    pub fn database_format_version_in_code() -> Version {
        // TODO: implement scanner database versioning
        let (major, minor, patch) = (0, 0, 0);

        Version::new(major, minor, patch)
    }

    /// Returns the network this database was configured with.
    pub fn network(&self) -> Network {
        let db = &self.db;

        db.network()
    }

    /// Returns the `Path` of the location that holds this database's files.
    pub fn path(&self) -> &Path {
        let db = &self.db;

        db.path()
    }

    /// Checks whether code running in spawned threads has panicked.
    /// If a thread exited with a panic, that panic is resumed.
    ///
    /// Call this method regularly, so that panics are detected as soon as possible.
    //
    // TODO: when we implement format changes, call this method regularly
    pub fn check_for_panics(&mut self) {
        let db = &mut self.db;

        db.check_for_panics()
    }
}

View File

@ -32,10 +32,14 @@ use zcash_primitives::{
};
use zebra_chain::{
block::Block, chain_tip::ChainTip, serialization::ZcashDeserializeInto, transaction::Hash,
block::Block, chain_tip::ChainTip, parameters::Network, serialization::ZcashDeserializeInto,
transaction::Hash,
};
use crate::scan::{block_to_compact, scan_block};
use crate::{
config::Config,
scan::{block_to_compact, scan_block},
};
/// Prove that we can create fake blocks with fake notes and scan them using the
/// `zcash_client_backend::scanning::scan_block` function:
@ -177,7 +181,7 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
zcash_client_backend::encoding::encode_extended_full_viewing_key("zxviews", &extfvk);
// Create a database
let mut s = crate::storage::Storage::new();
let mut s = crate::storage::Storage::new(&Config::ephemeral(), Network::Mainnet);
// Insert the generated key to the database
s.add_sapling_key(key_to_be_stored.clone(), None);

View File

@ -20,7 +20,7 @@ use crate::{
};
/// Configuration for the state service.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The root directory for storing cached block data.
@ -162,7 +162,7 @@ impl Config {
version_path
}
/// Construct a config for an ephemeral database
/// Returns a config for a temporary database that is deleted when it is dropped.
pub fn ephemeral() -> Config {
Config {
ephemeral: true,

View File

@ -107,10 +107,16 @@ impl ZebraDb {
// Log any format changes before opening the database, in case opening fails.
let format_change = DbFormatChange::open_database(format_version_in_code, disk_version);
// Always do format upgrades in production, but allow them to be skipped by the scanner
// (because it doesn't support them yet).
//
// TODO: Make scanner support format upgrades, then remove `shielded-scan` here.
let can_skip_format_upgrades = cfg!(test) || cfg!(feature = "shielded-scan");
// Open the database and do initial checks.
let mut db = ZebraDb {
config: Arc::new(config.clone()),
debug_skip_format_upgrades,
debug_skip_format_upgrades: can_skip_format_upgrades && debug_skip_format_upgrades,
format_change_handle: None,
// After the database directory is created, a newly created database temporarily
// changes to the default database version. Then we set the correct version in the
@ -132,8 +138,7 @@ impl ZebraDb {
/// Launch any required format changes or format checks, and store their thread handle.
pub fn spawn_format_change(&mut self, format_change: DbFormatChange) {
// Always do format upgrades & checks in production code.
if cfg!(test) && self.debug_skip_format_upgrades {
if self.debug_skip_format_upgrades {
return;
}
@ -229,13 +234,16 @@ impl ZebraDb {
///
/// See [`DiskDb::shutdown`] for details.
pub fn shutdown(&mut self, force: bool) {
// Are we shutting down the underlying database instance?
let is_shutdown = force || self.db.shared_database_owners() <= 1;
// # Concurrency
//
// The format upgrade task should be cancelled before the database is flushed or shut down.
// This helps avoid some kinds of deadlocks.
//
// See also the correctness note in `DiskDb::shutdown()`.
if force || self.db.shared_database_owners() <= 1 {
if !self.debug_skip_format_upgrades && is_shutdown {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.force_cancel();
}

View File

@ -291,14 +291,10 @@ impl StartCmd {
#[cfg(feature = "zebra-scan")]
// Spawn never ending scan task.
let scan_task_handle = {
info!("spawning zebra_scanner");
let mut storage = zebra_scan::storage::Storage::new();
for (key, birthday) in config.shielded_scan.sapling_keys_to_scan.iter() {
storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday)));
}
tokio::spawn(zebra_scan::scan::start(state, storage).in_current_span())
info!("spawning shielded scanner with configured viewing keys");
zebra_scan::init(&config.shielded_scan, config.network.network, state)
};
#[cfg(not(feature = "zebra-scan"))]
// Spawn a dummy scan task which doesn't do anything and never finishes.
let scan_task_handle: tokio::task::JoinHandle<Result<(), Report>> =

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
/// Mempool configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The mempool transaction cost limit.

View File

@ -59,7 +59,7 @@ impl MetricsEndpoint {
}
/// Metrics configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The address used for the Prometheus metrics endpoint.

View File

@ -220,7 +220,7 @@ const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);
/// Sync configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The number of parallel block download requests.

View File

@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
/// The `zebrad` config is a TOML-encoded version of this structure. The meaning
/// of each field is described in the documentation, although it may be necessary
/// to click through to the sub-structures for each section.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[derive(Clone, Default, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct ZebradConfig {
/// Consensus configuration