diff --git a/Cargo.lock b/Cargo.lock index 7176eedd9..145da1951 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5862,6 +5862,7 @@ dependencies = [ "chrono", "color-eyre", "ff", + "futures", "group", "indexmap 2.1.0", "insta", @@ -5879,6 +5880,7 @@ dependencies = [ "zcash_note_encryption", "zcash_primitives", "zebra-chain", + "zebra-node-services", "zebra-state", "zebra-test", ] diff --git a/zebra-node-services/src/lib.rs b/zebra-node-services/src/lib.rs index f521a0030..87ee182ee 100644 --- a/zebra-node-services/src/lib.rs +++ b/zebra-node-services/src/lib.rs @@ -12,3 +12,5 @@ pub mod rpc_client; /// non-'static lifetimes, (e.g., when a type contains a borrow and is /// parameterized by 'a), *not* that the object itself has 'static lifetime. pub type BoxError = Box; + +pub mod scan_service; diff --git a/zebra-node-services/src/scan_service.rs b/zebra-node-services/src/scan_service.rs new file mode 100644 index 000000000..155f1f50b --- /dev/null +++ b/zebra-node-services/src/scan_service.rs @@ -0,0 +1,4 @@ +//! Request and response types for zebra-scan tower service. + +pub mod request; +pub mod response; diff --git a/zebra-node-services/src/scan_service/request.rs b/zebra-node-services/src/scan_service/request.rs new file mode 100644 index 000000000..3f416e0b7 --- /dev/null +++ b/zebra-node-services/src/scan_service/request.rs @@ -0,0 +1,23 @@ +//! `zebra_scan::service::ScanService` request types. + +#[derive(Debug)] +/// Request types for `zebra_scan::service::ScanService` +pub enum Request { + /// TODO: Accept `KeyHash`es and return key hashes that are registered + CheckKeyHashes(Vec<()>), + + /// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error + RegisterKeys(Vec<()>), + + /// TODO: Accept `KeyHash`es and return Ok(`Vec`) with hashes of deleted keys + DeleteKeys(Vec<()>), + + /// TODO: Accept `KeyHash`es and return `Transaction`s + Results(Vec<()>), + + /// TODO: Accept `KeyHash`es and return a channel receiver + SubscribeResults(Vec<()>), + + /// TODO: Accept `KeyHash`es and return transaction ids + ClearResults(Vec<()>), +} diff --git a/zebra-node-services/src/scan_service/response.rs b/zebra-node-services/src/scan_service/response.rs new file mode 100644 index 000000000..91de089a0 --- /dev/null +++ b/zebra-node-services/src/scan_service/response.rs @@ -0,0 +1,15 @@ +//! `zebra_scan::service::ScanService` response types. + +use std::sync::{mpsc, Arc}; + +use zebra_chain::transaction::Transaction; + +#[derive(Debug)] +/// Response types for `zebra_scan::service::ScanService` +pub enum Response { + /// Response to Results request + Results(Vec), + + /// Response to SubscribeResults request + SubscribeResults(mpsc::Receiver>), +} diff --git a/zebra-scan/Cargo.toml b/zebra-scan/Cargo.toml index 58f166049..5c05555b8 100644 --- a/zebra-scan/Cargo.toml +++ b/zebra-scan/Cargo.toml @@ -44,12 +44,14 @@ serde = { version = "1.0.193", features = ["serde_derive"] } tokio = { version = "1.35.1", features = ["time"] } tower = "0.4.13" tracing = "0.1.39" +futures = "0.3.30" zcash_client_backend = "0.10.0-rc.1" zcash_primitives = "0.13.0-rc.1" zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] } +zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" } chrono = { version = "0.4.32", default-features = false, features = ["clock", "std", "serde"] } diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index 03cea9a90..d6dfe764e 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -1,14 +1,77 @@ //! Initializing the scanner. +use std::sync::{mpsc, Arc}; + use color_eyre::Report; -use tokio::task::JoinHandle; +use tokio::{sync::oneshot, task::JoinHandle}; use tracing::Instrument; -use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network}; +use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Transaction}; use zebra_state::ChainTipChange; use crate::{scan, storage::Storage, Config}; +#[derive(Debug)] +/// Commands that can be sent to [`ScanTask`] +pub enum ScanTaskCommand { + /// Start scanning for new viewing keys + RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es + + /// Stop scanning for deleted viewing keys + RemoveKeys { + /// Notify the caller once the key is removed (so the caller can wait before clearing results) + done_tx: oneshot::Sender<()>, + + /// Key hashes that are to be removed + key_hashes: Vec<()>, + }, + + /// Start sending results for key hashes to `result_sender` + SubscribeResults { + /// Sender for results + result_sender: mpsc::Sender>, + + /// Key hashes to send the results of to result channel + key_hashes: Vec<()>, + }, +} + +#[derive(Debug)] +/// Scan task handle and command channel sender +pub struct ScanTask { + /// [`JoinHandle`] of scan task + pub handle: JoinHandle>, + + /// Task command channel sender + cmd_sender: mpsc::Sender, +} + +impl ScanTask { + /// Spawns a new [`ScanTask`]. + pub fn spawn( + config: &Config, + network: Network, + state: scan::State, + chain_tip_change: ChainTipChange, + ) -> Self { + // TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned + let (cmd_sender, _cmd_receiver) = mpsc::channel(); + + Self { + handle: spawn_init(config, network, state, chain_tip_change), + cmd_sender, + } + } + + /// Sends a command to the scan task + pub fn send( + &mut self, + command: ScanTaskCommand, + ) -> Result<(), mpsc::SendError> { + self.cmd_sender.send(command) + } +} + /// Initialize the scanner based on its config, and spawn a task for it. /// /// TODO: add a test for this function. diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index 1426aa2d5..959b3172a 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -12,6 +12,9 @@ pub mod init; pub mod scan; pub mod storage; +use zebra_node_services::scan_service::{request::Request, response::Response}; + +mod service; #[cfg(any(test, feature = "proptest-impl"))] pub mod tests; diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs new file mode 100644 index 000000000..e61b8583c --- /dev/null +++ b/zebra-scan/src/service.rs @@ -0,0 +1,89 @@ +//! [`tower::Service`] for zebra-scan. + +use std::{future::Future, pin::Pin, task::Poll}; + +use futures::future::FutureExt; +use tower::Service; + +use zebra_chain::parameters::Network; +use zebra_state::ChainTipChange; + +use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response}; + +/// Zebra-scan [`tower::Service`] +#[derive(Debug)] +pub struct ScanService { + /// On-disk storage + db: Storage, + + /// Handle to scan task that's responsible for writing results + scan_task: ScanTask, +} + +impl ScanService { + /// Create a new [`ScanService`]. + pub fn _new( + config: &Config, + network: Network, + state: scan::State, + chain_tip_change: ChainTipChange, + ) -> Self { + Self { + db: Storage::new(config, network, false), + scan_task: ScanTask::spawn(config, network, state, chain_tip_change), + } + } +} + +impl Service for ScanService { + type Response = Response; + type Error = Box; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + // TODO: If scan task returns an error, add error to the panic message + assert!( + !self.scan_task.handle.is_finished(), + "scan task finished unexpectedly" + ); + + self.db.check_for_panics(); + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + match req { + Request::CheckKeyHashes(_key_hashes) => { + // TODO: check that these entries exist in db + } + + Request::RegisterKeys(_viewing_key_with_hashes) => { + // TODO: + // - add these keys as entries in db + // - send new keys to scan task + } + + Request::DeleteKeys(_key_hashes) => { + // TODO: + // - delete these keys and their results from db + // - send deleted keys to scan task + } + + Request::Results(_key_hashes) => { + // TODO: read results from db + } + + Request::SubscribeResults(_key_hashes) => { + // TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller + } + + Request::ClearResults(_key_hashes) => { + // TODO: clear results for these keys from db + } + } + + async move { Ok(Response::Results(vec![])) }.boxed() + } +}