//! The gRPC server implementation use std::{collections::BTreeMap, net::SocketAddr, pin::Pin}; use futures_util::future::TryFutureExt; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{transport::Server, Request, Response, Status}; use tower::ServiceExt; use zebra_chain::{block::Height, transaction}; use zebra_node_services::scan_service::{ request::Request as ScanServiceRequest, response::{Response as ScanServiceResponse, ScanResult}, }; use crate::scanner::{ scanner_server::{Scanner, ScannerServer}, ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse, InfoReply, KeyWithHeight, RegisterKeysRequest, RegisterKeysResponse, Results, ScanRequest, ScanResponse, Transaction, Transactions, }; type BoxError = Box; /// The maximum number of keys that can be requested in a single request. pub const MAX_KEYS_PER_REQUEST: usize = 10; /// The maximum number of messages that can be queued to be streamed to a client /// from the `scan` method. const SCAN_RESPONDER_BUFFER_SIZE: usize = 10_000; #[derive(Debug)] /// The server implementation pub struct ScannerRPC where ScanService: tower::Service + Clone + Send + Sync + 'static, >::Future: Send, { scan_service: ScanService, } #[tonic::async_trait] impl Scanner for ScannerRPC where ScanService: tower::Service + Clone + Send + Sync + 'static, >::Future: Send, { type ScanStream = Pin> + Send>>; async fn scan( &self, request: tonic::Request, ) -> Result, Status> { let keys = request.into_inner().keys; if keys.is_empty() { let msg = "must provide at least 1 key in scan request"; return Err(Status::invalid_argument(msg)); } let keys: Vec<_> = keys .into_iter() .map(|KeyWithHeight { key, height }| (key, height)) .collect(); let ScanServiceResponse::RegisteredKeys(_) = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys.clone()))) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect(); let ScanServiceResponse::Results(results) = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone()))) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; let ScanServiceResponse::SubscribeResults(mut results_receiver) = self .scan_service .clone() .ready() .and_then(|service| { service.call(ScanServiceRequest::SubscribeResults( keys.iter().cloned().collect(), )) }) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; let (response_sender, response_receiver) = tokio::sync::mpsc::channel(SCAN_RESPONDER_BUFFER_SIZE); let response_stream = ReceiverStream::new(response_receiver); tokio::spawn(async move { let initial_results = process_results(keys, results); let send_result = response_sender .send(Ok(ScanResponse { results: initial_results, })) .await; if send_result.is_err() { // return early if the client has disconnected return; } while let Some(scan_result) = results_receiver.recv().await { let send_result = response_sender.send(Ok(scan_result.into())).await; // Finish task if the client has disconnected if send_result.is_err() { break; } } }); Ok(Response::new(Box::pin(response_stream))) } async fn get_info( &self, _request: tonic::Request, ) -> Result, Status> { let ScanServiceResponse::Info { min_sapling_birthday_height, } = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::Info)) .await .map_err(|_| Status::unknown("scan service was unavailable"))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; let reply = InfoReply { min_sapling_birthday_height: min_sapling_birthday_height.0, }; Ok(Response::new(reply)) } async fn register_keys( &self, request: Request, ) -> Result, Status> { let keys: Vec<_> = request .into_inner() .keys .into_iter() .map(|key_with_height| (key_with_height.key, key_with_height.height)) .collect(); if keys.is_empty() { let msg = "must provide at least 1 key for which to register keys"; return Err(Status::invalid_argument(msg)); } if keys.len() > MAX_KEYS_PER_REQUEST { let msg = format!( "must provide at most {} keys to register keys", MAX_KEYS_PER_REQUEST ); return Err(Status::invalid_argument(msg)); } let ScanServiceResponse::RegisteredKeys(keys) = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys))) .await .map_err(|_| Status::unknown("scan service was unavailable"))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; Ok(Response::new(RegisterKeysResponse { keys })) } async fn clear_results( &self, request: Request, ) -> Result, Status> { let keys = request.into_inner().keys; if keys.is_empty() { let msg = "must provide at least 1 key for which to clear results"; return Err(Status::invalid_argument(msg)); } if keys.len() > MAX_KEYS_PER_REQUEST { let msg = format!( "must provide at most {} keys to clear results", MAX_KEYS_PER_REQUEST ); return Err(Status::invalid_argument(msg)); } let ScanServiceResponse::ClearedResults = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::ClearResults(keys))) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; Ok(Response::new(Empty {})) } async fn delete_keys( &self, request: Request, ) -> Result, Status> { let keys = request.into_inner().keys; if keys.is_empty() { let msg = "must provide at least 1 key to delete"; return Err(Status::invalid_argument(msg)); } if keys.len() > MAX_KEYS_PER_REQUEST { let msg = format!( "must provide at most {} keys to delete", MAX_KEYS_PER_REQUEST ); return Err(Status::invalid_argument(msg)); } let ScanServiceResponse::DeletedKeys = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::DeleteKeys(keys))) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; Ok(Response::new(Empty {})) } async fn get_results( &self, request: Request, ) -> Result, Status> { let keys = request.into_inner().keys; if keys.is_empty() { let msg = "must provide at least 1 key to get results"; return Err(Status::invalid_argument(msg)); } if keys.len() > MAX_KEYS_PER_REQUEST { let msg = format!( "must provide at most {} keys to get results", MAX_KEYS_PER_REQUEST ); return Err(Status::invalid_argument(msg)); } let ScanServiceResponse::Results(response) = self .scan_service .clone() .ready() .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone()))) .await .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? else { return Err(Status::unknown( "scan service returned an unexpected response", )); }; let results = process_results(keys, response); Ok(Response::new(GetResultsResponse { results })) } } fn process_results( keys: Vec, results: BTreeMap>>, ) -> BTreeMap { // If there are no results for a key, we still want to return it with empty results. let empty_map = BTreeMap::new(); keys.into_iter() .map(|key| { let values = results.get(&key).unwrap_or(&empty_map); // Skip heights with no transactions, they are scanner markers and should not be returned. let transactions = Results { by_height: values .iter() .filter(|(_, transactions)| !transactions.is_empty()) .map(|(height, transactions)| { let transactions = transactions .iter() .map(ToString::to_string) .map(|hash| Transaction { hash }) .collect(); (height.0, Transactions { transactions }) }) .collect(), }; (key, transactions) }) .collect::>() } impl From for ScanResponse { fn from( ScanResult { key, height: Height(height), tx_id, }: ScanResult, ) -> Self { ScanResponse { results: [( key, Results { by_height: [( height, Transactions { transactions: [tx_id.to_string()] .map(|hash| Transaction { hash }) .to_vec(), }, )] .into_iter() .collect(), }, )] .into_iter() .collect(), } } } /// Initializes the zebra-scan gRPC server pub async fn init( listen_addr: SocketAddr, scan_service: ScanService, ) -> Result<(), color_eyre::Report> where ScanService: tower::Service + Clone + Send + Sync + 'static, >::Future: Send, { let service = ScannerRPC { scan_service }; Server::builder() .add_service(ScannerServer::new(service)) .serve(listen_addr) .await?; Ok(()) }