add(scan): Start scanner gRPC server with `zebrad` (#8241)

* adds clear_results RPC method for zebra-scan

* adds delete_keys rpc method

* adds docs

* Update zebra-grpc/proto/scanner.proto

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

* Apply suggestions from code review

* start zebra-scan gRPC server from zebrad start command

* adds a test that the scanner starts with zebrad

* adds a `listen_addr` field to the shielded scan config

* updates test to use a random port and set the listen_addr config field

* fixes test

* Update zebra-scan/src/config.rs

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

* fixes panic when trying to open multiple mutable storage instances.

* open db in blocking task

* fixes test

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
Arya 2024-02-07 17:36:01 -05:00 committed by GitHub
parent 1cfed249de
commit 2c0bc3ac92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 161 additions and 94 deletions

View File

@ -6065,6 +6065,7 @@ dependencies = [
"vergen",
"zebra-chain",
"zebra-consensus",
"zebra-grpc",
"zebra-network",
"zebra-node-services",
"zebra-rpc",

View File

@ -5,3 +5,8 @@
#![doc(html_root_url = "https://docs.rs/zebra_grpc")]
pub mod server;
/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}

View File

@ -1,20 +1,19 @@
//! The gRPC server implementation
use std::net::SocketAddr;
use futures_util::future::TryFutureExt;
use tonic::{transport::Server, Response, Status};
use tower::ServiceExt;
use scanner::scanner_server::{Scanner, ScannerServer};
use scanner::{ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply};
use zebra_node_services::scan_service::{
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
};
/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}
use crate::scanner::{
scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply,
};
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -61,7 +60,7 @@ where
));
};
let reply = scanner::InfoReply {
let reply = InfoReply {
min_sapling_birthday_height: min_sapling_birthday_height.0,
};
@ -124,7 +123,10 @@ where
}
/// Initializes the zebra-scan gRPC server
pub async fn init<ScanService>(scan_service: ScanService) -> Result<(), color_eyre::Report>
pub async fn init<ScanService>(
listen_addr: SocketAddr,
scan_service: ScanService,
) -> Result<(), color_eyre::Report>
where
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
+ Clone
@ -133,12 +135,11 @@ where
+ 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
let addr = "[::1]:50051".parse()?;
let service = ScannerRPC { scan_service };
Server::builder()
.add_service(ScannerServer::new(service))
.serve(addr)
.serve(listen_addr)
.await?;
Ok(())

View File

@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);
// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init("127.0.0.1:8231".parse()?, scan_service).await?;
Ok(())
}

View File

@ -1,6 +1,6 @@
//! Configuration for blockchain scanning tasks.
use std::fmt::Debug;
use std::{fmt::Debug, net::SocketAddr};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
@ -20,6 +20,20 @@ pub struct Config {
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,
/// IP address and port for the zebra-scan gRPC server.
///
/// Note: The gRPC server is disabled by default.
/// To enable the gRPC server, set a listen address in the config:
/// ```toml
/// [shielded-scan]
/// listen_addr = '127.0.0.1:8231'
/// ```
///
/// The recommended ports for the gRPC server are:
/// - Mainnet: 127.0.0.1:8231
/// - Testnet: 127.0.0.1:18231
pub listen_addr: Option<SocketAddr>,
/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state, and create a common database config.
@ -41,6 +55,9 @@ impl Default for Config {
fn default() -> Self {
Self {
sapling_keys_to_scan: IndexMap::new(),
listen_addr: None,
// TODO: Add a const generic for specifying the default cache_dir path, like 'zebra' or 'zebra-scan'?
db_config: DbConfig::default(),
}
}

View File

@ -1,31 +1,66 @@
//! Initializing the scanner and gRPC server.
use std::net::SocketAddr;
use color_eyre::Report;
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use zebra_chain::parameters::Network;
use tracing::Instrument;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;
use crate::{scan, service::ScanService, Config};
use crate::{scan, service::ScanService, storage::Storage, Config};
/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
pub async fn init_with_server(
listen_addr: SocketAddr,
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new(
&config,
network,
state,
chain_tip_change,
));
info!(?config, "starting scan service");
let scan_service = ServiceBuilder::new()
.buffer(10)
.service(ScanService::new(&config, network, state, chain_tip_change).await);
// TODO: move this to zebra-grpc init() function and include addr
info!("starting scan gRPC server");
// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init(listen_addr, scan_service).await?;
Ok(())
}
/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it.
pub fn spawn_init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
if let Some(listen_addr) = config.listen_addr {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
init_with_server(listen_addr, config, network, state, chain_tip_change)
.in_current_span(),
)
} else {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
async move {
let storage =
tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
let (_cmd_sender, cmd_receiver) = std::sync::mpsc::channel();
scan::start(state, chain_tip_change, storage, cmd_receiver).await
}
.in_current_span(),
)
}
}

View File

@ -21,6 +21,6 @@ pub use service::scan_task::scan;
pub mod tests;
pub use config::Config;
pub use init::init;
pub use init::{init_with_server, spawn_init};
pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};

View File

@ -5,7 +5,7 @@ use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Dur
use futures::future::FutureExt;
use tower::Service;
use zebra_chain::{parameters::Network, transaction::Hash};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};
use zebra_state::ChainTipChange;
@ -36,15 +36,20 @@ const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
pub async fn new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let config = config.clone();
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change),
db: storage,
}
}

View File

@ -5,10 +5,9 @@ use std::sync::{mpsc, Arc};
use color_eyre::Report;
use tokio::task::JoinHandle;
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;
use crate::Config;
use crate::storage::Storage;
mod commands;
mod executor;
@ -31,23 +30,12 @@ pub struct ScanTask {
impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
pub fn spawn(db: Storage, state: scan::State, chain_tip_change: ChainTipChange) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();
Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
handle: Arc::new(scan::spawn_init(db, state, chain_tip_change, cmd_receiver)),
cmd_sender,
}
}

View File

@ -39,7 +39,6 @@ use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use crate::{
service::{ScanTask, ScanTaskCommand},
storage::{SaplingScanningKey, Storage},
Config,
};
use super::executor;
@ -489,32 +488,10 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
storage: Storage,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
}
/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
config: Config,
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
// TODO: add more tasks here?
start(state, chain_tip_change, storage, cmd_receiver).await
tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span())
}

View File

@ -289,6 +289,7 @@ zebra-scan = { path = "../zebra-scan", features = ["proptest-impl"] }
zebra-node-services = { path = "../zebra-node-services", features = ["rpc-client"] }
zebra-test = { path = "../zebra-test" }
zebra-grpc = { path = "../zebra-grpc" }
# Used by the checkpoint generation tests via the zebra-checkpoints feature
# (the binaries in this crate won't be built unless their features are enabled).

View File

@ -301,25 +301,16 @@ impl StartCmd {
#[cfg(feature = "shielded-scan")]
// Spawn never ending scan task only if we have keys to scan for.
let (scan_task_handle, _cmd_sender) =
if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
let scan_task = zebra_scan::service::scan_task::ScanTask::spawn(
&config.shielded_scan,
config.network.network,
state,
chain_tip_change,
);
(
std::sync::Arc::into_inner(scan_task.handle)
.expect("should only have one reference here"),
Some(scan_task.cmd_sender),
)
} else {
(tokio::spawn(std::future::pending().in_current_span()), None)
};
let scan_task_handle = {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
zebra_scan::spawn_init(
config.shielded_scan.clone(),
config.network.network,
state,
chain_tip_change,
)
};
#[cfg(not(feature = "shielded-scan"))]
// Spawn a dummy scan task which doesn't do anything and never finishes.

View File

@ -2866,6 +2866,53 @@ fn scan_task_starts() -> Result<()> {
Ok(())
}
/// Test that the scanner gRPC server starts when the node starts.
#[tokio::test]
#[cfg(feature = "shielded-scan")]
async fn scan_rpc_server_starts() -> Result<()> {
use zebra_grpc::scanner::{scanner_client::ScannerClient, Empty};
let _init_guard = zebra_test::init();
let test_type = TestType::LaunchWithEmptyState {
launches_lightwalletd: false,
};
let port = random_known_port();
let listen_addr = format!("127.0.0.1:{port}");
let mut config = default_test_config(Mainnet)?;
config.shielded_scan.listen_addr = Some(listen_addr.parse()?);
// Start zebra with the config.
let mut zebrad = testdir()?
.with_exact_config(&config)?
.spawn_child(args!["start"])?
.with_timeout(test_type.zebrad_timeout());
// Wait until gRPC server is starting.
tokio::time::sleep(LAUNCH_DELAY).await;
zebrad.expect_stdout_line_matches("starting scan gRPC server")?;
tokio::time::sleep(Duration::from_secs(1)).await;
let mut client = ScannerClient::connect(format!("http://{listen_addr}")).await?;
let request = tonic::Request::new(Empty {});
client.get_info(request).await?;
// Kill the node.
zebrad.kill(false)?;
// Check that scan task started and the first scanning is done.
let output = zebrad.wait_with_output()?;
// Make sure the command was killed
output.assert_was_killed()?;
output.assert_failure()?;
Ok(())
}
/// Test that the scanner can continue scanning where it was left when zebrad restarts.
///
/// Needs a cache state close to the tip. A possible way to run it locally is:

View File

@ -82,7 +82,8 @@ pub fn default_test_config(net: Network) -> Result<ZebradConfig> {
#[cfg(feature = "shielded-scan")]
{
let shielded_scan = zebra_scan::Config::ephemeral();
let mut shielded_scan = zebra_scan::Config::ephemeral();
shielded_scan.db_config_mut().cache_dir = "zebra-scan".into();
let config = ZebradConfig {
network,

View File

@ -82,15 +82,13 @@ pub(crate) async fn run() -> Result<()> {
tracing::info!("opened state service with valid chain tip height, deleting any past keys in db and starting scan task",);
{
// Before spawning `ScanTask`, delete past results for the zecpages key, if any.
let mut storage = Storage::new(&shielded_scan_config, network, false);
storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]);
}
// Before spawning `ScanTask`, delete past results for the zecpages key, if any.
let mut storage = Storage::new(&shielded_scan_config, network, false);
storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]);
let state = ServiceBuilder::new().buffer(10).service(state_service);
let mut scan_task = ScanTask::spawn(&shielded_scan_config, network, state, chain_tip_change);
let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change);
let (zecpages_dfvks, zecpages_ivks) =
sapling_key_to_scan_block_keys(&ZECPAGES_SAPLING_VIEWING_KEY.to_string(), network)?;