add(scan): Implement gRPC support for registering viewing keys for scanning (#8266)

* Use `use tonic::Request` for consistency

* Add definitions of `proto` types

* Impl `register_keys` gRPC

* Fix a typo

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
Marek 2024-02-15 19:02:32 +01:00 committed by GitHub
parent 584bd23718
commit 85a7cc977a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 9 deletions

View File

@ -19,6 +19,9 @@ service Scanner {
// Get all data we have stored for the given keys. // Get all data we have stored for the given keys.
rpc GetResults(GetResultsRequest) returns (GetResultsResponse); rpc GetResults(GetResultsRequest) returns (GetResultsResponse);
// Submits scanning keys to the scanner.
rpc RegisterKeys(RegisterKeysRequest) returns (RegisterKeysResponse);
} }
// A response to a GetInfo call. // A response to a GetInfo call.
@ -45,12 +48,24 @@ message GetResultsRequest {
repeated string keys = 1; repeated string keys = 1;
} }
// A request to register scanning keys
message RegisterKeysRequest {
// Keys to register
repeated KeyWithHeight keys = 1;
}
// A set of responses for each provided key of a GetResults call. // A set of responses for each provided key of a GetResults call.
message GetResultsResponse { message GetResultsResponse {
// Results for each key. // Results for each key.
map<string, Results> results = 1; map<string, Results> results = 1;
} }
// A response to `RegisterKeysRequest` containing registered keys
message RegisterKeysResponse {
// Keys that were registered
repeated string keys = 1;
}
// A result for a single key. // A result for a single key.
message Results { message Results {
// A height, transaction id map // A height, transaction id map
@ -62,3 +77,11 @@ message TransactionHash {
// A transaction id hash // A transaction id hash
repeated string hash = 1; repeated string hash = 1;
} }
// A scanning key with an optional birth height
message KeyWithHeight {
// Scanning key
string key = 1;
// Birth height of the key
optional uint32 height = 2;
}

View File

@ -3,7 +3,7 @@
use std::{collections::BTreeMap, net::SocketAddr}; use std::{collections::BTreeMap, net::SocketAddr};
use futures_util::future::TryFutureExt; use futures_util::future::TryFutureExt;
use tonic::{transport::Server, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
use tower::ServiceExt; use tower::ServiceExt;
use zebra_node_services::scan_service::{ use zebra_node_services::scan_service::{
@ -13,7 +13,7 @@ use zebra_node_services::scan_service::{
use crate::scanner::{ use crate::scanner::{
scanner_server::{Scanner, ScannerServer}, scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse, ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse,
InfoReply, Results, TransactionHash, InfoReply, RegisterKeysRequest, RegisterKeysResponse, Results, TransactionHash,
}; };
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>; type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -42,10 +42,7 @@ where
+ 'static, + 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send, <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{ {
async fn get_info( async fn get_info(&self, _request: Request<Empty>) -> Result<Response<InfoReply>, Status> {
&self,
_request: tonic::Request<Empty>,
) -> Result<Response<InfoReply>, Status> {
let ScanServiceResponse::Info { let ScanServiceResponse::Info {
min_sapling_birthday_height, min_sapling_birthday_height,
} = self } = self
@ -68,9 +65,36 @@ where
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn register_keys(
&self,
request: Request<RegisterKeysRequest>,
) -> Result<Response<RegisterKeysResponse>, Status> {
let keys = request
.into_inner()
.keys
.into_iter()
.map(|key_with_height| (key_with_height.key, key_with_height.height))
.collect();
let ScanServiceResponse::RegisteredKeys(keys) = self
.scan_service
.clone()
.ready()
.and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys)))
.await
.map_err(|_| Status::unknown("scan service was unavailable"))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};
Ok(Response::new(RegisterKeysResponse { keys }))
}
async fn clear_results( async fn clear_results(
&self, &self,
request: tonic::Request<ClearResultsRequest>, request: Request<ClearResultsRequest>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
let keys = request.into_inner().keys; let keys = request.into_inner().keys;
@ -97,7 +121,7 @@ where
async fn delete_keys( async fn delete_keys(
&self, &self,
request: tonic::Request<DeleteKeysRequest>, request: Request<DeleteKeysRequest>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
let keys = request.into_inner().keys; let keys = request.into_inner().keys;
@ -124,7 +148,7 @@ where
async fn get_results( async fn get_results(
&self, &self,
request: tonic::Request<GetResultsRequest>, request: Request<GetResultsRequest>,
) -> Result<Response<GetResultsResponse>, Status> { ) -> Result<Response<GetResultsResponse>, Status> {
let keys = request.into_inner().keys; let keys = request.into_inner().keys;