diff --git a/zebra-node-services/src/scan_service/request.rs b/zebra-node-services/src/scan_service/request.rs
index 5f85e5ece..c36202cdc 100644
--- a/zebra-node-services/src/scan_service/request.rs
+++ b/zebra-node-services/src/scan_service/request.rs
@@ -12,8 +12,8 @@ pub enum Request {
     /// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
     RegisterKeys(Vec<()>),
 
-    /// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
-    DeleteKeys(Vec<()>),
+    /// Deletes viewing keys and their results from the database.
+    DeleteKeys(Vec<String>),
 
     /// TODO: Accept `KeyHash`es and return `Transaction`s
     Results(Vec<()>),
diff --git a/zebra-node-services/src/scan_service/response.rs b/zebra-node-services/src/scan_service/response.rs
index 084f6d9dc..7ba8b10e0 100644
--- a/zebra-node-services/src/scan_service/response.rs
+++ b/zebra-node-services/src/scan_service/response.rs
@@ -16,6 +16,9 @@ pub enum Response {
     /// Response to Results request
     Results(Vec<Transaction>),
 
+    /// Response to DeleteKeys request
+    DeletedKeys,
+
     /// Response to SubscribeResults request
     SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
 }
diff --git a/zebra-scan/Cargo.toml b/zebra-scan/Cargo.toml
index 95b6264d4..cbbf09c80 100644
--- a/zebra-scan/Cargo.toml
+++ b/zebra-scan/Cargo.toml
@@ -17,6 +17,7 @@ categories = ["cryptography::cryptocurrencies"]
 [[bin]] # Bin to run the Scanner gRPC server
 name = "scanner-grpc-server"
 path = "src/bin/rpc_server.rs"
+required-features = ["proptest-impl"]
 
 [features]
diff --git a/zebra-scan/src/bin/rpc_server.rs b/zebra-scan/src/bin/rpc_server.rs
index a9aa9753a..9ebe2b002 100644
--- a/zebra-scan/src/bin/rpc_server.rs
+++ b/zebra-scan/src/bin/rpc_server.rs
@@ -2,15 +2,16 @@
 
 use tower::ServiceBuilder;
 
-use zebra_scan::service::ScanService;
+use zebra_scan::{service::ScanService, storage::Storage};
 
 #[tokio::main]
 /// Runs an RPC server with a mock ScanTask
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let (config, network) = Default::default();
-    let scan_service = ServiceBuilder::new()
-        .buffer(10)
-        .service(ScanService::new_with_mock_scanner(&config, network));
+
+    let (scan_service, _cmd_receiver) =
+        ScanService::new_with_mock_scanner(Storage::new(&config, network, false));
+    let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);
 
     // Start the gRPC server.
     zebra_grpc::server::init(scan_service).await?;
diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs
index f40eebac3..1ed32b528 100644
--- a/zebra-scan/src/init.rs
+++ b/zebra-scan/src/init.rs
@@ -23,7 +23,7 @@ pub enum ScanTaskCommand {
         done_tx: oneshot::Sender<()>,
 
         /// Key hashes that are to be removed
-        key_hashes: Vec<()>,
+        keys: Vec<String>,
     },
 
     /// Start sending results for key hashes to `result_sender`
@@ -36,25 +36,29 @@ pub enum ScanTaskCommand {
     },
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 /// Scan task handle and command channel sender
 pub struct ScanTask {
     /// [`JoinHandle`] of scan task
-    pub handle: JoinHandle<Result<(), Report>>,
+    pub handle: Arc<JoinHandle<Result<(), Report>>>,
 
     /// Task command channel sender
-    cmd_sender: mpsc::Sender<ScanTaskCommand>,
+    pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
 }
 
 impl ScanTask {
     /// Spawns a new [`ScanTask`] for tests.
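+    ///
+    /// Returns the task and the receiving half of its command channel, so tests can
+    /// assert on the [`ScanTaskCommand`]s that services send to the scan task.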
-    pub fn mock() -> Self {
-        let (cmd_sender, _cmd_receiver) = mpsc::channel();
+    #[cfg(any(test, feature = "proptest-impl"))]
+    pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
+        let (cmd_sender, cmd_receiver) = mpsc::channel();
 
-        Self {
-            handle: tokio::spawn(std::future::pending()),
-            cmd_sender,
-        }
+        (
+            Self {
+                handle: Arc::new(tokio::spawn(std::future::pending())),
+                cmd_sender,
+            },
+            cmd_receiver,
+        )
     }
 
     /// Spawns a new [`ScanTask`].
@@ -64,11 +68,16 @@ impl ScanTask {
         state: scan::State,
         chain_tip_change: ChainTipChange,
     ) -> Self {
-        // TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
-        let (cmd_sender, _cmd_receiver) = mpsc::channel();
+        let (cmd_sender, cmd_receiver) = mpsc::channel();
 
         Self {
-            handle: scan::spawn_init(config, network, state, chain_tip_change),
+            handle: Arc::new(scan::spawn_init(
+                config,
+                network,
+                state,
+                chain_tip_change,
+                cmd_receiver,
+            )),
             cmd_sender,
         }
     }
@@ -80,18 +89,21 @@ impl ScanTask {
     ) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
         self.cmd_sender.send(command)
     }
-}
 
-/// Initialize the scanner based on its config, and spawn a task for it.
-///
-/// TODO: add a test for this function.
-pub fn spawn_init(
-    config: &Config,
-    network: Network,
-    state: scan::State,
-    chain_tip_change: ChainTipChange,
-) -> JoinHandle<Result<(), Report>> {
-    scan::spawn_init(config, network, state, chain_tip_change)
+    /// Sends a message to the scan task to remove the provided viewing keys.
+    pub fn remove_keys(
+        &mut self,
+        keys: &[String],
+    ) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
+        let (done_tx, done_rx) = oneshot::channel();
+
+        self.send(ScanTaskCommand::RemoveKeys {
+            keys: keys.to_vec(),
+            done_tx,
+        })?;
+
+        Ok(done_rx)
+    }
 }
 
 /// Initialize [`ScanService`] based on its config.
diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs
index 9d26881d9..c40523093 100644
--- a/zebra-scan/src/lib.rs
+++ b/zebra-scan/src/lib.rs
@@ -19,4 +19,4 @@ pub mod service;
 pub mod tests;
 
 pub use config::Config;
-pub use init::{init, spawn_init};
+pub use init::{init, ScanTask};
diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs
index e326cc66a..522f895dd 100644
--- a/zebra-scan/src/scan.rs
+++ b/zebra-scan/src/scan.rs
@@ -2,7 +2,10 @@
 
 use std::{
     collections::{BTreeMap, HashMap},
-    sync::Arc,
+    sync::{
+        mpsc::{Receiver, TryRecvError},
+        Arc,
+    },
     time::Duration,
 };
 
@@ -37,8 +40,9 @@ use zebra_chain::{
 use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
 
 use crate::{
+    init::ScanTaskCommand,
     storage::{SaplingScanningKey, Storage},
-    Config,
+    Config, ScanTask,
 };
 
 /// The generic state type used by the scanner.
@@ -66,6 +70,7 @@ pub async fn start(
     state: State,
     chain_tip_change: ChainTipChange,
     storage: Storage,
+    cmd_receiver: Receiver<ScanTaskCommand>,
 ) -> Result<(), Report> {
     let network = storage.network();
     let sapling_activation_height = storage.min_sapling_birthday_height();
@@ -102,12 +107,14 @@ pub async fn start(
             Ok::<_, Report>((key.clone(), parsed_keys))
         })
         .try_collect()?;
-    let parsed_keys = Arc::new(parsed_keys);
+    let mut parsed_keys = Arc::new(parsed_keys);
 
     // Give empty states time to verify some blocks before we start scanning.
     tokio::time::sleep(INITIAL_WAIT).await;
 
     loop {
+        parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?;
+
         let scanned_height = scan_height_and_store_results(
             height,
             state.clone(),
@@ -130,6 +137,56 @@ pub async fn start(
     }
 }
 
+impl ScanTask {
+    /// Accepts the scan task's `parsed_keys` collection and a reference to the command channel receiver.
+    ///
+    /// Processes messages in the scan task channel, updating `parsed_keys` if required.
+    ///
+    /// Returns the updated `parsed_keys`.
+    fn process_msgs(
+        cmd_receiver: &Receiver<ScanTaskCommand>,
+        mut parsed_keys: Arc<
+            HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
+        >,
+    ) -> Result<
+        Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
+        Report,
+    > {
+        loop {
+            let cmd = match cmd_receiver.try_recv() {
+                Ok(cmd) => cmd,
+
+                Err(TryRecvError::Empty) => break,
+                Err(TryRecvError::Disconnected) => {
+                    // Return early if the sender has been dropped.
+                    return Err(eyre!("command channel disconnected"));
+                }
+            };
+
+            match cmd {
+                ScanTaskCommand::RemoveKeys { done_tx, keys } => {
+                    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
+                    // https://github.com/rust-lang/rust/issues/93610
+                    let mut updated_parsed_keys =
+                        Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());
+
+                    for key in keys {
+                        updated_parsed_keys.remove(&key);
+                    }
+
+                    parsed_keys = Arc::new(updated_parsed_keys);
+
+                    // Ignore send errors for the done notification
+                    let _ = done_tx.send(());
+                }
+
+                _ => continue,
+            }
+        }
+
+        Ok(parsed_keys)
+    }
+}
+
 /// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
 /// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
 ///
@@ -445,11 +502,12 @@ pub fn spawn_init(
     network: Network,
     state: State,
     chain_tip_change: ChainTipChange,
+    cmd_receiver: Receiver<ScanTaskCommand>,
 ) -> JoinHandle<Result<(), Report>> {
     let config = config.clone();
 
     // TODO: spawn an entirely new executor here, to avoid timing attacks.
-    tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
+    tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
 }
 
 /// Initialize the scanner based on its config.
@@ -460,11 +518,12 @@ pub async fn init(
     network: Network,
     state: State,
     chain_tip_change: ChainTipChange,
+    cmd_receiver: Receiver<ScanTaskCommand>,
 ) -> Result<(), Report> {
     let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
         .wait_for_panics()
         .await;
 
     // TODO: add more tasks here?
-    start(state, chain_tip_change, storage).await
+    start(state, chain_tip_change, storage, cmd_receiver).await
 }
diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs
index dc80d0832..37c679faa 100644
--- a/zebra-scan/src/service.rs
+++ b/zebra-scan/src/service.rs
@@ -1,6 +1,6 @@
 //! [`tower::Service`] for zebra-scan.
 
-use std::{future::Future, pin::Pin, task::Poll};
+use std::{future::Future, pin::Pin, task::Poll, time::Duration};
 
 use futures::future::FutureExt;
 use tower::Service;
@@ -10,16 +10,25 @@ use zebra_state::ChainTipChange;
 
 use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
 
+#[cfg(test)]
+mod tests;
+
 /// Zebra-scan [`tower::Service`]
 #[derive(Debug)]
 pub struct ScanService {
     /// On-disk storage
-    db: Storage,
+    pub db: Storage,
 
     /// Handle to scan task that's responsible for writing results
     scan_task: ScanTask,
 }
 
+/// A timeout applied to `DeleteKeys` requests.
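+///
+/// If the scan task does not confirm key removal within this time, the request returns an
+/// error; the database deletion task still runs, so the keys and results are removed from storage.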
+const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
+
+/// The maximum number of keys that may be included in a request to the scan service.
+const MAX_REQUEST_KEYS: usize = 1000;
+
 impl ScanService {
     /// Create a new [`ScanService`].
     pub fn new(
@@ -35,11 +44,15 @@ impl ScanService {
     }
 
     /// Create a new [`ScanService`] with a mock `ScanTask`
-    pub fn new_with_mock_scanner(config: &Config, network: Network) -> Self {
-        Self {
-            db: Storage::new(config, network, false),
-            scan_task: ScanTask::mock(),
-        }
-    }
+    #[cfg(any(test, feature = "proptest-impl"))]
+    pub fn new_with_mock_scanner(
+        db: Storage,
+    ) -> (
+        Self,
+        std::sync::mpsc::Receiver<crate::init::ScanTaskCommand>,
+    ) {
+        let (scan_task, cmd_receiver) = ScanTask::mock();
+        (Self { db, scan_task }, cmd_receiver)
+    }
 }
 
@@ -84,10 +97,37 @@ impl Service<Request> for ScanService {
                 // - send new keys to scan task
             }
 
-            Request::DeleteKeys(_key_hashes) => {
-                // TODO:
-                // - delete these keys and their results from db
-                // - send deleted keys to scan task
+            Request::DeleteKeys(keys) => {
+                let mut db = self.db.clone();
+                let mut scan_task = self.scan_task.clone();
+
+                return async move {
+                    if keys.len() > MAX_REQUEST_KEYS {
+                        return Err(format!(
+                            "maximum number of keys per request is {MAX_REQUEST_KEYS}"
+                        )
+                        .into());
+                    }
+
+                    // Wait up to `DELETE_KEY_TIMEOUT` for a message confirming that the scan task has removed the keys
+                    let remove_keys_result =
+                        tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
+                            .await
+                            .map_err(|_| "timeout waiting for delete keys done notification");
+
+                    // Delete the keys from the database after either confirmation that they've
+                    // been removed from the scan task, or after waiting `DELETE_KEY_TIMEOUT`.
+                    let delete_key_task = tokio::task::spawn_blocking(move || {
+                        db.delete_sapling_results(keys);
+                    });
+
+                    // Return timeout errors or `RecvError`s, or wait for the keys to be deleted from the database.
+                    remove_keys_result??;
+                    delete_key_task.await?;
+
+                    Ok(Response::DeletedKeys)
+                }
+                .boxed();
             }
 
             Request::Results(_key_hashes) => {
diff --git a/zebra-scan/src/service/tests.rs b/zebra-scan/src/service/tests.rs
new file mode 100644
index 000000000..caee08dad
--- /dev/null
+++ b/zebra-scan/src/service/tests.rs
@@ -0,0 +1,79 @@
+//! Tests for ScanService.
+
+use tower::{Service, ServiceExt};
+
+use color_eyre::{eyre::eyre, Result};
+
+use zebra_chain::{block::Height, parameters::Network};
+use zebra_node_services::scan_service::{request::Request, response::Response};
+use zebra_state::TransactionIndex;
+
+use crate::{
+    init::ScanTaskCommand,
+    service::ScanService,
+    storage::db::tests::{fake_sapling_results, new_test_storage},
+    tests::ZECPAGES_SAPLING_VIEWING_KEY,
+};
+
+/// Tests that keys are deleted correctly
+#[tokio::test]
+pub async fn scan_service_deletes_keys_correctly() -> Result<()> {
+    let mut db = new_test_storage(Network::Mainnet);
+
+    let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();
+
+    for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
+        db.insert_sapling_results(
+            &zec_pages_sapling_efvk,
+            fake_result_height,
+            fake_sapling_results([
+                TransactionIndex::MIN,
+                TransactionIndex::from_index(40),
+                TransactionIndex::MAX,
+            ]),
+        );
+    }
+
+    assert!(
+        !db.sapling_results(&zec_pages_sapling_efvk).is_empty(),
+        "there should be some results for this key in the db"
+    );
+
+    let (mut scan_service, cmd_receiver) = ScanService::new_with_mock_scanner(db);
+
+    let response_fut = scan_service
+        .ready()
+        .await
+        .map_err(|err| eyre!(err))?
+        .call(Request::DeleteKeys(vec![zec_pages_sapling_efvk.clone()]));
+
+    let expected_keys = vec![zec_pages_sapling_efvk.clone()];
+    let cmd_handler_fut = tokio::task::spawn_blocking(move || {
+        let Ok(ScanTaskCommand::RemoveKeys { done_tx, keys }) = cmd_receiver.recv() else {
+            panic!("should successfully receive RemoveKeys message");
+        };
+
+        assert_eq!(keys, expected_keys, "keys should match the request keys");
+
+        done_tx.send(()).expect("send should succeed");
+    });
+
+    // Poll futures
+    let (response, join_result) = tokio::join!(response_fut, cmd_handler_fut);
+    join_result?;
+
+    match response.map_err(|err| eyre!(err))? {
+        Response::DeletedKeys => {}
+        _ => panic!("scan service returned unexpected response variant"),
+    };
+
+    assert!(
+        scan_service
+            .db
+            .sapling_results(&zec_pages_sapling_efvk)
+            .is_empty(),
+        "all results for this key should have been deleted"
+    );
+
+    Ok(())
+}
diff --git a/zebra-scan/src/storage/db/sapling.rs b/zebra-scan/src/storage/db/sapling.rs
index b7f1f0b40..dc7c5c868 100644
--- a/zebra-scan/src/storage/db/sapling.rs
+++ b/zebra-scan/src/storage/db/sapling.rs
@@ -233,6 +233,25 @@ impl Storage {
             .write_batch()
             .expect("unexpected database write failure");
     }
+
+    /// Delete the results of sapling scanning `keys`, if they exist
+    pub(crate) fn delete_sapling_results(&mut self, keys: Vec<SaplingScanningKey>) {
+        let mut batch = self.sapling_tx_ids_cf().new_batch_for_writing();
+
+        for key in &keys {
+            let from = SaplingScannedDatabaseIndex::min_for_key(key);
+            let until_strictly_before = SaplingScannedDatabaseIndex::max_for_key(key);
+
+            batch = batch
+                .zs_delete_range(&from, &until_strictly_before)
+                // TODO: convert zs_delete_range() to take std::ops::RangeBounds
+                .zs_delete(&until_strictly_before);
+        }
+
+        batch
+            .write_batch()
+            .expect("unexpected database write failure");
+    }
 }
 
 /// Utility trait for inserting sapling heights into a WriteSaplingTxIdsBatch.
diff --git a/zebra-scan/src/storage/db/tests.rs b/zebra-scan/src/storage/db/tests.rs
index 765ec77e6..f34650ab2 100644
--- a/zebra-scan/src/storage/db/tests.rs
+++ b/zebra-scan/src/storage/db/tests.rs
@@ -6,8 +6,9 @@ use zebra_chain::{
     block::{Block, Height},
     parameters::Network::{self, *},
     serialization::ZcashDeserializeInto,
+    transaction,
 };
-use zebra_state::TransactionIndex;
+use zebra_state::{SaplingScannedResult, TransactionIndex};
 
 use crate::{
     storage::{Storage, INSERT_CONTROL_INTERVAL},
@@ -18,6 +19,9 @@ use crate::{
 #[cfg(test)]
 mod snapshot;
 
+#[cfg(test)]
+mod vectors;
+
 /// Returns an empty `Storage` suitable for testing.
 pub fn new_test_storage(network: Network) -> Storage {
     Storage::new(&Config::ephemeral(), network, false)
@@ -74,3 +78,19 @@ pub fn add_fake_results(
         );
     }
 }
+
+/// Accepts an iterator of [`TransactionIndex`]es and returns a `BTreeMap` with empty results
+pub fn fake_sapling_results<T: IntoIterator<Item = TransactionIndex>>(
+    transaction_indexes: T,
+) -> BTreeMap<TransactionIndex, SaplingScannedResult> {
+    let mut fake_sapling_results = BTreeMap::new();
+
+    for transaction_index in transaction_indexes {
+        fake_sapling_results.insert(
+            transaction_index,
+            SaplingScannedResult::from(transaction::Hash::from([0; 32])),
+        );
+    }
+
+    fake_sapling_results
+}
diff --git a/zebra-scan/src/storage/db/tests/vectors.rs b/zebra-scan/src/storage/db/tests/vectors.rs
new file mode 100644
index 000000000..882da9222
--- /dev/null
+++ b/zebra-scan/src/storage/db/tests/vectors.rs
@@ -0,0 +1,67 @@
+//! Fixed test vectors for the scanner Storage.
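+//!
+//! Currently covers `delete_sapling_results`: deleting a key must remove all of its
+//! results while leaving the results for other keys intact.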
+
+use zebra_chain::{block::Height, parameters::Network};
+use zebra_state::TransactionIndex;
+
+use crate::{
+    storage::db::tests::{fake_sapling_results, new_test_storage},
+    tests::ZECPAGES_SAPLING_VIEWING_KEY,
+};
+
+/// Tests that keys are deleted correctly
+#[test]
+pub fn deletes_keys_and_results_correctly() {
+    let mut db = new_test_storage(Network::Mainnet);
+
+    let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();
+
+    // Replace the last letter of the zec_pages efvk
+    let fake_efvk = format!(
+        "{}t",
+        &ZECPAGES_SAPLING_VIEWING_KEY[..ZECPAGES_SAPLING_VIEWING_KEY.len() - 1]
+    );
+
+    let efvks = [&zec_pages_sapling_efvk, &fake_efvk];
+    let fake_heights = [Height::MIN, Height(1), Height::MAX];
+    let fake_transaction_indexes = [
+        TransactionIndex::MIN,
+        TransactionIndex::from_index(40),
+        TransactionIndex::MAX,
+    ];
+
+    for efvk in efvks {
+        for fake_result_height in fake_heights {
+            db.insert_sapling_results(
+                efvk,
+                fake_result_height,
+                fake_sapling_results(fake_transaction_indexes),
+            );
+        }
+    }
+
+    let expected_num_entries = fake_heights.len();
+    let expected_num_results_per_entry = fake_transaction_indexes.len();
+
+    for efvk in efvks {
+        assert_eq!(
+            db.sapling_results(efvk).len(),
+            expected_num_entries,
+            "there should be {expected_num_entries} entries for this key in the db"
+        );
+
+        for (_, result) in db.sapling_results(efvk) {
+            assert_eq!(
+                result.len(),
+                expected_num_results_per_entry,
+                "there should be {expected_num_results_per_entry} results for this entry in the db"
+            );
+        }
+
+        db.delete_sapling_results(vec![efvk.clone()]);
+
+        assert!(
+            db.sapling_results(efvk).is_empty(),
+            "all results for this key should have been deleted"
+        );
+    }
+}
diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs
index e9b98cc52..bbc8c6983 100644
--- a/zebrad/src/commands/start.rs
+++ b/zebrad/src/commands/start.rs
@@ -301,18 +301,25 @@ impl StartCmd {
 
         #[cfg(feature = "shielded-scan")]
         // Spawn never ending scan task only if we have keys to scan for.
-        let scan_task_handle = if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
-            // TODO: log the number of keys and update the scan_task_starts() test
-            info!("spawning shielded scanner with configured viewing keys");
-            zebra_scan::spawn_init(
-                &config.shielded_scan,
-                config.network.network,
-                state,
-                chain_tip_change,
-            )
-        } else {
-            tokio::spawn(std::future::pending().in_current_span())
-        };
+        let (scan_task_handle, _cmd_sender) =
+            if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
+                // TODO: log the number of keys and update the scan_task_starts() test
+                info!("spawning shielded scanner with configured viewing keys");
+                let scan_task = zebra_scan::ScanTask::spawn(
+                    &config.shielded_scan,
+                    config.network.network,
+                    state,
+                    chain_tip_change,
+                );
+
+                (
+                    std::sync::Arc::into_inner(scan_task.handle)
+                        .expect("should only have one reference here"),
+                    Some(scan_task.cmd_sender),
+                )
+            } else {
+                (tokio::spawn(std::future::pending().in_current_span()), None)
+            };
 
         #[cfg(not(feature = "shielded-scan"))]
         // Spawn a dummy scan task which doesn't do anything and never finishes.