From 2a004ffe9e281d95e6ae9c47aa41d7801d7e9b0d Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 6 Feb 2024 13:41:00 -0500 Subject: [PATCH] add(scan): Handle RegisterKeys messages in scan task (#8222) * moves ScanTask to its own module in service module * moves `process_messages()` method to scan_task.rs * uses get_mut() and returns new keys * updates types and adds scan_until() function * adds and uses wait_for_height() function * spawns scan_until_task_handler to poll scan_until_tasks FuturesUnordered * corrects comment * adds TODO * updates expected test log, corrects panic msg, fixes fmt * moves scan functions to scan_task module * moves ScanTaskCommand and related method impls to its own module * moves `ScanTask::mock()` test constructor to its own module * creates `add_keys` mod and moves `scan_until()` function there * moves scan task executor to its own module and adds ScanRangeTaskBuilder type * renames add_keys to scan_range, moves executor to scan_task mod * adds test for process_messages * updates scan_height_and_store_results() to skip last_scanned_height check if there is no key height for a key in `parsed_keys` * updates `default_test_config()` * adds acceptance test for registering keys in ScanTask * uses the right feature for the new acceptance test * Applies suggestions from code review --- zebra-scan/src/init.rs | 102 +--------- zebra-scan/src/lib.rs | 8 +- zebra-scan/src/service.rs | 17 +- zebra-scan/src/service/scan_task.rs | 54 ++++++ zebra-scan/src/service/scan_task/commands.rs | 153 +++++++++++++++ zebra-scan/src/service/scan_task/executor.rs | 51 +++++ .../src/{ => service/scan_task}/scan.rs | 179 +++++++++--------- .../src/service/scan_task/scan/scan_range.rs | 127 +++++++++++++ zebra-scan/src/service/scan_task/tests.rs | 26 +++ .../src/service/scan_task/tests/vectors.rs | 165 ++++++++++++++++ zebra-scan/src/service/tests.rs | 3 +- zebra-scan/src/storage/db/sapling.rs | 2 +- zebrad/src/commands/start.rs | 2 +- zebrad/tests/acceptance.rs | 27 ++- zebrad/tests/common/config.rs | 14 +- zebrad/tests/common/mod.rs | 3 + zebrad/tests/common/shielded_scan.rs | 3 + .../common/shielded_scan/scans_for_new_key.rs | 133 +++++++++++++ 18 files changed, 850 insertions(+), 219 deletions(-) create mode 100644 zebra-scan/src/service/scan_task.rs create mode 100644 zebra-scan/src/service/scan_task/commands.rs create mode 100644 zebra-scan/src/service/scan_task/executor.rs rename zebra-scan/src/{ => service/scan_task}/scan.rs (81%) create mode 100644 zebra-scan/src/service/scan_task/scan/scan_range.rs create mode 100644 zebra-scan/src/service/scan_task/tests.rs create mode 100644 zebra-scan/src/service/scan_task/tests/vectors.rs create mode 100644 zebrad/tests/common/shielded_scan.rs create mode 100644 zebrad/tests/common/shielded_scan/scans_for_new_key.rs diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index 1ed32b528..360014348 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -1,111 +1,13 @@ -//! Initializing the scanner. - -use std::sync::{mpsc, Arc}; +//! Initializing the scanner and gRPC server. use color_eyre::Report; -use tokio::{sync::oneshot, task::JoinHandle}; use tower::ServiceBuilder; -use zebra_chain::{parameters::Network, transaction::Transaction}; +use zebra_chain::parameters::Network; use zebra_state::ChainTipChange; use crate::{scan, service::ScanService, Config}; -#[derive(Debug)] -/// Commands that can be sent to [`ScanTask`] -pub enum ScanTaskCommand { - /// Start scanning for new viewing keys - RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es - - /// Stop scanning for deleted viewing keys - RemoveKeys { - /// Notify the caller once the key is removed (so the caller can wait before clearing results) - done_tx: oneshot::Sender<()>, - - /// Key hashes that are to be removed - keys: Vec, - }, - - /// Start sending results for key hashes to `result_sender` - SubscribeResults { - /// Sender for results - result_sender: mpsc::Sender>, - - /// Key hashes to send the results of to result channel - key_hashes: Vec<()>, - }, -} - -#[derive(Debug, Clone)] -/// Scan task handle and command channel sender -pub struct ScanTask { - /// [`JoinHandle`] of scan task - pub handle: Arc>>, - - /// Task command channel sender - pub cmd_sender: mpsc::Sender, -} - -impl ScanTask { - /// Spawns a new [`ScanTask`] for tests. - #[cfg(any(test, feature = "proptest-impl"))] - pub fn mock() -> (Self, mpsc::Receiver) { - let (cmd_sender, cmd_receiver) = mpsc::channel(); - - ( - Self { - handle: Arc::new(tokio::spawn(std::future::pending())), - cmd_sender, - }, - cmd_receiver, - ) - } - - /// Spawns a new [`ScanTask`]. - pub fn spawn( - config: &Config, - network: Network, - state: scan::State, - chain_tip_change: ChainTipChange, - ) -> Self { - let (cmd_sender, cmd_receiver) = mpsc::channel(); - - Self { - handle: Arc::new(scan::spawn_init( - config, - network, - state, - chain_tip_change, - cmd_receiver, - )), - cmd_sender, - } - } - - /// Sends a command to the scan task - pub fn send( - &mut self, - command: ScanTaskCommand, - ) -> Result<(), mpsc::SendError> { - self.cmd_sender.send(command) - } - - /// Sends a message to the scan task to remove the provided viewing keys. - pub fn remove_keys( - &mut self, - keys: &[String], - ) -> Result, mpsc::SendError> { - let (done_tx, done_rx) = oneshot::channel(); - - self.send(ScanTaskCommand::RemoveKeys { - keys: keys.to_vec(), - done_tx, - })?; - - Ok(done_rx) - } -} - /// Initialize [`ScanService`] based on its config. /// /// TODO: add a test for this function. diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index c40523093..59594c8b0 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -9,14 +9,18 @@ extern crate tracing; pub mod config; pub mod init; -pub mod scan; pub mod storage; use zebra_node_services::scan_service::{request::Request, response::Response}; pub mod service; + +pub use service::scan_task::scan; + #[cfg(any(test, feature = "proptest-impl"))] pub mod tests; pub use config::Config; -pub use init::{init, ScanTask}; +pub use init::init; + +pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey}; diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs index 970e52f9d..3dee20f78 100644 --- a/zebra-scan/src/service.rs +++ b/zebra-scan/src/service.rs @@ -9,11 +9,18 @@ use zebra_chain::{parameters::Network, transaction::Hash}; use zebra_state::ChainTipChange; -use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response}; +use crate::{scan, storage::Storage, Config, Request, Response}; #[cfg(test)] mod tests; +pub mod scan_task; + +pub use scan_task::{ScanTask, ScanTaskCommand}; + +#[cfg(any(test, feature = "proptest-impl"))] +use std::sync::mpsc::Receiver; + /// Zebra-scan [`tower::Service`] #[derive(Debug)] pub struct ScanService { @@ -42,13 +49,9 @@ impl ScanService { } /// Create a new [`ScanService`] with a mock `ScanTask` + // TODO: Move this to tests behind `cfg(any(test, feature = "proptest-impl"))` #[cfg(any(test, feature = "proptest-impl"))] - pub fn new_with_mock_scanner( - db: Storage, - ) -> ( - Self, - std::sync::mpsc::Receiver, - ) { + pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver) { let (scan_task, cmd_receiver) = ScanTask::mock(); (Self { db, scan_task }, cmd_receiver) } diff --git a/zebra-scan/src/service/scan_task.rs b/zebra-scan/src/service/scan_task.rs new file mode 100644 index 000000000..f149b3834 --- /dev/null +++ b/zebra-scan/src/service/scan_task.rs @@ -0,0 +1,54 @@ +//! Types and method implementations for [`ScanTask`] + +use std::sync::{mpsc, Arc}; + +use color_eyre::Report; +use tokio::task::JoinHandle; + +use zebra_chain::parameters::Network; +use zebra_state::ChainTipChange; + +use crate::Config; + +mod commands; +mod executor; +pub mod scan; + +pub use commands::ScanTaskCommand; + +#[cfg(any(test, feature = "proptest-impl"))] +pub mod tests; + +#[derive(Debug, Clone)] +/// Scan task handle and command channel sender +pub struct ScanTask { + /// [`JoinHandle`] of scan task + pub handle: Arc>>, + + /// Task command channel sender + pub cmd_sender: mpsc::Sender, +} + +impl ScanTask { + /// Spawns a new [`ScanTask`]. + pub fn spawn( + config: &Config, + network: Network, + state: scan::State, + chain_tip_change: ChainTipChange, + ) -> Self { + // TODO: Use a bounded channel or move this logic to the scan service or another service. + let (cmd_sender, cmd_receiver) = mpsc::channel(); + + Self { + handle: Arc::new(scan::spawn_init( + config, + network, + state, + chain_tip_change, + cmd_receiver, + )), + cmd_sender, + } + } +} diff --git a/zebra-scan/src/service/scan_task/commands.rs b/zebra-scan/src/service/scan_task/commands.rs new file mode 100644 index 000000000..58d9bd88d --- /dev/null +++ b/zebra-scan/src/service/scan_task/commands.rs @@ -0,0 +1,153 @@ +//! Types and method implementations for [`ScanTaskCommand`] + +use std::{ + collections::HashMap, + sync::{ + mpsc::{self, Receiver, TryRecvError}, + Arc, + }, +}; + +use color_eyre::{eyre::eyre, Report}; +use tokio::sync::oneshot; + +use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey}; +use zebra_chain::{block::Height, transaction::Transaction}; +use zebra_state::SaplingScanningKey; + +use super::ScanTask; + +#[derive(Debug)] +/// Commands that can be sent to [`ScanTask`] +pub enum ScanTaskCommand { + /// Start scanning for new viewing keys + RegisterKeys { + /// New keys to start scanning for + keys: HashMap< + SaplingScanningKey, + (Vec, Vec, Height), + >, + }, + + /// Stop scanning for deleted viewing keys + RemoveKeys { + /// Notify the caller once the key is removed (so the caller can wait before clearing results) + done_tx: oneshot::Sender<()>, + + /// Key hashes that are to be removed + keys: Vec, + }, + + /// Start sending results for key hashes to `result_sender` + // TODO: Implement this command (#8206) + SubscribeResults { + /// Sender for results + result_sender: mpsc::Sender>, + + /// Key hashes to send the results of to result channel + keys: Vec, + }, +} + +impl ScanTask { + /// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver + /// + /// Processes messages in the scan task channel, updating `parsed_keys` if required. + /// + /// Returns newly registered keys for scanning. + pub fn process_messages( + cmd_receiver: &Receiver, + parsed_keys: &mut HashMap< + SaplingScanningKey, + (Vec, Vec), + >, + ) -> Result< + HashMap, Vec, Height)>, + Report, + > { + let mut new_keys = HashMap::new(); + + loop { + let cmd = match cmd_receiver.try_recv() { + Ok(cmd) => cmd, + + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + // Return early if the sender has been dropped. + return Err(eyre!("command channel disconnected")); + } + }; + + match cmd { + ScanTaskCommand::RegisterKeys { keys } => { + let keys: Vec<_> = keys + .into_iter() + .filter(|(key, _)| { + !parsed_keys.contains_key(key) || new_keys.contains_key(key) + }) + .collect(); + + if !keys.is_empty() { + new_keys.extend(keys.clone()); + + let keys = + keys.into_iter() + .map(|(key, (decoded_dfvks, decoded_ivks, _h))| { + (key, (decoded_dfvks, decoded_ivks)) + }); + + parsed_keys.extend(keys); + } + } + + ScanTaskCommand::RemoveKeys { done_tx, keys } => { + for key in keys { + parsed_keys.remove(&key); + new_keys.remove(&key); + } + + // Ignore send errors for the done notification, caller is expected to use a timeout. + let _ = done_tx.send(()); + } + + _ => continue, + } + } + + Ok(new_keys) + } + + /// Sends a command to the scan task + pub fn send( + &mut self, + command: ScanTaskCommand, + ) -> Result<(), mpsc::SendError> { + self.cmd_sender.send(command) + } + + /// Sends a message to the scan task to remove the provided viewing keys. + pub fn remove_keys( + &mut self, + keys: &[String], + ) -> Result, mpsc::SendError> { + let (done_tx, done_rx) = oneshot::channel(); + + self.send(ScanTaskCommand::RemoveKeys { + keys: keys.to_vec(), + done_tx, + })?; + + Ok(done_rx) + } + + /// Sends a message to the scan task to start scanning for the provided viewing keys. + pub fn register_keys( + &mut self, + keys: HashMap< + SaplingScanningKey, + (Vec, Vec, Height), + >, + ) -> Result<(), mpsc::SendError> { + self.send(ScanTaskCommand::RegisterKeys { keys }) + } +} diff --git a/zebra-scan/src/service/scan_task/executor.rs b/zebra-scan/src/service/scan_task/executor.rs new file mode 100644 index 000000000..4f0c00860 --- /dev/null +++ b/zebra-scan/src/service/scan_task/executor.rs @@ -0,0 +1,51 @@ +//! The scan task executor + +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; +use tracing::Instrument; +use zebra_chain::BoxError; + +use super::scan::ScanRangeTaskBuilder; + +const EXECUTOR_BUFFER_SIZE: usize = 100; + +pub fn spawn_init() -> ( + Sender, + JoinHandle>, +) { + // TODO: Use a bounded channel. + let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE); + + ( + scan_task_sender, + tokio::spawn(scan_task_executor(scan_task_receiver).in_current_span()), + ) +} + +pub async fn scan_task_executor( + mut scan_task_receiver: Receiver, +) -> Result<(), BoxError> { + let mut scan_range_tasks = FuturesUnordered::new(); + + // Push a pending future so that `.next()` will always return `Some` + scan_range_tasks.push(tokio::spawn( + std::future::pending::>().boxed(), + )); + + loop { + tokio::select! { + Some(scan_range_task) = scan_task_receiver.recv() => { + // TODO: Add a long timeout? + scan_range_tasks.push(scan_range_task.spawn()); + } + + Some(finished_task) = scan_range_tasks.next() => { + // Return early if there's an error + finished_task.expect("futures unordered with pending future should always return Some")?; + } + } + } +} diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/service/scan_task/scan.rs similarity index 81% rename from zebra-scan/src/scan.rs rename to zebra-scan/src/service/scan_task/scan.rs index 522f895dd..9af1e3ef9 100644 --- a/zebra-scan/src/scan.rs +++ b/zebra-scan/src/service/scan_task/scan.rs @@ -2,10 +2,7 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::{ - mpsc::{Receiver, TryRecvError}, - Arc, - }, + sync::{mpsc::Receiver, Arc}, time::Duration, }; @@ -40,11 +37,17 @@ use zebra_chain::{ use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex}; use crate::{ - init::ScanTaskCommand, + service::{ScanTask, ScanTaskCommand}, storage::{SaplingScanningKey, Storage}, - Config, ScanTask, + Config, }; +use super::executor; + +mod scan_range; + +pub use scan_range::ScanRangeTaskBuilder; + /// The generic state type used by the scanner. pub type State = Buffer< BoxService, @@ -59,7 +62,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15); /// The amount of time between checking for new blocks and starting new scans. /// /// This is just under half the target block interval. -const CHECK_INTERVAL: Duration = Duration::from_secs(30); +pub const CHECK_INTERVAL: Duration = Duration::from_secs(30); /// We log an info log with progress after this many blocks. const INFO_LOG_INTERVAL: u32 = 10_000; @@ -76,15 +79,12 @@ pub async fn start( let sapling_activation_height = storage.min_sapling_birthday_height(); // Do not scan and notify if we are below sapling activation height. - loop { - let tip_height = tip_height(state.clone()).await?; - if tip_height < sapling_activation_height { - info!("scanner is waiting for sapling activation. Current tip: {}, Sapling activation: {}", tip_height.0, sapling_activation_height.0); - tokio::time::sleep(CHECK_INTERVAL).await; - continue; - } - break; - } + wait_for_height( + sapling_activation_height, + "Sapling activation", + state.clone(), + ) + .await?; // Read keys from the storage on disk, which can block async execution. let key_storage = storage.clone(); @@ -97,7 +97,7 @@ pub async fn start( // Parse and convert keys once, then use them to scan all blocks. // There is some cryptography here, but it should be fast even with thousands of keys. - let parsed_keys: HashMap< + let mut parsed_keys: HashMap< SaplingScanningKey, (Vec, Vec), > = key_heights @@ -107,18 +107,42 @@ pub async fn start( Ok::<_, Report>((key.clone(), parsed_keys)) }) .try_collect()?; - let mut parsed_keys = Arc::new(parsed_keys); + + let (scan_task_sender, scan_task_executor_handle) = executor::spawn_init(); + let mut scan_task_executor_handle = Some(scan_task_executor_handle); // Give empty states time to verify some blocks before we start scanning. tokio::time::sleep(INITIAL_WAIT).await; loop { - parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?; + if let Some(handle) = scan_task_executor_handle { + if handle.is_finished() { + warn!("scan task finished unexpectedly"); + + handle.await?.map_err(|err| eyre!(err))?; + return Ok(()); + } else { + scan_task_executor_handle = Some(handle); + } + } + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + // TODO: Check if the `start_height` is at or above the current height + if !new_keys.is_empty() { + let state = state.clone(); + let storage = storage.clone(); + + scan_task_sender + .send(ScanRangeTaskBuilder::new(height, new_keys, state, storage)) + .await + .expect("scan_until_task channel should not be closed"); + } let scanned_height = scan_height_and_store_results( height, state.clone(), - chain_tip_change.clone(), + Some(chain_tip_change.clone()), storage.clone(), key_heights.clone(), parsed_keys.clone(), @@ -137,56 +161,28 @@ pub async fn start( } } -impl ScanTask { - /// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver - /// - /// Processes messages in the scan task channel, updating `parsed_keys` if required. - /// - /// Returns the updated `parsed_keys` - fn process_msgs( - cmd_receiver: &Receiver, - mut parsed_keys: Arc< - HashMap, Vec)>, - >, - ) -> Result< - Arc, Vec)>>, - Report, - > { - loop { - let cmd = match cmd_receiver.try_recv() { - Ok(cmd) => cmd, - - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - // Return early if the sender has been dropped. - return Err(eyre!("command channel disconnected")); - } - }; - - match cmd { - ScanTaskCommand::RemoveKeys { done_tx, keys } => { - // TODO: Replace with Arc::unwrap_or_clone() when it stabilises: - // https://github.com/rust-lang/rust/issues/93610 - let mut updated_parsed_keys = - Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone()); - - for key in keys { - updated_parsed_keys.remove(&key); - } - - parsed_keys = Arc::new(updated_parsed_keys); - - // Ignore send errors for the done notification - let _ = done_tx.send(()); - } - - _ => continue, - } +/// Polls state service for tip height every [`CHECK_INTERVAL`] until the tip reaches the provided `tip_height` +pub async fn wait_for_height( + height: Height, + height_name: &'static str, + state: State, +) -> Result<(), Report> { + loop { + let tip_height = tip_height(state.clone()).await?; + if tip_height < height { + info!( + "scanner is waiting for {height_name}. Current tip: {}, {height_name}: {}", + tip_height.0, height.0 + ); + tokio::time::sleep(CHECK_INTERVAL).await; + continue; } - - Ok(parsed_keys) + break; } + + Ok(()) } + /// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the /// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it. /// @@ -197,12 +193,10 @@ impl ScanTask { pub async fn scan_height_and_store_results( height: Height, mut state: State, - chain_tip_change: ChainTipChange, + chain_tip_change: Option, storage: Storage, key_last_scanned_heights: Arc>, - parsed_keys: Arc< - HashMap, Vec)>, - >, + parsed_keys: HashMap, Vec)>, ) -> Result, Report> { let network = storage.network(); @@ -227,31 +221,28 @@ pub async fn scan_height_and_store_results( _ => unreachable!("unmatched response to a state::Block request"), }; - // Scan it with all the keys. - // - // TODO: scan each key in parallel (after MVP?) - for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate() - { - // Only scan what was not scanned for each key - if height <= *last_scanned_height { - continue; - } + for (key_index_in_task, (sapling_key, (dfvks, ivks))) in parsed_keys.into_iter().enumerate() { + match key_last_scanned_heights.get(&sapling_key) { + // Only scan what was not scanned for each key + Some(last_scanned_height) if height <= *last_scanned_height => continue, - // # Security - // - // We can't log `sapling_key` here because it is a private viewing key. Anyone who reads - // the logs could use the key to view those transactions. - if is_info_log { - info!( - "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}", - key_num, last_scanned_height.next().expect("height is not maximum").as_usize(), - height.as_usize(), - chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(), - ); - } + Some(last_scanned_height) if is_info_log => { + if let Some(chain_tip_change) = &chain_tip_change { + // # Security + // + // We can't log `sapling_key` here because it is a private viewing key. Anyone who reads + // the logs could use the key to view those transactions. + info!( + "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}", + key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(), + height.as_usize(), + chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(), + ); + } + } - // Get the pre-parsed keys for this configured key. - let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default(); + _other => {} + }; let sapling_key = sapling_key.clone(); let block = block.clone(); diff --git a/zebra-scan/src/service/scan_task/scan/scan_range.rs b/zebra-scan/src/service/scan_task/scan/scan_range.rs new file mode 100644 index 000000000..4e16d4ccd --- /dev/null +++ b/zebra-scan/src/service/scan_task/scan/scan_range.rs @@ -0,0 +1,127 @@ +//! Functions for registering new keys in the scan task + +use std::{collections::HashMap, sync::Arc}; + +use tokio::task::JoinHandle; +use tracing::Instrument; +use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey}; +use zebra_chain::{block::Height, BoxError}; +use zebra_state::SaplingScanningKey; + +use crate::{ + scan::{scan_height_and_store_results, wait_for_height, State, CHECK_INTERVAL}, + storage::Storage, +}; + +/// A builder for a scan until task +pub struct ScanRangeTaskBuilder { + /// The range of block heights that should be scanned for these keys + // TODO: Remove start heights from keys and require that all keys per task use the same start height + height_range: std::ops::Range, + + /// The keys to be used for scanning blocks in this task + keys: HashMap, Vec, Height)>, + + /// A handle to the state service for reading the blocks and the chain tip height + state: State, + + /// A handle to the zebra-scan database for storing results + storage: Storage, +} + +impl ScanRangeTaskBuilder { + /// Creates a new [`ScanRangeTaskBuilder`] + pub fn new( + stop_height: Height, + keys: HashMap< + SaplingScanningKey, + (Vec, Vec, Height), + >, + state: State, + storage: Storage, + ) -> Self { + Self { + height_range: Height::MIN..stop_height, + keys, + state, + storage, + } + } + + /// Spawns a `scan_range()` task and returns its [`JoinHandle`] + // TODO: return a tuple with a shutdown sender + pub fn spawn(self) -> JoinHandle> { + let Self { + height_range, + keys, + state, + storage, + } = self; + + tokio::spawn(scan_range(height_range.end, keys, state, storage).in_current_span()) + } +} + +/// Start a scan task that reads blocks from `state` within the provided height range, +/// scans them with the configured keys in `storage`, and then writes the results to `storage`. +// TODO: update the first parameter to `std::ops::Range` +pub async fn scan_range( + stop_before_height: Height, + keys: HashMap, Vec, Height)>, + state: State, + storage: Storage, +) -> Result<(), BoxError> { + let sapling_activation_height = storage.min_sapling_birthday_height(); + // Do not scan and notify if we are below sapling activation height. + wait_for_height( + sapling_activation_height, + "Sapling activation", + state.clone(), + ) + .await?; + + let key_heights: HashMap = keys + .iter() + .map(|(key, (_, _, height))| (key.clone(), *height)) + .collect(); + let key_heights = Arc::new(key_heights); + + let mut height = key_heights + .values() + .cloned() + .min() + .unwrap_or(sapling_activation_height); + + // Parse and convert keys once, then use them to scan all blocks. + let parsed_keys: HashMap< + SaplingScanningKey, + (Vec, Vec), + > = keys + .into_iter() + .map(|(key, (decoded_dfvks, decoded_ivks, _h))| (key, (decoded_dfvks, decoded_ivks))) + .collect(); + + while height < stop_before_height { + let scanned_height = scan_height_and_store_results( + height, + state.clone(), + None, + storage.clone(), + key_heights.clone(), + parsed_keys.clone(), + ) + .await?; + + // If we've reached the tip, sleep for a while then try and get the same block. + if scanned_height.is_none() { + tokio::time::sleep(CHECK_INTERVAL).await; + continue; + } + + height = height + .next() + .expect("a valid blockchain never reaches the max height"); + } + + Ok(()) +} diff --git a/zebra-scan/src/service/scan_task/tests.rs b/zebra-scan/src/service/scan_task/tests.rs new file mode 100644 index 000000000..81eb3f8bc --- /dev/null +++ b/zebra-scan/src/service/scan_task/tests.rs @@ -0,0 +1,26 @@ +//! Tests for the scan task. + +use std::sync::{ + mpsc::{self, Receiver}, + Arc, +}; + +use super::{ScanTask, ScanTaskCommand}; + +#[cfg(test)] +mod vectors; + +impl ScanTask { + /// Spawns a new [`ScanTask`] for tests. + pub fn mock() -> (Self, Receiver) { + let (cmd_sender, cmd_receiver) = mpsc::channel(); + + ( + Self { + handle: Arc::new(tokio::spawn(std::future::pending())), + cmd_sender, + }, + cmd_receiver, + ) + } +} diff --git a/zebra-scan/src/service/scan_task/tests/vectors.rs b/zebra-scan/src/service/scan_task/tests/vectors.rs new file mode 100644 index 000000000..2d9d46063 --- /dev/null +++ b/zebra-scan/src/service/scan_task/tests/vectors.rs @@ -0,0 +1,165 @@ +//! Fixed test vectors for the scan task. + +use std::collections::HashMap; + +use color_eyre::Report; + +use zebra_chain::block::Height; + +use crate::service::ScanTask; + +/// Test that [`ScanTask::process_messages`] adds and removes keys as expected for `RegisterKeys` and `DeleteKeys` command +#[tokio::test] +async fn scan_task_processes_messages_correctly() -> Result<(), Report> { + let (mut mock_scan_task, cmd_receiver) = ScanTask::mock(); + let mut parsed_keys = HashMap::new(); + + // Send some keys to be registered + let num_keys = 10; + mock_scan_task.register_keys( + (0..num_keys) + .map(|i| (i.to_string(), (vec![], vec![], Height::MIN))) + .collect(), + )?; + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + // Check that it updated parsed_keys correctly and returned the right new keys when starting with an empty state + + assert_eq!( + new_keys.len(), + num_keys, + "should add all received keys to new keys" + ); + + assert_eq!( + parsed_keys.len(), + num_keys, + "should add all received keys to parsed keys" + ); + + mock_scan_task.register_keys( + (0..num_keys) + .map(|i| (i.to_string(), (vec![], vec![], Height::MIN))) + .collect(), + )?; + + // Check that no key should be added if they are all already known and the heights are the same + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + assert_eq!( + parsed_keys.len(), + num_keys, + "should not add existing keys to parsed keys" + ); + + assert!( + new_keys.is_empty(), + "should not return known keys as new keys" + ); + + // Check that it returns the last seen start height for a key as the new key when receiving 2 register key messages + + mock_scan_task.register_keys( + (10..20) + .map(|i| (i.to_string(), (vec![], vec![], Height::MIN))) + .collect(), + )?; + + mock_scan_task.register_keys( + (10..15) + .map(|i| (i.to_string(), (vec![], vec![], Height::MAX))) + .collect(), + )?; + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + assert_eq!( + parsed_keys.len(), + 20, + "should not add existing keys to parsed keys" + ); + + assert_eq!( + new_keys.len(), + 10, + "should add 10 of received keys to new keys" + ); + + for (new_key, (_, _, start_height)) in new_keys { + if (10..15).contains(&new_key.parse::().expect("should parse into int")) { + assert_eq!( + start_height, + Height::MAX, + "these key heights should have been overwritten by the second message" + ); + } + } + + // Check that it removes keys correctly + + let done_rx = + mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::>())?; + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + // Check that it sends the done notification successfully before returning and dropping `done_tx` + done_rx.await?; + + assert!( + parsed_keys.is_empty(), + "all parsed keys should have been removed" + ); + + assert!(new_keys.is_empty(), "there should be no new keys"); + + // Check that it doesn't return removed keys as new keys when processing a batch of messages + + mock_scan_task.register_keys( + (0..200) + .map(|i| (i.to_string(), (vec![], vec![], Height::MAX))) + .collect(), + )?; + + mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::>())?; + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + assert!( + new_keys.is_empty(), + "all registered keys should be removed before process_messages returns" + ); + + // Check that it does return registered keys if they were removed in a prior message when processing a batch of messages + + mock_scan_task.register_keys( + (0..200) + .map(|i| (i.to_string(), (vec![], vec![], Height::MAX))) + .collect(), + )?; + + mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::>())?; + + mock_scan_task.register_keys( + (0..2) + .map(|i| (i.to_string(), (vec![], vec![], Height::MAX))) + .collect(), + )?; + + let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; + + assert_eq!( + new_keys.len(), + 2, + "should return 2 keys as new_keys after removals" + ); + + assert_eq!( + parsed_keys.len(), + 2, + "should add 2 keys to parsed_keys after removals" + ); + + Ok(()) +} diff --git a/zebra-scan/src/service/tests.rs b/zebra-scan/src/service/tests.rs index 6d29d00d2..bc3d91e69 100644 --- a/zebra-scan/src/service/tests.rs +++ b/zebra-scan/src/service/tests.rs @@ -9,8 +9,7 @@ use zebra_node_services::scan_service::{request::Request, response::Response}; use zebra_state::TransactionIndex; use crate::{ - init::ScanTaskCommand, - service::ScanService, + service::{scan_task::ScanTaskCommand, ScanService}, storage::db::tests::{fake_sapling_results, new_test_storage}, tests::ZECPAGES_SAPLING_VIEWING_KEY, }; diff --git a/zebra-scan/src/storage/db/sapling.rs b/zebra-scan/src/storage/db/sapling.rs index e40a2b132..8ad0fcce7 100644 --- a/zebra-scan/src/storage/db/sapling.rs +++ b/zebra-scan/src/storage/db/sapling.rs @@ -235,7 +235,7 @@ impl Storage { } /// Delete the sapling keys and their results, if they exist, - pub(crate) fn delete_sapling_keys(&mut self, keys: Vec) { + pub fn delete_sapling_keys(&mut self, keys: Vec) { self.sapling_tx_ids_cf() .new_batch_for_writing() .delete_sapling_keys(keys) diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index bbc8c6983..78af9b152 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -305,7 +305,7 @@ impl StartCmd { if !config.shielded_scan.sapling_keys_to_scan.is_empty() { // TODO: log the number of keys and update the scan_task_starts() test info!("spawning shielded scanner with configured viewing keys"); - let scan_task = zebra_scan::ScanTask::spawn( + let scan_task = zebra_scan::service::scan_task::ScanTask::spawn( &config.shielded_scan, config.network.network, state, diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 93ab9f3cd..91ea1c8ac 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -109,17 +109,25 @@ //! Example of how to run the get_block_template test: //! //! ```console -//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test get_block_template --features getblocktemplate-rpcs --release -- --ignored --nocapture +//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test get_block_template --features getblocktemplate-rpcs --release -- --ignored --nocapture //! ``` //! //! Example of how to run the submit_block test: //! //! ```console -//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test submit_block --features getblocktemplate-rpcs --release -- --ignored --nocapture +//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test submit_block --features getblocktemplate-rpcs --release -- --ignored --nocapture //! ``` //! //! Please refer to the documentation of each test for more information. //! +//! ## Shielded scanning tests +//! +//! Example of how to run the scans_for_new_key test: +//! +//! ```console +//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test scans_for_new_key --features shielded-scan --release -- --ignored --nocapture +//! ``` +//! //! ## Checkpoint Generation Tests //! //! Generate checkpoints on mainnet and testnet using a cached state: @@ -2842,8 +2850,8 @@ fn scan_task_starts() -> Result<()> { zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?; // Look for 2 scanner notices indicating we are below sapling activation. - zebrad.expect_stdout_line_matches("scanner is waiting for sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?; - zebrad.expect_stdout_line_matches("scanner is waiting for sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?; + zebrad.expect_stdout_line_matches("scanner is waiting for Sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?; + zebrad.expect_stdout_line_matches("scanner is waiting for Sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?; // Kill the node. zebrad.kill(false)?; @@ -2953,3 +2961,14 @@ fn scan_start_where_left() -> Result<()> { Ok(()) } + +/// Test successful registration of a new key in the scan task. +/// +/// See [`common::shielded_scan::register_key`] for more information. +// TODO: Add this test to CI (#8236) +#[tokio::test] +#[ignore] +#[cfg(feature = "shielded-scan")] +async fn scans_for_new_key() -> Result<()> { + common::shielded_scan::scans_for_new_key::run().await +} diff --git a/zebrad/tests/common/config.rs b/zebrad/tests/common/config.rs index 342e7d6ef..b75113e8c 100644 --- a/zebrad/tests/common/config.rs +++ b/zebrad/tests/common/config.rs @@ -80,10 +80,9 @@ pub fn default_test_config(net: Network) -> Result { mining.miner_address = Some(miner_address.parse().expect("hard-coded address is valid")); } - #[cfg(feature = "shielded_scan")] + #[cfg(feature = "shielded-scan")] { - let mut shielded_scan = zebra_scan::Config::default(); - shielded_scan.ephemeral = true; + let shielded_scan = zebra_scan::Config::ephemeral(); let config = ZebradConfig { network, @@ -97,10 +96,11 @@ pub fn default_test_config(net: Network) -> Result { ..ZebradConfig::default() }; - return Ok(config); + Ok(config) } - let config = ZebradConfig { + #[cfg(not(feature = "shielded-scan"))] + Ok(ZebradConfig { network, state, sync, @@ -109,9 +109,7 @@ pub fn default_test_config(net: Network) -> Result { tracing, mining, ..ZebradConfig::default() - }; - - Ok(config) + }) } pub fn persistent_test_config(network: Network) -> Result { diff --git a/zebrad/tests/common/mod.rs b/zebrad/tests/common/mod.rs index 77627d027..703831248 100644 --- a/zebrad/tests/common/mod.rs +++ b/zebrad/tests/common/mod.rs @@ -23,3 +23,6 @@ pub mod checkpoints; #[cfg(feature = "getblocktemplate-rpcs")] pub mod get_block_template_rpcs; + +#[cfg(feature = "shielded-scan")] +pub mod shielded_scan; diff --git a/zebrad/tests/common/shielded_scan.rs b/zebrad/tests/common/shielded_scan.rs new file mode 100644 index 000000000..d30d4bd5b --- /dev/null +++ b/zebrad/tests/common/shielded_scan.rs @@ -0,0 +1,3 @@ +//! Acceptance tests for `shielded-scan`` feature in zebrad. + +pub(crate) mod scans_for_new_key; diff --git a/zebrad/tests/common/shielded_scan/scans_for_new_key.rs b/zebrad/tests/common/shielded_scan/scans_for_new_key.rs new file mode 100644 index 000000000..35f0c320f --- /dev/null +++ b/zebrad/tests/common/shielded_scan/scans_for_new_key.rs @@ -0,0 +1,133 @@ +//! Test registering and scanning for a new key in the scan task while zebrad is running. +//! +//! This test requires a cached chain state that is partially synchronized past the +//! Sapling activation height and [`REQUIRED_MIN_TIP_HEIGHT`] +//! +//! export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state" +//! cargo test scans_for_new_key --features="shielded-scan" -- --ignored --nocapture + +use std::{collections::HashMap, time::Duration}; + +use color_eyre::{eyre::eyre, Result}; + +use tower::ServiceBuilder; +use zebra_chain::{ + block::Height, + chain_tip::ChainTip, + parameters::{Network, NetworkUpgrade}, +}; +use zebra_scan::{ + scan::sapling_key_to_scan_block_keys, service::ScanTask, storage::Storage, + tests::ZECPAGES_SAPLING_VIEWING_KEY, DiversifiableFullViewingKey, SaplingIvk, +}; +use zebra_state::SaplingScanningKey; + +use crate::common::{ + cached_state::start_state_service_with_cache_dir, launch::can_spawn_zebrad_for_test_type, + test_type::TestType, +}; + +/// The minimum required tip height for the cached state in this test. +const REQUIRED_MIN_TIP_HEIGHT: Height = Height(1_000_000); + +/// How long this test waits after registering keys to check if there are any results. +const WAIT_FOR_RESULTS_DURATION: Duration = Duration::from_secs(10 * 60); + +/// Initialize Zebra's state service with a cached state, add a new key to the scan task, and +/// check that it stores results for the new key without errors. +pub(crate) async fn run() -> Result<()> { + let _init_guard = zebra_test::init(); + + let test_type = TestType::UpdateZebraCachedStateNoRpc; + let test_name = "scans_for_new_key"; + let network = Network::Mainnet; + + // Skip the test unless the user specifically asked for it and there is a zebrad_state_path + if !can_spawn_zebrad_for_test_type(test_name, test_type, true) { + return Ok(()); + } + + tracing::info!( + ?network, + ?test_type, + "running scans_for_new_key test using zebra state service", + ); + + let zebrad_state_path = test_type + .zebrad_state_path(test_name) + .expect("already checked that there is a cached state path"); + + let shielded_scan_config = zebra_scan::Config::default(); + + let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) = + start_state_service_with_cache_dir(network, zebrad_state_path).await?; + + let chain_tip_height = latest_chain_tip + .best_tip_height() + .ok_or_else(|| eyre!("State directory doesn't have a chain tip block"))?; + + let sapling_activation_height = NetworkUpgrade::Sapling + .activation_height(network) + .expect("there should be an activation height for Mainnet"); + + assert!( + sapling_activation_height < REQUIRED_MIN_TIP_HEIGHT, + "minimum tip height should be above sapling activation height" + ); + + assert!( + REQUIRED_MIN_TIP_HEIGHT < chain_tip_height, + "chain tip height must be above required minimum tip height" + ); + + tracing::info!("opened state service with valid chain tip height, deleting any past keys in db and starting scan task",); + + { + // Before spawning `ScanTask`, delete past results for the zecpages key, if any. + let mut storage = Storage::new(&shielded_scan_config, network, false); + storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]); + } + + let state = ServiceBuilder::new().buffer(10).service(state_service); + + let mut scan_task = ScanTask::spawn(&shielded_scan_config, network, state, chain_tip_change); + + let (zecpages_dfvks, zecpages_ivks) = + sapling_key_to_scan_block_keys(&ZECPAGES_SAPLING_VIEWING_KEY.to_string(), network)?; + + let mut parsed_keys: HashMap< + SaplingScanningKey, + (Vec, Vec, Height), + > = HashMap::new(); + + parsed_keys.insert( + ZECPAGES_SAPLING_VIEWING_KEY.to_string(), + (zecpages_dfvks, zecpages_ivks, Height::MIN), + ); + + tracing::info!("started scan task, sending register keys message with zecpages key to start scanning for a new key",); + + scan_task.register_keys(parsed_keys)?; + + tracing::info!( + ?WAIT_FOR_RESULTS_DURATION, + "sent message, waiting for scan task to add some results", + ); + + // Wait for the scan task to add some results + tokio::time::sleep(WAIT_FOR_RESULTS_DURATION).await; + + // Check that there are some results in the database for the key + + let storage = Storage::new(&shielded_scan_config, network, true); + + let results = storage.sapling_results(&ZECPAGES_SAPLING_VIEWING_KEY.to_string()); + + // Check that some results were added for the zecpages key that was not in the config or the db when ScanTask started. + assert!( + !results.is_empty(), + "there should be results for the newly registered key" + ); + + Ok(()) +}