feat(grpc): Add initial `Getinfo` grpc (#8178)
* add `zebra-grpc` crate * add missing fields * convert to a lib * add zebra-scan and tonic as depenency * add a getinfo grpc * remove zebra-scanner dependency * Adds scan_service field to scanner grpc server * remove dependency * test launching the grpc server from the zebra-scan crate (not building) * fix async issue * fixes build issues * add binary for manual testing * try fix try run --------- Co-authored-by: Arya <aryasolhi@gmail.com>
This commit is contained in:
parent
c08ad45fc3
commit
78d33f3e9e
|
@ -18,7 +18,8 @@ fi
|
|||
|
||||
# Release process
|
||||
# Ensure to have an extra `--no-confirm` argument for non-interactive testing.
|
||||
cargo release version --verbose --execute --no-confirm --allow-branch '*' --workspace --exclude zebrad beta
|
||||
cargo release version --verbose --execute --no-confirm --allow-branch '*' --workspace --exclude zebrad --exclude zebra-scan --exclude zebra-grpc beta
|
||||
# TODO: `zebra-scan` and `zebra-grpc` has to be updated with exact versions, we are skipping them by now.
|
||||
cargo release version --verbose --execute --no-confirm --allow-branch '*' --package zebrad patch
|
||||
cargo release replace --verbose --execute --no-confirm --allow-branch '*' --package zebrad
|
||||
cargo release commit --verbose --execute --no-confirm --allow-branch '*'
|
||||
|
|
10
Cargo.lock
10
Cargo.lock
|
@ -5766,8 +5766,15 @@ dependencies = [
|
|||
name = "zebra-grpc"
|
||||
version = "0.1.0-alpha.1"
|
||||
dependencies = [
|
||||
"color-eyre",
|
||||
"futures-util",
|
||||
"prost",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"zebra-scan",
|
||||
"tonic-build",
|
||||
"tower",
|
||||
"zcash_primitives",
|
||||
"zebra-node-services",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5880,6 +5887,7 @@ dependencies = [
|
|||
"zcash_note_encryption",
|
||||
"zcash_primitives",
|
||||
"zebra-chain",
|
||||
"zebra-grpc",
|
||||
"zebra-node-services",
|
||||
"zebra-state",
|
||||
"zebra-test",
|
||||
|
|
|
@ -16,6 +16,16 @@ categories = ["cryptography::cryptocurrencies"]
|
|||
|
||||
[dependencies]
|
||||
|
||||
futures-util = "0.3.28"
|
||||
tonic = "0.10.2"
|
||||
prost = "0.12.3"
|
||||
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }
|
||||
tower = { version = "0.4.13", features = ["util", "buffer"] }
|
||||
color-eyre = "0.6.2"
|
||||
|
||||
zebra-scan = { path = "../zebra-scan", version = "0.1.0-alpha.1" }
|
||||
zcash_primitives = { version = "0.13.0-rc.1" }
|
||||
|
||||
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.10.2"
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
//! Compile proto files
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tonic_build::compile_protos("proto/scanner.proto")?;
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
syntax = "proto3";
|
||||
package scanner;
|
||||
|
||||
// Empty is for gRPCs that take no arguments, currently only GetInfo.
|
||||
message Empty {}
|
||||
|
||||
service Scanner {
|
||||
// Get information about the scanner service.
|
||||
rpc GetInfo (Empty) returns (InfoReply);
|
||||
}
|
||||
|
||||
// A response to a GetInfo call.
|
||||
message InfoReply {
|
||||
// The minimum sapling height allowed.
|
||||
uint32 min_sapling_birthday_height = 1;
|
||||
}
|
|
@ -3,3 +3,5 @@
|
|||
#![doc(html_favicon_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-favicon-128.png")]
|
||||
#![doc(html_logo_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-icon.png")]
|
||||
#![doc(html_root_url = "https://docs.rs/zebra_grpc")]
|
||||
|
||||
pub mod server;
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
//! The gRPC server implementation
|
||||
|
||||
use futures_util::future::TryFutureExt;
|
||||
use tonic::{transport::Server, Response, Status};
|
||||
use tower::ServiceExt;
|
||||
|
||||
use scanner::scanner_server::{Scanner, ScannerServer};
|
||||
use scanner::{Empty, InfoReply};
|
||||
|
||||
use zebra_node_services::scan_service::{
|
||||
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
|
||||
};
|
||||
|
||||
/// The generated scanner proto
|
||||
pub mod scanner {
|
||||
tonic::include_proto!("scanner");
|
||||
}
|
||||
|
||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// The server implementation
|
||||
pub struct ScannerRPC<ScanService>
|
||||
where
|
||||
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
|
||||
{
|
||||
scan_service: ScanService,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<ScanService> Scanner for ScannerRPC<ScanService>
|
||||
where
|
||||
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
|
||||
{
|
||||
async fn get_info(
|
||||
&self,
|
||||
_request: tonic::Request<Empty>,
|
||||
) -> Result<Response<InfoReply>, Status> {
|
||||
let ScanServiceResponse::Info {
|
||||
min_sapling_birthday_height,
|
||||
} = self
|
||||
.scan_service
|
||||
.clone()
|
||||
.ready()
|
||||
.and_then(|service| service.call(ScanServiceRequest::Info))
|
||||
.await
|
||||
.map_err(|_| Status::unknown("scan service was unavailable"))?
|
||||
else {
|
||||
return Err(Status::unknown(
|
||||
"scan service returned an unexpected response",
|
||||
));
|
||||
};
|
||||
|
||||
let reply = scanner::InfoReply {
|
||||
min_sapling_birthday_height: min_sapling_birthday_height.0,
|
||||
};
|
||||
|
||||
Ok(Response::new(reply))
|
||||
}
|
||||
}
|
||||
|
||||
/// Initializes the zebra-scan gRPC server
|
||||
pub async fn init<ScanService>(scan_service: ScanService) -> Result<(), color_eyre::Report>
|
||||
where
|
||||
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
|
||||
{
|
||||
let addr = "[::1]:50051".parse()?;
|
||||
let service = ScannerRPC { scan_service };
|
||||
|
||||
Server::builder()
|
||||
.add_service(ScannerServer::new(service))
|
||||
.serve(addr)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -3,6 +3,9 @@
|
|||
#[derive(Debug)]
|
||||
/// Request types for `zebra_scan::service::ScanService`
|
||||
pub enum Request {
|
||||
/// Requests general info about the scanner
|
||||
Info,
|
||||
|
||||
/// TODO: Accept `KeyHash`es and return key hashes that are registered
|
||||
CheckKeyHashes(Vec<()>),
|
||||
|
||||
|
|
|
@ -2,11 +2,17 @@
|
|||
|
||||
use std::sync::{mpsc, Arc};
|
||||
|
||||
use zebra_chain::transaction::Transaction;
|
||||
use zebra_chain::{block::Height, transaction::Transaction};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Response types for `zebra_scan::service::ScanService`
|
||||
pub enum Response {
|
||||
/// Response to the `Info` request
|
||||
Info {
|
||||
/// The minimum sapling birthday height for the shielded scanner
|
||||
min_sapling_birthday_height: Height,
|
||||
},
|
||||
|
||||
/// Response to Results request
|
||||
Results(Vec<Transaction>),
|
||||
|
||||
|
|
|
@ -14,6 +14,10 @@ keywords = ["zebra", "zcash"]
|
|||
# Must be one of <https://crates.io/category_slugs>
|
||||
categories = ["cryptography::cryptocurrencies"]
|
||||
|
||||
[[bin]] # Bin to run the Scanner gRPC server
|
||||
name = "scanner-grpc-server"
|
||||
path = "src/bin/rpc_server.rs"
|
||||
|
||||
[features]
|
||||
|
||||
# Production features that activate extra dependencies, or extra features in dependencies
|
||||
|
@ -52,6 +56,7 @@ zcash_primitives = "0.13.0-rc.1"
|
|||
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" }
|
||||
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] }
|
||||
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" }
|
||||
zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.1" }
|
||||
|
||||
chrono = { version = "0.4.32", default-features = false, features = ["clock", "std", "serde"] }
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
//! Runs an RPC server with a mock ScanTask
|
||||
|
||||
use tower::ServiceBuilder;
|
||||
|
||||
use zebra_scan::service::ScanService;
|
||||
|
||||
#[tokio::main]
|
||||
/// Runs an RPC server with a mock ScanTask
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (config, network) = Default::default();
|
||||
let scan_service = ServiceBuilder::new()
|
||||
.buffer(10)
|
||||
.service(ScanService::new_with_mock_scanner(&config, network));
|
||||
|
||||
// Start the gRPC server.
|
||||
zebra_grpc::server::init(scan_service).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -4,12 +4,12 @@ use std::sync::{mpsc, Arc};
|
|||
|
||||
use color_eyre::Report;
|
||||
use tokio::{sync::oneshot, task::JoinHandle};
|
||||
use tracing::Instrument;
|
||||
use tower::ServiceBuilder;
|
||||
|
||||
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Transaction};
|
||||
use zebra_chain::{parameters::Network, transaction::Transaction};
|
||||
use zebra_state::ChainTipChange;
|
||||
|
||||
use crate::{scan, storage::Storage, Config};
|
||||
use crate::{scan, service::ScanService, Config};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Commands that can be sent to [`ScanTask`]
|
||||
|
@ -47,6 +47,16 @@ pub struct ScanTask {
|
|||
}
|
||||
|
||||
impl ScanTask {
|
||||
/// Spawns a new [`ScanTask`] for tests.
|
||||
pub fn mock() -> Self {
|
||||
let (cmd_sender, _cmd_receiver) = mpsc::channel();
|
||||
|
||||
Self {
|
||||
handle: tokio::spawn(std::future::pending()),
|
||||
cmd_sender,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a new [`ScanTask`].
|
||||
pub fn spawn(
|
||||
config: &Config,
|
||||
|
@ -58,7 +68,7 @@ impl ScanTask {
|
|||
let (cmd_sender, _cmd_receiver) = mpsc::channel();
|
||||
|
||||
Self {
|
||||
handle: spawn_init(config, network, state, chain_tip_change),
|
||||
handle: scan::spawn_init(config, network, state, chain_tip_change),
|
||||
cmd_sender,
|
||||
}
|
||||
}
|
||||
|
@ -81,13 +91,10 @@ pub fn spawn_init(
|
|||
state: scan::State,
|
||||
chain_tip_change: ChainTipChange,
|
||||
) -> JoinHandle<Result<(), Report>> {
|
||||
let config = config.clone();
|
||||
|
||||
// TODO: spawn an entirely new executor here, to avoid timing attacks.
|
||||
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
|
||||
scan::spawn_init(config, network, state, chain_tip_change)
|
||||
}
|
||||
|
||||
/// Initialize the scanner based on its config.
|
||||
/// Initialize [`ScanService`] based on its config.
|
||||
///
|
||||
/// TODO: add a test for this function.
|
||||
pub async fn init(
|
||||
|
@ -96,10 +103,15 @@ pub async fn init(
|
|||
state: scan::State,
|
||||
chain_tip_change: ChainTipChange,
|
||||
) -> Result<(), Report> {
|
||||
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
|
||||
.wait_for_panics()
|
||||
.await;
|
||||
let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new(
|
||||
&config,
|
||||
network,
|
||||
state,
|
||||
chain_tip_change,
|
||||
));
|
||||
|
||||
// TODO: add more tasks here?
|
||||
scan::start(state, chain_tip_change, storage).await
|
||||
// Start the gRPC server.
|
||||
zebra_grpc::server::init(scan_service).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ pub mod storage;
|
|||
|
||||
use zebra_node_services::scan_service::{request::Request, response::Response};
|
||||
|
||||
mod service;
|
||||
pub mod service;
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
pub mod tests;
|
||||
|
||||
|
|
|
@ -8,8 +8,10 @@ use std::{
|
|||
|
||||
use color_eyre::{eyre::eyre, Report};
|
||||
use itertools::Itertools;
|
||||
use tokio::task::JoinHandle;
|
||||
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
|
||||
|
||||
use tracing::Instrument;
|
||||
use zcash_client_backend::{
|
||||
data_api::ScannedBlock,
|
||||
encoding::decode_extended_full_viewing_key,
|
||||
|
@ -34,7 +36,10 @@ use zebra_chain::{
|
|||
};
|
||||
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
|
||||
|
||||
use crate::storage::{SaplingScanningKey, Storage};
|
||||
use crate::{
|
||||
storage::{SaplingScanningKey, Storage},
|
||||
Config,
|
||||
};
|
||||
|
||||
/// The generic state type used by the scanner.
|
||||
pub type State = Buffer<
|
||||
|
@ -430,3 +435,35 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
|
|||
_ => unreachable!("unmatched response to a state::Tip request"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the scanner based on its config, and spawn a task for it.
|
||||
///
|
||||
/// TODO: add a test for this function.
|
||||
pub fn spawn_init(
|
||||
config: &Config,
|
||||
network: Network,
|
||||
state: State,
|
||||
chain_tip_change: ChainTipChange,
|
||||
) -> JoinHandle<Result<(), Report>> {
|
||||
let config = config.clone();
|
||||
|
||||
// TODO: spawn an entirely new executor here, to avoid timing attacks.
|
||||
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
|
||||
}
|
||||
|
||||
/// Initialize the scanner based on its config.
|
||||
///
|
||||
/// TODO: add a test for this function.
|
||||
pub async fn init(
|
||||
config: Config,
|
||||
network: Network,
|
||||
state: State,
|
||||
chain_tip_change: ChainTipChange,
|
||||
) -> Result<(), Report> {
|
||||
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
|
||||
.wait_for_panics()
|
||||
.await;
|
||||
|
||||
// TODO: add more tasks here?
|
||||
start(state, chain_tip_change, storage).await
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ pub struct ScanService {
|
|||
|
||||
impl ScanService {
|
||||
/// Create a new [`ScanService`].
|
||||
pub fn _new(
|
||||
pub fn new(
|
||||
config: &Config,
|
||||
network: Network,
|
||||
state: scan::State,
|
||||
|
@ -33,6 +33,14 @@ impl ScanService {
|
|||
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new [`ScanService`] with a mock `ScanTask`
|
||||
pub fn new_with_mock_scanner(config: &Config, network: Network) -> Self {
|
||||
Self {
|
||||
db: Storage::new(config, network, false),
|
||||
scan_task: ScanTask::mock(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<Request> for ScanService {
|
||||
|
@ -55,6 +63,17 @@ impl Service<Request> for ScanService {
|
|||
|
||||
fn call(&mut self, req: Request) -> Self::Future {
|
||||
match req {
|
||||
Request::Info => {
|
||||
let db = self.db.clone();
|
||||
|
||||
return async move {
|
||||
Ok(Response::Info {
|
||||
min_sapling_birthday_height: db.min_sapling_birthday_height(),
|
||||
})
|
||||
}
|
||||
.boxed();
|
||||
}
|
||||
|
||||
Request::CheckKeyHashes(_key_hashes) => {
|
||||
// TODO: check that these entries exist in db
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue