add(scan): Handle RegisterKeys messages in scan task (#8222)

* moves ScanTask to its own module in service module

* moves `process_messages()` method to scan_task.rs

* uses get_mut() and returns new keys

* updates types and adds scan_until() function

* adds and uses wait_for_height() function

* spawns scan_until_task_handler to poll scan_until_tasks FuturesUnordered

* corrects comment

* adds TODO

* updates expected test log, corrects panic msg, fixes fmt

* moves scan functions to scan_task module

* moves ScanTaskCommand and related method impls to its own module

* moves `ScanTask::mock()` test constructor to its own module

* creates `add_keys` mod and moves `scan_until()` function there

* moves scan task executor to its own module and adds ScanRangeTaskBuilder type

* renames add_keys to scan_range, moves executor to scan_task mod

* adds test for process_messages

* updates scan_height_and_store_results() to skip last_scanned_height check if there is no key height for a key in `parsed_keys`

* updates `default_test_config()`

* adds acceptance test for registering keys in ScanTask

* uses the right feature for the new acceptance test

* Applies suggestions from code review
This commit is contained in:
Arya 2024-02-06 13:41:00 -05:00 committed by GitHub
parent deed6f8a62
commit 2a004ffe9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 850 additions and 219 deletions

View File

@ -1,111 +1,13 @@
//! Initializing the scanner.
use std::sync::{mpsc, Arc};
//! Initializing the scanner and gRPC server.
use color_eyre::Report;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::ServiceBuilder;
use zebra_chain::{parameters::Network, transaction::Transaction};
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;
use crate::{scan, service::ScanService, Config};
#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es
/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,
/// Key hashes that are to be removed
keys: Vec<String>,
},
/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,
/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}
#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: Arc<JoinHandle<Result<(), Report>>>,
/// Task command channel sender
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}
impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();
(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let (cmd_sender, cmd_receiver) = mpsc::channel();
Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}
/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();
self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;
Ok(done_rx)
}
}
/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.

View File

@ -9,14 +9,18 @@ extern crate tracing;
pub mod config;
pub mod init;
pub mod scan;
pub mod storage;
use zebra_node_services::scan_service::{request::Request, response::Response};
pub mod service;
pub use service::scan_task::scan;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;
pub use config::Config;
pub use init::{init, ScanTask};
pub use init::init;
pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};

View File

@ -9,11 +9,18 @@ use zebra_chain::{parameters::Network, transaction::Hash};
use zebra_state::ChainTipChange;
use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
use crate::{scan, storage::Storage, Config, Request, Response};
#[cfg(test)]
mod tests;
pub mod scan_task;
pub use scan_task::{ScanTask, ScanTaskCommand};
#[cfg(any(test, feature = "proptest-impl"))]
use std::sync::mpsc::Receiver;
/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
@ -42,13 +49,9 @@ impl ScanService {
}
/// Create a new [`ScanService`] with a mock `ScanTask`
// TODO: Move this to tests behind `cfg(any(test, feature = "proptest-impl"))`
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_mock_scanner(
db: Storage,
) -> (
Self,
std::sync::mpsc::Receiver<crate::init::ScanTaskCommand>,
) {
pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver<ScanTaskCommand>) {
let (scan_task, cmd_receiver) = ScanTask::mock();
(Self { db, scan_task }, cmd_receiver)
}

View File

@ -0,0 +1,54 @@
//! Types and method implementations for [`ScanTask`]
use std::sync::{mpsc, Arc};
use color_eyre::Report;
use tokio::task::JoinHandle;
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;
use crate::Config;
mod commands;
mod executor;
pub mod scan;
pub use commands::ScanTaskCommand;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;
#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: Arc<JoinHandle<Result<(), Report>>>,
/// Task command channel sender
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}
impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();
Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}
}

View File

@ -0,0 +1,153 @@
//! Types and method implementations for [`ScanTaskCommand`]
use std::{
collections::HashMap,
sync::{
mpsc::{self, Receiver, TryRecvError},
Arc,
},
};
use color_eyre::{eyre::eyre, Report};
use tokio::sync::oneshot;
use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
use zebra_chain::{block::Height, transaction::Transaction};
use zebra_state::SaplingScanningKey;
use super::ScanTask;
#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys {
/// New keys to start scanning for
keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height),
>,
},
/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,
/// Key hashes that are to be removed
keys: Vec<String>,
},
/// Start sending results for key hashes to `result_sender`
// TODO: Implement this command (#8206)
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,
/// Key hashes to send the results of to result channel
keys: Vec<String>,
},
}
impl ScanTask {
/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns newly registered keys for scanning.
pub fn process_messages(
cmd_receiver: &Receiver<ScanTaskCommand>,
parsed_keys: &mut HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
>,
) -> Result<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height)>,
Report,
> {
let mut new_keys = HashMap::new();
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};
match cmd {
ScanTaskCommand::RegisterKeys { keys } => {
let keys: Vec<_> = keys
.into_iter()
.filter(|(key, _)| {
!parsed_keys.contains_key(key) || new_keys.contains_key(key)
})
.collect();
if !keys.is_empty() {
new_keys.extend(keys.clone());
let keys =
keys.into_iter()
.map(|(key, (decoded_dfvks, decoded_ivks, _h))| {
(key, (decoded_dfvks, decoded_ivks))
});
parsed_keys.extend(keys);
}
}
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
for key in keys {
parsed_keys.remove(&key);
new_keys.remove(&key);
}
// Ignore send errors for the done notification, caller is expected to use a timeout.
let _ = done_tx.send(());
}
_ => continue,
}
}
Ok(new_keys)
}
/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();
self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;
Ok(done_rx)
}
/// Sends a message to the scan task to start scanning for the provided viewing keys.
pub fn register_keys(
&mut self,
keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height),
>,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.send(ScanTaskCommand::RegisterKeys { keys })
}
}

View File

@ -0,0 +1,51 @@
//! The scan task executor
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use tracing::Instrument;
use zebra_chain::BoxError;
use super::scan::ScanRangeTaskBuilder;
const EXECUTOR_BUFFER_SIZE: usize = 100;
pub fn spawn_init() -> (
Sender<ScanRangeTaskBuilder>,
JoinHandle<Result<(), BoxError>>,
) {
// TODO: Use a bounded channel.
let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE);
(
scan_task_sender,
tokio::spawn(scan_task_executor(scan_task_receiver).in_current_span()),
)
}
pub async fn scan_task_executor(
mut scan_task_receiver: Receiver<ScanRangeTaskBuilder>,
) -> Result<(), BoxError> {
let mut scan_range_tasks = FuturesUnordered::new();
// Push a pending future so that `.next()` will always return `Some`
scan_range_tasks.push(tokio::spawn(
std::future::pending::<Result<(), BoxError>>().boxed(),
));
loop {
tokio::select! {
Some(scan_range_task) = scan_task_receiver.recv() => {
// TODO: Add a long timeout?
scan_range_tasks.push(scan_range_task.spawn());
}
Some(finished_task) = scan_range_tasks.next() => {
// Return early if there's an error
finished_task.expect("futures unordered with pending future should always return Some")?;
}
}
}
}

View File

@ -2,10 +2,7 @@
use std::{
collections::{BTreeMap, HashMap},
sync::{
mpsc::{Receiver, TryRecvError},
Arc,
},
sync::{mpsc::Receiver, Arc},
time::Duration,
};
@ -40,11 +37,17 @@ use zebra_chain::{
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use crate::{
init::ScanTaskCommand,
service::{ScanTask, ScanTaskCommand},
storage::{SaplingScanningKey, Storage},
Config, ScanTask,
Config,
};
use super::executor;
mod scan_range;
pub use scan_range::ScanRangeTaskBuilder;
/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
@ -59,7 +62,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15);
/// The amount of time between checking for new blocks and starting new scans.
///
/// This is just under half the target block interval.
const CHECK_INTERVAL: Duration = Duration::from_secs(30);
pub const CHECK_INTERVAL: Duration = Duration::from_secs(30);
/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 10_000;
@ -76,15 +79,12 @@ pub async fn start(
let sapling_activation_height = storage.min_sapling_birthday_height();
// Do not scan and notify if we are below sapling activation height.
loop {
let tip_height = tip_height(state.clone()).await?;
if tip_height < sapling_activation_height {
info!("scanner is waiting for sapling activation. Current tip: {}, Sapling activation: {}", tip_height.0, sapling_activation_height.0);
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
break;
}
wait_for_height(
sapling_activation_height,
"Sapling activation",
state.clone(),
)
.await?;
// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
@ -97,7 +97,7 @@ pub async fn start(
// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
let mut parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_heights
@ -107,18 +107,42 @@ pub async fn start(
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;
let mut parsed_keys = Arc::new(parsed_keys);
let (scan_task_sender, scan_task_executor_handle) = executor::spawn_init();
let mut scan_task_executor_handle = Some(scan_task_executor_handle);
// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;
loop {
parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?;
if let Some(handle) = scan_task_executor_handle {
if handle.is_finished() {
warn!("scan task finished unexpectedly");
handle.await?.map_err(|err| eyre!(err))?;
return Ok(());
} else {
scan_task_executor_handle = Some(handle);
}
}
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
// TODO: Check if the `start_height` is at or above the current height
if !new_keys.is_empty() {
let state = state.clone();
let storage = storage.clone();
scan_task_sender
.send(ScanRangeTaskBuilder::new(height, new_keys, state, storage))
.await
.expect("scan_until_task channel should not be closed");
}
let scanned_height = scan_height_and_store_results(
height,
state.clone(),
chain_tip_change.clone(),
Some(chain_tip_change.clone()),
storage.clone(),
key_heights.clone(),
parsed_keys.clone(),
@ -137,56 +161,28 @@ pub async fn start(
}
}
impl ScanTask {
/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns the updated `parsed_keys`
fn process_msgs(
cmd_receiver: &Receiver<ScanTaskCommand>,
mut parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<
Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
Report,
> {
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};
match cmd {
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
// TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
// https://github.com/rust-lang/rust/issues/93610
let mut updated_parsed_keys =
Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());
for key in keys {
updated_parsed_keys.remove(&key);
}
parsed_keys = Arc::new(updated_parsed_keys);
// Ignore send errors for the done notification
let _ = done_tx.send(());
}
_ => continue,
}
/// Polls state service for tip height every [`CHECK_INTERVAL`] until the tip reaches the provided `tip_height`
pub async fn wait_for_height(
height: Height,
height_name: &'static str,
state: State,
) -> Result<(), Report> {
loop {
let tip_height = tip_height(state.clone()).await?;
if tip_height < height {
info!(
"scanner is waiting for {height_name}. Current tip: {}, {height_name}: {}",
tip_height.0, height.0
);
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
Ok(parsed_keys)
break;
}
Ok(())
}
/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
@ -197,12 +193,10 @@ impl ScanTask {
pub async fn scan_height_and_store_results(
height: Height,
mut state: State,
chain_tip_change: ChainTipChange,
chain_tip_change: Option<ChainTipChange>,
storage: Storage,
key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
parsed_keys: HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
) -> Result<Option<Height>, Report> {
let network = storage.network();
@ -227,31 +221,28 @@ pub async fn scan_height_and_store_results(
_ => unreachable!("unmatched response to a state::Block request"),
};
// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate()
{
// Only scan what was not scanned for each key
if height <= *last_scanned_height {
continue;
}
for (key_index_in_task, (sapling_key, (dfvks, ivks))) in parsed_keys.into_iter().enumerate() {
match key_last_scanned_heights.get(&sapling_key) {
// Only scan what was not scanned for each key
Some(last_scanned_height) if height <= *last_scanned_height => continue,
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
key_num, last_scanned_height.next().expect("height is not maximum").as_usize(),
height.as_usize(),
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
);
}
Some(last_scanned_height) if is_info_log => {
if let Some(chain_tip_change) = &chain_tip_change {
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
info!(
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(),
height.as_usize(),
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
);
}
}
// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();
_other => {}
};
let sapling_key = sapling_key.clone();
let block = block.clone();

View File

@ -0,0 +1,127 @@
//! Functions for registering new keys in the scan task
use std::{collections::HashMap, sync::Arc};
use tokio::task::JoinHandle;
use tracing::Instrument;
use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
use zebra_chain::{block::Height, BoxError};
use zebra_state::SaplingScanningKey;
use crate::{
scan::{scan_height_and_store_results, wait_for_height, State, CHECK_INTERVAL},
storage::Storage,
};
/// A builder for a scan until task
pub struct ScanRangeTaskBuilder {
/// The range of block heights that should be scanned for these keys
// TODO: Remove start heights from keys and require that all keys per task use the same start height
height_range: std::ops::Range<Height>,
/// The keys to be used for scanning blocks in this task
keys: HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height)>,
/// A handle to the state service for reading the blocks and the chain tip height
state: State,
/// A handle to the zebra-scan database for storing results
storage: Storage,
}
impl ScanRangeTaskBuilder {
/// Creates a new [`ScanRangeTaskBuilder`]
pub fn new(
stop_height: Height,
keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height),
>,
state: State,
storage: Storage,
) -> Self {
Self {
height_range: Height::MIN..stop_height,
keys,
state,
storage,
}
}
/// Spawns a `scan_range()` task and returns its [`JoinHandle`]
// TODO: return a tuple with a shutdown sender
pub fn spawn(self) -> JoinHandle<Result<(), BoxError>> {
let Self {
height_range,
keys,
state,
storage,
} = self;
tokio::spawn(scan_range(height_range.end, keys, state, storage).in_current_span())
}
}
/// Start a scan task that reads blocks from `state` within the provided height range,
/// scans them with the configured keys in `storage`, and then writes the results to `storage`.
// TODO: update the first parameter to `std::ops::Range<Height>`
pub async fn scan_range(
stop_before_height: Height,
keys: HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height)>,
state: State,
storage: Storage,
) -> Result<(), BoxError> {
let sapling_activation_height = storage.min_sapling_birthday_height();
// Do not scan and notify if we are below sapling activation height.
wait_for_height(
sapling_activation_height,
"Sapling activation",
state.clone(),
)
.await?;
let key_heights: HashMap<String, Height> = keys
.iter()
.map(|(key, (_, _, height))| (key.clone(), *height))
.collect();
let key_heights = Arc::new(key_heights);
let mut height = key_heights
.values()
.cloned()
.min()
.unwrap_or(sapling_activation_height);
// Parse and convert keys once, then use them to scan all blocks.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = keys
.into_iter()
.map(|(key, (decoded_dfvks, decoded_ivks, _h))| (key, (decoded_dfvks, decoded_ivks)))
.collect();
while height < stop_before_height {
let scanned_height = scan_height_and_store_results(
height,
state.clone(),
None,
storage.clone(),
key_heights.clone(),
parsed_keys.clone(),
)
.await?;
// If we've reached the tip, sleep for a while then try and get the same block.
if scanned_height.is_none() {
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
height = height
.next()
.expect("a valid blockchain never reaches the max height");
}
Ok(())
}

View File

@ -0,0 +1,26 @@
//! Tests for the scan task.
use std::sync::{
mpsc::{self, Receiver},
Arc,
};
use super::{ScanTask, ScanTaskCommand};
#[cfg(test)]
mod vectors;
impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
pub fn mock() -> (Self, Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();
(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}
}

View File

@ -0,0 +1,165 @@
//! Fixed test vectors for the scan task.
use std::collections::HashMap;
use color_eyre::Report;
use zebra_chain::block::Height;
use crate::service::ScanTask;
/// Test that [`ScanTask::process_messages`] adds and removes keys as expected for `RegisterKeys` and `DeleteKeys` command
#[tokio::test]
async fn scan_task_processes_messages_correctly() -> Result<(), Report> {
let (mut mock_scan_task, cmd_receiver) = ScanTask::mock();
let mut parsed_keys = HashMap::new();
// Send some keys to be registered
let num_keys = 10;
mock_scan_task.register_keys(
(0..num_keys)
.map(|i| (i.to_string(), (vec![], vec![], Height::MIN)))
.collect(),
)?;
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
// Check that it updated parsed_keys correctly and returned the right new keys when starting with an empty state
assert_eq!(
new_keys.len(),
num_keys,
"should add all received keys to new keys"
);
assert_eq!(
parsed_keys.len(),
num_keys,
"should add all received keys to parsed keys"
);
mock_scan_task.register_keys(
(0..num_keys)
.map(|i| (i.to_string(), (vec![], vec![], Height::MIN)))
.collect(),
)?;
// Check that no key should be added if they are all already known and the heights are the same
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
assert_eq!(
parsed_keys.len(),
num_keys,
"should not add existing keys to parsed keys"
);
assert!(
new_keys.is_empty(),
"should not return known keys as new keys"
);
// Check that it returns the last seen start height for a key as the new key when receiving 2 register key messages
mock_scan_task.register_keys(
(10..20)
.map(|i| (i.to_string(), (vec![], vec![], Height::MIN)))
.collect(),
)?;
mock_scan_task.register_keys(
(10..15)
.map(|i| (i.to_string(), (vec![], vec![], Height::MAX)))
.collect(),
)?;
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
assert_eq!(
parsed_keys.len(),
20,
"should not add existing keys to parsed keys"
);
assert_eq!(
new_keys.len(),
10,
"should add 10 of received keys to new keys"
);
for (new_key, (_, _, start_height)) in new_keys {
if (10..15).contains(&new_key.parse::<i32>().expect("should parse into int")) {
assert_eq!(
start_height,
Height::MAX,
"these key heights should have been overwritten by the second message"
);
}
}
// Check that it removes keys correctly
let done_rx =
mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::<Vec<_>>())?;
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
// Check that it sends the done notification successfully before returning and dropping `done_tx`
done_rx.await?;
assert!(
parsed_keys.is_empty(),
"all parsed keys should have been removed"
);
assert!(new_keys.is_empty(), "there should be no new keys");
// Check that it doesn't return removed keys as new keys when processing a batch of messages
mock_scan_task.register_keys(
(0..200)
.map(|i| (i.to_string(), (vec![], vec![], Height::MAX)))
.collect(),
)?;
mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::<Vec<_>>())?;
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
assert!(
new_keys.is_empty(),
"all registered keys should be removed before process_messages returns"
);
// Check that it does return registered keys if they were removed in a prior message when processing a batch of messages
mock_scan_task.register_keys(
(0..200)
.map(|i| (i.to_string(), (vec![], vec![], Height::MAX)))
.collect(),
)?;
mock_scan_task.remove_keys(&(0..200).map(|i| i.to_string()).collect::<Vec<_>>())?;
mock_scan_task.register_keys(
(0..2)
.map(|i| (i.to_string(), (vec![], vec![], Height::MAX)))
.collect(),
)?;
let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;
assert_eq!(
new_keys.len(),
2,
"should return 2 keys as new_keys after removals"
);
assert_eq!(
parsed_keys.len(),
2,
"should add 2 keys to parsed_keys after removals"
);
Ok(())
}

View File

@ -9,8 +9,7 @@ use zebra_node_services::scan_service::{request::Request, response::Response};
use zebra_state::TransactionIndex;
use crate::{
init::ScanTaskCommand,
service::ScanService,
service::{scan_task::ScanTaskCommand, ScanService},
storage::db::tests::{fake_sapling_results, new_test_storage},
tests::ZECPAGES_SAPLING_VIEWING_KEY,
};

View File

@ -235,7 +235,7 @@ impl Storage {
}
/// Delete the sapling keys and their results, if they exist,
pub(crate) fn delete_sapling_keys(&mut self, keys: Vec<SaplingScanningKey>) {
pub fn delete_sapling_keys(&mut self, keys: Vec<SaplingScanningKey>) {
self.sapling_tx_ids_cf()
.new_batch_for_writing()
.delete_sapling_keys(keys)

View File

@ -305,7 +305,7 @@ impl StartCmd {
if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
let scan_task = zebra_scan::ScanTask::spawn(
let scan_task = zebra_scan::service::scan_task::ScanTask::spawn(
&config.shielded_scan,
config.network.network,
state,

View File

@ -109,17 +109,25 @@
//! Example of how to run the get_block_template test:
//!
//! ```console
//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test get_block_template --features getblocktemplate-rpcs --release -- --ignored --nocapture
//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test get_block_template --features getblocktemplate-rpcs --release -- --ignored --nocapture
//! ```
//!
//! Example of how to run the submit_block test:
//!
//! ```console
//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test submit_block --features getblocktemplate-rpcs --release -- --ignored --nocapture
//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test submit_block --features getblocktemplate-rpcs --release -- --ignored --nocapture
//! ```
//!
//! Please refer to the documentation of each test for more information.
//!
//! ## Shielded scanning tests
//!
//! Example of how to run the scans_for_new_key test:
//!
//! ```console
//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test scans_for_new_key --features shielded-scan --release -- --ignored --nocapture
//! ```
//!
//! ## Checkpoint Generation Tests
//!
//! Generate checkpoints on mainnet and testnet using a cached state:
@ -2842,8 +2850,8 @@ fn scan_task_starts() -> Result<()> {
zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?;
// Look for 2 scanner notices indicating we are below sapling activation.
zebrad.expect_stdout_line_matches("scanner is waiting for sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?;
zebrad.expect_stdout_line_matches("scanner is waiting for sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?;
zebrad.expect_stdout_line_matches("scanner is waiting for Sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?;
zebrad.expect_stdout_line_matches("scanner is waiting for Sapling activation. Current tip: [0-9]{1,4}, Sapling activation: 419200")?;
// Kill the node.
zebrad.kill(false)?;
@ -2953,3 +2961,14 @@ fn scan_start_where_left() -> Result<()> {
Ok(())
}
/// Test successful registration of a new key in the scan task.
///
/// See [`common::shielded_scan::register_key`] for more information.
// TODO: Add this test to CI (#8236)
#[tokio::test]
#[ignore]
#[cfg(feature = "shielded-scan")]
async fn scans_for_new_key() -> Result<()> {
common::shielded_scan::scans_for_new_key::run().await
}

View File

@ -80,10 +80,9 @@ pub fn default_test_config(net: Network) -> Result<ZebradConfig> {
mining.miner_address = Some(miner_address.parse().expect("hard-coded address is valid"));
}
#[cfg(feature = "shielded_scan")]
#[cfg(feature = "shielded-scan")]
{
let mut shielded_scan = zebra_scan::Config::default();
shielded_scan.ephemeral = true;
let shielded_scan = zebra_scan::Config::ephemeral();
let config = ZebradConfig {
network,
@ -97,10 +96,11 @@ pub fn default_test_config(net: Network) -> Result<ZebradConfig> {
..ZebradConfig::default()
};
return Ok(config);
Ok(config)
}
let config = ZebradConfig {
#[cfg(not(feature = "shielded-scan"))]
Ok(ZebradConfig {
network,
state,
sync,
@ -109,9 +109,7 @@ pub fn default_test_config(net: Network) -> Result<ZebradConfig> {
tracing,
mining,
..ZebradConfig::default()
};
Ok(config)
})
}
pub fn persistent_test_config(network: Network) -> Result<ZebradConfig> {

View File

@ -23,3 +23,6 @@ pub mod checkpoints;
#[cfg(feature = "getblocktemplate-rpcs")]
pub mod get_block_template_rpcs;
#[cfg(feature = "shielded-scan")]
pub mod shielded_scan;

View File

@ -0,0 +1,3 @@
//! Acceptance tests for `shielded-scan`` feature in zebrad.
pub(crate) mod scans_for_new_key;

View File

@ -0,0 +1,133 @@
//! Test registering and scanning for a new key in the scan task while zebrad is running.
//!
//! This test requires a cached chain state that is partially synchronized past the
//! Sapling activation height and [`REQUIRED_MIN_TIP_HEIGHT`]
//!
//! export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state"
//! cargo test scans_for_new_key --features="shielded-scan" -- --ignored --nocapture
use std::{collections::HashMap, time::Duration};
use color_eyre::{eyre::eyre, Result};
use tower::ServiceBuilder;
use zebra_chain::{
block::Height,
chain_tip::ChainTip,
parameters::{Network, NetworkUpgrade},
};
use zebra_scan::{
scan::sapling_key_to_scan_block_keys, service::ScanTask, storage::Storage,
tests::ZECPAGES_SAPLING_VIEWING_KEY, DiversifiableFullViewingKey, SaplingIvk,
};
use zebra_state::SaplingScanningKey;
use crate::common::{
cached_state::start_state_service_with_cache_dir, launch::can_spawn_zebrad_for_test_type,
test_type::TestType,
};
/// The minimum required tip height for the cached state in this test.
const REQUIRED_MIN_TIP_HEIGHT: Height = Height(1_000_000);
/// How long this test waits after registering keys to check if there are any results.
const WAIT_FOR_RESULTS_DURATION: Duration = Duration::from_secs(10 * 60);
/// Initialize Zebra's state service with a cached state, add a new key to the scan task, and
/// check that it stores results for the new key without errors.
pub(crate) async fn run() -> Result<()> {
let _init_guard = zebra_test::init();
let test_type = TestType::UpdateZebraCachedStateNoRpc;
let test_name = "scans_for_new_key";
let network = Network::Mainnet;
// Skip the test unless the user specifically asked for it and there is a zebrad_state_path
if !can_spawn_zebrad_for_test_type(test_name, test_type, true) {
return Ok(());
}
tracing::info!(
?network,
?test_type,
"running scans_for_new_key test using zebra state service",
);
let zebrad_state_path = test_type
.zebrad_state_path(test_name)
.expect("already checked that there is a cached state path");
let shielded_scan_config = zebra_scan::Config::default();
let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) =
start_state_service_with_cache_dir(network, zebrad_state_path).await?;
let chain_tip_height = latest_chain_tip
.best_tip_height()
.ok_or_else(|| eyre!("State directory doesn't have a chain tip block"))?;
let sapling_activation_height = NetworkUpgrade::Sapling
.activation_height(network)
.expect("there should be an activation height for Mainnet");
assert!(
sapling_activation_height < REQUIRED_MIN_TIP_HEIGHT,
"minimum tip height should be above sapling activation height"
);
assert!(
REQUIRED_MIN_TIP_HEIGHT < chain_tip_height,
"chain tip height must be above required minimum tip height"
);
tracing::info!("opened state service with valid chain tip height, deleting any past keys in db and starting scan task",);
{
// Before spawning `ScanTask`, delete past results for the zecpages key, if any.
let mut storage = Storage::new(&shielded_scan_config, network, false);
storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]);
}
let state = ServiceBuilder::new().buffer(10).service(state_service);
let mut scan_task = ScanTask::spawn(&shielded_scan_config, network, state, chain_tip_change);
let (zecpages_dfvks, zecpages_ivks) =
sapling_key_to_scan_block_keys(&ZECPAGES_SAPLING_VIEWING_KEY.to_string(), network)?;
let mut parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height),
> = HashMap::new();
parsed_keys.insert(
ZECPAGES_SAPLING_VIEWING_KEY.to_string(),
(zecpages_dfvks, zecpages_ivks, Height::MIN),
);
tracing::info!("started scan task, sending register keys message with zecpages key to start scanning for a new key",);
scan_task.register_keys(parsed_keys)?;
tracing::info!(
?WAIT_FOR_RESULTS_DURATION,
"sent message, waiting for scan task to add some results",
);
// Wait for the scan task to add some results
tokio::time::sleep(WAIT_FOR_RESULTS_DURATION).await;
// Check that there are some results in the database for the key
let storage = Storage::new(&shielded_scan_config, network, true);
let results = storage.sapling_results(&ZECPAGES_SAPLING_VIEWING_KEY.to_string());
// Check that some results were added for the zecpages key that was not in the config or the db when ScanTask started.
assert!(
!results.is_empty(),
"there should be results for the newly registered key"
);
Ok(())
}