add(scan): Create a tower Service in zebra-scan (#8185)

* Adds ScanService and ScanTask

* renames ScannerCommand to ScanTaskCommand

* fixes doc errors

* fixes clippy lints

* panic if the scan task finishes unexpectedly

* updates TODOs

---------

Co-authored-by: Marek <mail@marek.onl>
This commit is contained in:
Arya 2024-01-24 17:37:03 -05:00 committed by GitHub
parent d231b3b435
commit 513ace2646
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 205 additions and 2 deletions

View File

@ -5862,6 +5862,7 @@ dependencies = [
"chrono",
"color-eyre",
"ff",
"futures",
"group",
"indexmap 2.1.0",
"insta",
@ -5879,6 +5880,7 @@ dependencies = [
"zcash_note_encryption",
"zcash_primitives",
"zebra-chain",
"zebra-node-services",
"zebra-state",
"zebra-test",
]

View File

@ -12,3 +12,5 @@ pub mod rpc_client;
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub mod scan_service;

View File

@ -0,0 +1,4 @@
//! Request and response types for zebra-scan tower service.
pub mod request;
pub mod response;

View File

@ -0,0 +1,23 @@
//! `zebra_scan::service::ScanService` request types.
#[derive(Debug)]
/// Request types for `zebra_scan::service::ScanService`
pub enum Request {
/// TODO: Accept `KeyHash`es and return key hashes that are registered
CheckKeyHashes(Vec<()>),
/// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
RegisterKeys(Vec<()>),
/// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
DeleteKeys(Vec<()>),
/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),
/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),
/// TODO: Accept `KeyHash`es and return transaction ids
ClearResults(Vec<()>),
}

View File

@ -0,0 +1,15 @@
//! `zebra_scan::service::ScanService` response types.
use std::sync::{mpsc, Arc};
use zebra_chain::transaction::Transaction;
#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
pub enum Response {
/// Response to Results request
Results(Vec<Transaction>),
/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
}

View File

@ -44,12 +44,14 @@ serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = { version = "1.35.1", features = ["time"] }
tower = "0.4.13"
tracing = "0.1.39"
futures = "0.3.30"
zcash_client_backend = "0.10.0-rc.1"
zcash_primitives = "0.13.0-rc.1"
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" }
chrono = { version = "0.4.32", default-features = false, features = ["clock", "std", "serde"] }

View File

@ -1,14 +1,77 @@
//! Initializing the scanner.
use std::sync::{mpsc, Arc};
use color_eyre::Report;
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::Instrument;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Transaction};
use zebra_state::ChainTipChange;
use crate::{scan, storage::Storage, Config};
#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es
/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,
/// Key hashes that are to be removed
key_hashes: Vec<()>,
},
/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,
/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}
#[derive(Debug)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: JoinHandle<Result<(), Report>>,
/// Task command channel sender
cmd_sender: mpsc::Sender<ScanTaskCommand>,
}
impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
let (cmd_sender, _cmd_receiver) = mpsc::channel();
Self {
handle: spawn_init(config, network, state, chain_tip_change),
cmd_sender,
}
}
/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
}
/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.

View File

@ -12,6 +12,9 @@ pub mod init;
pub mod scan;
pub mod storage;
use zebra_node_services::scan_service::{request::Request, response::Response};
mod service;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;

89
zebra-scan/src/service.rs Normal file
View File

@ -0,0 +1,89 @@
//! [`tower::Service`] for zebra-scan.
use std::{future::Future, pin::Pin, task::Poll};
use futures::future::FutureExt;
use tower::Service;
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;
use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
/// On-disk storage
db: Storage,
/// Handle to scan task that's responsible for writing results
scan_task: ScanTask,
}
impl ScanService {
/// Create a new [`ScanService`].
pub fn _new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
}
}
}
impl Service<Request> for ScanService {
type Response = Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// TODO: If scan task returns an error, add error to the panic message
assert!(
!self.scan_task.handle.is_finished(),
"scan task finished unexpectedly"
);
self.db.check_for_panics();
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CheckKeyHashes(_key_hashes) => {
// TODO: check that these entries exist in db
}
Request::RegisterKeys(_viewing_key_with_hashes) => {
// TODO:
// - add these keys as entries in db
// - send new keys to scan task
}
Request::DeleteKeys(_key_hashes) => {
// TODO:
// - delete these keys and their results from db
// - send deleted keys to scan task
}
Request::Results(_key_hashes) => {
// TODO: read results from db
}
Request::SubscribeResults(_key_hashes) => {
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
}
Request::ClearResults(_key_hashes) => {
// TODO: clear results for these keys from db
}
}
async move { Ok(Response::Results(vec![])) }.boxed()
}
}